From aec8d7ed4829b94c743fb9c01a52d0c514553d5d Mon Sep 17 00:00:00 2001 From: Aykhan Shahsuvarov Date: Sun, 13 Oct 2024 13:31:58 +0400 Subject: [PATCH] init --- .gitignore | 1 + README.md | 0 cmd/http/main.go | 83 ++ .../cassandra-rackdc.properties.dc1.rack1 | 12 + .../cassandra-rackdc.properties.dc1.rack2 | 12 + config/scylla/message.cql | 37 + config/scylla/scylla.yaml | 803 ++++++++++++++++++ docker-compose.yml | 84 ++ go.mod | 68 ++ go.sum | 212 +++++ internal/adapter/auth/jwt/jwt.go | 76 ++ internal/adapter/config/app.go | 25 + internal/adapter/config/config.go | 17 + internal/adapter/config/helper.go | 44 + internal/adapter/config/kafka.go | 45 + internal/adapter/config/postgres.go | 19 + internal/adapter/config/scylla.go | 21 + internal/adapter/handlers/http/app.go | 76 ++ internal/adapter/handlers/http/auth.go | 50 ++ internal/adapter/handlers/http/chat.go | 127 +++ internal/adapter/handlers/http/helper.go | 23 + .../adapter/handlers/http/middlewares/auth.go | 33 + .../adapter/handlers/http/middlewares/ws.go | 20 + internal/adapter/handlers/http/responses.go | 29 + internal/adapter/handlers/http/user.go | 57 ++ internal/adapter/handlers/http/validator.go | 60 ++ internal/adapter/logger/slog.go | 15 + internal/adapter/storages/postgres/db.go | 38 + .../000001_create_users_table.down.sql | 1 + .../000001_create_users_table.up.sql | 11 + .../adapter/storages/postgres/models/user.go | 24 + .../storages/postgres/repository/user.go | 109 +++ internal/adapter/storages/scylla/db.go | 33 + .../000001_create_messages_table.down.cql | 1 + .../000001_create_messages_table.up.cql | 8 + .../000002_create_user_chats_table.down.cql | 1 + .../000002_create_user_chats_table.up.cql | 7 + .../storages/scylla/repository/message.go | 23 + .../streamers/kafka/consumer/message.go | 94 ++ .../streamers/kafka/producer/message.go | 73 ++ internal/core/domain/errors.go | 16 + internal/core/domain/message.go | 20 + internal/core/domain/token.go | 7 + internal/core/domain/user.go | 14 + internal/core/port/auth.go | 17 + internal/core/port/message.go | 26 + internal/core/port/user.go | 22 + internal/core/service/auth.go | 72 ++ internal/core/service/message.go | 55 ++ internal/core/service/user.go | 49 ++ internal/core/utils/convert.go | 9 + internal/core/utils/os.go | 15 + internal/core/utils/password.go | 26 + internal/core/utils/time.go | 7 + 54 files changed, 2827 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 cmd/http/main.go create mode 100644 config/scylla/cassandra-rackdc.properties.dc1.rack1 create mode 100644 config/scylla/cassandra-rackdc.properties.dc1.rack2 create mode 100644 config/scylla/message.cql create mode 100644 config/scylla/scylla.yaml create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/adapter/auth/jwt/jwt.go create mode 100644 internal/adapter/config/app.go create mode 100644 internal/adapter/config/config.go create mode 100644 internal/adapter/config/helper.go create mode 100644 internal/adapter/config/kafka.go create mode 100644 internal/adapter/config/postgres.go create mode 100644 internal/adapter/config/scylla.go create mode 100644 internal/adapter/handlers/http/app.go create mode 100644 internal/adapter/handlers/http/auth.go create mode 100644 internal/adapter/handlers/http/chat.go create mode 100644 internal/adapter/handlers/http/helper.go create mode 100644 internal/adapter/handlers/http/middlewares/auth.go create mode 100644 internal/adapter/handlers/http/middlewares/ws.go create mode 100644 internal/adapter/handlers/http/responses.go create mode 100644 internal/adapter/handlers/http/user.go create mode 100644 internal/adapter/handlers/http/validator.go create mode 100644 internal/adapter/logger/slog.go create mode 100644 internal/adapter/storages/postgres/db.go create mode 100644 internal/adapter/storages/postgres/migrations/000001_create_users_table.down.sql create mode 100644 internal/adapter/storages/postgres/migrations/000001_create_users_table.up.sql create mode 100644 internal/adapter/storages/postgres/models/user.go create mode 100644 internal/adapter/storages/postgres/repository/user.go create mode 100644 internal/adapter/storages/scylla/db.go create mode 100644 internal/adapter/storages/scylla/migrations/000001_create_messages_table.down.cql create mode 100644 internal/adapter/storages/scylla/migrations/000001_create_messages_table.up.cql create mode 100644 internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.down.cql create mode 100644 internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.up.cql create mode 100644 internal/adapter/storages/scylla/repository/message.go create mode 100644 internal/adapter/streamers/kafka/consumer/message.go create mode 100644 internal/adapter/streamers/kafka/producer/message.go create mode 100644 internal/core/domain/errors.go create mode 100644 internal/core/domain/message.go create mode 100644 internal/core/domain/token.go create mode 100644 internal/core/domain/user.go create mode 100644 internal/core/port/auth.go create mode 100644 internal/core/port/message.go create mode 100644 internal/core/port/user.go create mode 100644 internal/core/service/auth.go create mode 100644 internal/core/service/message.go create mode 100644 internal/core/service/user.go create mode 100644 internal/core/utils/convert.go create mode 100644 internal/core/utils/os.go create mode 100644 internal/core/utils/password.go create mode 100644 internal/core/utils/time.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..03bd412 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/cmd/http/main.go b/cmd/http/main.go new file mode 100644 index 0000000..6473d02 --- /dev/null +++ b/cmd/http/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + + "github.com/aykhans/oh-my-chat/internal/adapter/auth/jwt" + "github.com/aykhans/oh-my-chat/internal/adapter/config" + httpHandlers "github.com/aykhans/oh-my-chat/internal/adapter/handlers/http" + middlewares "github.com/aykhans/oh-my-chat/internal/adapter/handlers/http/middlewares" + "github.com/aykhans/oh-my-chat/internal/adapter/logger" + "github.com/aykhans/oh-my-chat/internal/adapter/storages/postgres" + postgresRepo "github.com/aykhans/oh-my-chat/internal/adapter/storages/postgres/repository" + "github.com/aykhans/oh-my-chat/internal/adapter/storages/scylla" + scyllaRepo "github.com/aykhans/oh-my-chat/internal/adapter/storages/scylla/repository" + kafkaConsumer "github.com/aykhans/oh-my-chat/internal/adapter/streamers/kafka/consumer" + kafkaProducer "github.com/aykhans/oh-my-chat/internal/adapter/streamers/kafka/producer" + "github.com/aykhans/oh-my-chat/internal/core/service" + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +var log = logger.NewStdLogger() + +func main() { + containerConfig := config.NewContainerConfig() + + postgresDB, err := postgres.NewDB(containerConfig.PostgresConfig) + if err != nil { + log.Error("Error postgres connection", "error", err) + utils.ExitErr() + } + + userRepository := postgresRepo.NewUserRepository(postgresDB) + userService := service.NewUserService(userRepository) + validator := httpHandlers.NewValidator() + userHandler := httpHandlers.NewUserHandler(userService, validator) + + jwtService := jwt.NewJWTService( + *containerConfig.AppConfig.JWTDuration, + containerConfig.AppConfig.SecretKey, + ) + authService := service.NewAuthService(userRepository, jwtService) + authHandler := httpHandlers.NewAuthHandler(authService, validator) + + messageProducer, err := kafkaProducer.NewMessageProducer(containerConfig.KafkaConfig.KafkaProducerConfig) + if err != nil { + log.Error("Error kafka producer connection", "error", err) + utils.ExitErr() + } + + messageConsumer := kafkaConsumer.NewMessageConsumer(containerConfig.KafkaConfig.KafkaConsumerConfig) + scyllaDB, err := scylla.NewDB(containerConfig.ScyllaConfig) + if err != nil { + log.Error("Error scylla connection", "error", err) + utils.ExitErr() + } + + messageRepository := scyllaRepo.NewMessageRepository(scyllaDB) + messageService := service.NewMessageService(messageProducer, messageConsumer, messageRepository) + chatHandler := httpHandlers.NewChatHandler(messageService, validator) + + authMiddleware := middlewares.NewAuthMiddleware(jwtService) + wsMiddleware := middlewares.NewWSMiddleware() + + app := httpHandlers.NewApp( + // configs + containerConfig.AppConfig.IsDev, + containerConfig.CORSAllowedOrigins, + // middlewares + authMiddleware, wsMiddleware, + // handlers + userHandler, authHandler, chatHandler, + ) + + err = app.Listen( + fmt.Sprintf( + ":%d", containerConfig.AppConfig.ListenerPort, + ), + ) + if err != nil { + log.Error("Error starting app", "error", err) + utils.ExitErr() + } +} diff --git a/config/scylla/cassandra-rackdc.properties.dc1.rack1 b/config/scylla/cassandra-rackdc.properties.dc1.rack1 new file mode 100644 index 0000000..a87e7c9 --- /dev/null +++ b/config/scylla/cassandra-rackdc.properties.dc1.rack1 @@ -0,0 +1,12 @@ +# +# cassandra-rackdc.properties +# +# The lines may include white spaces at the beginning and the end. +# The rack and data center names may also include white spaces. +# All trailing and leading white spaces will be trimmed. +# +dc=DC1 +rack=Rack1 +# prefer_local= +# dc_suffix= +# diff --git a/config/scylla/cassandra-rackdc.properties.dc1.rack2 b/config/scylla/cassandra-rackdc.properties.dc1.rack2 new file mode 100644 index 0000000..8db09b9 --- /dev/null +++ b/config/scylla/cassandra-rackdc.properties.dc1.rack2 @@ -0,0 +1,12 @@ +# +# cassandra-rackdc.properties +# +# The lines may include white spaces at the beginning and the end. +# The rack and data center names may also include white spaces. +# All trailing and leading white spaces will be trimmed. +# +dc=DC1 +rack=Rack2 +# prefer_local= +# dc_suffix= +# diff --git a/config/scylla/message.cql b/config/scylla/message.cql new file mode 100644 index 0000000..e509701 --- /dev/null +++ b/config/scylla/message.cql @@ -0,0 +1,37 @@ +CREATE TABLE messages ( + chat_id UUID, -- Partition key + user_id UUID, + content text, + type text, + created_at timestamp, -- Clustering column + PRIMARY KEY (chat_id, content, type, created_at) +) WITH CLUSTERING ORDER BY (created_at DESC); + +CREATE TABLE user_chats ( + user_id UUID, -- Partition key + chat_id UUID, -- Clustering column + blocked boolean, + created_at timestamp, + PRIMARY KEY (user_id, chat_id, blocked, created_at) +); + +CREATE MATERIALIZED VIEW chat_users AS + SELECT chat_id, user_id, blocked, created_at + FROM user_chats + WHERE chat_id IS NOT NULL AND user_id IS NOT NULL AND created_at IS NOT NULL + PRIMARY KEY (chat_id, user_id, blocked, created_at); + +BEGIN BATCH + INSERT INTO messages (chat_id, user_id, content, type, created_at) + VALUES ('b9b29446-d50a-42d0-b3e2-bfaabd85fa4b', '8c796bf4-c9cd-4851-8379-6f67f69e1b99', 'Message 1', 'text', toTimestamp(now())) + IF EXISTS + AND (SELECT blocked FROM user_chats WHERE user_id = '8c796bf4-c9cd-4851-8379-6f67f69e1b99' AND + chat_id = 'b9b29446-d50a-42d0-b3e2-bfaabd85fa4b') = false; +APPLY BATCH; + +INSERT INTO user_chats (user_id, chat_id, blocked, created_at) VALUES (uuid(), uuid(), false, toTimestamp(now())); + +INSERT INTO messages (chat_id, user_id, content, type, created_at, year_month, seen) +VALUES (uuid(), uuid(), 'Your message content', 'text', toTimestamp(now()), '2020-01'); + +SELECT * FROM messages LIMIT 1; diff --git a/config/scylla/scylla.yaml b/config/scylla/scylla.yaml new file mode 100644 index 0000000..9153297 --- /dev/null +++ b/config/scylla/scylla.yaml @@ -0,0 +1,803 @@ +# Scylla storage config YAML + +####################################### +# This file is split to two sections: +# 1. Supported parameters +# 2. Unsupported parameters: reserved for future use or backwards +# compatibility. +# Scylla will only read and use the first segment +####################################### + +### Supported Parameters + +# The name of the cluster. This is mainly used to prevent machines in +# one logical cluster from joining another. +# It is recommended to change the default value when creating a new cluster. +# You can NOT modify this value for an existing cluster +#cluster_name: 'Test Cluster' + +# This defines the number of tokens randomly assigned to this node on the ring +# The more tokens, relative to other nodes, the larger the proportion of data +# that this node will store. You probably want all nodes to have the same number +# of tokens assuming they have equal hardware capability. +# +# If you already have a cluster with 1 token per node, and wish to migrate to +# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations +num_tokens: 256 + +# Directory where Scylla should store data on disk. +# If not set, the default directory is $CASSANDRA_HOME/data/data. +data_file_directories: + - /var/lib/scylla/data + +# commit log. when running on magnetic HDD, this should be a +# separate spindle than the data directories. +# If not set, the default directory is $CASSANDRA_HOME/data/commitlog. +commitlog_directory: /var/lib/scylla/commitlog + +# commitlog_sync may be either "periodic" or "batch." +# +# When in batch mode, Scylla won't ack writes until the commit log +# has been fsynced to disk. It will wait +# commitlog_sync_batch_window_in_ms milliseconds between fsyncs. +# This window should be kept short because the writer threads will +# be unable to do extra work while waiting. (You may need to increase +# concurrent_writes for the same reason.) +# +# commitlog_sync: batch +# commitlog_sync_batch_window_in_ms: 2 +# +# the other option is "periodic" where writes may be acked immediately +# and the CommitLog is simply synced every commitlog_sync_period_in_ms +# milliseconds. +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 + +# The size of the individual commitlog file segments. A commitlog +# segment may be archived, deleted, or recycled once all the data +# in it (potentially from each columnfamily in the system) has been +# flushed to sstables. +# +# The default size is 32, which is almost always fine, but if you are +# archiving commitlog segments (see commitlog_archiving.properties), +# then you probably want a finer granularity of archiving; 8 or 16 MB +# is reasonable. +commitlog_segment_size_in_mb: 32 + +# seed_provider class_name is saved for future use. +# seeds address are mandatory! +seed_provider: + # Addresses of hosts that are deemed contact points. + # Scylla nodes use this list of hosts to find each other and learn + # the topology of the ring. You must change this if you are running + # multiple nodes! + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + # seeds is actually a comma-delimited list of addresses. + # Ex: ",," + - seeds: "127.0.0.1" + +# Address or interface to bind to and tell other Scylla nodes to connect to. +# You _must_ change this if you want multiple nodes to be able to communicate! +# +# Setting listen_address to 0.0.0.0 is always wrong. +listen_address: localhost + +# Address to broadcast to other Scylla nodes +# Leaving this blank will set it to the same value as listen_address +# broadcast_address: 1.2.3.4 + + +# When using multiple physical network interfaces, set this to true to listen on broadcast_address +# in addition to the listen_address, allowing nodes to communicate in both interfaces. +# Ignore this property if the network configuration automatically routes between the public and private networks such as EC2. +# +# listen_on_broadcast_address: false + +# port for the CQL native transport to listen for clients on +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +native_transport_port: 9042 + +# Enabling native transport encryption in client_encryption_options allows you to either use +# encryption for the standard port or to use a dedicated, additional port along with the unencrypted +# standard native_transport_port. +# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption +# for native_transport_port. Setting native_transport_port_ssl to a different value +# from native_transport_port will use encryption for native_transport_port_ssl while +# keeping native_transport_port unencrypted. +#native_transport_port_ssl: 9142 + +# How long the coordinator should wait for read operations to complete +read_request_timeout_in_ms: 5000 + +# How long the coordinator should wait for writes to complete +write_request_timeout_in_ms: 2000 + +# phi value that must be reached for a host to be marked down. +# most users should never need to adjust this. +# phi_convict_threshold: 8 + +# IEndpointSnitch. The snitch has two functions: +# - it teaches Scylla enough about your network topology to route +# requests efficiently +# - it allows Scylla to spread replicas around your cluster to avoid +# correlated failures. It does this by grouping machines into +# "datacenters" and "racks." Scylla will do its best not to have +# more than one replica on the same "rack" (which may not actually +# be a physical location) +# +# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER, +# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS +# ARE PLACED. +# +# Out of the box, Scylla provides +# - SimpleSnitch: +# Treats Strategy order as proximity. This can improve cache +# locality when disabling read repair. Only appropriate for +# single-datacenter deployments. +# - GossipingPropertyFileSnitch +# This should be your go-to snitch for production use. The rack +# and datacenter for the local node are defined in +# cassandra-rackdc.properties and propagated to other nodes via +# gossip. If cassandra-topology.properties exists, it is used as a +# fallback, allowing migration from the PropertyFileSnitch. +# - PropertyFileSnitch: +# Proximity is determined by rack and data center, which are +# explicitly configured in cassandra-topology.properties. +# - Ec2Snitch: +# Appropriate for EC2 deployments in a single Region. Loads Region +# and Availability Zone information from the EC2 API. The Region is +# treated as the datacenter, and the Availability Zone as the rack. +# Only private IPs are used, so this will not work across multiple +# Regions. +# - Ec2MultiRegionSnitch: +# Uses public IPs as broadcast_address to allow cross-region +# connectivity. (Thus, you should set seed addresses to the public +# IP as well.) You will need to open the storage_port or +# ssl_storage_port on the public IP firewall. (For intra-Region +# traffic, Scylla will switch to the private IP after +# establishing a connection.) +# - RackInferringSnitch: +# Proximity is determined by rack and data center, which are +# assumed to correspond to the 3rd and 2nd octet of each node's IP +# address, respectively. Unless this happens to match your +# deployment conventions, this is best used as an example of +# writing a custom Snitch class and is provided in that spirit. +# +# You can use a custom Snitch by setting this to the full class name +# of the snitch, which will be assumed to be on your classpath. +endpoint_snitch: GossipingPropertyFileSnitch + +# The address or interface to bind the Thrift RPC service and native transport +# server to. +# +# Set rpc_address OR rpc_interface, not both. Interfaces must correspond +# to a single address, IP aliasing is not supported. +# +# Leaving rpc_address blank has the same effect as on listen_address +# (i.e. it will be based on the configured hostname of the node). +# +# Note that unlike listen_address, you can specify 0.0.0.0, but you must also +# set broadcast_rpc_address to a value other than 0.0.0.0. +# +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# +# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address +# you can specify which should be chosen using rpc_interface_prefer_ipv6. If false the first ipv4 +# address will be used. If true the first ipv6 address will be used. Defaults to false preferring +# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6. +rpc_address: localhost +# rpc_interface: eth1 +# rpc_interface_prefer_ipv6: false + +# port for Thrift to listen for clients on +rpc_port: 9160 + +# port for REST API server +api_port: 10000 + +# IP for the REST API server +api_address: 127.0.0.1 + +# Log WARN on any batch size exceeding this value. 5kb per batch by default. +# Caution should be taken on increasing the size of this threshold as it can lead to node instability. +batch_size_warn_threshold_in_kb: 5 + +# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default. +batch_size_fail_threshold_in_kb: 50 + +# Authentication backend, identifying users +# Out of the box, Scylla provides org.apache.cassandra.auth.{AllowAllAuthenticator, +# PasswordAuthenticator}. +# +# - AllowAllAuthenticator performs no checks - set it to disable authentication. +# - PasswordAuthenticator relies on username/password pairs to authenticate +# users. It keeps usernames and hashed passwords in system_auth.credentials table. +# Please increase system_auth keyspace replication factor if you use this authenticator. +# authenticator: AllowAllAuthenticator + +# Authorization backend, implementing IAuthorizer; used to limit access/provide permissions +# Out of the box, Scylla provides org.apache.cassandra.auth.{AllowAllAuthorizer, +# CassandraAuthorizer}. +# +# - AllowAllAuthorizer allows any action to any user - set it to disable authorization. +# - CassandraAuthorizer stores permissions in system_auth.permissions table. Please +# increase system_auth keyspace replication factor if you use this authorizer. +# authorizer: AllowAllAuthorizer + +# initial_token allows you to specify tokens manually. While you can use # it with +# vnodes (num_tokens > 1, above) -- in which case you should provide a +# comma-separated list -- it's primarily used when adding nodes # to legacy clusters +# that do not have vnodes enabled. +# initial_token: + +# RPC address to broadcast to drivers and other Scylla nodes. This cannot +# be set to 0.0.0.0. If left blank, this will be set to the value of +# rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must +# be set. +# broadcast_rpc_address: 1.2.3.4 + +# Uncomment to enable experimental features +# experimental: true + +# The directory where hints files are stored if hinted handoff is enabled. +# hints_directory: /var/lib/scylla/hints + +# See http://wiki.apache.org/cassandra/HintedHandoff +# May either be "true" or "false" to enable globally, or contain a list +# of data centers to enable per-datacenter. +# hinted_handoff_enabled: DC1,DC2 +# hinted_handoff_enabled: true + +# this defines the maximum amount of time a dead host will have hints +# generated. After it has been dead this long, new hints for it will not be +# created until it has been seen alive and gone down again. +# max_hint_window_in_ms: 10800000 # 3 hours + +# Maximum throttle in KBs per second, per delivery thread. This will be +# reduced proportionally to the number of nodes in the cluster. (If there +# are two nodes in the cluster, each delivery thread will use the maximum +# rate; if there are three, each will throttle to half of the maximum, +# since we expect two nodes to be delivering hints simultaneously.) +# hinted_handoff_throttle_in_kb: 1024 +# Number of threads with which to deliver hints; +# Consider increasing this number when you have multi-dc deployments, since +# cross-dc handoff tends to be slower +# max_hints_delivery_threads: 2 + +################################################### +## Not currently supported, reserved for future use +################################################### + +# Maximum throttle in KBs per second, total. This will be +# reduced proportionally to the number of nodes in the cluster. +# batchlog_replay_throttle_in_kb: 1024 + +# Validity period for permissions cache (fetching permissions can be an +# expensive operation depending on the authorizer, CassandraAuthorizer is +# one example). Defaults to 10000, set to 0 to disable. +# Will be disabled automatically for AllowAllAuthorizer. +# permissions_validity_in_ms: 10000 + +# Refresh interval for permissions cache (if enabled). +# After this interval, cache entries become eligible for refresh. Upon next +# access, an async reload is scheduled and the old value returned until it +# completes. If permissions_validity_in_ms is non-zero, then this also must have +# a non-zero value. Defaults to 2000. It's recommended to set this value to +# be at least 3 times smaller than the permissions_validity_in_ms. +# permissions_update_interval_in_ms: 2000 + +# The partitioner is responsible for distributing groups of rows (by +# partition key) across nodes in the cluster. You should leave this +# alone for new clusters. The partitioner can NOT be changed without +# reloading all data, so when upgrading you should set this to the +# same partitioner you were already using. +# +# Besides Murmur3Partitioner, partitioners included for backwards +# compatibility include RandomPartitioner, ByteOrderedPartitioner, and +# OrderPreservingPartitioner. +# +partitioner: org.apache.cassandra.dht.Murmur3Partitioner + +# Maximum size of the key cache in memory. +# +# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the +# minimum, sometimes more. The key cache is fairly tiny for the amount of +# time it saves, so it's worthwhile to use it at large numbers. +# The row cache saves even more time, but must contain the entire row, +# so it is extremely space-intensive. It's best to only use the +# row cache if you have hot rows or static rows. +# +# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup. +# +# Default value is empty to make it "auto" (min(5% of Heap (in MB), 100MB)). Set to 0 to disable key cache. +# key_cache_size_in_mb: + +# Duration in seconds after which Scylla should +# save the key cache. Caches are saved to saved_caches_directory as +# specified in this configuration file. +# +# Saved caches greatly improve cold-start speeds, and is relatively cheap in +# terms of I/O for the key cache. Row cache saving is much more expensive and +# has limited use. +# +# Default is 14400 or 4 hours. +# key_cache_save_period: 14400 + +# Number of keys from the key cache to save +# Disabled by default, meaning all keys are going to be saved +# key_cache_keys_to_save: 100 + +# Maximum size of the row cache in memory. +# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup. +# +# Default value is 0, to disable row caching. +# row_cache_size_in_mb: 0 + +# Duration in seconds after which Scylla should +# save the row cache. Caches are saved to saved_caches_directory as specified +# in this configuration file. +# +# Saved caches greatly improve cold-start speeds, and is relatively cheap in +# terms of I/O for the key cache. Row cache saving is much more expensive and +# has limited use. +# +# Default is 0 to disable saving the row cache. +# row_cache_save_period: 0 + +# Number of keys from the row cache to save +# Disabled by default, meaning all keys are going to be saved +# row_cache_keys_to_save: 100 + +# Maximum size of the counter cache in memory. +# +# Counter cache helps to reduce counter locks' contention for hot counter cells. +# In case of RF = 1 a counter cache hit will cause Scylla to skip the read before +# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration +# of the lock hold, helping with hot counter cell updates, but will not allow skipping +# the read entirely. Only the local (clock, count) tuple of a counter cell is kept +# in memory, not the whole counter, so it's relatively cheap. +# +# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup. +# +# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache. +# NOTE: if you perform counter deletes and rely on low gcgs, you should disable the counter cache. +# counter_cache_size_in_mb: + +# Duration in seconds after which Scylla should +# save the counter cache (keys only). Caches are saved to saved_caches_directory as +# specified in this configuration file. +# +# Default is 7200 or 2 hours. +# counter_cache_save_period: 7200 + +# Number of keys from the counter cache to save +# Disabled by default, meaning all keys are going to be saved +# counter_cache_keys_to_save: 100 + +# The off-heap memory allocator. Affects storage engine metadata as +# well as caches. Experiments show that JEMAlloc saves some memory +# than the native GCC allocator (i.e., JEMalloc is more +# fragmentation-resistant). +# +# Supported values are: NativeAllocator, JEMallocAllocator +# +# If you intend to use JEMallocAllocator you have to install JEMalloc as library and +# modify cassandra-env.sh as directed in the file. +# +# Defaults to NativeAllocator +# memory_allocator: NativeAllocator + +# saved caches +# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches. +# saved_caches_directory: /var/lib/scylla/saved_caches + + + +# For workloads with more data than can fit in memory, Scylla's +# bottleneck will be reads that need to fetch data from +# disk. "concurrent_reads" should be set to (16 * number_of_drives) in +# order to allow the operations to enqueue low enough in the stack +# that the OS and drives can reorder them. Same applies to +# "concurrent_counter_writes", since counter writes read the current +# values before incrementing and writing them back. +# +# On the other hand, since writes are almost never IO bound, the ideal +# number of "concurrent_writes" is dependent on the number of cores in +# your system; (8 * number_of_cores) is a good rule of thumb. +# concurrent_reads: 32 +# concurrent_writes: 32 +# concurrent_counter_writes: 32 + +# Total memory to use for sstable-reading buffers. Defaults to +# the smaller of 1/4 of heap or 512MB. +# file_cache_size_in_mb: 512 + +# Total space to use for commitlogs. +# +# If space gets above this value (it will round up to the next nearest +# segment multiple), Scylla will flush every dirty CF in the oldest +# segment and remove it. So a small total commitlog space will tend +# to cause more flush activity on less-active columnfamilies. +# +# A value of -1 (default) will automatically equate it to the total amount of memory +# available for Scylla. +commitlog_total_space_in_mb: -1 + +# A fixed memory pool size in MB for for SSTable index summaries. If left +# empty, this will default to 5% of the heap size. If the memory usage of +# all index summaries exceeds this limit, SSTables with low read rates will +# shrink their index summaries in order to meet this limit. However, this +# is a best-effort process. In extreme conditions Scylla may need to use +# more than this amount of memory. +# index_summary_capacity_in_mb: + +# How frequently index summaries should be resampled. This is done +# periodically to redistribute memory from the fixed-size pool to sstables +# proportional their recent read rates. Setting to -1 will disable this +# process, leaving existing index summaries at their current sampling level. +# index_summary_resize_interval_in_minutes: 60 + +# Whether to, when doing sequential writing, fsync() at intervals in +# order to force the operating system to flush the dirty +# buffers. Enable this to avoid sudden dirty buffer flushing from +# impacting read latencies. Almost always a good idea on SSDs; not +# necessarily on platters. +# trickle_fsync: false +# trickle_fsync_interval_in_kb: 10240 + +# TCP port, for commands and data +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# storage_port: 7000 + +# SSL port, for encrypted communication. Unused unless enabled in +# encryption_options +# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# ssl_storage_port: 7001 + +# listen_interface: eth0 +# listen_interface_prefer_ipv6: false + +# Internode authentication backend, implementing IInternodeAuthenticator; +# used to allow/disallow connections from peer nodes. +# internode_authenticator: org.apache.cassandra.auth.AllowAllInternodeAuthenticator + +# Whether to start the native transport server. +# Please note that the address on which the native transport is bound is the +# same as the rpc_address. The port however is different and specified below. +# start_native_transport: true + +# The maximum threads for handling requests when the native transport is used. +# This is similar to rpc_max_threads though the default differs slightly (and +# there is no native_transport_min_threads, idle threads will always be stopped +# after 30 seconds). +# native_transport_max_threads: 128 +# +# The maximum size of allowed frame. Frame (requests) larger than this will +# be rejected as invalid. The default is 256MB. +# native_transport_max_frame_size_in_mb: 256 + +# The maximum number of concurrent client connections. +# The default is -1, which means unlimited. +# native_transport_max_concurrent_connections: -1 + +# The maximum number of concurrent client connections per source ip. +# The default is -1, which means unlimited. +# native_transport_max_concurrent_connections_per_ip: -1 + +# Whether to start the thrift rpc server. +# start_rpc: true + +# enable or disable keepalive on rpc/native connections +# rpc_keepalive: true + +# Scylla provides two out-of-the-box options for the RPC Server: +# +# sync -> One thread per thrift connection. For a very large number of clients, memory +# will be your limiting factor. On a 64 bit JVM, 180KB is the minimum stack size +# per thread, and that will correspond to your use of virtual memory (but physical memory +# may be limited depending on use of stack space). +# +# hsha -> Stands for "half synchronous, half asynchronous." All thrift clients are handled +# asynchronously using a small number of threads that does not vary with the amount +# of thrift clients (and thus scales well to many clients). The rpc requests are still +# synchronous (one thread per active request). If hsha is selected then it is essential +# that rpc_max_threads is changed from the default value of unlimited. +# +# The default is sync because on Windows hsha is about 30% slower. On Linux, +# sync/hsha performance is about the same, with hsha of course using less memory. +# +# Alternatively, can provide your own RPC server by providing the fully-qualified class name +# of an o.a.c.t.TServerFactory that can create an instance of it. +# rpc_server_type: sync + +# Uncomment rpc_min|max_thread to set request pool size limits. +# +# Regardless of your choice of RPC server (see above), the number of maximum requests in the +# RPC thread pool dictates how many concurrent requests are possible (but if you are using the sync +# RPC server, it also dictates the number of clients that can be connected at all). +# +# The default is unlimited and thus provides no protection against clients overwhelming the server. You are +# encouraged to set a maximum that makes sense for you in production, but do keep in mind that +# rpc_max_threads represents the maximum number of client requests this server may execute concurrently. +# +# rpc_min_threads: 16 +# rpc_max_threads: 2048 + +# uncomment to set socket buffer sizes on rpc connections +# rpc_send_buff_size_in_bytes: +# rpc_recv_buff_size_in_bytes: + +# Uncomment to set socket buffer size for internode communication +# Note that when setting this, the buffer size is limited by net.core.wmem_max +# and when not setting it it is defined by net.ipv4.tcp_wmem +# See: +# /proc/sys/net/core/wmem_max +# /proc/sys/net/core/rmem_max +# /proc/sys/net/ipv4/tcp_wmem +# /proc/sys/net/ipv4/tcp_wmem +# and: man tcp +# internode_send_buff_size_in_bytes: +# internode_recv_buff_size_in_bytes: + +# Frame size for thrift (maximum message length). +# thrift_framed_transport_size_in_mb: 15 + +# Set to true to have Scylla create a hard link to each sstable +# flushed or streamed locally in a backups/ subdirectory of the +# keyspace data. Removing these links is the operator's +# responsibility. +# incremental_backups: false + +# Whether or not to take a snapshot before each compaction. Be +# careful using this option, since Scylla won't clean up the +# snapshots for you. Mostly useful if you're paranoid when there +# is a data format change. +# snapshot_before_compaction: false + +# Whether or not a snapshot is taken of the data before keyspace truncation +# or dropping of column families. The STRONGLY advised default of true +# should be used to provide data safety. If you set this flag to false, you will +# lose data on truncation or drop. +# auto_snapshot: true + +# When executing a scan, within or across a partition, we need to keep the +# tombstones seen in memory so we can return them to the coordinator, which +# will use them to make sure other replicas also know about the deleted rows. +# With workloads that generate a lot of tombstones, this can cause performance +# problems and even exaust the server heap. +# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets) +# Adjust the thresholds here if you understand the dangers and want to +# scan more tombstones anyway. These thresholds may also be adjusted at runtime +# using the StorageService mbean. +# tombstone_warn_threshold: 1000 +# tombstone_failure_threshold: 100000 + +# Granularity of the collation index of rows within a partition. +# Increase if your rows are large, or if you have a very large +# number of rows per partition. The competing goals are these: +# 1) a smaller granularity means more index entries are generated +# and looking up rows withing the partition by collation column +# is faster +# 2) but, Scylla will keep the collation index in memory for hot +# rows (as part of the key cache), so a larger granularity means +# you can cache more hot rows +# column_index_size_in_kb: 64 + + +# Number of simultaneous compactions to allow, NOT including +# validation "compactions" for anti-entropy repair. Simultaneous +# compactions can help preserve read performance in a mixed read/write +# workload, by mitigating the tendency of small sstables to accumulate +# during a single long running compactions. The default is usually +# fine and if you experience problems with compaction running too +# slowly or too fast, you should look at +# compaction_throughput_mb_per_sec first. +# +# concurrent_compactors defaults to the smaller of (number of disks, +# number of cores), with a minimum of 2 and a maximum of 8. +# +# If your data directories are backed by SSD, you should increase this +# to the number of cores. +#concurrent_compactors: 1 + +# Throttles compaction to the given total throughput across the entire +# system. The faster you insert data, the faster you need to compact in +# order to keep the sstable count down, but in general, setting this to +# 16 to 32 times the rate you are inserting data is more than sufficient. +# Setting this to 0 disables throttling. Note that this account for all types +# of compaction, including validation compaction. +# compaction_throughput_mb_per_sec: 16 + +# Log a warning when compacting partitions larger than this value +# compaction_large_partition_warning_threshold_mb: 100 + +# When compacting, the replacement sstable(s) can be opened before they +# are completely written, and used in place of the prior sstables for +# any range that has been written. This helps to smoothly transfer reads +# between the sstables, reducing page cache churn and keeping hot rows hot +# sstable_preemptive_open_interval_in_mb: 50 + +# Throttles all streaming file transfer between the datacenters, +# this setting allows users to throttle inter dc stream throughput in addition +# to throttling all network stream traffic as configured with +# stream_throughput_outbound_megabits_per_sec +# inter_dc_stream_throughput_outbound_megabits_per_sec: + +# How long the coordinator should wait for seq or index scans to complete +# range_request_timeout_in_ms: 10000 +# How long the coordinator should wait for writes to complete +# counter_write_request_timeout_in_ms: 5000 +# How long a coordinator should continue to retry a CAS operation +# that contends with other proposals for the same row +# cas_contention_timeout_in_ms: 1000 +# How long the coordinator should wait for truncates to complete +# (This can be much longer, because unless auto_snapshot is disabled +# we need to flush first so we can snapshot before removing the data.) +# truncate_request_timeout_in_ms: 60000 +# The default timeout for other, miscellaneous operations +# request_timeout_in_ms: 10000 + +# Enable operation timeout information exchange between nodes to accurately +# measure request timeouts. If disabled, replicas will assume that requests +# were forwarded to them instantly by the coordinator, which means that +# under overload conditions we will waste that much extra time processing +# already-timed-out requests. +# +# Warning: before enabling this property make sure to ntp is installed +# and the times are synchronized between the nodes. +# cross_node_timeout: false + +# Enable socket timeout for streaming operation. +# When a timeout occurs during streaming, streaming is retried from the start +# of the current file. This _can_ involve re-streaming an important amount of +# data, so you should avoid setting the value too low. +# Default value is 0, which never timeout streams. +# streaming_socket_timeout_in_ms: 0 + +# controls how often to perform the more expensive part of host score +# calculation +# dynamic_snitch_update_interval_in_ms: 100 + +# controls how often to reset all host scores, allowing a bad host to +# possibly recover +# dynamic_snitch_reset_interval_in_ms: 600000 + +# if set greater than zero and read_repair_chance is < 1.0, this will allow +# 'pinning' of replicas to hosts in order to increase cache capacity. +# The badness threshold will control how much worse the pinned host has to be +# before the dynamic snitch will prefer other replicas over it. This is +# expressed as a double which represents a percentage. Thus, a value of +# 0.2 means Scylla would continue to prefer the static snitch values +# until the pinned host was 20% worse than the fastest. +# dynamic_snitch_badness_threshold: 0.1 + +# request_scheduler -- Set this to a class that implements +# RequestScheduler, which will schedule incoming client requests +# according to the specific policy. This is useful for multi-tenancy +# with a single Scylla cluster. +# NOTE: This is specifically for requests from the client and does +# not affect inter node communication. +# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place +# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of +# client requests to a node with a separate queue for each +# request_scheduler_id. The scheduler is further customized by +# request_scheduler_options as described below. +# request_scheduler: org.apache.cassandra.scheduler.NoScheduler + +# Scheduler Options vary based on the type of scheduler +# NoScheduler - Has no options +# RoundRobin +# - throttle_limit -- The throttle_limit is the number of in-flight +# requests per client. Requests beyond +# that limit are queued up until +# running requests can complete. +# The value of 80 here is twice the number of +# concurrent_reads + concurrent_writes. +# - default_weight -- default_weight is optional and allows for +# overriding the default which is 1. +# - weights -- Weights are optional and will default to 1 or the +# overridden default_weight. The weight translates into how +# many requests are handled during each turn of the +# RoundRobin, based on the scheduler id. +# +# request_scheduler_options: +# throttle_limit: 80 +# default_weight: 5 +# weights: +# Keyspace1: 1 +# Keyspace2: 5 + +# request_scheduler_id -- An identifier based on which to perform +# the request scheduling. Currently the only valid option is keyspace. +# request_scheduler_id: keyspace + +# Enable or disable inter-node encryption. +# You must also generate keys and provide the appropriate key and trust store locations and passwords. +# No custom encryption options are currently enabled. The available options are: +# +# The available internode options are : all, none, dc, rack +# If set to dc scylla will encrypt the traffic between the DCs +# If set to rack scylla will encrypt the traffic between the racks +# +# server_encryption_options: +# internode_encryption: none +# certificate: conf/scylla.crt +# keyfile: conf/scylla.key +# truststore: +# require_client_auth: False +# priority_string: + +# enable or disable client/server encryption. +# client_encryption_options: +# enabled: false +# certificate: conf/scylla.crt +# keyfile: conf/scylla.key +# truststore: +# require_client_auth: False +# priority_string: + +# internode_compression controls whether traffic between nodes is +# compressed. +# can be: all - all traffic is compressed +# dc - traffic between different datacenters is compressed +# none - nothing is compressed. +# internode_compression: none + +# Enable or disable tcp_nodelay for inter-dc communication. +# Disabling it will result in larger (but fewer) network packets being sent, +# reducing overhead from the TCP protocol itself, at the cost of increasing +# latency if you block for cross-datacenter responses. +# inter_dc_tcp_nodelay: false + +# Relaxation of environment checks. +# +# Scylla places certain requirements on its environment. If these requirements are +# not met, performance and reliability can be degraded. +# +# These requirements include: +# - A filesystem with good support for aysnchronous I/O (AIO). Currently, +# this means XFS. +# +# false: strict environment checks are in place; do not start if they are not met. +# true: relaxed environment checks; performance and reliability may degraade. +# +# developer_mode: false + + +# Idle-time background processing +# +# Scylla can perform certain jobs in the background while the system is otherwise idle, +# freeing processor resources when there is other work to be done. +# +# defragment_memory_on_idle: true +# +# prometheus port +# By default, Scylla opens prometheus API port on port 9180 +# setting the port to 0 will disable the prometheus API. +# prometheus_port: 9180 +# +# prometheus address +# By default, Scylla binds all interfaces to the prometheus API +# It is possible to restrict the listening address to a specific one +# prometheus_address: 0.0.0.0 + +# Distribution of data among cores (shards) within a node +# +# Scylla distributes data within a node among shards, using a round-robin +# strategy: +# [shard0] [shard1] ... [shardN-1] [shard0] [shard1] ... [shardN-1] ... +# +# Scylla versions 1.6 and below used just one repetition of the pattern; +# this intefered with data placement among nodes (vnodes). +# +# Scylla versions 1.7 and above use 4096 repetitions of the pattern; this +# provides for better data distribution. +# +# the value below is log (base 2) of the number of repetitions. +# +# Set to 0 to avoid rewriting all data when upgrading from Scylla 1.6 and +# below. +# +# Keep at 12 for new clusters. +murmur3_partitioner_ignore_msb_bits: 12 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..0115fe8 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,84 @@ +services: + postgres: + image: postgres:16.3-alpine + container_name: "ohmychat-postgres" + hostname: "ohmychat-postgres" + ports: + - "5432:5432" + volumes: + - dbdata:/var/lib/postgresql/data + env_file: + - ./config/postgres/.env + init: true + + kafka-1: + image: confluentinc/cp-kafka:latest + container_name: kafka-1 + hostname: kafka-1 + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka-1:29093 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093 + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: NeEcW4ddRIWWZOxkgFaC4w + init: true + # https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html + # KAFKA_REPLICA_SELECTOR_CLASS: org.apache.kafka.common.replica.RackAwareReplicaSelector + # KAFKA_BROKER_RACK: rack-1 + + kafka-2: + image: confluentinc/cp-kafka:latest + container_name: kafka-2 + hostname: kafka-2 + ports: + - "9093:9092" + environment: + KAFKA_NODE_ID: 2 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:9093 + KAFKA_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka-2:29093 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093 + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + CLUSTER_ID: NeEcW4ddRIWWZOxkgFaC4w + init: true + + scylla-1: + image: scylladb/scylla:latest + container_name: scylla-1 + hostname: scylla-1 + volumes: + - "./config/scylla/scylla.yaml:/etc/scylla/scylla.yaml" + - "./config/scylla/cassandra-rackdc.properties.dc1.rack1:/etc/scylla/cassandra-rackdc.properties" + ports: + - "9042:9042" + command: --seeds=scylla-1,scylla-2 --smp 2 + + scylla-2: + image: scylladb/scylla:latest + container_name: scylla-2 + hostname: scylla-2 + volumes: + - "./config/scylla/scylla.yaml:/etc/scylla/scylla.yaml" + - "./config/scylla/cassandra-rackdc.properties.dc1.rack2:/etc/scylla/cassandra-rackdc.properties" + ports: + - "9043:9042" + command: --seeds=scylla-1,scylla-2 --smp 2 + depends_on: + - scylla-1 + +volumes: + dbdata: diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e45bbbe --- /dev/null +++ b/go.mod @@ -0,0 +1,68 @@ +module github.com/aykhans/oh-my-chat + +go 1.22 + +toolchain go1.22.6 + +replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4 + +require ( + github.com/IBM/sarama v1.43.2 + github.com/go-playground/validator/v10 v10.22.0 + github.com/gocql/gocql v1.6.0 + github.com/gofiber/contrib/websocket v1.3.2 + github.com/gofiber/fiber/v2 v2.52.5 + github.com/golang-jwt/jwt/v5 v5.2.1 + github.com/google/uuid v1.6.0 + github.com/segmentio/kafka-go v0.4.47 + golang.org/x/crypto v0.25.0 + gorm.io/driver/postgres v1.5.9 + gorm.io/gorm v1.25.11 +) + +require ( + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.6.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/fasthttp/websocket v1.5.8 // indirect + github.com/gabriel-vasile/mimetype v1.4.5 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.55.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.16.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cb276d9 --- /dev/null +++ b/go.sum @@ -0,0 +1,212 @@ +github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= +github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fasthttp/websocket v1.5.8 h1:k5DpirKkftIF/w1R8ZzjSgARJrs54Je9YJK37DL/Ah8= +github.com/fasthttp/websocket v1.5.8/go.mod h1:d08g8WaT6nnyvg9uMm8K9zMYyDjfKyj3170AtPRuVU0= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4= +github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao= +github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/gofiber/contrib/websocket v1.3.2 h1:AUq5PYeKwK50s0nQrnluuINYeep1c4nRCJ0NWsV3cvg= +github.com/gofiber/contrib/websocket v1.3.2/go.mod h1:07u6QGMsvX+sx7iGNCl5xhzuUVArWwLQ3tBIH24i+S8= +github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo= +github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 h1:KanIMPX0QdEdB4R3CiimCAbxFrhB3j7h0/OvpYGVQa8= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= +github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA= +github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= +github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= +gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= +gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= +gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/internal/adapter/auth/jwt/jwt.go b/internal/adapter/auth/jwt/jwt.go new file mode 100644 index 0000000..e977d0c --- /dev/null +++ b/internal/adapter/auth/jwt/jwt.go @@ -0,0 +1,76 @@ +package jwt + +import ( + "errors" + "time" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/golang-jwt/jwt/v5" + "github.com/google/uuid" +) + +type JWTService struct { + key string + duration time.Duration +} + +type tokenPayload struct { + UserID uuid.UUID `json:"user_id,omitempty"` + jwt.RegisteredClaims +} + +func NewJWTService(duration time.Duration, key string) *JWTService { + return &JWTService{ + key: key, + duration: duration, + } +} + +func (jwtService *JWTService) CreateToken( + user *domain.User, +) (string, error) { + token := jwt.NewWithClaims( + jwt.SigningMethodHS256, + &tokenPayload{ + UserID: user.ID, + RegisteredClaims: jwt.RegisteredClaims{ + ExpiresAt: jwt.NewNumericDate( + time.Now().Add(jwtService.duration), + ), + Audience: jwt.ClaimStrings{"user"}, + IssuedAt: jwt.NewNumericDate(time.Now()), + }, + }, + ) + + tokenStr, err := token.SignedString([]byte(jwtService.key)) + if err != nil { + return "", domain.ErrTokenCreation + } + + return tokenStr, nil +} + +func (jwtService *JWTService) VerifyToken( + token string, +) (*domain.AuthPayload, error) { + keyFunc := func(token *jwt.Token) (interface{}, error) { + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, domain.ErrInvalidToken + } + return []byte(jwtService.key), nil + } + + parsedToken, err := jwt.ParseWithClaims(token, &tokenPayload{}, keyFunc) + if err != nil { + if errors.Is(err, jwt.ErrTokenExpired) { + return nil, domain.ErrExpiredToken + } + if errors.Is(err, jwt.ErrSignatureInvalid) { + return nil, domain.ErrInvalidToken + } + return nil, domain.ErrInternal + } + payload := parsedToken.Claims.(*tokenPayload) + return &domain.AuthPayload{UserID: payload.UserID}, nil +} diff --git a/internal/adapter/config/app.go b/internal/adapter/config/app.go new file mode 100644 index 0000000..a3ffc89 --- /dev/null +++ b/internal/adapter/config/app.go @@ -0,0 +1,25 @@ +package config + +import ( + "time" + + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +type AppConfig struct { + IsDev bool + CORSAllowedOrigins string + ListenerPort int + SecretKey string + JWTDuration *time.Duration +} + +func NewAppConfig() *AppConfig { + return &AppConfig{ + IsDev: utils.GetEnvOrDefault("APP_IS_PROD", "true") == "true", + CORSAllowedOrigins: utils.GetEnvOrDefault("APP_CORS_ALLOWED_ORIGINS", "*"), + ListenerPort: Str2IntOrDie(GetEnvOrDie("APP_LISTENER_PORT")), + SecretKey: GetEnvOrDie("APP_SECRET_KEY"), + JWTDuration: Str2DurationOrDie(GetEnvOrDie("APP_JWT_DURATION")), + } +} diff --git a/internal/adapter/config/config.go b/internal/adapter/config/config.go new file mode 100644 index 0000000..8ad9681 --- /dev/null +++ b/internal/adapter/config/config.go @@ -0,0 +1,17 @@ +package config + +type ContainerConfig struct { + *AppConfig + *PostgresConfig + *KafkaConfig + *ScyllaConfig +} + +func NewContainerConfig() *ContainerConfig { + return &ContainerConfig{ + NewAppConfig(), + NewPostgresConfig(), + NewKafkaConfig(), + NewScyllaConfig(), + } +} diff --git a/internal/adapter/config/helper.go b/internal/adapter/config/helper.go new file mode 100644 index 0000000..8bf25c4 --- /dev/null +++ b/internal/adapter/config/helper.go @@ -0,0 +1,44 @@ +package config + +import ( + "fmt" + "os" + "strconv" + "time" + + "github.com/aykhans/oh-my-chat/internal/adapter/logger" + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +var log = logger.NewStdLogger() + +func GetEnvOrDie(key string) string { + value := os.Getenv(key) + if value == "" { + log.Error( + "Error get environment variable", + "error", + fmt.Errorf("Environment variable "+key+" is not set"), + ) + utils.ExitErr() + } + return value +} + +func Str2IntOrDie(value string) int { + intValue, err := strconv.Atoi(value) + if err != nil { + log.Error("Error convert string to int", "error", err) + utils.ExitErr() + } + return intValue +} + +func Str2DurationOrDie(value string) *time.Duration { + duration, err := time.ParseDuration(value) + if err != nil { + log.Error("Error convert string to duration", "error", err) + utils.ExitErr() + } + return &duration +} diff --git a/internal/adapter/config/kafka.go b/internal/adapter/config/kafka.go new file mode 100644 index 0000000..499a83b --- /dev/null +++ b/internal/adapter/config/kafka.go @@ -0,0 +1,45 @@ +package config + +import ( + "strings" + + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +type BootstrapServers []string + +func (b *BootstrapServers) String() string { + return strings.Join(*b, ",") +} + +type KafkaConfig struct { + *KafkaProducerConfig + *KafkaConsumerConfig +} + +type KafkaProducerConfig struct { + BootstrapServers BootstrapServers +} + +type KafkaConsumerConfig struct { + BootstrapServers BootstrapServers +} + +func NewKafkaConfig() *KafkaConfig { + return &KafkaConfig{ + NewKafkaProducerConfig(), + NewKafkaConsumerConfig(), + } +} + +func NewKafkaProducerConfig() *KafkaProducerConfig { + return &KafkaProducerConfig{ + BootstrapServers: utils.Str2StrSlice(GetEnvOrDie("KAFKA_PRODUCER_BOOTSTRAP_SERVERS")), + } +} + +func NewKafkaConsumerConfig() *KafkaConsumerConfig { + return &KafkaConsumerConfig{ + BootstrapServers: utils.Str2StrSlice(GetEnvOrDie("KAFKA_CONSUMER_BOOTSTRAP_SERVERS")), + } +} diff --git a/internal/adapter/config/postgres.go b/internal/adapter/config/postgres.go new file mode 100644 index 0000000..cff18f2 --- /dev/null +++ b/internal/adapter/config/postgres.go @@ -0,0 +1,19 @@ +package config + +type PostgresConfig struct { + User string + Password string + Host string + Port string + DBName string +} + +func NewPostgresConfig() *PostgresConfig { + return &PostgresConfig{ + User: GetEnvOrDie("POSTGRES_USER"), + Password: GetEnvOrDie("POSTGRES_PASSWORD"), + Host: GetEnvOrDie("POSTGRES_HOST"), + Port: GetEnvOrDie("POSTGRES_PORT"), + DBName: GetEnvOrDie("POSTGRES_DB"), + } +} diff --git a/internal/adapter/config/scylla.go b/internal/adapter/config/scylla.go new file mode 100644 index 0000000..40b2b53 --- /dev/null +++ b/internal/adapter/config/scylla.go @@ -0,0 +1,21 @@ +package config + +import "github.com/aykhans/oh-my-chat/internal/core/utils" + +type ScyllaConfig struct { + Hosts []string + DataCenter string + Keyspace string + User string + Password string +} + +func NewScyllaConfig() *ScyllaConfig { + return &ScyllaConfig{ + Hosts: utils.Str2StrSlice(GetEnvOrDie("SCYLLA_HOSTS")), + DataCenter: GetEnvOrDie("SCYLLA_DATACENTER"), + Keyspace: GetEnvOrDie("SCYLLA_KEYSPACE"), + User: GetEnvOrDie("SCYLLA_USER"), + Password: GetEnvOrDie("SCYLLA_PASSWORD"), + } +} diff --git a/internal/adapter/handlers/http/app.go b/internal/adapter/handlers/http/app.go new file mode 100644 index 0000000..ef4994c --- /dev/null +++ b/internal/adapter/handlers/http/app.go @@ -0,0 +1,76 @@ +package http + +import ( + "github.com/aykhans/oh-my-chat/internal/adapter/handlers/http/middlewares" + "github.com/gofiber/contrib/websocket" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" + "github.com/gofiber/fiber/v2/middleware/recover" +) + +type Handlers struct { + userHandler *UserHandler + authHandler *AuthHandler + chatHandler *ChatHandler +} + +type Middlewares struct { + authMiddleware *middlewares.AuthMiddleware + wsMiddleware *middlewares.WSMiddleware +} + +func NewApp( + isDev bool, + corsAllowedOrigins string, + authMiddleware *middlewares.AuthMiddleware, + wsMiddleware *middlewares.WSMiddleware, + userHandler *UserHandler, + authHandler *AuthHandler, + chatHandler *ChatHandler, +) *fiber.App { + handlers := &Handlers{ + userHandler: userHandler, + authHandler: authHandler, + chatHandler: chatHandler, + } + middlewares := &Middlewares{ + authMiddleware: authMiddleware, + wsMiddleware: wsMiddleware, + } + + app := fiber.New() + if !isDev { + app.Use(recover.New()) + app.Use(cors.New(cors.Config{ + AllowOrigins: corsAllowedOrigins, + })) + } + router := app.Group("/api") + setV1Routers(router, handlers, middlewares) + + return app +} + +func setV1Routers( + router fiber.Router, + handlers *Handlers, + middlewares *Middlewares, +) { + router = router.Group("/v1") + + { // User routes + user := router.Group("/user") + user.Post("/register", handlers.userHandler.Register) + } + + { // Auth routes + auth := router.Group("/auth") + auth.Post("/login", handlers.authHandler.Login) + } + + { // Chat routes + chat := router.Group("/chat") + chat.Use("/ws", middlewares.authMiddleware.IsUser, middlewares.wsMiddleware.Upgrade) + chat.Get("/ws", websocket.New(handlers.chatHandler.Connect)) + } +} diff --git a/internal/adapter/handlers/http/auth.go b/internal/adapter/handlers/http/auth.go new file mode 100644 index 0000000..fc05d4e --- /dev/null +++ b/internal/adapter/handlers/http/auth.go @@ -0,0 +1,50 @@ +package http + +import ( + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/go-playground/validator/v10" + "github.com/gofiber/fiber/v2" +) + +type AuthHandler struct { + authService port.AuthService + validator *validator.Validate +} + +func NewAuthHandler(authService port.AuthService, validator *validator.Validate) *AuthHandler { + return &AuthHandler{authService, validator} +} + +type loginRequest struct { + Username string `json:"username"` + Email string `json:"email" validate:"email"` + Password string `json:"password" validate:"required"` +} + +func (authHandler *AuthHandler) Login(ctx *fiber.Ctx) error { + loginBody := new(loginRequest) + if err := ctx.BodyParser(loginBody); err != nil { + return invalidRequestBodyResponse(ctx) + } + + if err := authHandler.validator.Struct(loginBody); err != nil { + return validationErrorResponse(ctx, err) + } + + loginField := loginBody.Email + loginFunc := authHandler.authService.LoginByEmail + if loginField == "" { + if loginBody.Username == "" { + return fiber.NewError(fiber.StatusBadRequest, "email or username is required") + } + loginField = loginBody.Username + loginFunc = authHandler.authService.LoginByUsername + } + + serviceCtx := ctx.Context() + token, err := loginFunc(serviceCtx, loginField, loginBody.Password) + if err != nil { + return fiber.NewError(fiber.StatusUnauthorized, err.Error()) + } + return ctx.JSON(fiber.Map{"token": token}) +} diff --git a/internal/adapter/handlers/http/chat.go b/internal/adapter/handlers/http/chat.go new file mode 100644 index 0000000..d77803b --- /dev/null +++ b/internal/adapter/handlers/http/chat.go @@ -0,0 +1,127 @@ +package http + +import ( + "context" + "encoding/json" + "time" + + "github.com/aykhans/oh-my-chat/internal/adapter/logger" + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/go-playground/validator/v10" + "github.com/gofiber/contrib/websocket" + "github.com/google/uuid" +) + +var log = logger.NewStdLogger() + +type ChatHandler struct { + messageService port.MessageService + validator *validator.Validate +} + +func NewChatHandler( + messageService port.MessageService, + validator *validator.Validate, +) *ChatHandler { + return &ChatHandler{messageService, validator} +} + +type messageReuqest struct { + Content string `json:"content" validate:"required"` + To string `json:"to" validate:"required"` +} + +func (chatHandler *ChatHandler) Connect(conn *websocket.Conn) { + authPayload := getAuthPayloadInWS(conn) + + streamReceiveMessageCtx, streamReceiveMessageCtxCancel := context.WithCancel(context.Background()) + consumeMessageChan := make(chan *domain.StreamMessage) + wsMessageChan := make(chan *domain.Message) + + go readMessageFromWS( + conn, + chatHandler.validator, + authPayload.UserID, + wsMessageChan, + ) + + go func() { + err := chatHandler.messageService.ReceiveMessage( + streamReceiveMessageCtx, + authPayload.UserID, + consumeMessageChan, + ) + if err != nil { + return + } + }() + + defer func() { + streamReceiveMessageCtxCancel() + }() + + var err error + + for { + select { + case streamReceivedMessage := <-consumeMessageChan: + messageBytes, _ := json.Marshal(streamReceivedMessage.Message) + if err = conn.WriteMessage(websocket.TextMessage, messageBytes); err != nil { + return + } + err := streamReceivedMessage.Commit() + if err != nil { + log.Error("Stream message commit error", "error", err.Error()) + return + } + + case wsMessage := <-wsMessageChan: + if wsMessage == nil { + return + } + streamSendMessageCtx := context.Background() + chatHandler.messageService.SendMessage(streamSendMessageCtx, wsMessage) + } + } +} + +func readMessageFromWS( + conn *websocket.Conn, + validator *validator.Validate, + userID uuid.UUID, + messageChan chan<- *domain.Message, +) { + var ( + wsReceivedMessageType int + wsReceivedMessage []byte + err error + ) + for { + if wsReceivedMessageType, wsReceivedMessage, err = conn.ReadMessage(); err != nil { + messageChan <- nil + break + } + + if wsReceivedMessageType == websocket.TextMessage { + messageBody := new(messageReuqest) + if err := json.Unmarshal(wsReceivedMessage, &messageBody); err != nil { + messageChan <- nil + break + } + + if err := validator.Struct(messageBody); err != nil { + messageChan <- nil + break + } + + timestamp := time.Now() + messageChan <- &domain.Message{ + UserID: userID, + ChatID: messageBody.To, + Content: messageBody.Content, + Timestamp: timestamp, + } + } + } +} diff --git a/internal/adapter/handlers/http/helper.go b/internal/adapter/handlers/http/helper.go new file mode 100644 index 0000000..bc29ea0 --- /dev/null +++ b/internal/adapter/handlers/http/helper.go @@ -0,0 +1,23 @@ +package http + +import ( + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/gofiber/contrib/websocket" + // "github.com/gofiber/fiber/v2" +) + +// func getAuthPayload(ctx *fiber.Ctx) *domain.AuthPayload { +// payload := ctx.Locals("authPayload") +// if payload == nil { +// return nil +// } +// return payload.(*domain.AuthPayload) +// } + +func getAuthPayloadInWS(conn *websocket.Conn) *domain.AuthPayload { + payload := conn.Locals("authPayload") + if payload == nil { + return nil + } + return payload.(*domain.AuthPayload) +} diff --git a/internal/adapter/handlers/http/middlewares/auth.go b/internal/adapter/handlers/http/middlewares/auth.go new file mode 100644 index 0000000..8157223 --- /dev/null +++ b/internal/adapter/handlers/http/middlewares/auth.go @@ -0,0 +1,33 @@ +package middlewares + +import ( + "fmt" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/gofiber/fiber/v2" +) + +type AuthMiddleware struct { + tokenService port.TokenService +} + +func NewAuthMiddleware(tokenService port.TokenService) *AuthMiddleware { + return &AuthMiddleware{tokenService} +} + +func (authMiddleware *AuthMiddleware) IsUser(ctx *fiber.Ctx) error { + token := ctx.Get("Authorization") + if token == "" { + return fiber.NewError(fiber.StatusBadRequest, "Authorization header is required") + } + payload, err := authMiddleware.tokenService.VerifyToken(token[7:]) + if err != nil { + if err == domain.ErrInternal { + fiber.NewError(fiber.StatusInternalServerError, "Internal Server Error") + } + return fiber.NewError(fiber.StatusUnauthorized, fmt.Sprintf("Unauthorized: %v", err)) + } + ctx.Locals("authPayload", payload) + return ctx.Next() +} diff --git a/internal/adapter/handlers/http/middlewares/ws.go b/internal/adapter/handlers/http/middlewares/ws.go new file mode 100644 index 0000000..7204b64 --- /dev/null +++ b/internal/adapter/handlers/http/middlewares/ws.go @@ -0,0 +1,20 @@ +package middlewares + +import ( + "github.com/gofiber/contrib/websocket" + "github.com/gofiber/fiber/v2" +) + +type WSMiddleware struct{} + +func NewWSMiddleware() *WSMiddleware { + return &WSMiddleware{} +} + +func (wsMiddleware *WSMiddleware) Upgrade(ctx *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(ctx) { + ctx.Locals("allowed", true) + return ctx.Next() + } + return fiber.ErrUpgradeRequired +} diff --git a/internal/adapter/handlers/http/responses.go b/internal/adapter/handlers/http/responses.go new file mode 100644 index 0000000..fdfe768 --- /dev/null +++ b/internal/adapter/handlers/http/responses.go @@ -0,0 +1,29 @@ +package http + +import ( + "github.com/go-playground/validator/v10" + "github.com/gofiber/fiber/v2" +) + +func notFoundResponse(ctx *fiber.Ctx, err ...error) error { + errMsg := "Not found" + if len(err) > 0 { + errMsg = err[0].Error() + } + return ctx.Status(fiber.StatusNotFound).JSON( + fiber.Map{"error": errMsg}, + ) +} + +func invalidRequestBodyResponse(ctx *fiber.Ctx) error { + return ctx.Status(fiber.StatusBadRequest).JSON( + fiber.Map{"error": "Invalid request body"}, + ) +} + +func validationErrorResponse(ctx *fiber.Ctx, err error) error { + errs := err.(validator.ValidationErrors) + return ctx.Status(fiber.StatusBadRequest).JSON( + fiber.Map{"errors": validationErrorFormater(errs)}, + ) +} diff --git a/internal/adapter/handlers/http/user.go b/internal/adapter/handlers/http/user.go new file mode 100644 index 0000000..07a077e --- /dev/null +++ b/internal/adapter/handlers/http/user.go @@ -0,0 +1,57 @@ +package http + +import ( + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/go-playground/validator/v10" + "github.com/gofiber/fiber/v2" +) + +type UserHandler struct { + userService port.UserService + validator *validator.Validate +} + +func NewUserHandler(svc port.UserService, validator *validator.Validate) *UserHandler { + return &UserHandler{svc, validator} +} + +type registerRequest struct { + Username string `json:"username" validate:"required,max=50"` + Email string `json:"email" validate:"email"` + Password string `json:"password" validate:"min=8,max=72"` +} + +func (userHandler *UserHandler) Register(ctx *fiber.Ctx) error { + registerBody := new(registerRequest) + if err := ctx.BodyParser(registerBody); err != nil { + return invalidRequestBodyResponse(ctx) + } + + if err := userHandler.validator.Struct(registerBody); err != nil { + return validationErrorResponse(ctx, err) + } + + user := domain.User{ + Username: registerBody.Username, + Email: registerBody.Email, + Password: registerBody.Password, + } + + serviceCtx := ctx.Context() + _, err := userHandler.userService.Register(serviceCtx, &user) + if err != nil { + if err == domain.ErrUsernameExists || err == domain.ErrEmailExists { + return notFoundResponse(ctx, err) + } + return fiber.ErrInternalServerError + } + + return ctx.Status(fiber.StatusCreated).JSON( + fiber.Map{ + "ID": user.ID, + "Username": user.Username, + "Email": user.Email, + }, + ) +} diff --git a/internal/adapter/handlers/http/validator.go b/internal/adapter/handlers/http/validator.go new file mode 100644 index 0000000..d0c3f31 --- /dev/null +++ b/internal/adapter/handlers/http/validator.go @@ -0,0 +1,60 @@ +package http + +import ( + "fmt" + "reflect" + + "github.com/go-playground/validator/v10" +) + +type fieldError struct { + Field string + Message string +} + +func NewValidator() *validator.Validate { + validate := validator.New() + validate.RegisterTagNameFunc(func(fld reflect.StructField) string { + if fld.Tag.Get("validation_name") != "" { + return fld.Tag.Get("validation_name") + } else { + return fld.Tag.Get("json") + } + }) + + return validate +} + +func validationErrorFormater(errs validator.ValidationErrors) []fieldError { + fieldErrs := make([]fieldError, 0) + + if errs != nil { + for _, err := range errs { + fieldErrs = append( + fieldErrs, + fieldError{ + Field: err.Field(), + Message: msgForTag(err.Tag(), err.Field(), err.Param()), + }, + ) + } + return fieldErrs + } + + return nil +} + +func msgForTag(tag, field, param string) string { + switch tag { + case "required": + return fmt.Sprintf("%s is required", field) + case "email": + return "email is invalid" + case "min": + return fmt.Sprintf("The length of %s must be at least %s", field, param) + case "max": + return fmt.Sprintf("The length of %s must be at most %s", field, param) + default: + return fmt.Sprintf("%s is invalid", field) + } +} diff --git a/internal/adapter/logger/slog.go b/internal/adapter/logger/slog.go new file mode 100644 index 0000000..2c2b36a --- /dev/null +++ b/internal/adapter/logger/slog.go @@ -0,0 +1,15 @@ +package logger + +import ( + "log/slog" + "os" +) + +func NewStdLogger() *slog.Logger { + return slog.New( + slog.NewJSONHandler( + os.Stdout, + &slog.HandlerOptions{AddSource: true}, + ), + ) +} diff --git a/internal/adapter/storages/postgres/db.go b/internal/adapter/storages/postgres/db.go new file mode 100644 index 0000000..1c93c1e --- /dev/null +++ b/internal/adapter/storages/postgres/db.go @@ -0,0 +1,38 @@ +package postgres + +import ( + "fmt" + "time" + + "github.com/aykhans/oh-my-chat/internal/adapter/config" + postgresDriver "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func NewDB(config *config.PostgresConfig) (*gorm.DB, error) { + dsn := fmt.Sprintf( + "host=%s user=%s password=%s dbname=%s port=%s sslmode=%s TimeZone=%s", + config.Host, + config.User, + config.Password, + config.DBName, + config.Port, + "disable", + "UTC", + ) + + var db *gorm.DB + var err error + for range 3 { + db, err = gorm.Open(postgresDriver.Open(dsn), &gorm.Config{}) + if err == nil { + break + } + time.Sleep(3 * time.Second) + } + if err != nil { + return nil, err + } + + return db, nil +} diff --git a/internal/adapter/storages/postgres/migrations/000001_create_users_table.down.sql b/internal/adapter/storages/postgres/migrations/000001_create_users_table.down.sql new file mode 100644 index 0000000..737bd29 --- /dev/null +++ b/internal/adapter/storages/postgres/migrations/000001_create_users_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS "public"."users"; \ No newline at end of file diff --git a/internal/adapter/storages/postgres/migrations/000001_create_users_table.up.sql b/internal/adapter/storages/postgres/migrations/000001_create_users_table.up.sql new file mode 100644 index 0000000..eebaa3f --- /dev/null +++ b/internal/adapter/storages/postgres/migrations/000001_create_users_table.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE "public"."users" ( + "id" uuid NOT NULL DEFAULT gen_random_uuid(), + "username" character varying(50) NOT NULL, + "email" text NOT NULL, + "password" character varying(72) NOT NULL, + "created_at" timestamptz NOT NULL DEFAULT now(), + "updated_at" timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY ("id"), + CONSTRAINT "uni_users_email" UNIQUE ("email"), + CONSTRAINT "uni_users_username" UNIQUE ("username") +); \ No newline at end of file diff --git a/internal/adapter/storages/postgres/models/user.go b/internal/adapter/storages/postgres/models/user.go new file mode 100644 index 0000000..2271b9f --- /dev/null +++ b/internal/adapter/storages/postgres/models/user.go @@ -0,0 +1,24 @@ +package models + +import ( + "time" + + "github.com/google/uuid" +) + +const ( + UserTableName = "users" +) + +type User struct { + ID uuid.UUID `gorm:"primarykey;unique;type:uuid;default:gen_random_uuid()"` + Username string `gorm:"unique;not null;size:50"` + Email string `gorm:"unique;not null"` + Password string `gorm:"not null;size:72"` + CreatedAt time.Time + UpdatedAt time.Time +} + +func (u User) TableName() string { + return UserTableName +} diff --git a/internal/adapter/storages/postgres/repository/user.go b/internal/adapter/storages/postgres/repository/user.go new file mode 100644 index 0000000..6eb60fe --- /dev/null +++ b/internal/adapter/storages/postgres/repository/user.go @@ -0,0 +1,109 @@ +package repository + +import ( + "context" + "errors" + + "github.com/aykhans/oh-my-chat/internal/adapter/storages/postgres/models" + "github.com/aykhans/oh-my-chat/internal/core/domain" + "gorm.io/gorm" +) + +type UserRepository struct { + db *gorm.DB +} + +func NewUserRepository(db *gorm.DB) *UserRepository { + return &UserRepository{db} +} + +func (userRepository *UserRepository) CreateUser( + ctx context.Context, + user *domain.User, +) (*domain.User, error) { + userModel := &models.User{ + Username: user.Username, + Email: user.Email, + Password: user.Password, + } + tx := userRepository.db.Create(userModel) + if tx.Error != nil { + return nil, tx.Error + } + + user.ID = userModel.ID + user.Username = userModel.Username + user.Email = userModel.Email + user.Password = userModel.Password + return user, nil +} + +func (userRepository *UserRepository) IsUsernameExists( + ctx context.Context, + username string, +) (bool, error) { + var count int64 + tx := userRepository.db. + Table(models.UserTableName). + Where("username = ?", username). + Count(&count) + if tx.Error != nil { + return false, tx.Error + } + + return count > 0, nil +} + +func (userRepository *UserRepository) IsEmailExists( + ctx context.Context, + email string, +) (bool, error) { + var count int64 + tx := userRepository.db. + Table(models.UserTableName). + Where("email = ?", email). + Count(&count) + if tx.Error != nil { + return false, tx.Error + } + + return count > 0, nil +} + +func (userRepository *UserRepository) GetUserByEmail( + ctx context.Context, + email string, +) (*domain.User, error) { + user := &domain.User{} + tx := userRepository.db. + Table(models.UserTableName). + Where("email = ?", email). + First(user) + if tx.Error != nil { + if errors.Is(tx.Error, gorm.ErrRecordNotFound) { + return nil, domain.ErrDataNotFound + } + return nil, tx.Error + } + + return user, nil +} + +func (userRepository *UserRepository) GetUserByUsername( + ctx context.Context, + username string, +) (*domain.User, error) { + user := &domain.User{} + tx := userRepository.db. + Table(models.UserTableName). + Where("username = ?", username). + First(user) + if tx.Error != nil { + if errors.Is(tx.Error, gorm.ErrRecordNotFound) { + return nil, domain.ErrDataNotFound + } + return nil, tx.Error + } + + return user, nil +} diff --git a/internal/adapter/storages/scylla/db.go b/internal/adapter/storages/scylla/db.go new file mode 100644 index 0000000..ea2c901 --- /dev/null +++ b/internal/adapter/storages/scylla/db.go @@ -0,0 +1,33 @@ +package scylla + +import ( + "time" + + "github.com/aykhans/oh-my-chat/internal/adapter/config" + "github.com/gocql/gocql" +) + +func NewDB(config *config.ScyllaConfig) (*gocql.Session, error) { + cluster := gocql.NewCluster(config.Hosts...) + cluster.Keyspace = config.Keyspace + cluster.Consistency = gocql.LocalQuorum + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: config.User, + Password: config.Password, + } + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy( + gocql.DCAwareRoundRobinPolicy(config.DataCenter), + ) + + var session *gocql.Session + var err error + for range 20 { + session, err = cluster.CreateSession() + if err == nil { + return session, nil + } + time.Sleep(3 * time.Second) + } + + return nil, err +} diff --git a/internal/adapter/storages/scylla/migrations/000001_create_messages_table.down.cql b/internal/adapter/storages/scylla/migrations/000001_create_messages_table.down.cql new file mode 100644 index 0000000..36f514b --- /dev/null +++ b/internal/adapter/storages/scylla/migrations/000001_create_messages_table.down.cql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS messages; \ No newline at end of file diff --git a/internal/adapter/storages/scylla/migrations/000001_create_messages_table.up.cql b/internal/adapter/storages/scylla/migrations/000001_create_messages_table.up.cql new file mode 100644 index 0000000..f7ab668 --- /dev/null +++ b/internal/adapter/storages/scylla/migrations/000001_create_messages_table.up.cql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS messages ( + chat_id UUID, -- Partition key + user_id UUID, + content text, -- Clustering column + type text, -- Clustering column + created_at timestamp, -- Clustering column + PRIMARY KEY (chat_id, created_at, content, type) +) WITH CLUSTERING ORDER BY (created_at DESC); \ No newline at end of file diff --git a/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.down.cql b/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.down.cql new file mode 100644 index 0000000..d20575f --- /dev/null +++ b/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.down.cql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS user_chats; \ No newline at end of file diff --git a/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.up.cql b/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.up.cql new file mode 100644 index 0000000..d2fe3d8 --- /dev/null +++ b/internal/adapter/storages/scylla/migrations/000002_create_user_chats_table.up.cql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS user_chats ( + user_id UUID, -- Partition key + chat_id UUID, -- Clustering column + blocked boolean, + created_at timestamp, + PRIMARY KEY (user_id, created_at, chat_id, blocked) +); \ No newline at end of file diff --git a/internal/adapter/storages/scylla/repository/message.go b/internal/adapter/storages/scylla/repository/message.go new file mode 100644 index 0000000..56b35f3 --- /dev/null +++ b/internal/adapter/storages/scylla/repository/message.go @@ -0,0 +1,23 @@ +package repository + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/gocql/gocql" +) + +type MessageRepository struct { + db *gocql.Session +} + +func NewMessageRepository(db *gocql.Session) *MessageRepository { + return &MessageRepository{db} +} + +func (messageRepository *MessageRepository) CreateMessage( + ctx context.Context, + message *domain.Message, +) (*domain.Message, error) { + return nil, nil +} diff --git a/internal/adapter/streamers/kafka/consumer/message.go b/internal/adapter/streamers/kafka/consumer/message.go new file mode 100644 index 0000000..9b8230e --- /dev/null +++ b/internal/adapter/streamers/kafka/consumer/message.go @@ -0,0 +1,94 @@ +package consumer + +import ( + "context" + "encoding/json" + "io" + "time" + + "github.com/aykhans/oh-my-chat/internal/adapter/config" + "github.com/aykhans/oh-my-chat/internal/adapter/logger" + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/google/uuid" + + "github.com/segmentio/kafka-go" +) + +var log = logger.NewStdLogger() + +type MessageConsumer struct { + kafkaConsumerConfig *config.KafkaConsumerConfig +} + +type ConsumerMessage struct { + UserID uuid.UUID `json:"user_id"` + ChatID string `json:"chat_id"` + Content string `json:"content"` + Timestamp time.Time `json:"timestamp"` +} + +func NewMessageConsumer(consumerConfig *config.KafkaConsumerConfig) *MessageConsumer { + return &MessageConsumer{consumerConfig} +} + +func (messageConsumer *MessageConsumer) ConsumeMessage( + ctx context.Context, + uid string, + getChats func() []string, + message chan<- *domain.StreamMessage, +) error { + consumer := kafka.NewReader(kafka.ReaderConfig{ + Brokers: messageConsumer.kafkaConsumerConfig.BootstrapServers, + GroupID: uid, + GroupTopics: getChats(), + MaxBytes: 10e6, // 10MB + ReadLagInterval: -1, + MaxWait: 300 * time.Millisecond, + GroupBalancers: []kafka.GroupBalancer{kafka.RoundRobinGroupBalancer{}}, + StartOffset: kafka.FirstOffset, + }) + + defer func() { + if err := consumer.Close(); err != nil { + log.Error("Error closing consumer", "error", err.Error()) + } + }() + + for { + msg, err := consumer.FetchMessage(ctx) + if err != nil { + switch err { + case io.EOF: + return nil + case context.Canceled: + return nil + } + log.Error("Error fetching message from kafka", "error", err.Error()) + continue + } + + consumerMeesage := &ConsumerMessage{} + err = json.Unmarshal(msg.Value, consumerMeesage) + if err != nil { + log.Error("Error unmarshalling message", "error", err.Error()) + return domain.ErrInternal + } + + message <- &domain.StreamMessage{ + Message: &domain.Message{ + UserID: consumerMeesage.UserID, + ChatID: consumerMeesage.ChatID, + Content: consumerMeesage.Content, + Timestamp: consumerMeesage.Timestamp, + }, + Commit: func() error { + err := consumer.CommitMessages(ctx, msg) + if err != nil { + log.Error("Error committing kafka message", "error", err.Error()) + return domain.ErrInternal + } + return nil + }, + } + } +} diff --git a/internal/adapter/streamers/kafka/producer/message.go b/internal/adapter/streamers/kafka/producer/message.go new file mode 100644 index 0000000..758f744 --- /dev/null +++ b/internal/adapter/streamers/kafka/producer/message.go @@ -0,0 +1,73 @@ +package kafka + +import ( + "context" + "encoding/json" + "time" + + "github.com/IBM/sarama" + "github.com/aykhans/oh-my-chat/internal/adapter/config" + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/google/uuid" +) + +type MessageProducer struct { + saramaProducer *sarama.SyncProducer +} + +func (producer *MessageProducer) ProduceMessage( + ctx context.Context, + message *domain.Message, +) error { + messageJSON, err := (&ProducerMessage{ + UserID: message.UserID, + ChatID: message.ChatID, + Content: message.Content, + Timestamp: message.Timestamp, + }).JSON() + if err != nil { + return err + } + producerMessage := &sarama.ProducerMessage{ + Topic: message.ChatID, + Value: sarama.StringEncoder(messageJSON), + Timestamp: message.Timestamp, + } + _, _, err = (*producer.saramaProducer).SendMessage(producerMessage) + if err != nil { + return err + } + return nil +} + +type ProducerMessage struct { + UserID uuid.UUID `json:"user_id"` + ChatID string `json:"chat_id"` + Content string `json:"content"` + Timestamp time.Time `json:"timestamp"` +} + +func (message *ProducerMessage) JSON() ([]byte, error) { + return json.Marshal(message) +} + +func NewMessageProducer(producerConfig *config.KafkaProducerConfig) (*MessageProducer, error) { + config := sarama.NewConfig() + config.Producer.Return.Successes = true // enable message delivery reports + config.Producer.RequiredAcks = sarama.WaitForAll // require all in-sync replicas to acknowledge the message + config.Producer.Retry.Max = 5 // number of retries before giving up on sending a message to a partition + config.Producer.Retry.Backoff = time.Second * 60 // time to wait between retries + config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // walks through the available partitions one at a time + config.Producer.Compression = sarama.CompressionSnappy // compress messages using Snappy + config.Producer.Idempotent = true // producer will ensure that messages are successfully sent and acknowledged + config.Producer.Flush.Frequency = time.Millisecond * 20 // time to wait before sending a batch of messages + config.Producer.Flush.Bytes = 32 * 1024 // number of bytes to trigger a batch of messages + config.Net.MaxOpenRequests = 1 + config.Metadata.AllowAutoTopicCreation = true + + producer, err := sarama.NewSyncProducer(producerConfig.BootstrapServers, config) + if err != nil { + return nil, err + } + return &MessageProducer{&producer}, err +} diff --git a/internal/core/domain/errors.go b/internal/core/domain/errors.go new file mode 100644 index 0000000..ed06b96 --- /dev/null +++ b/internal/core/domain/errors.go @@ -0,0 +1,16 @@ +package domain + +import "errors" + +var ( + ErrInternal = errors.New("internal error") + ErrConflictingData = errors.New("data conflicts with existing data in unique column") + ErrDataNotFound = errors.New("data not found") + ErrInvalidEmailCredentials = errors.New("invalid email or password") + ErrInvalidUsernameCredentials = errors.New("invalid username or password") + ErrTokenCreation = errors.New("error creating token") + ErrExpiredToken = errors.New("access token has expired") + ErrUsernameExists = errors.New("username already exists") + ErrEmailExists = errors.New("email already exists") + ErrInvalidToken = errors.New("invalid token") +) diff --git a/internal/core/domain/message.go b/internal/core/domain/message.go new file mode 100644 index 0000000..8403c3f --- /dev/null +++ b/internal/core/domain/message.go @@ -0,0 +1,20 @@ +package domain + +import ( + "time" + + "github.com/google/uuid" +) + +type Message struct { + UserID uuid.UUID + ChatID string + Content string + Type string + Timestamp time.Time +} + +type StreamMessage struct { + *Message + Commit func() error +} diff --git a/internal/core/domain/token.go b/internal/core/domain/token.go new file mode 100644 index 0000000..6b6be19 --- /dev/null +++ b/internal/core/domain/token.go @@ -0,0 +1,7 @@ +package domain + +import "github.com/google/uuid" + +type AuthPayload struct { + UserID uuid.UUID +} diff --git a/internal/core/domain/user.go b/internal/core/domain/user.go new file mode 100644 index 0000000..b18b7ea --- /dev/null +++ b/internal/core/domain/user.go @@ -0,0 +1,14 @@ +package domain + +import ( + "github.com/google/uuid" +) + +type User struct { + ID uuid.UUID + Username string + Email string + Password string + // CreatedAt time.Time + // UpdatedAt time.Time +} diff --git a/internal/core/port/auth.go b/internal/core/port/auth.go new file mode 100644 index 0000000..9565d6d --- /dev/null +++ b/internal/core/port/auth.go @@ -0,0 +1,17 @@ +package port + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" +) + +type TokenService interface { + CreateToken(user *domain.User) (string, error) + VerifyToken(token string) (*domain.AuthPayload, error) +} + +type AuthService interface { + LoginByEmail(ctx context.Context, email, password string) (string, error) + LoginByUsername(ctx context.Context, username, password string) (string, error) +} diff --git a/internal/core/port/message.go b/internal/core/port/message.go new file mode 100644 index 0000000..ddcde2e --- /dev/null +++ b/internal/core/port/message.go @@ -0,0 +1,26 @@ +package port + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/google/uuid" +) + +type MessageProducer interface { + ProduceMessage(ctx context.Context, message *domain.Message) error +} + +type MessageConsumer interface { + ConsumeMessage(ctx context.Context, uid string, getChats func() []string, message chan<- *domain.StreamMessage) error +} + +type MessageRepository interface { + CreateMessage(ctx context.Context, message *domain.Message) (*domain.Message, error) +} + +type MessageService interface { + SendMessage(ctx context.Context, message *domain.Message) error + ReceiveMessage(ctx context.Context, userID uuid.UUID, message chan<- *domain.StreamMessage) error + CreateMessage(ctx context.Context, message *domain.Message) (*domain.Message, error) +} diff --git a/internal/core/port/user.go b/internal/core/port/user.go new file mode 100644 index 0000000..57962f8 --- /dev/null +++ b/internal/core/port/user.go @@ -0,0 +1,22 @@ +package port + +import ( + "context" + "github.com/aykhans/oh-my-chat/internal/core/domain" +) + +type UserRepository interface { + CreateUser(ctx context.Context, user *domain.User) (*domain.User, error) + GetUserByEmail(ctx context.Context, email string) (*domain.User, error) + GetUserByUsername(ctx context.Context, username string) (*domain.User, error) + IsUsernameExists(ctx context.Context, username string) (bool, error) + IsEmailExists(ctx context.Context, email string) (bool, error) + // GetUserByID(ctx context.Context, id uint64) (*domain.User, error) + // DeleteUser(ctx context.Context, id uint64) error +} + +type UserService interface { + Register(ctx context.Context, user *domain.User) (*domain.User, error) + // GetUser(ctx context.Context, id uint64) (*domain.User, error) + // DeleteUser(ctx context.Context, id uint64) error +} diff --git a/internal/core/service/auth.go b/internal/core/service/auth.go new file mode 100644 index 0000000..f2ca473 --- /dev/null +++ b/internal/core/service/auth.go @@ -0,0 +1,72 @@ +package service + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +type AuthService struct { + userRepository port.UserRepository + tokenService port.TokenService +} + +// NewAuthService creates a new auth service instance +func NewAuthService(userRepository port.UserRepository, tokenService port.TokenService) *AuthService { + return &AuthService{ + userRepository, + tokenService, + } +} + +func (authService *AuthService) LoginByEmail( + ctx context.Context, + email, password string, +) (string, error) { + user, err := authService.userRepository.GetUserByEmail(ctx, email) + if err != nil { + if err == domain.ErrDataNotFound { + return "", domain.ErrInvalidEmailCredentials + } + return "", domain.ErrInternal + } + + err = utils.ComparePassword(password, user.Password) + if err != nil { + return "", domain.ErrInvalidEmailCredentials + } + + accessToken, err := authService.tokenService.CreateToken(user) + if err != nil { + return "", domain.ErrTokenCreation + } + + return accessToken, nil +} + +func (authService *AuthService) LoginByUsername( + ctx context.Context, + username, password string, +) (string, error) { + user, err := authService.userRepository.GetUserByEmail(ctx, username) + if err != nil { + if err == domain.ErrDataNotFound { + return "", domain.ErrInvalidUsernameCredentials + } + return "", domain.ErrInternal + } + + err = utils.ComparePassword(password, user.Password) + if err != nil { + return "", domain.ErrInvalidUsernameCredentials + } + + accessToken, err := authService.tokenService.CreateToken(user) + if err != nil { + return "", domain.ErrTokenCreation + } + + return accessToken, nil +} diff --git a/internal/core/service/message.go b/internal/core/service/message.go new file mode 100644 index 0000000..74206db --- /dev/null +++ b/internal/core/service/message.go @@ -0,0 +1,55 @@ +package service + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/google/uuid" +) + +type MessageService struct { + producer port.MessageProducer + consumer port.MessageConsumer + repo port.MessageRepository +} + +func NewMessageService( + producerService port.MessageProducer, + consumerService port.MessageConsumer, + repo port.MessageRepository, +) *MessageService { + return &MessageService{ + producerService, + consumerService, + repo, + } +} + +func (chatServie *MessageService) SendMessage( + ctx context.Context, + message *domain.Message, +) error { + message.ChatID = "chat_" + message.ChatID + return chatServie.producer.ProduceMessage(ctx, message) +} + +func (chatServie *MessageService) ReceiveMessage( + ctx context.Context, + userID uuid.UUID, + message chan<- *domain.StreamMessage, +) error { + return chatServie.consumer.ConsumeMessage( + ctx, + userID.String(), + func() []string { return []string{"chat_1", "chat_5", "chat_9"} }, + message, + ) +} + +func (chatServie *MessageService) CreateMessage( + ctx context.Context, + message *domain.Message, +) (*domain.Message, error) { + return chatServie.repo.CreateMessage(ctx, message) +} diff --git a/internal/core/service/user.go b/internal/core/service/user.go new file mode 100644 index 0000000..a0c042f --- /dev/null +++ b/internal/core/service/user.go @@ -0,0 +1,49 @@ +package service + +import ( + "context" + + "github.com/aykhans/oh-my-chat/internal/core/domain" + "github.com/aykhans/oh-my-chat/internal/core/port" + "github.com/aykhans/oh-my-chat/internal/core/utils" +) + +type UserService struct { + repo port.UserRepository +} + +func NewUserService(repo port.UserRepository) *UserService { + return &UserService{repo: repo} +} + +func (userService *UserService) Register( + ctx context.Context, + user *domain.User, +) (*domain.User, error) { + if exists, err := userService.repo.IsUsernameExists(ctx, user.Username); err != nil { + return nil, domain.ErrInternal + } else if exists { + return nil, domain.ErrUsernameExists + } + if exists, err := userService.repo.IsEmailExists(ctx, user.Email); err != nil { + return nil, domain.ErrInternal + } else if exists { + return nil, domain.ErrEmailExists + } + + hashedPassword, err := utils.HashPassword(user.Password) + if err != nil { + return nil, domain.ErrInternal + } + user.Password = hashedPassword + + user, err = userService.repo.CreateUser(ctx, user) + if err != nil { + if err == domain.ErrConflictingData { + return nil, err + } + return nil, domain.ErrInternal + } + + return user, nil +} diff --git a/internal/core/utils/convert.go b/internal/core/utils/convert.go new file mode 100644 index 0000000..a5bb816 --- /dev/null +++ b/internal/core/utils/convert.go @@ -0,0 +1,9 @@ +package utils + +import ( + "strings" +) + +func Str2StrSlice(value string) []string { + return strings.Split(strings.ReplaceAll(value, " ", ""), ",") +} diff --git a/internal/core/utils/os.go b/internal/core/utils/os.go new file mode 100644 index 0000000..adc61db --- /dev/null +++ b/internal/core/utils/os.go @@ -0,0 +1,15 @@ +package utils + +import "os" + +func ExitErr() { + os.Exit(1) +} + +func GetEnvOrDefault(key, defaultValue string) string { + value := os.Getenv(key) + if value == "" { + return defaultValue + } + return value +} diff --git a/internal/core/utils/password.go b/internal/core/utils/password.go new file mode 100644 index 0000000..8da15fd --- /dev/null +++ b/internal/core/utils/password.go @@ -0,0 +1,26 @@ +package utils + +import ( + "golang.org/x/crypto/bcrypt" +) + +// HashPassword hashes password and returns hashed password or error +func HashPassword(password string) (string, error) { + hashedPassword, err := bcrypt.GenerateFromPassword( + []byte(password), + bcrypt.DefaultCost, + ) + if err != nil { + return "", err + } + + return string(hashedPassword), nil +} + +// ComparePassword compares password with hashed password and returns error if they don't match or nil if they do +func ComparePassword(password, hashedPassword string) error { + return bcrypt.CompareHashAndPassword( + []byte(hashedPassword), + []byte(password), + ) +} diff --git a/internal/core/utils/time.go b/internal/core/utils/time.go new file mode 100644 index 0000000..6ad322b --- /dev/null +++ b/internal/core/utils/time.go @@ -0,0 +1,7 @@ +package utils + +import "time" + +func GetNow() time.Time { + return time.Now() +}