message.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package ticket
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/kelindar/bitmap"
  5. )
const (
	// messageHeaderFlags is the key of the message header in which SetFlags
	// stores, and from which Flags reads, the serialized flag bitmap.
	messageHeaderFlags = "internal.flags"
)
  9. func NewKafkaMessage(src *kafka.Message) *Message {
  10. m := &Message{
  11. Provider: MessageProvider_EmailMessageProvider,
  12. Body: string(src.Value),
  13. Raw: string(src.Value),
  14. Date: src.Timestamp.UnixMilli(),
  15. Headers: []*Message_Header{},
  16. }
  17. for _, h := range src.Headers {
  18. m.Headers = append(m.Headers, &Message_Header{
  19. Key: h.Key,
  20. Value: string(h.Value),
  21. })
  22. }
  23. return m
  24. }
  25. func (m *Message) SetFlags(flags bitmap.Bitmap) {
  26. var h *Message_Header
  27. for i := range m.Headers {
  28. if m.Headers[i].Key != messageHeaderFlags {
  29. continue
  30. }
  31. h = m.Headers[i]
  32. break
  33. }
  34. if h == nil {
  35. h = &Message_Header{
  36. Key: messageHeaderFlags,
  37. }
  38. m.Headers = append(m.Headers, h)
  39. }
  40. h.Value = string(flags.ToBytes())
  41. }
  42. func (m *Message) Flags() bitmap.Bitmap {
  43. var h *Message_Header
  44. for i := range m.Headers {
  45. if m.Headers[i].Key != messageHeaderFlags {
  46. continue
  47. }
  48. h = m.Headers[i]
  49. break
  50. }
  51. if h == nil {
  52. return bitmap.Bitmap{}
  53. }
  54. return bitmap.FromBytes([]byte(h.Value))
  55. }