kafka cluster封装
目前只封装了consumer,单个consumer支持同时消费一个topic 如果指定了group,且group名称开头不为'test_consumer_'则认为是正式订阅,会在处理完成后标记消息已经被消费 否则会生成随机的group名称,用于测试
示例代码如下
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"golib/kafka"
)
// 回调的消息处理函数,用户自己实现的消息处理逻辑
func processor(msg []byte) {
fmt.Println("recv msg:", string(msg))
}
func main() {
// 配置,可以从文件加载
cfg := kafka.Config{
Brokers: []string{"test.kafka.o-kash.com:9092"},
Topic: "charge_success",
Workers: 5, // 并发的回调函数数量,不是partition的数量
}
if consumer := kafka.NewConsumer(&cfg, processor); consumer != nil {
consumer.Run()
// 监控退出信号通知consumer退出
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
consumer.Stop()
}
}
产生测试用的消息
使用官方工具
./kafka-console-producer.sh --broker-list test.kafka.o-kash.com:9092 --topic charge_success