kafka_consumer.go

package consumer

import (
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"

    "git.beejay.kim/tool/service"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "github.com/google/uuid"
    "github.com/kelindar/bitmap"
    "github.com/rs/zerolog/log"
)
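
// flagSubscribed is the bit in the consumer's state bitmap that marks an
// active subscription.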
const (
    flagSubscribed uint32 = iota
)
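
// _consumer is the Kafka-backed implementation of service.Consumer. It holds
// the parsed configuration, a bitmap of state flags, the registered message
// handlers and the underlying confluent-kafka-go consumer session.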
//goland:noinspection ALL
type _consumer struct {
    config   *Config
    state    bitmap.Bitmap
    handlers []service.ConsumerHandler
    session  *kafka.Consumer
}
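
// NewKafkaConsumer builds a service.Consumer backed by confluent-kafka-go from
// the given Config: it connects to the configured brokers and consumer group,
// uses the cooperative-sticky assignment strategy and disables automatic
// offset commits.
//
// A minimal usage sketch (illustrative only: it assumes the service.Consumer
// interface exposes the methods implemented below, and "my-topic" and
// myHandler are placeholders):
//
//      c, err := NewKafkaConsumer(cfg)
//      if err != nil {
//          return err
//      }
//      c.RegisterHandlers(myHandler) // myHandler is a service.ConsumerHandler
//      ch := make(chan *kafka.Message)
//      go func() {
//          _ = c.Subscribe([]string{"my-topic"}, ch) // blocks until Close or an error
//      }()
//      for msg := range ch {
//          _ = msg // process the message
//      }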
func NewKafkaConsumer(cfg *Config) (service.Consumer, error) {
    if cfg == nil {
        return nil, fmt.Errorf("config must be provided")
    }

    var (
        c = &_consumer{
            config: cfg,
        }
        err error
    )

    opts := &kafka.ConfigMap{
        "broker.address.family":         "v4",
        "bootstrap.servers":             strings.Join(c.config.Hosts, ","),
        "group.id":                      c.config.Group,
        "partition.assignment.strategy": "cooperative-sticky",
        "auto.offset.reset":             "earliest",
        "log_level":                     0,
        "enable.auto.commit":            false,
    }

    if c.session, err = kafka.NewConsumer(opts); err != nil {
        return nil, err
    }

    return c, nil
}
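
// ID returns a deterministic, name-based UUID derived from the string
// "consumer.kafka", so every instance of this consumer reports the same ID.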
func (c *_consumer) ID() uuid.UUID {
    return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
}
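
// Subscribed reports whether the consumer currently holds an active
// subscription.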
func (c *_consumer) Subscribed() bool {
    return c.state.Contains(flagSubscribed)
}
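
// RegisterHandlers appends handlers that are invoked, in registration order,
// for every message received while polling.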
func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
    c.handlers = append(c.handlers, handlers...)
}
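
// Subscribe subscribes to the given topics and then blocks, polling the broker
// in a loop and forwarding each received message to ch. It returns an error if
// the consumer is already subscribed or if a handler or commit fails, and it
// returns nil once Close clears the subscription flag.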
func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
    if c.Subscribed() {
        return fmt.Errorf("illegal state: already subscribed")
    }

    if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
        return err
    }

    c.state.Set(flagSubscribed)

    for c.Subscribed() {
        message, err := c.poll()
        if err != nil {
            // Silently wait for the next message.
            if errors.Is(err, ErrNoMessage) {
                continue
            }

            log.Debug().
                Str("service", "consumer").
                Err(err).
                Send()

            c.state.Remove(flagSubscribed)
            return err
        }

        if message != nil {
            ch <- message
        }
    }

    log.Debug().Msg("consumer closed gracefully")
    return nil
}
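
// Close clears the subscription flag, which stops the Subscribe loop, then
// waits roughly a second before closing the underlying Kafka session so that
// an in-flight poll has a chance to return.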
func (c *_consumer) Close() error {
    if c.Subscribed() {
        c.state.Remove(flagSubscribed)
    }

    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        defer wg.Done()

        if c.session != nil {
            time.Sleep(time.Second)
            _ = c.session.Close() //nolint:errcheck
        }
    }()

    wg.Wait()
    return nil
}
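
// poll waits up to Config.Timeout for a single Kafka event. A message is run
// through every registered handler and then offered to maybeCommit; a Kafka
// error is returned as-is; any other (or no) event yields ErrNoMessage.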
func (c *_consumer) poll() (*kafka.Message, error) {
    var (
        ev  = c.session.Poll(c.config.Timeout)
        err error
    )

    switch e := ev.(type) {
    case *kafka.Message:
        for i := range c.handlers {
            if err = c.handlers[i](e); err != nil {
                return nil, err
            }
        }

        // Commit manually since enable.auto.commit is disabled.
        if err = maybeCommit(c.session, e.TopicPartition); err != nil {
            return nil, err
        }

        return e, nil
    case kafka.Error:
        log.Debug().
            Str("service", "consumer").
            Err(e).
            Send()

        return nil, e
    default:
        if e != nil {
            log.Debug().
                Str("service", "consumer").
                Any("event", e).
                Send()
        }

        return nil, ErrNoMessage
    }
}

// maybeCommit is called for each message we receive from a Kafka topic.
// This method can be used to apply some arbitrary logic/processing to the
// offsets, write the offsets into some external storage, and finally, to
// decide when we want to commit already-stored offsets into Kafka.
func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
    // Commit the already-stored offsets to Kafka whenever the offset is
    // divisible by 10; otherwise return early.
    // This logic is completely arbitrary. We can use any other internal or
    // external variables to decide when to commit the already-stored offsets.
    if topicPartition.Offset%10 != 0 {
        return nil
    }

    committedOffsets, err := c.Commit()

    // ErrNoOffset occurs when there are no stored offsets to commit. This
    // can happen if we haven't stored anything since the last commit.
    // That should not happen here, since we commit per message and thus always
    // have something to commit, but the error handling is illustrative of how
    // to handle it in cases where Commit() is called in another way, for
    // example every N seconds.
    if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
        return err
    }

    log.Debug().
        Str("service", "consumer").
        Msgf("committed offsets to Kafka: %v", committedOffsets)

    return nil
}

// rebalanceCallback is called on each group rebalance to assign additional
// partitions, or remove existing partitions, from the consumer's current
// assignment.
//
// A rebalance occurs when a consumer joins or leaves a consumer group, if it
// changes the topic(s) it's subscribed to, or if there's a change in one of
// the topics it's subscribed to, for example, the total number of partitions
// increases.
//
// The application may use this optional callback to inspect the assignment,
// alter the initial start offset (the .Offset field of each assigned partition),
// and read/write offsets to commit to an alternative store outside of Kafka.
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
    switch ev := event.(type) {
    case kafka.AssignedPartitions:
        log.Debug().
            Str("service", "consumer").
            Msgf("%s rebalance: %d new partition(s) assigned: %v",
                c.GetRebalanceProtocol(),
                len(ev.Partitions),
                ev.Partitions)
        // With the cooperative-sticky assignment strategy configured above,
        // changes to the assignment must be applied incrementally.
        if err := c.IncrementalAssign(ev.Partitions); err != nil {
            return err
        }
    case kafka.RevokedPartitions:
        log.Debug().
            Str("service", "consumer").
            Msgf("%s rebalance: %d partition(s) revoked: %v",
                c.GetRebalanceProtocol(),
                len(ev.Partitions),
                ev.Partitions)

        // Usually, the rebalance callback for `RevokedPartitions` is called
        // just before the partitions are revoked. We can be certain that a
        // partition being revoked is not yet owned by any other consumer.
        // This way, logic like storing any pending offsets or committing
        // offsets can be handled.
        // However, there can be cases where the assignment is lost
        // involuntarily. In this case, the partition might already be owned
        // by another consumer, and operations including committing
        // offsets may not work.
        if c.AssignmentLost() {
            // Our consumer has been kicked out of the group and the
            // entire assignment is thus lost.
            log.Debug().
                Str("service", "consumer").
                Msg("Assignment lost involuntarily, commit may fail")
        }

        // Since enable.auto.commit is disabled, we need to commit offsets
        // manually before the partition is revoked.
        committedOffsets, err := c.Commit()
        if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
            log.Debug().
                Str("service", "consumer").
                Err(err).
                Msg("failed to commit offsets")
            return err
        }

        log.Debug().
            Str("service", "consumer").
            Msgf("committed offsets to Kafka: %v", committedOffsets)

        // Similar to assignment, the client automatically calls
        // (Incremental)Unassign() unless the callback has already called that
        // method. Here, we don't call it.
    default:
        log.Debug().
            Str("service", "consumer").
            Msgf("unexpected event type: %v", event)
    }

    return nil
}