6.4.4 logstash configuration

The logstash configuration differs depending on the company's different log types, log sources, and log-processing methods.

1. App logs written directly to kafka

app > kafka > logstash > elasticsearch > kibana

First, the app writes its logs directly to kafka, into the topic gen_log_valid. Logstash then fetches the logs from kafka and processes them. Finally, the logs are sunk into elasticsearch for storage and visualized in kibana.

Applications written in different languages log in different ways, but the approach is roughly the same (see the sketch after the list below):

  1. Define a log formatter that formats each log entry as JSON.

  2. Define a log appender that writes a copy of each log entry to kafka.
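
For example, in a Python application the formatter and appender could look roughly like the sketch below. It assumes the kafka-python package; the broker address and the gen_log_valid topic are the ones used elsewhere in this document, while the class names and the fields emitted by the formatter (department, logtype, and so on) are purely illustrative and should follow the actual log specification.

import json
import logging
from datetime import datetime, timezone

from kafka import KafkaProducer  # kafka-python, assumed to be installed


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON document, which is what logstash and elasticsearch expect downstream."""

    def format(self, record):
        return json.dumps({
            "@timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "department": "consumer.v2",  # routing field consumed by the filters in section 4; value is illustrative
            "logtype": "common",
        }, ensure_ascii=False)


class KafkaLogHandler(logging.Handler):
    """Appender that ships every formatted record to a kafka topic."""

    def __init__(self, bootstrap_servers, topic):
        super().__init__()
        self.topic = topic
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def emit(self, record):
        try:
            self.producer.send(self.topic, self.format(record).encode("utf-8"))
        except Exception:
            self.handleError(record)  # a logging failure must never break the business logic


logger = logging.getLogger("app")
handler = KafkaLogHandler("kfk-ppubg01cn-01.svc.example.net:9092", "gen_log_valid")
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("user signed in")  # ends up in the gen_log_valid topic as one JSON line

The try/except in emit reflects the consideration below: if the app-to-kafka path fails, the failure should be swallowed (or buffered) rather than propagated into the business logic.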

1.1. Considerations

  • Introducing kafka in the middle as a broker makes the logging components easier to operate; in theory, as long as kafka remains available, no logs are lost.

  • Using JSON as the log format is mainly for the following reasons:

    • It makes log management easier.

    • It makes it easy to use the logs for business statistics and analysis.

    • Pushing the log specification upstream simplifies the logstash configuration.

    • elasticsearch is natively document-oriented.

    • No grok extraction is needed, which frees up logstash processing capacity, and the logstash configuration does not need to change whenever the log format changes.

  • With the app writing directly to kafka, a failure on the app-to-kafka path may affect the business; this depends entirely on how the app handles errors in its logging code.

2. App logs written to local files

app > logfile > filebeat [> kafka] > logstash > elasticsearch > kibana

Here kafka is an optional component.

The flow is similar to case 1; the difference is that the log agent filebeat collects the app's local log files and then ships them to kafka.
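
A minimal filebeat.yml sketch for this mode might look like the following. The log path is hypothetical, and the kafka brokers and the gen_log_valid topic are the ones used elsewhere in this document; if the optional kafka hop is skipped, replace the kafka output with an output.logstash block (see 4.3).

filebeat.inputs:
  - type: log
    paths:
      - /data/app/logs/*.log   # hypothetical location of the app's local log files

output.kafka:
  hosts:
    - "kfk-ppubg01cn-01.svc.example.net:9092"
    - "kfk-ppubg01cn-02.svc.example.net:9092"
    - "kfk-ppubg01cn-03.svc.example.net:9092"
  topic: "gen_log_valid"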

2.1. Considerations

  • Compared with case 1, the app no longer needs to care about a kafka appender and is thus decoupled from kafka, which also means less intrusion into the app.

3. Other logs

Other logs, such as syslog, come with ready-made configurations and ready-made grok rules.
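
For example, a syslog stream could be handled with the stock grok pattern, roughly as in this sketch (illustrative, not taken from the production configuration):

filter {
    grok {
        # SYSLOGLINE ships with logstash's standard grok patterns
        match => { "message" => "%{SYSLOGLINE}" }
    }
    date {
        # syslog timestamps carry no year, hence the two-format match
        match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
}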

4. Key logstash configuration examples

For the logstash configuration, the configuration files are named by combining the log-flow stage with the plugin type; when logstash reads them, all the files are merged into a single pipeline.
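
For illustration, with the files shown below in place, the conf.d directory (path.config, see 5.1) would contain something like the following, and logstash concatenates them into one pipeline (the .conf suffix is an assumption here):

/etc/logstash/conf.d/
├── prod_output_es.conf
├── dev_output_es.conf
├── prod_output_kafka_json.conf
└── prod_output_kafka_plain.conf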

4.1. kafka input example

prod_output_es

input {
    kafka {
        id => "input_kafka_json_pubg_01"
        group_id => "lgs-welog02cn-new"
        bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
        topics => [
            "gen_log_valid"
        ]
        tags => [
            "input_kafka_json_pubg"
        ]
        codec => "plain"
        decorate_events => true
        auto_offset_reset => "latest"
        consumer_threads => 2
    }
    kafka {
        id => "input_kafka_json_pubg_02"
        group_id => "lgs-welog02cn-new"
        bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
        topics => [
            "examplec-logs-topic"
        ]
        tags => [
            "input_kafka_json_pubg"
        ]
        codec => "plain"
        decorate_events => true
        auto_offset_reset => "latest"
        consumer_threads => 2
    }
    kafka {
        id => "input_kafka_json_bi"
        group_id => "lgs-welog02cn-new"
        bootstrap_servers => "10.40.83.90:9092,10.40.14.12:9092,10.40.75.137:9092"
        topics => [
            "T1StatActivation",
            "T1StatViewPage",
            "tick_call_record_1",
            "T1StatActionEvent",
            "device_info"
        ]
        tags => [
            "input_kafka_json_bi"
        ]
        codec => "plain"
        decorate_events => true
        auto_offset_reset => "earliest"
    }
    kafka {
        id => "input_kafka_json_shanxin"
        group_id => "lgs-welog02cn-new"
        bootstrap_servers => "10.49.48.129:9092"
        topics => [
            "gen_log_valid"
        ]
        tags => [
            "input_kafka_json_shanxin"
        ]
        codec => "plain"
        decorate_events => true
        auto_offset_reset => "latest"
    }

}
# Tidy up the logs; every processed event is guaranteed to 1. have an output route added 2. have an index prefix added
filter {
    # Mask sensitive data: ID card numbers, mobile numbers, and the like are all replaced
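    # The four gsub rules below zero out, in order: 19-digit card-like numbers,
    # 18-digit ID card numbers, 16-digit card-like numbers, and 11-digit mobile
    # numbers, keeping only the leading and trailing digits of each.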
    mutate {
      rename => ["message", "_message"]
      gsub => [  
        "_message", "(\D[2-9]\d{2})\d{12}(\d{4}\D)", '\1000000000000\2',
        "_message", "(\D[1-9][0-9]{2})[0-9]{3}[12][089][0-9]{2}[01][0-9][0123][0-9]([0-9]{3}[0-9Xx]\D)", '\100000000000\2',
        "_message", "(\D[2-9]\d{2})\d{9}(\d{4}\D)", '\1000000000\2',
        "_message", "(\D1[3-9]\d)[0-9]{4}([0-9][0-9]{3}\D)", '\10000\2'
      ]
    }

    json {
      source => "_message"
    }

    mutate {
      remove_field => "_message"
    }

    # If the event came from kafka, add the source topic as a field (currently commented out)
    #if [@metadata][kafka] {
    #    mutate { 
    #        add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
    #    }
    #}

    # Drop events that do not have a department field
    if !([department]) {
        drop { }
    }

    # General log handling
    else if [department] in [
        "ambition",
        "capital",
        "consumer-nginx",
        "consumer-frontend",
        "crawler4j",
        "data4j",
        "data4n",
        "databrazil",
        "datahub",
        "mall",
        "nano",
        "nearprime",
        "o2o",
        "examplec",
        "weops",
        "brazil_open",
        "market",
        "debtnano",
        "spacebox",
        "autofinance",
        "kong-weops",
        "datacenter",
        "datacenter-nginx",
        "shandun"
    ] {
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}" }
        }
    }

    # Newly onboarded logs use the standard department + logtype scheme
    else if [department] in [
        "vdba",
        "histore",
        "bi",
        "model",
        "consumer.v2",
        "devops.v2",
        "coresystem",
        "future",
        "example2",
        "capital.v2",
        "security",
        "primeloan",
        "frontend",
        "open",
        "mongodb",
        "nano.v2",
        "data",
        "shanxin",
        "traffic",
        "o2o.v2",
        "prime",
        "public"
    ] {
        # If the logtype field is missing or empty, assign it a default value
        if ![logtype] or [logtype] == "" {
            mutate { 
                add_field => { "[logtype]" => "common" } 
            } 
        }
        mutate { 
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
        }
    }

    # BI logs are handled separately; the type field needs to be lowercased
    else if [department] == "bi-tj-data" {
        mutate {
            lowercase => ["type"]
            add_field => { "[@metadata][index_prefix]" => "logstash-bi-%{type}" }
        }
    }

    # k8s logs are handled separately
    else if [department] in [
        "k8s-ops",
        "k8s-node"
    ] {
        # If the logtype field is missing or empty, assign it a default value
        if ![logtype] or [logtype] == "" {
            mutate { 
                add_field => { "[logtype]" => "common" }
            }
        }
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
        }
    }

    # k8s standard output (container stdout/stderr streams)
    else if [department] == "k8s-std" {
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{stream}-prod" }
        }
    }

    # Drop everything else so that stray logs are not indexed
    else {
        drop { }
    }

    # Set the index suffix
    if [department] == "vdba" {
        if [logtype] == "audit-proxy" and [month_tag] in [
            "1",
            "2",
            "3",
            "4",
            "5",
            "6",
            "7",
            "8",
            "9",
            "10",
            "11",
            "12"
        ] {
            mutate { 
                add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}-%{month_tag}" }
            }
        }
        else {
            mutate { 
                add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}" }
            }
        }
    }
    else {
        mutate { 
            add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM.dd}" }
        }
    }

}
output {
    elasticsearch {
        hosts => [
            "es-welog02cn-p004.pek3.example.net:9200",
            "es-welog02cn-p005.pek4.example.net:9200"
        ]
        user => "logstash_internal"
        password => "CBd2GtRYvWvH8qJ6Vd8y"
        index => "%{[@metadata][index_prefix]}-%{[@metadata][index_suffix]}"
        manage_template => false
        sniffing => true
    } 
}

dev_output_es

input {
    kafka {
        id => "input_kafka_json_dev"
        group_id => "lgs-welog02cn-dev"
        bootstrap_servers => "kfk-tpubg01cn-01.svc.example.net:9092"
        topics => [
            "gen_log_valid"
        ]
        tags => [
            "input_kafka_json_dev"
        ]
        codec => "json"
        decorate_events => true
    }
    kafka {
        id => "input_kafka_json_dev_audit"
        group_id => "lgs-welog02cn-dev"
        bootstrap_servers => "kfk-tpubg01cn-01.svc.example.net:9092"
        topics => [
            "pii-audit-log"
        ]
        tags => [
            "input_kafka_json_dev"
        ]
        codec => "json"
        decorate_events => true
    }
}
# Tidy up the logs; every processed event is guaranteed to 1. have an output route added 2. have an index prefix added
filter {

    # Drop events that do not have a department field
    if !([department]) {
        drop { }
    }

#    # General log handling
#    else if [department] in [
#        "capital",
#        "capitalttt" # placeholder
#    ] {
#        mutate {
#            add_field => { "[@metadata][index_prefix]" => "logstash-debug-%{department}" }
#        }
#    }

    # Newly onboarded logs use the standard department + logtype scheme
    else if [department] in [
        "histore",
        "consumer.v2",
        "devops.v2",
        "vdba",
        "autofinance",
        "data4j",
        "capital",
        "capital.v2",
        "primeloan",
        "security",
        "model",
        "mongodb",
        "nano.v2",
        "data",
        "traffic",
        "example2",
        "pii",
        "public"
    ] {
        # If the logtype field is missing or empty, assign it a default value
        if ![logtype] or [logtype] == "" {
            mutate { 
                add_field => { "[logtype]" => "common" }
            }
        }
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}" }
        }
    }

    # k8s logs are handled separately
    else if [department] in [
        "k8s-ops",
        "k8s-node"
    ] {
        # If the logtype field is missing or empty, assign it a default value
        if ![logtype] or [logtype] == "" {
            mutate { 
                add_field => { "[logtype]" => "common" }
            }
        }
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}" }
        }
    }

    # k8s standard output (container stdout/stderr streams)
    else if [department] == "k8s-std" {
        mutate {
            add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{stream}" }
        }
    }

    # Drop everything else so that stray logs are not indexed
    else {
        drop { }
    }

}
output {
    elasticsearch {
        hosts => [
            "es-welog02cn-p004.pek3.example.net:9200",
            "es-welog02cn-p005.pek4.example.net:9200"
        ]
        user => "logstash_internal"
        password => "CBd2GtRYvWvH8qJ6Vd8y"
        index => "%{[@metadata][index_prefix]}-dev-%{+YYYY.MM.dd}"
        manage_template => false
        sniffing => true
    } 
}

prod_output_kafka_json

input {
    tcp {
        id => "input_tcp_json_50000"
        port => 50000
        tags => ["input_tcp_json_50000"]
        codec => "json"
    }
}

filter {
    # kong can only ship logs over tcp and its log fields cannot be edited, so it is handled separately here
    mutate {
        add_field => { "[department]" => "devops.v2" }
        add_field => { "[logtype]" => "kong" }
        remove_field => [ "[api][methods]" ]
    }
}

output {
    kafka {
        topic_id => "gen_log_valid"
        bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
        codec => json 
    }
}

prod_output_kafka_plain

input {
    kafka {
        id => "input_kafka_json_tunaikita"
        group_id => "lgs-tunaikita"
        bootstrap_servers => "10.69.24.122:9092,10.69.24.123:9092,10.69.29.32:9092,10.69.29.34:9092"
        topics => [
            "crawler-kafka-topic-visor-auth-step-indonesia"
        ]
        tags => [
            "input_kafka_json_tunaikita_prod"
        ]
        decorate_events => true
        consumer_threads => 2
    }
}

filter {

    # If the event came from kafka, record the source topic in @metadata so the output below can reuse it
    if [@metadata][kafka] {
        mutate { 
            add_field => { "[@metadata][topic_id]" => "%{[@metadata][kafka][topic]}" }
        }
    }
}

output {
    kafka {
        topic_id => "%{[@metadata][topic_id]}"
        bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
        codec => plain {
            format => "%{message}"
        }
    }
}

Notes:

  • Tags indicate the log source, log type, and similar information.

  • The codec determines how the incoming log stream is parsed; it matters a lot.

If you simply want to relay logs from kafka A to kafka B, do not specify a codec; that way the fields inside each log are never parsed, so they will not match the conditions of other filters or outputs.

4.2. kafka output example

This is mainly used for syncing logs between kafka clusters; the typical scenario is syncing from an overseas kafka cluster to the domestic one, after which the logs go through the log-validation flow.

output {
    if "sync_visor_from_brazil" in [tags] {
        kafka {
            topic_id => "prod-dc-visor-brazil"
            bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
            codec => plain {
                format => "%{message}"
            }
        }
    }
    if "gen_log_from_br" in [tags] {
        kafka {
            topic_id => "gen_log_raw"
            bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
            codec => plain {
                format => "%{message}"
            }
        }
    }
}

Note: by default, logstash adds a host field and a timestamp to every event it relays, so to keep the original payload untouched you must emit only the message field. Also, as mentioned in 4.1, when syncing between kafka clusters do not specify a codec, or specify plain.

4.3. filebeat input example

input {
    beats {
        port => 5045
        tags => ["beats_5045"]
    }
}

Used to receive logs in the mode where filebeat connects directly to logstash.
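
The filebeat side of this mode only needs to point its output at logstash; a sketch, with the host name reused from the node name in 5.1 purely for illustration:

# filebeat.yml on the client side (illustrative)
output.logstash:
  hosts: ["lgs-example01cn-p001:5045"]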

4.4. tcp input example

input {
    tcp {
        port => 40000
        tags => ["tcp_40000"]
        codec => "json"
    }
}

Used to receive logs in the mode where the sender connects to logstash directly over tcp. One application of this is kong log collection via its tcp-log plugin; note, however, that the api.methods field in kong logs has a type-inconsistency problem that can prevent the logs from being indexed (which is why it is removed in the prod_output_kafka_json filter above).

5. Other logstash configuration files

5.1. logstash.yml

# node
node.name: lgs-example01cn-p001
path.data: /data/logstash/data
path.logs: /data/logstash/logs
path.config: /etc/logstash/conf.d

# pipeline
pipeline.workers: 8
pipeline.output.workers: 2
pipeline.batch.size: 1024
pipeline.batch.delay: 100

# http
http.host: 127.0.0.1

Note: since 5.x there is also a queue-related setting, which defaults to memory. If it is set to persisted, disk is used as the queue store, which is something of a simplified kafka. We ran it in production for a while, but after failures logstash sometimes could not restart because of queue problems, so we stopped using it. Besides, the kafka queue in front of logstash already acts as a buffer, so buffering inside logstash itself is dispensable.
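
For reference, if the disk-backed queue were wanted after all, the relevant logstash.yml settings would look roughly like this (size and path are illustrative):

queue.type: persisted
queue.max_bytes: 4gb
path.queue: /data/logstash/queue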

5.2. jvm.options

-Xms3992m
-Xmx3992m
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+DisableExplicitGC
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-XX:+HeapDumpOnOutOfMemoryError

Note: the main thing here is the heap size; set it according to the actual situation, there is no fixed rule.

5.3. /etc/default/logstash

JAVACMD="/usr/bin/java"
LS_HOME="/usr/share/logstash"
LS_SETTINGS_DIR="/etc/logstash"
LS_PIDFILE="/var/run/logstash.pid"
LS_USER="logstash"
LS_GROUP="logstash"
LS_GC_LOG_FILE="/var/log/logstash/gc.log"
LS_OPEN_FILES="16384"
LS_NICE="19"
SERVICE_NAME="logstash"
SERVICE_DESCRIPTION="logstash"

Notes:

Q: "I installed the JDK, but logstash still will not start. What is wrong?" A: JAVACMD is set in this file; if your self-installed java lives in a different location, the path does not match, so just update it.

Q: "Why is the nice portion of my CPU usage so high? Isn't nice related to priority?" A: First understand what nice CPU means: it is the CPU time consumed by low-priority processes. This file sets LS_NICE="19", so logstash runs at a very low priority.
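
To confirm this on a node, it is enough to check the nice value of the running java process, for example:

# the NI column should show 19 for the logstash java process
ps -o pid,ni,comm -C java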
