1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        SpringBoot + Kafka + ELK 完成海量日志收集(超詳細(xì))

        2021-09-02 18:38

        點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)

        整體流程大概如下:

        服務(wù)器準(zhǔn)備

        在這先列出各服務(wù)器節(jié)點(diǎn),方便同學(xué)們?cè)谙挛闹袑?duì)照節(jié)點(diǎn)查看相應(yīng)內(nèi)容

        SpringBoot項(xiàng)目準(zhǔn)備

        引入log4j2替換SpringBoot默認(rèn)log,demo項(xiàng)目結(jié)構(gòu)如下:

        pom

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <!--  排除spring-boot-starter-logging -->
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency> 
         <!-- log4j2 -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-log4j2</artifactId>
         </dependency> 
           <dependency>
             <groupId>com.lmax</groupId>
             <artifactId>disruptor</artifactId>
             <version>3.3.4</version>
           </dependency> 
        </dependencies> 

        log4j2.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
            <Properties>
                <Property name="LOG_HOME">logs</Property>
                <property name="FILE_NAME">collector</property>
                <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
            </Properties>
            <Appenders>
                <Console name="CONSOLE" target="SYSTEM_OUT">
                    <PatternLayout pattern="${patternLayout}"/>
                </Console>  
                <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                  <PatternLayout pattern="${patternLayout}" />
                  <Policies>
                      <TimeBasedTriggeringPolicy interval="1"/>
                      <SizeBasedTriggeringPolicy size="500MB"/>
                  </Policies>
                  <DefaultRolloverStrategy max="20"/>         
                </RollingRandomAccessFile>
                <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                  <PatternLayout pattern="${patternLayout}" />
                  <Filters>
                      <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
                  </Filters>              
                  <Policies>
                      <TimeBasedTriggeringPolicy interval="1"/>
                      <SizeBasedTriggeringPolicy size="500MB"/>
                  </Policies>
                  <DefaultRolloverStrategy max="20"/>         
                </RollingRandomAccessFile>            
            </Appenders>
            <Loggers>
                <!-- 業(yè)務(wù)相關(guān) 異步logger -->
                <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                  <AppenderRef ref="appAppender"/>
                </AsyncLogger>
                <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                  <AppenderRef ref="errorAppender"/>
                </AsyncLogger>       
                <Root level="info">
                    <Appender-Ref ref="CONSOLE"/>
                    <Appender-Ref ref="appAppender"/>
                    <AppenderRef ref="errorAppender"/>
                </Root>         
            </Loggers>
        </Configuration>

        IndexController

        測(cè)試Controller,用以打印日志進(jìn)行調(diào)試

        @Slf4j
        @RestController
        public class IndexController {

         @RequestMapping(value = "/index")
         public String index() {
          InputMDC.putMDC();
          
          log.info("我是一條info日志");
          
          log.warn("我是一條warn日志");

          log.error("我是一條error日志");
          
          return "idx";
         }


         @RequestMapping(value = "/err")
         public String err() {
          InputMDC.putMDC();
          try {
           int a = 1/0;
          } catch (Exception e) {
           log.error("算術(shù)異常", e);
          }
          return "err";
         }
         
        }

        InputMDC

        用以獲取log中的[%X{hostName}]、[%X{ip}]、[%X{applicationName}]三個(gè)字段值

        @Component
        public class InputMDC implements EnvironmentAware {

         private static Environment environment;
         
         @Override
         public void setEnvironment(Environment environment) {
          InputMDC.environment = environment;
         }
         
         public static void putMDC() {
          MDC.put("hostName", NetUtil.getLocalHostName());
          MDC.put("ip", NetUtil.getLocalIp());
          MDC.put("applicationName", environment.getProperty("spring.application.name"));
         }

        }

        NetUtil

        public class NetUtil {   
         
         public static String normalizeAddress(String address){
          String[] blocks = address.split("[:]");
          if(blocks.length > 2){
           throw new IllegalArgumentException(address + " is invalid");
          }
          String host = blocks[0];
          int port = 80;
          if(blocks.length > 1){
           port = Integer.valueOf(blocks[1]);
          } else {
           address += ":"+port; //use default 80
          } 
          String serverAddr = String.format("%s:%d", host, port);
          return serverAddr;
         }
         
         public static String getLocalAddress(String address){
          String[] blocks = address.split("[:]");
          if(blocks.length != 2){
           throw new IllegalArgumentException(address + " is invalid address");
          } 
          String host = blocks[0];
          int port = Integer.valueOf(blocks[1]);
          
          if("0.0.0.0".equals(host)){
           return String.format("%s:%d",NetUtil.getLocalIp(), port);
          }
          return address;
         }
         
         private static int matchedIndex(String ip, String[] prefix){
          for(int i=0; i<prefix.length; i++){
           String p = prefix[i];
           if("*".equals(p)){ //*, assumed to be IP
            if(ip.startsWith("127.") ||
               ip.startsWith("10.") || 
               ip.startsWith("172.") ||
               ip.startsWith("192.")){
             continue;
            }
            return i;
           } else {
            if(ip.startsWith(p)){
             return i;
            }
           } 
          }
          
          return -1;
         }
         
         public static String getLocalIp(String ipPreference) {
          if(ipPreference == null){
           ipPreference = "*>10>172>192>127";
          }
          String[] prefix = ipPreference.split("[> ]+");
          try {
           Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
           Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
           String matchedIp = null;
           int matchedIdx = -1;
           while (interfaces.hasMoreElements()) {
            NetworkInterface ni = interfaces.nextElement();
            Enumeration<InetAddress> en = ni.getInetAddresses(); 
            while (en.hasMoreElements()) {
             InetAddress addr = en.nextElement();
             String ip = addr.getHostAddress();  
             Matcher matcher = pattern.matcher(ip);
             if (matcher.matches()) {  
              int idx = matchedIndex(ip, prefix);
              if(idx == -1) continue;
              if(matchedIdx == -1){
               matchedIdx = idx;
               matchedIp = ip;
              } else {
               if(matchedIdx>idx){
                matchedIdx = idx;
                matchedIp = ip;
               }
              }
             } 
            } 
           } 
           if(matchedIp != null) return matchedIp;
           return "127.0.0.1";
          } catch (Exception e) { 
           return "127.0.0.1";
          }
         }
         
         public static String getLocalIp() {
          return getLocalIp("*>10>172>192>127");
         }
         
         public static String remoteAddress(SocketChannel channel){
          SocketAddress addr = channel.socket().getRemoteSocketAddress();
          String res = String.format("%s", addr);
          return res;
         }
         
         public static String localAddress(SocketChannel channel){
          SocketAddress addr = channel.socket().getLocalSocketAddress();
          String res = String.format("%s", addr);
          return addr==null? res: res.substring(1);
         }
         
         public static String getPid(){
          RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
                String name = runtime.getName();
                int index = name.indexOf("@");
                if (index != -1) {
                    return name.substring(0, index);
                }
          return null;
         }
         
         public static String getLocalHostName() {
                try {
                    return (InetAddress.getLocalHost()).getHostName();
                } catch (UnknownHostException uhe) {
                    String host = uhe.getMessage();
                    if (host != null) {
                        int colon = host.indexOf(':');
                        if (colon > 0) {
                            return host.substring(0, colon);
                        }
                    }
                    return "UnknownHost";
                }
            }
        }

        動(dòng)項(xiàng)目,訪問(wèn)/index/ero接口,可以看到項(xiàng)目中生成了app-collector.logerror-collector.log兩個(gè)日志文件:

        分享資料:Spring Boot 學(xué)習(xí)筆記,這個(gè)太全了!

        我們將Springboot服務(wù)部署在192.168.11.31這臺(tái)機(jī)器上。推薦一個(gè) Spring Boot 基礎(chǔ)教程及實(shí)戰(zhàn)示例:https://github.com/javastacks/spring-boot-best-practice

        Kafka安裝和啟用

        kafka下載地址:

        http://kafka.apache.org/downloads.html

        kafka安裝步驟:首先kafka安裝需要依賴(lài)與zookeeper,所以小伙伴們先準(zhǔn)備好zookeeper環(huán)境(三個(gè)節(jié)點(diǎn)即可),然后我們來(lái)一起構(gòu)建kafka broker。

        另外,Kafka 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺(tái)發(fā)送:面試,可以在線閱讀。

        ## 解壓命令:
        tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
        ## 改名命令:
        mv kafka_2.12-2.1.0/ kafka_2.12
        ## 進(jìn)入解壓后的目錄,修改server.properties文件:
        vim /usr/local/kafka_2.12/config/server.properties
        ## 修改配置:
        broker.id=0
        port=9092
        host.name=192.168.11.51
        advertised.host.name=192.168.11.51
        log.dirs=/usr/local/kafka_2.12/kafka-logs
        num.partitions=2
        zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

        ## 建立日志文件夾:
        mkdir /usr/local/kafka_2.12/kafka-logs

        ##啟動(dòng)kafka:
        /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

        創(chuàng)建兩個(gè)topic

        ## 創(chuàng)建topic
        kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
        kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

        我們可以查看一下topic情況

        kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

        可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個(gè)topic

        點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)

        filebeat安裝和啟用

        filebeat下載

        cd /usr/local/software
        tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
        cd /usr/local
        mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

        配置filebeat,可以參考下方y(tǒng)ml配置文件

        vim /usr/local/filebeat-5.6.2/filebeat.yml
        ###################### Filebeat Configuration Example #########################
        filebeat.prospectors:

        - input_type: log

          paths:
            ## app-服務(wù)名稱(chēng).log, 為什么寫(xiě)死,防止發(fā)生輪轉(zhuǎn)抓取歷史數(shù)據(jù)
            - /usr/local/logs/app-collector.log
          #定義寫(xiě)入 ES 時(shí)的 _type 值
          document_type: "app-log"
          multiline:
            #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達(dá)式(匹配以 2017-11-15 08:04:23:889 時(shí)間格式開(kāi)頭的字符串)
            pattern: '^\['                              # 指定匹配的表達(dá)式(匹配以 "{ 開(kāi)頭的字符串)
            negate: true                                # 是否匹配到
            match: after                                # 合并到上一行的末尾
            max_lines: 2000                             # 最大的行數(shù)
            timeout: 2s                                 # 如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
          fields:
            logbiz: collector
            logtopic: app-log-collector   ## 按服務(wù)劃分用作kafka topic
            evn: dev

        - input_type: log

          paths:
            - /usr/local/logs/error-collector.log
          document_type: "error-log"
          multiline:
            #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達(dá)式(匹配以 2017-11-15 08:04:23:889 時(shí)間格式開(kāi)頭的字符串)
            pattern: '^\['                              # 指定匹配的表達(dá)式(匹配以 "{ 開(kāi)頭的字符串)
            negate: true                                # 是否匹配到
            match: after                                # 合并到上一行的末尾
            max_lines: 2000                             # 最大的行數(shù)
            timeout: 2s                                 # 如果在規(guī)定時(shí)間沒(méi)有新的日志事件就不等待后面的日志
          fields:
            logbiz: collector
            logtopic: error-log-collector   ## 按服務(wù)劃分用作kafka topic
            evn: dev
            
        output.kafka:
          enabled: true
          hosts: ["192.168.11.51:9092"]
          topic: '%{[fields.logtopic]}'
          partition.hash:
            reachable_only: true
          compression: gzip
          max_message_bytes: 1000000
          required_acks: 1
        logging.to_files: true

        filebeat啟動(dòng):

        檢查配置是否正確

        cd /usr/local/filebeat-6.6.0
        ./filebeat -c filebeat.yml -configtest
        ## Config OK

        啟動(dòng)filebeat

        /usr/local/filebeat-6.6.0/filebeat &

        檢查是否啟動(dòng)成功

        ps -ef | grep filebeat

        可以看到filebeat已經(jīng)啟動(dòng)成功

        然后我們?cè)L問(wèn)192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說(shuō)明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

        logstash安裝

        我們?cè)趌ogstash的安裝目錄下新建一個(gè)文件夾

        mkdir scrpit

        然后cd進(jìn)該文件,創(chuàng)建一個(gè)logstash-script.conf文件

        cd scrpit
        vim logstash-script.conf
        ## multiline 插件也可以用于其他類(lèi)似的堆棧式信息,比如 linux 的內(nèi)核日志。
        input {
          kafka {
            ## app-log-服務(wù)名稱(chēng)
            topics_pattern => "app-log-.*"
            bootstrap_servers => "192.168.11.51:9092"
         codec => json
         consumer_threads => 1 ## 增加consumer的并行消費(fèi)線程數(shù)
         decorate_events => true
            #auto_offset_rest => "latest"
         group_id => "app-log-group"
           }
           
           kafka {
            ## error-log-服務(wù)名稱(chēng)
            topics_pattern => "error-log-.*"
            bootstrap_servers => "192.168.11.51:9092"
         codec => json
         consumer_threads => 1
         decorate_events => true
            #auto_offset_rest => "latest"
         group_id => "error-log-group"
           }
           
        }

        filter {
          
          ## 時(shí)區(qū)轉(zhuǎn)換
          ruby {
         code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
          }

          if "app-log" in [fields][logtopic]{
            grok {
                ## 表達(dá)式,這里對(duì)應(yīng)的是Springboot輸出的日志格式
                match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
            }
          }

          if "error-log" in [fields][logtopic]{
            grok {
                ## 表達(dá)式
                match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
            }
          }
          
        }

        ## 測(cè)試輸出到控制臺(tái):
        output {
          stdout { codec => rubydebug }
        }


        ## elasticsearch:
        output {

          if "app-log" in [fields][logtopic]{
         ## es插件
         elasticsearch {
               # es服務(wù)地址
                hosts => ["192.168.11.35:9200"]
                # 用戶(hù)名密碼      
                user => "elastic"
                password => "123456"
                ## 索引名,+ 號(hào)開(kāi)頭的,就會(huì)自動(dòng)認(rèn)為后面是時(shí)間格式:
                ## javalog-app-service-2019.01.23 
                index => "app-log-%{[fields][logbiz]}-%{index_time}"
                # 是否嗅探集群ip:一般設(shè)置true;http://192.168.11.35:9200/_nodes/http?pretty
                # 通過(guò)嗅探機(jī)制進(jìn)行es集群負(fù)載均衡發(fā)日志消息
                sniffing => true
                # logstash默認(rèn)自帶一個(gè)mapping模板,進(jìn)行模板覆蓋
                template_overwrite => true
            } 
          }
          
          if "error-log" in [fields][logtopic]{
         elasticsearch {
                hosts => ["192.168.11.35:9200"]    
                user => "elastic"
                password => "123456"
                index => "error-log-%{[fields][logbiz]}-%{index_time}"
                sniffing => true
                template_overwrite => true
            } 
          }
          

        }

        啟動(dòng)logstash

        /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

        等待啟動(dòng)成功,我們?cè)俅卧L問(wèn)192.168.11.31:8001/err

        可以看到控制臺(tái)開(kāi)始打印日志

        ElasticSearch與Kibana

        ES和Kibana的搭建之前沒(méi)寫(xiě)過(guò)博客,網(wǎng)上資料也比較多,大家可以自行搜索。

        搭建完成后,訪問(wèn)Kibana的管理頁(yè)面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

        然后Create index pattern

        • index pattern 輸入 app-log-*
        • Time Filter field name 選擇 currentDateTime

        這樣我們就成功創(chuàng)建了索引。

        我們?cè)俅卧L問(wèn)192.168.11.31:8001/err,這個(gè)時(shí)候就可以看到我們已經(jīng)命中了一條log信息

        里面展示了日志的全量信息

        到這里,我們完整的日志收集及可視化就搭建完成了!

        原文鏈接:https://blog.csdn.net/lt326030434/article/details/107361190

        版權(quán)聲明:本文為CSDN博主「簡(jiǎn)單隨風(fēng)」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。






        關(guān)注Java技術(shù)??锤喔韶?/strong>



        獲取 Spring Boot 實(shí)戰(zhàn)筆記!
        瀏覽 33
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产色影院 | 啪啪网站免费看 | 日韩3级电影 | 女被c视频 | 午夜在线观看视频 | 女人被狂躁yy视频免费看 | 夫妇性派对交换hd | 激情综合五月天婷婷 | 草草草av| 亚洲欧美国产高清vA在线播放 |