123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package producer
- import (
- "fmt"
- "git.beejay.kim/tool/service"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/google/uuid"
- "github.com/rs/zerolog/log"
- "strings"
- "sync"
- "time"
- )
- type _producer struct {
- config *Config
- session *kafka.Producer
- }
- func New(cfg *Config) (service.Producer, error) {
- var (
- p = new(_producer)
- err error
- )
- if cfg == nil {
- return nil, fmt.Errorf("config must be provided")
- }
- p.config = cfg
- opts := &kafka.ConfigMap{
- "bootstrap.servers": strings.Join(p.config.Hosts, ","),
- }
- if p.session, err = kafka.NewProducer(opts); err != nil {
- return nil, err
- }
- go func() {
- for e := range p.session.Events() {
- switch ev := e.(type) {
- case kafka.Error:
- // Generic client instance-level errors, such as
- // broker connection failures, authentication issues, etc.
- //
- // These errors should generally be considered informational
- // as the underlying client will automatically try to
- // recover from any errors encountered, the application
- // does not need to take action on them.
- log.Debug().Str("service", "producer").Err(ev).Send()
- default:
- log.Debug().Str("service", "producer").Msgf("ignored event: %s", ev)
- }
- }
- }()
- return p, nil
- }
- func (p *_producer) ID() uuid.UUID {
- return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("producer.kafka"))
- }
- func (p *_producer) Close() error {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- if p.session != nil {
- time.Sleep(time.Second)
- p.session.Close() //nolint:errcheck
- }
- }()
- wg.Wait()
- return nil
- }
- func (p *_producer) Produce(message *kafka.Message) error {
- var (
- delivery = make(chan kafka.Event)
- err error
- )
- defer close(delivery)
- if message.TopicPartition.Topic == nil {
- return fmt.Errorf("illegal message: at least one topic name must be provided")
- }
- if err = p.session.Produce(message, delivery); err != nil {
- return err
- }
- e := <-delivery
- m := e.(*kafka.Message)
- if m.TopicPartition.Error != nil {
- return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
- }
- return nil
- }
|