package consumer

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.beejay.kim/tool/service"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/google/uuid"
	"github.com/kelindar/bitmap"
	"github.com/rs/zerolog/log"
)

const (
	// flagSubscribed is the bit of _consumer.state that is set while the
	// Subscribe loop is running.
	flagSubscribed uint32 = iota
)

// rebalanceProtocolCooperative is the value kafka.Consumer.GetRebalanceProtocol
// reports while the group uses the incremental (cooperative) rebalance protocol.
const rebalanceProtocolCooperative = "COOPERATIVE"

// _consumer is the Kafka-backed consumer returned by NewKafkaConsumer.
//
//goland:noinspection ALL
type _consumer struct {
	config   *Config
	handlers []service.ConsumerHandler
	session  *kafka.Consumer

	// mu guards state: Subscribe blocks in its poll loop, so Close is
	// necessarily called from another goroutine and both touch the bitmap.
	mu    sync.RWMutex
	state bitmap.Bitmap
}

// NewKafkaConsumer creates a Kafka consumer session from cfg.
//
// The session is configured for manual commits (enable.auto.commit=false),
// the cooperative-sticky assignment strategy and "earliest" offset reset.
// It returns an error when cfg is nil or when the Kafka client cannot be
// created.
func NewKafkaConsumer(cfg *Config) (service.Consumer, error) {
	// Validate before building anything that depends on cfg.
	if cfg == nil {
		return nil, fmt.Errorf("config must be provided")
	}

	opts := &kafka.ConfigMap{
		"broker.address.family":         "v4",
		"bootstrap.servers":             strings.Join(cfg.Hosts, ","),
		"group.id":                      cfg.Group,
		"partition.assignment.strategy": "cooperative-sticky",
		"auto.offset.reset":             "earliest",
		"log_level":                     0,
		"enable.auto.commit":            false,
	}

	session, err := kafka.NewConsumer(opts)
	if err != nil {
		return nil, err
	}

	return &_consumer{
		config:  cfg,
		session: session,
	}, nil
}

// ID returns a name-based (SHA-1) UUID derived from the fixed name
// "consumer.kafka"; every instance therefore reports the same ID.
func (c *_consumer) ID() uuid.UUID {
	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
}

// Subscribed reports whether the Subscribe loop is currently marked as running.
func (c *_consumer) Subscribed() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.state.Contains(flagSubscribed)
}

// setSubscribed sets or clears the subscribed flag under the state lock.
func (c *_consumer) setSubscribed(on bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if on {
		c.state.Set(flagSubscribed)
		return
	}
	c.state.Remove(flagSubscribed)
}

// RegisterHandlers appends handlers to the chain that poll runs, in
// registration order, for every received message.
//
// NOTE(review): the handler slice is not synchronized; presumably handlers are
// registered before Subscribe is started — confirm against callers.
func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
	c.handlers = append(c.handlers, handlers...)
}

// Subscribe subscribes the session to topics and then blocks, forwarding every
// successfully handled message to ch until the subscribed flag is cleared
// (see Close) or polling fails.
//
// It returns an error when the consumer is already subscribed, when the
// subscription itself fails, or when poll reports anything other than
// ErrNoMessage; in the last case the subscribed flag is cleared before
// returning. The send on ch blocks until a receiver takes the message.
func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
	if c.Subscribed() {
		return fmt.Errorf("illegal state: already subscribed")
	}

	if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
		return err
	}
	c.setSubscribed(true)

	for c.Subscribed() {
		message, err := c.poll()
		if err != nil {
			// Nothing arrived within the poll timeout: keep waiting silently.
			if errors.Is(err, ErrNoMessage) {
				continue
			}

			log.Debug().
				Str("service", "consumer").
				Err(err).
				Send()

			c.setSubscribed(false)
			return err
		}

		if message != nil {
			ch <- message
		}
	}

	log.Debug().Msg("consumer closed gracefully")
	return nil
}

// Close clears the subscribed flag so a running Subscribe loop stops after its
// current iteration, waits one second and then closes the Kafka session.
//
// Closing is best-effort: a failure to close the session is logged at debug
// level and Close always returns nil.
func (c *_consumer) Close() error {
	c.setSubscribed(false)

	if c.session == nil {
		return nil
	}

	// Grace period before tearing the session down; presumably this lets the
	// Subscribe loop observe the cleared flag and leave Poll first.
	time.Sleep(time.Second)

	if err := c.session.Close(); err != nil {
		log.Debug().
			Str("service", "consumer").
			Err(err).
			Msg("closing kafka session")
	}

	return nil
}

