123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- package consumer
- import (
- "errors"
- "fmt"
- "git.beejay.kim/tool/service"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/google/uuid"
- "github.com/kelindar/bitmap"
- "github.com/rs/zerolog/log"
- "github.com/samber/lo"
- "strings"
- "sync"
- "time"
- )
- const (
- FlagPause uint32 = iota
- FlagStop
- )
- //goland:noinspection ALL
- type _consumer struct {
- config *Config
- handlers []service.ConsumerHandler
- session *kafka.Consumer
- subscriptions []chan uint32
- }
- func New(cfg *Config) (service.Consumer, error) {
- var (
- c = &_consumer{
- config: cfg,
- }
- err error
- )
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- opts := &kafka.ConfigMap{
- "broker.address.family": "v4",
- "bootstrap.servers": strings.Join(c.config.Hosts, ","),
- "group.id": c.config.Group,
- "partition.assignment.strategy": "cooperative-sticky",
- "auto.offset.reset": "earliest",
- "log_level": 0,
- }
- if c.session, err = kafka.NewConsumer(opts); err != nil {
- return nil, err
- }
- return c, nil
- }
- func (c *_consumer) ID() uuid.UUID {
- return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
- }
- func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
- c.handlers = append(c.handlers, handlers...)
- }
- func (c *_consumer) Subscribe(topics []string, io chan uint32) (chan *kafka.Message, error) {
- if topics == nil || len(topics) == 0 {
- return nil, errors.New("illegal arguments: at least one topic must be provided to subscribe")
- }
- var (
- messages = make(chan *kafka.Message)
- err error
- )
- if err = c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
- return nil, err
- }
- go func() {
- defer func() {
- c.subscriptions = lo.Filter(c.subscriptions, func(ch chan uint32, _ int) bool {
- return io != ch
- })
- }()
- var (
- state bitmap.Bitmap
- message *kafka.Message
- )
- for {
- select {
- case s := <-io:
- switch s {
- case FlagStop:
- log.Debug().
- Str("service", "consumer").
- Msg("consumer gracefully closed")
- return
- case FlagPause:
- if state.Contains(FlagPause) {
- state.Remove(FlagPause)
- } else {
- state.Set(FlagPause)
- }
- }
- default:
- }
- if state.Contains(FlagPause) {
- <-time.After(100 * time.Millisecond)
- continue
- }
- message, err = c.poll()
- if err != nil {
- // silently wait for a next message
- if errors.Is(err, ErrNoMessage) {
- continue
- }
- log.Debug().
- Str("service", "consumer").
- Err(err).
- Send()
- messages <- nil
- return
- }
- if message != nil {
- messages <- message
- }
- }
- }()
- c.subscriptions = append(c.subscriptions, io)
- return messages, nil
- }
- func (c *_consumer) Close() error {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- defer wg.Done()
- for i := range c.subscriptions {
- if c.subscriptions[i] != nil {
- c.subscriptions[i] <- FlagStop
- }
- }
- }()
- go func() {
- defer wg.Done()
- if c.session != nil {
- time.Sleep(time.Second)
- _ = c.session.Close() //nolint:errcheck
- }
- }()
- wg.Wait()
- return nil
- }
- func (c *_consumer) poll() (*kafka.Message, error) {
- var (
- ev = c.session.Poll(c.config.Timeout)
- err error
- )
- switch e := ev.(type) {
- case *kafka.Message:
- for i := range c.handlers {
- if err = c.handlers[i](e); err != nil {
- return nil, err
- }
- }
- return e, nil
- case kafka.Error:
- return nil, e
- case kafka.OffsetsCommitted:
- if e.Error != nil {
- return nil, err
- }
- l := log.Debug().Str("service", "consumer")
- for _, offset := range e.Offsets {
- if offset.Topic == nil {
- continue
- }
- l = l.Str(*offset.Topic, fmt.Sprintf("partition: %d; offset: %s",
- offset.Partition,
- offset.Offset.String()))
- }
- l.Send()
- return nil, ErrNoMessage
- default:
- if e != nil {
- log.Debug().
- Str("service", "consumer").
- Any("event", e).
- Send()
- }
- return nil, ErrNoMessage
- }
- }
- // rebalanceCallback is called on each group rebalance to assign additional
- // partitions, or remove existing partitions, from the consumer's current
- // assignment.
- //
- // The application may use this optional callback to inspect the assignment,
- // alter the initial start offset (the .Offset field of each assigned partition),
- // and read/write offsets to commit to an alternative store outside of Kafka.
- func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
- switch ev := event.(type) {
- case kafka.AssignedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d new partition(s) assigned: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // The application may update the start .Offset of each
- // assigned partition and then call IncrementalAssign().
- if err := c.IncrementalAssign(ev.Partitions); err != nil {
- panic(err)
- }
- case kafka.RevokedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d partition(s) revoked: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // Usually, the rebalance callback for `RevokedPartitions` is called
- // just before the partitions are revoked. We can be certain that a
- // partition being revoked is not yet owned by any other consumer.
- // This way, logic like storing any pending offsets or committing
- // offsets can be handled.
- // However, there can be cases where the assignment is lost
- // involuntarily. In this case, the partition might already be owned
- // by another consumer, and operations including committing
- // offsets may not work.
- if c.AssignmentLost() {
- // Our consumer has been kicked out of the group and the
- // entire assignment is thus lost.
- log.Debug().
- Str("service", "consumer").
- Msg("Assignment lost involuntarily, commit may fail")
- }
- // The client automatically calls IncrementalUnassign() unless
- // the callback has already called that method.
- default:
- log.Debug().
- Str("service", "consumer").
- Msgf("unxpected event type: %v", event)
- }
- return nil
- }
|