95 lines
2.3 KiB
Go
95 lines
2.3 KiB
Go
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/aykhans/oh-my-chat/internal/adapter/config"
|
|
"github.com/aykhans/oh-my-chat/internal/adapter/logger"
|
|
"github.com/aykhans/oh-my-chat/internal/core/domain"
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
var log = logger.NewStdLogger()
|
|
|
|
type MessageConsumer struct {
|
|
kafkaConsumerConfig *config.KafkaConsumerConfig
|
|
}
|
|
|
|
type ConsumerMessage struct {
|
|
UserID uuid.UUID `json:"user_id"`
|
|
ChatID string `json:"chat_id"`
|
|
Content string `json:"content"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
}
|
|
|
|
func NewMessageConsumer(consumerConfig *config.KafkaConsumerConfig) *MessageConsumer {
|
|
return &MessageConsumer{consumerConfig}
|
|
}
|
|
|
|
func (messageConsumer *MessageConsumer) ConsumeMessage(
|
|
ctx context.Context,
|
|
uid string,
|
|
getChats func() []string,
|
|
message chan<- *domain.StreamMessage,
|
|
) error {
|
|
consumer := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: messageConsumer.kafkaConsumerConfig.BootstrapServers,
|
|
GroupID: uid,
|
|
GroupTopics: getChats(),
|
|
MaxBytes: 10e6, // 10MB
|
|
ReadLagInterval: -1,
|
|
MaxWait: 300 * time.Millisecond,
|
|
GroupBalancers: []kafka.GroupBalancer{kafka.RoundRobinGroupBalancer{}},
|
|
StartOffset: kafka.FirstOffset,
|
|
})
|
|
|
|
defer func() {
|
|
if err := consumer.Close(); err != nil {
|
|
log.Error("Error closing consumer", "error", err.Error())
|
|
}
|
|
}()
|
|
|
|
for {
|
|
msg, err := consumer.FetchMessage(ctx)
|
|
if err != nil {
|
|
switch err {
|
|
case io.EOF:
|
|
return nil
|
|
case context.Canceled:
|
|
return nil
|
|
}
|
|
log.Error("Error fetching message from kafka", "error", err.Error())
|
|
continue
|
|
}
|
|
|
|
consumerMeesage := &ConsumerMessage{}
|
|
err = json.Unmarshal(msg.Value, consumerMeesage)
|
|
if err != nil {
|
|
log.Error("Error unmarshalling message", "error", err.Error())
|
|
return domain.ErrInternal
|
|
}
|
|
|
|
message <- &domain.StreamMessage{
|
|
Message: &domain.Message{
|
|
UserID: consumerMeesage.UserID,
|
|
ChatID: consumerMeesage.ChatID,
|
|
Content: consumerMeesage.Content,
|
|
Timestamp: consumerMeesage.Timestamp,
|
|
},
|
|
Commit: func() error {
|
|
err := consumer.CommitMessages(ctx, msg)
|
|
if err != nil {
|
|
log.Error("Error committing kafka message", "error", err.Error())
|
|
return domain.ErrInternal
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
}
|
|
}
|