使用Logstash解析日志
输入
# Consume raw log lines from Kafka (produced by Filebeat).
input {
  kafka {
    bootstrap_servers => ["127.0.0.1:9092"]
    group_id => "logstash"
    # BUG FIX: "smallest" is a legacy Kafka 0.8 consumer value; the current
    # kafka input plugin only accepts "earliest", "latest" or "none".
    # "earliest" preserves the original intent (start from the oldest offset
    # when no committed offset exists for this group).
    auto_offset_reset => "earliest"
    consumer_threads => 5
    # Attach Kafka metadata (topic, partition, offset, ...) to each event.
    decorate_events => "true"
    topics => ["filebeat"]
    type => "log"
  }
}
过滤
# Parse each raw "message" into structured fields: timestamp, level, info.
filter {
grok {
# Expected line shape: "[<ISO8601 timestamp>] <LOGLEVEL> <remainder>",
# e.g. "[2023-01-01T12:00:00,000] INFO something happened".
# The custom (?<info>([\s\S]*)) capture uses [\s\S] instead of "." so the
# trailing "info" field can span newlines (e.g. multi-line stack traces).
# NOTE(review): events that do not match will be tagged _grokparsefailure
# and carry no [level] field — confirm downstream handling is intended.
match => {"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} (?<info>([\s\S]*))"}
}
}
使用grok解析日志
输出
# Route parsed events into a per-level Elasticsearch index.
# NOTE(review): only INFO and WARN events are indexed; events whose [level]
# is anything else (or missing, e.g. on grok parse failure) are silently
# dropped — confirm this is intended.
output {
  if [level] == "INFO" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "info"
      timeout => 300
      #user => "elastic"
      #password => "changeme"
    }
  } else if [level] == "WARN" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "warn"
      timeout => 300
      #user => "elastic"
      #password => "changeme"
    }
  }
}