欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

JavaEE:使用Kafka收集日志

时间:2023-04-15

一、log4j2日志输出:

1.添加依赖+打包信息:

(1)添加log4j2依赖+打包信息:

            org.springframework.boot        spring-boot-starter-web                                            org.springframework.boot                spring-boot-starter-logging                                        org.springframework.boot        spring-boot-starter-log4j2                com.alibaba        fastjson        1.2.79                com.lmax        disruptor        3.4.4        logserver                        src/main/java                            ***.properties                        true                              src/main/resources                            **/*.*                                                org.springframework.boot            spring-boot-maven-plugin            2.2.8.RELEASE                            com.yyh.log.LogApplication                       

(2)application.yml:

server:  port: 10003spring:  application:    name: logserver  http:    encoding:      charset: UTF-8  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8    default-property-inclusion: NON_NULL

2.在application.yml中创建log4j2.xml(基于网上源代码修改):

<?xml version="1.0" encoding="UTF-8"?>            /usr/local/filebeat/logs         mylog         [%level{length=5}] [%d{yyyy-MM-dd'T'HH:mm:ss.SSS}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n                                                                                                                                                                                                                                                                                                                                                                                                                              

3.使用MDC替换log中的字段值:

@Componentpublic class MDCUtil implements EnvironmentAware {    private static Environment environment;    @Override    public void setEnvironment(Environment environment) {        MDCUtil.environment = environment;    }    public static void setMDC() {//替换log中的提定key的值        MDC.put("hostName", NetUtils.getLocalHostname());        MDC.put("ip", NetUtils.getMacAddressString());        MDC.put("applicationName", environment.getProperty("spring.application.name"));  //从application.yml中获取服务名称    }}

4.打印log测试效果:

@RestController@Slf4jpublic class LogController {    @RequestMapping("/testLog")    public String testLog() {        MDCUtil.setMDC();        log.info("这是info日志");        log.warn("这是warn日志");        log.error("这是error日志");        return "测试log";    }}

5.将工程打包为xxx.jar,用XFtp上传到centos上,启动:

[root@localhost ~]# java -jar logserver.jar &

二、Kafka上创建两个用于接收日志的topic:

安装/配置Kafka(主机IP:192.168.233.147):

https://blog.csdn.net/a526001650a/article/details/123062877?spm=1001.2014.3001.5502

1.创建all-log名称的topic(cd /usr/local/kafka/bin目录,--partitions指定分区数量,--replication-factor指定副本集数量):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --create --topic all-log --partitions 1 --replication-factor 1

2.创建warn-log名称的topic(cd /usr/local/kafka/bin目录,--partitions指定分区数量,--replication-factor指定副本集数量):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --create --topic warn-log --partitions 1 --replication-factor 1

三、使用FileBeat收集日志,并输出到Kafka:

1.FileBeat安装/配置:

(1)下载filebeat-8.0.0-linux-x86_64.tar.gz,使用Xftp上传到centos:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

(2)解压到/usr/local目录:

[root@localhost ~]# tar -zxvf filebeat-8.0.0-linux-x86_64.tar.gz -C /usr/local/

(3)重命名(cd /usr/local):

[root@localhost local]# mv filebeat-8.0.0-linux-x86_64 filebeat

(4)修改filebeat.yml配置(cd /usr/local/filebeat):

[root@localhost filebeat]# vi filebeat.yml

内容如下:

..、 #省略其他原有配置filebeat.inputs:   #log输入配置- type: log   #全量log配置  enabled: true #启动此输入配置  paths:    - /usr/local/filebeat/logs/all-mylog.log    #log文件路径  document_type: "all-log"  #document_type为自定义类型,写入ES时的_type值  multiline:  #多行log抓取规则    pattern: '^['                               #匹配[开头的log    negate: true                                #true为必须匹配    match: after                                #以[开头的多行log,从第2行-max_lines行合并到第1行的末尾    max_lines: 2000                             #参与合并的最大行数    timeout: 2s                                 #多行log合并等待时间,超过2秒后不再合并到当前第1行  fields:    web_name: logserver           #应用名称(自定义key:value)    log_topic: all-log            #kafka topic名称(自定义key:value)    evn: test                     #测试环境(自定义key:value)- type: log                                    #warn级别的log配置  enabled: true #启动此输入配置  paths:    - /usr/local/filebeat/logs/warn-mylog.log  #log文件路径  document_type: "warn-log"  multiline:  #多行log抓取规则    pattern: '^['                               #匹配[开头的log    negate: true                                #true为必须匹配    match: after                                #以[开头的多行log,从第2行-max_lines行合并到第1行的末尾    max_lines: 2000                             #参与合并的最大行数    timeout: 2s                                 #多行log合并等待时间,超过2秒后不再合并到当前第1行  fields:    web_name: logserver           #应用名称(自定义key:value)    log_topic: warn-log           #kafka topic名称(自定义key:value)    evn: test                     #测试环境(自定义key:value)    output.kafka:  #配置输出log到kafka,注释掉默认的Elasticsearch Output配置  enabled: true  #启用输出  hosts: ["192.168.233.147:9092"]  #kafka主机地址,多个时:["ip1:port1", "ip2:port2"]  topic: '%{[fields.log_topic]}'   #输出到kafka指定topic(获取上面自定义的log_topic属性值)  partition.hash:                  #分区规则,值:hash(默认)、random、 round_robin    reachable_only: true  compression: gzip                #使用gzip压缩(默认), 其他值:none、snappy、lz4  compression_level: 4             #压缩级别(默认为4),0为不压缩,1(最佳速度)-9(最佳压缩)  max_message_bytes: 1000000       #消息最大值,单位为字节  required_acks: 1                 #ACK级别,0无响应,1等待本地提交(默认),-1等待所有副本提交。logging.to_files: true   #log输出到文件

2.FileBeat启动:

(1)检查配置(cd /usr/local/filebeat):

[root@localhost filebeat]# ./filebeat test config -e

(2)启动:

[root@localhost filebeat]# ./filebeat &

(3)查看是否启动成功:

[root@localhost filebeat]# ps -ef | grep filebeat

3.查看已抓取的log文件内容(cd /usr/local/filebeat/logs/,全量log为all-mylog.log,warn级别log为warn-mylog.log):

[root@localhost logs]# cat all-mylog.log

4.查看已传输到kafka的log文件目录(本例中如:all-log-0目录、warn-log-0目录):

[root@localhost ~]# cd /usr/local/kafka/kafka-logs/

四、使用Logstash过滤/消费日志:

1.Logstasht安装:

(1)下载logstash-8.0.0-linux-x86_64.tar.gz,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)解压到/usr/local目录:

[root@localhost ~]# tar -zxvf logstash-8.0.0-linux-x86_64.tar.gz -C /usr/local/

(3)重命名(cd /usr/local):

[root@localhost local]# mv logstash-8.0.0 logstash

(4)创建scrpit目录(cd /usr/local/logstash):

[root@localhost logstash]# mkdir scrpit

2.Logstasht配置:

(1)配置文件说明(cd /usr/local/logstash/config):

jvm.options        #JVM参数配置文件log4j2.properties  #log格式配置文件startup.options    #制作centos服务参数logstash.yml       #logstash配置

(2)修改logstash.yml配置(cd /usr/local/logstash/config):

[root@localhost config]# vi logstash.yml

内容如下:

pipeline.workers: 8   #修改工作线程数

(3)创建logstash-start,用XFtp将上传到/usr/local/logstash/scrpit目录,内容如下(基于网上源代码修改):

input {  #接收log配置  kafka {  #配置接收kafka的all-log topic的日志    topics_pattern => "all-log-.*"  #配置topics    bootstrap_servers => "192.168.233.147:9092"  #配置kafka主机地址    codec => json    consumer_threads => 4       #消费线程为4个    decorate_events => true    #auto_offset_rest => "latest"   #默认已配置    group_id => "all-log-group"   #配置分组名  }  kafka {  #配置接收kafka的warn-log topic的日志    topics_pattern => "warn-log-.*"  #配置topics    bootstrap_servers => "192.168.233.147:9092"  #配置kafka主机地址    codec => json    consumer_threads => 4       #消费线程为4个    decorate_events => true    #auto_offset_rest => "latest"   #默认已配置    group_id => "warn-log-group"   #配置分组名  }}filter { #过滤log配置  ruby {  #修改时区    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"  }  if "all-log" in [fields][log_topic] {  #fields.log_topic.all-log    grok { #与log4j2.xml中LOG_FORMAT保持一致      match => ["message", "[%{NOTSPACE:level}] [%{NOTSPACE:currentDateTime}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:class}] [%{data:hostName}] [%{data:ip}] [%{data:applicationName}] [%{data:location}] [%{data:messageInfo}] ## (''|%{QUOTEDSTRING:throwable})"]    }  }  if "warn-log" in [fields][log_topic] {  #fields.log_topic.warn-log    grok { #与log4j2.xml中LOG_FORMAT保持一致      match => ["message", "[%{NOTSPACE:level}] [%{NOTSPACE:currentDateTime}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:class}] [%{data:hostName}] [%{data:ip}] [%{data:applicationName}] [%{data:location}] [%{data:messageInfo}] ## (''|%{QUOTEDSTRING:throwable})"]    }  }}output { #1.log输出到elasticsearch  if "all-log" in [fields][log_topic] { #全量log输出到elasticsearch配置    elasticsearch { #使用elasticsearch插件      hosts => ["192.168.233.147:9200"]  #elasticsearch主机地址      user => "root"                     #用户名      password => "root123456"           #密码      index => "all-log-%{[fields][web_name]}-%{index_time}"      sniffing => true   #true启用嗅探机制(使用此机制进行elasticsearch集群负载均衡发送log)      template_overwrite => true   #覆盖logstash默认模板    }  }  if "warn-log" in [fields][logtopic] { #warn log输出到elasticsearch配置    elasticsearch { #使用elasticsearch插件      hosts => ["192.168.233.147:9200"]  #elasticsearch主机地址      user => "root"                     #用户名      password => "root123456"           #密码      index => "warn-log-%{[fields][web_name]}-%{index_time}"      sniffing => true   #true启用嗅探机制(使用此机制进行elasticsearch集群负载均衡发送log)      template_overwrite => true   #覆盖logstash默认模板    }  }}output { #2.log输出到控制台  stdout {    codec => rubydebug  }}

3.Logstash启动(cd /usr/local/logstash/bin):

[root@localhost bin]# ./logstash -f /usr/local/logstash/scrpit/logstash-start.conf &

五、安装/配置Elasticsearch+Kibana(主机:192.168.233.147):

1.安装elasticsearch:

(1)下载elasticsearch-8.0.0-x86_64.rpm,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)使用rpm命令安装:

[root@localhost ~]# rpm -i elasticsearch-8.0.0-x86_64.rpm

(3)修改elasticsearch.yml(cd /etc/elasticsearch/):

network.host: 192.168.233.147http.port: 9200

(4)配置开机启动:

[root@localhost ~]# systemctl daemon-reload[root@localhost ~]# systemctl enable elasticsearch.service

(5)启动/停止/重启/查看状态:

[root@localhost ~]# systemctl start elasticsearch.service[root@localhost ~]# systemctl stop elasticsearch.service[root@localhost ~]# systemctl restart elasticsearch.service[root@localhost ~]# systemctl status elasticsearch.service

2.安装Kibana:

(1)下载kibana-8.0.0-x86_64.rpm,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)使用rpm命令安装:

[root@localhost ~]# rpm -i kibana-8.0.0-x86_64.rpm

(3)修改kibana.yml(cd /etc/kibana/):

server.port: 5601server.host: "192.168.233.147"elasticsearch.hosts: ["http://192.168.233.147:9200"]    #连接elasticsearchelasticsearch.username: "kibana_system"elasticsearch.password: "pass"elasticsearch.pingTimeout: 30000elasticsearch.requestTimeout: 30000

(4)配置开机启动:

[root@localhost ~]# systemctl daemon-reload[root@localhost ~]# systemctl enable kibana.service

(5)启动/停止/重启/查看状态:

[root@localhost ~]# systemctl start kibana.service[root@localhost ~]# systemctl stop kibana.service[root@localhost ~]# systemctl restart kibana.service[root@localhost ~]# systemctl status kibana.service

(6)浏览器打开kibana首页:

http://192.168.233.147:5601/app/kibana

(7)在Kibana页面中使用Watcher监控告警日志:

Getting started with Watcher | Elasticsearch Guide [8.0] | Elastichttps://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-getting-started.html

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。