123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package kafka
- import (
- "errors"
- "fmt"
- lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/rs/zerolog/log"
- "strings"
- "sync"
- "time"
- )
// Sentinel errors exposed by the consumer.
var (
	// ErrNoMessage is returned by pollMessage when a poll produced neither a
	// message nor an error. Compare against it with errors.Is.
	ErrNoMessage = errors.New("no message")
)
- type _consumer struct {
- config *Config
- session *lib.Consumer
- }
- func NewConsumer(cfg *Config) (Consumer, error) {
- var (
- c = &_consumer{
- config: cfg,
- }
- err error
- )
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- opts := &lib.ConfigMap{
- "broker.address.family": "v4",
- "bootstrap.servers": strings.Join(c.config.Hosts, ","),
- "group.id": c.config.Group,
- "partition.assignment.strategy": "cooperative-sticky",
- "auto.offset.reset": "earliest",
- "log_level": 0,
- }
- if c.session, err = lib.NewConsumer(opts); err != nil {
- return nil, err
- }
- return c, nil
- }
- func (c *_consumer) Subscribe(topics []string, ch chan *lib.Message) error {
- if topics == nil {
- return fmt.Errorf("illegal arguments: at least one topic must be provided")
- }
- if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
- return err
- }
- for {
- message, err := c.pollMessage()
- if err != nil {
- // silently wait for a next message
- if errors.Is(err, ErrNoMessage) {
- continue
- }
- }
- if message != nil {
- ch <- message
- }
- }
- }
- func (c *_consumer) String() string {
- return "kafka:consumer"
- }
- func (c *_consumer) Close() error {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if c.session != nil {
- time.Sleep(time.Second)
- _ = c.session.Close() //nolint:errcheck
- }
- }()
- wg.Wait()
- return nil
- }
- func (c *_consumer) pollMessage() (*lib.Message, error) {
- ev := c.session.Poll(c.config.Timeout)
- switch e := ev.(type) {
- case *lib.Message:
- return e, nil
- case lib.Error:
- return nil, e
- default:
- if e != nil {
- log.Debug().
- Str("service", "consumer").
- Any("event", e).
- Send()
- }
- return nil, ErrNoMessage
- }
- }
|