|
@@ -0,0 +1,273 @@
|
|
|
|
+package consumer
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "errors"
|
|
|
|
+ "fmt"
|
|
|
|
+ "git.beejay.kim/tool/service"
|
|
|
|
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
|
|
|
+ "github.com/google/uuid"
|
|
|
|
+ "github.com/kelindar/bitmap"
|
|
|
|
+ "github.com/rs/zerolog/log"
|
|
|
|
+ "strings"
|
|
|
|
+ "sync"
|
|
|
|
+ "time"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+const (
|
|
|
|
+ flagSubscribed uint32 = iota
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+//goland:noinspection ALL
|
|
|
|
+type _consumer struct {
|
|
|
|
+ config *Config
|
|
|
|
+ state bitmap.Bitmap
|
|
|
|
+ handlers []service.ConsumerHandler
|
|
|
|
+ session *kafka.Consumer
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func NewKafkaConsumer(cfg *Config) (service.Consumer, error) {
|
|
|
|
+ var (
|
|
|
|
+ c = &_consumer{
|
|
|
|
+ config: cfg,
|
|
|
|
+ }
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if cfg == nil {
|
|
|
|
+ return nil, fmt.Errorf("config must be provided")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ opts := &kafka.ConfigMap{
|
|
|
|
+ "broker.address.family": "v4",
|
|
|
|
+ "bootstrap.servers": strings.Join(c.config.Hosts, ","),
|
|
|
|
+ "group.id": c.config.Group,
|
|
|
|
+ "partition.assignment.strategy": "cooperative-sticky",
|
|
|
|
+ "auto.offset.reset": "earliest",
|
|
|
|
+ "log_level": 0,
|
|
|
|
+ "enable.auto.commit": false,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if c.session, err = kafka.NewConsumer(opts); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return c, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) ID() uuid.UUID {
|
|
|
|
+ return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) Subscribed() bool {
|
|
|
|
+ return c.state.Contains(flagSubscribed)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
|
|
|
|
+ c.handlers = append(c.handlers, handlers...)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
|
|
|
|
+ if c.Subscribed() {
|
|
|
|
+ return fmt.Errorf("illegal state: already subscribed")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ c.state.Set(flagSubscribed)
|
|
|
|
+ for c.Subscribed() {
|
|
|
|
+ message, err := c.poll()
|
|
|
|
+ if err != nil {
|
|
|
|
+ // silently wait for a next message
|
|
|
|
+ if errors.Is(err, ErrNoMessage) {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Err(err).
|
|
|
|
+ Send()
|
|
|
|
+
|
|
|
|
+ c.state.Remove(flagSubscribed)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if message != nil {
|
|
|
|
+ ch <- message
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Debug().Msg("consumer closed gracefully")
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) Close() error {
|
|
|
|
+ if c.Subscribed() {
|
|
|
|
+ c.state.Remove(flagSubscribed)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
+ wg.Add(1)
|
|
|
|
+
|
|
|
|
+ go func() {
|
|
|
|
+ defer wg.Done()
|
|
|
|
+
|
|
|
|
+ if c.session != nil {
|
|
|
|
+ time.Sleep(time.Second)
|
|
|
|
+ _ = c.session.Close() //nolint:errcheck
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+
|
|
|
|
+ wg.Wait()
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (c *_consumer) poll() (*kafka.Message, error) {
|
|
|
|
+ var (
|
|
|
|
+ ev = c.session.Poll(c.config.Timeout)
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ switch e := ev.(type) {
|
|
|
|
+ case *kafka.Message:
|
|
|
|
+ for i := range c.handlers {
|
|
|
|
+ if err = c.handlers[i](e); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Handle manual commit since enable.auto.commit is unset.
|
|
|
|
+ if err = maybeCommit(c.session, e.TopicPartition); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return e, nil
|
|
|
|
+ case kafka.Error:
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Err(e).
|
|
|
|
+ Send()
|
|
|
|
+ return nil, e
|
|
|
|
+ default:
|
|
|
|
+ if e != nil {
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Any("event", e).
|
|
|
|
+ Send()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil, ErrNoMessage
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// maybeCommit is called for each message we receive from a Kafka topic.
|
|
|
|
+// This method can be used to apply some arbitary logic/processing to the
|
|
|
|
+// offsets, write the offsets into some external storage, and finally, to
|
|
|
|
+// decide when we want to commit already-stored offsets into Kafka.
|
|
|
|
+func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
|
|
|
|
+ // Commit the already-stored offsets to Kafka whenever the offset is divisible
|
|
|
|
+ // by 10, otherwise return early.
|
|
|
|
+ // This logic is completely arbitrary. We can use any other internal or
|
|
|
|
+ // external variables to decide when we commit the already-stored offsets.
|
|
|
|
+ if topicPartition.Offset%10 != 0 {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ commitedOffsets, err := c.Commit()
|
|
|
|
+
|
|
|
|
+ // ErrNoOffset occurs when there are no stored offsets to commit. This
|
|
|
|
+ // can happen if we haven't stored anything since the last commit.
|
|
|
|
+ // While this will never happen for this example since we call this method
|
|
|
|
+ // per-message, and thus, always have something to commit, the error
|
|
|
|
+ // handling is illustrative of how to handle it in cases we call Commit()
|
|
|
|
+ // in another way, for example, every N seconds.
|
|
|
|
+ if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msgf("commited offsets to Kafka: %v", commitedOffsets)
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// rebalanceCallback is called on each group rebalance to assign additional
|
|
|
|
+// partitions, or remove existing partitions, from the consumer's current
|
|
|
|
+// assignment.
|
|
|
|
+//
|
|
|
|
+// A rebalance occurs when a consumer joins or leaves a consumer group, if it
|
|
|
|
+// changes the topic(s) it's subscribed to, or if there's a change in one of
|
|
|
|
+// the topics it's subscribed to, for example, the total number of partitions
|
|
|
|
+// increases.
|
|
|
|
+//
|
|
|
|
+// The application may use this optional callback to inspect the assignment,
|
|
|
|
+// alter the initial start offset (the .Offset field of each assigned partition),
|
|
|
|
+// and read/write offsets to commit to an alternative store outside of Kafka.
|
|
|
|
+func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
|
|
|
|
+ switch ev := event.(type) {
|
|
|
|
+ case kafka.AssignedPartitions:
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msgf("%s rebalance: %d new partition(s) assigned: %v",
|
|
|
|
+ c.GetRebalanceProtocol(),
|
|
|
|
+ len(ev.Partitions),
|
|
|
|
+ ev.Partitions)
|
|
|
|
+
|
|
|
|
+ err := c.Assign(ev.Partitions)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ case kafka.RevokedPartitions:
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msgf("%s rebalance: %d partition(s) revoked: %v",
|
|
|
|
+ c.GetRebalanceProtocol(),
|
|
|
|
+ len(ev.Partitions),
|
|
|
|
+ ev.Partitions)
|
|
|
|
+
|
|
|
|
+ // Usually, the rebalance callback for `RevokedPartitions` is called
|
|
|
|
+ // just before the partitions are revoked. We can be certain that a
|
|
|
|
+ // partition being revoked is not yet owned by any other consumer.
|
|
|
|
+ // This way, logic like storing any pending offsets or committing
|
|
|
|
+ // offsets can be handled.
|
|
|
|
+ // However, there can be cases where the assignment is lost
|
|
|
|
+ // involuntarily. In this case, the partition might already be owned
|
|
|
|
+ // by another consumer, and operations including committing
|
|
|
|
+ // offsets may not work.
|
|
|
|
+ if c.AssignmentLost() {
|
|
|
|
+ // Our consumer has been kicked out of the group and the
|
|
|
|
+ // entire assignment is thus lost.
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msg("Assignment lost involuntarily, commit may fail")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Since enable.auto.commit is unset, we need to commit offsets manually
|
|
|
|
+ // before the partition is revoked.
|
|
|
|
+ commitedOffsets, err := c.Commit()
|
|
|
|
+ if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Err(err).
|
|
|
|
+ Msg("failed to commit offsets")
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msgf("commited offsets to Kafka: %v", commitedOffsets)
|
|
|
|
+
|
|
|
|
+ // Similar to Assign, client automatically calls Unassign() unless the
|
|
|
|
+ // callback has already called that method. Here, we don't call it.
|
|
|
|
+
|
|
|
|
+ default:
|
|
|
|
+ log.Debug().
|
|
|
|
+ Str("service", "consumer").
|
|
|
|
+ Msgf("unxpected event type: %v", event)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|