mirror of
https://github.com/aykhans/bsky-feedgen.git
synced 2025-06-03 03:26:42 +00:00
Compare commits
3 Commits
e6fec752f9
...
16a2a62ba1
Author | SHA1 | Date | |
---|---|---|---|
16a2a62ba1 | |||
259c139d92 | |||
c0d03ba341 |
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
||||||
|
_ "go.uber.org/automaxprocs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
||||||
|
_ "go.uber.org/automaxprocs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
"github.com/aykhans/bsky-feedgen/pkg/logger"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb"
|
||||||
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
"github.com/aykhans/bsky-feedgen/pkg/storage/mongodb/collections"
|
||||||
|
_ "go.uber.org/automaxprocs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
2
go.mod
2
go.mod
@ -8,6 +8,8 @@ require (
|
|||||||
go.mongodb.org/mongo-driver v1.17.3
|
go.mongodb.org/mongo-driver v1.17.3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require go.uber.org/automaxprocs v1.6.0 // indirect
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
|
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -330,6 +330,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
|||||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||||
|
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
|
||||||
|
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
|
||||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||||
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
|
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
|
||||||
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
|
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
|
||||||
|
@ -163,7 +163,8 @@ func (f FeedAzCollection) CutoffByCount(
|
|||||||
|
|
||||||
findOpts := options.Find().
|
findOpts := options.Find().
|
||||||
SetSort(bson.D{{Key: "created_at", Value: 1}}).
|
SetSort(bson.D{{Key: "created_at", Value: 1}}).
|
||||||
SetLimit(deleteCount)
|
SetLimit(deleteCount).
|
||||||
|
SetProjection(bson.M{"_id": 1})
|
||||||
|
|
||||||
cursor, err := f.Collection.Find(ctx, bson.M{}, findOpts)
|
cursor, err := f.Collection.Find(ctx, bson.M{}, findOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -171,24 +172,46 @@ func (f FeedAzCollection) CutoffByCount(
|
|||||||
}
|
}
|
||||||
defer func() { _ = cursor.Close(ctx) }()
|
defer func() { _ = cursor.Close(ctx) }()
|
||||||
|
|
||||||
var docsToDelete []bson.M
|
// Process documents in batches to avoid potential memory issues
|
||||||
if err = cursor.All(ctx, &docsToDelete); err != nil {
|
const batchSize = 10000
|
||||||
return 0, err
|
var totalDeleted int64 = 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
batch := make([]string, 0, batchSize)
|
||||||
|
batchCount := 0
|
||||||
|
|
||||||
|
for cursor.Next(ctx) && batchCount < batchSize {
|
||||||
|
var doc struct {
|
||||||
|
ID string `bson:"_id"`
|
||||||
|
}
|
||||||
|
if err = cursor.Decode(&doc); err != nil {
|
||||||
|
return totalDeleted, err
|
||||||
|
}
|
||||||
|
batch = append(batch, doc.ID)
|
||||||
|
batchCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the batch
|
||||||
|
result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
|
||||||
|
if err != nil {
|
||||||
|
return totalDeleted, err
|
||||||
|
}
|
||||||
|
|
||||||
|
totalDeleted += result.DeletedCount
|
||||||
|
|
||||||
|
if cursor.Err() != nil {
|
||||||
|
return totalDeleted, cursor.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we didn't fill the batch, we're done
|
||||||
|
if batchCount < batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(docsToDelete) == 0 {
|
return totalDeleted, nil
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ids := make([]any, len(docsToDelete))
|
|
||||||
for i := range docsToDelete {
|
|
||||||
ids[i] = docsToDelete[i]["_id"]
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := f.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return result.DeletedCount, nil
|
|
||||||
}
|
}
|
||||||
|
@ -17,10 +17,15 @@ type PostCollection struct {
|
|||||||
func NewPostCollection(client *mongo.Client) (*PostCollection, error) {
|
func NewPostCollection(client *mongo.Client) (*PostCollection, error) {
|
||||||
client.Database(config.MongoDBBaseDB).Collection("")
|
client.Database(config.MongoDBBaseDB).Collection("")
|
||||||
coll := client.Database(config.MongoDBBaseDB).Collection("post")
|
coll := client.Database(config.MongoDBBaseDB).Collection("post")
|
||||||
_, err := coll.Indexes().CreateOne(
|
_, err := coll.Indexes().CreateMany(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
mongo.IndexModel{
|
[]mongo.IndexModel{
|
||||||
Keys: bson.D{{Key: "sequence", Value: -1}},
|
{
|
||||||
|
Keys: bson.D{{Key: "sequence", Value: -1}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Keys: bson.D{{Key: "created_at", Value: 1}},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -71,7 +76,8 @@ func (p PostCollection) CutoffByCount(
|
|||||||
|
|
||||||
findOpts := options.Find().
|
findOpts := options.Find().
|
||||||
SetSort(bson.D{{Key: "created_at", Value: 1}}).
|
SetSort(bson.D{{Key: "created_at", Value: 1}}).
|
||||||
SetLimit(deleteCount)
|
SetLimit(deleteCount).
|
||||||
|
SetProjection(bson.M{"_id": 1})
|
||||||
|
|
||||||
cursor, err := p.Collection.Find(ctx, bson.M{}, findOpts)
|
cursor, err := p.Collection.Find(ctx, bson.M{}, findOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -79,26 +85,48 @@ func (p PostCollection) CutoffByCount(
|
|||||||
}
|
}
|
||||||
defer func() { _ = cursor.Close(ctx) }()
|
defer func() { _ = cursor.Close(ctx) }()
|
||||||
|
|
||||||
var docsToDelete []bson.M
|
// Process documents in batches to avoid potential memory issues
|
||||||
if err = cursor.All(ctx, &docsToDelete); err != nil {
|
const batchSize = 10000
|
||||||
return 0, err
|
var totalDeleted int64 = 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
batch := make([]string, 0, batchSize)
|
||||||
|
batchCount := 0
|
||||||
|
|
||||||
|
for cursor.Next(ctx) && batchCount < batchSize {
|
||||||
|
var doc struct {
|
||||||
|
ID string `bson:"_id"`
|
||||||
|
}
|
||||||
|
if err = cursor.Decode(&doc); err != nil {
|
||||||
|
return totalDeleted, err
|
||||||
|
}
|
||||||
|
batch = append(batch, doc.ID)
|
||||||
|
batchCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the batch
|
||||||
|
result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": batch}})
|
||||||
|
if err != nil {
|
||||||
|
return totalDeleted, err
|
||||||
|
}
|
||||||
|
|
||||||
|
totalDeleted += result.DeletedCount
|
||||||
|
|
||||||
|
if cursor.Err() != nil {
|
||||||
|
return totalDeleted, cursor.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we didn't fill the batch, we're done
|
||||||
|
if batchCount < batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(docsToDelete) == 0 {
|
return totalDeleted, nil
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ids := make([]any, len(docsToDelete))
|
|
||||||
for i := range docsToDelete {
|
|
||||||
ids[i] = docsToDelete[i]["_id"]
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := p.Collection.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": ids}})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return result.DeletedCount, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p PostCollection) GetMaxSequence(ctx context.Context) (*int64, error) {
|
func (p PostCollection) GetMaxSequence(ctx context.Context) (*int64, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user