Files

56 lines
1.2 KiB
Markdown
Raw Permalink Normal View History

2020-04-06 19:57:19 +08:00
# kafka cluster封装
> 目前只封装了consumer单个consumer支持同时消费一个topic
> 如果指定了group且group名称开头不为'test_consumer_'则认为是正式订阅,会在处理完成后标记消息已经被消费
> 否则会生成随机的group名称用于测试
## 示例代码如下
```go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"golib/kafka"
)
// 回调的消息处理函数,用户自己实现的消息处理逻辑
func processor(msg []byte) {
fmt.Println("recv msg:", string(msg))
}
func main() {
// 配置,可以从文件加载
cfg := kafka.Config{
Brokers: []string{"test.kafka.o-kash.com:9092"},
Topic: "charge_success",
Workers: 5, // 并发的回调函数数量不是partition的数量
}
if consumer := kafka.NewConsumer(&cfg, processor); consumer != nil {
consumer.Run()
// 监控退出信号通知consumer退出
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
consumer.Stop()
}
}
```
## 产生测试用的消息
使用官方工具
```shell
./kafka-console-producer.sh --broker-list test.kafka.o-kash.com:9092 --topic charge_success
```