producer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package producer
  2. import (
  3. "fmt"
  4. "git.beejay.kim/tool/service"
  5. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  6. "github.com/google/uuid"
  7. "github.com/rs/zerolog/log"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type _producer struct {
  13. config *Config
  14. session *kafka.Producer
  15. }
  16. func New(cfg *Config) (service.Producer, error) {
  17. var (
  18. p = new(_producer)
  19. err error
  20. )
  21. if cfg == nil {
  22. return nil, fmt.Errorf("config must be provided")
  23. }
  24. p.config = cfg
  25. opts := &kafka.ConfigMap{
  26. "bootstrap.servers": strings.Join(p.config.Hosts, ","),
  27. }
  28. if p.session, err = kafka.NewProducer(opts); err != nil {
  29. return nil, err
  30. }
  31. go func() {
  32. for e := range p.session.Events() {
  33. switch ev := e.(type) {
  34. case kafka.Error:
  35. // Generic client instance-level errors, such as
  36. // broker connection failures, authentication issues, etc.
  37. //
  38. // These errors should generally be considered informational
  39. // as the underlying client will automatically try to
  40. // recover from any errors encountered, the application
  41. // does not need to take action on them.
  42. log.Debug().Str("service", "producer").Err(ev).Send()
  43. default:
  44. log.Debug().Str("service", "producer").Msgf("ignored event: %s", ev)
  45. }
  46. }
  47. }()
  48. return p, nil
  49. }
  50. func (p *_producer) ID() uuid.UUID {
  51. return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("producer.kafka"))
  52. }
  53. func (p *_producer) Close() error {
  54. var wg sync.WaitGroup
  55. wg.Add(1)
  56. go func() {
  57. defer wg.Done()
  58. if p.session != nil {
  59. time.Sleep(time.Second)
  60. p.session.Close() //nolint:errcheck
  61. }
  62. }()
  63. wg.Wait()
  64. return nil
  65. }
  66. func (p *_producer) Produce(message *kafka.Message) error {
  67. var (
  68. delivery = make(chan kafka.Event)
  69. err error
  70. )
  71. defer close(delivery)
  72. if message.TopicPartition.Topic == nil {
  73. return fmt.Errorf("illegal message: at least one topic name must be provided")
  74. }
  75. if err = p.session.Produce(message, delivery); err != nil {
  76. return err
  77. }
  78. e := <-delivery
  79. m := e.(*kafka.Message)
  80. if m.TopicPartition.Error != nil {
  81. return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
  82. }
  83. return nil
  84. }