123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- package ticket
- import (
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/kelindar/bitmap"
- )
// messageHeaderFlags is the key of the message header in which SetFlags
// stores the serialized flags bitmap and from which Flags reads it back.
const messageHeaderFlags = "internal.flags"
- func NewKafkaMessage(src *kafka.Message) *Message {
- m := &Message{
- Provider: MessageProvider_EmailMessageProvider,
- Body: string(src.Value),
- Raw: string(src.Value),
- Date: src.Timestamp.UnixMilli(),
- Headers: []*Message_Header{},
- }
- for _, h := range src.Headers {
- m.Headers = append(m.Headers, &Message_Header{
- Key: h.Key,
- Value: string(h.Value),
- })
- }
- return m
- }
- func (m *Message) SetFlags(flags bitmap.Bitmap) {
- var h *Message_Header
- for i := range m.Headers {
- if m.Headers[i].Key != messageHeaderFlags {
- continue
- }
- h = m.Headers[i]
- break
- }
- if h == nil {
- h = &Message_Header{
- Key: messageHeaderFlags,
- }
- m.Headers = append(m.Headers, h)
- }
- h.Value = string(flags.ToBytes())
- }
- func (m *Message) Flags() bitmap.Bitmap {
- var h *Message_Header
- for i := range m.Headers {
- if m.Headers[i].Key != messageHeaderFlags {
- continue
- }
- h = m.Headers[i]
- break
- }
- if h == nil {
- return bitmap.Bitmap{}
- }
- return bitmap.FromBytes([]byte(h.Value))
- }
|