| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 | 
							- package kafka
 
- import (
 
- 	"fmt"
 
- 	lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- )
 
- type _producer struct {
 
- 	config  *Config
 
- 	session *lib.Producer
 
- }
 
- func NewProducer(cfg *Config) (Producer, error) {
 
- 	var (
 
- 		p = &_producer{
 
- 			config: cfg,
 
- 		}
 
- 		err error
 
- 	)
 
- 	if cfg == nil {
 
- 		return nil, fmt.Errorf("config must be provided")
 
- 	}
 
- 	opts := &lib.ConfigMap{
 
- 		"bootstrap.servers": strings.Join(p.config.Hosts, ","),
 
- 	}
 
- 	if p.session, err = lib.NewProducer(opts); err != nil {
 
- 		return nil, err
 
- 	}
 
- 	return p, nil
 
- }
 
- func (p *_producer) Produce(message *lib.Message) error {
 
- 	var (
 
- 		delivery = make(chan lib.Event)
 
- 		err      error
 
- 	)
 
- 	if message.TopicPartition.Topic == nil {
 
- 		return fmt.Errorf("illegal message: at least one topic must be provided for topic partition")
 
- 	}
 
- 	if err = p.session.Produce(message, delivery); err != nil {
 
- 		return err
 
- 	}
 
- 	e := <-delivery
 
- 	m := e.(*lib.Message)
 
- 	if m.TopicPartition.Error != nil {
 
- 		return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
 
- 	}
 
- 	close(delivery)
 
- 	return nil
 
- }
 
- func (p *_producer) String() string {
 
- 	return "kafka:producer"
 
- }
 
- func (p *_producer) Close() error {
 
- 	var wg sync.WaitGroup
 
- 	wg.Add(1)
 
- 	go func() {
 
- 		defer wg.Done()
 
- 		if p.session != nil {
 
- 			time.Sleep(time.Second)
 
- 			p.session.Close() //nolint:errcheck
 
- 		}
 
- 	}()
 
- 	wg.Wait()
 
- 	return nil
 
- }
 
 
  |