// Package producer provides a Kafka-backed implementation of service.Producer
// built on top of confluent-kafka-go.
package producer

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"git.beejay.kim/tool/service"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
)

// _producer wraps a kafka.Producer session together with the configuration
// it was created from.
type _producer struct {
	config  *Config
	session *kafka.Producer

	// closeOnce guarantees the underlying client is shut down at most once,
	// no matter how many times Close is called.
	closeOnce sync.Once
}

// New creates a Kafka producer connected to the brokers listed in cfg.Hosts
// and starts a background goroutine that drains and logs the client's
// instance-level events.
//
// It returns an error when cfg is nil or when the underlying Kafka client
// cannot be created.
func New(cfg *Config) (service.Producer, error) {
	if cfg == nil {
		return nil, fmt.Errorf("config must be provided")
	}

	p := &_producer{config: cfg}

	opts := &kafka.ConfigMap{
		"bootstrap.servers": strings.Join(p.config.Hosts, ","),
	}

	session, err := kafka.NewProducer(opts)
	if err != nil {
		return nil, fmt.Errorf("creating kafka producer: %w", err)
	}
	p.session = session

	// Drain the events channel for the lifetime of the session; the loop
	// ends when the channel is closed.
	go func() {
		for e := range p.session.Events() {
			switch ev := e.(type) {
			case kafka.Error:
				// Generic client instance-level errors, such as
				// broker connection failures, authentication issues, etc.
				//
				// These errors should generally be considered informational
				// as the underlying client will automatically try to
				// recover from any errors encountered, the application
				// does not need to take action on them.
				log.Debug().Str("service", "producer").Err(ev).Send()
			default:
				log.Debug().Str("service", "producer").Msgf("ignored event: %s", ev)
			}
		}
	}()

	return p, nil
}

// ID returns the identifier of this service: a SHA-1 based UUID derived from
// the fixed name "producer.kafka" in the DNS namespace, so every call yields
// the same value.
func (p *_producer) ID() uuid.UUID {
	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("producer.kafka"))
}

// Close flushes outstanding messages (waiting at most one second) and shuts
// the underlying Kafka client down. It is safe to call more than once; only
// the first call has any effect. Close always returns nil.
func (p *_producer) Close() error {
	const flushTimeout = time.Second

	p.closeOnce.Do(func() {
		if p.session == nil {
			return
		}

		// Give in-flight messages a bounded chance to be delivered instead
		// of sleeping blindly; Flush returns as soon as nothing is pending.
		if pending := p.session.Flush(int(flushTimeout.Milliseconds())); pending > 0 {
			log.Debug().Str("service", "producer").Msgf("closing with %d undelivered event(s)", pending)
		}

		p.session.Close()
	})

	return nil
}

// Produce sends message and blocks until its delivery report arrives.
//
// It returns an error when message is nil or has no topic, when the producer
// has no session, when the message cannot be enqueued, or when the delivery
// report carries an error.
func (p *_producer) Produce(message *kafka.Message) error {
	if message == nil {
		return fmt.Errorf("illegal message: message must be provided")
	}
	if message.TopicPartition.Topic == nil {
		return fmt.Errorf("illegal message: at least one topic name must be provided")
	}
	if p.session == nil {
		return fmt.Errorf("producer session is not initialised")
	}

	// Private per-message channel on which the delivery report is received.
	delivery := make(chan kafka.Event)
	defer close(delivery)

	if err := p.session.Produce(message, delivery); err != nil {
		return err
	}

	// Inspect the report without assuming its concrete type, so an
	// unexpected event becomes an error rather than a panic.
	switch ev := (<-delivery).(type) {
	case *kafka.Message:
		if ev.TopicPartition.Error != nil {
			return fmt.Errorf("delivery failed: %w", ev.TopicPartition.Error)
		}
		return nil
	case kafka.Error:
		return fmt.Errorf("delivery failed: %w", ev)
	default:
		return fmt.Errorf("delivery failed: unexpected event %T", ev)
	}
}