Files
unitech-golib/kafka/kafka.go

124 lines
2.6 KiB
Go
Raw Permalink Normal View History

2020-04-06 19:57:19 +08:00
package kafka
import (
"fmt"
"math/rand"
"strings"
"sync"
2026-01-30 16:49:29 +08:00
"code.infininov.com/Infini/unitech-golib/tlog"
2020-04-06 19:57:19 +08:00
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
// Config holds the settings needed to build a Kafka consumer.
type Config struct {
Brokers []string `toml:"brokers"` // Kafka broker addresses
Topic string `toml:"topic"` // topic to subscribe to
Group string `toml:"group"` // consumer group id; empty means an auto-generated test group (see NewConsumer)
Workers int `toml:"workers"` // number of concurrent message-handling goroutines started by Run
Oldest bool `toml:"oldest"` // start from the oldest committed offset instead of the newest
}
// Consumer wraps a sarama-cluster consumer plus a pool of worker
// goroutines that feed each message value to a user-supplied handler.
type Consumer struct {
test bool // true when the group was auto-generated with TEST_PREFIX; such consumers never commit offsets
consumer *cluster.Consumer // underlying cluster consumer
workers int // number of worker goroutines Run will start
handler func([]byte) // callback invoked with each message's value
stop chan bool // closed by Stop to signal workers to exit
wg *sync.WaitGroup // tracks running workers so Stop can wait for them before closing the consumer
}
const (
// TEST_PREFIX marks auto-generated consumer groups (used when
// Config.Group is empty). Consumers in such groups never mark
// offsets, so real cluster state is left untouched.
TEST_PREFIX = "test_consumer_"
)
// NewConsumer creates a cluster consumer for cfg.Topic and registers hdl
// as the per-message callback. When cfg.Group is empty, a random group
// name prefixed with TEST_PREFIX is generated; consumers with such a
// group never commit offsets (see Worker). Returns nil if the underlying
// consumer cannot be created; the error is logged, not returned.
func NewConsumer(cfg *Config, hdl func([]byte)) *Consumer {
	clusterCfg := cluster.NewConfig()
	clusterCfg.Consumer.Return.Errors = true
	clusterCfg.Group.Return.Notifications = true

	// Pick the initial offset position for a group with no committed offset.
	clusterCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	if cfg.Oldest {
		clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	group := cfg.Group
	if group == "" {
		group = fmt.Sprintf("%s%.8x", TEST_PREFIX, rand.Int63())
	}
	tlog.Infof("NewConsumer() brokers=%#v, topic=%s, workers=%d, group=%s", cfg.Brokers, cfg.Topic, cfg.Workers, group)

	csm, err := cluster.NewConsumer(cfg.Brokers, group, []string{cfg.Topic}, clusterCfg)
	if err != nil {
		tlog.Errorf("NewConsumer() create fail: %s", err)
		return nil
	}
	tlog.Infof("NewConsumer() Created OK! %#v", csm)

	// Drain the error and notification channels so the library never
	// blocks on them; both loops end when the consumer is closed.
	go func() {
		for e := range csm.Errors() {
			tlog.Errorf("KafkaConsumer<brokers=%#v, group=%s> Error: %s", cfg.Brokers, group, e.Error())
		}
	}()
	go func() {
		for ntf := range csm.Notifications() {
			tlog.Infof("KafkaConsumer<brokers=%#v, group=%s> Notice: %#v", cfg.Brokers, group, ntf)
		}
	}()

	return &Consumer{
		test:     strings.HasPrefix(group, TEST_PREFIX),
		consumer: csm,
		workers:  cfg.Workers,
		handler:  hdl,
		stop:     make(chan bool),
		wg:       &sync.WaitGroup{},
	}
}
// Run launches the configured number of worker goroutines.
//
// The WaitGroup is incremented here, before each goroutine starts,
// because doing wg.Add(1) inside Worker (as the previous version did)
// races with Stop: a Stop issued right after Run could observe a zero
// counter, return from Wait immediately, and Close the consumer while
// workers were still starting up. Worker therefore only calls Done and
// must be started via Run.
func (c *Consumer) Run() {
	for i := 0; i < c.workers; i++ {
		c.wg.Add(1)
		go c.Worker()
	}
}

// Stop signals all workers to exit, waits for them to drain, and then
// closes the underlying cluster consumer.
func (c *Consumer) Stop() {
	close(c.stop)
	c.wg.Wait()
	c.consumer.Close()
}

// Worker consumes messages and passes each message value to the handler
// until either the stop channel or the consumer's Messages channel is
// closed. It must be launched through Run, which performs the matching
// wg.Add.
func (c *Consumer) Worker() {
	defer c.wg.Done()
	for {
		select {
		case msg, ok := <-c.consumer.Messages():
			if !ok {
				// Messages channel closed (consumer shut down).
				// Without this return, receiving from the closed
				// channel would succeed instantly forever and the
				// loop would busy-spin at 100% CPU.
				return
			}
			c.handler(msg.Value)
			// Test groups never mark offsets so as not to disturb
			// real cluster data.
			if !c.test {
				c.consumer.MarkOffset(msg, "done")
			}
		case <-c.stop:
			tlog.Info("KafkaConsumer stop signal received!")
			return
		}
	}
}