mirror of
https://github.com/aykhans/bsky-feedgen.git
synced 2025-06-05 04:12:03 +00:00
Compare commits
3 Commits
588cfc0fcc
...
c4bd6affa6
Author | SHA1 | Date | |
---|---|---|---|
c4bd6affa6 | |||
d8dd2c75a6 | |||
74b8324b6f |
@ -173,11 +173,12 @@ func ConsumeAndSaveToMongoDB(
|
|||||||
sequenceCursor = nil
|
sequenceCursor = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumerLastFlushingTime := time.Now()
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
for {
|
for {
|
||||||
err := RunFirehoseConsumer(
|
err := RunFirehoseConsumer(
|
||||||
ctx,
|
localCtx,
|
||||||
relayHost,
|
relayHost,
|
||||||
func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
|
func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
|
||||||
firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
|
firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
|
||||||
@ -186,7 +187,7 @@ func ConsumeAndSaveToMongoDB(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ctx.Err() != nil {
|
if localCtx.Err() != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
logger.Log.Error(err.Error())
|
logger.Log.Error(err.Error())
|
||||||
@ -214,6 +215,7 @@ func ConsumeAndSaveToMongoDB(
|
|||||||
return nil
|
return nil
|
||||||
|
|
||||||
case <-localCtx.Done():
|
case <-localCtx.Done():
|
||||||
|
logger.Log.Error("inactive firehose consumer error")
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case data := <-firehoseDataChan:
|
case data := <-firehoseDataChan:
|
||||||
@ -261,12 +263,18 @@ func ConsumeAndSaveToMongoDB(
|
|||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if len(postBatch) > 0 {
|
if len(postBatch) > 0 {
|
||||||
// logger.Log.Info("flushing post batch", "count", len(postBatch))
|
consumerLastFlushingTime = time.Now()
|
||||||
|
logger.Log.Info("flushing post batch", "count", len(postBatch))
|
||||||
err := postCollection.Insert(ctx, true, postBatch...)
|
err := postCollection.Insert(ctx, true, postBatch...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("mongodb post insert error: %v", err)
|
return fmt.Errorf("mongodb post insert error: %v", err)
|
||||||
}
|
}
|
||||||
postBatch = []*collections.Post{} // Clear batch after insert
|
postBatch = []*collections.Post{} // Clear batch after insert
|
||||||
|
} else {
|
||||||
|
// If we haven't seen any data for 25 seconds, cancel the consumer connection
|
||||||
|
if consumerLastFlushingTime.Add(time.Second*25).Before(time.Now()) {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,11 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var azInvalidUser []string = []string{
|
||||||
|
"did:plc:5zww7zorx2ajw7hqrhuix3ba",
|
||||||
|
"did:plc:c4vhz47h566t2ntgd7gtawen",
|
||||||
|
}
|
||||||
|
|
||||||
var azValidUsers []string = []string{
|
var azValidUsers []string = []string{
|
||||||
"did:plc:jbt4qi6psd7rutwzedtecsq7",
|
"did:plc:jbt4qi6psd7rutwzedtecsq7",
|
||||||
"did:plc:yzgdpxsklrmfgqmjghdvw3ti",
|
"did:plc:yzgdpxsklrmfgqmjghdvw3ti",
|
||||||
@ -124,6 +129,10 @@ func (generator *FeedGeneratorAz) IsValid(post *collections.Post) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if slices.Contains(azInvalidUser, post.DID) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
|
if slices.Contains(azValidUsers, post.DID) || // Posts from always-valid users
|
||||||
(slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
|
(slices.Contains(post.Langs, "az") && len(post.Langs) < 3) || // Posts in Azerbaijani language with fewer than 3 languages
|
||||||
generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords
|
generator.textRegex.MatchString(post.Text) { // Posts containing Azerbaijan-related keywords
|
||||||
|
Loading…
x
Reference in New Issue
Block a user