message.go 970 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package ticket
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/google/uuid"
  5. "github.com/kelindar/bitmap"
  6. )
  7. func NewKafkaMessage(src *kafka.Message) *Message {
  8. m := &Message{
  9. Id: uuid.NewString(),
  10. Provider: MessageProvider_EmailMessageProvider,
  11. Body: string(src.Value),
  12. Raw: string(src.Value),
  13. Date: src.Timestamp.UnixMilli(),
  14. Headers: []*Message_Header{},
  15. }
  16. for _, h := range src.Headers {
  17. m.Headers = append(m.Headers, &Message_Header{
  18. Key: h.Key,
  19. Value: string(h.Value),
  20. })
  21. }
  22. return m
  23. }
  24. func (m *Message) SetFlag(x uint32) {
  25. f := m.bitmap()
  26. f.Set(x)
  27. m.Flags = f.ToBytes()
  28. }
  29. func (m *Message) HasFlag(x uint32) bool {
  30. return m.bitmap().Contains(x)
  31. }
  32. func (m *Message) SetBitmap(flags bitmap.Bitmap) {
  33. m.Flags = flags.ToBytes()
  34. }
  35. func (m *Message) bitmap() *bitmap.Bitmap {
  36. var flags bitmap.Bitmap
  37. if m.Flags != nil {
  38. flags = bitmap.FromBytes(m.Flags)
  39. }
  40. return &flags
  41. }