kafka.go 844 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package service
import (
	"sync"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)
  7. type ConsumerHandler func(*kafka.Message) error
  8. type ConsumerOptions struct {
  9. Counter prometheus.Counter
  10. }
  11. func ConsumerOptionsWithCounter() ConsumerOptions {
  12. return ConsumerOptions{
  13. Counter: promauto.NewCounter(prometheus.CounterOpts{
  14. Namespace: "kafka",
  15. Subsystem: "consumer",
  16. Name: "received",
  17. Help: "The total number of received messages",
  18. }),
  19. }
  20. }
  21. type Consumer interface {
  22. Service
  23. SetPause(bool) error
  24. Paused() bool
  25. Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
  26. Subscribed() bool
  27. RegisterHandlers(...ConsumerHandler)
  28. }
  29. type Producer interface {
  30. Service
  31. Produce(*kafka.Message) error
  32. }