Compare commits

..

No commits in common. "c4bd6affa6a2d06b8c9a34ff3a3d5d9ebdeb61d1" and "588cfc0fccbb35579f81cf676fb6d791817d9013" have entirely different histories.

2 changed files with 3 additions and 20 deletions

View File

@@ -173,12 +173,11 @@ func ConsumeAndSaveToMongoDB(
sequenceCursor = nil
}
consumerLastFlushingTime := time.Now()
go func() {
defer cancel()
for {
err := RunFirehoseConsumer(
localCtx,
ctx,
relayHost,
func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
@@ -187,7 +186,7 @@ func ConsumeAndSaveToMongoDB(
)
if err != nil {
if localCtx.Err() != nil {
if ctx.Err() != nil {
break
}
logger.Log.Error(err.Error())
@@ -215,7 +214,6 @@ func ConsumeAndSaveToMongoDB(
return nil
case <-localCtx.Done():
logger.Log.Error("inactive firehose consumer error")
return nil
case data := <-firehoseDataChan:
@@ -263,18 +261,12 @@ func ConsumeAndSaveToMongoDB(
case <-ticker.C:
if len(postBatch) > 0 {
consumerLastFlushingTime = time.Now()
logger.Log.Info("flushing post batch", "count", len(postBatch))
// logger.Log.Info("flushing post batch", "count", len(postBatch))
err := postCollection.Insert(ctx, true, postBatch...)
if err != nil {
return fmt.Errorf("mongodb post insert error: %v", err)
}
postBatch = []*collections.Post{} // Clear batch after insert
} else {
// If we haven't seen any data for 25 seconds, cancel the consumer connection
if consumerLastFlushingTime.Add(time.Second*25).Before(time.Now()) {
cancel()
}
}
}
}

View File

@@ -13,11 +13,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
// azInvalidUser lists the DIDs whose posts FeedGeneratorAz.IsValid rejects
// outright, before any of the other checks run.
var azInvalidUser = []string{
	"did:plc:5zww7zorx2ajw7hqrhuix3ba",
	"did:plc:c4vhz47h566t2ntgd7gtawen",
}
var azValidUsers []string = []string{
"did:plc:jbt4qi6psd7rutwzedtecsq7",
"did:plc:yzgdpxsklrmfgqmjghdvw3ti",
@@ -129,10 +124,6 @@ func (generator *FeedGeneratorAz) IsValid(post *collections.Post) bool {
return false
}
if slices.Contains(azInvalidUser, post.DID) {
return false
}
if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
(slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords