Maison > Opération et maintenance > Nginx > Comment importer les journaux nginx dans elasticsearch

Comment importer les journaux nginx dans elasticsearch

王林
Libérer: 2023-05-13 12:52:11
avant
753 Les gens l'ont consulté

Collectez les journaux nginx via filebeat et transférez-les vers logstash. Après traitement par logstash, ils sont écrits dans elasticsearch. Filebeat est uniquement responsable du travail de collecte, tandis que logstash effectue le formatage des journaux, le remplacement des données, le fractionnement et la création d'index après avoir écrit les journaux dans elasticsearch.

1. Configurez le format de journal nginx

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
            '$status $body_bytes_sent $http_referer ' 
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';
Copier après la connexion

2. Installez et configurez filebeat, activez le module nginx

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat
Copier après la connexion

Activez le module nginx

./filebeat modules enable nginx
Copier après la connexion

Affichez le module

./filebeat modules list
Copier après la connexion

Créez un fichier de configuration

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
 access:
  enabled: true
  var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
 #error:
 # enabled: true
 # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
 hosts: ["192.168.15.91:5044"]
Copier après la connexion

Démarrez le fichier. battre

./filebeat -c blog_module_logstash.yml -e
Copier après la connexion

3. Configurez logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd /usr/local/logstash
Copier après la connexion

répertoire de modèles intégré de logstash

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
Copier après la connexion

éditez les modèles grok et ajoutez un modèle régulier qui prend en charge plusieurs IP

forword (?:%{ipv4}[,]?[ ]?)+|%{word}
Copier après la connexion

grok officiel

#

créer un fichier de configuration de pipeline logstash

#input {
# stdin {}
#}
# 从filebeat接受数据
input {
 beats {
 port => 5044
 host => "0.0.0.0"
 }
}

filter {
 # 添加一个调试的开关
 mutate{add_field => {"[@metadata][debug]"=>true}}
 grok {
 # 过滤nginx日志
 #match => { "message" => "%{nginxaccess_test2}" }
 #match => { "message" => &#39;%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)&#39; }
 #match => { "message" => &#39;(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
    match => { "message" => &#39;(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
 }
 # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
 ruby { 
 #code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;))"
 #将时区改为东8区
 code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;).time.localtime + 8*60*60)"
 }
 # 将nginx的日志记录时间格式化
 # 格式化时间 20/may/2015:21:05:56 +0000
 date {
 locale => "en"
 match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"]
 }
 # 将bytes字段由字符串转换为数字
 mutate {
 convert => {"bytes" => "integer"}
 }
 # 将cookie字段解析成一个json
 #mutate {
 # gsub => ["cookies",&#39;\;&#39;,&#39;,&#39;]
 #} 
 # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
 if[http_x_forwarded_for] =~ ", "{
     ruby {
         code => &#39;event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])&#39;
        }
    }
 # 解析ip,获得ip的地理位置
 geoip {
 source => "http_x_forwarded_for"
 # # 只获取ip的经纬度、国家、城市、时区
 fields => ["location","country_name","city_name","region_name"] 
 }
 # 将agent字段解析,获得浏览器、系统版本等具体信息
 useragent {
 source => "agent"
 target => "useragent"
 }
 #指定要删除的数据
 #mutate{remove_field=>["message"]}
 # 根据日志名设置索引名的前缀
 ruby {
 code => &#39;event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])&#39;
 } 
 # 将@timestamp 格式化为2019.04.23
 ruby {
 code => &#39;event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))&#39;
 }
 # 设置输出的默认索引名
 mutate {
 add_field => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
 }
 }
 # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", &#39;,&#39;, &#39;","&#39;,
#  "cookies_json", &#39;:&#39;, &#39;":"&#39;
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
 # 如果grok解析存在错误,将错误独立写入一个索引
 if "_grokparsefailure" in [tags] {
 #if "_dateparsefailure" in [tags] {
 mutate {
  replace => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  }
 }
 # 如果不存在错误就删除message
 }else{
 mutate{remove_field=>["message"]}
 }
}

output {
 if [@metadata][debug]{
 # 输出到rubydebuyg并输出metadata
 stdout{codec => rubydebug{metadata => true}}
 }else{
 # 将输出内容转换成 "."
 stdout{codec => dots} 
 # 将输出到指定的es
 elasticsearch {
  hosts => ["192.168.15.160:9200"]
  index => "%{[@metadata][index]}"
  document_type => "doc"
 } 
 }
}
Copier après la connexion

start logstash

nohup bin/logstash -f test_pipline2.conf &
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal