读猿码系列——4. 从filebeat和go-stash日志收集和处理go-stash篇
go-stash是一个高效的从Kafka获取,根据配置的规则进行处理,然后发送到ElasticSearch集群的工具。它属于go-zero生态的一个组件,是logstash 的 Go 语言替代版,它相比于原先的 logstash 节省了2/3的服务器资源。
项目地址:https://github.com/kevwan/go-stash
先从yaml配置中看整体系统设计(stash/etc/config.yaml)其中kafka作为数据输入端,ElasticSearch作为数据输出端,filter 抽象了数据处理过程。
-
Clusters:
-
- Input:
-
Kafka:
-
Name: go-stash
-
Log:
-
Mode: file
-
Brokers:
-
- "172.16.48.41:9092"
-
- "172.16.48.42:9092"
-
- "172.16.48.43:9092"
-
Topic: ngapplog
-
Group: stash
-
Conns: 3
-
Consumers: 10
-
Processors: 60
-
MinBytes: 1048576
-
MaxBytes: 10485760
-
Offset: first
-
Filters:
-
- Action: drop
-
Conditions:
-
- Key: status
-
Value: 503
-
Type: contains
-
- Key: type
-
Value: "app"
-
Type: match
-
Op: and
-
- Action: remove_field
-
Fields:
-
- message
-
- source
-
- beat
-
- fields
-
- input_type
-
- offset
-
- "@version"
-
- _score
-
- _type
-
- clientip
-
- http_host
-
- request_time
-
Output:
-
ElasticSearch:
-
Hosts:
-
- "http://172.16.188.73:9200"
-
- "http://172.16.188.74:9200"
-
- "http://172.16.188.75:9200"
-
Index: "go-stash-{{yyyy.MM.dd}}"
-
MaxChunkBytes: 5242880
-
GracePeriod: 10s
-
Compress: false
-
TimeZone: UTC
input:
Conn表示kafka的连接数,一般<=CPU核数;
Consumers表示每个连接数打开的线程数,Conns * Consumers不建议超过topic分片数;
Processors为处理数据的线程数量;
MinBytes和MaxBytes表示每次从kafka获取数据块的区间大小;
Offset参数可选last和false,默认为last,表示从头从kafka开始读取数据。
Filters:
- Action: drop为删除标识,表示在处理时将被移除,不进入es。Conditions下放删除条件,可以指定key字段及Value的值,Type字段可选contains(包含)或match(匹配),Op是附加条件可以写and或者or;
- Action: remove_field为移除字段标识,在Fields下列出要移除的字段;
- Action: transfer为转移字段标识:例如可以将message字段,重新定义为data字段。
Output:
ElasticSearch下的Index表示索引名称;
MaxChunkBytes为每次往ES提交的bulk大小;
GracePeriod默认为10s,在程序关闭后,在10s内用于处理余下的消费和数据,优雅退出;
Compress指数据压缩,压缩会减少传输的数据量,但会增加一定的处理性能,默认为false;
TimeZone默认值为UTC,世界标准时间。
我们从主函数入口开始了解整个数据流程,入口函数stash/stash.go:
-
func main() {
-
// 解析命令行参数,启动优雅退出
-
flag.Parse()
-
-
var c config.Config
-
conf.MustLoad(*configFile, &c)
-
proc.SetTimeToForceQuit(c.GracePeriod)
-
// service 组合模式
-
group := service.NewServiceGroup()
-
defer group.Stop()
-
-
for _, processor := range c.Clusters {
-
// 连接es
-
client, err := elastic.NewClient(
-
elastic.SetSniff(false),
-
elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
-
elastic.SetBasicAuth(processor.Output.ElasticSearch.Username,processor.Output.ElasticSearch.Password),
-
)
-
logx.Must(err)
-
// filter processors 构建
-
filters := filter.CreateFilters(processor)
-
writer, err := es.NewWriter(processor.Output.ElasticSearch)
-
logx.Must(err)
-
-
var loc *time.Location
-
if len(processor.Output.ElasticSearch.TimeZone) > 0 {
-
loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
-
logx.Must(err)
-
} else {
-
loc = time.Local
-
}
-
// 准备es的写入操作 {写入的index, 写入器writer}
-
indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
-
handle := handler.NewHandler(writer, indexer)
-
handle.AddFilters(filters...)
-
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
-
// 按照配置启动kafka,并将消费操作传入,同时加入组合器
-
for _, k := range toKqConf(processor.Input.Kafka) {
-
group.Add(kq.MustNewQueue(k, handle))
-
}
-
}
-
// 启动这个组合器
-
group.Start()
-
}
循环从配置的集群中取出每个processor来处理,首先建立es客户端连接,构建filter processor,其中filter.CreateFilters()方法如下:
-
func CreateFilters(p config.Cluster) []FilterFunc {
-
var filters []FilterFunc
-
-
for _, f := range p.Filters {
-
switch f.Action {
-
case filterDrop:
-
filters = append(filters, DropFilter(f.Conditions))
-
case filterRemoveFields:
-
filters = append(filters, RemoveFieldFilter(f.Fields))
-
case filterTransfer:
-
filters = append(filters, TransferFilter(f.Field, f.Target))
-
}
-
}
-
-
return filters
-
}
我们看到这里实现的方法对应了我们在yaml配置中约定好的Filters中的drop、remove_field、transfer字段下对应的约束,最终返回满足条件的filters过滤器列表。
es.NewWriter()创建写入器writer,用于es的写入操作,代码如下:
-
func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
-
client, err := elastic.NewClient(
-
elastic.SetSniff(false),
-
elastic.SetURL(c.Hosts...),
-
elastic.SetGzip(c.Compress),
-
elastic.SetBasicAuth(c.Username,c.Password),
-
)
-
if err != nil {
-
return nil, err
-
}
-
-
writer := Writer{
-
docType: c.DocType,
-
client: client,
-
}
-
writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
-
return &writer, nil
-
}
es.NewIndex()创建写入的index,index结构体数据结构如下:
-
type Index struct {
-
client *elastic.Client
-
indexFormat IndexFormat
-
indices map[string]lang.PlaceholderType
-
lock sync.RWMutex
-
singleFlight syncx.SingleFlight
-
}
然后用writer、index、filter创建MessageHandler,结构体如下:
-
type MessageHandler struct {
-
writer *es.Writer
-
indexer *es.Index
-
filters []filter.FilterFunc
-
}
-
-
func NewHandler(writer *es.Writer, indexer *es.Index) *MessageHandler {
-
return &MessageHandler{
-
writer: writer,
-
indexer: indexer,
-
}
-
}
MessageHandler在结构上对接了下游es,负责数据处理到数据写入;对上接入kafka部分在接口设计上通过go-queue实现了ConsumeHandler接口,在消费过程中执行 handler 的操作,从而写入 es。
-
func (mh *MessageHandler) Consume(_, val string) error {
-
var m map[string]interface{}
-
// 反序列化从 kafka 中的消息
-
if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
-
return err
-
}
-
// es 写入index配置
-
index := mh.indexer.GetIndex(m)
-
// filter 链式处理(map进map出)
-
for _, proc := range mh.filters {
-
if m = proc(m); m == nil {
-
return nil
-
}
-
}
-
-
bs, err := jsoniter.Marshal(m)
-
if err != nil {
-
return err
-
}
-
// es 写入
-
return mh.writer.Write(index, string(bs))
-
}
按照配置启动kafka,并将消费操作传入,同时加入组合器,启动组合器group.Start():
-
for _, k := range toKqConf(processor.Input.Kafka) {
-
group.Add(kq.MustNewQueue(k, handle))
-
}
至此数据处理以及上下游的连接点已打通,开发者主动从kafka中拉数据拿到es中处理。加入 group 的 service 都是实现 Start()来启动,kafka启动逻辑如下。
即启动kafka消费程序——>从 kafka 拉取消息到 q.channel——>消费程序终止,收尾工作。
func (q *kafkaQueue) Start() {
q.startConsumers()
q.startProducers()
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()
}
q.startConsumers()
|- [q.consumeOne(key, value) for msg in q.channel]
|- q.handler.Consume(key, value)
至此整个流程已经串起来了,这里放一张官方数据流程图:
参考:
https://github.com/kevwan/go-stash
https://mp.weixin.qq.com/s/UeeSZi_-ZiiHf3P4tmyszw
欢迎关注我的个人公众号才浅coding攻略, 即时收到更新推送~
我是一个热爱技术干货的程序媛,从事游戏及微服务后端开发,分享Go、微服务、云原生及Python、网络及算法等相关内容。日拱一卒。欢迎各位催更扯淡一条龙!
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgjihhf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13