- package consumer
- import (
- "errors"
- "fmt"
- "git.beejay.kim/tool/service"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/google/uuid"
- "github.com/kelindar/bitmap"
- "github.com/rs/zerolog/log"
- "strings"
- "sync"
- "time"
- )
- const (
- flagSubscribed uint32 = iota
- )
- //goland:noinspection ALL
- type _consumer struct {
- config *Config
- state bitmap.Bitmap
- handlers []service.ConsumerHandler
- session *kafka.Consumer
- }
- func NewKafkaConsumer(cfg *Config) (service.Consumer, error) {
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- var (
- c = &_consumer{
- config: cfg,
- }
- err error
- )
- opts := &kafka.ConfigMap{
- "broker.address.family": "v4",
- "bootstrap.servers": strings.Join(c.config.Hosts, ","),
- "group.id": c.config.Group,
- "partition.assignment.strategy": "cooperative-sticky",
- "auto.offset.reset": "earliest",
- "log_level": 0,
- "enable.auto.commit": false,
- }
- if c.session, err = kafka.NewConsumer(opts); err != nil {
- return nil, err
- }
- return c, nil
- }
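- // ID returns a stable identifier for this consumer, derived deterministically
- // (UUIDv5 via SHA-1) from the name "consumer.kafka".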
- func (c *_consumer) ID() uuid.UUID {
- return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
- }
- func (c *_consumer) Subscribed() bool {
- return c.state.Contains(flagSubscribed)
- }
- func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
- c.handlers = append(c.handlers, handlers...)
- }
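- // Subscribe joins the given topics and then blocks, polling messages and
- // pushing them into ch until Close is called or a non-recoverable error
- // occurs. It returns an error immediately if the consumer is already
- // subscribed.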
- func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
- if c.Subscribed() {
- return fmt.Errorf("illegal state: already subscribed")
- }
- if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
- return err
- }
- c.state.Set(flagSubscribed)
- for c.Subscribed() {
- message, err := c.poll()
- if err != nil {
- // no message within the poll timeout; keep polling
- if errors.Is(err, ErrNoMessage) {
- continue
- }
- log.Debug().
- Str("service", "consumer").
- Err(err).
- Send()
- c.state.Remove(flagSubscribed)
- return err
- }
- if message != nil {
- ch <- message
- }
- }
- log.Debug().Msg("consumer closed gracefully")
- return nil
- }
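- // Close marks the consumer as unsubscribed so a blocking Subscribe loop can
- // wind down, then closes the underlying Kafka session after a short grace
- // period.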
- func (c *_consumer) Close() error {
- c.state.Remove(flagSubscribed)
- if c.session == nil {
- return nil
- }
- // Give a blocking Subscribe loop a moment to observe the cleared flag and
- // finish its current poll before the underlying session is closed.
- time.Sleep(time.Second)
- return c.session.Close()
- }
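- // Typical usage is sketched below; it is illustrative only and assumes that
- // Config carries Hosts, Group and a poll Timeout in milliseconds, and that
- // service.ConsumerHandler is a function of the form func(*kafka.Message) error:
- //
- //   c, err := NewKafkaConsumer(&Config{
- //       Hosts:   []string{"localhost:9092"},
- //       Group:   "example-group",
- //       Timeout: 100,
- //   })
- //   if err != nil {
- //       panic(err)
- //   }
- //   c.RegisterHandlers(func(m *kafka.Message) error {
- //       return nil // process the message here
- //   })
- //   ch := make(chan *kafka.Message)
- //   go func() {
- //       for m := range ch {
- //           _ = m // consume delivered messages
- //       }
- //   }()
- //   // Subscribe blocks; call Close from another goroutine (for example on a
- //   // shutdown signal) to stop it.
- //   if err = c.Subscribe([]string{"example-topic"}, ch); err != nil {
- //       log.Err(err).Send()
- //   }
- //
- // poll fetches a single event from the Kafka session, dispatches any message
- // to the registered handlers and triggers a manual offset commit via
- // maybeCommit.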
- func (c *_consumer) poll() (*kafka.Message, error) {
- var (
- ev = c.session.Poll(c.config.Timeout)
- err error
- )
- switch e := ev.(type) {
- case *kafka.Message:
- for i := range c.handlers {
- if err = c.handlers[i](e); err != nil {
- return nil, err
- }
- }
- // Commit offsets manually since enable.auto.commit is disabled.
- if err = maybeCommit(c.session, e.TopicPartition); err != nil {
- return nil, err
- }
- return e, nil
- case kafka.Error:
- log.Debug().
- Str("service", "consumer").
- Err(e).
- Send()
- return nil, e
- default:
- if e != nil {
- log.Debug().
- Str("service", "consumer").
- Any("event", e).
- Send()
- }
- return nil, ErrNoMessage
- }
- }
- // maybeCommit is called for each message we receive from a Kafka topic.
- // This method can be used to apply arbitrary logic/processing to the
- // offsets, write the offsets into some external storage, and finally, to
- // decide when we want to commit already-stored offsets into Kafka.
- func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
- // Commit the already-stored offsets to Kafka whenever the offset is divisible
- // by 10, otherwise return early.
- // This logic is completely arbitrary. We can use any other internal or
- // external variables to decide when we commit the already-stored offsets.
- if topicPartition.Offset%10 != 0 {
- return nil
- }
- committedOffsets, err := c.Commit()
- // ErrNoOffset occurs when there are no stored offsets to commit. This
- // can happen if we haven't stored anything since the last commit.
- // It should not happen here, since this method runs per message and there
- // is therefore always something to commit, but the handling is illustrative
- // for cases where Commit() is called in another way, for example, every
- // N seconds.
- var kafkaErr kafka.Error
- if err != nil && (!errors.As(err, &kafkaErr) || kafkaErr.Code() != kafka.ErrNoOffset) {
- return err
- }
- log.Debug().
- Str("service", "consumer").
- Msgf("committed offsets to Kafka: %v", committedOffsets)
- return nil
- }
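- // As noted above, Commit() can also be driven by time rather than by message
- // count. A minimal sketch of such a loop (illustrative only, not used by this
- // package; c is an already-subscribed *kafka.Consumer):
- //
- //   ticker := time.NewTicker(10 * time.Second)
- //   defer ticker.Stop()
- //   for range ticker.C {
- //       if _, err := c.Commit(); err != nil {
- //           var kafkaErr kafka.Error
- //           if !errors.As(err, &kafkaErr) || kafkaErr.Code() != kafka.ErrNoOffset {
- //               return err
- //           }
- //       }
- //   }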
- // rebalanceCallback is called on each group rebalance to assign additional
- // partitions, or remove existing partitions, from the consumer's current
- // assignment.
- //
- // A rebalance occurs when a consumer joins or leaves a consumer group, if it
- // changes the topic(s) it's subscribed to, or if there's a change in one of
- // the topics it's subscribed to, for example, the total number of partitions
- // increases.
- //
- // The application may use this optional callback to inspect the assignment,
- // alter the initial start offset (the .Offset field of each assigned partition),
- // and read/write offsets to commit to an alternative store outside of Kafka.
- func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
- switch ev := event.(type) {
- case kafka.AssignedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d new partition(s) assigned: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // The consumer is configured with the cooperative-sticky assignment
- // strategy, so newly assigned partitions must be added with the
- // incremental API rather than Assign.
- err := c.IncrementalAssign(ev.Partitions)
- if err != nil {
- return err
- }
- case kafka.RevokedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d partition(s) revoked: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // Usually, the rebalance callback for `RevokedPartitions` is called
- // just before the partitions are revoked. We can be certain that a
- // partition being revoked is not yet owned by any other consumer.
- // This way, logic like storing any pending offsets or committing
- // offsets can be handled.
- // However, there can be cases where the assignment is lost
- // involuntarily. In this case, the partition might already be owned
- // by another consumer, and operations including committing
- // offsets may not work.
- if c.AssignmentLost() {
- // Our consumer has been kicked out of the group and the
- // entire assignment is thus lost.
- log.Debug().
- Str("service", "consumer").
- Msg("Assignment lost involuntarily, commit may fail")
- }
- // Since enable.auto.commit is disabled, we need to commit offsets manually
- // before the partition is revoked.
- committedOffsets, err := c.Commit()
- var kafkaErr kafka.Error
- if err != nil && (!errors.As(err, &kafkaErr) || kafkaErr.Code() != kafka.ErrNoOffset) {
- log.Debug().
- Str("service", "consumer").
- Err(err).
- Msg("failed to commit offsets")
- return err
- }
- log.Debug().
- Str("service", "consumer").
- Msgf("committed offsets to Kafka: %v", committedOffsets)
- // As with the assignment above, the client automatically calls
- // IncrementalUnassign() for the revoked partitions unless the callback has
- // already done so; here we rely on that default.
- default:
- log.Debug().
- Str("service", "consumer").
- Msgf("unxpected event type: %v", event)
- }
- return nil
- }
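- // The AssignedPartitions branch above may also seed the start offset of each
- // newly assigned partition from an external store before assigning, as the
- // doc comment on rebalanceCallback describes. A hedged sketch, where
- // lookupStoredOffset is a hypothetical helper backed by that store:
- //
- //   for i := range ev.Partitions {
- //       if off, ok := lookupStoredOffset(ev.Partitions[i]); ok {
- //           ev.Partitions[i].Offset = off
- //       }
- //   }
- //   if err := c.IncrementalAssign(ev.Partitions); err != nil {
- //       return err
- //   }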