message.go 736 B

1234567891011121314151617181920212223242526272829303132333435
  1. package ticket
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/google/uuid"
  5. "github.com/kelindar/bitmap"
  6. )
  7. func NewKafkaMessage(src *kafka.Message) *Message {
  8. m := &Message{
  9. Id: uuid.NewString(),
  10. Provider: MessageProvider_EmailMessageProvider,
  11. Body: string(src.Value),
  12. Raw: string(src.Value),
  13. Date: src.Timestamp.UnixMilli(),
  14. Headers: []*Message_Header{},
  15. }
  16. for _, h := range src.Headers {
  17. m.Headers = append(m.Headers, &Message_Header{
  18. Key: h.Key,
  19. Value: string(h.Value),
  20. })
  21. }
  22. return m
  23. }
  24. func (m *Message) SetBitmap(flags bitmap.Bitmap) {
  25. m.Flags = flags.ToBytes()
  26. }
  27. func (m *Message) GetBitmap() bitmap.Bitmap {
  28. return bitmap.FromBytes(m.Flags)
  29. }