From 16a2a62ba1721d5fdc91b6001459ff52e9e94ab1 Mon Sep 17 00:00:00 2001 From: Aykhan Shahsuvarov Date: Mon, 19 May 2025 16:59:50 +0400 Subject: [PATCH] Optimize mongodb collections Cutoff methods --- pkg/storage/mongodb/collections/feed_az.go | 61 ++++++++++++------ pkg/storage/mongodb/collections/post.go | 72 +++++++++++++++------- 2 files changed, 92 insertions(+), 41 deletions(-) diff --git a/pkg/storage/mongodb/collections/feed_az.go b/pkg/storage/mongodb/collections/feed_az.go index 94e644b..dfdb6ed 100644 --- a/pkg/storage/mongodb/collections/feed_az.go +++ b/pkg/storage/mongodb/collections/feed_az.go @@ -163,7 +163,8 @@ func (f FeedAzCollection) CutoffByCount( findOpts := options.Find(). SetSort(bson.D{{Key: "created_at", Value: 1}}). - SetLimit(deleteCount) + SetLimit(deleteCount). + SetProjection(bson.M{"_id": 1}) cursor, err := f.Collection.Find(ctx, bson.M{}, findOpts) if err != nil { @@ -171,24 +172,46 @@ func (f FeedAzCollection) CutoffByCount( } defer func() { _ = cursor.Close(ctx) }() - var docsToDelete []bson.M - if err = cursor.All(ctx, &docsToDelete); err != nil { - return 0, err + // Process documents in batches to avoid potential memory issues + const batchSize = 10000 + var totalDeleted int64 = 0 + + for { + batch := make([]string, 0, batchSize) + batchCount := 0 + + for cursor.Next(ctx) && batchCount < batchSize { + var doc struct { + ID string `bson:"_id"` + } + if err = cursor.Decode(&doc); err != nil { + return totalDeleted, err + } + batch = append(batch, doc.ID) + batchCount++ + } + + if len(batch) == 0 { + break + } + + // Delete the batch + result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}}) + if err != nil { + return totalDeleted, err + } + + totalDeleted += result.DeletedCount + + if cursor.Err() != nil { + return totalDeleted, cursor.Err() + } + + // If we didn't fill the batch, we're done + if batchCount < batchSize { + break + } } - if len(docsToDelete) == 0 { - return 0, nil - } - - ids := make([]any, len(docsToDelete)) - for i := range docsToDelete { - ids[i] = docsToDelete[i]["_id"] - } - - result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}}) - if err != nil { - return 0, err - } - - return result.DeletedCount, nil + return totalDeleted, nil } diff --git a/pkg/storage/mongodb/collections/post.go b/pkg/storage/mongodb/collections/post.go index d5371db..a0b65d1 100644 --- a/pkg/storage/mongodb/collections/post.go +++ b/pkg/storage/mongodb/collections/post.go @@ -17,10 +17,15 @@ type PostCollection struct { func NewPostCollection(client *mongo.Client) (*PostCollection, error) { client.Database(config.MongoDBBaseDB).Collection("") coll := client.Database(config.MongoDBBaseDB).Collection("post") - _, err := coll.Indexes().CreateOne( + _, err := coll.Indexes().CreateMany( context.Background(), - mongo.IndexModel{ - Keys: bson.D{{Key: "sequence", Value: -1}}, + []mongo.IndexModel{ + { + Keys: bson.D{{Key: "sequence", Value: -1}}, + }, + { + Keys: bson.D{{Key: "created_at", Value: 1}}, + }, }, ) if err != nil { @@ -71,7 +76,8 @@ func (p PostCollection) CutoffByCount( findOpts := options.Find(). SetSort(bson.D{{Key: "created_at", Value: 1}}). - SetLimit(deleteCount) + SetLimit(deleteCount). + SetProjection(bson.M{"_id": 1}) cursor, err := p.Collection.Find(ctx, bson.M{}, findOpts) if err != nil { @@ -79,26 +85,48 @@ func (p PostCollection) CutoffByCount( } defer func() { _ = cursor.Close(ctx) }() - var docsToDelete []bson.M - if err = cursor.All(ctx, &docsToDelete); err != nil { - return 0, err + // Process documents in batches to avoid potential memory issues + const batchSize = 10000 + var totalDeleted int64 = 0 + + for { + batch := make([]string, 0, batchSize) + batchCount := 0 + + for cursor.Next(ctx) && batchCount < batchSize { + var doc struct { + ID string `bson:"_id"` + } + if err = cursor.Decode(&doc); err != nil { + return totalDeleted, err + } + batch = append(batch, doc.ID) + batchCount++ + } + + if len(batch) == 0 { + break + } + + // Delete the batch + result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}}) + if err != nil { + return totalDeleted, err + } + + totalDeleted += result.DeletedCount + + if cursor.Err() != nil { + return totalDeleted, cursor.Err() + } + + // If we didn't fill the batch, we're done + if batchCount < batchSize { + break + } } - if len(docsToDelete) == 0 { - return 0, nil - } - - ids := make([]any, len(docsToDelete)) - for i := range docsToDelete { - ids[i] = docsToDelete[i]["_id"] - } - - result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}}) - if err != nil { - return 0, err - } - - return result.DeletedCount, nil + return totalDeleted, nil } func (p PostCollection) GetMaxSequence(ctx context.Context) (*int64, error) {