message.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package ticket
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/google/uuid"
  5. "github.com/kelindar/bitmap"
  6. )
  7. func NewKafkaMessage(src *kafka.Message) *Message {
  8. m := &Message{
  9. Id: uuid.NewString(),
  10. Provider: MessageProvider_EmailMessageProvider,
  11. Body: string(src.Value),
  12. Raw: string(src.Value),
  13. Date: src.Timestamp.UnixMilli(),
  14. Headers: []*Message_Header{},
  15. }
  16. for _, h := range src.Headers {
  17. m.Headers = append(m.Headers, &Message_Header{
  18. Key: h.Key,
  19. Value: string(h.Value),
  20. })
  21. }
  22. return m
  23. }
  24. func (m *Message) SetFlag(x uint32) {
  25. f := m.bitmap()
  26. f.Set(x)
  27. m.Flags = f.ToBytes()
  28. }
  29. func (m *Message) HasFlag(x uint32) bool {
  30. return m.bitmap().Contains(x)
  31. }
  32. func (m *Message) FlagBytes() []byte {
  33. return m.bitmap().ToBytes()
  34. }
  35. func (m *Message) SetBitmap(flags bitmap.Bitmap) {
  36. m.Flags = flags.ToBytes()
  37. }
  38. func (m *Message) bitmap() *bitmap.Bitmap {
  39. var flags bitmap.Bitmap
  40. if m.Flags != nil {
  41. flags = bitmap.FromBytes(m.Flags)
  42. }
  43. return &flags
  44. }