consumer.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package consumer
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.beejay.kim/tool/service"
  6. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  7. "github.com/google/uuid"
  8. "github.com/kelindar/bitmap"
  9. "github.com/rs/zerolog/log"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  // Flags kept in _consumer.state. Each constant is the index of one flag in
  // that bitmap; new flags can be appended to this group.
  const (
  	// flagSubscribed is set by Subscribe once the topic subscription has
  	// succeeded and is removed again by Close or when polling fails.
  	flagSubscribed uint32 = iota
  )
  17. //goland:noinspection ALL
  18. type _consumer struct {
  19. config *Config
  20. state bitmap.Bitmap
  21. handlers []service.ConsumerHandler
  22. session *kafka.Consumer
  23. }
  24. func New(cfg *Config) (service.Consumer, error) {
  25. var (
  26. c = &_consumer{
  27. config: cfg,
  28. }
  29. err error
  30. )
  31. if cfg == nil {
  32. return nil, fmt.Errorf("config must be provided")
  33. }
  34. opts := &kafka.ConfigMap{
  35. "broker.address.family": "v4",
  36. "bootstrap.servers": strings.Join(c.config.Hosts, ","),
  37. "group.id": c.config.Group,
  38. "partition.assignment.strategy": "cooperative-sticky",
  39. "auto.offset.reset": "earliest",
  40. "log_level": 0,
  41. }
  42. if c.session, err = kafka.NewConsumer(opts); err != nil {
  43. return nil, err
  44. }
  45. return c, nil
  46. }
  47. func (c *_consumer) ID() uuid.UUID {
  48. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
  49. }
  50. func (c *_consumer) Subscribed() bool {
  51. return c.state.Contains(flagSubscribed)
  52. }
  53. func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
  54. c.handlers = append(c.handlers, handlers...)
  55. }
  56. func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
  57. if c.Subscribed() {
  58. return fmt.Errorf("illegal state: already subscribed")
  59. }
  60. if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
  61. return err
  62. }
  63. c.state.Set(flagSubscribed)
  64. for c.Subscribed() {
  65. message, err := c.poll()
  66. if err != nil {
  67. // silently wait for a next message
  68. if errors.Is(err, ErrNoMessage) {
  69. continue
  70. }
  71. log.Debug().
  72. Str("service", "consumer").
  73. Err(err).
  74. Send()
  75. c.state.Remove(flagSubscribed)
  76. return err
  77. }
  78. if message != nil {
  79. ch <- message
  80. }
  81. }
  82. log.Debug().Msg("consumer closed gracefully")
  83. return nil
  84. }
  85. func (c *_consumer) Close() error {
  86. if c.Subscribed() {
  87. c.state.Remove(flagSubscribed)
  88. }
  89. var wg sync.WaitGroup
  90. wg.Add(1)
  91. go func() {
  92. defer wg.Done()
  93. if c.session != nil {
  94. time.Sleep(time.Second)
  95. _ = c.session.Close() //nolint:errcheck
  96. }
  97. }()
  98. wg.Wait()
  99. return nil
  100. }
  101. func (c *_consumer) poll() (*kafka.Message, error) {
  102. var (
  103. ev = c.session.Poll(c.config.Timeout)
  104. err error
  105. )
  106. switch e := ev.(type) {
  107. case *kafka.Message:
  108. for i := range c.handlers {
  109. if err = c.handlers[i](e); err != nil {
  110. return nil, err
  111. }
  112. }
  113. return e, nil
  114. case kafka.Error:
  115. log.Debug().
  116. Str("service", "consumer").
  117. Err(e).
  118. Send()
  119. if e.Code() == kafka.ErrAllBrokersDown {
  120. c.state.Remove(flagSubscribed)
  121. }
  122. return nil, e
  123. default:
  124. if e != nil {
  125. log.Debug().
  126. Str("service", "consumer").
  127. Any("event", e).
  128. Send()
  129. }
  130. return nil, ErrNoMessage
  131. }
  132. }
  133. // rebalanceCallback is called on each group rebalance to assign additional
  134. // partitions, or remove existing partitions, from the consumer's current
  135. // assignment.
  136. //
  137. // The application may use this optional callback to inspect the assignment,
  138. // alter the initial start offset (the .Offset field of each assigned partition),
  139. // and read/write offsets to commit to an alternative store outside of Kafka.
  140. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
  141. switch ev := event.(type) {
  142. case kafka.AssignedPartitions:
  143. log.Debug().
  144. Str("service", "consumer").
  145. Msgf("%s rebalance: %d new partition(s) assigned: %v",
  146. c.GetRebalanceProtocol(),
  147. len(ev.Partitions),
  148. ev.Partitions)
  149. // The application may update the start .Offset of each
  150. // assigned partition and then call IncrementalAssign().
  151. if err := c.IncrementalAssign(ev.Partitions); err != nil {
  152. panic(err)
  153. }
  154. case kafka.RevokedPartitions:
  155. log.Debug().
  156. Str("service", "consumer").
  157. Msgf("%s rebalance: %d partition(s) revoked: %v",
  158. c.GetRebalanceProtocol(),
  159. len(ev.Partitions),
  160. ev.Partitions)
  161. // Usually, the rebalance callback for `RevokedPartitions` is called
  162. // just before the partitions are revoked. We can be certain that a
  163. // partition being revoked is not yet owned by any other consumer.
  164. // This way, logic like storing any pending offsets or committing
  165. // offsets can be handled.
  166. // However, there can be cases where the assignment is lost
  167. // involuntarily. In this case, the partition might already be owned
  168. // by another consumer, and operations including committing
  169. // offsets may not work.
  170. if c.AssignmentLost() {
  171. // Our consumer has been kicked out of the group and the
  172. // entire assignment is thus lost.
  173. log.Debug().
  174. Str("service", "consumer").
  175. Msg("Assignment lost involuntarily, commit may fail")
  176. }
  177. // The client automatically calls IncrementalUnassign() unless
  178. // the callback has already called that method.
  179. default:
  180. log.Debug().
  181. Str("service", "consumer").
  182. Msgf("unxpected event type: %v", event)
  183. }
  184. return nil
  185. }