Mirror of https://github.com/aykhans/bsky-feedgen.git (synced 2025-07-17 21:34:00 +00:00)

Compare commits: 47 commits, e6fec752f9 ... main
Commit SHAs (47; author and date columns omitted):
562513021e, 896ccde393, 4f07272a82, efe2a42f0e, 56ec1a39a0, b9633e84da, 802ff21a42, bc29dabd8a,
7b89230cae, dbf892535b, 1f12a3d090, 19be0f09d2, 919fec0aa9, 3097aba9c3, 6799ad241e, 3378baf0bc,
94df1dd259, ab3baf76b1, 3c6fb06282, ea9d7bb67e, 56581c7332, 8ea4602141, 12e51b5a22, 50af73f26a,
35907380fb, bad7b4a304, 9917f61db1, 1eecbafd07, b6eaaf7331, 667769cbd7, 4beeb84f07, fd6a185bac,
bcd721e071, 4d5abe66a6, 58dce559d3, e900cd3d47, 211f1e7d5a, 50acf8d432, 7242754124, c4bd6affa6,
d8dd2c75a6, 74b8324b6f, 588cfc0fcc, 48a8d2b5f4, 16a2a62ba1, 259c139d92, c0d03ba341
.gitignore (vendored, 3 lines changed)
@@ -1 +1,2 @@
*.env
prod/**/*.env
.claude
Makefile (deleted, 60 lines)
@@ -1,60 +0,0 @@
# Equivalent Makefile for Taskfile.yaml

.PHONY: ftl fmt tidy lint run-consumer run-feedgen-az run-api run-manager generate-env

# Default value for ARGS if not provided on the command line
ARGS ?=

# Runs fmt, tidy, and lint sequentially
ftl:
    $(MAKE) fmt
    $(MAKE) tidy
    $(MAKE) lint

# Format Go code
fmt:
    gofmt -w -d .

# Tidy Go modules
tidy:
    go mod tidy

# Run golangci-lint
lint:
    golangci-lint run

# Run the consumer application, loading environment from dotenv files
run-consumer:
    set -a; \
    . config/app/.consumer.env; \
    . config/app/.mongodb.env; \
    set +a; \
    go run cmd/consumer/main.go $(ARGS)

# Run the feedgen-az application, loading environment from dotenv files
run-feedgen-az:
    set -a; \
    . config/app/feedgen/.az.env; \
    . config/app/.mongodb.env; \
    set +a; \
    go run cmd/feedgen/az/main.go $(ARGS)

# Run the api application, loading environment from dotenv files
run-api:
    set -a; \
    . config/app/.api.env; \
    . config/app/.mongodb.env; \
    set +a; \
    go run cmd/api/main.go

# Run the manager application with arguments (no dotenv)
run-manager:
    go run cmd/manager/main.go $(ARGS)

# Generate env files from templates
generate-env:
    cp config/app/consumer.env.example config/app/.consumer.env
    cp config/app/api.env.example config/app/.api.env
    cp config/app/mongodb.env.example config/app/.mongodb.env
    cp config/app/feedgen/az.env.example config/app/feedgen/.az.env
    cp config/mongodb/env.example config/mongodb/.env
Taskfile.yml (76 lines changed)
@@ -2,6 +2,9 @@
version: "3"

vars:
  DOCKER_REGISTRY: "git.aykhans.me/bsky/"

tasks:
  ftl:
    cmds:
@@ -16,35 +19,26 @@ tasks:
    lint: golangci-lint run

  run-consumer:
    cmd: go run cmd/consumer/main.go {{.CLI_ARGS}}
    cmd: go run ./cmd/consumer {{.CLI_ARGS}}
    dotenv:
      - config/app/.consumer.env
      - config/app/.mongodb.env
      - config/app/consumer.env
      - config/app/mongodb.env

  run-feedgen-az:
    cmd: go run cmd/feedgen/az/main.go {{.CLI_ARGS}}
    cmd: go run ./cmd/feedgen/az {{.CLI_ARGS}}
    dotenv:
      - config/app/feedgen/.az.env
      - config/app/.mongodb.env
      - config/app/feedgen/az.env
      - config/app/mongodb.env

  run-api:
    cmd: go run cmd/api/main.go
    cmd: go run ./cmd/api {{.CLI_ARGS}}
    dotenv:
      - config/app/.api.env
      - config/app/.mongodb.env
      - config/app/api.env
      - config/app/mongodb.env

  run-manager:
    cmd: go run cmd/manager/main.go {{.CLI_ARGS}}

  generate-env:
    desc: Generate env files from templates
    cmds:
      - cp config/app/consumer.env.example config/app/.consumer.env
      - cp config/app/api.env.example config/app/.api.env
      - cp config/app/mongodb.env.example config/app/.mongodb.env
      - cp config/app/feedgen/az.env.example config/app/feedgen/.az.env
      - cp config/mongodb/env.example config/mongodb/.env

  docker-publish-all:
    desc: Publish docker images for all services
    cmds:
@@ -55,21 +49,55 @@ tasks:
  docker-publish-api:
    desc: Publish docker image for api service
    vars:
      GO_VERSION_FILE: ./cmd/api/version.go
      IMAGE_NAME: feedgen-api
      VERSION:
        sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
      VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
      LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
    preconditions:
      - test -f {{.GO_VERSION_FILE}}
      - sh: '[ -n "{{.VERSION}}" ]'
        msg: "Could not extract version from {{.GO_FILE}}"
    cmds:
      - docker build -t git.aykhans.me/bsky/feedgen-api:latest -f ./cmd/api/Dockerfile .
      - docker push git.aykhans.me/bsky/feedgen-api:latest
      - docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/api/Dockerfile .
      - docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
      - docker push {{.VERSIONED_IMAGE}}
      - docker push {{.LATEST_IMAGE}}
      - echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"

  docker-publish-consumer:
    desc: Publish docker image for consumer service
    vars:
      GO_VERSION_FILE: ./cmd/consumer/version.go
      IMAGE_NAME: feedgen-consumer
      VERSION:
        sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
      VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
      LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
    cmds:
      - docker build -t git.aykhans.me/bsky/feedgen-consumer:latest -f ./cmd/consumer/Dockerfile .
      - docker push git.aykhans.me/bsky/feedgen-consumer:latest
      - docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/consumer/Dockerfile .
      - docker push {{.VERSIONED_IMAGE}}
      - docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
      - docker push {{.LATEST_IMAGE}}
      - echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"

  docker-publish-feedgen-az:
    desc: Publish docker image for feedgen-az service
    vars:
      GO_VERSION_FILE: ./cmd/feedgen/az/version.go
      IMAGE_NAME: feedgen-generator-az
      VERSION:
        sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
      VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
      LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
    cmds:
      - docker build -t git.aykhans.me/bsky/feedgen-generator-az:latest -f ./cmd/feedgen/az/Dockerfile .
      - docker push git.aykhans.me/bsky/feedgen-generator-az:latest
      - docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/feedgen/az/Dockerfile .
      - docker push {{.VERSIONED_IMAGE}}
      - docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
      - docker push {{.LATEST_IMAGE}}
      - echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"

  docker-publish-manager:
    desc: Publish docker image for manager service
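The docker-publish tasks derive each image tag by grepping the `const version = "..."` line out of the service's version.go. As a rough illustration of the same extraction done in Go (a sketch, not part of the repository; the helper name, error handling, and the printed registry path are assumptions for the example):

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// extractVersion mirrors the Taskfile's grep pipeline: it pulls the value of
// `const version = "..."` out of a version.go file, e.g. "0.2.205".
func extractVersion(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	re := regexp.MustCompile(`const version = "([^"]*)"`)
	match := re.FindSubmatch(data)
	if match == nil {
		return "", fmt.Errorf("no version constant found in %s", path)
	}
	return string(match[1]), nil
}

func main() {
	version, err := extractVersion("./cmd/api/version.go")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Would print something like: git.aykhans.me/bsky/feedgen-api:0.2.205
	fmt.Printf("git.aykhans.me/bsky/feedgen-api:%s\n", version)
}
```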
cmd/api/Dockerfile
@@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/api ./cmd/api

RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o api ./cmd/api/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o api ./cmd/api

FROM gcr.io/distroless/static-debian12:latest
cmd/api/main.go
@@ -2,8 +2,11 @@ package main
import (
    "context"
    "flag"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "syscall"

    "github.com/aykhans/bsky-feedgen/pkg/api"
@@ -12,13 +15,24 @@ import (
    "github.com/aykhans/bsky-feedgen/pkg/logger"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
    _ "go.uber.org/automaxprocs"
)

type flags struct {
    version bool
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go listenForTermination(func() { cancel() })

    flags := getFlags()
    if flags.version == true {
        fmt.Printf("API version: %v\n", version)
        os.Exit(0)
    }

    apiConfig, errMap := config.NewAPIConfig()
    if errMap != nil {
        logger.Log.Error("API ENV error", "error", errMap.ToStringMap())
@@ -58,3 +72,33 @@ func listenForTermination(do func()) {
    <-sigChan
    do()
}

func getFlags() *flags {
    flags := &flags{}

    flag.Usage = func() {
        fmt.Println(
            `Usage:

  consumer [flags]

Flags:
  -version      version information
  -h, -help     Display this help message`)
    }

    flag.BoolVar(&flags.version, "version", false, "print version information")
    flag.Parse()

    if args := flag.Args(); len(args) > 0 {
        if len(args) == 1 {
            fmt.Printf("unexpected argument: %s\n\n", args[0])
        } else {
            fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
        }
        flag.CommandLine.Usage()
        os.Exit(1)
    }

    return flags
}
cmd/api/version.go (new file, 3 lines)
@@ -0,0 +1,3 @@
package main

const version = "0.2.205"
cmd/consumer/Dockerfile
@@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/consumer ./cmd/consumer

RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o consumer ./cmd/consumer/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o consumer ./cmd/consumer

FROM gcr.io/distroless/static-debian12:latest
cmd/consumer/main.go
@@ -17,44 +17,27 @@ import (
    "github.com/aykhans/bsky-feedgen/pkg/logger"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
    _ "go.uber.org/automaxprocs"
)

type flags struct {
    version      bool
    cursorOption types.ConsumerCursor
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go listenForTermination(func() { cancel() })

    flag.Usage = func() {
        fmt.Println(
            `Usage:

  consumer [flags]

Flags:
  -h, -help       Display this help message
  -cursor string  Specify the starting point for data consumption (default: last-consumed)
      Options:
        last-consumed: Resume from the last processed data in storage
        first-stream: Start from the beginning of the firehose
        current-stream: Start from the current position in the firehose stream`)
    flags := getFlags()
    if flags.version == true {
        fmt.Printf("Consumer version: %v\n", version)
        os.Exit(0)
    }

    var cursorOption types.ConsumerCursor
    flag.Var(&cursorOption, "cursor", "")
    flag.Parse()

    if args := flag.Args(); len(args) > 0 {
        if len(args) == 1 {
            fmt.Printf("unexpected argument: %s\n\n", args[0])
        } else {
            fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
        }
        flag.CommandLine.Usage()
        os.Exit(1)
    }

    if cursorOption == "" {
        _ = cursorOption.Set("")
    if flags.cursorOption == "" {
        _ = flags.cursorOption.Set("")
    }

    consumerConfig, errMap := config.NewConsumerConfig()
@@ -88,7 +71,7 @@ Flags:
        ctx,
        postCollection,
        "wss://bsky.network",
        cursorOption,
        flags.cursorOption,
        consumerConfig.PostMaxDate, // Save only posts created before PostMaxDate
        10*time.Second,             // Save consumed data to MongoDB every 10 seconds
    )
@@ -120,3 +103,39 @@ func listenForTermination(do func()) {
    <-sigChan
    do()
}

func getFlags() *flags {
    flags := &flags{}

    flag.Usage = func() {
        fmt.Println(
            `Usage:

  consumer [flags]

Flags:
  -version        version information
  -h, -help       Display this help message
  -cursor string  Specify the starting point for data consumption (default: last-consumed)
      Options:
        last-consumed: Resume from the last processed data in storage
        first-stream: Start from the beginning of the firehose
        current-stream: Start from the current position in the firehose stream`)
    }

    flag.BoolVar(&flags.version, "version", false, "print version information")
    flag.Var(&flags.cursorOption, "cursor", "Specify the starting point for data consumption")
    flag.Parse()

    if args := flag.Args(); len(args) > 0 {
        if len(args) == 1 {
            fmt.Printf("unexpected argument: %s\n\n", args[0])
        } else {
            fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
        }
        flag.CommandLine.Usage()
        os.Exit(1)
    }

    return flags
}
cmd/consumer/version.go (new file, 3 lines)
@@ -0,0 +1,3 @@
package main

const version = "0.1.0"
cmd/feedgen/az/Dockerfile
@@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/feedgen/az ./cmd/feedgen/az

RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o feedgen ./cmd/feedgen/az/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o feedgen ./cmd/feedgen/az

FROM gcr.io/distroless/static-debian12:latest
cmd/feedgen/az/main.go
@@ -10,50 +10,34 @@ import (
    "syscall"
    "time"

    "github.com/aykhans/bsky-feedgen/pkg/generator"
    feedgenAz "github.com/aykhans/bsky-feedgen/pkg/generator/az"
    "github.com/aykhans/bsky-feedgen/pkg/types"

    "github.com/aykhans/bsky-feedgen/pkg/config"
    "github.com/aykhans/bsky-feedgen/pkg/logger"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
    "github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
    _ "go.uber.org/automaxprocs"
)

type flags struct {
    version      bool
    cursorOption types.GeneratorCursor
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go listenForTermination(func() { cancel() })

    flag.Usage = func() {
        fmt.Println(
            `Usage:

  feedgen-az [flags]

Flags:
  -h, -help       Display this help message
  -cursor string  Specify the starting point for feed data generation (default: last-generated)
      Options:
        last-generated: Resume from the last generated data in storage
        first-post: Start from the beginning of the posts`)
    flags := getFlags()
    if flags.version == true {
        fmt.Printf("Feedgen Az version: %v\n", version)
        os.Exit(0)
    }

    var cursorOption types.GeneratorCursor
    flag.Var(&cursorOption, "cursor", "")
    flag.Parse()

    if args := flag.Args(); len(args) > 0 {
        if len(args) == 1 {
            fmt.Printf("unexpected argument: %s\n\n", args[0])
        } else {
            fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
        }
        flag.CommandLine.Usage()
        os.Exit(1)
    }

    if cursorOption == "" {
        _ = cursorOption.Set("")
    if flags.cursorOption == "" {
        _ = flags.cursorOption.Set("")
    }

    feedGenAzConfig, errMap := config.NewFeedGenAzConfig()
@@ -86,9 +70,9 @@ Flags:
        os.Exit(1)
    }

    feedGeneratorAz := generator.NewFeedGeneratorAz(postCollection, feedAzCollection)
    feedGeneratorAz := feedgenAz.NewGenerator(postCollection, feedAzCollection)

    startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, cursorOption)
    startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, flags.cursorOption)
    logger.Log.Info("Cron jobs started")

    <-ctx.Done()
@@ -97,7 +81,7 @@ Flags:
func startCrons(
    ctx context.Context,
    feedGenAzConfig *config.FeedGenAzConfig,
    feedGeneratorAz *generator.FeedGeneratorAz,
    feedGeneratorAz *feedgenAz.Generator,
    feedAzCollection *collections.FeedAzCollection,
    cursorOption types.GeneratorCursor,
) {
@@ -138,3 +122,38 @@ func listenForTermination(do func()) {
    <-sigChan
    do()
}

func getFlags() *flags {
    flags := &flags{}

    flag.Usage = func() {
        fmt.Println(
            `Usage:

  feedgen-az [flags]

Flags:
  -version        version information
  -h, -help       Display this help message
  -cursor string  Specify the starting point for feed data generation (default: last-generated)
      Options:
        last-generated: Resume from the last generated data in storage
        first-post: Start from the beginning of the posts`)
    }

    flag.BoolVar(&flags.version, "version", false, "print version information")
    flag.Var(&flags.cursorOption, "cursor", "Specify the starting point for feed data generation")
    flag.Parse()

    if args := flag.Args(); len(args) > 0 {
        if len(args) == 1 {
            fmt.Printf("unexpected argument: %s\n\n", args[0])
        } else {
            fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
        }
        flag.CommandLine.Usage()
        os.Exit(1)
    }

    return flags
}
cmd/feedgen/az/version.go (new file, 3 lines)
@@ -0,0 +1,3 @@
package main

const version = "0.1.110"
config/app/consumer.env (new file, 3 lines)
@@ -0,0 +1,3 @@
POST_MAX_DATE=720h # Save only posts created in the last month
POST_COLLECTION_CUTOFF_CRON_DELAY=30m # 30 minutes
POST_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT=1000000 # Delete post documents after 1 million
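POST_MAX_DATE is a Go-style duration string (720h is roughly 30 days). A minimal sketch of how such a value can be read from the environment with standard-library parsing; the repository's own config package may handle this differently, so treat the snippet as illustrative only:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// POST_MAX_DATE=720h means: keep only posts newer than now minus 720 hours.
	raw := os.Getenv("POST_MAX_DATE")
	maxAge, err := time.ParseDuration(raw)
	if err != nil {
		fmt.Fprintf(os.Stderr, "invalid POST_MAX_DATE %q: %v\n", raw, err)
		os.Exit(1)
	}
	cutoff := time.Now().Add(-maxAge)
	fmt.Println("posts created before", cutoff.Format(time.RFC3339), "would be skipped")
}
```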
go.mod (2 lines changed)
@@ -6,6 +6,7 @@ require (
    github.com/bluesky-social/indigo v0.0.0-20250516010818-f8de501bd6a0
    github.com/gorilla/websocket v1.5.1
    go.mongodb.org/mongo-driver v1.17.3
    go.uber.org/automaxprocs v1.6.0
)

require (
@@ -20,6 +21,7 @@ require (
    github.com/goccy/go-json v0.10.2 // indirect
    github.com/gocql/gocql v1.7.0 // indirect
    github.com/gogo/protobuf v1.3.2 // indirect
    github.com/golang-jwt/jwt/v5 v5.2.2
    github.com/golang/snappy v0.0.4 // indirect
    github.com/google/uuid v1.4.0 // indirect
    github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
go.sum (6 lines changed)
@@ -46,6 +46,8 @@ github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -250,6 +252,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
@@ -330,6 +334,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
@@ -7,6 +7,7 @@ import (
    "time"

    "github.com/aykhans/bsky-feedgen/pkg/api/handler"
    "github.com/aykhans/bsky-feedgen/pkg/api/middleware"
    "github.com/aykhans/bsky-feedgen/pkg/config"
    "github.com/aykhans/bsky-feedgen/pkg/feed"
    "github.com/aykhans/bsky-feedgen/pkg/logger"
@@ -22,15 +23,21 @@ func Run(
        return err
    }
    feedHandler := handler.NewFeedHandler(feeds, apiConfig.FeedgenPublisherDID)
    generatorHandler := handler.NewGeneratorHandler()

    authMiddleware := middleware.NewAuth(apiConfig.ServiceDID)

    mux := http.NewServeMux()

    mux.HandleFunc("GET /.well-known/did.json", baseHandler.GetWellKnownDIDDoc)
    mux.HandleFunc("GET /xrpc/app.bsky.feed.describeFeedGenerator", feedHandler.DescribeFeeds)
    mux.HandleFunc(
    mux.Handle(
        "GET /xrpc/app.bsky.feed.getFeedSkeleton",
        feedHandler.GetFeedSkeleton,
        authMiddleware.JWTAuthMiddleware(http.HandlerFunc(feedHandler.GetFeedSkeleton)),
    )
    mux.HandleFunc("GET /{feed}/users", generatorHandler.GetAllUsers)
    mux.HandleFunc("GET /{feed}/users/valid/", generatorHandler.GetValidUsers)
    mux.HandleFunc("GET /{feed}/users/invalid/", generatorHandler.GetInvalidUsers)

    httpServer := &http.Server{
        Addr: fmt.Sprintf(":%d", apiConfig.APIPort),
@@ -50,7 +50,7 @@ func (handler *FeedHandler) DescribeFeeds(w http.ResponseWriter, r *http.Request
}

func (handler *FeedHandler) GetFeedSkeleton(w http.ResponseWriter, r *http.Request) {
    userDID, _ := r.Context().Value(middleware.UserDIDKey).(string)
    userDID, _ := middleware.GetValue[string](r, middleware.UserDIDKey)

    feedQuery := r.URL.Query().Get("feed")
    if feedQuery == "" {
pkg/api/handler/generator.go (new file, 57 lines)
@@ -0,0 +1,57 @@
package handler

import (
    "net/http"

    "github.com/aykhans/bsky-feedgen/pkg/api/response"
    generatorAz "github.com/aykhans/bsky-feedgen/pkg/generator/az"
)

type GeneratorHandler struct{}

func NewGeneratorHandler() *GeneratorHandler {
    return &GeneratorHandler{}
}

func (handler *GeneratorHandler) GetValidUsers(w http.ResponseWriter, r *http.Request) {
    feed := r.PathValue("feed")

    validUsers := make([]string, 0)
    switch feed {
    case "AzPulse":
        validUsers = generatorAz.Users.GetValidUsers()
    }

    response.JSON(w, 200, response.M{
        "feed":  feed,
        "users": validUsers,
    })
}

func (handler *GeneratorHandler) GetInvalidUsers(w http.ResponseWriter, r *http.Request) {
    feed := r.PathValue("feed")

    invalidUsers := make([]string, 0)
    switch feed {
    case "AzPulse":
        invalidUsers = generatorAz.Users.GetInvalidUsers()
    }

    response.JSON(w, 200, response.M{
        "feed":  feed,
        "users": invalidUsers,
    })
}

func (handler *GeneratorHandler) GetAllUsers(w http.ResponseWriter, r *http.Request) {
    feed := r.PathValue("feed")

    responseData := response.M{"feed": feed}
    switch feed {
    case "AzPulse":
        responseData["valid_users"] = generatorAz.Users.GetValidUsers()
        responseData["invalid_users"] = generatorAz.Users.GetInvalidUsers()
    }

    response.JSON(w, 200, responseData)
}
@@ -2,12 +2,73 @@ package middleware
import (
    "context"
    "crypto"
    "errors"
    "fmt"
    "net/http"
    "slices"
    "strings"
    "time"

    "github.com/bluesky-social/indigo/atproto/identity"
    "github.com/bluesky-social/indigo/atproto/syntax"
    "github.com/golang-jwt/jwt/v5"
    "github.com/whyrusleeping/go-did"
)

const UserDIDKey ContextKey = "user_did"

func JWTAuthMiddleware(next http.Handler) http.Handler {
const (
    authorizationHeaderName        = "Authorization"
    authorizationHeaderValuePrefix = "Bearer "
)

// Global (or dependency-injected) DID resolver with caching.
var didResolver *identity.CacheDirectory

func init() {
    baseDir := identity.BaseDirectory{}

    // Configure cache with appropriate TTLs.
    // Capacity 0 means unlimited cache size.
    // hitTTL: 24 hours for successful resolutions.
    // errTTL: 5 minutes for failed resolutions.
    // invalidHandleTTL: also 5 minutes for invalid handles.
    resolver := identity.NewCacheDirectory(
        &baseDir,
        0,             // Unlimited capacity
        24*time.Hour,  // hitTTL
        5*time.Minute, // errTTL
        5*time.Minute, // invalidHandleTTL
    )
    didResolver = &resolver
}

type AuthorizationError struct {
    Message string
    Err     error
}

func (e *AuthorizationError) Error() string {
    if e.Err != nil {
        return fmt.Sprintf("%s: %v", e.Message, e.Err)
    }
    return e.Message
}

func (e *AuthorizationError) Unwrap() error {
    return e.Err
}

type Auth struct {
    serviceDID *did.DID
}

func NewAuth(serviceDID *did.DID) *Auth {
    return &Auth{serviceDID}
}

func (auth *Auth) JWTAuthMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
@@ -16,8 +77,100 @@ func JWTAuthMiddleware(next http.Handler) http.Handler {
            return
        }

        // TODO: Add auth verification
        ctx := context.WithValue(r.Context(), UserDIDKey, "")
        userDID, _ := auth.validateAuth(r.Context(), r)
        ctx := context.WithValue(r.Context(), UserDIDKey, userDID)

        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

// getDIDSigningKey resolves a DID and extracts its public signing key.
// It leverages indigo's identity package which handles multibase decoding and key parsing.
func (auth *Auth) getDIDSigningKey(ctx context.Context, did string) (crypto.PublicKey, error) {
    atID, err := syntax.ParseAtIdentifier(did)
    if err != nil {
        return nil, fmt.Errorf("invalid DID syntax: %w", err)
    }

    // Use Lookup for bi-directional verification (handle -> DID -> handle).
    // The `Lookup` method returns an `Identity` struct which contains `PublicKey()` method
    // to get the signing key.
    identity, err := didResolver.Lookup(ctx, *atID)
    if err != nil {
        return nil, fmt.Errorf("DID resolution failed for %s: %w", did, err)
    }
    if identity == nil || identity.DID.String() == "" {
        return nil, fmt.Errorf("DID resolution returned empty identity for %s", did)
    }

    publicKey, err := identity.PublicKey()
    if err != nil {
        return nil, fmt.Errorf("failed to get signing key for DID %s: %w", did, err)
    }

    return publicKey, nil
}

// ValidateAuth validates the authorization header and returns the requester's DID.
func (auth *Auth) validateAuth(ctx context.Context, r *http.Request) (string, error) {
    authHeader := r.Header.Get(authorizationHeaderName)
    if authHeader == "" {
        return "", &AuthorizationError{Message: "Authorization header is missing"}
    }

    if !strings.HasPrefix(authHeader, authorizationHeaderValuePrefix) {
        return "", &AuthorizationError{Message: "Invalid authorization header format"}
    }

    jwtString := strings.TrimPrefix(authHeader, authorizationHeaderValuePrefix)
    jwtString = strings.TrimSpace(jwtString)

    claims := jwt.RegisteredClaims{}

    keyFunc := func(token *jwt.Token) (any, error) {
        regClaims, ok := token.Claims.(*jwt.RegisteredClaims)
        if !ok {
            return nil, fmt.Errorf("invalid JWT claims type")
        }

        issuerDID := regClaims.Issuer
        if issuerDID == "" {
            return nil, fmt.Errorf("JWT 'iss' claim is missing")
        }

        publicKey, err := auth.getDIDSigningKey(ctx, issuerDID)
        if err != nil {
            return nil, fmt.Errorf("failed to get signing key for DID %s: %w", issuerDID, err)
        }

        return publicKey, nil
    }

    token, err := jwt.ParseWithClaims(jwtString, &claims, keyFunc)
    if err != nil {
        if errors.Is(err, jwt.ErrTokenSignatureInvalid) {
            return "", &AuthorizationError{Message: "Invalid signature", Err: err}
        }
        if errors.Is(err, jwt.ErrTokenExpired) {
            return "", &AuthorizationError{Message: "Token expired", Err: err}
        }
        if errors.Is(err, jwt.ErrTokenNotValidYet) {
            return "", &AuthorizationError{Message: "Token not valid yet", Err: err}
        }
        if errors.Is(err, jwt.ErrTokenMalformed) {
            return "", &AuthorizationError{Message: "Malformed token", Err: err}
        }
        return "", &AuthorizationError{Message: "Failed to parse or validate JWT", Err: err}
    }

    if !token.Valid {
        return "", &AuthorizationError{Message: "Token is invalid"}
    }

    if slices.Contains(claims.Audience, auth.serviceDID.String()) {
        return "", &AuthorizationError{Message: fmt.Sprintf("Invalid audience (expected %s)", auth.serviceDID)}
    }

    // Return the issuer's DID.
    return claims.Issuer, nil
}
@@ -1,3 +1,19 @@
package middleware

import (
    "net/http"

    "github.com/aykhans/bsky-feedgen/pkg/types"
)

type ContextKey string

func GetValue[T any](r *http.Request, key ContextKey) (T, error) {
    value, ok := r.Context().Value(key).(T)
    if ok == false {
        var zero T
        return zero, types.ErrNotfound
    }

    return value, nil
}
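GetValue gives handlers a typed accessor for request-scoped values instead of repeating raw type assertions. A short usage sketch with a hypothetical handler, assuming the auth middleware has already stored the DID under UserDIDKey:

```go
package handler

import (
	"fmt"
	"net/http"

	"github.com/aykhans/bsky-feedgen/pkg/api/middleware"
)

// exampleHandler is a hypothetical handler showing how GetValue is consumed.
func exampleHandler(w http.ResponseWriter, r *http.Request) {
	// GetValue returns types.ErrNotfound when nothing was stored for the key.
	userDID, err := middleware.GetValue[string](r, middleware.UserDIDKey)
	if err != nil {
		userDID = ""
	}
	fmt.Fprintf(w, "request made by %q\n", userDID)
}
```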
pkg/api/middleware/es256k.go (new file, 89 lines)
@@ -0,0 +1,89 @@
package middleware

// copied from https://gist.github.com/bnewbold/bc9b97c9b281295da1fa47c03b0b3c69

import (
    "crypto"
    "errors"

    atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
    "github.com/golang-jwt/jwt/v5"
)

var (
    SigningMethodES256K *SigningMethodAtproto
    SigningMethodES256  *SigningMethodAtproto
)

type SigningMethodAtproto struct {
    alg      string
    hash     crypto.Hash
    toOutSig toOutSig
    sigLen   int
}

type toOutSig func(sig []byte) []byte

func init() {
    SigningMethodES256K = &SigningMethodAtproto{
        alg:      "ES256K",
        hash:     crypto.SHA256,
        toOutSig: toES256K,
        sigLen:   64,
    }
    jwt.RegisterSigningMethod(SigningMethodES256K.Alg(), func() jwt.SigningMethod {
        return SigningMethodES256K
    })
    SigningMethodES256 = &SigningMethodAtproto{
        alg:      "ES256",
        hash:     crypto.SHA256,
        toOutSig: toES256,
        sigLen:   64,
    }
    jwt.RegisterSigningMethod(SigningMethodES256.Alg(), func() jwt.SigningMethod {
        return SigningMethodES256
    })
}

// Errors returned on different problems.
var (
    ErrWrongKeyFormat  = errors.New("wrong key type")
    ErrBadSignature    = errors.New("bad signature")
    ErrVerification    = errors.New("signature verification failed")
    ErrFailedSigning   = errors.New("failed generating signature")
    ErrHashUnavailable = errors.New("hasher unavailable")
)

func (sm *SigningMethodAtproto) Verify(signingString string, sig []byte, key any) error {
    pub, ok := key.(atcrypto.PublicKey)
    if !ok {
        return ErrWrongKeyFormat
    }

    if !sm.hash.Available() {
        return ErrHashUnavailable
    }

    if len(sig) != sm.sigLen {
        return ErrBadSignature
    }

    return pub.HashAndVerifyLenient([]byte(signingString), sig)
}

func (sm *SigningMethodAtproto) Sign(signingString string, key any) ([]byte, error) {
    // TODO: implement signatures
    return nil, ErrFailedSigning
}

func (sm *SigningMethodAtproto) Alg() string {
    return sm.alg
}

func toES256K(sig []byte) []byte {
    return sig[:64]
}

func toES256(sig []byte) []byte {
    return sig[:64]
}
@@ -1,5 +1,9 @@
package consumer

// This file contains code for consuming and processing the Bluesky firehose event stream.
// Most of this implementation is copied and inspired from the original source at:
// https://github.com/bluesky-social/indigo/blob/main/cmd/beemo/firehose_consumer.go

import (
    "bytes"
    "context"
@@ -173,11 +177,12 @@ func ConsumeAndSaveToMongoDB(
        sequenceCursor = nil
    }

    consumerLastFlushingTime := time.Now()
    go func() {
        defer cancel()
        for {
            err := RunFirehoseConsumer(
                ctx,
                localCtx,
                relayHost,
                func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
                    firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
@@ -186,7 +191,7 @@ func ConsumeAndSaveToMongoDB(
            )

            if err != nil {
                if ctx.Err() != nil {
                if localCtx.Err() != nil {
                    break
                }
                logger.Log.Error(err.Error())
@@ -214,6 +219,7 @@ func ConsumeAndSaveToMongoDB(
            return nil

        case <-localCtx.Done():
            logger.Log.Error("inactive firehose consumer error")
            return nil

        case data := <-firehoseDataChan:
@@ -261,12 +267,18 @@ func ConsumeAndSaveToMongoDB(
        case <-ticker.C:
            if len(postBatch) > 0 {
                consumerLastFlushingTime = time.Now()
                // logger.Log.Info("flushing post batch", "count", len(postBatch))
                err := postCollection.Insert(ctx, true, postBatch...)
                if err != nil {
                    return fmt.Errorf("mongodb post insert error: %v", err)
                }
                postBatch = []*collections.Post{} // Clear batch after insert
            } else {
                // If we haven't seen any data for 25 seconds, cancel the consumer connection
                if consumerLastFlushingTime.Add(time.Second * 25).Before(time.Now()) {
                    cancel()
                }
            }
        }
    }
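The flush ticker above doubles as an inactivity watchdog: when no batch has been flushed for 25 seconds, the local context is cancelled so the firehose connection gets torn down and re-established. A stripped-down sketch of that pattern (names and intervals simplified; this is not the repository's exact code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lastFlush := time.Now()
	ticker := time.NewTicker(10 * time.Second) // flush interval
	defer ticker.Stop()

	pending := 0 // stands in for len(postBatch)

	for {
		select {
		case <-ctx.Done():
			fmt.Println("consumer stopped, caller would reconnect here")
			return
		case <-ticker.C:
			if pending > 0 {
				// Flush the batch and remember when we last saw data.
				lastFlush = time.Now()
				pending = 0
			} else if time.Since(lastFlush) > 25*time.Second {
				// No data for 25 seconds: assume the stream went quiet and cancel.
				cancel()
			}
		}
	}
}
```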
@@ -39,7 +39,7 @@ func (f *FeedAz) Describe(_ context.Context) bsky.FeedDescribeFeedGenerator_Feed
func (f *FeedAz) GetPage(
    ctx context.Context,
    _ string,
    _ string, // user did
    limit int64,
    cursor string,
) ([]*bsky.FeedDefs_SkeletonFeedPost, *string, error) {
@@ -1,4 +1,4 @@
package generator
package az

import (
    "context"
@@ -13,30 +13,24 @@ import (
    "go.mongodb.org/mongo-driver/mongo/options"
)

var azValidUsers []string = []string{
    "did:plc:jbt4qi6psd7rutwzedtecsq7",
    "did:plc:yzgdpxsklrmfgqmjghdvw3ti",
    "did:plc:cs2cbzojm6hmx5lfxiuft3mq",
}

type FeedGeneratorAz struct {
type Generator struct {
    postCollection   *collections.PostCollection
    feedAzCollection *collections.FeedAzCollection
    textRegex        *regexp.Regexp
}

func NewFeedGeneratorAz(
func NewGenerator(
    postCollection *collections.PostCollection,
    feedAzCollection *collections.FeedAzCollection,
) *FeedGeneratorAz {
    return &FeedGeneratorAz{
) *Generator {
    return &Generator{
        postCollection:   postCollection,
        feedAzCollection: feedAzCollection,
        textRegex:        regexp.MustCompile("(?i)(azerbaijan|azərbaycan|aзербайджан|azerbaycan)"),
    }
}

func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
func (generator *Generator) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
    var mongoCursor *mongo.Cursor
    switch cursorOption {
    case types.GeneratorCursorLastGenerated:
@@ -115,14 +109,25 @@ func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.
    return nil
}

func (generator *FeedGeneratorAz) IsValid(post *collections.Post) bool {
func (generator *Generator) IsValid(post *collections.Post) bool {
    // Skip posts that are deep replies (not direct replies to original posts)
    if post.Reply != nil && post.Reply.RootURI != post.Reply.ParentURI {
        return false
    }

    if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
        (slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
        generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords
    // Check if the user who created this post is in our pre-defined list
    // This allows for explicit inclusion/exclusion of specific users
    if isValidUser := Users.IsValid(post.DID); isValidUser != nil {
        return *isValidUser
    }

    // A post is considered valid if it meets either of the following criteria:
    // 1. It's primarily in Azerbaijani (language code "az") with less than 3 detected languages
    //    (to filter out multi-language spam)
    // 2. It contains Azerbaijan-related keywords in the text AND has at least one valid language
    //    from our approved language list
    if (slices.Contains(post.Langs, "az") && len(post.Langs) < 3) ||
        (generator.textRegex.MatchString(post.Text) && Langs.IsExistsAny(post.Langs)) {
        return true
    }
pkg/generator/az/lists.go (new file, 62 lines)
@@ -0,0 +1,62 @@
package az

import "github.com/aykhans/bsky-feedgen/pkg/generator"

var Users = generator.Users{
    // Invalid
    "did:plc:5zww7zorx2ajw7hqrhuix3ba": false,
    "did:plc:c4vhz47h566t2ntgd7gtawen": false,
    "did:plc:lc7j7xdq67gn7vc6vzmydfqk": false,
    "did:plc:msian4dqa2rqalf3biilnf3m": false,
    "did:plc:gtosalycg7snvodjhsze35jm": false,
    "did:plc:i53e6y3liw2oaw4s6e6odw5m": false,
    "did:plc:pvdqvmpkeermkhy7fezam473": false,
    "did:plc:5vwjnzaibnwscbbcvkzhy57v": false,
    "did:plc:6mfp3coadoobuvlg6w2avw6x": false,
    "did:plc:lm2uhaoqoe6yo76oeihndfyi": false,
    "did:plc:vizwdor43adw3277u2kkrssd": false,
    "did:plc:oqatvbgbhvqbjl2w2o63ehgi": false,
    "did:plc:gy7yilnydusx5hy2z3dltynp": false,
    "did:plc:xk7cs24wk6njv42azm2yd7dv": false,
    "did:plc:ijmt7f4p3dcfqtg3j3zshimn": false,
    "did:plc:2q5dx6whenn7pnsrfn3jpd6h": false,
    "did:plc:s2waw3gkmn7h2nn6od44apng": false,
    "did:plc:4hm6gb7dzobynqrpypif3dck": false,
    "did:plc:odvarii7w7soygxet3xvzop7": false,
    "did:plc:5cbkdchsxjvz5fog2oo7m4le": false,
    "did:plc:ooeuisen5rtr4rojmz7gkbrh": false,
    "did:plc:6bvhdvgeqkj7nol2zodtqmww": false,
    "did:plc:k6sxlkd5ssq2uaylzisap2tw": false,
    "did:plc:uxljnh22mmfzmr4i3oien6mx": false,
    "did:plc:w5gg2zgwcyfevphehdcmavev": false,
    "did:plc:ckawbibgmrwg3lbskfppwtlw": false,
    "did:plc:43fdk46qa5gsokzygzildsaq": false,
    "did:plc:3szm5t3tknphjtj73twqfonw": false,
    "did:plc:4ukvsogndgp67sv6f6ohse3y": false,
    "did:plc:cdplzvv63u5jxb4fxm4vpfgm": false,
    "did:plc:namifrcorf6hzy45phd4shvt": false,
    "did:plc:ltvtwjps77bqgm2knhlbswyk": false,
    "did:plc:acglo4ret2f2wc5duqtispsa": false,
    "did:plc:zibx3delbo24mdsccz6s7qa4": false,
    "did:plc:dbpnhjiyq5e7pe3a4mt3jyhx": false,

    // Valid
    "did:plc:jbt4qi6psd7rutwzedtecsq7": true,
    "did:plc:yzgdpxsklrmfgqmjghdvw3ti": true,
    "did:plc:g7ebgiai577ln3avsi2pt3sn": true,
    "did:plc:phtq2rhgbwipyx5ie3apw44j": true,
    "did:plc:jfdvklrs5n5qv7f25v6swc5h": true,
    "did:plc:u5ez5w6qslh6advti4wyddba": true,
    "did:plc:x7alwnnjygt2aqcwblhazko7": true,
    "did:plc:mgciyhgfn65z7iazxuar6o6a": true,
    "did:plc:ay2f5go4lxq2hspiaqohegac": true,
    "did:plc:ftoopigdpuzqt2kpeyqxsofx": true,
    "did:plc:cs2cbzojm6hmx5lfxiuft3mq": true,
}

var Langs = generator.Langs{
    "az": true,
    "en": true,
    "tr": true,
    "ru": true,
}
pkg/generator/base.go (new file, 89 lines)
@@ -0,0 +1,89 @@
package generator

import "github.com/aykhans/bsky-feedgen/pkg/utils"

type Users map[string]bool

// IsValid checks if a given DID exists in the Users map and returns its validity status.
//
// Parameters:
//
//	did: The Decentralized Identifier string to check
//
// Returns:
//   - *bool: A pointer to the validity status if the DID exists in the map
//   - nil: If the DID does not exist in the map
func (u Users) IsValid(did string) *bool {
    isValid, ok := u[did]
    if ok == false {
        return nil
    }

    return utils.ToPtr(isValid)
}

// GetValidUsers returns a slice of DIDs that are marked as valid in the Users map.
//
// Returns:
//   - []string: A slice of valid DIDs, limited by the specified parameters
func (u Users) GetValidUsers() []string {
    validUsers := make([]string, 0)

    for did, isValid := range u {
        if isValid {
            validUsers = append(validUsers, did)
        }
    }

    return validUsers
}

// GetInvalidUsers returns a slice of DIDs that are marked as invalid in the Users map.
//
// Returns:
//   - []string: A slice of invalid DIDs, limited by the specified parameters
func (u Users) GetInvalidUsers() []string {
    invalidUsers := make([]string, 0)

    for did, isValid := range u {
        if !isValid {
            invalidUsers = append(invalidUsers, did)
        }
    }

    return invalidUsers
}

// GetAll returns a slice of all DIDs in the Users map, regardless of validity status.
//
// Returns:
//   - []string: A slice containing all DIDs in the map
func (u Users) GetAll() []string {
    allUsers := make([]string, 0, len(u))

    for did := range u {
        allUsers = append(allUsers, did)
    }

    return allUsers
}

type Langs map[string]bool

// IsExistsAny checks if any of the given language codes exist in the Langs map.
//
// Parameters:
//   - langs: A slice of language code strings to check for existence
//
// Returns:
//   - bool: true if at least one language code from the input slice exists in the map,
//     false if none of the provided language codes exist
func (l Langs) IsExistsAny(langs []string) bool {
    for _, lang := range langs {
        if _, ok := l[lang]; ok {
            return true
        }
    }

    return false
}
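Users.IsValid is deliberately three-valued: a DID can be explicitly allowed, explicitly blocked, or absent, in which case the caller falls through to its language and keyword heuristics (as the az generator's IsValid does above). A small illustrative sketch; the DIDs here are placeholders, not entries from the real list:

```go
package main

import (
	"fmt"

	"github.com/aykhans/bsky-feedgen/pkg/generator"
)

func main() {
	users := generator.Users{
		"did:plc:allowed-example": true,  // always include
		"did:plc:blocked-example": false, // always exclude
	}

	for _, did := range []string{
		"did:plc:allowed-example",
		"did:plc:blocked-example",
		"did:plc:unknown-example",
	} {
		switch v := users.IsValid(did); {
		case v == nil:
			fmt.Println(did, "-> unknown, fall back to language/keyword checks")
		case *v:
			fmt.Println(did, "-> always include")
		default:
			fmt.Println(did, "-> always exclude")
		}
	}
}
```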
@@ -163,7 +163,8 @@ func (f FeedAzCollection) CutoffByCount(
    findOpts := options.Find().
        SetSort(bson.D{{Key: "created_at", Value: 1}}).
        SetLimit(deleteCount)
        SetLimit(deleteCount).
        SetProjection(bson.M{"_id": 1})

    cursor, err := f.Collection.Find(ctx, bson.M{}, findOpts)
    if err != nil {
@@ -171,24 +172,46 @@ func (f FeedAzCollection) CutoffByCount(
    }
    defer func() { _ = cursor.Close(ctx) }()

    var docsToDelete []bson.M
    if err = cursor.All(ctx, &docsToDelete); err != nil {
        return 0, err
    // Process documents in batches to avoid potential memory issues
    const batchSize = 10000
    var totalDeleted int64 = 0

    for {
        batch := make([]string, 0, batchSize)
        batchCount := 0

        for cursor.Next(ctx) && batchCount < batchSize {
            var doc struct {
                ID string `bson:"_id"`
            }
            if err = cursor.Decode(&doc); err != nil {
                return totalDeleted, err
            }
            batch = append(batch, doc.ID)
            batchCount++
        }

        if len(batch) == 0 {
            break
        }

        // Delete the batch
        result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
        if err != nil {
            return totalDeleted, err
        }

        totalDeleted += result.DeletedCount

        if cursor.Err() != nil {
            return totalDeleted, cursor.Err()
        }

        // If we didn't fill the batch, we're done
        if batchCount < batchSize {
            break
        }
    }

    if len(docsToDelete) == 0 {
        return 0, nil
    }

    ids := make([]any, len(docsToDelete))
    for i := range docsToDelete {
        ids[i] = docsToDelete[i]["_id"]
    }

    result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
    if err != nil {
        return 0, err
    }

    return result.DeletedCount, nil
    return totalDeleted, nil
}
@@ -17,10 +17,15 @@ type PostCollection struct {
func NewPostCollection(client *mongo.Client) (*PostCollection, error) {
    client.Database(config.MongoDBBaseDB).Collection("")
    coll := client.Database(config.MongoDBBaseDB).Collection("post")
    _, err := coll.Indexes().CreateOne(
    _, err := coll.Indexes().CreateMany(
        context.Background(),
        mongo.IndexModel{
            Keys: bson.D{{Key: "sequence", Value: -1}},
        []mongo.IndexModel{
            {
                Keys: bson.D{{Key: "sequence", Value: -1}},
            },
            {
                Keys: bson.D{{Key: "created_at", Value: 1}},
            },
        },
    )
    if err != nil {
@@ -71,7 +76,8 @@ func (p PostCollection) CutoffByCount(
    findOpts := options.Find().
        SetSort(bson.D{{Key: "created_at", Value: 1}}).
        SetLimit(deleteCount)
        SetLimit(deleteCount).
        SetProjection(bson.M{"_id": 1})

    cursor, err := p.Collection.Find(ctx, bson.M{}, findOpts)
    if err != nil {
@@ -79,26 +85,48 @@ func (p PostCollection) CutoffByCount(
    }
    defer func() { _ = cursor.Close(ctx) }()

    var docsToDelete []bson.M
    if err = cursor.All(ctx, &docsToDelete); err != nil {
        return 0, err
    // Process documents in batches to avoid potential memory issues
    const batchSize = 10000
    var totalDeleted int64 = 0

    for {
        batch := make([]string, 0, batchSize)
        batchCount := 0

        for cursor.Next(ctx) && batchCount < batchSize {
            var doc struct {
                ID string `bson:"_id"`
            }
            if err = cursor.Decode(&doc); err != nil {
                return totalDeleted, err
            }
            batch = append(batch, doc.ID)
            batchCount++
        }

        if len(batch) == 0 {
            break
        }

        // Delete the batch
        result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
        if err != nil {
            return totalDeleted, err
        }

        totalDeleted += result.DeletedCount

        if cursor.Err() != nil {
            return totalDeleted, cursor.Err()
        }

        // If we didn't fill the batch, we're done
        if batchCount < batchSize {
            break
        }
    }

    if len(docsToDelete) == 0 {
        return 0, nil
    }

    ids := make([]any, len(docsToDelete))
    for i := range docsToDelete {
        ids[i] = docsToDelete[i]["_id"]
    }

    result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
    if err != nil {
        return 0, err
    }

    return result.DeletedCount, nil
    return totalDeleted, nil
}

func (p PostCollection) GetMaxSequence(ctx context.Context) (*int64, error) {
@@ -4,4 +4,5 @@ import "errors"
var (
    ErrInternal = errors.New("internal error")
    ErrNotfound = errors.New("not found")
)
prod/Makefile (new file, 26 lines)
@@ -0,0 +1,26 @@
.PHONY: configure

configure:
    @cp config/app/api.env.example config/app/.api.env
    @cp config/app/consumer.env.example config/app/.consumer.env
    @cp config/app/mongodb.env.example config/app/.mongodb.env
    @cp config/app/feedgen/az.env.example config/app/feedgen/.az.env
    @cp config/caddy/env.example config/caddy/.env
    @cp config/mongodb/env.example config/mongodb/.env

    @read -p "Enter MongoDB username: " mongodb_user; \
    printf "Enter MongoDB password: "; \
    read mongodb_pass; \
    sed -i "s/MONGO_INITDB_ROOT_USERNAME=.*/MONGO_INITDB_ROOT_USERNAME=$$mongodb_user/" config/mongodb/.env; \
    sed -i "s/MONGO_INITDB_ROOT_PASSWORD=.*/MONGO_INITDB_ROOT_PASSWORD=$$mongodb_pass/" config/mongodb/.env; \
    sed -i "s/MONGODB_USERNAME=.*/MONGODB_USERNAME=$$mongodb_user/" config/app/.mongodb.env; \
    sed -i "s/MONGODB_PASSWORD=.*/MONGODB_PASSWORD=$$mongodb_pass/" config/app/.mongodb.env

    @read -p "Enter domain name (e.g., feeds.bsky.example.com): " domain; \
    read -p "Enter your AT Protocol DID: " publisher_did; \
    sed -i "s/DOMAIN=.*/DOMAIN=$$domain/" config/caddy/.env; \
    sed -i "s|FEEDGEN_HOSTNAME=.*|FEEDGEN_HOSTNAME=https://$$domain|" config/app/.api.env; \
    sed -i "s/FEEDGEN_PUBLISHER_DID=.*/FEEDGEN_PUBLISHER_DID=$$publisher_did/" config/app/.api.env

    @echo
    @echo "Configuration complete! You can now run 'docker compose up -d'"
prod/README.md (new file, 76 lines)
@@ -0,0 +1,76 @@
# Example Production Deployment

This is an example of a production deployment for the Feed Generator.

## Architecture

The production setup includes the following services:

- **MongoDB**: Database for storing posts and feed data
- **Consumer**: Service that consumes AT Protocol firehose data
- **Feed Generator (AZ)**: Generates feeds for Azerbaijan-related content
- **API**: REST API service for serving feeds
- **Caddy**: Reverse proxy

## Quick Start

1. **Configure the environment**:

   ```bash
   make configure
   ```

   This will:
   - Copy all example configuration files
   - Prompt for MongoDB credentials
   - Prompt for domain name and AT Protocol DID
   - Update configuration files with your values

2. **Start the services**:

   ```bash
   docker compose up -d
   ```

3. **Check service status**:

   ```bash
   docker compose ps
   docker compose logs
   ```

## Configuration Files

### Application Configuration

- `config/app/.api.env` - API service configuration
- `config/app/.consumer.env` - Consumer service configuration
- `config/app/.mongodb.env` - MongoDB connection settings
- `config/app/feedgen/.az.env` - Azerbaijan feed generator settings

### Infrastructure Configuration

- `config/caddy/.env` - Caddy reverse proxy settings
- `config/caddy/Caddyfile` - Caddy server configuration
- `config/mongodb/.env` - MongoDB initialization settings

## Environment Variables

### API Service

- `FEEDGEN_HOSTNAME` - Public hostname for the feed generator
- `FEEDGEN_PUBLISHER_DID` - Your AT Protocol DID
- `API_PORT` - Port for the API service (default: 8421)

### Consumer Service

- `POST_MAX_DATE` - Maximum age of posts to store (default: 720h/30 days)
- `POST_COLLECTION_CUTOFF_CRON_DELAY` - Cleanup interval (default: 30m)
- `POST_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT` - Max documents before cleanup (default: 1M)

### AZ Feed Generator

- `FEED_AZ_GENERATER_CRON_DELAY` - Feed generation interval (default: 1m)
- `FEED_AZ_COLLECTION_CUTOFF_CRON_DELAY` - Cleanup interval (default: 30m)
- `FEED_AZ_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT` - Max documents before cleanup (default: 500K)

### MongoDB

- `MONGODB_HOST` - MongoDB hostname (default: mongodb)
- `MONGODB_PORT` - MongoDB port (default: 27017)
- `MONGODB_USERNAME` - Database username
- `MONGODB_PASSWORD` - Database password

### Caddy

- `DOMAIN` - Your domain name
- `API_HOST` - Internal API service URL (default: http://api:8421)
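Once the stack is up, the two public XRPC endpoints registered by the API server (see the mux routes above) can serve as a quick smoke test. A hedged Go example; the domain is a placeholder matching the examples below, and plain `curl` against the same URLs works just as well:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "https://feeds.bsky.example.com" // placeholder domain from the example configs
	for _, path := range []string{
		"/.well-known/did.json",
		"/xrpc/app.bsky.feed.describeFeedGenerator",
	} {
		resp, err := http.Get(base + path)
		if err != nil {
			fmt.Println(path, "error:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		// A healthy deployment should return 200 and a small JSON document for both paths.
		fmt.Println(path, resp.StatusCode, len(body), "bytes")
	}
}
```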
prod/config/app/api.env.example (new file, 3 lines)
@@ -0,0 +1,3 @@
FEEDGEN_HOSTNAME=https://feeds.bsky.example.com
FEEDGEN_PUBLISHER_DID=did:plc:qwertyuiopp
API_PORT=8421
@@ -1,3 +1,3 @@
POST_MAX_DATE=720h # Save only posts created in the last month
POST_COLLECTION_CUTOFF_CRON_DELAY=30m # 30 minutes
POST_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT=10000000 # Delete post documents after 10 million
POST_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT=1000000 # Delete post documents after 1 million
prod/config/app/feedgen/az.env.example (new file, 3 lines)
@@ -0,0 +1,3 @@
FEED_AZ_GENERATER_CRON_DELAY=1m # 1 minute
FEED_AZ_COLLECTION_CUTOFF_CRON_DELAY=30m # 30 minutes
FEED_AZ_COLLECTION_CUTOFF_CRON_MAX_DOCUMENT=500000 # Delete post documents after 500 thousand
prod/config/app/mongodb.env.example (new file, 4 lines)
@@ -0,0 +1,4 @@
MONGODB_HOST=mongodb
MONGODB_PORT=27017
MONGODB_USERNAME=root
MONGODB_PASSWORD=toor
prod/config/caddy/Caddyfile (new file, 11 lines)
@@ -0,0 +1,11 @@
{
    admin off
}

{$DOMAIN} {
    request_body {
        max_size 8MB
    }

    reverse_proxy {$API_HOST}
}
prod/config/caddy/env.example (new file, 2 lines)
@@ -0,0 +1,2 @@
DOMAIN=feeds.bsky.example.com
API_HOST=http://api:8421
prod/config/mongodb/env.example (new file, 2 lines)
@@ -0,0 +1,2 @@
MONGO_INITDB_ROOT_USERNAME=root
MONGO_INITDB_ROOT_PASSWORD=toor
prod/docker-compose.yml (new file, 63 lines)
@@ -0,0 +1,63 @@
services:
  mongodb:
    image: mongo:8.0.9-noble
    restart: unless-stopped
    # ports:
    #   - 27017:27017
    env_file: ./config/mongodb/.env
    volumes:
      - mongodb_data:/data/db
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongosh --quiet
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 20s

  consumer:
    image: git.aykhans.me/bsky/feedgen-consumer:latest
    restart: unless-stopped
    env_file:
      - ./config/app/.mongodb.env
      - ./config/app/.consumer.env
    depends_on:
      mongodb:
        condition: service_healthy

  feedgen_az:
    image: git.aykhans.me/bsky/feedgen-generator-az:latest
    restart: unless-stopped
    env_file:
      - ./config/app/.mongodb.env
      - ./config/app/feedgen/.az.env
    depends_on:
      mongodb:
        condition: service_healthy

  api:
    image: git.aykhans.me/bsky/feedgen-api:latest
    restart: unless-stopped
    ports:
      - 8421:8421
    env_file:
      - ./config/app/.mongodb.env
      - ./config/app/.api.env
    depends_on:
      mongodb:
        condition: service_healthy

  caddy:
    image: caddy:2.10.0-alpine
    restart: unless-stopped
    ports:
      - 80:80
      - 443:443
      - 443:443/udp
    env_file: ./config/caddy/.env
    volumes:
      - ./config/caddy/Caddyfile:/etc/caddy/Caddyfile
      - caddy_data:/data

volumes:
  mongodb_data:
  caddy_data: