6.4.4 logstash configuration
The logstash configuration differs depending on the company's log types, where the logs come from, and how they are processed.
1. app logs written directly to kafka
app > kafka > logstash > elasticsearch > kibana
First the app writes its logs directly to kafka, into the topic gen_log_valid.
logstash then fetches the logs from kafka and processes them.
Finally the logs are sunk into elasticsearch for storage and surfaced in kibana.
Applications written in different languages emit logs by different means, but the approach is broadly the same:
Define a log formatter that serializes each log record as json
Define a log appender that writes a copy of every log to kafka
A sketch of an event produced this way follows.
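For illustration only, here is a hypothetical json log event shaped for this pipeline. The department and logtype fields are the routing keys that the filters in section 4 rely on; every other field name and all of the values are made up:
{
  "department": "devops.v2",
  "logtype": "common",
  "level": "INFO",
  "message": "user 42 logged in",
  "@timestamp": "2019-06-01T12:00:00.000Z"
}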
1.1. Considerations
Putting kafka in the middle as a broker makes the logging components easier to operate; in theory, as long as kafka stays available, no logs are lost.
Using json as the log format is mainly motivated by a few points:
It makes log management easier
It makes it easier to run business statistics and analysis on the logs
Moving the log schema upstream simplifies the logstash configuration
elasticsearch is natively document-oriented
No grok extraction is needed, which frees up logstash CPU, and the logstash configuration no longer has to change whenever the log format does
Because the app writes to kafka directly, a failure on the app-to-kafka path can affect the business itself; whether it does depends entirely on how the app handles its logging code.
2. app logs written to a local file
app > logfile > filebeat [> kafka] > logstash > elasticsearch > kibana
kafka is an optional component here.
The flow is similar to 1; the difference is that the log agent filebeat collects the app's local log files and then ships them to kafka.
2.1. Considerations
Compared with 1, the app no longer needs to care about a kafka appender and is thus decoupled from kafka, which also means less intrusion into the app.
3. Other logs
Other logs, such as syslog, come with ready-made configurations and off-the-shelf grok rules; a minimal sketch follows.
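As a minimal sketch, assuming only the stock grok pattern set that ships with logstash, a syslog line can be parsed with the bundled SYSLOGLINE pattern:
filter {
  grok {
    # SYSLOGLINE is one of the grok patterns bundled with logstash
    match => { "message" => "%{SYSLOGLINE}" }
  }
}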
4. Key logstash configuration examples
For the logstash configuration, the config files are named by combining the log-flow stage with the plugin type; when logstash starts it reads every file in the config directory and merges them into a single pipeline, as sketched below.
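An illustrative layout of /etc/logstash/conf.d under this scheme (the .conf extensions are an assumption; the file names are taken from the examples that follow):
/etc/logstash/conf.d/
  prod_output_es.conf
  dev_output_es.conf
  prod_output_kafka_json.conf
  prod_output_kafka_plain.conf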
4.1. kafka input examples
prod_output_es
input {
kafka {
id => "input_kafka_json_pubg_01"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
topics => [
"gen_log_valid"
]
tags => [
"input_kafka_json_pubg"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
consumer_threads => 2
}
kafka {
id => "input_kafka_json_pubg_02"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
topics => [
"examplec-logs-topic"
]
tags => [
"input_kafka_json_pubg"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
consumer_threads => 2
}
kafka {
id => "input_kafka_json_bi"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "10.40.83.90:9092,10.40.14.12:9092,10.40.75.137:9092"
topics => [
"T1StatActivation",
"T1StatViewPage",
"tick_call_record_1",
"T1StatActionEvent",
"device_info"
]
tags => [
"input_kafka_json_bi"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "earliest"
}
kafka {
id => "input_kafka_json_shanxin"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "10.49.48.129:9092"
topics => [
"gen_log_valid"
]
tags => [
"input_kafka_json_shanxin"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
}
}
# Tidy up the events; every event that leaves this filter carries 1. an output route and 2. an index prefix
filter {
#Mask sensitive information: national ID numbers, mobile numbers, and the like are all replaced
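#The four gsub patterns below keep the first 3 and last 4 digits and zero out the
#middle of, in order: 19-digit card-style numbers, 18-digit national ID numbers,
#16-digit card-style numbers, and 11-digit mobile numbers,
#e.g. 13812345678 becomes 13800005678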
mutate {
rename => ["message", "_message"]
gsub => [
"_message", "(\D[2-9]\d{2})\d{12}(\d{4}\D)", '\1000000000000\2',
"_message", "(\D[1-9][0-9]{2})[0-9]{3}[12][089][0-9]{2}[01][0-9][0123][0-9]([0-9]{3}[0-9Xx]\D)", '\100000000000\2',
"_message", "(\D[2-9]\d{2})\d{9}(\d{4}\D)", '\1000000000\2',
"_message", "(\D1[3-9]\d)[0-9]{4}([0-9][0-9]{3}\D)", '\10000\2'
]
}
json {
source => "_message"
}
mutate {
remove_field => "_message"
}
# If the event came from kafka, add the source topic to its tags
#if [@metadata][kafka] {
# mutate {
# add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
# }
#}
# Drop events that lack a department field
if !([department]) {
drop { }
}
# Regular log handling
else if [department] in [
"ambition",
"capital",
"consumer-nginx",
"consumer-frontend",
"crawler4j",
"data4j",
"data4n",
"databrazil",
"datahub",
"mall",
"nano",
"nearprime",
"o2o",
"examplec",
"weops",
"brazil_open",
"market",
"debtnano",
"spacebox",
"autofinance",
"kong-weops",
"datacenter",
"datacenter-nginx",
"shandun"
] {
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}" }
}
}
# Newly onboarded logs use the standard department plus logtype scheme
else if [department] in [
"vdba",
"histore",
"bi",
"model",
"consumer.v2",
"devops.v2",
"coresystem",
"future",
"example2",
"capital.v2",
"security",
"primeloan",
"frontend",
"open",
"mongodb",
"nano.v2",
"data",
"shanxin",
"traffic",
"o2o.v2",
"prime",
"public"
] {
# If the logtype field is missing or empty, give it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
}
}
# bi logs are handled separately; the type field must be lowercased
else if [department] == "bi-tj-data" {
mutate {
lowercase => ["type"]
add_field => { "[@metadata][index_prefix]" => "logstash-bi-%{type}" }
}
}
# k8s logs are handled separately
else if [department] in [
"k8s-ops",
"k8s-node"
] {
# If the logtype field is missing or empty, give it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
}
}
# k8s standard output (stdout/stderr) streams
else if [department] == "k8s-std" {
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{stream}-prod" }
}
}
# Drop everything else so no stray logs slip through
else {
drop { }
}
# Set the index suffix
if [department] == "vdba" {
if [logtype] == "audit-proxy" and [month_tag] in [
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"10",
"11",
"12"
] {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}-%{month_tag}" }
}
}
else {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}" }
}
}
}
else {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM.dd}" }
}
}
}
output {
elasticsearch {
hosts => [
"es-welog02cn-p004.pek3.example.net:9200",
"es-welog02cn-p005.pek4.example.net:9200"
]
user => "logstash_internal"
password => "CBd2GtRYvWvH8qJ6Vd8y"
index => "%{[@metadata][index_prefix]}-%{[@metadata][index_suffix]}"
manage_template => false
sniffing => true
}
}
dev_output_es
input {
kafka {
id => "input_kafka_json_dev"
group_id => "lgs-welog02cn-dev"
bootstrap_servers => "kfk-tpubg01cn-01.svc.example.net:9092"
topics => [
"gen_log_valid"
]
tags => [
"input_kafka_json_dev"
]
codec => "json"
decorate_events => true
}
kafka {
id => "input_kafka_json_dev_audit"
group_id => "lgs-welog02cn-dev"
bootstrap_servers => "kfk-tpubg01cn-01.svc.example.net:9092"
topics => [
"pii-audit-log"
]
tags => [
"input_kafka_json_dev"
]
codec => "json"
decorate_events => true
}
}
# Tidy up the events; every event that leaves this filter carries 1. an output route and 2. an index prefix
filter {
# Drop events that lack a department field
if !([department]) {
drop { }
}
# # Regular log handling
# else if [department] in [
# "capital",
# "capitalttt" # 占位用
# ] {
# mutate {
# add_field => { "[@metadata][index_prefix]" => "logstash-debug-%{department}" }
# }
# }
# Newly onboarded logs use the standard department plus logtype scheme
else if [department] in [
"histore",
"consumer.v2",
"devops.v2",
"vdba",
"autofinance",
"data4j",
"capital",
"capital.v2",
"primeloan",
"security",
"model",
"mongodb",
"nano.v2",
"data",
"traffic",
"example2",
"pii",
"public"
] {
# If the logtype field is missing or empty, give it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}" }
}
}
# k8s logs are handled separately
else if [department] in [
"k8s-ops",
"k8s-node"
] {
# If the logtype field is missing or empty, give it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}" }
}
}
# k8s standard output (stdout/stderr) streams
else if [department] == "k8s-std" {
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{stream}" }
}
}
# Drop everything else so no stray logs slip through
else {
drop { }
}
}
output {
elasticsearch {
hosts => [
"es-welog02cn-p004.pek3.example.net:9200",
"es-welog02cn-p005.pek4.example.net:9200"
]
user => "logstash_internal"
password => "CBd2GtRYvWvH8qJ6Vd8y"
index => "%{[@metadata][index_prefix]}-dev-%{+YYYY.MM.dd}"
manage_template => false
sniffing => true
}
}
prod_output_kafka_json
input {
tcp {
id => "input_tcp_json_50000"
port => 50000
tags => ["input_tcp_json_50000"]
codec => "json"
}
}
filter {
# kong can only ship logs over tcp and its fields cannot be edited, so it is handled separately
mutate {
add_field => { "[department]" => "devops.v2" }
add_field => { "[logtype]" => "kong" }
remove_field => [ "[api][methods]" ]
}
}
output {
kafka {
topic_id => "gen_log_valid"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
codec => json
}
}
prod_output_kafka_plain
input {
kafka {
id => "input_kafka_json_tunaikita"
group_id => "lgs-tunaikita"
bootstrap_servers => "10.69.24.122:9092,10.69.24.123:9092,10.69.29.32:9092,10.69.29.34:9092"
topics => [
"crawler-kafka-topic-visor-auth-step-indonesia"
]
tags => [
"input_kafka_json_tunaikita_prod"
]
decorate_events => true
consumer_threads => 2
}
}
filter {
# If the event came from kafka, add the source topic to its tags
if [@metadata][kafka] {
mutate {
add_field => { "[@metadata][topic_id]" => "%{[@metadata][kafka][topic]}" }
}
}
}
output {
kafka {
topic_id => "%{[@metadata][topic_id]}"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
codec => plain {
format => "%{message}"
}
}
}
Notes:
Tags record where a log came from, its type, and similar information.
The codec decides how the incoming stream is parsed, which matters a great deal.
If you simply want to relay logs from kafka A to kafka B, do not specify a codec: the fields inside the events then never get parsed, so the events cannot match the conditions of any other filter or output.
4.2. kafka output example
This is mainly used for log synchronization between kafka clusters; the corresponding scenario is syncing an overseas kafka to the domestic kafka, after which the logs go through the log-validation flow.
output {
if "sync_visor_from_brazil" in [tags] {
kafka {
topic_id => "prod-dc-visor-brazil"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
codec => plain {
format => "%{message}"
}
}
}
if "gen_log_from_br" in [tags] {
kafka {
topic_id => "gen_log_raw"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
codec => plain {
format => "%{message}"
}
}
}
}
Note: logstash by default adds a host field and a timestamp to the events it relays, so to keep the original format untouched you must emit only the message field. Also, as noted in 4.1, for kafka-to-kafka sync leave the codec unspecified, or specify it as plain.
4.3. filebeat input example
input {
beats {
port => 5045
tags => ["beats_5045"]
}
}
Used to receive logs when filebeat connects directly to logstash; a sketch of the matching filebeat side follows.
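As a minimal sketch of the filebeat side, assuming filebeat 6.x or later; the log path and the logstash hostname are made up, and only port 5045 comes from the beats input above:
filebeat.inputs:
- type: log
  paths:
    - /data/app/logs/*.log
output.logstash:
  hosts: ["lgs-example01cn-p001.svc.example.net:5045"]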
4.4. tcp input example
input {
tcp {
port => 40000
tags => ["tcp_40000"]
codec => "json"
}
}
Used to receive logs when an application connects to logstash directly over tcp. One application is kong log collection through kong's tcp log plugin. Note, however, that kong logs have a type-inconsistency problem on the api.methods field that keeps events from being indexed, which is why the prod_output_kafka_json filter above removes that field.
5. Other logstash configuration files
5.1. logstash.yml
# node
node.name: lgs-example01cn-p001
path.data: /data/logstash/data
path.logs: /data/logstash/logs
path.config: /etc/logstash/conf.d
# pipeline
pipeline.workers: 8
pipeline.output.workers: 2
pipeline.batch.size: 1024
pipeline.batch.delay: 100
# http
http.host: 127.0.0.1
Note: since 5.x there is also a queue-related setting, which defaults to memory. Set to persisted, it uses disk as the queue store, a sort of bare-bones kafka. We ran it in production for a while, but after a crash logstash sometimes failed to restart because of queue problems, so we stopped using it. Besides, the kafka we put in front of logstash is already a buffering layer, so a buffer inside logstash adds little; a sketch of the settings is shown below anyway.
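A sketch of the persisted-queue settings in logstash.yml; queue.type, path.queue, and queue.max_bytes are real logstash settings, but the values here are illustrative only:
# queue (illustrative values, not what we run)
queue.type: persisted
path.queue: /data/logstash/queue
queue.max_bytes: 4gb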
5.2. jvm.options
-Xms3992m
-Xmx3992m
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+DisableExplicitGC
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-XX:+HeapDumpOnOutOfMemoryError
Note: the main thing here is the heap size; set it according to your actual situation, there is no fixed rule.
5.3. /etc/default/logstash
JAVACMD="/usr/bin/java"
LS_HOME="/usr/share/logstash"
LS_SETTINGS_DIR="/etc/logstash"
LS_PIDFILE="/var/run/logstash.pid"
LS_USER="logstash"
LS_GROUP="logstash"
LS_GC_LOG_FILE="/var/log/logstash/gc.log"
LS_OPEN_FILES="16384"
LS_NICE="19"
SERVICE_NAME="logstash"
SERVICE_DESCRIPTION="logstash"
Notes:
Someone asks: I installed a jdk, but logstash still will not start. What is wrong?
Answer: this file sets JAVACMD; if your self-installed java lives at a different path, change JAVACMD to match.
Someone asks: why is the nice share of my CPU usage so high? Is nice not about scheduling priority?
Answer: first understand what nice CPU means: it is CPU time consumed by processes running at low priority. This file sets LS_NICE="19", a very low priority, so logstash's CPU time is accounted there.