123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- package consumer
- import (
- "errors"
- "fmt"
- "git.beejay.kim/tool/service"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/google/uuid"
- "github.com/kelindar/bitmap"
- "github.com/rs/zerolog/log"
- "strings"
- "sync"
- "time"
- )
// Indices of the individual flags kept in the _consumer.state bitmap.
const (
	// flagSubscribed is set by Subscribe once the topic subscription succeeded
	// and is removed again when the poll loop has to stop.
	flagSubscribed uint32 = 0
	// flagPaused is toggled through SetPause; while it is set the poll loop
	// does not fetch messages.
	flagPaused uint32 = 1
)
- //goland:noinspection ALL
- type _consumer struct {
- config *Config
- state bitmap.Bitmap
- handlers []service.ConsumerHandler
- session *kafka.Consumer
- }
- func New(cfg *Config) (service.Consumer, error) {
- var (
- c = &_consumer{
- config: cfg,
- }
- err error
- )
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- opts := &kafka.ConfigMap{
- "broker.address.family": "v4",
- "bootstrap.servers": strings.Join(c.config.Hosts, ","),
- "group.id": c.config.Group,
- "partition.assignment.strategy": "cooperative-sticky",
- "auto.offset.reset": "earliest",
- "log_level": 0,
- }
- if c.session, err = kafka.NewConsumer(opts); err != nil {
- return nil, err
- }
- return c, nil
- }
- func (c *_consumer) ID() uuid.UUID {
- return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
- }
- func (c *_consumer) Subscribed() bool {
- return c.state.Contains(flagSubscribed)
- }
- func (c *_consumer) Paused() bool {
- return c.state.Contains(flagPaused)
- }
- func (c *_consumer) SetPause(is bool) error {
- if is {
- c.state.Set(flagPaused)
- } else {
- c.state.Remove(flagPaused)
- }
- return nil
- }
- func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
- c.handlers = append(c.handlers, handlers...)
- }
- func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error {
- if c.Subscribed() {
- return fmt.Errorf("illegal state: already subscribed")
- }
- if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
- return err
- }
- c.state.Set(flagSubscribed)
- for c.Subscribed() {
- for c.Paused() {
- time.Sleep(100 * time.Millisecond)
- }
- message, err := c.poll()
- if err != nil {
- // silently wait for a next message
- if errors.Is(err, ErrNoMessage) {
- continue
- }
- log.Debug().
- Str("service", "consumer").
- Err(err).
- Send()
- c.state.Remove(flagSubscribed)
- return err
- }
- if message != nil {
- if opts.Counter != nil {
- opts.Counter.Inc()
- }
- ch <- message
- }
- }
- log.Debug().Msg("consumer closed gracefully")
- return nil
- }
- func (c *_consumer) Close() error {
- if c.Subscribed() {
- c.state.Remove(flagSubscribed)
- }
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if c.session != nil {
- time.Sleep(time.Second)
- _ = c.session.Close() //nolint:errcheck
- }
- }()
- wg.Wait()
- return nil
- }
- func (c *_consumer) poll() (*kafka.Message, error) {
- var (
- ev = c.session.Poll(c.config.Timeout)
- err error
- )
- switch e := ev.(type) {
- case *kafka.Message:
- for i := range c.handlers {
- if err = c.handlers[i](e); err != nil {
- return nil, err
- }
- }
- return e, nil
- case kafka.Error:
- log.Debug().
- Str("service", "consumer").
- Err(e).
- Send()
- if e.Code() == kafka.ErrAllBrokersDown {
- c.state.Remove(flagSubscribed)
- }
- return nil, e
- default:
- if e != nil {
- log.Debug().
- Str("service", "consumer").
- Any("event", e).
- Send()
- }
- return nil, ErrNoMessage
- }
- }
- // rebalanceCallback is called on each group rebalance to assign additional
- // partitions, or remove existing partitions, from the consumer's current
- // assignment.
- //
- // The application may use this optional callback to inspect the assignment,
- // alter the initial start offset (the .Offset field of each assigned partition),
- // and read/write offsets to commit to an alternative store outside of Kafka.
- func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
- switch ev := event.(type) {
- case kafka.AssignedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d new partition(s) assigned: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // The application may update the start .Offset of each
- // assigned partition and then call IncrementalAssign().
- if err := c.IncrementalAssign(ev.Partitions); err != nil {
- panic(err)
- }
- case kafka.RevokedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d partition(s) revoked: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // Usually, the rebalance callback for `RevokedPartitions` is called
- // just before the partitions are revoked. We can be certain that a
- // partition being revoked is not yet owned by any other consumer.
- // This way, logic like storing any pending offsets or committing
- // offsets can be handled.
- // However, there can be cases where the assignment is lost
- // involuntarily. In this case, the partition might already be owned
- // by another consumer, and operations including committing
- // offsets may not work.
- if c.AssignmentLost() {
- // Our consumer has been kicked out of the group and the
- // entire assignment is thus lost.
- log.Debug().
- Str("service", "consumer").
- Msg("Assignment lost involuntarily, commit may fail")
- }
- // The client automatically calls IncrementalUnassign() unless
- // the callback has already called that method.
- default:
- log.Debug().
- Str("service", "consumer").
- Msgf("unxpected event type: %v", event)
- }
- return nil
- }
|