kafka.go 805 B

123456789101112131415161718192021222324252627282930313233343536
  1. package service
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/prometheus/client_golang/prometheus"
  5. "github.com/prometheus/client_golang/prometheus/promauto"
  6. )
  7. type ConsumerHandler func(*kafka.Message) error
  8. type ConsumerOptions struct {
  9. Counter prometheus.Counter
  10. }
  11. func ConsumerOptionsWithCounter() ConsumerOptions {
  12. return ConsumerOptions{
  13. Counter: promauto.NewCounter(prometheus.CounterOpts{
  14. Namespace: "kafka",
  15. Subsystem: "consumer",
  16. Name: "received",
  17. Help: "The total number of received messages",
  18. }),
  19. }
  20. }
  21. type Consumer interface {
  22. Service
  23. Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
  24. Subscribed() bool
  25. RegisterHandlers(...ConsumerHandler)
  26. }
  27. type Producer interface {
  28. Service
  29. Produce(*kafka.Message) error
  30. }