1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Go實現(xiàn)海量日志收集系統(tǒng)

        共 11495字,需瀏覽 23分鐘

         ·

        2021-07-08 10:35

        再次整理了一下這個日志收集系統(tǒng)的框,如下圖

        這次要實現(xiàn)的代碼的整體邏輯為:


        完整代碼地址為: https://github.com/pythonsite/logagent

        etcd介紹

        高可用的分布式key-value存儲,可以用于配置共享和服務(wù)發(fā)現(xiàn)

        類似的項目:zookeeper和consul

        開發(fā)語言:go

        接口:提供restful的接口,使用簡單

        實現(xiàn)算法:基于raft算法的強一致性,高可用的服務(wù)存儲目錄

        etcd的應(yīng)用場景:

        • 服務(wù)發(fā)現(xiàn)和服務(wù)注冊

        • 配置中心(我們實現(xiàn)的日志收集客戶端需要用到)

        • 分布式鎖

        • master選舉

        官網(wǎng)對etcd的有一個非常簡明的介紹:

        etcd搭建:
        下載地址:https://github.com/coreos/etcd/releases/
        根據(jù)自己的環(huán)境下載對應(yīng)的版本然后啟動起來就可以了

        啟動之后可以通過如下命令驗證一下:

        [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

        zhaofan
        [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
        zhaofan
        [root@localhost etcd-v3.2.18-linux-amd64]#

        context 介紹和使用

        其實這個東西翻譯過來就是上下文管理,那么context的作用是做什么,主要有如下兩個作用:

        • 控制goroutine的超時

        • 保存上下文數(shù)據(jù)

        通過下面一個簡單的例子進行理解:

        package main

        import (
        "fmt"
        "time"
        "net/http"
        "context"
        "io/ioutil"
        )


        type Result struct{
        r *http.Response
        err error
        }

        func process(){
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        defer cancel()
        tr := &http.Transport{}
        client := &http.Client{Transport:tr}
        c := make(chan Result,1)
        req,err := http.NewRequest("GET","http://www.google.com",nil)
        if err != nil{
        fmt.Println("http request failed,err:",err)
        return
        }
        // 如果請求成功了會將數(shù)據(jù)存入到管道中
        go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
        }()

        select{
        case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
        case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
        }
        return

        }

        func main() {
        process()
        }

        寫一個通過context保存上下文,代碼例子如:

        package main

        import (
        "github.com/Go-zh/net/context"
        "fmt"
        )

        func add(ctx context.Context,a,b int) int {
        traceId := ctx.Value("trace_id").(string)
        fmt.Printf("trace_id:%v\n",traceId)
        return a+b
        }

        func calc(ctx context.Context,a, b int) int{
        traceId := ctx.Value("trace_id").(string)
        fmt.Printf("trace_id:%v\n",traceId)
        //再將ctx傳入到add中
        return add(ctx,a,b)
        }

        func main() {
        //將ctx傳遞到calc中
        ctx := context.WithValue(context.Background(),"trace_id","123456")
        calc(ctx,20,30)

        }

        結(jié)合etcd和context使用

        關(guān)于通過go連接etcd的簡單例子:(這里有個小問題需要注意就是etcd的啟動方式,默認啟動可能會連接不上,尤其你是在虛擬你安裝,所以需要通過如下命令啟動:
        ./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
        )

        package main

        import (
        etcd_client "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        )

        func main() {
        cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
        })
        if err != nil{
        fmt.Println("connect failed,err:",err)
        return
        }

        fmt.Println("connect success")
        defer cli.Close()
        }

        下面一個例子是通過連接etcd,存值并取值

        package main

        import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
        )

        func main() {
        cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
        })
        if err != nil{
        fmt.Println("connect failed,err:",err)
        return
        }
        fmt.Println("connect succ")
        defer cli.Close()
        ctx,cancel := context.WithTimeout(context.Background(),time.Second)
        _,err = cli.Put(ctx,"logagent/conf/","sample_value")
        cancel()
        if err != nil{
        fmt.Println("put failed,err",err)
        return
        }
        ctx, cancel = context.WithTimeout(context.Background(),time.Second)
        resp,err := cli.Get(ctx,"logagent/conf/")
        cancel()
        if err != nil{
        fmt.Println("get failed,err:",err)
        return
        }
        for _,ev := range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
        }
        }

        關(guān)于context官網(wǎng)也有一個例子非常有用,用于控制開啟的goroutine的退出,代碼如下:

        package main

        import (
        "context"
        "fmt"
        )

        func main() {
        // gen generates integers in a separate goroutine and
        // sends them to the returned channel.
        // The callers of gen need to cancel the context once
        // they are done consuming generated integers not to leak
        // the internal goroutine started by gen.
        gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
        for {
        select {
        case <-ctx.Done():
        return // returning not to leak the goroutine
        case dst <- n:
        n++
        }
        }
        }()
        return dst
        }

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel() // cancel when we are finished consuming integers

        for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
        break
        }
        }
        }

        關(guān)于官網(wǎng)文檔中的WithDeadline演示的代碼例子:

        package main


        import (
        "context"
        "fmt"
        "time"
        )

        func main() {
        d := time.Now().Add(50 * time.Millisecond)
        ctx, cancel := context.WithDeadline(context.Background(), d)

        // Even though ctx will be expired, it is good practice to call its
        // cancelation function in any case. Failure to do so may keep the
        // context and its parent alive longer than necessary.
        defer cancel()

        select {
        case <-time.After(1 * time.Second):
        fmt.Println("overslept")
        case <-ctx.Done():
        fmt.Println(ctx.Err())
        }

        }

        通過上面的代碼有了一個基本的使用,那么如果我們通過etcd來做配置管理,如果配置更改之后,我們?nèi)绾瓮ㄖ獙?yīng)的服務(wù)器配置更改,通過下面例子演示:

        package main

        import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
        )

        func main() {
        cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
        })
        if err != nil {
        fmt.Println("connect failed,err:",err)
        return
        }
        defer cli.Close()
        // 這里會阻塞
        rch := cli.Watch(context.Background(),"logagent/conf/")
        for wresp := range rch{
        for _,ev := range wresp.Events{
        fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
        }
        }

        實現(xiàn)一個kafka的消費者代碼的簡單例子:

        package main

        import (
        "github.com/Shopify/sarama"
        "strings"
        "fmt"
        "time"
        )

        func main() {
        consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
        if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
        }
        partitionList,err := consumer.Partitions("nginx_log")
        if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
        }
        fmt.Println(partitionList)
        for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
        fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
        return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
        for msg := range pc.Messages(){
        fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
        }
        }(pc)
        }
        time.Sleep(time.Hour)
        consumer.Close()

        }

        但是上面的代碼并不是最佳代碼,因為我們最后是通過time.sleep等待goroutine的執(zhí)行,我們可以更改為通過sync.WaitGroup方式實現(xiàn)

        package main

        import (
        "github.com/Shopify/sarama"
        "strings"
        "fmt"
        "sync"
        )

        var (
        wg sync.WaitGroup
        )

        func main() {
        consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
        if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
        }
        partitionList,err := consumer.Partitions("nginx_log")
        if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
        }
        fmt.Println(partitionList)
        for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
        fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
        return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
        wg.Add(1)
        for msg := range partitionConsumer.Messages(){
        fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
        }
        wg.Done()
        }(pc)
        }

        //time.Sleep(time.Hour)
        wg.Wait()
        consumer.Close()

        }

        將客戶端需要收集的日志信息放到etcd中

        關(guān)于etcd處理的代碼為:

        package main

        import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "github.com/astaxie/beego/logs"
        "context"
        "fmt"
        )

        var Client *clientv3.Client
        var logConfChan chan string


        // 初始化etcd
        func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

        var keys []string
        for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = append(keys,fmt.Sprintf(keyfmt,ip))
        }

        logConfChan = make(chan string,10)
        logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

        Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
        })
        if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
        }
        logs.Debug("init etcd success")
        waitGroup.Add(1)
        for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 從etcd中獲取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
        logs.Warn("get key %s failed,err:%v",key,err)
        continue
        }

        for _, ev := range resp.Kvs{
        logs.Debug("%q : %q\n", ev.Key, ev.Value)
        logConfChan <- string(ev.Value)
        }
        }
        go WatchEtcd(keys)
        return
        }

        func WatchEtcd(keys []string){
        // 這里用于檢測當(dāng)需要收集的日志信息更改時及時更新
        var watchChans []clientv3.WatchChan
        for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
        }

        for {
        for _,watchC := range watchChans{
        select{
        case wresp := <-watchC:
        for _,ev:= range wresp.Events{
        logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        logConfChan <- string(ev.Kv.Value)
        }
        default:

        }
        }
        time.Sleep(time.Second)
        }
        waitGroup.Done()
        }

        func GetLogConf()chan string{
        return logConfChan
        }

        同樣的這里增加對了限速的處理,畢竟日志收集程序不能影響了當(dāng)前業(yè)務(wù)的性能,所以增加了limit.go用于限制速度:

        package main

        import (
        "time"
        "sync/atomic"
        "github.com/astaxie/beego/logs"
        )

        type SecondLimit struct {
        unixSecond int64
        curCount int32
        limit int32
        }

        func NewSecondLimit(limit int32) *SecondLimit {
        secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
        }
        return secLimit
        }

        func (s *SecondLimit) Add(count int) {
        sec := time.Now().Unix()
        if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
        }
        atomic.StoreInt64(&s.unixSecond,sec)
        atomic.StoreInt32(&s.curCount, int32(count))
        }

        func (s *SecondLimit) Wait()bool {
        for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
        time.Sleep(time.Microsecond)
        logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
        continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
        atomic.StoreInt64(&s.unixSecond,sec)
        atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
        }
        }

        小結(jié)

        這次基本實現(xiàn)了日志收集的前半段的處理,后面將把日志扔到es中,并最終在頁面上呈現(xiàn)

        來源:

        https://www.toutiao.com/a6916833750924018179/

        文章轉(zhuǎn)載:IT大咖說 
        (版權(quán)歸原作者所有,侵刪)


        點擊下方“閱讀原文”查看更多

        瀏覽 41
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            色翁荡熄月月小说 | 美女自淫网站 | 女人被舔荫蒂舒服极了的视频 | 巨胸喷奶水www免费看网站软件 | 小黄片在线免费观看 | 殴美操逼视频;欧美操逼视频 | 日本巨乳女优排名 | 插骚逼99AV | 国产精品播放在线 | 娇妻被别人调教成绿奴电影 |