1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package kafka
- import (
- "fmt"
- lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "strings"
- "sync"
- "time"
- )
- type _producer struct {
- config *Config
- session *lib.Producer
- }
- func NewProducer(cfg *Config) (Producer, error) {
- var (
- p = &_producer{
- config: cfg,
- }
- err error
- )
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- opts := &lib.ConfigMap{
- "bootstrap.servers": strings.Join(p.config.Hosts, ","),
- }
- if p.session, err = lib.NewProducer(opts); err != nil {
- return nil, err
- }
- return p, nil
- }
- func (p *_producer) Produce(message *lib.Message) error {
- var (
- delivery = make(chan lib.Event)
- err error
- )
- if message.TopicPartition.Topic == nil {
- return fmt.Errorf("illegal message: at least one topic must be provided for topic partition")
- }
- if err = p.session.Produce(message, delivery); err != nil {
- return err
- }
- e := <-delivery
- m := e.(*lib.Message)
- if m.TopicPartition.Error != nil {
- return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
- }
- close(delivery)
- return nil
- }
- func (p *_producer) String() string {
- return "kafka:producer"
- }
- func (p *_producer) Close() error {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if p.session != nil {
- time.Sleep(time.Second)
- p.session.Close() //nolint:errcheck
- }
- }()
- wg.Wait()
- return nil
- }
|