package ticket import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/kelindar/bitmap" ) const ( messageHeaderFlags = "internal.flags" ) func NewKafkaMessage(src *kafka.Message) *Message { m := &Message{ Provider: MessageProvider_EmailMessageProvider, Body: string(src.Value), Raw: string(src.Value), Date: src.Timestamp.UnixMilli(), Headers: []*Message_Header{}, } for _, h := range src.Headers { m.Headers = append(m.Headers, &Message_Header{ Key: h.Key, Value: string(h.Value), }) } return m } func (m *Message) SetFlags(flags bitmap.Bitmap) { var h *Message_Header for i := range m.Headers { if m.Headers[i].Key != messageHeaderFlags { continue } h = m.Headers[i] break } if h == nil { h = &Message_Header{ Key: messageHeaderFlags, } m.Headers = append(m.Headers, h) } h.Value = string(flags.ToBytes()) } func (m *Message) Flags() bitmap.Bitmap { var h *Message_Header for i := range m.Headers { if m.Headers[i].Key != messageHeaderFlags { continue } h = m.Headers[i] break } if h == nil { return bitmap.Bitmap{} } return bitmap.FromBytes([]byte(h.Value)) }