consumer.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package consumer
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.beejay.kim/tool/service"
  6. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  7. "github.com/google/uuid"
  8. "github.com/kelindar/bitmap"
  9. "github.com/rs/zerolog/log"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. const (
  15. flagSubscribed uint32 = iota
  16. flagPaused
  17. )
  18. //goland:noinspection ALL
  19. type _consumer struct {
  20. config *Config
  21. state bitmap.Bitmap
  22. handlers []service.ConsumerHandler
  23. session *kafka.Consumer
  24. }
  25. func New(cfg *Config) (service.Consumer, error) {
  26. var (
  27. c = &_consumer{
  28. config: cfg,
  29. }
  30. err error
  31. )
  32. if cfg == nil {
  33. return nil, fmt.Errorf("config must be provided")
  34. }
  35. opts := &kafka.ConfigMap{
  36. "broker.address.family": "v4",
  37. "bootstrap.servers": strings.Join(c.config.Hosts, ","),
  38. "group.id": c.config.Group,
  39. "partition.assignment.strategy": "cooperative-sticky",
  40. "auto.offset.reset": "earliest",
  41. "log_level": 0,
  42. }
  43. if c.session, err = kafka.NewConsumer(opts); err != nil {
  44. return nil, err
  45. }
  46. return c, nil
  47. }
  48. func (c *_consumer) ID() uuid.UUID {
  49. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
  50. }
  51. func (c *_consumer) Subscribed() bool {
  52. return c.state.Contains(flagSubscribed)
  53. }
  54. func (c *_consumer) Paused() bool {
  55. return c.state.Contains(flagPaused)
  56. }
  57. func (c *_consumer) SetPause(is bool) error {
  58. if is {
  59. c.state.Set(flagPaused)
  60. } else {
  61. c.state.Remove(flagPaused)
  62. }
  63. return nil
  64. }
  65. func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
  66. c.handlers = append(c.handlers, handlers...)
  67. }
  68. func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error {
  69. if c.Subscribed() {
  70. return fmt.Errorf("illegal state: already subscribed")
  71. }
  72. if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
  73. return err
  74. }
  75. c.state.Set(flagSubscribed)
  76. for c.Subscribed() {
  77. for c.Paused() {
  78. time.Sleep(100 * time.Millisecond)
  79. }
  80. message, err := c.poll()
  81. if err != nil {
  82. // silently wait for a next message
  83. if errors.Is(err, ErrNoMessage) {
  84. continue
  85. }
  86. log.Debug().
  87. Str("service", "consumer").
  88. Err(err).
  89. Send()
  90. c.state.Remove(flagSubscribed)
  91. return err
  92. }
  93. if message != nil {
  94. if opts.Counter != nil {
  95. opts.Counter.Inc()
  96. }
  97. ch <- message
  98. }
  99. }
  100. log.Debug().Msg("consumer closed gracefully")
  101. return nil
  102. }
  103. func (c *_consumer) Close() error {
  104. if c.Subscribed() {
  105. c.state.Remove(flagSubscribed)
  106. }
  107. var wg sync.WaitGroup
  108. wg.Add(1)
  109. go func() {
  110. defer wg.Done()
  111. if c.session != nil {
  112. time.Sleep(time.Second)
  113. _ = c.session.Close() //nolint:errcheck
  114. }
  115. }()
  116. wg.Wait()
  117. return nil
  118. }
  119. func (c *_consumer) poll() (*kafka.Message, error) {
  120. var (
  121. ev = c.session.Poll(c.config.Timeout)
  122. err error
  123. )
  124. switch e := ev.(type) {
  125. case *kafka.Message:
  126. for i := range c.handlers {
  127. if err = c.handlers[i](e); err != nil {
  128. return nil, err
  129. }
  130. }
  131. return e, nil
  132. case kafka.Error:
  133. log.Debug().
  134. Str("service", "consumer").
  135. Err(e).
  136. Send()
  137. if e.Code() == kafka.ErrAllBrokersDown {
  138. c.state.Remove(flagSubscribed)
  139. }
  140. return nil, e
  141. default:
  142. if e != nil {
  143. log.Debug().
  144. Str("service", "consumer").
  145. Any("event", e).
  146. Send()
  147. }
  148. return nil, ErrNoMessage
  149. }
  150. }
  151. // rebalanceCallback is called on each group rebalance to assign additional
  152. // partitions, or remove existing partitions, from the consumer's current
  153. // assignment.
  154. //
  155. // The application may use this optional callback to inspect the assignment,
  156. // alter the initial start offset (the .Offset field of each assigned partition),
  157. // and read/write offsets to commit to an alternative store outside of Kafka.
  158. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
  159. switch ev := event.(type) {
  160. case kafka.AssignedPartitions:
  161. log.Debug().
  162. Str("service", "consumer").
  163. Msgf("%s rebalance: %d new partition(s) assigned: %v",
  164. c.GetRebalanceProtocol(),
  165. len(ev.Partitions),
  166. ev.Partitions)
  167. // The application may update the start .Offset of each
  168. // assigned partition and then call IncrementalAssign().
  169. if err := c.IncrementalAssign(ev.Partitions); err != nil {
  170. panic(err)
  171. }
  172. case kafka.RevokedPartitions:
  173. log.Debug().
  174. Str("service", "consumer").
  175. Msgf("%s rebalance: %d partition(s) revoked: %v",
  176. c.GetRebalanceProtocol(),
  177. len(ev.Partitions),
  178. ev.Partitions)
  179. // Usually, the rebalance callback for `RevokedPartitions` is called
  180. // just before the partitions are revoked. We can be certain that a
  181. // partition being revoked is not yet owned by any other consumer.
  182. // This way, logic like storing any pending offsets or committing
  183. // offsets can be handled.
  184. // However, there can be cases where the assignment is lost
  185. // involuntarily. In this case, the partition might already be owned
  186. // by another consumer, and operations including committing
  187. // offsets may not work.
  188. if c.AssignmentLost() {
  189. // Our consumer has been kicked out of the group and the
  190. // entire assignment is thus lost.
  191. log.Debug().
  192. Str("service", "consumer").
  193. Msg("Assignment lost involuntarily, commit may fail")
  194. }
  195. // The client automatically calls IncrementalUnassign() unless
  196. // the callback has already called that method.
  197. default:
  198. log.Debug().
  199. Str("service", "consumer").
  200. Msgf("unxpected event type: %v", event)
  201. }
  202. return nil
  203. }