56 lines
1.2 KiB
Markdown
56 lines
1.2 KiB
Markdown
|
|
|
|||
|
|
# kafka cluster封装
|
|||
|
|
|
|||
|
|
> 目前只封装了consumer,单个consumer支持同时消费一个topic
|
|||
|
|
> 如果指定了group,且group名称开头不为'test_consumer_'则认为是正式订阅,会在处理完成后标记消息已经被消费
|
|||
|
|
> 否则会生成随机的group名称,用于测试
|
|||
|
|
|
|||
|
|
## 示例代码如下
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
package main
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"fmt"
|
|||
|
|
"os"
|
|||
|
|
"os/signal"
|
|||
|
|
"syscall"
|
|||
|
|
|
|||
|
|
"golib/kafka"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 回调的消息处理函数,用户自己实现的消息处理逻辑
|
|||
|
|
func processor(msg []byte) {
|
|||
|
|
fmt.Println("recv msg:", string(msg))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func main() {
|
|||
|
|
|
|||
|
|
// 配置,可以从文件加载
|
|||
|
|
cfg := kafka.Config{
|
|||
|
|
Brokers: []string{"test.kafka.o-kash.com:9092"},
|
|||
|
|
Topic: "charge_success",
|
|||
|
|
Workers: 5, // 并发的回调函数数量,不是partition的数量
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if consumer := kafka.NewConsumer(&cfg, processor); consumer != nil {
|
|||
|
|
consumer.Run()
|
|||
|
|
|
|||
|
|
// 监控退出信号通知consumer退出
|
|||
|
|
ch := make(chan os.Signal)
|
|||
|
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
|||
|
|
<-ch
|
|||
|
|
|
|||
|
|
consumer.Stop()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 产生测试用的消息
|
|||
|
|
|
|||
|
|
使用官方工具
|
|||
|
|
|
|||
|
|
```shell
|
|||
|
|
./kafka-console-producer.sh --broker-list test.kafka.o-kash.com:9092 --topic charge_success
|
|||
|
|
```
|