package consumer import ( "errors" "fmt" "git.beejay.kim/tool/service" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/google/uuid" "github.com/kelindar/bitmap" "github.com/rs/zerolog/log" "strings" "sync" "time" ) const ( flagSubscribed uint32 = iota flagPaused ) //goland:noinspection ALL type _consumer struct { config *Config state bitmap.Bitmap handlers []service.ConsumerHandler session *kafka.Consumer } func New(cfg *Config) (service.Consumer, error) { var ( c = &_consumer{ config: cfg, } err error ) if cfg == nil { return nil, fmt.Errorf("config must be provided") } opts := &kafka.ConfigMap{ "broker.address.family": "v4", "bootstrap.servers": strings.Join(c.config.Hosts, ","), "group.id": c.config.Group, "partition.assignment.strategy": "cooperative-sticky", "auto.offset.reset": "earliest", "log_level": 0, } if c.session, err = kafka.NewConsumer(opts); err != nil { return nil, err } return c, nil } func (c *_consumer) ID() uuid.UUID { return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka")) } func (c *_consumer) Subscribed() bool { return c.state.Contains(flagSubscribed) } func (c *_consumer) Paused() bool { return c.state.Contains(flagPaused) } func (c *_consumer) SetPause(is bool) error { if is { c.state.Set(flagPaused) } else { c.state.Remove(flagPaused) } return nil } func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) { c.handlers = append(c.handlers, handlers...) } func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error { if c.Subscribed() { return fmt.Errorf("illegal state: already subscribed") } if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil { return err } c.state.Set(flagSubscribed) for c.Subscribed() { for c.Paused() { time.Sleep(100 * time.Millisecond) } message, err := c.poll() if err != nil { // silently wait for a next message if errors.Is(err, ErrNoMessage) { continue } log.Debug(). Str("service", "consumer"). Err(err). Send() c.state.Remove(flagSubscribed) return err } if message != nil { if opts.Counter != nil { opts.Counter.Inc() } ch <- message } } log.Debug().Msg("consumer closed gracefully") return nil } func (c *_consumer) Close() error { if c.Subscribed() { c.state.Remove(flagSubscribed) } var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() if c.session != nil { time.Sleep(time.Second) _ = c.session.Close() //nolint:errcheck } }() wg.Wait() return nil } func (c *_consumer) poll() (*kafka.Message, error) { var ( ev = c.session.Poll(c.config.Timeout) err error ) switch e := ev.(type) { case *kafka.Message: for i := range c.handlers { if err = c.handlers[i](e); err != nil { return nil, err } } return e, nil case kafka.Error: log.Debug(). Str("service", "consumer"). Err(e). Send() if e.Code() == kafka.ErrAllBrokersDown { c.state.Remove(flagSubscribed) } return nil, e default: if e != nil { log.Debug(). Str("service", "consumer"). Any("event", e). Send() } return nil, ErrNoMessage } } // rebalanceCallback is called on each group rebalance to assign additional // partitions, or remove existing partitions, from the consumer's current // assignment. // // The application may use this optional callback to inspect the assignment, // alter the initial start offset (the .Offset field of each assigned partition), // and read/write offsets to commit to an alternative store outside of Kafka. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { switch ev := event.(type) { case kafka.AssignedPartitions: log.Debug(). Str("service", "consumer"). Msgf("%s rebalance: %d new partition(s) assigned: %v", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) // The application may update the start .Offset of each // assigned partition and then call IncrementalAssign(). if err := c.IncrementalAssign(ev.Partitions); err != nil { panic(err) } case kafka.RevokedPartitions: log.Debug(). Str("service", "consumer"). Msgf("%s rebalance: %d partition(s) revoked: %v", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) // Usually, the rebalance callback for `RevokedPartitions` is called // just before the partitions are revoked. We can be certain that a // partition being revoked is not yet owned by any other consumer. // This way, logic like storing any pending offsets or committing // offsets can be handled. // However, there can be cases where the assignment is lost // involuntarily. In this case, the partition might already be owned // by another consumer, and operations including committing // offsets may not work. if c.AssignmentLost() { // Our consumer has been kicked out of the group and the // entire assignment is thus lost. log.Debug(). Str("service", "consumer"). Msg("Assignment lost involuntarily, commit may fail") } // The client automatically calls IncrementalUnassign() unless // the callback has already called that method. default: log.Debug(). Str("service", "consumer"). Msgf("unxpected event type: %v", event) } return nil }