Initial commit
This commit is contained in:
123
kafka/kafka.go
Normal file
123
kafka/kafka.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"globalfintech/golib/tlog"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
cluster "github.com/bsm/sarama-cluster"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Brokers []string `toml:"brokers"`
|
||||
Topic string `toml:"topic"`
|
||||
Group string `toml:"group"`
|
||||
Workers int `toml:"workers"`
|
||||
Oldest bool `toml:"oldest"`
|
||||
}
|
||||
|
||||
type Consumer struct {
|
||||
test bool
|
||||
consumer *cluster.Consumer
|
||||
workers int
|
||||
handler func([]byte)
|
||||
stop chan bool
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
const (
|
||||
TEST_PREFIX = "test_consumer_"
|
||||
)
|
||||
|
||||
func NewConsumer(cfg *Config, hdl func([]byte)) *Consumer {
|
||||
|
||||
var (
|
||||
csm *cluster.Consumer
|
||||
err error
|
||||
group string
|
||||
)
|
||||
|
||||
config := cluster.NewConfig()
|
||||
config.Consumer.Return.Errors = true
|
||||
config.Group.Return.Notifications = true
|
||||
|
||||
if cfg.Oldest {
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
} else {
|
||||
config.Consumer.Offsets.Initial = sarama.OffsetNewest
|
||||
}
|
||||
|
||||
group = cfg.Group
|
||||
if group == "" {
|
||||
group = fmt.Sprintf("%s%.8x", TEST_PREFIX, rand.Int63())
|
||||
}
|
||||
|
||||
tlog.Infof("NewConsumer() brokers=%#v, topic=%s, workers=%d, group=%s", cfg.Brokers, cfg.Topic, cfg.Workers, group)
|
||||
|
||||
if csm, err = cluster.NewConsumer(cfg.Brokers, group, []string{cfg.Topic}, config); err != nil {
|
||||
tlog.Errorf("NewConsumer() create fail: %s", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
tlog.Infof("NewConsumer() Created OK! %#v", csm)
|
||||
|
||||
go func() {
|
||||
for err := range csm.Errors() {
|
||||
tlog.Errorf("KafkaConsumer<brokers=%#v, group=%s> Error: %s", cfg.Brokers, group, err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for ntf := range csm.Notifications() {
|
||||
tlog.Infof("KafkaConsumer<brokers=%#v, group=%s> Notice: %#v", cfg.Brokers, group, ntf)
|
||||
}
|
||||
}()
|
||||
|
||||
return &Consumer{
|
||||
test: strings.HasPrefix(group, TEST_PREFIX),
|
||||
consumer: csm,
|
||||
workers: cfg.Workers,
|
||||
handler: hdl,
|
||||
stop: make(chan bool),
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Consumer) Run() {
|
||||
|
||||
for i := 0; i < self.workers; i++ {
|
||||
go self.Worker()
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Consumer) Stop() {
|
||||
close(self.stop)
|
||||
self.wg.Wait()
|
||||
self.consumer.Close()
|
||||
}
|
||||
|
||||
func (self *Consumer) Worker() {
|
||||
self.wg.Add(1)
|
||||
defer self.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-self.consumer.Messages():
|
||||
if ok {
|
||||
//tlog.Debugf("topic: %s, partition: %d, offset: %d, msg: %s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
|
||||
self.handler(msg.Value)
|
||||
|
||||
// 测试用的group不标记处理完毕,防止破坏集群数据
|
||||
if !self.test {
|
||||
self.consumer.MarkOffset(msg, "done")
|
||||
}
|
||||
}
|
||||
case <-self.stop:
|
||||
tlog.Info("KafkaConsumer stop signal received!")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user