Compare commits

20 Commits

Author SHA1 Message Date
1eecbafd07 Add auth middleware 2025-05-24 02:51:25 +04:00
b6eaaf7331 Auth middleware initial test 2025-05-24 01:34:53 +04:00
667769cbd7 Update 'users' list 2025-05-23 23:45:20 +04:00
4beeb84f07 Update 'users' list 2025-05-23 01:49:39 +04:00
fd6a185bac Delete compose.prod.yml 2025-05-21 22:47:10 +04:00
bcd721e071 Add 'Users' type to generator 2025-05-21 17:12:14 +04:00
4d5abe66a6 Remove flushing log from consumer 2025-05-21 16:54:49 +04:00
58dce559d3 Az generator package to 'az' sub package 2025-05-21 16:53:31 +04:00
e900cd3d47 Update 'azInvalidUsers' list 2025-05-21 16:48:32 +04:00
211f1e7d5a Update 'azInvalidUsers' list 2025-05-21 16:46:27 +04:00
50acf8d432 Update 'azInvalidUser' --> 'azInvalidUsers' 2025-05-21 16:44:07 +04:00
7242754124 Update 'azInvalidUser' list 2025-05-21 16:43:39 +04:00
c4bd6affa6 Fix dead consumer error 2025-05-20 23:35:27 +04:00
d8dd2c75a6 Update 'azInvalidUser' list 2025-05-20 23:09:01 +04:00
74b8324b6f Add 'azInvalidUser' condition to az generator 2025-05-20 21:28:29 +04:00
588cfc0fcc Update 'azValidUsers' list 2025-05-19 20:55:41 +04:00
48a8d2b5f4 Update 'azValidUsers' list 2025-05-19 20:53:18 +04:00
16a2a62ba1 Optimize mongodb collections Cutoff methods 2025-05-19 16:59:50 +04:00
259c139d92 Add 'automaxprocs' 2025-05-19 16:59:17 +04:00
c0d03ba341 Add 'automaxprocs' 2025-05-19 16:53:20 +04:00
17 changed files with 443 additions and 67 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/aykhans/bsky-feedgen/pkg/logger"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
_ "go.uber.org/automaxprocs"
)
func main() {

View File

@@ -17,6 +17,7 @@ import (
"github.com/aykhans/bsky-feedgen/pkg/logger"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
_ "go.uber.org/automaxprocs"
)
func main() {

View File

@@ -10,13 +10,14 @@ import (
"syscall"
"time"
"github.com/aykhans/bsky-feedgen/pkg/generator"
feedgenAz "github.com/aykhans/bsky-feedgen/pkg/generator/az"
"github.com/aykhans/bsky-feedgen/pkg/types"
"github.com/aykhans/bsky-feedgen/pkg/config"
"github.com/aykhans/bsky-feedgen/pkg/logger"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
_ "go.uber.org/automaxprocs"
)
func main() {
@@ -86,7 +87,7 @@ Flags:
os.Exit(1)
}
feedGeneratorAz := generator.NewFeedGeneratorAz(postCollection, feedAzCollection)
feedGeneratorAz := feedgenAz.NewGenerator(postCollection, feedAzCollection)
startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, cursorOption)
logger.Log.Info("Cron jobs started")
@@ -97,7 +98,7 @@ Flags:
func startCrons(
ctx context.Context,
feedGenAzConfig *config.FeedGenAzConfig,
feedGeneratorAz *generator.FeedGeneratorAz,
feedGeneratorAz *feedgenAz.Generator,
feedAzCollection *collections.FeedAzCollection,
cursorOption types.GeneratorCursor,
) {

2
go.mod
View File

@@ -6,6 +6,7 @@ require (
github.com/bluesky-social/indigo v0.0.0-20250516010818-f8de501bd6a0
github.com/gorilla/websocket v1.5.1
go.mongodb.org/mongo-driver v1.17.3
go.uber.org/automaxprocs v1.6.0
)
require (
@@ -20,6 +21,7 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gocql/gocql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect

6
go.sum
View File

@@ -46,6 +46,8 @@ github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -250,6 +252,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
@@ -330,6 +334,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/aykhans/bsky-feedgen/pkg/api/handler"
"github.com/aykhans/bsky-feedgen/pkg/api/middleware"
"github.com/aykhans/bsky-feedgen/pkg/config"
"github.com/aykhans/bsky-feedgen/pkg/feed"
"github.com/aykhans/bsky-feedgen/pkg/logger"
@@ -23,13 +24,15 @@ func Run(
}
feedHandler := handler.NewFeedHandler(feeds, apiConfig.FeedgenPublisherDID)
authMiddleware := middleware.NewAuth(apiConfig.ServiceDID)
mux := http.NewServeMux()
mux.HandleFunc("GET /.well-known/did.json", baseHandler.GetWellKnownDIDDoc)
mux.HandleFunc("GET /xrpc/app.bsky.feed.describeFeedGenerator", feedHandler.DescribeFeeds)
mux.HandleFunc(
mux.Handle(
"GET /xrpc/app.bsky.feed.getFeedSkeleton",
feedHandler.GetFeedSkeleton,
authMiddleware.JWTAuthMiddleware(http.HandlerFunc(feedHandler.GetFeedSkeleton)),
)
httpServer := &http.Server{

View File

@@ -50,7 +50,7 @@ func (handler *FeedHandler) DescribeFeeds(w http.ResponseWriter, r *http.Request
}
func (handler *FeedHandler) GetFeedSkeleton(w http.ResponseWriter, r *http.Request) {
userDID, _ := r.Context().Value(middleware.UserDIDKey).(string)
userDID, _ := middleware.GetValue[string](r, middleware.UserDIDKey)
feedQuery := r.URL.Query().Get("feed")
if feedQuery == "" {

View File

@@ -2,12 +2,73 @@ package middleware
import (
"context"
"crypto"
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/golang-jwt/jwt/v5"
"github.com/whyrusleeping/go-did"
)
const UserDIDKey ContextKey = "user_did"
func JWTAuthMiddleware(next http.Handler) http.Handler {
// Pieces of the HTTP authorization header that validateAuth reads.
const (
// authorizationHeaderName is the request header that carries the token.
authorizationHeaderName = "Authorization"
// authorizationHeaderValuePrefix is the scheme prefix (note the trailing
// space) that must precede the JWT in the header value.
authorizationHeaderValuePrefix = "Bearer "
)
// didResolver is the package-wide caching DID directory used to resolve
// token issuers. It is assigned exactly once, by init below.
var didResolver *identity.CacheDirectory

// init builds the shared caching directory on top of a zero-value
// identity.BaseDirectory and stores a pointer to it in didResolver.
func init() {
	const (
		// Per the original author's note, a capacity of 0 means the cache
		// size is unlimited.
		cacheCapacity = 0
		// Lifetime of successful resolutions.
		hitTTL = 24 * time.Hour
		// Lifetime of failed resolutions.
		errTTL = 5 * time.Minute
		// Lifetime of invalid-handle results.
		invalidHandleTTL = 5 * time.Minute
	)

	cachedDirectory := identity.NewCacheDirectory(
		&identity.BaseDirectory{},
		cacheCapacity,
		hitTTL,
		errTTL,
		invalidHandleTTL,
	)
	didResolver = &cachedDirectory
}
// AuthorizationError reports why a request failed authorization.
// Message is a human-readable summary; Err, when set, is the underlying cause.
type AuthorizationError struct {
	Message string
	Err     error
}

// Error returns Message alone when there is no underlying cause, and
// "Message: cause" otherwise.
func (e *AuthorizationError) Error() string {
	if e.Err == nil {
		return e.Message
	}
	return fmt.Sprintf("%s: %v", e.Message, e.Err)
}

// Unwrap exposes the underlying cause (possibly nil) so that errors.Is and
// errors.As can inspect it.
func (e *AuthorizationError) Unwrap() error {
	return e.Err
}
// Auth holds the configuration needed to authenticate incoming requests.
type Auth struct {
	// serviceDID is this service's own DID; validateAuth compares it against
	// the token's audience claim.
	serviceDID *did.DID
}

// NewAuth returns an Auth bound to the given service DID.
func NewAuth(serviceDID *did.DID) *Auth {
	auth := Auth{serviceDID: serviceDID}
	return &auth
}
func (auth *Auth) JWTAuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
@@ -16,8 +77,100 @@ func JWTAuthMiddleware(next http.Handler) http.Handler {
return
}
// TODO: Add auth verification
ctx := context.WithValue(r.Context(), UserDIDKey, "")
userDID, _ := auth.validateAuth(r.Context(), r)
ctx := context.WithValue(r.Context(), UserDIDKey, userDID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// getDIDSigningKey resolves the given DID through the shared caching
// directory and returns the public signing key of the resolved identity.
//
// It returns an error when the identifier does not parse, when resolution
// fails or yields an empty identity, or when the identity has no usable
// signing key.
func (auth *Auth) getDIDSigningKey(ctx context.Context, did string) (crypto.PublicKey, error) {
	identifier, parseErr := syntax.ParseAtIdentifier(did)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid DID syntax: %w", parseErr)
	}

	// The resolved identity exposes the signing key through PublicKey().
	resolved, lookupErr := didResolver.Lookup(ctx, *identifier)
	switch {
	case lookupErr != nil:
		return nil, fmt.Errorf("DID resolution failed for %s: %w", did, lookupErr)
	case resolved == nil || resolved.DID.String() == "":
		return nil, fmt.Errorf("DID resolution returned empty identity for %s", did)
	}

	signingKey, keyErr := resolved.PublicKey()
	if keyErr != nil {
		return nil, fmt.Errorf("failed to get signing key for DID %s: %w", did, keyErr)
	}
	return signingKey, nil
}
// validateAuth validates the bearer JWT in the request's Authorization header
// and returns the requester's DID (the token's "iss" claim).
//
// ctx is passed to the DID resolution performed while looking up the issuer's
// signing key. Every failure is reported as an *AuthorizationError: a missing
// or malformed header, a token that fails parsing or signature/time
// validation, or a token whose audience does not include this service's DID.
func (auth *Auth) validateAuth(ctx context.Context, r *http.Request) (string, error) {
	authHeader := r.Header.Get(authorizationHeaderName)
	if authHeader == "" {
		return "", &AuthorizationError{Message: "Authorization header is missing"}
	}
	if !strings.HasPrefix(authHeader, authorizationHeaderValuePrefix) {
		return "", &AuthorizationError{Message: "Invalid authorization header format"}
	}

	jwtString := strings.TrimSpace(strings.TrimPrefix(authHeader, authorizationHeaderValuePrefix))

	claims := jwt.RegisteredClaims{}

	// keyFunc supplies the verification key: the public signing key of the
	// DID named in the token's "iss" claim.
	keyFunc := func(token *jwt.Token) (any, error) {
		regClaims, ok := token.Claims.(*jwt.RegisteredClaims)
		if !ok {
			return nil, fmt.Errorf("invalid JWT claims type")
		}

		issuerDID := regClaims.Issuer
		if issuerDID == "" {
			return nil, fmt.Errorf("JWT 'iss' claim is missing")
		}

		publicKey, err := auth.getDIDSigningKey(ctx, issuerDID)
		if err != nil {
			return nil, fmt.Errorf("failed to get signing key for DID %s: %w", issuerDID, err)
		}
		return publicKey, nil
	}

	token, err := jwt.ParseWithClaims(jwtString, &claims, keyFunc)
	if err != nil {
		// Map the well-known jwt sentinel errors to specific messages.
		switch {
		case errors.Is(err, jwt.ErrTokenSignatureInvalid):
			return "", &AuthorizationError{Message: "Invalid signature", Err: err}
		case errors.Is(err, jwt.ErrTokenExpired):
			return "", &AuthorizationError{Message: "Token expired", Err: err}
		case errors.Is(err, jwt.ErrTokenNotValidYet):
			return "", &AuthorizationError{Message: "Token not valid yet", Err: err}
		case errors.Is(err, jwt.ErrTokenMalformed):
			return "", &AuthorizationError{Message: "Malformed token", Err: err}
		default:
			return "", &AuthorizationError{Message: "Failed to parse or validate JWT", Err: err}
		}
	}

	if !token.Valid {
		return "", &AuthorizationError{Message: "Token is invalid"}
	}

	// The token must be addressed to this service: reject it when our DID is
	// NOT among its audiences. (The check was previously inverted, rejecting
	// exactly the tokens that were meant for this service.)
	if !slices.Contains(claims.Audience, auth.serviceDID.String()) {
		return "", &AuthorizationError{Message: fmt.Sprintf("Invalid audience (expected %s)", auth.serviceDID)}
	}

	// Return the issuer's DID.
	return claims.Issuer, nil
}

View File

@@ -1,3 +1,19 @@
package middleware
import (
"net/http"
"github.com/aykhans/bsky-feedgen/pkg/types"
)
// ContextKey is the type of the keys this package uses for values stored in a
// request context; GetValue looks values up by such a key.
type ContextKey string
// GetValue fetches the value stored under key in the request's context and
// returns it as a T. When no value is stored under key, or the stored value
// is not of type T, it returns T's zero value and types.ErrNotfound.
func GetValue[T any](r *http.Request, key ContextKey) (T, error) {
	if typed, found := r.Context().Value(key).(T); found {
		return typed, nil
	}

	var zero T
	return zero, types.ErrNotfound
}

View File

@@ -0,0 +1,91 @@
package middleware
// copied from https://gist.github.com/bnewbold/bc9b97c9b281295da1fa47c03b0b3c69
import (
"crypto"
"errors"
"fmt"
atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
"github.com/golang-jwt/jwt/v5"
)
// JWT signing methods provided by this package. Both are created and
// registered with the jwt package by this file's init function.
var (
// SigningMethodES256K is the method registered under the "ES256K" algorithm name.
SigningMethodES256K *SigningMethodAtproto
// SigningMethodES256 is the method registered under the "ES256" algorithm name.
SigningMethodES256 *SigningMethodAtproto
)
// SigningMethodAtproto is a JWT signing method whose verification is
// delegated to an atproto public key (see Verify). Signing is not implemented.
type SigningMethodAtproto struct {
// alg is the algorithm name returned by Alg.
alg string
// hash is the hash function whose availability Verify checks.
hash crypto.Hash
// toOutSig is a signature conversion hook; it is not called by any method
// visible in this file.
toOutSig toOutSig
// sigLen is the exact signature length, in bytes, that Verify accepts.
sigLen int
}
// toOutSig converts a signature byte slice into its output form.
type toOutSig func(sig []byte) []byte
func init() {
SigningMethodES256K = &SigningMethodAtproto{
alg: "ES256K",
hash: crypto.SHA256,
toOutSig: toES256K,
sigLen: 64,
}
jwt.RegisterSigningMethod(SigningMethodES256K.Alg(), func() jwt.SigningMethod {
return SigningMethodES256K
})
SigningMethodES256 = &SigningMethodAtproto{
alg: "ES256",
hash: crypto.SHA256,
toOutSig: toES256,
sigLen: 64,
}
jwt.RegisterSigningMethod(SigningMethodES256.Alg(), func() jwt.SigningMethod {
return SigningMethodES256
})
fmt.Println("init Completed")
}
// Errors used by SigningMethodAtproto.
var (
// ErrWrongKeyFormat is returned by Verify when the key is not an atproto public key.
ErrWrongKeyFormat = errors.New("wrong key type")
// ErrBadSignature is returned by Verify when the signature has an unexpected length.
ErrBadSignature = errors.New("bad signature")
// ErrVerification is not returned by any code visible in this file.
ErrVerification = errors.New("signature verification failed")
// ErrFailedSigning is returned by Sign, which is not implemented.
ErrFailedSigning = errors.New("failed generating signature")
// ErrHashUnavailable is returned by Verify when the method's hash function is unavailable.
ErrHashUnavailable = errors.New("hasher unavailable")
)
// Verify checks sig against signingString using key.
//
// The checks run in a fixed order and the first failure wins: key must be an
// atproto public key (ErrWrongKeyFormat), the method's hash must be available
// (ErrHashUnavailable), and sig must be exactly sigLen bytes long
// (ErrBadSignature). The actual verification is then delegated to the key's
// HashAndVerifyLenient method, whose result is returned as is.
func (sm *SigningMethodAtproto) Verify(signingString string, sig []byte, key any) error {
	pubKey, isAtprotoKey := key.(atcrypto.PublicKey)

	switch {
	case !isAtprotoKey:
		return ErrWrongKeyFormat
	case !sm.hash.Available():
		return ErrHashUnavailable
	case len(sig) != sm.sigLen:
		return ErrBadSignature
	}

	return pubKey.HashAndVerifyLenient([]byte(signingString), sig)
}
// Sign is not implemented: it ignores its arguments and always returns a nil
// signature together with ErrFailedSigning.
func (sm *SigningMethodAtproto) Sign(signingString string, key any) ([]byte, error) {
// TODO: implement signatures
return nil, ErrFailedSigning
}
// Alg returns the algorithm name this signing method was created with
// (the name it is registered under in init).
func (sm *SigningMethodAtproto) Alg() string {
return sm.alg
}
// toES256K returns the first 64 bytes of sig. The result shares sig's backing
// array; slicing panics if sig cannot be sliced to 64 bytes.
func toES256K(sig []byte) []byte {
	const outLen = 64
	return sig[:outLen]
}

// toES256 returns the first 64 bytes of sig. The result shares sig's backing
// array; slicing panics if sig cannot be sliced to 64 bytes.
func toES256(sig []byte) []byte {
	const outLen = 64
	return sig[:outLen]
}

View File

@@ -1,5 +1,9 @@
package consumer
// This file contains code for consuming and processing the Bluesky firehose event stream.
// Most of this implementation is copied from, or inspired by, the original source at:
// https://github.com/bluesky-social/indigo/blob/main/cmd/beemo/firehose_consumer.go
import (
"bytes"
"context"
@@ -173,11 +177,12 @@ func ConsumeAndSaveToMongoDB(
sequenceCursor = nil
}
consumerLastFlushingTime := time.Now()
go func() {
defer cancel()
for {
err := RunFirehoseConsumer(
ctx,
localCtx,
relayHost,
func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
@@ -186,7 +191,7 @@ func ConsumeAndSaveToMongoDB(
)
if err != nil {
if ctx.Err() != nil {
if localCtx.Err() != nil {
break
}
logger.Log.Error(err.Error())
@@ -214,6 +219,7 @@ func ConsumeAndSaveToMongoDB(
return nil
case <-localCtx.Done():
logger.Log.Error("inactive firehose consumer error")
return nil
case data := <-firehoseDataChan:
@@ -261,12 +267,18 @@ func ConsumeAndSaveToMongoDB(
case <-ticker.C:
if len(postBatch) > 0 {
consumerLastFlushingTime = time.Now()
// logger.Log.Info("flushing post batch", "count", len(postBatch))
err := postCollection.Insert(ctx, true, postBatch...)
if err != nil {
return fmt.Errorf("mongodb post insert error: %v", err)
}
postBatch = []*collections.Post{} // Clear batch after insert
} else {
// If we haven't seen any data for 25 seconds, cancel the consumer connection
if consumerLastFlushingTime.Add(time.Second * 25).Before(time.Now()) {
cancel()
}
}
}
}

View File

@@ -1,4 +1,4 @@
package generator
package az
import (
"context"
@@ -13,30 +13,24 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
var azValidUsers []string = []string{
"did:plc:jbt4qi6psd7rutwzedtecsq7",
"did:plc:yzgdpxsklrmfgqmjghdvw3ti",
"did:plc:cs2cbzojm6hmx5lfxiuft3mq",
}
type FeedGeneratorAz struct {
type Generator struct {
postCollection *collections.PostCollection
feedAzCollection *collections.FeedAzCollection
textRegex *regexp.Regexp
}
func NewFeedGeneratorAz(
func NewGenerator(
postCollection *collections.PostCollection,
feedAzCollection *collections.FeedAzCollection,
) *FeedGeneratorAz {
return &FeedGeneratorAz{
) *Generator {
return &Generator{
postCollection: postCollection,
feedAzCollection: feedAzCollection,
textRegex: regexp.MustCompile("(?i)(azerbaijan|azərbaycan|aзербайджан|azerbaycan)"),
}
}
func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
func (generator *Generator) Start(ctx context.Context, cursorOption types.GeneratorCursor, batchSize int) error {
var mongoCursor *mongo.Cursor
switch cursorOption {
case types.GeneratorCursorLastGenerated:
@@ -115,13 +109,16 @@ func (generator *FeedGeneratorAz) Start(ctx context.Context, cursorOption types.
return nil
}
func (generator *FeedGeneratorAz) IsValid(post *collections.Post) bool {
func (generator *Generator) IsValid(post *collections.Post) bool {
if post.Reply != nil && post.Reply.RootURI != post.Reply.ParentURI {
return false
}
if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
(slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
if isValidUser := users.IsValid(post.DID); isValidUser != nil {
return *isValidUser
}
if (slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords
return true
}

27
pkg/generator/az/lists.go Normal file
View File

@@ -0,0 +1,27 @@
package az
import "github.com/aykhans/bsky-feedgen/pkg/generator"
// users maps a user's DID to an explicit verdict for the az feed: true marks
// the user's posts as valid, false as invalid. DIDs that are not listed have
// no explicit verdict (Users.IsValid returns nil for them).
var users = generator.Users{
// Invalid: DIDs explicitly marked as not valid.
"did:plc:5zww7zorx2ajw7hqrhuix3ba": false,
"did:plc:c4vhz47h566t2ntgd7gtawen": false,
"did:plc:lc7j7xdq67gn7vc6vzmydfqk": false,
"did:plc:msian4dqa2rqalf3biilnf3m": false,
"did:plc:gtosalycg7snvodjhsze35jm": false,
"did:plc:i53e6y3liw2oaw4s6e6odw5m": false,
"did:plc:pvdqvmpkeermkhy7fezam473": false,
"did:plc:5vwjnzaibnwscbbcvkzhy57v": false,
"did:plc:6mfp3coadoobuvlg6w2avw6x": false,
"did:plc:lm2uhaoqoe6yo76oeihndfyi": false,
// Valid: DIDs explicitly marked as valid.
"did:plc:jbt4qi6psd7rutwzedtecsq7": true,
"did:plc:yzgdpxsklrmfgqmjghdvw3ti": true,
"did:plc:g7ebgiai577ln3avsi2pt3sn": true,
"did:plc:phtq2rhgbwipyx5ie3apw44j": true,
"did:plc:jfdvklrs5n5qv7f25v6swc5h": true,
"did:plc:u5ez5w6qslh6advti4wyddba": true,
"did:plc:cs2cbzojm6hmx5lfxiuft3mq": true,
"did:plc:x7alwnnjygt2aqcwblhazko7": true,
}

14
pkg/generator/base.go Normal file
View File

@@ -0,0 +1,14 @@
package generator
import "github.com/aykhans/bsky-feedgen/pkg/utils"
// Users maps a user's DID to an explicit validity verdict.
type Users map[string]bool

// IsValid reports the explicit verdict recorded for did. It returns a pointer
// to the stored value when did is listed, and nil when it is not, so callers
// can tell "listed as invalid" apart from "not listed".
func (u Users) IsValid(did string) *bool {
	if verdict, listed := u[did]; listed {
		return utils.ToPtr(verdict)
	}
	return nil
}

View File

@@ -163,7 +163,8 @@ func (f FeedAzCollection) CutoffByCount(
findOpts := options.Find().
SetSort(bson.D{{Key: "created_at", Value: 1}}).
SetLimit(deleteCount)
SetLimit(deleteCount).
SetProjection(bson.M{"_id": 1})
cursor, err := f.Collection.Find(ctx, bson.M{}, findOpts)
if err != nil {
@@ -171,24 +172,46 @@ func (f FeedAzCollection) CutoffByCount(
}
defer func() { _ = cursor.Close(ctx) }()
var docsToDelete []bson.M
if err = cursor.All(ctx, &docsToDelete); err != nil {
return 0, err
// Process documents in batches to avoid potential memory issues
const batchSize = 10000
var totalDeleted int64 = 0
for {
batch := make([]string, 0, batchSize)
batchCount := 0
for cursor.Next(ctx) && batchCount < batchSize {
var doc struct {
ID string `bson:"_id"`
}
if err = cursor.Decode(&doc); err != nil {
return totalDeleted, err
}
batch = append(batch, doc.ID)
batchCount++
}
if len(batch) == 0 {
break
}
// Delete the batch
result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
if err != nil {
return totalDeleted, err
}
totalDeleted += result.DeletedCount
if cursor.Err() != nil {
return totalDeleted, cursor.Err()
}
// If we didn't fill the batch, we're done
if batchCount < batchSize {
break
}
}
if len(docsToDelete) == 0 {
return 0, nil
}
ids := make([]any, len(docsToDelete))
for i := range docsToDelete {
ids[i] = docsToDelete[i]["_id"]
}
result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
if err != nil {
return 0, err
}
return result.DeletedCount, nil
return totalDeleted, nil
}

View File

@@ -17,10 +17,15 @@ type PostCollection struct {
func NewPostCollection(client *mongo.Client) (*PostCollection, error) {
client.Database(config.MongoDBBaseDB).Collection("")
coll := client.Database(config.MongoDBBaseDB).Collection("post")
_, err := coll.Indexes().CreateOne(
_, err := coll.Indexes().CreateMany(
context.Background(),
mongo.IndexModel{
Keys: bson.D{{Key: "sequence", Value: -1}},
[]mongo.IndexModel{
{
Keys: bson.D{{Key: "sequence", Value: -1}},
},
{
Keys: bson.D{{Key: "created_at", Value: 1}},
},
},
)
if err != nil {
@@ -71,7 +76,8 @@ func (p PostCollection) CutoffByCount(
findOpts := options.Find().
SetSort(bson.D{{Key: "created_at", Value: 1}}).
SetLimit(deleteCount)
SetLimit(deleteCount).
SetProjection(bson.M{"_id": 1})
cursor, err := p.Collection.Find(ctx, bson.M{}, findOpts)
if err != nil {
@@ -79,26 +85,48 @@ func (p PostCollection) CutoffByCount(
}
defer func() { _ = cursor.Close(ctx) }()
var docsToDelete []bson.M
if err = cursor.All(ctx, &docsToDelete); err != nil {
return 0, err
// Process documents in batches to avoid potential memory issues
const batchSize = 10000
var totalDeleted int64 = 0
for {
batch := make([]string, 0, batchSize)
batchCount := 0
for cursor.Next(ctx) && batchCount < batchSize {
var doc struct {
ID string `bson:"_id"`
}
if err = cursor.Decode(&doc); err != nil {
return totalDeleted, err
}
batch = append(batch, doc.ID)
batchCount++
}
if len(batch) == 0 {
break
}
// Delete the batch
result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
if err != nil {
return totalDeleted, err
}
totalDeleted += result.DeletedCount
if cursor.Err() != nil {
return totalDeleted, cursor.Err()
}
// If we didn't fill the batch, we're done
if batchCount < batchSize {
break
}
}
if len(docsToDelete) == 0 {
return 0, nil
}
ids := make([]any, len(docsToDelete))
for i := range docsToDelete {
ids[i] = docsToDelete[i]["_id"]
}
result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
if err != nil {
return 0, err
}
return result.DeletedCount, nil
return totalDeleted, nil
}
func (p PostCollection) GetMaxSequence(ctx context.Context) (*int64, error) {

View File

@@ -4,4 +4,5 @@ import "errors"
// Sentinel errors shared across packages.
var (
// ErrInternal is the sentinel for an internal error.
ErrInternal = errors.New("internal error")
// ErrNotfound is the sentinel for a missing value (returned, for example,
// by middleware.GetValue).
ErrNotfound = errors.New("not found")
)