package kafka

import (
	"fmt"
	"strings"
	"sync"
	"time"

	lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// closeFlushTimeout bounds how long Close waits for outstanding messages
// to be flushed before the underlying producer is shut down.
const closeFlushTimeout = time.Second

// _producer is the Producer implementation backed by a confluent-kafka-go
// producer session.
type _producer struct {
	config  *Config
	session *lib.Producer

	// closeOnce ensures the underlying session is shut down at most once,
	// so repeated Close calls are harmless.
	closeOnce sync.Once
}

// NewProducer creates a producer connected to the brokers listed in
// cfg.Hosts.
//
// It returns an error when cfg is nil or when the underlying client cannot
// be created.
func NewProducer(cfg *Config) (Producer, error) {
	if cfg == nil {
		return nil, fmt.Errorf("config must be provided")
	}

	opts := &lib.ConfigMap{
		"bootstrap.servers": strings.Join(cfg.Hosts, ","),
	}

	session, err := lib.NewProducer(opts)
	if err != nil {
		return nil, fmt.Errorf("creating kafka producer: %w", err)
	}

	return &_producer{
		config:  cfg,
		session: session,
	}, nil
}

// Produce sends message and blocks until its delivery report arrives.
//
// It returns an error when message is nil, when it carries no topic, when
// the message cannot be enqueued, or when the delivery report signals a
// failure.
func (p *_producer) Produce(message *lib.Message) error {
	if message == nil {
		return fmt.Errorf("illegal message: message must be provided")
	}
	if message.TopicPartition.Topic == nil {
		return fmt.Errorf("illegal message: at least one topic must be provided for topic partition")
	}

	// Buffered so the delivery report can be handed over without the
	// sender having to wait for this goroutine to reach the receive.
	delivery := make(chan lib.Event, 1)
	// Closed on every return path: either enqueueing failed and nothing
	// is sent on the channel, or the single delivery report has already
	// been received.
	defer close(delivery)

	if err := p.session.Produce(message, delivery); err != nil {
		return err
	}

	switch ev := (<-delivery).(type) {
	case *lib.Message:
		if ev.TopicPartition.Error != nil {
			return fmt.Errorf("delivery failed: %w", ev.TopicPartition.Error)
		}
		return nil
	default:
		// Anything other than a message report is reported as an error
		// instead of panicking on a failed type assertion.
		return fmt.Errorf("delivery failed: unexpected event %T: %v", ev, ev)
	}
}

// String returns a fixed, human-readable identifier for the producer.
func (p *_producer) String() string {
	return "kafka:producer"
}

// Close flushes outstanding messages, waiting at most closeFlushTimeout,
// and then shuts down the underlying session. Only the first call does any
// work; later calls return nil.
//
// It returns an error when messages are still unflushed once the timeout
// elapses; the session is closed regardless.
func (p *_producer) Close() error {
	var err error

	p.closeOnce.Do(func() {
		if p.session == nil {
			return
		}

		timeoutMs := int(closeFlushTimeout / time.Millisecond)
		if remaining := p.session.Flush(timeoutMs); remaining > 0 {
			err = fmt.Errorf("closing kafka producer: %d message(s) were not flushed", remaining)
		}
		p.session.Close()
	})

	return err
}