producer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package producer
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.beejay.kim/tool/service"
  6. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  7. "github.com/google/uuid"
  8. "github.com/rs/zerolog/log"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. type _producer struct {
  14. config *Config
  15. session *kafka.Producer
  16. }
  17. func New(cfg *Config) (service.Producer, error) {
  18. var (
  19. p = new(_producer)
  20. err error
  21. )
  22. if cfg == nil {
  23. return nil, errors.New("config must be provided")
  24. }
  25. p.config = cfg
  26. opts := &kafka.ConfigMap{
  27. "bootstrap.servers": strings.Join(p.config.Hosts, ","),
  28. }
  29. if p.session, err = kafka.NewProducer(opts); err != nil {
  30. return nil, err
  31. }
  32. go func() {
  33. for e := range p.session.Events() {
  34. switch ev := e.(type) {
  35. case kafka.Error:
  36. // Generic client instance-level errors, such as
  37. // broker connection failures, authentication issues, etc.
  38. //
  39. // These errors should generally be considered informational
  40. // as the underlying client will automatically try to
  41. // recover from any errors encountered, the application
  42. // does not need to take action on them.
  43. log.Debug().Str("service", "producer").Err(ev).Send()
  44. default:
  45. log.Debug().Str("service", "producer").Msgf("ignored event: %s", ev)
  46. }
  47. }
  48. }()
  49. return p, nil
  50. }
  51. func (p *_producer) ID() uuid.UUID {
  52. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("producer.kafka"))
  53. }
  54. func (p *_producer) Close() error {
  55. var wg sync.WaitGroup
  56. wg.Add(1)
  57. go func() {
  58. defer wg.Done()
  59. if p.session != nil {
  60. time.Sleep(time.Second)
  61. p.session.Close() //nolint:errcheck
  62. }
  63. }()
  64. wg.Wait()
  65. return nil
  66. }
  67. func (p *_producer) Produce(message *kafka.Message) error {
  68. var (
  69. delivery = make(chan kafka.Event)
  70. err error
  71. )
  72. defer close(delivery)
  73. if message.TopicPartition.Topic == nil {
  74. return errors.New("illegal message: at least one topic name must be provided")
  75. }
  76. if err = p.session.Produce(message, delivery); err != nil {
  77. return err
  78. }
  79. e := <-delivery
  80. m := e.(*kafka.Message)
  81. if m.TopicPartition.Error != nil {
  82. return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
  83. }
  84. return nil
  85. }