package kafka

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/rs/zerolog/log"
)

// ErrNoMessage is returned by pollMessage when a poll produced neither a
// message nor an error event (for example, the poll returned no event at all).
var ErrNoMessage = errors.New("no message")

// _consumer is the Consumer implementation backed by a confluent-kafka-go
// consumer session.
type _consumer struct {
	config  *Config
	session *lib.Consumer

	// closeOnce guards Close so the underlying session is closed at most once;
	// closeErr keeps the outcome of that single close for later callers.
	closeOnce sync.Once
	closeErr  error
}

// NewConsumer builds a Kafka consumer from cfg.
//
// It returns an error when cfg is nil or when the underlying client cannot be
// created from the derived configuration.
func NewConsumer(cfg *Config) (Consumer, error) {
	if cfg == nil {
		return nil, fmt.Errorf("config must be provided")
	}

	opts := &lib.ConfigMap{
		"broker.address.family":         "v4",
		"bootstrap.servers":             strings.Join(cfg.Hosts, ","),
		"group.id":                      cfg.Group,
		"partition.assignment.strategy": "cooperative-sticky",
		"auto.offset.reset":             "earliest",
		"log_level":                     0,
	}

	session, err := lib.NewConsumer(opts)
	if err != nil {
		return nil, fmt.Errorf("creating kafka consumer: %w", err)
	}

	return &_consumer{config: cfg, session: session}, nil
}

// Subscribe subscribes to topics and then blocks, forwarding every polled
// message to ch. Each send on ch blocks until a receiver accepts the message.
//
// It returns an error when no topic is given, when the subscription itself
// fails, or when the client reports a fatal error. Non-fatal poll errors are
// logged and polling continues. It returns nil once the consumer has been
// closed.
func (c *_consumer) Subscribe(topics []string, ch chan *lib.Message) error {
	// len() also rejects an empty, non-nil slice, which a nil comparison misses.
	if len(topics) == 0 {
		return fmt.Errorf("illegal arguments: at least one topic must be provided")
	}

	if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
		return err
	}

	for {
		// Stop once Close has been called so the loop terminates instead of
		// polling a closed session forever.
		if c.session.IsClosed() {
			return nil
		}

		message, err := c.pollMessage()
		switch {
		case err == nil:
			// fall through to delivery below
		case errors.Is(err, ErrNoMessage):
			// silently wait for the next message
			continue
		default:
			var kerr lib.Error
			if errors.As(err, &kerr) && kerr.IsFatal() {
				return fmt.Errorf("polling kafka: %w", err)
			}
			// Non-fatal errors used to be dropped without a trace; surface
			// them in the log and keep consuming.
			log.Error().
				Err(err).
				Str("service", "consumer").
				Msg("kafka poll error")
			continue
		}

		if message != nil {
			ch <- message
		}
	}
}

// String returns a fixed identifier for this component.
func (c *_consumer) String() string {
	return "kafka:consumer"
}

// Close closes the underlying session, if any, and returns the close error.
// Only the first call performs the close; later calls return the same result.
func (c *_consumer) Close() error {
	c.closeOnce.Do(func() {
		if c.session == nil {
			return
		}

		// NOTE(review): the one-second pause before closing is kept from the
		// previous implementation; its purpose is not visible in this file —
		// presumably it lets an in-flight poll finish. Confirm before removing.
		time.Sleep(time.Second)

		if err := c.session.Close(); err != nil {
			c.closeErr = fmt.Errorf("closing kafka consumer: %w", err)
		}
	})

	return c.closeErr
}

// pollMessage polls the session once, waiting up to the configured timeout.
//
// It returns the message for message events, the error for error events, and
// ErrNoMessage for everything else. Other non-nil events are logged at debug
// level before being discarded.
func (c *_consumer) pollMessage() (*lib.Message, error) {
	switch ev := c.session.Poll(c.config.Timeout).(type) {
	case *lib.Message:
		return ev, nil
	case lib.Error:
		return nil, ev
	default:
		if ev != nil {
			log.Debug().
				Str("service", "consumer").
				Any("event", ev).
				Send()
		}
		return nil, ErrNoMessage
	}
}