123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- package ticket
- import (
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/google/uuid"
- "github.com/kelindar/bitmap"
- )
- func NewKafkaMessage(src *kafka.Message) *Message {
- m := &Message{
- Id: uuid.NewString(),
- Provider: MessageProvider_EmailMessageProvider,
- Body: string(src.Value),
- Raw: string(src.Value),
- Date: src.Timestamp.UnixMilli(),
- Headers: []*Message_Header{},
- }
- for _, h := range src.Headers {
- m.Headers = append(m.Headers, &Message_Header{
- Key: h.Key,
- Value: string(h.Value),
- })
- }
- return m
- }
- func (m *Message) SetFlag(x uint32) {
- f := m.bitmap()
- f.Set(x)
- m.Flags = f.ToBytes()
- }
- func (m *Message) HasFlag(x uint32) bool {
- return m.bitmap().Contains(x)
- }
- func (m *Message) SetBitmap(flags bitmap.Bitmap) {
- m.Flags = flags.ToBytes()
- }
- func (m *Message) bitmap() *bitmap.Bitmap {
- var flags bitmap.Bitmap
- if m.Flags != nil {
- flags = bitmap.FromBytes(m.Flags)
- }
- return &flags
- }
|