input {
kafka {
id => "input_kafka_json_pubg_01"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
topics => [
"gen_log_valid"
]
tags => [
"input_kafka_json_pubg"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
consumer_threads => 2
}
kafka {
id => "input_kafka_json_pubg_02"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "kfk-ppubg01cn-01.svc.example.net:9092,kfk-ppubg01cn-02.svc.example.net:9092,kfk-ppubg01cn-03.svc.example.net:9092"
topics => [
"examplec-logs-topic"
]
tags => [
"input_kafka_json_pubg"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
consumer_threads => 2
}
kafka {
id => "input_kafka_json_bi"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "10.40.83.90:9092,10.40.14.12:9092,10.40.75.137:9092"
topics => [
"T1StatActivation",
"T1StatViewPage",
"tick_call_record_1",
"T1StatActionEvent",
"device_info"
]
tags => [
"input_kafka_json_bi"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "earliest"
}
kafka {
id => "input_kafka_json_shanxin"
group_id => "lgs-welog02cn-new"
bootstrap_servers => "10.49.48.129:9092"
topics => [
"gen_log_valid"
]
tags => [
"input_kafka_json_shanxin"
]
codec => "plain"
decorate_events => true
auto_offset_reset => "latest"
}
}
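# Note: the inputs use the plain codec so that events can be PII-masked as raw
# strings before JSON parsing (see the filter stage below).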
# Normalize logs: after this stage every surviving event is guaranteed to have
# 1. an output route and 2. an index prefix.
filter {
# Mask sensitive data: ID card numbers, phone numbers, and the like are all
# replaced with zero-padded equivalents.
mutate {
rename => { "message" => "_message" }
gsub => [
# 19-digit numbers (likely bank cards): keep the first 3 and last 4 digits
"_message", "(\D[2-9]\d{2})\d{12}(\d{4}\D)", '\1000000000000\2',
# 18-digit ID card numbers: keep the first 3 digits and the last 4 characters
"_message", "(\D[1-9][0-9]{2})[0-9]{3}[12][089][0-9]{2}[01][0-9][0123][0-9]([0-9]{3}[0-9Xx]\D)", '\100000000000\2',
# 16-digit numbers (likely bank cards): keep the first 3 and last 4 digits
"_message", "(\D[2-9]\d{2})\d{9}(\d{4}\D)", '\1000000000\2',
# 11-digit mobile numbers: keep the first 3 and last 4 digits
"_message", "(\D1[3-9]\d)[0-9]{4}([0-9][0-9]{3}\D)", '\10000\2'
]
}
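# Illustrative masking on hypothetical values: ",13812345678," becomes
# ",13800005678,", and ",110101199001011234," becomes ",110000000000001234,".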
json {
source => "_message"
}
mutate {
remove_field => "_message"
}
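# Illustrative: given a masked payload such as
# {"department":"vdba","logtype":"audit-proxy","msg":"..."}, the json filter
# promotes department, logtype, and msg to top-level fields; the temporary
# _message field is then removed.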
# If the log came from Kafka, tag it with the source topic (currently disabled)
#if [@metadata][kafka] {
# mutate {
# add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
# }
#}
# Drop events that lack a department field
if !([department]) {
drop { }
}
# General log handling
else if [department] in [
"ambition",
"capital",
"consumer-nginx",
"consumer-frontend",
"crawler4j",
"data4j",
"data4n",
"databrazil",
"datahub",
"mall",
"nano",
"nearprime",
"o2o",
"examplec",
"weops",
"brazil_open",
"market",
"debtnano",
"spacebox",
"autofinance",
"kong-weops",
"datacenter",
"datacenter-nginx",
"shandun"
] {
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}" }
}
}
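# Illustrative: an event with department "examplec" gets the index prefix
# "logstash-examplec".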
# Newly onboarded logs use the standard department + logtype naming scheme
else if [department] in [
"vdba",
"histore",
"bi",
"model",
"consumer.v2",
"devops.v2",
"coresystem",
"future",
"example2",
"capital.v2",
"security",
"primeloan",
"frontend",
"open",
"mongodb",
"nano.v2",
"data",
"shanxin",
"traffic",
"o2o.v2",
"prime",
"public"
] {
# If the logtype field is missing or empty, assign it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
}
}
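# Illustrative: an event with department "vdba" and no logtype falls back to
# logtype "common" and gets the prefix "logstash-vdba-common-prod".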
# bi logs are handled separately; the type field must be converted to lowercase
else if [department] == "bi-tj-data" {
mutate {
lowercase => ["type"]
add_field => { "[@metadata][index_prefix]" => "logstash-bi-%{type}" }
}
}
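# Illustrative (assuming type values mirror the bi topic names): a type of
# "T1StatActivation" is lowercased, yielding the prefix
# "logstash-bi-t1statactivation".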
# k8s logs are handled separately
else if [department] in [
"k8s-ops",
"k8s-node"
] {
# If the logtype field is missing or empty, assign it a default value
if ![logtype] or [logtype] == "" {
mutate {
add_field => { "[logtype]" => "common" }
}
}
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{logtype}-prod" }
}
}
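# Illustrative (hypothetical logtype): department "k8s-node" with logtype
# "kubelet" gets the prefix "logstash-k8s-node-kubelet-prod".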
# k8s standard output streams
else if [department] == "k8s-std" {
mutate {
add_field => { "[@metadata][index_prefix]" => "logstash-%{department}-%{stream}-prod" }
}
}
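# Illustrative: a container event with stream "stdout" gets the prefix
# "logstash-k8s-std-stdout-prod".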
# Drop everything else so no stray logs reach the output
else {
drop { }
}
# Set the index suffix
if [department] == "vdba" {
if [logtype] == "audit-proxy" and [month_tag] in [
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"10",
"11",
"12"
] {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}-%{month_tag}" }
}
}
else {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM}" }
}
}
}
else {
mutate {
add_field => { "[@metadata][index_suffix]" => "%{+YYYY.MM.dd}" }
}
}
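# Illustrative: a vdba audit-proxy event with month_tag "3" processed in May
# 2019 gets the suffix "2019.05-3"; all other events get a daily suffix such
# as "2019.05.12".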
}
output {
elasticsearch {
hosts => [
"es-welog02cn-p004.pek3.example.net:9200",
"es-welog02cn-p005.pek4.example.net:9200"
]
user => "logstash_internal"
password => "CBd2GtRYvWvH8qJ6Vd8y"
index => "%{[@metadata][index_prefix]}-%{[@metadata][index_suffix]}"
manage_template => false
sniffing => true
}
}
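# Illustrative: with prefix "logstash-vdba-common-prod" and suffix
# "2019.05.12", documents land in the index
# "logstash-vdba-common-prod-2019.05.12".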