// Package kafka wraps a sarama-cluster consumer with a small pool of worker
// goroutines that pass each message payload to a caller-supplied handler.
package kafka

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"

	"code.infininov.com/Infini/unitech-golib/tlog"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

// Config holds the settings NewConsumer reads to build a Consumer.
type Config struct {
	// Brokers is the broker address list handed to cluster.NewConsumer.
	Brokers []string `toml:"brokers"`
	// Topic is the single topic the consumer subscribes to.
	Topic string `toml:"topic"`
	// Group is the consumer group name. When empty, NewConsumer generates a
	// random name starting with TEST_PREFIX.
	Group string `toml:"group"`
	// Workers is the number of worker goroutines started by Consumer.Run.
	Workers int `toml:"workers"`
	// Oldest selects sarama.OffsetOldest as the initial offset; otherwise
	// sarama.OffsetNewest is used.
	Oldest bool `toml:"oldest"`
}

// Consumer reads messages from a cluster consumer and dispatches their
// payloads to handler from a set of worker goroutines.
type Consumer struct {
	test     bool              // true when the group name starts with TEST_PREFIX
	consumer *cluster.Consumer // underlying sarama-cluster consumer
	workers  int               // number of goroutines Run starts
	handler  func([]byte)      // invoked with the value of every message
	stop     chan bool         // closed by Stop to tell workers to exit
	wg       *sync.WaitGroup   // tracks running workers so Stop can wait for them
}

const (
	// TEST_PREFIX is the prefix of auto-generated group names. A consumer
	// whose group carries this prefix never marks offsets.
	TEST_PREFIX = "test_consumer_"
)

// NewConsumer creates a Consumer for cfg.Topic on cfg.Brokers and starts two
// background goroutines that log the underlying consumer's errors and
// notifications. hdl is called with the value of every received message once
// Run (or Worker) is started.
//
// It returns nil when cfg is nil or when the underlying consumer cannot be
// created; the failure is logged rather than returned.
func NewConsumer(cfg *Config, hdl func([]byte)) *Consumer {
	if cfg == nil {
		tlog.Errorf("NewConsumer() create fail: %s", "nil config")
		return nil
	}

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	if cfg.Oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	} else {
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	}

	group := cfg.Group
	if group == "" {
		// No group configured: use a throwaway test group name.
		// NOTE(review): math/rand is not seeded in this file; confirm the
		// generated name is unique enough for the Go version in use.
		group = fmt.Sprintf("%s%.8x", TEST_PREFIX, rand.Int63())
	}

	// Snapshot the broker list so the logging goroutines below do not read
	// cfg after NewConsumer has returned.
	brokers := cfg.Brokers

	tlog.Infof("NewConsumer() brokers=%#v, topic=%s, workers=%d, group=%s", brokers, cfg.Topic, cfg.Workers, group)

	csm, err := cluster.NewConsumer(brokers, group, []string{cfg.Topic}, config)
	if err != nil {
		tlog.Errorf("NewConsumer() create fail: %s", err)
		return nil
	}
	tlog.Infof("NewConsumer() Created OK! \n%#v", csm)

	// Drain the error channel; the loop ends when the channel is closed.
	go func() {
		for err := range csm.Errors() {
			tlog.Errorf("KafkaConsumer Error: brokers=%v, group=%s, err=%s", brokers, group, err.Error())
		}
	}()

	// Drain the notification channel; the loop ends when the channel is closed.
	go func() {
		for ntf := range csm.Notifications() {
			tlog.Infof("KafkaConsumer Notice: brokers=%v, group=%s, ntf=%#v", brokers, group, ntf)
		}
	}()

	return &Consumer{
		test:     strings.HasPrefix(group, TEST_PREFIX),
		consumer: csm,
		workers:  cfg.Workers,
		handler:  hdl,
		stop:     make(chan bool),
		wg:       &sync.WaitGroup{},
	}
}

// Run starts the configured number of worker goroutines and returns
// immediately. Each worker is registered with the wait group before its
// goroutine is launched, so a subsequent Stop always waits for all of them.
func (c *Consumer) Run() {
	for i := 0; i < c.workers; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			c.consume()
		}()
	}
}

// Stop signals every worker to exit, waits for them to finish, and then
// closes the underlying consumer. It closes the stop channel, so it must be
// called at most once.
func (c *Consumer) Stop() {
	close(c.stop)
	c.wg.Wait()
	if err := c.consumer.Close(); err != nil {
		tlog.Errorf("KafkaConsumer close fail: %s", err)
	}
}

// Worker runs a single consume loop in the calling goroutine and blocks until
// Stop is called or the message channel is closed. Run starts workers itself;
// call Worker directly only to drive a loop from a goroutine you manage.
func (c *Consumer) Worker() {
	c.wg.Add(1)
	defer c.wg.Done()
	c.consume()
}

// consume is the loop shared by Run and Worker: it passes each message value
// to the handler and returns on the stop signal or when the message channel
// is closed.
func (c *Consumer) consume() {
	for {
		select {
		case msg, ok := <-c.consumer.Messages():
			if !ok {
				// A closed channel is always ready to receive; returning
				// here prevents the loop from spinning on it.
				tlog.Info("KafkaConsumer message channel closed, worker exiting")
				return
			}
			//tlog.Debugf("topic: %s, partition: %d, offset: %d, msg: %s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
			c.handler(msg.Value)
			// Test groups do not mark messages as processed, to avoid
			// disturbing data on the cluster.
			if !c.test {
				c.consumer.MarkOffset(msg, "done")
			}
		case <-c.stop:
			tlog.Info("KafkaConsumer stop signal received!")
			return
		}
	}
}