2020-04-06 19:57:19 +08:00
|
|
|
|
package kafka
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"math/rand"
|
|
|
|
|
|
"strings"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
2020-04-18 22:47:25 +08:00
|
|
|
|
"unitechdev/golib/tlog"
|
2020-04-06 19:57:19 +08:00
|
|
|
|
|
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
|
|
|
cluster "github.com/bsm/sarama-cluster"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// Config holds the settings used to build a Kafka consumer. The toml tags
// give the key each field is mapped to in a TOML document.
type Config struct {
	// Brokers is the list of broker addresses passed to the cluster client.
	Brokers []string `toml:"brokers"`

	// Topic is the single topic the consumer subscribes to.
	Topic string `toml:"topic"`

	// Group is the consumer group name. When it is empty, NewConsumer
	// generates a throwaway name that starts with TEST_PREFIX.
	Group string `toml:"group"`

	// Workers is the number of worker goroutines started by Consumer.Run.
	Workers int `toml:"workers"`

	// Oldest selects the initial offset: true starts from the oldest
	// message, false from the newest.
	Oldest bool `toml:"oldest"`
}
|
|
|
|
|
|
|
|
|
|
|
|
type Consumer struct {
|
|
|
|
|
|
test bool
|
|
|
|
|
|
consumer *cluster.Consumer
|
|
|
|
|
|
workers int
|
|
|
|
|
|
handler func([]byte)
|
|
|
|
|
|
stop chan bool
|
|
|
|
|
|
wg *sync.WaitGroup
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TEST_PREFIX is the prefix of the consumer group names that NewConsumer
// generates when no group is configured. A consumer whose group name starts
// with this prefix is treated as a test consumer and never marks offsets.
const TEST_PREFIX = "test_consumer_"
|
|
|
|
|
|
|
|
|
|
|
|
func NewConsumer(cfg *Config, hdl func([]byte)) *Consumer {
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
|
csm *cluster.Consumer
|
|
|
|
|
|
err error
|
|
|
|
|
|
group string
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
config := cluster.NewConfig()
|
|
|
|
|
|
config.Consumer.Return.Errors = true
|
|
|
|
|
|
config.Group.Return.Notifications = true
|
|
|
|
|
|
|
|
|
|
|
|
if cfg.Oldest {
|
|
|
|
|
|
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
|
|
|
|
|
} else {
|
|
|
|
|
|
config.Consumer.Offsets.Initial = sarama.OffsetNewest
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
group = cfg.Group
|
|
|
|
|
|
if group == "" {
|
|
|
|
|
|
group = fmt.Sprintf("%s%.8x", TEST_PREFIX, rand.Int63())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tlog.Infof("NewConsumer() brokers=%#v, topic=%s, workers=%d, group=%s", cfg.Brokers, cfg.Topic, cfg.Workers, group)
|
|
|
|
|
|
|
|
|
|
|
|
if csm, err = cluster.NewConsumer(cfg.Brokers, group, []string{cfg.Topic}, config); err != nil {
|
|
|
|
|
|
tlog.Errorf("NewConsumer() create fail: %s", err)
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tlog.Infof("NewConsumer() Created OK! %#v", csm)
|
|
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
for err := range csm.Errors() {
|
|
|
|
|
|
tlog.Errorf("KafkaConsumer<brokers=%#v, group=%s> Error: %s", cfg.Brokers, group, err.Error())
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
for ntf := range csm.Notifications() {
|
|
|
|
|
|
tlog.Infof("KafkaConsumer<brokers=%#v, group=%s> Notice: %#v", cfg.Brokers, group, ntf)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
return &Consumer{
|
|
|
|
|
|
test: strings.HasPrefix(group, TEST_PREFIX),
|
|
|
|
|
|
consumer: csm,
|
|
|
|
|
|
workers: cfg.Workers,
|
|
|
|
|
|
handler: hdl,
|
|
|
|
|
|
stop: make(chan bool),
|
|
|
|
|
|
wg: &sync.WaitGroup{},
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (self *Consumer) Run() {
|
|
|
|
|
|
|
|
|
|
|
|
for i := 0; i < self.workers; i++ {
|
|
|
|
|
|
go self.Worker()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (self *Consumer) Stop() {
|
|
|
|
|
|
close(self.stop)
|
|
|
|
|
|
self.wg.Wait()
|
|
|
|
|
|
self.consumer.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (self *Consumer) Worker() {
|
|
|
|
|
|
self.wg.Add(1)
|
|
|
|
|
|
defer self.wg.Done()
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case msg, ok := <-self.consumer.Messages():
|
|
|
|
|
|
if ok {
|
|
|
|
|
|
//tlog.Debugf("topic: %s, partition: %d, offset: %d, msg: %s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
|
|
|
|
|
|
self.handler(msg.Value)
|
|
|
|
|
|
|
|
|
|
|
|
// 测试用的group不标记处理完毕,防止破坏集群数据
|
|
|
|
|
|
if !self.test {
|
|
|
|
|
|
self.consumer.MarkOffset(msg, "done")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
case <-self.stop:
|
|
|
|
|
|
tlog.Info("KafkaConsumer stop signal received!")
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|