Compare commits

...

7 Commits

6 changed files with 124 additions and 31 deletions

View File

@ -10,7 +10,7 @@ import (
"syscall"
"time"
"github.com/aykhans/bsky-feedgen/pkg/generator"
feedgenAz "github.com/aykhans/bsky-feedgen/pkg/generator/az"
"github.com/aykhans/bsky-feedgen/pkg/types"
"github.com/aykhans/bsky-feedgen/pkg/config"
@ -87,7 +87,7 @@ Flags:
os.Exit(1)
}
feedGeneratorAz := generator.NewFeedGeneratorAz(postCollection, feedAzCollection)
feedGeneratorAz := feedgenAz.NewGenerator(postCollection, feedAzCollection)
startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, cursorOption)
logger.Log.Info("Cron jobs started")
@ -98,7 +98,7 @@ Flags:
func startCrons(
ctx context.Context,
feedGenAzConfig *config.FeedGenAzConfig,
feedGeneratorAz *generator.FeedGeneratorAz,
feedGeneratorAz *feedgenAz.Generator,
feedAzCollection *collections.FeedAzCollection,
cursorOption types.GeneratorCursor,
) {

74
compose.prod.yml Normal file
View File

@ -0,0 +1,74 @@
# Shared MongoDB connection settings. The consumer, feedgen_az and api services
# merge this mapping into their environment via `<<: *common-mongodb-environment`.
# NOTE(review): the username and password are committed in plain text; consider
# supplying them through an env file or a secrets mechanism instead.
x-common-mongodb-environment: &common-mongodb-environment
MONGODB_HOST: mongodb
MONGODB_PORT: 27017
MONGODB_USERNAME: aykhan
MONGODB_PASSWORD: ch7278832gf99010hgbddewd2y28982v3dvbv28vv2dv2d2gbvby2
services:
# MongoDB instance; the application services address it by the hostname
# `mongodb` (see MONGODB_HOST in the shared environment).
mongodb:
# NOTE(review): the image tag is not pinned, unlike the caddy image below.
image: mongo
restart: unless-stopped
ports:
# NOTE(review): this publishes MongoDB on the host. If only the compose
# services need to reach it, the mapping may be unnecessary; confirm.
- 27017:27017
volumes:
# Named volume declared at the bottom of the file.
- mongodb_data:/data/db
environment:
# These root credentials duplicate MONGODB_USERNAME / MONGODB_PASSWORD from the
# shared environment; the two places have to be kept in sync by hand.
MONGO_INITDB_ROOT_USERNAME: aykhan
MONGO_INITDB_ROOT_PASSWORD: ch7278832gf99010hgbddewd2y28982v3dvbv28vv2dv2d2gbvby2
healthcheck:
# Pipes a ping command into mongosh. Services that declare
# `condition: service_healthy` on mongodb wait for this check to pass.
test: echo 'db.runCommand("ping").ok' | mongosh --quiet
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
# Post consumer. It is started only after the mongodb healthcheck passes.
consumer:
image: git.aykhans.me/bsky/feedgen-consumer:latest
restart: unless-stopped
environment:
<<: *common-mongodb-environment
POST_MAX_DATE: 720h # Save only posts created in the last month (720h = 30 days)
POST_COLLECTION_CUTOFF_CRON_DELAY: 10m # 10 minutes
POST_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT: 2900000 # Delete post documents after 2.9 million
depends_on:
mongodb:
condition: service_healthy
# Generator for the "az" feed. It is started only after the mongodb healthcheck passes.
feedgen_az:
image: git.aykhans.me/bsky/feedgen-generator-az:latest
restart: unless-stopped
environment:
<<: *common-mongodb-environment
# NOTE(review): "GENERATER" is spelled this way on purpose here; the name
# presumably has to match what the service reads, so do not correct it in this
# file alone. Confirm against the config loader.
FEED_AZ_GENERATER_CRON_DELAY: 1m # 1 minute
FEED_AZ_COLLECTION_CUTOFF_CRON_DELAY: 30m # 30 minutes
FEED_AZ_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT: 10000 # Document limit used by the feed collection cutoff cron
depends_on:
mongodb:
condition: service_healthy
# Feed generator API. It is started only after the mongodb healthcheck passes.
api:
image: git.aykhans.me/bsky/feedgen-api:latest
restart: unless-stopped
ports:
# Same port number as API_PORT below.
# NOTE(review): if caddy fronts this service, publishing the port on the host
# may be redundant; confirm against the Caddyfile.
- 8421:8421
environment:
<<: *common-mongodb-environment
FEEDGEN_HOSTNAME: https://feeds.bsky.aykhans.me
FEEDGEN_PUBLISHER_DID: did:plc:cs2cbzojm6hmx5lfxiuft3mq
API_PORT: 8421
depends_on:
mongodb:
condition: service_healthy
# Caddy web server published on the HTTP and HTTPS ports. Its behaviour is
# defined by ./Caddyfile, which is mounted from the host and not shown here.
caddy:
image: caddy:2.10.0-alpine
restart: unless-stopped
ports:
- 80:80
- 443:443
# UDP 443 is presumably for HTTP/3 (QUIC); confirm.
- 443:443/udp
volumes:
- ./Caddyfile:/etc/caddy/Caddyfile
# NOTE(review): no volume is mounted for Caddy's data directory, so any
# certificates it obtains would presumably be lost when the container is
# re-created. Confirm, and consider persisting /data.
volumes:
mongodb_data:

View File

@ -264,7 +264,7 @@ func ConsumeAndSaveToMongoDB(
case <-ticker.C:
if len(postBatch) > 0 {
consumerLastFlushingTime = time.Now()
logger.Log.Info("flushing post batch", "count", len(postBatch))
// logger.Log.Info("flushing post batch", "count", len(postBatch))
err := postCollection.Insert(ctx, true, postBatch...)
if err != nil {
return fmt.Errorf("mongodb post insert error: %v", err)
@ -272,7 +272,7 @@ func ConsumeAndSaveToMongoDB(
postBatch = []*collections.Post{} // Clear batch after insert
} else {
// If we haven't seen any data for 25 seconds, cancel the consumer connection
if consumerLastFlushingTime.Add(time.Second*25).Before(time.Now()) {
if consumerLastFlushingTime.Add(time.Second * 25).Before(time.Now()) {
cancel()
}
}

View File

@ -1,4 +1,4 @@
package generator
package az
import (
"context"
@ -13,39 +13,24 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
var azInvalidUser []string = []string{
"did:plc:5zww7zorx2ajw7hqrhuix3ba",
"did:plc:c4vhz47h566t2ntgd7gtawen",
}
var azValidUsers []string = []string{
"did:plc:jbt4qi6psd7rutwzedtecsq7",
"did:plc:yzgdpxsklrmfgqmjghdvw3ti",
"did:plc:g7ebgiai577ln3avsi2pt3sn",
"did:plc:phtq2rhgbwipyx5ie3apw44j",
"did:plc:jfdvklrs5n5qv7f25v6swc5h",
"did:plc:u5ez5w6qslh6advti4wyddba",
"did:plc:cs2cbzojm6hmx5lfxiuft3mq",
}
type FeedGeneratorAz struct {
type Generator struct {
postCollection *collections.PostCollection
feedAzCollection *collections.FeedAzCollection
textRegex *regexp.Regexp
}
func NewFeedGeneratorAz(
func NewGenerator(
postCollection *collections.PostCollection,
feedAzCollection *collections.FeedAzCollection,
) *FeedGeneratorAz {
return &FeedGeneratorAz{
) *Generator {
return &Generator{
postCollection: postCollection,
feedAzCollection: feedAzCollection,
textRegex: regexp.MustCompile("(?i)(azerbaijan|azərbaycan|aзербайджан|azerbaycan)"),
}
}
func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
func (generator *Generator) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
var mongoCursor *mongo.Cursor
switch cursorOption {
case types.GeneratorCursorLastGenerated:
@ -124,17 +109,16 @@ func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.
return nil
}
func (generator *FeedGeneratorAz) IsValid(post *collections.Post) bool {
func (generator *Generator) IsValid(post *collections.Post) bool {
if post.Reply != nil && post.Reply.RootURI != post.Reply.ParentURI {
return false
}
if slices.Contains(azInvalidUser, post.DID) {
return false
if isValidUser := users.IsValid(post.DID); isValidUser != nil {
return *isValidUser
}
if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
(slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
if (slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords
return true
}

21
pkg/generator/az/lists.go Normal file
View File

@ -0,0 +1,21 @@
package az
import "github.com/aykhans/bsky-feedgen/pkg/generator"
// users lists the DIDs that carry a fixed verdict for this feed: false marks a
// DID as invalid, true marks it as valid. DIDs that are not listed get no
// verdict from this map.
var users = generator.Users{
// Invalid: DIDs explicitly marked as not valid for this feed.
"did:plc:5zww7zorx2ajw7hqrhuix3ba": false,
"did:plc:c4vhz47h566t2ntgd7gtawen": false,
"did:plc:lc7j7xdq67gn7vc6vzmydfqk": false,
"did:plc:msian4dqa2rqalf3biilnf3m": false,
"did:plc:gtosalycg7snvodjhsze35jm": false,
// Valid: DIDs explicitly marked as valid for this feed.
"did:plc:jbt4qi6psd7rutwzedtecsq7": true,
"did:plc:yzgdpxsklrmfgqmjghdvw3ti": true,
"did:plc:g7ebgiai577ln3avsi2pt3sn": true,
"did:plc:phtq2rhgbwipyx5ie3apw44j": true,
"did:plc:jfdvklrs5n5qv7f25v6swc5h": true,
"did:plc:u5ez5w6qslh6advti4wyddba": true,
"did:plc:cs2cbzojm6hmx5lfxiuft3mq": true,
}

14
pkg/generator/base.go Normal file
View File

@ -0,0 +1,14 @@
package generator
import "github.com/aykhans/bsky-feedgen/pkg/utils"
// Users maps a user DID to a fixed verdict: true marks the user as valid and
// false as invalid. A DID that is absent from the map has no verdict.
type Users map[string]bool
// IsValid looks up did in u and reports the verdict stored for it.
//
// It returns nil when did has no entry, which lets callers tell "no verdict"
// apart from an explicit false. Otherwise it returns the stored value wrapped
// in a pointer by utils.ToPtr.
func (u Users) IsValid(did string) *bool {
	isValid, ok := u[did]
	if !ok {
		// Unknown DID: leave the decision to the caller's other checks.
		return nil
	}
	return utils.ToPtr(isValid)
}