128 lines
2.8 KiB
Go
128 lines
2.8 KiB
Go
package http
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/aykhans/oh-my-chat/internal/adapter/logger"
|
|
"github.com/aykhans/oh-my-chat/internal/core/domain"
|
|
"github.com/aykhans/oh-my-chat/internal/core/port"
|
|
"github.com/go-playground/validator/v10"
|
|
"github.com/gofiber/contrib/websocket"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// log is the package-level standard logger used to report errors from the
// handlers in this file.
var log = logger.NewStdLogger()
|
|
|
|
type ChatHandler struct {
|
|
messageService port.MessageService
|
|
validator *validator.Validate
|
|
}
|
|
|
|
func NewChatHandler(
|
|
messageService port.MessageService,
|
|
validator *validator.Validate,
|
|
) *ChatHandler {
|
|
return &ChatHandler{messageService, validator}
|
|
}
|
|
|
|
// messageReuqest is the JSON payload decoded from a text frame received on
// the WebSocket. Both fields are marked as required for validation.
//
// NOTE(review): the type name misspells "Request"; it is kept unchanged here
// because the type is referenced by name elsewhere in the file.
type messageReuqest struct {
	// Content is the text of the message.
	Content string `json:"content" validate:"required"`
	// To is used as the chat ID of the resulting domain message.
	To string `json:"to" validate:"required"`
}
|
|
|
|
func (chatHandler *ChatHandler) Connect(conn *websocket.Conn) {
|
|
authPayload := getAuthPayloadInWS(conn)
|
|
|
|
streamReceiveMessageCtx, streamReceiveMessageCtxCancel := context.WithCancel(context.Background())
|
|
consumeMessageChan := make(chan *domain.StreamMessage)
|
|
wsMessageChan := make(chan *domain.Message)
|
|
|
|
go readMessageFromWS(
|
|
conn,
|
|
chatHandler.validator,
|
|
authPayload.UserID,
|
|
wsMessageChan,
|
|
)
|
|
|
|
go func() {
|
|
err := chatHandler.messageService.ReceiveMessage(
|
|
streamReceiveMessageCtx,
|
|
authPayload.UserID,
|
|
consumeMessageChan,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}()
|
|
|
|
defer func() {
|
|
streamReceiveMessageCtxCancel()
|
|
}()
|
|
|
|
var err error
|
|
|
|
for {
|
|
select {
|
|
case streamReceivedMessage := <-consumeMessageChan:
|
|
messageBytes, _ := json.Marshal(streamReceivedMessage.Message)
|
|
if err = conn.WriteMessage(websocket.TextMessage, messageBytes); err != nil {
|
|
return
|
|
}
|
|
err := streamReceivedMessage.Commit()
|
|
if err != nil {
|
|
log.Error("Stream message commit error", "error", err.Error())
|
|
return
|
|
}
|
|
|
|
case wsMessage := <-wsMessageChan:
|
|
if wsMessage == nil {
|
|
return
|
|
}
|
|
streamSendMessageCtx := context.Background()
|
|
chatHandler.messageService.SendMessage(streamSendMessageCtx, wsMessage)
|
|
}
|
|
}
|
|
}
|
|
|
|
func readMessageFromWS(
|
|
conn *websocket.Conn,
|
|
validator *validator.Validate,
|
|
userID uuid.UUID,
|
|
messageChan chan<- *domain.Message,
|
|
) {
|
|
var (
|
|
wsReceivedMessageType int
|
|
wsReceivedMessage []byte
|
|
err error
|
|
)
|
|
for {
|
|
if wsReceivedMessageType, wsReceivedMessage, err = conn.ReadMessage(); err != nil {
|
|
messageChan <- nil
|
|
break
|
|
}
|
|
|
|
if wsReceivedMessageType == websocket.TextMessage {
|
|
messageBody := new(messageReuqest)
|
|
if err := json.Unmarshal(wsReceivedMessage, &messageBody); err != nil {
|
|
messageChan <- nil
|
|
break
|
|
}
|
|
|
|
if err := validator.Struct(messageBody); err != nil {
|
|
messageChan <- nil
|
|
break
|
|
}
|
|
|
|
timestamp := time.Now()
|
|
messageChan <- &domain.Message{
|
|
UserID: userID,
|
|
ChatID: messageBody.To,
|
|
Content: messageBody.Content,
|
|
Timestamp: timestamp,
|
|
}
|
|
}
|
|
}
|
|
}
|