package consumer

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.beejay.kim/tool/service"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/google/uuid"
	"github.com/kelindar/bitmap"
	"github.com/rs/zerolog/log"
	"github.com/samber/lo"
)

// Control flags that callers send on a subscription channel to pause/resume
// or stop the consume loop.
const (
	FlagPause uint32 = iota
	FlagStop
)

//goland:noinspection ALL
type _consumer struct {
	config        *Config
	handlers      []service.ConsumerHandler
	session       *kafka.Consumer
	subscriptions []chan uint32
}

func New(cfg *Config) (service.Consumer, error) {
	if cfg == nil {
		return nil, errors.New("config must be provided")
	}

	var (
		c = &_consumer{
			config: cfg,
		}
		err error
	)

	opts := &kafka.ConfigMap{
		"broker.address.family":         "v4",
		"bootstrap.servers":             strings.Join(c.config.Hosts, ","),
		"group.id":                      c.config.Group,
		"partition.assignment.strategy": "cooperative-sticky",
		"auto.offset.reset":             "earliest",
		"log_level":                     0,
	}

	if c.session, err = kafka.NewConsumer(opts); err != nil {
		return nil, err
	}

	return c, nil
}

func (c *_consumer) ID() uuid.UUID {
	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
}

func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
	c.handlers = append(c.handlers, handlers...)
}

func (c *_consumer) Subscribe(topics []string, io chan uint32) (chan *kafka.Message, error) {
	if len(topics) == 0 {
		return nil, errors.New("illegal arguments: at least one topic must be provided to subscribe")
	}

	var (
		messages = make(chan *kafka.Message)
		err      error
	)

	if err = c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
		return nil, err
	}

	go func() {
		defer func() {
			c.subscriptions = lo.Filter(c.subscriptions, func(ch chan uint32, _ int) bool {
				return io != ch
			})
			// Close messages so readers ranging over the channel unblock once
			// the loop stops.
			close(messages)
		}()

		var (
			state   bitmap.Bitmap
			message *kafka.Message
		)

		for {
			select {
			case s := <-io:
				switch s {
				case FlagStop:
					log.Debug().
						Str("service", "consumer").
						Msg("consumer gracefully closed")
					return
				case FlagPause:
					// Toggle the paused state.
					if state.Contains(FlagPause) {
						state.Remove(FlagPause)
					} else {
						state.Set(FlagPause)
					}
				}
			default:
			}

			if state.Contains(FlagPause) {
				<-time.After(100 * time.Millisecond)
				continue
			}

			message, err = c.poll()
			if err != nil {
				// Silently wait for the next message.
				if errors.Is(err, ErrNoMessage) {
					continue
				}

				log.Debug().
					Str("service", "consumer").
					Err(err).
					Send()

				messages <- nil
				return
			}

			if message != nil {
				messages <- message
			}
		}
	}()

	c.subscriptions = append(c.subscriptions, io)

	return messages, nil
}

func (c *_consumer) Close() error {
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()

		// Ask every active subscription loop to stop.
		for i := range c.subscriptions {
			if c.subscriptions[i] != nil {
				c.subscriptions[i] <- FlagStop
			}
		}
	}()

	go func() {
		defer wg.Done()

		if c.session != nil {
			// Give subscription loops a moment to observe FlagStop before the
			// session is closed underneath them.
			time.Sleep(time.Second)
			_ = c.session.Close() //nolint:errcheck
		}
	}()

	wg.Wait()

	return nil
}

func (c *_consumer) poll() (*kafka.Message, error) {
	var (
		ev  = c.session.Poll(c.config.Timeout)
		err error
	)

	switch e := ev.(type) {
	case *kafka.Message:
		for i := range c.handlers {
			if err = c.handlers[i](e); err != nil {
				return nil, err
			}
		}

		return e, nil

	case kafka.Error:
		return nil, e

	case kafka.OffsetsCommitted:
		if e.Error != nil {
			return nil, e.Error
		}

		l := log.Debug().Str("service", "consumer")
		for _, offset := range e.Offsets {
			if offset.Topic == nil {
				continue
			}
			l = l.Str(*offset.Topic, fmt.Sprintf("partition: %d; offset: %s",
				offset.Partition, offset.Offset.String()))
		}
		l.Send()

		return nil, ErrNoMessage

	default:
		if e != nil {
			log.Debug().
				Str("service", "consumer").
				Any("event", e).
				Send()
		}

		return nil, ErrNoMessage
	}
}

// rebalanceCallback is called on each group rebalance to assign additional
// partitions, or remove existing partitions, from the consumer's current
// assignment.
//
// The application may use this optional callback to inspect the assignment,
// alter the initial start offset (the .Offset field of each assigned partition),
// and read/write offsets to commit to an alternative store outside of Kafka.
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
	switch ev := event.(type) {
	case kafka.AssignedPartitions:
		log.Debug().
			Str("service", "consumer").
			Msgf("%s rebalance: %d new partition(s) assigned: %v",
				c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)

		// The application may update the start .Offset of each
		// assigned partition and then call IncrementalAssign().
		if err := c.IncrementalAssign(ev.Partitions); err != nil {
			panic(err)
		}

	case kafka.RevokedPartitions:
		log.Debug().
			Str("service", "consumer").
			Msgf("%s rebalance: %d partition(s) revoked: %v",
				c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)

		// Usually, the rebalance callback for `RevokedPartitions` is called
		// just before the partitions are revoked. We can be certain that a
		// partition being revoked is not yet owned by any other consumer.
		// This way, logic like storing any pending offsets or committing
		// offsets can be handled.
		// However, there can be cases where the assignment is lost
		// involuntarily. In this case, the partition might already be owned
		// by another consumer, and operations including committing
		// offsets may not work.
		if c.AssignmentLost() {
			// Our consumer has been kicked out of the group and the
			// entire assignment is thus lost.
			log.Debug().
				Str("service", "consumer").
				Msg("Assignment lost involuntarily, commit may fail")
		}

		// The client automatically calls IncrementalUnassign() unless
		// the callback has already called that method.

	default:
		log.Debug().
			Str("service", "consumer").
			Msgf("unexpected event type: %v", event)
	}

	return nil
}
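
// ExampleUsage is an illustrative sketch of how a caller might wire the
// consumer together; it is not part of the production API. The broker
// address, group id and topic are invented values, and it assumes that the
// service.Consumer interface exposes the Subscribe and Close methods
// implemented above and that Config carries the Hosts, Group and Timeout
// fields referenced in New and poll.
func ExampleUsage() {
	cfg := &Config{
		Hosts:   []string{"localhost:9092"}, // assumed broker address
		Group:   "example-group",            // assumed consumer group id
		Timeout: 100,                        // poll timeout in milliseconds, passed to kafka's Poll
	}

	c, err := New(cfg)
	if err != nil {
		log.Fatal().Err(err).Send()
	}

	// The caller owns the control channel; a small buffer lets Close deliver
	// FlagStop without blocking.
	io := make(chan uint32, 1)

	messages, err := c.Subscribe([]string{"example-topic"}, io)
	if err != nil {
		log.Fatal().Err(err).Send()
	}

	// Drain the message channel until the subscription loop stops. A nil
	// message signals a non-recoverable poll error.
	go func() {
		for msg := range messages {
			if msg == nil {
				return
			}
			_ = msg // handle the message here
		}
	}()

	// Shutting down: Close signals FlagStop to every subscription and then
	// closes the underlying Kafka session.
	_ = c.Close()
}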