Go实现海量日志收集系统(三)

内容预览:

    再次整理了一下这个日志收集系统的框,如下图

    这次要实现的代码的整体逻辑为:

    完整代码地址为: https://github.com/pythonsite/logagent

    etcd介绍

    高可用的分布式key-value存储,可以用于配置共享和服务发现

    类似的项目:zookeeper和consul

    开发语言:go

    接口:提供restful的接口,使用简单

    实现算法:基于raft算法的强一致性,高可用的服务存储目录

    etcd的应用场景:

    • 服务发现和服务注册
    • 配置中心(我们实现的日志收集客户端需要用到)
    • 分布式锁
    • master选举

    官网对etcd的有一个非常简明的介绍:

    etcd搭建:
    下载地址:https://github.com/coreos/etcd/releases/
    根据自己的环境下载对应的版本然后启动起来就可以了

    启动之后可以通过如下命令验证一下:

    [[email protected] etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 
    

    zhaofan
    [[email protected] etcd
    -v3.2.18-linux-amd64]# ./etcdctl get name
    zhaofan
    [[email protected] etcd
    -v3.2.18-linux-amd64]#

    context 介绍和使用

    其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

    • 控制goroutine的超时
    • 保存上下文数据

    通过下面一个简单的例子进行理解:

     

    package main
    

    import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
    )


    type Result struct{
    r
    *http.Response
    err error
    }

    func process(){
    ctx,cancel :
    = context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr :
    = &http.Transport{}
    client :
    = &http.Client
    c :
    = make(chan Result,1)
    req,err :
    = http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
    fmt.Println(
    "http request failed,err:",err)
    return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
    resp,err :
    = client.Do(req)
    pack :
    = Result
    c
    <- pack
    }()

    select{
    case
    <- ctx.Done():
    tr.CancelRequest(req)
    fmt.Println(
    "timeout!")
    case res :
    = <-c:
    defer res.r.Body.Close()
    out,_:
    = ioutil.ReadAll(res.r.Body)
    fmt.Printf(
    "server response:%s",out)
    }
    return

    }

    func main() {
    process()
    }

    写一个通过context保存上下文,代码例子如:

    package main
    

    import (
    "github.com/Go-zh/net/context"
    "fmt"
    )

    func add(ctx context.Context,a,b
    int) int {
    traceId :
    = ctx.Value("trace_id").(string)
    fmt.Printf(
    "trace_id:%vn",traceId)
    return a
    +b
    }

    func calc(ctx context.Context,a, b
    int) int{
    traceId :
    = ctx.Value("trace_id").(string)
    fmt.Printf(
    "trace_id:%vn",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
    }

    func main() {
    //将ctx传递到calc中
    ctx :
    = context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,
    20,30)

    }

    结合etcd和context使用

    关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
    ./etcd –listen-client-urls http://0.0.0.0:2371 –advertise-client-urls http://0.0.0.0:2371 –listen-peer-urls http://0.0.0.0:2381
    )

    package main
    

    import (
    etcd_client
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    )

    func main() {
    cli, err :
    = etcd_client.New(etcd_client.Config{
    Endpoints:[]string{
    "192.168.0.118:2371"},
    DialTimeout:
    5*time.Second,
    })
    if err != nil{
    fmt.Println(
    "connect failed,err:",err)
    return
    }

    fmt.Println(
    "connect success")
    defer cli.Close()
    }

    下面一个例子是通过连接etcd,存值并取值

    package main
    

    import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
    )

    func main() {
    cli,err :
    = clientv3.New(clientv3.Config{
    Endpoints:[]string{
    "192.168.0.118:2371"},
    DialTimeout:
    5*time.Second,
    })
    if err != nil{
    fmt.Println(
    "connect failed,err:",err)
    return
    }
    fmt.Println(
    "connect succ")
    defer cli.Close()
    ctx,cancel :
    = context.WithTimeout(context.Background(),time.Second)
    _,err
    = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
    fmt.Println(
    "put failed,err",err)
    return
    }
    ctx, cancel
    = context.WithTimeout(context.Background(),time.Second)
    resp,err :
    = cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
    fmt.Println(
    "get failed,err:",err)
    return
    }
    for _,ev :
    = range resp.Kvs{
    fmt.Printf(
    "%s:%sn",ev.Key,ev.Value)
    }
    }

    关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

     

    package main
    

    import (
    "context"
    "fmt"
    )

    func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen :
    = func(ctx context.Context) <-chan int {
    dst :
    = make(chan int)
    n :
    = 1
    go func() {
    for {
    select {
    case <-ctx.Done():
    return
    // returning not to leak the goroutine
    case dst <- n:
    n
    ++
    }
    }
    }()
    return dst
    }

    ctx, cancel :
    = context.WithCancel(context.Background())
    defer cancel()
    // cancel when we are finished consuming integers

    for n :
    = range gen(ctx) {
    fmt.Println(n)
    if n == 5 {
    break
    }
    }
    }

    关于官网文档中的WithDeadline演示的代码例子:

    package main
    


    import (
    "context"
    "fmt"
    "time"
    )

    func main() {
    d :
    = time.Now().Add(50 * time.Millisecond)
    ctx, cancel :
    = context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
    fmt.Println(
    "overslept")
    case <-ctx.Done():
    fmt.Println(ctx.Err())
    }

    }

    通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

    package main
    

    import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
    )

    func main() {
    cli,err :
    = clientv3.New(clientv3.Config{
    Endpoints:[]string{
    "192.168.0.118:2371"},
    DialTimeout:
    5*time.Second,
    })
    if err != nil {
    fmt.Println(
    "connect failed,err:",err)
    return
    }
    defer cli.Close()
    // 这里会阻塞
    rch :
    = cli.Watch(context.Background(),"logagent/conf/")
    for wresp :
    = range rch{
    for _,ev :
    = range wresp.Events{
    fmt.Printf(
    "%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
    }
    }

    实现一个kafka的消费者代码的简单例子:

    package main
    

    import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
    )

    func main() {
    consumer,err :
    = sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
    fmt.Println(
    "failed to start consumer:",err)
    return
    }
    partitionList,err :
    = consumer.Partitions("nginx_log")
    if err != nil {
    fmt.Println(
    "Failed to get the list of partitions:",err)
    return
    }
    fmt.Println(partitionList)
    for partition :
    = range partitionList{
    pc,err :
    = consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
    if err != nil {
    fmt.Printf(
    "failed to start consumer for partition %d:%sn",partition,err)
    return
    }
    defer pc.AsyncClose()
    go func(partitionConsumer sarama.PartitionConsumer){
    for msg :
    = range pc.Messages(){
    fmt.Printf(
    "partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
    }
    }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

    }

    但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

    package main
    

    import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
    )

    var (
    wg sync.WaitGroup
    )

    func main() {
    consumer,err :
    = sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
    fmt.Println(
    "failed to start consumer:",err)
    return
    }
    partitionList,err :
    = consumer.Partitions("nginx_log")
    if err != nil {
    fmt.Println(
    "Failed to get the list of partitions:",err)
    return
    }
    fmt.Println(partitionList)
    for partition :
    = range partitionList{
    pc,err :
    = consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
    if err != nil {
    fmt.Printf(
    "failed to start consumer for partition %d:%sn",partition,err)
    return
    }
    defer pc.AsyncClose()
    go func(partitionConsumer sarama.PartitionConsumer){
    wg.Add(
    1)
    for msg :
    = range partitionConsumer.Messages(){
    fmt.Printf(
    "partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
    }
    wg.Done()
    }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

    }

    将客户端需要收集的日志信息放到etcd中

    关于etcd处理的代码为:

    package main
    

    import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
    )

    var Client
    *clientv3.Client
    var logConfChan chan string


    // 初始化etcd
    func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip :
    = range ipArrays{
    //keyfmt = /logagent/%s/log_config
    keys
    = append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan
    = make(chan string,10)
    logs.Debug(
    "etcd watch key:%v timeout:%v", keys, timeout)

    Client,err
    = clientv3.New(clientv3.Config{
    Endpoints:addr,
    DialTimeout: timeout,
    })
    if err != nil{
    logs.Error(
    "connect failed,err:%v",err)
    return
    }
    logs.Debug(
    "init etcd success")
    waitGroup.Add(
    1)
    for _, key :
    = range keys{
    ctx,cancel :
    = context.WithTimeout(context.Background(),2*time.Second)
    // 从etcd中获取要收集日志的信息
    resp,err :
    = Client.Get(ctx,key)
    cancel()
    if err != nil {
    logs.Warn(
    "get key %s failed,err:%v",key,err)
    continue
    }

    for _, ev :
    = range resp.Kvs{
    logs.Debug(
    "%q : %qn", ev.Key, ev.Value)
    logConfChan
    <- string(ev.Value)
    }
    }
    go WatchEtcd(keys)
    return
    }

    func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key :
    = range keys{
    rch :
    = Client.Watch(context.Background(),key)
    watchChans
    = append(watchChans,rch)
    }

    for {
    for _,watchC :
    = range watchChans{
    select{
    case wresp := <-watchC:
    for _,ev:
    = range wresp.Events{
    logs.Debug(
    "%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
    logConfChan
    <- string(ev.Kv.Value)
    }
    default:

    }
    }
    time.Sleep(time.Second)
    }
    waitGroup.Done()
    }

    func GetLogConf()chan string{
    return logConfChan
    }

    同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

    package main
    

    import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
    )

    type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
    }

    func NewSecondLimit(limit int32)
    *SecondLimit {
    secLimit :
    = &SecondLimit{
    unixSecond:time.Now().Unix(),
    curCount:
    0,
    limit:limit,
    }
    return secLimit
    }

    func (s
    *SecondLimit) Add(count int) {
    sec :
    = time.Now().Unix()
    if sec == s.unixSecond {
    atomic.AddInt32(
    &s.curCount,int32(count))
    return
    }
    atomic.StoreInt64(
    &s.unixSecond,sec)
    atomic.StoreInt32(
    &s.curCount, int32(count))
    }

    func (s
    *SecondLimit) Wait()bool {
    for {
    sec :
    = time.Now().Unix()
    if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
    time.Sleep(time.Microsecond)
    logs.Debug(
    "limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
    continue
    }

    if sec != atomic.LoadInt64(&s.unixSecond) {
    atomic.StoreInt64(
    &s.unixSecond,sec)
    atomic.StoreInt32(
    &s.curCount,0)
    }
    logs.Debug(
    "limit is exited")
    return
    false
    }
    }

    小结

    这次基本实现了日志收集的前半段的处理,后面将把日志扔到es中,并最终在页面上呈现

     

    以上就是:Go实现海量日志收集系统(三) 的全部内容。

    本站部分内容来源于互联网和用户投稿,如有侵权请联系我们删除,谢谢。
    Email:[email protected]


    0 条回复 A 作者 M 管理员
      所有的伟大,都源于一个勇敢的开始!
    欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论