74 lines
2.5 KiB
Go
74 lines
2.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/aykhans/oh-my-chat/internal/adapter/config"
|
|
"github.com/aykhans/oh-my-chat/internal/core/domain"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type MessageProducer struct {
|
|
saramaProducer *sarama.SyncProducer
|
|
}
|
|
|
|
func (producer *MessageProducer) ProduceMessage(
|
|
ctx context.Context,
|
|
message *domain.Message,
|
|
) error {
|
|
messageJSON, err := (&ProducerMessage{
|
|
UserID: message.UserID,
|
|
ChatID: message.ChatID,
|
|
Content: message.Content,
|
|
Timestamp: message.Timestamp,
|
|
}).JSON()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
producerMessage := &sarama.ProducerMessage{
|
|
Topic: message.ChatID,
|
|
Value: sarama.StringEncoder(messageJSON),
|
|
Timestamp: message.Timestamp,
|
|
}
|
|
_, _, err = (*producer.saramaProducer).SendMessage(producerMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type ProducerMessage struct {
|
|
UserID uuid.UUID `json:"user_id"`
|
|
ChatID string `json:"chat_id"`
|
|
Content string `json:"content"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
}
|
|
|
|
func (message *ProducerMessage) JSON() ([]byte, error) {
|
|
return json.Marshal(message)
|
|
}
|
|
|
|
func NewMessageProducer(producerConfig *config.KafkaProducerConfig) (*MessageProducer, error) {
|
|
config := sarama.NewConfig()
|
|
config.Producer.Return.Successes = true // enable message delivery reports
|
|
config.Producer.RequiredAcks = sarama.WaitForAll // require all in-sync replicas to acknowledge the message
|
|
config.Producer.Retry.Max = 5 // number of retries before giving up on sending a message to a partition
|
|
config.Producer.Retry.Backoff = time.Second * 60 // time to wait between retries
|
|
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time
|
|
config.Producer.Compression = sarama.CompressionSnappy // compress messages using Snappy
|
|
config.Producer.Idempotent = true // producer will ensure that messages are successfully sent and acknowledged
|
|
config.Producer.Flush.Frequency = time.Millisecond * 20 // time to wait before sending a batch of messages
|
|
config.Producer.Flush.Bytes = 32 * 1024 // number of bytes to trigger a batch of messages
|
|
config.Net.MaxOpenRequests = 1
|
|
config.Metadata.AllowAutoTopicCreation = true
|
|
|
|
producer, err := sarama.NewSyncProducer(producerConfig.BootstrapServers, config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &MessageProducer{&producer}, err
|
|
}
|