init
This commit is contained in:
94
internal/adapter/streamers/kafka/consumer/message.go
Normal file
94
internal/adapter/streamers/kafka/consumer/message.go
Normal file
@ -0,0 +1,94 @@
|
||||
package consumer
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"time"

	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"

	"github.com/aykhans/oh-my-chat/internal/adapter/config"
	"github.com/aykhans/oh-my-chat/internal/adapter/logger"
	"github.com/aykhans/oh-my-chat/internal/core/domain"
)
|
||||
|
||||
// log is the package-level logger used by the Kafka consumer adapter.
var log = logger.NewStdLogger()
|
||||
|
||||
// MessageConsumer consumes chat messages from Kafka using the consumer
// configuration it was constructed with (see NewMessageConsumer).
type MessageConsumer struct {
	kafkaConsumerConfig *config.KafkaConsumerConfig
}
|
||||
|
||||
// ConsumerMessage is the JSON wire format of a chat message as decoded from a
// Kafka record's value in ConsumeMessage.
type ConsumerMessage struct {
	UserID    uuid.UUID `json:"user_id"`   // message author
	ChatID    string    `json:"chat_id"`   // chat identifier (also the Kafka topic name, per the producer side)
	Content   string    `json:"content"`   // message body
	Timestamp time.Time `json:"timestamp"` // time the message was created
}
|
||||
|
||||
func NewMessageConsumer(consumerConfig *config.KafkaConsumerConfig) *MessageConsumer {
|
||||
return &MessageConsumer{consumerConfig}
|
||||
}
|
||||
|
||||
func (messageConsumer *MessageConsumer) ConsumeMessage(
|
||||
ctx context.Context,
|
||||
uid string,
|
||||
getChats func() []string,
|
||||
message chan<- *domain.StreamMessage,
|
||||
) error {
|
||||
consumer := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: messageConsumer.kafkaConsumerConfig.BootstrapServers,
|
||||
GroupID: uid,
|
||||
GroupTopics: getChats(),
|
||||
MaxBytes: 10e6, // 10MB
|
||||
ReadLagInterval: -1,
|
||||
MaxWait: 300 * time.Millisecond,
|
||||
GroupBalancers: []kafka.GroupBalancer{kafka.RoundRobinGroupBalancer{}},
|
||||
StartOffset: kafka.FirstOffset,
|
||||
})
|
||||
|
||||
defer func() {
|
||||
if err := consumer.Close(); err != nil {
|
||||
log.Error("Error closing consumer", "error", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
msg, err := consumer.FetchMessage(ctx)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case io.EOF:
|
||||
return nil
|
||||
case context.Canceled:
|
||||
return nil
|
||||
}
|
||||
log.Error("Error fetching message from kafka", "error", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
consumerMeesage := &ConsumerMessage{}
|
||||
err = json.Unmarshal(msg.Value, consumerMeesage)
|
||||
if err != nil {
|
||||
log.Error("Error unmarshalling message", "error", err.Error())
|
||||
return domain.ErrInternal
|
||||
}
|
||||
|
||||
message <- &domain.StreamMessage{
|
||||
Message: &domain.Message{
|
||||
UserID: consumerMeesage.UserID,
|
||||
ChatID: consumerMeesage.ChatID,
|
||||
Content: consumerMeesage.Content,
|
||||
Timestamp: consumerMeesage.Timestamp,
|
||||
},
|
||||
Commit: func() error {
|
||||
err := consumer.CommitMessages(ctx, msg)
|
||||
if err != nil {
|
||||
log.Error("Error committing kafka message", "error", err.Error())
|
||||
return domain.ErrInternal
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
73
internal/adapter/streamers/kafka/producer/message.go
Normal file
73
internal/adapter/streamers/kafka/producer/message.go
Normal file
@ -0,0 +1,73 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/aykhans/oh-my-chat/internal/adapter/config"
|
||||
"github.com/aykhans/oh-my-chat/internal/core/domain"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// MessageProducer publishes chat messages to Kafka through a synchronous
// sarama producer.
//
// NOTE(review): saramaProducer is a pointer to sarama.SyncProducer, which is
// an interface type — a pointer-to-interface field is unidiomatic Go. A plain
// sarama.SyncProducer field would be cleaner; confirm before changing, since
// NewMessageProducer and ProduceMessage both rely on the current shape.
type MessageProducer struct {
	saramaProducer *sarama.SyncProducer
}
|
||||
|
||||
func (producer *MessageProducer) ProduceMessage(
|
||||
ctx context.Context,
|
||||
message *domain.Message,
|
||||
) error {
|
||||
messageJSON, err := (&ProducerMessage{
|
||||
UserID: message.UserID,
|
||||
ChatID: message.ChatID,
|
||||
Content: message.Content,
|
||||
Timestamp: message.Timestamp,
|
||||
}).JSON()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
producerMessage := &sarama.ProducerMessage{
|
||||
Topic: message.ChatID,
|
||||
Value: sarama.StringEncoder(messageJSON),
|
||||
Timestamp: message.Timestamp,
|
||||
}
|
||||
_, _, err = (*producer.saramaProducer).SendMessage(producerMessage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProducerMessage is the JSON wire format of a chat message as written to a
// Kafka topic by ProduceMessage. Its field layout mirrors the consumer side's
// ConsumerMessage so the two adapters stay wire-compatible.
type ProducerMessage struct {
	UserID    uuid.UUID `json:"user_id"`   // message author
	ChatID    string    `json:"chat_id"`   // chat identifier (used as the Kafka topic name)
	Content   string    `json:"content"`   // message body
	Timestamp time.Time `json:"timestamp"` // time the message was created
}
|
||||
|
||||
func (message *ProducerMessage) JSON() ([]byte, error) {
|
||||
return json.Marshal(message)
|
||||
}
|
||||
|
||||
func NewMessageProducer(producerConfig *config.KafkaProducerConfig) (*MessageProducer, error) {
|
||||
config := sarama.NewConfig()
|
||||
config.Producer.Return.Successes = true // enable message delivery reports
|
||||
config.Producer.RequiredAcks = sarama.WaitForAll // require all in-sync replicas to acknowledge the message
|
||||
config.Producer.Retry.Max = 5 // number of retries before giving up on sending a message to a partition
|
||||
config.Producer.Retry.Backoff = time.Second * 60 // time to wait between retries
|
||||
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time
|
||||
config.Producer.Compression = sarama.CompressionSnappy // compress messages using Snappy
|
||||
config.Producer.Idempotent = true // producer will ensure that messages are successfully sent and acknowledged
|
||||
config.Producer.Flush.Frequency = time.Millisecond * 20 // time to wait before sending a batch of messages
|
||||
config.Producer.Flush.Bytes = 32 * 1024 // number of bytes to trigger a batch of messages
|
||||
config.Net.MaxOpenRequests = 1
|
||||
config.Metadata.AllowAutoTopicCreation = true
|
||||
|
||||
producer, err := sarama.NewSyncProducer(producerConfig.BootstrapServers, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MessageProducer{&producer}, err
|
||||
}
|
Reference in New Issue
Block a user