package kafka

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/IBM/sarama"
	"github.com/aykhans/oh-my-chat/internal/adapter/config"
	"github.com/aykhans/oh-my-chat/internal/core/domain"
	"github.com/google/uuid"
)

// MessageProducer publishes chat messages to Kafka through a synchronous
// sarama producer. Create it with NewMessageProducer; the zero value is not
// usable.
type MessageProducer struct {
	saramaProducer sarama.SyncProducer
}

// ProduceMessage encodes message as JSON and sends it to the Kafka topic named
// after message.ChatID. It blocks until the underlying SendMessage call
// returns.
//
// It returns an error if ctx is already done, if JSON encoding fails, or if
// the send fails. Underlying errors are wrapped and remain reachable via
// errors.Is / errors.As.
func (p *MessageProducer) ProduceMessage(
	ctx context.Context,
	message *domain.Message,
) error {
	// SyncProducer.SendMessage takes no context, so cancellation can only be
	// honored before the send starts. A nil ctx is tolerated for callers that
	// relied on ctx being ignored.
	if ctx != nil {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("producing message to chat %q: %w", message.ChatID, err)
		}
	}

	payload, err := (&ProducerMessage{
		UserID:    message.UserID,
		ChatID:    message.ChatID,
		Content:   message.Content,
		Timestamp: message.Timestamp,
	}).JSON()
	if err != nil {
		return fmt.Errorf("encoding message for chat %q: %w", message.ChatID, err)
	}

	_, _, err = p.saramaProducer.SendMessage(&sarama.ProducerMessage{
		Topic: message.ChatID,
		// ByteEncoder sends the JSON bytes as-is, without a []byte -> string copy.
		Value:     sarama.ByteEncoder(payload),
		Timestamp: message.Timestamp,
	})
	if err != nil {
		return fmt.Errorf("sending message to chat %q: %w", message.ChatID, err)
	}
	return nil
}

// Close shuts down the underlying sarama producer. Call it once the
// MessageProducer is no longer needed so the producer's resources are
// released.
func (p *MessageProducer) Close() error {
	return p.saramaProducer.Close()
}

// ProducerMessage is the JSON payload written to Kafka for a chat message.
type ProducerMessage struct {
	UserID    uuid.UUID `json:"user_id"`
	ChatID    string    `json:"chat_id"`
	Content   string    `json:"content"`
	Timestamp time.Time `json:"timestamp"`
}

// JSON returns the JSON encoding of the message.
func (m *ProducerMessage) JSON() ([]byte, error) {
	return json.Marshal(m)
}

// NewMessageProducer builds a MessageProducer connected to the brokers listed
// in producerConfig.BootstrapServers. It returns an error if the sarama sync
// producer cannot be created.
func NewMessageProducer(producerConfig *config.KafkaProducerConfig) (*MessageProducer, error) {
	// Named cfg so the imported config package is not shadowed.
	cfg := sarama.NewConfig()

	cfg.Producer.Return.Successes = true                   // enable delivery reports (required by SyncProducer)
	cfg.Producer.RequiredAcks = sarama.WaitForAll          // require all in-sync replicas to acknowledge the message
	cfg.Producer.Retry.Max = 5                             // number of retries before giving up on sending a message
	cfg.Producer.Retry.Backoff = 60 * time.Second          // time to wait between retries
	cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walk through the available partitions one at a time
	cfg.Producer.Compression = sarama.CompressionSnappy    // compress messages using Snappy
	cfg.Producer.Idempotent = true                         // ensure exactly one copy of each message is written
	cfg.Producer.Flush.Frequency = 20 * time.Millisecond   // time to wait before sending a batch of messages
	cfg.Producer.Flush.Bytes = 32 * 1024                   // number of bytes that triggers a batch of messages
	cfg.Net.MaxOpenRequests = 1                            // the idempotent producer requires a single in-flight request
	cfg.Metadata.AllowAutoTopicCreation = true             // allow per-chat topics to be auto-created, subject to broker settings

	producer, err := sarama.NewSyncProducer(producerConfig.BootstrapServers, cfg)
	if err != nil {
		return nil, fmt.Errorf("creating kafka sync producer: %w", err)
	}
	return &MessageProducer{saramaProducer: producer}, nil
}