Preface
I optimized one of our company's logging systems. The previous implementation frequently ran into problems of all kinds and would even cause the service to restart abnormally.
Original Design

The previous design was very simple: every log line spawned its own goroutine, which wrote the entry to Logstash via an HTTP request, and Logstash wrote to Elasticsearch through a single pipeline. The problem: at peak traffic a large number of goroutines accumulated, exhausting memory and restarting the service.
Optimization Plan

channel -> kafka -> logstash -> elasticsearch -> kibana
Channel: a locally maintained global buffered queue; entries pulled off the queue are dispatched through a worker pool, built on a nested chan chan type.
Kafka: message-queue middleware; asynchronous processing, high throughput, absorbs traffic spikes.
Logstash: a data collector that gathers data in many formats from many sources, parses it, and emits it in a normalized format to Elasticsearch.
Elasticsearch: stores the log data and exposes indexes for Kibana to analyze.
Kibana: a friendly web UI for aggregation, analysis, and search.

High-Concurrency Worker Pool

Architecture Design

channel source code
```go
package channel

import (
	"crazyfox-micro/tkpkg/kafka"
)

type Worker struct {
	WorkerPool chan chan string // shared pool a worker registers its JobChannel into when idle
	JobChannel chan string      // this worker's private job channel
	quit       chan bool
}

func NewWorker(workerPool chan chan string) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan string),
		quit:       make(chan bool),
	}
}

func (w *Worker) WorkStart() {
	go func() {
		for {
			// announce availability by putting the job channel back into the pool
			w.WorkerPool <- w.JobChannel
			select {
			case logStr := <-w.JobChannel:
				// write to kafka
				kafka.SingleProducer.SendMessage(logStr)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w *Worker) WorkStop() {
	go func() {
		w.quit <- true
	}()
}
```
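To make the nested chan chan pattern concrete, here is a self-contained toy sketch (hypothetical; the Kafka write is replaced by a print): the worker advertises that it is idle by pushing its private job channel into the shared pool, and the dispatcher side pulls an idle worker out and hands it a job.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// the shared pool: a channel whose elements are the workers' job channels
	pool := make(chan chan string, 1)

	// a single worker
	go func() {
		jobs := make(chan string)
		for {
			pool <- jobs // announce "I'm idle" by registering the job channel
			fmt.Println("got job:", <-jobs)
		}
	}()

	// dispatcher side: take an idle worker's channel and hand it a job
	idle := <-pool
	idle <- "log line"
	time.Sleep(100 * time.Millisecond) // give the worker time to print
}
```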
Dispatcher source code

```go
package channel

import (
	"github.com/panjf2000/ants/v2"
)

const (
	defaultMaxWorker = 500  // default max number of workers
	defaultMaxQueue  = 1000 // default global queue buffer size
)

var JobQueue chan string // global buffered queue

type Dispatcher struct {
	maxWorkers    int              // max number of workers
	pool          chan chan string // worker pool
	goroutinePool *ants.Pool       // goroutine pool
}

type Options struct {
	maxWorkers int
	maxQueue   int
}

type Option func(options *Options)

func getOption(opts ...Option) *Options {
	opt := &Options{
		maxWorkers: defaultMaxWorker,
		maxQueue:   defaultMaxQueue,
	}
	for _, f := range opts {
		f(opt)
	}
	return opt
}

func MaxWorker(i int) func(o *Options) {
	return func(o *Options) {
		if i > 0 {
			o.maxWorkers = i
		}
	}
}

func MaxQueue(i int) func(o *Options) {
	return func(o *Options) {
		if i > 0 {
			o.maxQueue = i
		}
	}
}

func NewDispatcher(opt *Options) *Dispatcher {
	JobQueue = make(chan string, opt.maxQueue)
	pool := make(chan chan string, opt.maxWorkers)
	goroutinePool := InitGoroutinePool(opt.maxQueue)
	return &Dispatcher{
		pool:          pool,
		maxWorkers:    opt.maxWorkers,
		goroutinePool: goroutinePool,
	}
}

// InitGoroutinePool initializes the goroutine pool
func InitGoroutinePool(num int) *ants.Pool {
	pool, err := ants.NewPool(num)
	if err != nil {
		panic(err)
	}
	return pool
}

func (d *Dispatcher) Run() {
	// start maxWorkers workers
	for i := 0; i < d.maxWorkers; i++ {
		work := NewWorker(d.pool)
		work.WorkStart()
	}
	go d.dispatch()
}

// dispatch watches the global buffered queue
func (d *Dispatcher) dispatch() {
	for {
		logStr := <-JobQueue
		tk := func() {
			// take an idle worker's job channel and hand it the log line
			jobChannel := <-d.pool
			jobChannel <- logStr
		}
		// schedule the handoff on the goroutine pool
		d.goroutinePool.Submit(tk)
	}
}

// InitPool initializes the pool
func InitPool(opts ...Option) {
	opt := getOption(opts...)
	// create a dispatcher with the given number of workers
	dispatcher := NewDispatcher(opt)
	// start it
	dispatcher.Run()
}
```
Usage
```go
// initialize the pool
channel.InitPool(channel.MaxWorker(500), channel.MaxQueue(1000))
...
idleDelay := time.NewTimer(time.Second)
defer idleDelay.Stop()
select {
// write the log line logStr into JobQueue
case channel.JobQueue <- logStr:
// if the queue is full, drop the log after 1s rather than block.
// Don't use time.After(time.Second) here: called repeatedly, it allocates
// a new timer each time that isn't collected until it fires, leaking memory.
case <-idleDelay.C:
	fmt.Printf("JobQueue is full, message: %v\n", logStr)
}
```
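When the enqueue happens inside a loop, the same timer can be reused with Reset instead of allocating a new one per iteration. A hedged sketch, where incoming is an assumed chan string feeding log lines:

```go
idleDelay := time.NewTimer(time.Second)
defer idleDelay.Stop()
for logStr := range incoming { // incoming is an assumed chan string
	// stop and drain the timer before reuse, as the Timer.Reset docs require
	if !idleDelay.Stop() {
		select {
		case <-idleDelay.C:
		default:
		}
	}
	idleDelay.Reset(time.Second)
	select {
	case channel.JobQueue <- logStr:
	case <-idleDelay.C:
		fmt.Printf("JobQueue is full, message: %v\n", logStr)
	}
}
```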
Kafka

For setting up the Kafka cluster, see my other article: Kafka集群搭建 (Kafka cluster setup).
Producer source code

```go
package kafka

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"crazyfox-micro/tkpkg/exp"

	"github.com/Shopify/sarama"
)

var (
	kfLock         sync.Mutex
	SingleProducer *Producer
	//SuccessesNum int
)

const Prefix = "elk-"

type Producer struct {
	producer sarama.AsyncProducer
	topic    string
}

// InitKafkaProducer initializes the producer; singleton, one topic per service
func InitKafkaProducer(kafkaAddr []string, service string) *Producer {
	if SingleProducer != nil {
		return SingleProducer
	}
	//SuccessesNum = 0
	topic := Prefix + strings.ToLower(service)
	kfLock.Lock()
	defer kfLock.Unlock()
	if SingleProducer == nil { // re-check after acquiring the lock
		SingleProducer = NewProducer(kafkaAddr, topic)
	}
	return SingleProducer
}

// NewProducer creates and returns a producer
func NewProducer(kafkaAddr []string, topic string) *Producer {
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // round-robin partitioning
	config.Producer.RequiredAcks = sarama.WaitForLocal            // ack=1
	config.Producer.Compression = sarama.CompressionSnappy        // compress payloads
	//config.Producer.Flush.Frequency = time.Duration(10) * time.Millisecond
	config.Producer.Return.Errors = true // report failed deliveries on the Errors channel
	//config.Producer.Return.Successes = true
	client, err := sarama.NewClient(kafkaAddr, config)
	if err != nil {
		fmt.Printf("create kafka client fail, err:%v\n", err.Error())
		panic(err) // continuing with a nil client would crash below anyway
	}
	// async producer
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		fmt.Printf("create kafka producer fail, err:%v\n", err.Error())
		panic(err)
	}
	instance := &Producer{
		producer: producer,
		topic:    topic,
	}
	// start listening for delivery reports
	go instance.Return()
	return instance
}

// SendMessage sends a log message asynchronously
func (p *Producer) SendMessage(value string) {
	exp.Try(func() {
		kValue := sarama.StringEncoder(value)
		timestamp := time.Now()
		message := &sarama.ProducerMessage{Topic: p.topic, Value: kValue, Timestamp: timestamp}
		p.producer.Input() <- message
	}, func(ex exp.Exception) {
		fmt.Printf("SendMessage is fail, err:%v\n", ex)
	})
}

// Return listens for delivery reports
func (p *Producer) Return() {
	for {
		select {
		//case msg := <-p.producer.Successes():
		//	SuccessesNum++
		//	fmt.Printf("Successes, num:%v, msg:%v\n", SuccessesNum, msg)
		case err := <-p.producer.Errors():
			fmt.Printf("send message to kafka is fail, err:%v\n", err)
		}
	}
}

func (p *Producer) Close() {
	p.producer.AsyncClose()
}
```
Usage
```go
// initialize: sysConfig.Kafka = Kafka cluster addresses, service = topic
kafka.InitKafkaProducer(sysConfig.Kafka, service)
...
// write to kafka
kafka.SingleProducer.SendMessage(logStr)
```
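One thing the snippet above leaves implicit is shutdown: the producer is asynchronous, so buffered messages should be flushed before the process exits. A minimal sketch, assuming sysConfig.Kafka holds the broker addresses:

```go
producer := kafka.InitKafkaProducer(sysConfig.Kafka, service)
producer.SendMessage(logStr)
// ...
// on shutdown: AsyncClose flushes in-flight messages in the background,
// then closes the producer's Errors (and Successes) channels
producer.Close()
```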
Logstash

Logstash routes different topics to different ES indexes; three pipelines are set up to move the data. With the filter below, a log produced to topic elk-test lands in the index systemlog-dev-test-{date}.
```
# The input plugin must listen on a port on the Logstash node; use a port in the 8000-9000 range.
input {
  kafka {
    bootstrap_servers => '10.0.24.131:9092,10.0.24.132:9092,10.0.24.133:9092'
    # consumer group
    group_id => 'elk-test'
    # clients consuming concurrently need distinct client_ids; the number of
    # clients in one group must not exceed the number of kafka partitions
    client_id => 'elk-test0'
    # regex-match topics
    topics_pattern => "elk-.*"
    # data format
    codec => "json"
    # defaults to false; kafka metadata is only available when set to true
    decorate_events => true
  }
}
filter {
  mutate {
    # take the topic from the kafka metadata and split it on "-"
    split => ["[@metadata][kafka][topic]", "-"]
    add_field => {
      # put the second element (index 1) of the split topic into a custom "index" field
      "index" => "%{[@metadata][kafka][topic][1]}"
    }
  }
}
output {
  # a file_extend output can be added so pipeline output can be inspected right
  # after deployment for verification and debugging; don't modify the
  # system-specified path. Comment out or delete the file_extend section to
  # disable debug output.
  #file_extend {
  #  path => "/ssd/1/ls-cn-st221ygwu002/logstash/logs/debug/dev"
  #}
  elasticsearch {
    hosts => ["es-cn-zxxxxxxbm.elasticsearch.aliyuncs.com"]
    user => "elastic"
    password => "Super111$"
    # use the "index" field above as the ES index name; ES index names must be lowercase
    index => "systemlog-dev-%{index}-%{+YYYY.MM.dd}"
  }
}
```
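The other two pipelines can reuse the same filter and output and differ only in the kafka input's client_id; a sketch (the exact layout is an assumption, only the distinct client_id requirement comes from the config above):

```
# pipeline 2: identical except for client_id
input {
  kafka {
    bootstrap_servers => '10.0.24.131:9092,10.0.24.132:9092,10.0.24.133:9092'
    group_id => 'elk-test'
    client_id => 'elk-test1'   # each consumer in the same group needs its own client_id
    topics_pattern => "elk-.*"
    codec => "json"
    decorate_events => true
  }
}
```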
Elasticsearch

Set the maximum number of shards per node:
```
curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_cluster/settings -d '
{
  "persistent": {
    "cluster": {
      "max_shards_per_node": 10000
    }
  }
}'
```
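To verify the change took effect, the cluster settings can be read back; persistent settings survive a cluster restart:

```
curl http://localhost:9200/_cluster/settings?pretty
```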
Set the primary and replica shard counts. The primary shard count should be a multiple of the number of nodes in the ES cluster so that shards spread evenly: with 3 nodes, for example, 12 primary shards put 4 primaries on each node.
```
PUT _template/default
{
  "index_patterns": ["*"],
  "order": 1,
  "settings": {
    "number_of_shards": "12",
    "number_of_replicas": "1"
  }
}
```
Kibana

Data visualizations are created through Kibana's web UI.
Load Testing

The load-testing tool used is go-stress-testing.
Results under 1,000,000 concurrent requests:
Test environment: Kafka with 3 nodes and 3 partitions, Logstash with 3 pipelines.
At 1,000,000 concurrency, 1,000,000 messages reached Kafka from the service and 998,568 reached ES from Logstash: 1,432 messages lost, a loss rate of roughly 0.14%.