• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

读猿码系列——4. 从filebeat和go-stash日志收集和处理go-stash篇

武飞扬头像
Snippers
帮助1

go-stash是一个高效的从Kafka获取,根据配置的规则进行处理,然后发送到ElasticSearch集群的工具。它属于go-zero生态的一个组件,是logstash 的 Go 语言替代版,它相比于原先的 logstash 节省了2/3的服务器资源。

项目地址:https://github.com/kevwan/go-stash

先从yaml配置中看整体系统设计(stash/etc/config.yaml)其中kafka作为数据输入端,ElasticSearch作为数据输出端,filter 抽象了数据处理过程。

  1.  
    Clusters:
  2.  
    Input:
  3.  
        Kafka:
  4.  
          Name: go-stash
  5.  
          Log:
  6.  
            Modefile
  7.  
          Brokers:
  8.  
          - "172.16.48.41:9092"
  9.  
          - "172.16.48.42:9092"
  10.  
          - "172.16.48.43:9092"
  11.  
          Topic: ngapplog
  12.  
          Group: stash
  13.  
          Conns: 3
  14.  
          Consumers: 10
  15.  
          Processors: 60
  16.  
          MinBytes: 1048576
  17.  
          MaxBytes: 10485760
  18.  
          Offset: first
  19.  
      Filters:
  20.  
      - Action: drop
  21.  
        Conditions:
  22.  
          - Keystatus
  23.  
            Value503
  24.  
            Typecontains
  25.  
          - Keytype
  26.  
            Value"app"
  27.  
            Type: match
  28.  
            Op: and
  29.  
      - Action: remove_field
  30.  
        Fields:
  31.  
        - message
  32.  
        - source
  33.  
        - beat
  34.  
        - fields
  35.  
        - input_type
  36.  
        - offset
  37.  
        - "@version"
  38.  
        - _score
  39.  
        - _type
  40.  
        - clientip
  41.  
        - http_host
  42.  
        - request_time
  43.  
      Output:
  44.  
        ElasticSearch:
  45.  
          Hosts:
  46.  
          - "http://172.16.188.73:9200"
  47.  
          - "http://172.16.188.74:9200"
  48.  
          - "http://172.16.188.75:9200"
  49.  
          Index"go-stash-{{yyyy.MM.dd}}"
  50.  
          MaxChunkBytes: 5242880
  51.  
          GracePeriod: 10s
  52.  
          Compress: false
  53.  
          TimeZone: UTC
学新通

input:

Conn表示kafka的连接数,一般<=CPU核数;

Consumers表示每个连接数打开的线程数,Conns * Consumers不建议超过topic分片数;

Processors为处理数据的线程数量;

MinBytes和MaxBytes表示每次从kafka获取数据块的区间大小;

Offset参数可选last和false,默认为last,表示从头从kafka开始读取数据。

Filters:

- Action: drop为删除标识,表示在处理时将被移除,不进入es。Conditions下放删除条件,可以指定key字段及Value的值,Type字段可选contains(包含)或match(匹配),Op是附加条件可以写and或者or;

- Action: remove_field为移除字段标识,在Fields下列出要移除的字段;

- Action: transfer为转移字段标识:例如可以将message字段,重新定义为data字段。

Output:

ElasticSearch下的Index表示索引名称;

MaxChunkBytes为每次往ES提交的bulk大小;

GracePeriod默认为10s,在程序关闭后,在10s内用于处理余下的消费和数据,优雅退出;

Compress指数据压缩,压缩会减少传输的数据量,但会增加一定的处理性能,默认为false;

TimeZone默认值为UTC,世界标准时间。

我们从主函数入口开始了解整个数据流程,入口函数stash/stash.go:

  1.  
    func main() {
  2.  
        // 解析命令行参数,启动优雅退出
  3.  
        flag.Parse()
  4.  
     
  5.  
        var c config.Config
  6.  
        conf.MustLoad(*configFile, &c)
  7.  
        proc.SetTimeToForceQuit(c.GracePeriod)
  8.  
        // service 组合模式
  9.  
        group := service.NewServiceGroup()
  10.  
        defer group.Stop()
  11.  
     
  12.  
        for _, processor := range c.Clusters {
  13.  
            // 连接es
  14.  
            client, err := elastic.NewClient(
  15.  
                elastic.SetSniff(false),
  16.  
                elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
  17.  
                elastic.SetBasicAuth(processor.Output.ElasticSearch.Username,processor.Output.ElasticSearch.Password),
  18.  
            )
  19.  
            logx.Must(err)
  20.  
            // filter processors 构建
  21.  
            filters := filter.CreateFilters(processor)
  22.  
            writer, err := es.NewWriter(processor.Output.ElasticSearch)
  23.  
            logx.Must(err)
  24.  
     
  25.  
            var loc *time.Location
  26.  
            if len(processor.Output.ElasticSearch.TimeZone) > 0 {
  27.  
                loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
  28.  
                logx.Must(err)
  29.  
            } else {
  30.  
                loc = time.Local
  31.  
            }
  32.  
            // 准备es的写入操作 {写入的index, 写入器writer}
  33.  
            indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
  34.  
            handle := handler.NewHandler(writer, indexer)
  35.  
            handle.AddFilters(filters...)
  36.  
            handle.AddFilters(filter.AddUriFieldFilter("url""uri"))
  37.  
            // 按照配置启动kafka,并将消费操作传入,同时加入组合器
  38.  
            for _, k := range toKqConf(processor.Input.Kafka) {
  39.  
                group.Add(kq.MustNewQueue(k, handle))
  40.  
            }
  41.  
        }
  42.  
        // 启动这个组合器
  43.  
        group.Start()
  44.  
    }
