1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        SpringBoot+Kafka+ELK 完成海量日志收集(超詳細)

        共 27460字,需瀏覽 55分鐘

         ·

        2021-08-23 16:00

        來源:jiandansuifeng.blog.csdn.net/

        article/details/107361190

        整體流程大概如下:

        服務(wù)器準備

        在這先列出各服務(wù)器節(jié)點,方便同學們在下文中對照節(jié)點查看相應(yīng)內(nèi)容

        SpringBoot項目準備

        引入log4j2替換SpringBoot默認log,demo項目結(jié)構(gòu)如下:

        pom

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <!--  排除spring-boot-starter-logging -->
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency> 
         <!-- log4j2 -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-log4j2</artifactId>
         </dependency> 
           <dependency>
             <groupId>com.lmax</groupId>
             <artifactId>disruptor</artifactId>
             <version>3.3.4</version>
           </dependency> 
        </dependencies> 

        log4j2.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
            <Properties>
                <Property name="LOG_HOME">logs</Property>
                <property name="FILE_NAME">collector</property>
                <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
            </Properties>
            <Appenders>
                <Console name="CONSOLE" target="SYSTEM_OUT">
                    <PatternLayout pattern="${patternLayout}"/>
                </Console>  
                <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                  <PatternLayout pattern="${patternLayout}" />
                  <Policies>
                      <TimeBasedTriggeringPolicy interval="1"/>
                      <SizeBasedTriggeringPolicy size="500MB"/>
                  </Policies>
                  <DefaultRolloverStrategy max="20"/>         
                </RollingRandomAccessFile>
                <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
                  <PatternLayout pattern="${patternLayout}" />
                  <Filters>
                      <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
                  </Filters>              
                  <Policies>
                      <TimeBasedTriggeringPolicy interval="1"/>
                      <SizeBasedTriggeringPolicy size="500MB"/>
                  </Policies>
                  <DefaultRolloverStrategy max="20"/>         
                </RollingRandomAccessFile>            
            </Appenders>
            <Loggers>
                <!-- 業(yè)務(wù)相關(guān) 異步logger -->
                <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                  <AppenderRef ref="appAppender"/>
                </AsyncLogger>
                <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
                  <AppenderRef ref="errorAppender"/>
                </AsyncLogger>       
                <Root level="info">
                    <Appender-Ref ref="CONSOLE"/>
                    <Appender-Ref ref="appAppender"/>
                    <AppenderRef ref="errorAppender"/>
                </Root>         
            </Loggers>
        </Configuration>

        IndexController

        測試Controller,用以打印日志進行調(diào)試

        @Slf4j
        @RestController
        public class IndexController {

         @RequestMapping(value = "/index")
         public String index() {
          InputMDC.putMDC();
          
          log.info("我是一條info日志");
          
          log.warn("我是一條warn日志");

          log.error("我是一條error日志");
          
          return "idx";
         }


         @RequestMapping(value = "/err")
         public String err() {
          InputMDC.putMDC();
          try {
           int a = 1/0;
          } catch (Exception e) {
           log.error("算術(shù)異常", e);
          }
          return "err";
         }
         
        }

        InputMDC

        用以獲取log中的[%X{hostName}][%X{ip}]、[%X{applicationName}]三個字段值

        @Component
        public class InputMDC implements EnvironmentAware {

         private static Environment environment;
         
         @Override
         public void setEnvironment(Environment environment) {
          InputMDC.environment = environment;
         }
         
         public static void putMDC() {
          MDC.put("hostName", NetUtil.getLocalHostName());
          MDC.put("ip", NetUtil.getLocalIp());
          MDC.put("applicationName", environment.getProperty("spring.application.name"));
         }

        }

        NetUtil

        public class NetUtil {   
         
         public static String normalizeAddress(String address){
          String[] blocks = address.split("[:]");
          if(blocks.length > 2){
           throw new IllegalArgumentException(address + " is invalid");
          }
          String host = blocks[0];
          int port = 80;
          if(blocks.length > 1){
           port = Integer.valueOf(blocks[1]);
          } else {
           address += ":"+port; //use default 80
          } 
          String serverAddr = String.format("%s:%d", host, port);
          return serverAddr;
         }
         
         public static String getLocalAddress(String address){
          String[] blocks = address.split("[:]");
          if(blocks.length != 2){
           throw new IllegalArgumentException(address + " is invalid address");
          } 
          String host = blocks[0];
          int port = Integer.valueOf(blocks[1]);
          
          if("0.0.0.0".equals(host)){
           return String.format("%s:%d",NetUtil.getLocalIp(), port);
          }
          return address;
         }
         
         private static int matchedIndex(String ip, String[] prefix){
          for(int i=0; i<prefix.length; i++){
           String p = prefix[i];
           if("*".equals(p)){ //*, assumed to be IP
            if(ip.startsWith("127.") ||
               ip.startsWith("10.") || 
               ip.startsWith("172.") ||
               ip.startsWith("192.")){
             continue;
            }
            return i;
           } else {
            if(ip.startsWith(p)){
             return i;
            }
           } 
          }
          
          return -1;
         }
         
         public static String getLocalIp(String ipPreference) {
          if(ipPreference == null){
           ipPreference = "*>10>172>192>127";
          }
          String[] prefix = ipPreference.split("[> ]+");
          try {
           Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
           Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
           String matchedIp = null;
           int matchedIdx = -1;
           while (interfaces.hasMoreElements()) {
            NetworkInterface ni = interfaces.nextElement();
            Enumeration<InetAddress> en = ni.getInetAddresses(); 
            while (en.hasMoreElements()) {
             InetAddress addr = en.nextElement();
             String ip = addr.getHostAddress();  
             Matcher matcher = pattern.matcher(ip);
             if (matcher.matches()) {  
              int idx = matchedIndex(ip, prefix);
              if(idx == -1continue;
              if(matchedIdx == -1){
               matchedIdx = idx;
               matchedIp = ip;
              } else {
               if(matchedIdx>idx){
                matchedIdx = idx;
                matchedIp = ip;
               }
              }
             } 
            } 
           } 
           if(matchedIp != nullreturn matchedIp;
           return "127.0.0.1";
          } catch (Exception e) { 
           return "127.0.0.1";
          }
         }
         
         public static String getLocalIp() {
          return getLocalIp("*>10>172>192>127");
         }
         
         public static String remoteAddress(SocketChannel channel){
          SocketAddress addr = channel.socket().getRemoteSocketAddress();
          String res = String.format("%s", addr);
          return res;
         }
         
         public static String localAddress(SocketChannel channel){
          SocketAddress addr = channel.socket().getLocalSocketAddress();
          String res = String.format("%s", addr);
          return addr==null? res: res.substring(1);
         }
         
         public static String getPid(){
          RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
                String name = runtime.getName();
                int index = name.indexOf("@");
                if (index != -1) {
                    return name.substring(0, index);
                }
          return null;
         }
         
         public static String getLocalHostName() {
                try {
                    return (InetAddress.getLocalHost()).getHostName();
                } catch (UnknownHostException uhe) {
                    String host = uhe.getMessage();
                    if (host != null) {
                        int colon = host.indexOf(':');
                        if (colon > 0) {
                            return host.substring(0, colon);
                        }
                    }
                    return "UnknownHost";
                }
            }
        }

        啟動項目,訪問/index/ero接口,可以看到項目中生成了app-collector.logerror-collector.log兩個日志文件

        我們將Springboot服務(wù)部署在192.168.11.31這臺機器上。

        Kafka安裝和啟用

        kafka下載地址:

        http://kafka.apache.org/downloads.html

        kafka安裝步驟:首先kafka安裝需要依賴與zookeeper,所以小伙伴們先準備好zookeeper環(huán)境(三個節(jié)點即可),然后我們來一起構(gòu)建kafka broker。

        ## 解壓命令:
        tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
        ## 改名命令:
        mv kafka_2.12-2.1.0/ kafka_2.12
        ## 進入解壓后的目錄,修改server.properties文件:
        vim /usr/local/kafka_2.12/config/server.properties
        ## 修改配置:
        broker.id=0
        port=9092
        host.name=192.168.11.51
        advertised.host.name=192.168.11.51
        log.dirs=/usr/local/kafka_2.12/kafka-logs
        num.partitions=2
        zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

        ## 建立日志文件夾:
        mkdir /usr/local/kafka_2.12/kafka-logs

        ##啟動kafka:
        /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

        創(chuàng)建兩個topic

        ## 創(chuàng)建topic
        kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
        kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

        我們可以查看一下topic情況

        kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

        可以看到已經(jīng)成功啟用了app-log-collectorerror-log-collector兩個topic

        filebeat安裝和啟用

        filebeat下載

        cd /usr/local/software
        tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
        cd /usr/local
        mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

        配置filebeat,可以參考下方y(tǒng)ml配置文件

        vim /usr/local/filebeat-5.6.2/filebeat.yml
        ###################### Filebeat Configuration Example #########################
        filebeat.prospectors:

        - input_type: log

          paths:
            ## app-服務(wù)名稱.log, 為什么寫死,防止發(fā)生輪轉(zhuǎn)抓取歷史數(shù)據(jù)
            - /usr/local/logs/app-collector.log
          #定義寫入 ES 時的 _type 值
          document_type: "app-log"
          multiline:
            #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
            pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
            negate: true                                # 是否匹配到
            match: after                                # 合并到上一行的末尾
            max_lines: 2000                             # 最大的行數(shù)
            timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
          fields:
            logbiz: collector
            logtopic: app-log-collector   ## 按服務(wù)劃分用作kafka topic
            evn: dev

        - input_type: log

          paths:
            - /usr/local/logs/error-collector.log
          document_type: "error-log"
          multiline:
            #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
            pattern: '^\['                              # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
            negate: true                                # 是否匹配到
            match: after                                # 合并到上一行的末尾
            max_lines: 2000                             # 最大的行數(shù)
            timeout: 2s                                 # 如果在規(guī)定時間沒有新的日志事件就不等待后面的日志
          fields:
            logbiz: collector
            logtopic: error-log-collector   ## 按服務(wù)劃分用作kafka topic
            evn: dev
            
        output.kafka:
          enabled: true
          hosts: ["192.168.11.51:9092"]
          topic: '%{[fields.logtopic]}'
          partition.hash:
            reachable_only: true
          compression: gzip
          max_message_bytes: 1000000
          required_acks: 1
        logging.to_files: true

        filebeat啟動:

        檢查配置是否正確

        cd /usr/local/filebeat-6.6.0
        ./filebeat -c filebeat.yml -configtest
        ## Config OK

        啟動filebeat

        /usr/local/filebeat-6.6.0/filebeat &

        檢查是否啟動成功

        ps -ef | grep filebeat

        可以看到filebeat已經(jīng)啟動成功

        然后我們訪問192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已經(jīng)生成了app-log-collector-0和error-log-collector-0文件,說明filebeat已經(jīng)幫我們把數(shù)據(jù)收集好放到了kafka上。

        logstash安裝

        我們在logstash的安裝目錄下新建一個文件夾

        mkdir scrpit

        然后cd進該文件,創(chuàng)建一個logstash-script.conf文件

        cd scrpit
        vim logstash-script.conf
        ## multiline 插件也可以用于其他類似的堆棧式信息,比如 linux 的內(nèi)核日志。
        input {
          kafka {
            ## app-log-服務(wù)名稱
            topics_pattern => "app-log-.*"
            bootstrap_servers => "192.168.11.51:9092"
         codec => json
         consumer_threads => 1 ## 增加consumer的并行消費線程數(shù)
         decorate_events => true
            #auto_offset_rest => "latest"
         group_id => "app-log-group"
           }
           
           kafka {
            ## error-log-服務(wù)名稱
            topics_pattern => "error-log-.*"
            bootstrap_servers => "192.168.11.51:9092"
         codec => json
         consumer_threads => 1
         decorate_events => true
            #auto_offset_rest => "latest"
         group_id => "error-log-group"
           }
           
        }

        filter {
          
          ## 時區(qū)轉(zhuǎn)換
          ruby {
         code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
          }

          if "app-log" in [fields][logtopic]{
            grok {
                ## 表達式,這里對應(yīng)的是Springboot輸出的日志格式
                match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
            }
          }

          if "error-log" in [fields][logtopic]{
            grok {
                ## 表達式
                match => ["message""\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
            }
          }
          
        }

        ## 測試輸出到控制臺:
        output {
          stdout { codec => rubydebug }
        }


        ## elasticsearch:
        output {

          if "app-log" in [fields][logtopic]{
         ## es插件
         elasticsearch {
               # es服務(wù)地址
                hosts => ["192.168.11.35:9200"]
                # 用戶名密碼      
                user => "elastic"
                password => "123456"
                ## 索引名,+ 號開頭的,就會自動認為后面是時間格式:
                ## javalog-app-service-2019.01.23 
                index => "app-log-%{[fields][logbiz]}-%{index_time}"
                # 是否嗅探集群ip:一般設(shè)置true;http://192.168.11.35:9200/_nodes/http?pretty
                # 通過嗅探機制進行es集群負載均衡發(fā)日志消息
                sniffing => true
                # logstash默認自帶一個mapping模板,進行模板覆蓋
                template_overwrite => true
            } 
          }
          
          if "error-log" in [fields][logtopic]{
         elasticsearch {
                hosts => ["192.168.11.35:9200"]    
                user => "elastic"
                password => "123456"
                index => "error-log-%{[fields][logbiz]}-%{index_time}"
                sniffing => true
                template_overwrite => true
            } 
          }
          

        }

        啟動logstash

        /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

        等待啟動成功,我們再次訪問192.168.11.31:8001/err

        可以看到控制臺開始打印日志

        ElasticSearch與Kibana

        ES和Kibana的搭建之前沒寫過博客,網(wǎng)上資料也比較多,大家可以自行搜索。

        搭建完成后,訪問Kibana的管理頁面192.168.11.35:5601,選擇Management -> Kinaba - Index Patterns

        然后Create index pattern

        • index pattern 輸入 app-log-*
        • Time Filter field name 選擇 currentDateTime

        這樣我們就成功創(chuàng)建了索引。

        我們再次訪問192.168.11.31:8001/err,這個時候就可以看到我們已經(jīng)命中了一條log信息

        里面展示了日志的全量信息

        到這里,我們完整的日志收集及可視化就搭建完成了!

        瀏覽 62
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            人人天天操 | 高圆圆一区二区三区 | 国产真实伦视频 | 成人精品午夜无码免费 | 国产精品扒开腿做爽爽爽A片唱戏 | 色老板成人永久免费视频 | 亚洲一区二区三区在线 | 久久成人影音 | 国产精品视频无码 | 色欲网|