producer.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package kafka
  2. import (
  3. "fmt"
  4. lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. type _producer struct {
  10. config *Config
  11. session *lib.Producer
  12. }
  13. func NewProducer(cfg *Config) (Producer, error) {
  14. var (
  15. p = &_producer{
  16. config: cfg,
  17. }
  18. err error
  19. )
  20. if cfg == nil {
  21. return nil, fmt.Errorf("config must be provided")
  22. }
  23. opts := &lib.ConfigMap{
  24. "bootstrap.servers": strings.Join(p.config.Hosts, ","),
  25. }
  26. if p.session, err = lib.NewProducer(opts); err != nil {
  27. return nil, err
  28. }
  29. return p, nil
  30. }
  31. func (p *_producer) Produce(message *lib.Message) error {
  32. var (
  33. delivery = make(chan lib.Event)
  34. err error
  35. )
  36. if message.TopicPartition.Topic == nil {
  37. return fmt.Errorf("illegal message: at least one topic must be provided for topic partition")
  38. }
  39. if err = p.session.Produce(message, delivery); err != nil {
  40. return err
  41. }
  42. e := <-delivery
  43. m := e.(*lib.Message)
  44. if m.TopicPartition.Error != nil {
  45. return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
  46. }
  47. close(delivery)
  48. return nil
  49. }
  50. func (p *_producer) String() string {
  51. return "kafka:producer"
  52. }
  53. func (p *_producer) Close() error {
  54. var wg sync.WaitGroup
  55. wg.Add(1)
  56. go func() {
  57. defer wg.Done()
  58. if p.session != nil {
  59. time.Sleep(time.Second)
  60. p.session.Close() //nolint:errcheck
  61. }
  62. }()
  63. wg.Wait()
  64. return nil
  65. }