学新通

循环从配置的集群中取出每个processor来处理,首先建立es客户端连接,构建filter processor,其中filter.CreateFilters()方法如下:

  1.  
    func CreateFilters(p config.Cluster) []FilterFunc {
  2.  
        var filters []FilterFunc
  3.  
     
  4.  
        for _, f := range p.Filters {
  5.  
            switch f.Action {
  6.  
            case filterDrop:
  7.  
                filters = append(filters, DropFilter(f.Conditions))
  8.  
            case filterRemoveFields:
  9.  
                filters = append(filters, RemoveFieldFilter(f.Fields))
  10.  
            case filterTransfer:
  11.  
                filters = append(filters, TransferFilter(f.Field, f.Target))
  12.  
            }
  13.  
        }
  14.  
     
  15.  
        return filters
  16.  
    }
学新通

我们看到这里实现的方法对应了我们在yaml配置中约定好的Filters中的drop、remove_field、transfer字段下对应的约束,最终返回满足条件的filters过滤器列表。

es.NewWriter()创建写入器writer,用于es的写入操作,代码如下:

  1.  
    func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
  2.  
        client, err := elastic.NewClient(
  3.  
            elastic.SetSniff(false),
  4.  
            elastic.SetURL(c.Hosts...),
  5.  
            elastic.SetGzip(c.Compress),
  6.  
            elastic.SetBasicAuth(c.Username,c.Password),
  7.  
        )
  8.  
        if err != nil {
  9.  
            return nil, err
  10.  
        }
  11.  
     
  12.  
        writer := Writer{
  13.  
            docType: c.DocType,
  14.  
            client:  client,
  15.  
        }
  16.  
        writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
  17.  
        return &writer, nil
  18.  
    }
学新通

es.NewIndex()创建写入的index,index结构体数据结构如下:

  1.  
    type Index struct {
  2.  
        client       *elastic.Client
  3.  
        indexFormat  IndexFormat
  4.  
        indices      map[string]lang.PlaceholderType
  5.  
        lock         sync.RWMutex
  6.  
        singleFlight syncx.SingleFlight
  7.  
    }

然后用writer、index、filter创建MessageHandler,结构体如下:

  1.  
    type MessageHandler struct {
  2.  
        writer  *es.Writer
  3.  
        indexer *es.Index
  4.  
        filters []filter.FilterFunc
  5.  
    }
  6.  
     
  7.  
    func NewHandler(writer *es.Writer, indexer *es.Index*MessageHandler {
  8.  
        return &MessageHandler{
  9.  
            writer:  writer,
  10.  
            indexer: indexer,
  11.  
        }
  12.  
    }

MessageHandler在结构上对接了下游es,负责数据处理到数据写入;对上接入kafka部分在接口设计上通过go-queue实现了ConsumeHandler接口,在消费过程中执行 handler 的操作,从而写入 es。

  1.  
    func (mh *MessageHandler) Consume(_, val stringerror {
  2.  
        var m map[string]interface{}
  3.  
        // 反序列化从 kafka 中的消息
  4.  
        if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
  5.  
            return err
  6.  
        }
  7.  
        // es 写入index配置
  8.  
        index := mh.indexer.GetIndex(m)
  9.  
        // filter 链式处理(map进map出)
  10.  
        for _, proc := range mh.filters {
  11.  
            if m = proc(m); m == nil {
  12.  
                return nil
  13.  
            }
  14.  
        }
  15.  
     
  16.  
        bs, err := jsoniter.Marshal(m)
  17.  
        if err != nil {
  18.  
            return err
  19.  
        }
  20.  
        // es 写入
  21.  
        return mh.writer.Write(indexstring(bs))
  22.  
    }
学新通

按照配置启动kafka,并将消费操作传入,同时加入组合器,启动组合器group.Start():

  1.  
    for _, k := range toKqConf(processor.Input.Kafka) {
  2.  
        group.Add(kq.MustNewQueue(k, handle))
  3.  
    }

学新通

至此数据处理以及上下游的连接点已打通,开发者主动从kafka中拉数据拿到es中处理。加入 group 的 service 都是实现 Start()来启动,kafka启动逻辑如下。

即启动kafka消费程序——>从 kafka 拉取消息到 q.channel——>消费程序终止,收尾工作。

func (q *kafkaQueue) Start() {
    q.startConsumers()
    q.startProducers()

    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()
}

q.startConsumers()
  |- [q.consumeOne(key, value) for msg in q.channel]
    |- q.handler.Consume(key, value)

至此整个流程已经串起来了,这里放一张官方数据流程图:

学新通

学新通

参考:

https://github.com/kevwan/go-stash

https://mp.weixin.qq.com/s/UeeSZi_-ZiiHf3P4tmyszw

欢迎关注我的个人公众号才浅coding攻略, 即时收到更新推送~

我是一个热爱技术干货的程序媛,从事游戏及微服务后端开发,分享Go、微服务、云原生及Python、网络及算法等相关内容。日拱一卒。欢迎各位催更扯淡一条龙!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgjihhf
系列文章
更多 icon
同类精品
更多 icon
继续加载