consumer.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package kafka
  2. import (
  3. "errors"
  4. "fmt"
  5. lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  6. "github.com/rs/zerolog/log"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. var ErrNoMessage = errors.New("no message")
  12. type _consumer struct {
  13. config *Config
  14. session *lib.Consumer
  15. }
  16. func NewConsumer(cfg *Config) (Consumer, error) {
  17. var (
  18. c = &_consumer{
  19. config: cfg,
  20. }
  21. err error
  22. )
  23. if cfg == nil {
  24. return nil, fmt.Errorf("config must be provided")
  25. }
  26. opts := &lib.ConfigMap{
  27. "broker.address.family": "v4",
  28. "bootstrap.servers": strings.Join(c.config.Hosts, ","),
  29. "group.id": c.config.Group,
  30. "partition.assignment.strategy": "cooperative-sticky",
  31. "auto.offset.reset": "earliest",
  32. "log_level": 0,
  33. }
  34. if c.session, err = lib.NewConsumer(opts); err != nil {
  35. return nil, err
  36. }
  37. return c, nil
  38. }
  39. func (c *_consumer) Subscribe(topics []string, ch chan *lib.Message) error {
  40. if topics == nil {
  41. return fmt.Errorf("illegal arguments: at least one topic must be provided")
  42. }
  43. if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
  44. return err
  45. }
  46. for {
  47. message, err := c.pollMessage()
  48. if err != nil {
  49. // silently wait for a next message
  50. if errors.Is(err, ErrNoMessage) {
  51. continue
  52. }
  53. }
  54. if message != nil {
  55. ch <- message
  56. }
  57. }
  58. }
  59. func (c *_consumer) String() string {
  60. return "kafka:consumer"
  61. }
  62. func (c *_consumer) Close() error {
  63. var wg sync.WaitGroup
  64. wg.Add(1)
  65. go func() {
  66. defer wg.Done()
  67. if c.session != nil {
  68. time.Sleep(time.Second)
  69. _ = c.session.Close() //nolint:errcheck
  70. }
  71. }()
  72. wg.Wait()
  73. return nil
  74. }
  75. func (c *_consumer) pollMessage() (*lib.Message, error) {
  76. ev := c.session.Poll(c.config.Timeout)
  77. switch e := ev.(type) {
  78. case *lib.Message:
  79. return e, nil
  80. case lib.Error:
  81. return nil, e
  82. default:
  83. if e != nil {
  84. log.Debug().
  85. Str("service", "consumer").
  86. Any("event", e).
  87. Send()
  88. }
  89. return nil, ErrNoMessage
  90. }
  91. }