consumer.go

package consumer

import (
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"

    "git.beejay.kim/tool/service"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "github.com/google/uuid"
    "github.com/kelindar/bitmap"
    "github.com/rs/zerolog/log"
    "github.com/samber/lo"
)

const (
    FlagPause uint32 = iota
    FlagStop
)

//goland:noinspection ALL
type _consumer struct {
    config        *Config
    handlers      []service.ConsumerHandler
    session       *kafka.Consumer
    subscriptions []chan uint32
}

func New(cfg *Config) (service.Consumer, error) {
    if cfg == nil {
        return nil, fmt.Errorf("config must be provided")
    }

    var (
        c = &_consumer{
            config: cfg,
        }
        err error
    )

    opts := &kafka.ConfigMap{
        "broker.address.family":         "v4",
        "bootstrap.servers":             strings.Join(c.config.Hosts, ","),
        "group.id":                      c.config.Group,
        "partition.assignment.strategy": "cooperative-sticky",
        "auto.offset.reset":             "earliest",
        "log_level":                     0,
    }

    if c.session, err = kafka.NewConsumer(opts); err != nil {
        return nil, err
    }

    return c, nil
}
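
// Construction sketch (illustrative, not part of this file): the Config fields
// below are assumptions inferred from how New and poll use c.config (Hosts,
// Group, Timeout); the broker address and group name are made up.
//
//    cfg := &consumer.Config{
//        Hosts:   []string{"localhost:9092"},
//        Group:   "example-group",
//        Timeout: 100, // poll timeout in milliseconds, passed to kafka.Consumer.Poll
//    }
//    c, err := consumer.New(cfg)
//    if err != nil {
//        // handle configuration / connection error
//    }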

func (c *_consumer) ID() uuid.UUID {
    return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
}

func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
    c.handlers = append(c.handlers, handlers...)
}
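
// Handler sketch (illustrative): judging by how poll() invokes the registered
// handlers, a service.ConsumerHandler behaves like a func(*kafka.Message) error;
// the exact type lives in the service package, so this signature is an assumption.
//
//    logPayload := func(m *kafka.Message) error {
//        log.Debug().
//            Str("service", "consumer").
//            Str("topic", lo.FromPtr(m.TopicPartition.Topic)).
//            Bytes("value", m.Value).
//            Send()
//        return nil
//    }
//    c.RegisterHandlers(logPayload)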

func (c *_consumer) Subscribe(topics []string, io chan uint32) (chan *kafka.Message, error) {
    if len(topics) == 0 {
        return nil, errors.New("illegal arguments: at least one topic must be provided to subscribe")
    }

    var (
        messages = make(chan *kafka.Message)
        err      error
    )

    if err = c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
        return nil, err
    }

    go func() {
        // drop this subscription from the list once the loop exits
        defer func() {
            c.subscriptions = lo.Filter(c.subscriptions, func(ch chan uint32, _ int) bool {
                return io != ch
            })
        }()

        var (
            state   bitmap.Bitmap
            message *kafka.Message
        )

        for {
            select {
            case s := <-io:
                switch s {
                case FlagStop:
                    log.Debug().
                        Str("service", "consumer").
                        Msg("consumer gracefully closed")
                    return
                case FlagPause:
                    // FlagPause acts as a toggle: pause if running, resume if paused
                    if state.Contains(FlagPause) {
                        state.Remove(FlagPause)
                    } else {
                        state.Set(FlagPause)
                    }
                }
            default:
            }

            if state.Contains(FlagPause) {
                time.Sleep(100 * time.Millisecond)
                continue
            }

            message, err = c.poll()
            if err != nil {
                // silently wait for the next message
                if errors.Is(err, ErrNoMessage) {
                    continue
                }

                log.Debug().
                    Str("service", "consumer").
                    Err(err).
                    Send()
                messages <- nil
                return
            }

            if message != nil {
                messages <- message
            }
        }
    }()

    c.subscriptions = append(c.subscriptions, io)

    return messages, nil
}
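
// Usage sketch (illustrative; the topic name is made up): given the consumer c
// constructed above, Subscribe returns a message channel, while the io channel
// carries the control flags defined at the top of this file.
//
//    io := make(chan uint32)
//    messages, err := c.Subscribe([]string{"example-topic"}, io)
//    if err != nil {
//        // handle subscription error
//    }
//
//    go func() {
//        for m := range messages {
//            if m == nil {
//                return // the polling loop stopped on an error
//            }
//            // process m ...
//        }
//    }()
//
//    io <- FlagPause // toggle: pause polling
//    io <- FlagPause // toggle again: resume polling
//    io <- FlagStop  // stop the loop; Close() also sends this to every subscription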

func (c *_consumer) Close() error {
    var wg sync.WaitGroup
    wg.Add(2)

    // ask every active subscription loop to stop
    go func() {
        defer wg.Done()
        for i := range c.subscriptions {
            if c.subscriptions[i] != nil {
                c.subscriptions[i] <- FlagStop
            }
        }
    }()

    // give the loops a moment to exit before closing the underlying session
    go func() {
        defer wg.Done()
        if c.session != nil {
            time.Sleep(time.Second)
            _ = c.session.Close() //nolint:errcheck
        }
    }()

    wg.Wait()
    return nil
}

func (c *_consumer) poll() (*kafka.Message, error) {
    var (
        ev  = c.session.Poll(c.config.Timeout)
        err error
    )

    switch e := ev.(type) {
    case *kafka.Message:
        for i := range c.handlers {
            if err = c.handlers[i](e); err != nil {
                return nil, err
            }
        }
        return e, nil
    case kafka.Error:
        return nil, e
    case kafka.OffsetsCommitted:
        if e.Error != nil {
            // return the commit error itself; `err` is always nil at this point
            return nil, e.Error
        }

        l := log.Debug().Str("service", "consumer")
        for _, offset := range e.Offsets {
            if offset.Topic == nil {
                continue
            }
            l = l.Str(*offset.Topic, fmt.Sprintf("partition: %d; offset: %s",
                offset.Partition,
                offset.Offset.String()))
        }
        l.Send()

        return nil, ErrNoMessage
    default:
        if e != nil {
            log.Debug().
                Str("service", "consumer").
                Any("event", e).
                Send()
        }
        return nil, ErrNoMessage
    }
}

// rebalanceCallback is called on each group rebalance to assign additional
// partitions, or remove existing partitions, from the consumer's current
// assignment.
//
// The application may use this optional callback to inspect the assignment,
// alter the initial start offset (the .Offset field of each assigned partition),
// and read/write offsets to commit to an alternative store outside of Kafka.
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
    switch ev := event.(type) {
    case kafka.AssignedPartitions:
        log.Debug().
            Str("service", "consumer").
            Msgf("%s rebalance: %d new partition(s) assigned: %v",
                c.GetRebalanceProtocol(),
                len(ev.Partitions),
                ev.Partitions)

        // The application may update the start .Offset of each
        // assigned partition and then call IncrementalAssign().
        if err := c.IncrementalAssign(ev.Partitions); err != nil {
            panic(err)
        }
    case kafka.RevokedPartitions:
        log.Debug().
            Str("service", "consumer").
            Msgf("%s rebalance: %d partition(s) revoked: %v",
                c.GetRebalanceProtocol(),
                len(ev.Partitions),
                ev.Partitions)

        // Usually, the rebalance callback for `RevokedPartitions` is called
        // just before the partitions are revoked. We can be certain that a
        // partition being revoked is not yet owned by any other consumer.
        // This way, logic like storing any pending offsets or committing
        // offsets can be handled.
        // However, there can be cases where the assignment is lost
        // involuntarily. In this case, the partition might already be owned
        // by another consumer, and operations including committing
        // offsets may not work.
        if c.AssignmentLost() {
            // Our consumer has been kicked out of the group and the
            // entire assignment is thus lost.
            log.Debug().
                Str("service", "consumer").
                Msg("Assignment lost involuntarily, commit may fail")
        }
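
        // A possible extension at this point (sketch only, not in the original
        // flow): when the assignment was not lost, pending offsets could be
        // committed synchronously before the partitions are handed over, e.g.:
        //
        //    if !c.AssignmentLost() {
        //        if _, err := c.Commit(); err != nil {
        //            log.Debug().Str("service", "consumer").Err(err).Send()
        //        }
        //    }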

        // The client automatically calls IncrementalUnassign() unless
        // the callback has already called that method.
    default:
        log.Debug().
            Str("service", "consumer").
            Msgf("unexpected event type: %v", event)
    }

    return nil
}