2024-10-13 13:31:58 +04:00

95 lines
2.3 KiB
Go

package consumer
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"time"

	"github.com/aykhans/oh-my-chat/internal/adapter/config"
	"github.com/aykhans/oh-my-chat/internal/adapter/logger"
	"github.com/aykhans/oh-my-chat/internal/core/domain"
	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
)
// log is the package-level logger; the consumer uses it to report errors
// from closing the reader, fetching, decoding, and committing messages.
var log = logger.NewStdLogger()
// MessageConsumer reads chat messages from Kafka using the consumer
// configuration it was constructed with.
type MessageConsumer struct {
// kafkaConsumerConfig supplies the bootstrap servers used as the Kafka
// reader's broker list.
kafkaConsumerConfig *config.KafkaConsumerConfig
}
// ConsumerMessage is the JSON shape a Kafka record value is decoded into
// before its fields are copied into a domain.Message.
type ConsumerMessage struct {
// UserID is decoded from the "user_id" JSON key.
UserID uuid.UUID `json:"user_id"`
// ChatID is decoded from the "chat_id" JSON key.
ChatID string `json:"chat_id"`
// Content is decoded from the "content" JSON key.
Content string `json:"content"`
// Timestamp is decoded from the "timestamp" JSON key.
Timestamp time.Time `json:"timestamp"`
}
// NewMessageConsumer returns a MessageConsumer that uses consumerConfig for
// its Kafka connection settings. The pointer is stored as-is, not copied.
func NewMessageConsumer(consumerConfig *config.KafkaConsumerConfig) *MessageConsumer {
	mc := &MessageConsumer{
		kafkaConsumerConfig: consumerConfig,
	}
	return mc
}
// ConsumeMessage reads records from Kafka and forwards them on message until
// the reader is exhausted or ctx is done.
//
// A new kafka.Reader is created for every call, with uid as the consumer
// group ID and the topics returned by getChats (called once, up front) as the
// group topics. The reader is closed when the method returns. Each fetched
// record value is decoded from JSON into a ConsumerMessage and delivered as a
// *domain.StreamMessage whose Commit callback commits that record's offset
// using the same ctx.
//
// Return values:
//   - nil when the reader reports io.EOF, or when ctx is cancelled or its
//     deadline expires (either while fetching or while waiting for the
//     receiver to accept a message);
//   - domain.ErrInternal when a record value cannot be decoded.
//
// Any other fetch error is logged and the fetch is retried.
func (messageConsumer *MessageConsumer) ConsumeMessage(
	ctx context.Context,
	uid string,
	getChats func() []string,
	message chan<- *domain.StreamMessage,
) error {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:         messageConsumer.kafkaConsumerConfig.BootstrapServers,
		GroupID:         uid,
		GroupTopics:     getChats(),
		MaxBytes:        10e6, // 10MB
		ReadLagInterval: -1,
		MaxWait:         300 * time.Millisecond,
		GroupBalancers:  []kafka.GroupBalancer{kafka.RoundRobinGroupBalancer{}},
		StartOffset:     kafka.FirstOffset,
	})
	defer func() {
		if err := reader.Close(); err != nil {
			log.Error("Error closing consumer", "error", err.Error())
		}
	}()

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			// errors.Is also matches wrapped sentinels. The ctx.Err() check
			// covers deadline expiry: without it every subsequent fetch would
			// fail immediately and this loop would spin, logging forever.
			if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || ctx.Err() != nil {
				return nil
			}
			log.Error("Error fetching message from kafka", "error", err.Error())
			continue
		}

		payload := &ConsumerMessage{}
		if err := json.Unmarshal(msg.Value, payload); err != nil {
			log.Error("Error unmarshalling message", "error", err.Error())
			return domain.ErrInternal
		}

		streamMessage := &domain.StreamMessage{
			Message: &domain.Message{
				UserID:    payload.UserID,
				ChatID:    payload.ChatID,
				Content:   payload.Content,
				Timestamp: payload.Timestamp,
			},
			// msg is declared inside the loop body, so each callback commits
			// the record it was created for.
			Commit: func() error {
				if err := reader.CommitMessages(ctx, msg); err != nil {
					log.Error("Error committing kafka message", "error", err.Error())
					return domain.ErrInternal
				}
				return nil
			},
		}

		// Do not block forever on a receiver that has gone away: give up as
		// soon as ctx is done so the reader is closed and the goroutine exits.
		select {
		case message <- streamMessage:
		case <-ctx.Done():
			return nil
		}
	}
}