consumer.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package consumer
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.beejay.kim/tool/service"
  6. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  7. "github.com/google/uuid"
  8. "github.com/kelindar/bitmap"
  9. "github.com/rs/zerolog/log"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. const (
  15. flagSubscribed uint32 = iota
  16. )
  17. //goland:noinspection ALL
  18. type _consumer struct {
  19. config *Config
  20. state bitmap.Bitmap
  21. handlers []service.ConsumerHandler
  22. session *kafka.Consumer
  23. }
  24. func New(cfg *Config) (service.Consumer, error) {
  25. var (
  26. c = &_consumer{
  27. config: cfg,
  28. }
  29. err error
  30. )
  31. if cfg == nil {
  32. return nil, fmt.Errorf("config must be provided")
  33. }
  34. opts := &kafka.ConfigMap{
  35. "broker.address.family": "v4",
  36. "bootstrap.servers": strings.Join(c.config.Hosts, ","),
  37. "group.id": c.config.Group,
  38. "partition.assignment.strategy": "cooperative-sticky",
  39. "auto.offset.reset": "earliest",
  40. "log_level": 0,
  41. }
  42. if c.session, err = kafka.NewConsumer(opts); err != nil {
  43. return nil, err
  44. }
  45. return c, nil
  46. }
  47. func (c *_consumer) ID() uuid.UUID {
  48. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
  49. }
  50. func (c *_consumer) Subscribed() bool {
  51. return c.state.Contains(flagSubscribed)
  52. }
  53. func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
  54. c.handlers = append(c.handlers, handlers...)
  55. }
  56. func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error {
  57. if c.Subscribed() {
  58. return fmt.Errorf("illegal state: already subscribed")
  59. }
  60. if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
  61. return err
  62. }
  63. c.state.Set(flagSubscribed)
  64. for c.Subscribed() {
  65. message, err := c.poll()
  66. if err != nil {
  67. // silently wait for a next message
  68. if errors.Is(err, ErrNoMessage) {
  69. continue
  70. }
  71. log.Debug().
  72. Str("service", "consumer").
  73. Err(err).
  74. Send()
  75. c.state.Remove(flagSubscribed)
  76. return err
  77. }
  78. if message != nil {
  79. if opts.Counter != nil {
  80. opts.Counter.Inc()
  81. }
  82. ch <- message
  83. }
  84. }
  85. log.Debug().Msg("consumer closed gracefully")
  86. return nil
  87. }
  88. func (c *_consumer) Close() error {
  89. if c.Subscribed() {
  90. c.state.Remove(flagSubscribed)
  91. }
  92. var wg sync.WaitGroup
  93. wg.Add(1)
  94. go func() {
  95. defer wg.Done()
  96. if c.session != nil {
  97. time.Sleep(time.Second)
  98. _ = c.session.Close() //nolint:errcheck
  99. }
  100. }()
  101. wg.Wait()
  102. return nil
  103. }
  104. func (c *_consumer) poll() (*kafka.Message, error) {
  105. var (
  106. ev = c.session.Poll(c.config.Timeout)
  107. err error
  108. )
  109. switch e := ev.(type) {
  110. case *kafka.Message:
  111. for i := range c.handlers {
  112. if err = c.handlers[i](e); err != nil {
  113. return nil, err
  114. }
  115. }
  116. return e, nil
  117. case kafka.Error:
  118. log.Debug().
  119. Str("service", "consumer").
  120. Err(e).
  121. Send()
  122. if e.Code() == kafka.ErrAllBrokersDown {
  123. c.state.Remove(flagSubscribed)
  124. }
  125. return nil, e
  126. default:
  127. if e != nil {
  128. log.Debug().
  129. Str("service", "consumer").
  130. Any("event", e).
  131. Send()
  132. }
  133. return nil, ErrNoMessage
  134. }
  135. }
  136. // rebalanceCallback is called on each group rebalance to assign additional
  137. // partitions, or remove existing partitions, from the consumer's current
  138. // assignment.
  139. //
  140. // The application may use this optional callback to inspect the assignment,
  141. // alter the initial start offset (the .Offset field of each assigned partition),
  142. // and read/write offsets to commit to an alternative store outside of Kafka.
  143. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
  144. switch ev := event.(type) {
  145. case kafka.AssignedPartitions:
  146. log.Debug().
  147. Str("service", "consumer").
  148. Msgf("%s rebalance: %d new partition(s) assigned: %v",
  149. c.GetRebalanceProtocol(),
  150. len(ev.Partitions),
  151. ev.Partitions)
  152. // The application may update the start .Offset of each
  153. // assigned partition and then call IncrementalAssign().
  154. if err := c.IncrementalAssign(ev.Partitions); err != nil {
  155. panic(err)
  156. }
  157. case kafka.RevokedPartitions:
  158. log.Debug().
  159. Str("service", "consumer").
  160. Msgf("%s rebalance: %d partition(s) revoked: %v",
  161. c.GetRebalanceProtocol(),
  162. len(ev.Partitions),
  163. ev.Partitions)
  164. // Usually, the rebalance callback for `RevokedPartitions` is called
  165. // just before the partitions are revoked. We can be certain that a
  166. // partition being revoked is not yet owned by any other consumer.
  167. // This way, logic like storing any pending offsets or committing
  168. // offsets can be handled.
  169. // However, there can be cases where the assignment is lost
  170. // involuntarily. In this case, the partition might already be owned
  171. // by another consumer, and operations including committing
  172. // offsets may not work.
  173. if c.AssignmentLost() {
  174. // Our consumer has been kicked out of the group and the
  175. // entire assignment is thus lost.
  176. log.Debug().
  177. Str("service", "consumer").
  178. Msg("Assignment lost involuntarily, commit may fail")
  179. }
  180. // The client automatically calls IncrementalUnassign() unless
  181. // the callback has already called that method.
  182. default:
  183. log.Debug().
  184. Str("service", "consumer").
  185. Msgf("unxpected event type: %v", event)
  186. }
  187. return nil
  188. }