2024-10-13 13:31:58 +04:00

128 lines
2.8 KiB
Go

package http
import (
"context"
"encoding/json"
"time"
"github.com/aykhans/oh-my-chat/internal/adapter/logger"
"github.com/aykhans/oh-my-chat/internal/core/domain"
"github.com/aykhans/oh-my-chat/internal/core/port"
"github.com/go-playground/validator/v10"
"github.com/gofiber/contrib/websocket"
"github.com/google/uuid"
)
// log is the package-level logger, created via logger.NewStdLogger.
var log = logger.NewStdLogger()
// ChatHandler bundles the dependencies needed to serve chat traffic over a
// WebSocket connection.
type ChatHandler struct {
	// messageService receives stream messages for a user and sends the
	// messages a client submits.
	messageService port.MessageService
	// validator checks the struct tags of incoming message payloads.
	validator *validator.Validate
}
// NewChatHandler returns a ChatHandler wired with the given message service
// and payload validator.
func NewChatHandler(
	messageService port.MessageService,
	validator *validator.Validate,
) *ChatHandler {
	handler := &ChatHandler{
		messageService: messageService,
		validator:      validator,
	}
	return handler
}
// messageReuqest is the JSON payload a client sends over the WebSocket.
// Both fields are mandatory according to their validate tags.
//
// NOTE(review): the type name misspells "messageRequest"; renaming it means
// updating every place that references it.
type messageReuqest struct {
	// Content is the text of the message.
	Content string `json:"content" validate:"required"`
	// To identifies the destination; readMessageFromWS copies it into the
	// ChatID of the resulting domain message.
	To string `json:"to" validate:"required"`
}
// Connect drives one WebSocket session for the user identified by the auth
// payload that getAuthPayloadInWS extracts from conn.
//
// Two goroutines feed the session: readMessageFromWS converts incoming text
// frames into domain messages, and a consumer asks the message service to
// deliver the user's stream messages. The loop forwards every stream message
// to the socket as JSON and commits it only after the write succeeded, and it
// hands every client message to the message service.
//
// Connect returns when the reader signals termination with a nil message, when
// the stream channel yields nil or is closed, or when marshalling, writing or
// committing a stream message fails.
func (h *ChatHandler) Connect(conn *websocket.Conn) {
	authPayload := getAuthPayloadInWS(conn)

	receiveCtx, cancelReceive := context.WithCancel(context.Background())
	// Cancel the receive context on every exit path so the consumer goroutine
	// is told that the session is over.
	defer cancelReceive()

	consumeMessageChan := make(chan *domain.StreamMessage)
	wsMessageChan := make(chan *domain.Message)

	go readMessageFromWS(conn, h.validator, authPayload.UserID, wsMessageChan)

	// The reader sends on an unbuffered channel and always finishes with a nil
	// sentinel. If Connect leaves before that sentinel was received, nobody
	// would be listening and the reader would block forever. Keep draining
	// until the sentinel arrives so the reader goroutine can exit.
	// NOTE(review): this relies on conn being closed after Connect returns so
	// that the reader's ReadMessage fails — confirm against the websocket
	// middleware in use.
	readerDone := false
	defer func() {
		if readerDone {
			return
		}
		go func() {
			for wsMessage := range wsMessageChan {
				if wsMessage == nil {
					return
				}
			}
		}()
	}()

	go func() {
		err := h.messageService.ReceiveMessage(
			receiveCtx,
			authPayload.UserID,
			consumeMessageChan,
		)
		// Errors that show up after our own cancellation are expected on
		// shutdown; anything earlier is a real failure and must not be
		// dropped silently.
		if err != nil && receiveCtx.Err() == nil {
			log.Error("Stream message receive error", "error", err.Error())
		}
	}()

	for {
		select {
		case streamMessage, ok := <-consumeMessageChan:
			// A closed channel or a nil message would otherwise cause a nil
			// pointer dereference below.
			if !ok || streamMessage == nil {
				return
			}

			messageBytes, err := json.Marshal(streamMessage.Message)
			if err != nil {
				// Do not write an empty frame or commit a message that was
				// never delivered.
				log.Error("Stream message marshal error", "error", err.Error())
				return
			}

			if err := conn.WriteMessage(websocket.TextMessage, messageBytes); err != nil {
				return
			}

			// Commit only after the client has been sent the message.
			if err := streamMessage.Commit(); err != nil {
				log.Error("Stream message commit error", "error", err.Error())
				return
			}

		case wsMessage := <-wsMessageChan:
			if wsMessage == nil {
				// nil is the reader's termination sentinel; it has exited.
				readerDone = true
				return
			}

			// NOTE(review): any result of SendMessage is discarded and the
			// call runs without a deadline — confirm whether it reports an
			// error that should be handled here.
			h.messageService.SendMessage(context.Background(), wsMessage)
		}
	}
}
// readMessageFromWS reads frames from conn in a loop and publishes each valid
// text frame on messageChan as a domain message authored by userID.
//
// Frames that are not text messages are skipped. The loop stops on the first
// read error, JSON decoding error or validation error; in every one of those
// cases a single nil is sent on messageChan before the function returns, which
// tells the receiver that no further messages will follow.
func readMessageFromWS(
	conn *websocket.Conn,
	validator *validator.Validate,
	userID uuid.UUID,
	messageChan chan<- *domain.Message,
) {
	for {
		frameType, payload, err := conn.ReadMessage()
		if err != nil {
			break
		}
		if frameType != websocket.TextMessage {
			continue
		}

		var body messageReuqest
		if err := json.Unmarshal(payload, &body); err != nil {
			break
		}
		if err := validator.Struct(&body); err != nil {
			break
		}

		now := time.Now()
		messageChan <- &domain.Message{
			UserID:    userID,
			ChatID:    body.To,
			Content:   body.Content,
			Timestamp: now,
		}
	}

	// Termination sentinel shared by every failure path above.
	messageChan <- nil
}