1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        SpringBoot+Kafka+ELK 完成海量日志收集(超詳細(xì))

        共 10759字,需瀏覽 22分鐘

         ·

        2021-11-11 21:06

        來(lái)源:jiandansuifeng.blog.csdn.net/

        article/details/107361190

        整體流程大概如下:

        服務(wù)器準(zhǔn)備

        在這先列出各服務(wù)器節(jié)點(diǎn),方便同學(xué)們?cè)谙挛闹袑?duì)照節(jié)點(diǎn)查看相應(yīng)內(nèi)容

        SpringBoot項(xiàng)目準(zhǔn)備

        引入log4j2替換SpringBoot默認(rèn)log,demo項(xiàng)目結(jié)構(gòu)如下:

        pom

        <dependencies>
        ????<dependency>
        ????????<groupId>org.springframework.bootgroupId>
        ????????<artifactId>spring-boot-starter-webartifactId>
        ????????
        ????????<exclusions>
        ????????????<exclusion>
        ????????????????<groupId>org.springframework.bootgroupId>
        ????????????????<artifactId>spring-boot-starter-loggingartifactId>
        ????????????exclusion>
        ????????exclusions>
        ????dependency>?
        ?
        ?<dependency>
        ?????<groupId>org.springframework.bootgroupId>
        ?????<artifactId>spring-boot-starter-log4j2artifactId>
        ?dependency>?
        ???<dependency>
        ?????<groupId>com.lmaxgroupId>
        ?????<artifactId>disruptorartifactId>
        ?????<version>3.3.4version>
        ???dependency>?
        dependencies>?

        log4j2.xml


        <Configuration?status="INFO"?schema="Log4J-V2.0.xsd"?monitorInterval="600"?>
        ????<Properties>
        ????????<Property?name="LOG_HOME">logsProperty>
        ????????<property?name="FILE_NAME">collectorproperty>
        ????????<property?name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]?[%level{length=5}]?[%thread-%tid]?[%logger]?[%X{hostName}]?[%X{ip}]?[%X{applicationName}]?[%F,%L,%C,%M]?[%m]?##?'%ex'%nproperty>
        ????Properties>
        ????<Appenders>
        ????????<Console?name="CONSOLE"?target="SYSTEM_OUT">
        ????????????<PatternLayout?pattern="${patternLayout}"/>
        ????????Console>??
        ????????<RollingRandomAccessFile?name="appAppender"?fileName="${LOG_HOME}/app-${FILE_NAME}.log"?filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
        ??????????<PatternLayout?pattern="${patternLayout}"?/>
        ??????????<Policies>
        ??????????????<TimeBasedTriggeringPolicy?interval="1"/>
        ??????????????<SizeBasedTriggeringPolicy?size="500MB"/>
        ??????????Policies>
        ??????????<DefaultRolloverStrategy?max="20"/>?????????
        ????????RollingRandomAccessFile>
        ????????<RollingRandomAccessFile?name="errorAppender"?fileName="${LOG_HOME}/error-${FILE_NAME}.log"?filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"?>
        ??????????<PatternLayout?pattern="${patternLayout}"?/>
        ??????????<Filters>
        ??????????????<ThresholdFilter?level="warn"?onMatch="ACCEPT"?onMismatch="DENY"/>
        ??????????Filters>??????????????
        ??????????<Policies>
        ??????????????<TimeBasedTriggeringPolicy?interval="1"/>
        ??????????????<SizeBasedTriggeringPolicy?size="500MB"/>
        ??????????Policies>
        ??????????<DefaultRolloverStrategy?max="20"/>?????????
        ????????RollingRandomAccessFile>????????????
        ????Appenders>
        ????<Loggers>
        ????????
        ????????<AsyncLogger?name="com.bfxy.*"?level="info"?includeLocation="true">
        ??????????<AppenderRef?ref="appAppender"/>
        ????????AsyncLogger>
        ????????<AsyncLogger?name="com.bfxy.*"?level="info"?includeLocation="true">
        ??????????<AppenderRef?ref="errorAppender"/>
        ????????AsyncLogger>???????
        ????????<Root?level="info">
        ????????????<Appender-Ref?ref="CONSOLE"/>
        ????????????<Appender-Ref?ref="appAppender"/>
        ????????????<AppenderRef?ref="errorAppender"/>
        ????????Root>?????????
        ????Loggers>
        Configuration>

        IndexController

        測(cè)試Controller,用以打印日志進(jìn)行調(diào)試

        @Slf4j
        @RestController
        public?class?IndexController?{

        ?@RequestMapping(value?=?"/index")
        ?public?String?index()?{
        ??InputMDC.putMDC();
        ??
        ??log.info("我是一條info日志");
        ??
        ??log.warn("我是一條warn日志");

        ??log.error("我是一條error日志");
        ??
        ??return?"idx";
        ?}


        ?@RequestMapping(value?=?"/err")
        ?public?String?err()?{
        ??InputMDC.putMDC();
        ??try?{
        ???int?a?=?1/0;
        ??}?catch?(Exception?e)?{
        ???log.error("算術(shù)異常",?e);
        ??}
        ??return?"err";
        ?}
        ?
        }

        InputMDC

        用以獲取log中的[%X{hostName}]、[%X{ip}][%X{applicationName}]三個(gè)字段值

        @Component
        public?class?InputMDC?implements?EnvironmentAware?{

        ?private?static?Environment?environment;
        ?
        ?@Override
        ?public?void?setEnvironment(Environment?environment)?{
        ??InputMDC.environment?=?environment;
        ?}
        ?
        ?public?static?void?putMDC()?{
        ??MDC.put("hostName",?NetUtil.getLocalHostName());
        ??MDC.put("ip",?NetUtil.getLocalIp());
        ??MDC.put("applicationName",?environment.getProperty("spring.application.name"));
        ?}

        }

        NetUtil

        public?class?NetUtil?{???
        ?
        ?public?static?String?normalizeAddress(String?address){
        ??String[]?blocks?=?address.split("[:]");
        ??if(blocks.length?>?2){
        ???throw?new?IllegalArgumentException(address?+?"?is?invalid");
        ??}
        ??String?host?=?blocks[0];
        ??int?port?=?80;
        ??if(blocks.length?>?1){
        ???port?=?Integer.valueOf(blocks[1]);
        ??}?else?{
        ???address?+=?":"+port;?//use?default?80
        ??}?
        ??String?serverAddr?=?String.format("%s:%d",?host,?port);
        ??return?serverAddr;
        ?}
        ?
        ?public?static?String?getLocalAddress(String?address){
        ??String[]?blocks?=?address.split("[:]");
        ??if(blocks.length?!=?2){
        ???throw?new?IllegalArgumentException(address?+?"?is?invalid?address");
        ??}?
        ??String?host?=?blocks[0];
        ??int?port?=?Integer.valueOf(blocks[1]);
        ??
        ??if("0.0.0.0".equals(host)){
        ???return?String.format("%s:%d",NetUtil.getLocalIp(),?port);
        ??}
        ??return?address;
        ?}
        ?
        ?private?static?int?matchedIndex(String?ip,?String[]?prefix){
        ??for(int?i=0;?i???String?p?=?prefix[i];
        ???if("*".equals(p)){?//*,?assumed?to?be?IP
        ????if(ip.startsWith("127.")?||
        ???????ip.startsWith("10.")?||?
        ???????ip.startsWith("172.")?||
        ???????ip.startsWith("192.")){
        ?????continue;
        ????}
        ????return?i;
        ???}?else?{
        ????if(ip.startsWith(p)){
        ?????return?i;
        ????}
        ???}?
        ??}
        ??
        ??return?-1;
        ?}
        ?
        ?public?static?String?getLocalIp(String?ipPreference)?{
        ??if(ipPreference?==?null){
        ???ipPreference?=?"*>10>172>192>127";
        ??}
        ??String[]?prefix?=?ipPreference.split("[>?]+");
        ??try?{
        ???Pattern?pattern?=?Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
        ???Enumeration?interfaces?=?NetworkInterface.getNetworkInterfaces();
        ???String?matchedIp?=?null;
        ???int?matchedIdx?=?-1;
        ???while?(interfaces.hasMoreElements())?{
        ????NetworkInterface?ni?=?interfaces.nextElement();
        ????Enumeration?en?=?ni.getInetAddresses();?
        ????while?(en.hasMoreElements())?{
        ?????InetAddress?addr?=?en.nextElement();
        ?????String?ip?=?addr.getHostAddress();??
        ?????Matcher?matcher?=?pattern.matcher(ip);
        ?????if?(matcher.matches())?{??
        ??????int?idx?=?matchedIndex(ip,?prefix);
        ??????if(idx?==?-1)?continue;
        ??????if(matchedIdx?==?-1){
        ???????matchedIdx?=?idx;
        ???????matchedIp?=?ip;
        ??????}?else?{
        ???????if(matchedIdx>idx){
        ????????matchedIdx?=?idx;
        ????????matchedIp?=?ip;
        ???????}
        ??????}
        ?????}?
        ????}?
        ???}?
        ???if(matchedIp?!=?null)?return?matchedIp;
        ???return?"127.0.0.1";
        ??}?catch?(Exception?e)?{?
        ???return?"127.0.0.1";
        ??}
        ?}
        ?
        ?public?static?String?getLocalIp()?{
        ??return?getLocalIp("*>10>172>192>127");
        ?}
        ?
        ?public?static?String?remoteAddress(SocketChannel?channel){
        ??SocketAddress?addr?=?channel.socket().getRemoteSocketAddress();
        ??String?res?=?String.format("%s",?addr);
        ??return?res;
        ?}
        ?
        ?public?static?String?localAddress(SocketChannel?channel){
        ??SocketAddress?addr?=?channel.socket().getLocalSocketAddress();
        ??String?res?=?String.format("%s",?addr);
        ??return?addr==null??res:?res.substring(1);
        ?}
        ?
        ?public?static?String?getPid(){
        ??RuntimeMXBean?runtime?=?ManagementFactory.getRuntimeMXBean();
        ????????String?name?=?runtime.getName();
        ????????int?index?=?name.indexOf("@");
        ????????if?(index?!=?-1)?{
        ????????????return?name.substring(0,?index);
        ????????}
        ??return?null;
        ?}
        ?
        ?public?static?String?getLocalHostName()?{
        ????????try?{
        ????????????return?(InetAddress.getLocalHost()).getHostName();
        ????????}?catch?(UnknownHostException?uhe)?{
        ????????????String?host?=?uhe.getMessage();
        ????????????if?(host?!=?null)?{
        ????????????????int?colon?=?host.indexOf(':');
        ????????????????if?(colon?>?0)?{
        ????????????????????return?host.substring(0,?colon);
        ????????????????}
        ????????????}
        ????????????return?"UnknownHost";
        ????????}
        ????}
        }

        啟動(dòng)項(xiàng)目,訪問(wèn)/index/ero接口,可以看到項(xiàng)目中生成了app-collector.logerror-collector.log兩個(gè)日志文件

        我們將Springboot服務(wù)部署在192.168.11.31這臺(tái)機(jī)器上。

        Kafka安裝和啟用

        kafka下載地址:

        http://kafka.apache.org/downloads.html

        kafka安裝步驟:首先kafka安裝需要依賴(lài)與zookeeper,所以小伙伴們先準(zhǔn)備好zookeeper環(huán)境(三個(gè)節(jié)點(diǎn)即可),然后我們來(lái)一起構(gòu)建kafka broker。

        ##?解壓命令:
        tar?-zxvf?kafka_2.12-2.1.0.tgz?-C?/usr/local/
        ##?改名命令:
        mv?kafka_2.12-2.1.0/?kafka_2.12
        ##?進(jìn)入解壓后的目錄,修改server.properties文件:
        vim?/usr/local/kafka_2.12/config/server.properties
        ##?修改配置:
        broker.id=0
        port=9092
        host.name=192.168.11.51
        advertised.host.name=192.168.11.51
        log.dirs=/usr/local/kafka_2.12/kafka-logs
        num.partitions=2
        zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

        ##?建立日志文件夾:
        mkdir?/usr/local/kafka_2.12/kafka-logs

        ##啟動(dòng)kafka:
        /usr/local/kafka_2.12/bin/kafka-server-start.sh?/usr/local/kafka_2.12/config/server.properties?&

        創(chuàng)建兩個(gè)topic

        ##?創(chuàng)建topic
        kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?app-log-collector?--partitions?1??--replication-factor?1
        kafka-topics.sh?--zookeeper?192.168.11.111:2181?--create?--topic?error-log-collector?--partitions?1??--replication-factor?1?

        我們可以查看一下topic情況

        kafka-topics.sh?--zookeeper?192.168.11.111:2181?--topic?app-log-test?--describe

        可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個(gè)topic

        filebeat安裝和啟用

        filebeat下載

        cd?/usr/local/software
        tar?-zxvf?filebeat-6.6.0-linux-x86_64.tar.gz?-C?/usr/local/
        cd?/usr/local
        mv?filebeat-6.6.0-linux-x86_64/?filebeat-6.6.0

        配置filebeat,可以參考下方y(tǒng)ml配置文件

        vim?/usr/local/filebeat-5.6.2/filebeat.yml
        ######################?Filebeat?Configuration?Example?#########################
        filebeat.prospectors:

        -?input_type:?log

        ??paths:
        ????##?app-服務(wù)名稱(chēng).log,?為什么寫(xiě)死,防止發(fā)生輪轉(zhuǎn)抓取歷史數(shù)據(jù)
        ????-?/usr/local/logs/app-collector.log
        ??#定義寫(xiě)入?ES?時(shí)的?_type?值
        ??document_type:?"app-log"
        ??multiline:
        ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達(dá)式(匹配以?2017-11-15?08:04:23:889?時(shí)間格式開(kāi)頭的字符串)
        ????pattern:?'^\['??????????????????????????????#?指定匹配的表達(dá)式(匹配以?"{?開(kāi)頭的字符串)
        ????negate:?true????????????????????????????????#?是否匹配到
        ????match:?after????????????????????????????????#?合并到上一行的末尾
        ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
        ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
        ??fields:
        ????logbiz:?collector
        ????logtopic:?app-log-collector???##?按服務(wù)劃分用作kafka?topic
        ????evn:?dev

        -?input_type:?log

        ??paths:
        ????-?/usr/local/logs/error-collector.log
        ??document_type:?"error-log"
        ??multiline:
        ????#pattern:?'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'???#?指定匹配的表達(dá)式(匹配以?2017-11-15?08:04:23:889?時(shí)間格式開(kāi)頭的字符串)
        ????pattern:?'^\['??????????????????????????????#?指定匹配的表達(dá)式(匹配以?"{?開(kāi)頭的字符串)
        ????negate:?true????????????????????????????????#?是否匹配到
        ????match:?after????????????????????????????????#?合并到上一行的末尾
        ????max_lines:?2000?????????????????????????????#?最大的行數(shù)
        ????timeout:?2s?????????????????????????????????#?如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
        ??fields:
        ????logbiz:?collector
        ????logtopic:?error-log-collector???##?按服務(wù)劃分用作kafka?topic
        ????evn:?dev
        ????
        output.kafka:
        ??enabled:?true
        ??hosts:?["192.168.11.51:9092"]
        ??topic:?'%{[fields.logtopic]}'
        ??partition.hash:
        ????reachable_only:?true
        ??compression:?gzip
        ??max_message_bytes:?1000000
        ??required_acks:?1
        logging.to_files:?true

        filebeat啟動(dòng):

        檢查配置是否正確

        cd?/usr/local/filebeat-6.6.0
        ./filebeat?-c?filebeat.yml?-configtest
        ##?Config?OK

        啟動(dòng)filebeat

        /usr/local/filebeat-6.6.0/filebeat?&

        檢查是否啟動(dòng)成功

        ps?-ef?|?grep?filebeat

        可以看到filebeat已經(jīng)啟動(dòng)成功

        然后我們?cè)L問(wèn)192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說(shuō)明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

        logstash安裝

        我們?cè)趌ogstash的安裝目錄下新建一個(gè)文件夾

        mkdir?scrpit

        然后cd進(jìn)該文件,創(chuàng)建一個(gè)logstash-script.conf文件

        cd?scrpit
        vim?logstash-script.conf
        ## multiline 插件也可以用于其他類(lèi)似的堆棧式信息,比如 linux 的內(nèi)核日志。
        input?{
        ??kafka?{
        ????##?app-log-服務(wù)名稱(chēng)
        ????topics_pattern?=>?"app-log-.*"
        ????bootstrap_servers?=>?"192.168.11.51:9092"
        ?codec?=>?json
        ?consumer_threads?=>?1?##?增加consumer的并行消費(fèi)線程數(shù)
        ?decorate_events?=>?true
        ????#auto_offset_rest?=>?"latest"
        ?group_id?=>?"app-log-group"
        ???}
        ???
        ???kafka?{
        ????##?error-log-服務(wù)名稱(chēng)
        ????topics_pattern?=>?"error-log-.*"
        ????bootstrap_servers?=>?"192.168.11.51:9092"
        ?codec?=>?json
        ?consumer_threads?=>?1
        ?decorate_events?=>?true
        ????#auto_offset_rest?=>?"latest"
        ?group_id?=>?"error-log-group"
        ???}
        ???
        }

        filter?{
        ??
        ??##?時(shí)區(qū)轉(zhuǎn)換
        ??ruby?{
        ?code?=>?"event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
        ??}

        ??if?"app-log"?in?[fields][logtopic]{
        ????grok?{
        ????????##?表達(dá)式,這里對(duì)應(yīng)的是Springboot輸出的日志格式
        ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
        ????}
        ??}

        ??if?"error-log"?in?[fields][logtopic]{
        ????grok?{
        ????????##?表達(dá)式
        ????????match?=>?["message",?"\[%{NOTSPACE:currentDateTime}\]?\[%{NOTSPACE:level}\]?\[%{NOTSPACE:thread-id}\]?\[%{NOTSPACE:class}\]?\[%{DATA:hostName}\]?\[%{DATA:ip}\]?\[%{DATA:applicationName}\]?\[%{DATA:location}\]?\[%{DATA:messageInfo}\]?##?(\'\'|%{QUOTEDSTRING:throwable})"]
        ????}
        ??}
        ??
        }

        ##?測(cè)試輸出到控制臺(tái):
        output?{
        ??stdout?{?codec?=>?rubydebug?}
        }


        ## elasticsearch:
        output?{

        ??if?"app-log"?in?[fields][logtopic]{
        ?##?es插件
        ?elasticsearch?{
        ???????#?es服務(wù)地址
        ????????hosts?=>?["192.168.11.35:9200"]
        ????????#?用戶(hù)名密碼??????
        ????????user?=>?"elastic"
        ????????password?=>?"123456"
        ????????##?索引名,+?號(hào)開(kāi)頭的,就會(huì)自動(dòng)認(rèn)為后面是時(shí)間格式:
        ????????##?javalog-app-service-2019.01.23?
        ????????index?=>?"app-log-%{[fields][logbiz]}-%{index_time}"
        ????????#?是否嗅探集群ip:一般設(shè)置true;http://192.168.11.35:9200/_nodes/http?pretty
        ????????#?通過(guò)嗅探機(jī)制進(jìn)行es集群負(fù)載均衡發(fā)日志消息
        ????????sniffing?=>?true
        ????????#?logstash默認(rèn)自帶一個(gè)mapping模板,進(jìn)行模板覆蓋
        ????????template_overwrite?=>?true
        ????}?
        ??}
        ??
        ??if?"error-log"?in?[fields][logtopic]{
        ?elasticsearch?{
        ????????hosts?=>?["192.168.11.35:9200"]????
        ????????user?=>?"elastic"
        ????????password?=>?"123456"
        ????????index?=>?"error-log-%{[fields][logbiz]}-%{index_time}"
        ????????sniffing?=>?true
        ????????template_overwrite?=>?true
        ????}?
        ??}
        ??

        }

        啟動(dòng)logstash

        /usr/local/logstash-6.6.0/bin/logstash?-f?/usr/local/logstash-6.6.0/script/logstash-script.conf?&

        等待啟動(dòng)成功,我們?cè)俅卧L問(wèn)192.168.11.31:8001/err

        可以看到控制臺(tái)開(kāi)始打印日志

        ElasticSearch與Kibana

        ES和Kibana的搭建之前沒(méi)寫(xiě)過(guò)博客,網(wǎng)上資料也比較多,大家可以自行搜索。

        搭建完成后,訪問(wèn)Kibana的管理頁(yè)面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

        然后Create index pattern

        • index pattern 輸入?app-log-*
        • Time Filter field name 選擇 currentDateTime

        這樣我們就成功創(chuàng)建了索引。

        我們?cè)俅卧L問(wèn)192.168.11.31:8001/err,這個(gè)時(shí)候就可以看到我們已經(jīng)命中了一條log信息

        里面展示了日志的全量信息

        到這里,我們完整的日志收集及可視化就搭建完成了!


        瀏覽 46
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            和少妇做爰猛烈叫床很黄口述 | 伊人在线观看 | 俺也去在线 | 91最新在线视频 | 国产精品国产三级国产传播 | 欧美成人A级毛片 | 午夜大香蕉 | 成人欧美一区二区三区黑人一 | 欧美一级一级 | 日韩人妻精品无码久久 |