2024-10-13 13:31:58 +04:00

74 lines
2.5 KiB
Go

package kafka
import (
"context"
"encoding/json"
"time"
"github.com/IBM/sarama"
"github.com/aykhans/oh-my-chat/internal/adapter/config"
"github.com/aykhans/oh-my-chat/internal/core/domain"
"github.com/google/uuid"
)
// MessageProducer publishes chat messages to Kafka through a sarama
// synchronous producer.
type MessageProducer struct {
	// saramaProducer is the producer used by ProduceMessage to send messages.
	//
	// NOTE(review): sarama.SyncProducer is an interface, so the extra pointer
	// indirection is probably unnecessary; it is kept because the rest of the
	// package dereferences this field.
	saramaProducer *sarama.SyncProducer
}
// ProduceMessage serializes message as JSON and publishes it synchronously to
// the Kafka topic named after message.ChatID, stamping the Kafka record with
// message.Timestamp.
//
// If ctx is already cancelled or past its deadline the message is not sent and
// ctx.Err() is returned. The sarama SyncProducer's SendMessage does not accept
// a context, so a send that has already started cannot be interrupted.
//
// Any JSON encoding error or error reported by SendMessage is returned
// unchanged.
func (p *MessageProducer) ProduceMessage(
	ctx context.Context,
	message *domain.Message,
) error {
	// The context used to be ignored entirely, so a nil ctx is still tolerated
	// here to avoid breaking existing callers.
	if ctx != nil {
		if err := ctx.Err(); err != nil {
			return err
		}
	}

	payload, err := (&ProducerMessage{
		UserID:    message.UserID,
		ChatID:    message.ChatID,
		Content:   message.Content,
		Timestamp: message.Timestamp,
	}).JSON()
	if err != nil {
		return err
	}

	// ByteEncoder wraps the marshalled bytes as-is; going through
	// StringEncoder would copy them into a string first for no benefit.
	_, _, err = (*p.saramaProducer).SendMessage(&sarama.ProducerMessage{
		Topic:     message.ChatID,
		Value:     sarama.ByteEncoder(payload),
		Timestamp: message.Timestamp,
	})
	return err
}
// ProducerMessage is the JSON payload written to Kafka for a single chat
// message.
type ProducerMessage struct {
	// UserID is encoded under the "user_id" key.
	UserID uuid.UUID `json:"user_id"`
	// ChatID is encoded under the "chat_id" key.
	ChatID string `json:"chat_id"`
	// Content is encoded under the "content" key.
	Content string `json:"content"`
	// Timestamp is encoded under the "timestamp" key.
	Timestamp time.Time `json:"timestamp"`
}
// JSON returns the JSON encoding of the message, or the error reported by
// json.Marshal.
func (m *ProducerMessage) JSON() ([]byte, error) {
	encoded, err := json.Marshal(m)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
// NewMessageProducer creates a MessageProducer backed by a sarama SyncProducer
// connected to the brokers listed in producerConfig.BootstrapServers.
//
// It returns a sarama.ConfigurationError when producerConfig is nil, and
// otherwise any error reported by sarama.NewSyncProducer.
func NewMessageProducer(producerConfig *config.KafkaProducerConfig) (*MessageProducer, error) {
	// Fail with an error instead of a nil-pointer panic further down.
	if producerConfig == nil {
		return nil, sarama.ConfigurationError("producer config must not be nil")
	}

	// Named saramaConfig so the imported config package is not shadowed.
	saramaConfig := sarama.NewConfig()

	// Delivery guarantees.
	saramaConfig.Producer.Return.Successes = true          // enable message delivery reports (required by SyncProducer)
	saramaConfig.Producer.RequiredAcks = sarama.WaitForAll // require all in-sync replicas to acknowledge the message
	saramaConfig.Producer.Idempotent = true                // ask sarama to write exactly one copy of each message, even across retries
	saramaConfig.Net.MaxOpenRequests = 1                   // sarama's config validation requires this when Idempotent is enabled

	// Retry policy.
	// NOTE(review): a synchronous send can block for several minutes with
	// these values (5 retries, 60s apart) — confirm this is intended.
	saramaConfig.Producer.Retry.Max = 5                   // number of retries before giving up on sending a message to a partition
	saramaConfig.Producer.Retry.Backoff = time.Second * 60 // time to wait between retries

	// Partitioning, compression and batching.
	saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time
	saramaConfig.Producer.Compression = sarama.CompressionSnappy        // compress messages using Snappy
	saramaConfig.Producer.Flush.Frequency = time.Millisecond * 20       // time to wait before sending a batch of messages
	saramaConfig.Producer.Flush.Bytes = 32 * 1024                       // number of bytes to trigger a batch of messages

	// Allow topics to be auto-created during metadata refresh.
	saramaConfig.Metadata.AllowAutoTopicCreation = true

	producer, err := sarama.NewSyncProducer(producerConfig.BootstrapServers, saramaConfig)
	if err != nil {
		return nil, err
	}
	return &MessageProducer{saramaProducer: &producer}, nil
}