124 lines
2.5 KiB
Go
124 lines
2.5 KiB
Go
package kafka
|
||
|
||
import (
|
||
"fmt"
|
||
"math/rand"
|
||
"strings"
|
||
"sync"
|
||
|
||
"globalfintech/golib/tlog"
|
||
|
||
"github.com/Shopify/sarama"
|
||
cluster "github.com/bsm/sarama-cluster"
|
||
)
|
||
|
||
type Config struct {
|
||
Brokers []string `toml:"brokers"`
|
||
Topic string `toml:"topic"`
|
||
Group string `toml:"group"`
|
||
Workers int `toml:"workers"`
|
||
Oldest bool `toml:"oldest"`
|
||
}
|
||
|
||
type Consumer struct {
|
||
test bool
|
||
consumer *cluster.Consumer
|
||
workers int
|
||
handler func([]byte)
|
||
stop chan bool
|
||
wg *sync.WaitGroup
|
||
}
|
||
|
||
const (
|
||
TEST_PREFIX = "test_consumer_"
|
||
)
|
||
|
||
func NewConsumer(cfg *Config, hdl func([]byte)) *Consumer {
|
||
|
||
var (
|
||
csm *cluster.Consumer
|
||
err error
|
||
group string
|
||
)
|
||
|
||
config := cluster.NewConfig()
|
||
config.Consumer.Return.Errors = true
|
||
config.Group.Return.Notifications = true
|
||
|
||
if cfg.Oldest {
|
||
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||
} else {
|
||
config.Consumer.Offsets.Initial = sarama.OffsetNewest
|
||
}
|
||
|
||
group = cfg.Group
|
||
if group == "" {
|
||
group = fmt.Sprintf("%s%.8x", TEST_PREFIX, rand.Int63())
|
||
}
|
||
|
||
tlog.Infof("NewConsumer() brokers=%#v, topic=%s, workers=%d, group=%s", cfg.Brokers, cfg.Topic, cfg.Workers, group)
|
||
|
||
if csm, err = cluster.NewConsumer(cfg.Brokers, group, []string{cfg.Topic}, config); err != nil {
|
||
tlog.Errorf("NewConsumer() create fail: %s", err)
|
||
return nil
|
||
}
|
||
|
||
tlog.Infof("NewConsumer() Created OK! %#v", csm)
|
||
|
||
go func() {
|
||
for err := range csm.Errors() {
|
||
tlog.Errorf("KafkaConsumer<brokers=%#v, group=%s> Error: %s", cfg.Brokers, group, err.Error())
|
||
}
|
||
}()
|
||
|
||
go func() {
|
||
for ntf := range csm.Notifications() {
|
||
tlog.Infof("KafkaConsumer<brokers=%#v, group=%s> Notice: %#v", cfg.Brokers, group, ntf)
|
||
}
|
||
}()
|
||
|
||
return &Consumer{
|
||
test: strings.HasPrefix(group, TEST_PREFIX),
|
||
consumer: csm,
|
||
workers: cfg.Workers,
|
||
handler: hdl,
|
||
stop: make(chan bool),
|
||
wg: &sync.WaitGroup{},
|
||
}
|
||
}
|
||
|
||
func (self *Consumer) Run() {
|
||
|
||
for i := 0; i < self.workers; i++ {
|
||
go self.Worker()
|
||
}
|
||
}
|
||
|
||
func (self *Consumer) Stop() {
|
||
close(self.stop)
|
||
self.wg.Wait()
|
||
self.consumer.Close()
|
||
}
|
||
|
||
func (self *Consumer) Worker() {
|
||
self.wg.Add(1)
|
||
defer self.wg.Done()
|
||
for {
|
||
select {
|
||
case msg, ok := <-self.consumer.Messages():
|
||
if ok {
|
||
//tlog.Debugf("topic: %s, partition: %d, offset: %d, msg: %s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
|
||
self.handler(msg.Value)
|
||
|
||
// 测试用的group不标记处理完毕,防止破坏集群数据
|
||
if !self.test {
|
||
self.consumer.MarkOffset(msg, "done")
|
||
}
|
||
}
|
||
case <-self.stop:
|
||
tlog.Info("KafkaConsumer stop signal received!")
|
||
return
|
||
}
|
||
}
|
||
}
|