// poll waits up to config.Timeout (passed to kafka.Consumer.Poll) for one
// event and dispatches on its type.
//
//   - *kafka.Message: every registered handler runs in order and the first
//     handler error aborts processing; afterwards maybeCommit decides whether
//     to commit. The message is returned on success.
//   - kafka.Error: returned to the caller, which is responsible for logging.
//   - anything else (including no event): logged when non-nil and reported as
//     ErrNoMessage.
func (c *_consumer) poll() (*kafka.Message, error) {
	switch e := c.session.Poll(c.config.Timeout).(type) {
	case *kafka.Message:
		for i := range c.handlers {
			if err := c.handlers[i](e); err != nil {
				return nil, err
			}
		}

		// Commit manually because enable.auto.commit is disabled.
		if err := maybeCommit(c.session, e.TopicPartition); err != nil {
			return nil, err
		}

		return e, nil

	case kafka.Error:
		// Not logged here: Subscribe logs every error it receives from poll.
		return nil, e

	default:
		if e != nil {
			log.Debug().
				Str("service", "consumer").
				Any("event", e).
				Send()
		}

		return nil, ErrNoMessage
	}
}

// isNoStoredOffset reports whether err is a kafka.Error carrying
// kafka.ErrNoOffset, i.e. a commit that had nothing to commit. It uses
// errors.As instead of a type assertion so that any other error type is
// reported as false rather than causing a panic.
func isNoStoredOffset(err error) bool {
	var kerr kafka.Error
	return errors.As(err, &kerr) && kerr.Code() == kafka.ErrNoOffset
}

// maybeCommit is called for each message we receive from a Kafka topic.
// This method can be used to apply some arbitrary logic/processing to the
// offsets, write the offsets into some external storage, and finally, to
// decide when we want to commit already-stored offsets into Kafka.
//
// It returns nil when no commit is attempted, when the commit succeeds, or
// when there was nothing to commit; any other commit error is returned.
func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
	// Commit the already-stored offsets to Kafka whenever the offset is
	// divisible by 10, otherwise return early.
	// This logic is completely arbitrary. We can use any other internal or
	// external variables to decide when we commit the already-stored offsets.
	if topicPartition.Offset%10 != 0 {
		return nil
	}

	committed, err := c.Commit()

	// ErrNoOffset occurs when there are no stored offsets to commit. This
	// can happen if we haven't stored anything since the last commit, and is
	// not treated as a failure.
	if err != nil && !isNoStoredOffset(err) {
		return err
	}

	log.Debug().
		Str("service", "consumer").
		Msgf("commited offsets to Kafka: %v", committed)

	return nil
}

// rebalanceCallback is called on each group rebalance to assign additional
// partitions, or remove existing partitions, from the consumer's current
// assignment.
//
// A rebalance occurs when a consumer joins or leaves a consumer group, if it
// changes the topic(s) it's subscribed to, or if there's a change in one of
// the topics it's subscribed to, for example, the total number of partitions
// increases.
//
// The application may use this optional callback to inspect the assignment,
// alter the initial start offset (the .Offset field of each assigned partition),
// and read/write offsets to commit to an alternative store outside of Kafka.
//
// A non-nil return value reports a failed assignment or a failed commit.
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
	switch ev := event.(type) {
	case kafka.AssignedPartitions:
		protocol := c.GetRebalanceProtocol()

		log.Debug().
			Str("service", "consumer").
			Msgf("%s rebalance: %d new partition(s) assigned: %v",
				protocol, len(ev.Partitions), ev.Partitions)

		// The session is configured with the cooperative-sticky strategy, and
		// under the cooperative protocol the event carries only the partitions
		// being added, which must be applied with IncrementalAssign. Assign
		// replaces the whole assignment and is only valid for the eager
		// protocol.
		if protocol == rebalanceProtocolCooperative {
			return c.IncrementalAssign(ev.Partitions)
		}
		return c.Assign(ev.Partitions)

	case kafka.RevokedPartitions:
		log.Debug().
			Str("service", "consumer").
			Msgf("%s rebalance: %d partition(s) revoked: %v",
				c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)

		// Usually, the rebalance callback for `RevokedPartitions` is called
		// just before the partitions are revoked. We can be certain that a
		// partition being revoked is not yet owned by any other consumer.
		// This way, logic like storing any pending offsets or committing
		// offsets can be handled.
		// However, there can be cases where the assignment is lost
		// involuntarily. In this case, the partition might already be owned
		// by another consumer, and operations including committing
		// offsets may not work.
		if c.AssignmentLost() {
			// Our consumer has been kicked out of the group and the
			// entire assignment is thus lost.
			log.Debug().
				Str("service", "consumer").
				Msg("Assignment lost involuntarily, commit may fail")
		}

		// Since enable.auto.commit is disabled, we need to commit offsets
		// manually before the partition is revoked.
		committed, err := c.Commit()
		if err != nil && !isNoStoredOffset(err) {
			log.Debug().
				Str("service", "consumer").
				Err(err).
				Msg("failed to commit offsets")

			return err
		}

		log.Debug().
			Str("service", "consumer").
			Msgf("commited offsets to Kafka: %v", committed)

		// Similar to Assign, client automatically calls Unassign() unless the
		// callback has already called that method. Here, we don't call it.

	default:
		log.Debug().
			Str("service", "consumer").
			Msgf("unxpected event type: %v", event)
	}

	return nil
}