Fix dead consumer error

This commit is contained in:
Aykhan Shahsuvarov 2025-05-20 23:35:27 +04:00
parent d8dd2c75a6
commit c4bd6affa6

View File

@ -173,11 +173,12 @@ func ConsumeAndSaveToMongoDB(
sequenceCursor = nil sequenceCursor = nil
} }
consumerLastFlushingTime := time.Now()
go func() { go func() {
defer cancel() defer cancel()
for { for {
err := RunFirehoseConsumer( err := RunFirehoseConsumer(
ctx, localCtx,
relayHost, relayHost,
func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) { func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) {
firehoseDataChan <- CallbackData{sequence, did, recordKey, post} firehoseDataChan <- CallbackData{sequence, did, recordKey, post}
@ -186,7 +187,7 @@ func ConsumeAndSaveToMongoDB(
) )
if err != nil { if err != nil {
if ctx.Err() != nil { if localCtx.Err() != nil {
break break
} }
logger.Log.Error(err.Error()) logger.Log.Error(err.Error())
@ -214,6 +215,7 @@ func ConsumeAndSaveToMongoDB(
return nil return nil
case <-localCtx.Done(): case <-localCtx.Done():
logger.Log.Error("inactive firehose consumer error")
return nil return nil
case data := <-firehoseDataChan: case data := <-firehoseDataChan:
@ -261,12 +263,18 @@ func ConsumeAndSaveToMongoDB(
case <-ticker.C: case <-ticker.C:
if len(postBatch) > 0 { if len(postBatch) > 0 {
// logger.Log.Info("flushing post batch", "count", len(postBatch)) consumerLastFlushingTime = time.Now()
logger.Log.Info("flushing post batch", "count", len(postBatch))
err := postCollection.Insert(ctx, true, postBatch...) err := postCollection.Insert(ctx, true, postBatch...)
if err != nil { if err != nil {
return fmt.Errorf("mongodb post insert error: %v", err) return fmt.Errorf("mongodb post insert error: %v", err)
} }
postBatch = []*collections.Post{} // Clear batch after insert postBatch = []*collections.Post{} // Clear batch after insert
} else {
// If we haven't seen any data for 25 seconds, cancel the consumer connection
if consumerLastFlushingTime.Add(time.Second*25).Before(time.Now()) {
cancel()
}
} }
} }
} }