From c4bd6affa6a2d06b8c9a34ff3a3d5d9ebdeb61d1 Mon Sep 17 00:00:00 2001 From: Aykhan Shahsuvarov Date: Tue, 20 May 2025 23:35:27 +0400 Subject: [PATCH] Fix dead consumer error --- pkg/consumer/base.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/consumer/base.go b/pkg/consumer/base.go index d3f06c5..349afa1 100644 --- a/pkg/consumer/base.go +++ b/pkg/consumer/base.go @@ -173,11 +173,12 @@ func ConsumeAndSaveToMongoDB( sequenceCursor = nil } + consumerLastFlushingTime := time.Now() go func() { defer cancel() for { err := RunFirehoseConsumer( - ctx, + localCtx, relayHost, func(sequence int64, did syntax.DID, recordKey syntax.RecordKey, post bsky.FeedPost) { firehoseDataChan <- CallbackData{sequence, did, recordKey, post} @@ -186,7 +187,7 @@ func ConsumeAndSaveToMongoDB( ) if err != nil { - if ctx.Err() != nil { + if localCtx.Err() != nil { break } logger.Log.Error(err.Error()) @@ -214,6 +215,7 @@ func ConsumeAndSaveToMongoDB( return nil case <-localCtx.Done(): + logger.Log.Error("inactive firehose consumer error") return nil case data := <-firehoseDataChan: @@ -261,12 +263,18 @@ func ConsumeAndSaveToMongoDB( case <-ticker.C: if len(postBatch) > 0 { - // logger.Log.Info("flushing post batch", "count", len(postBatch)) + consumerLastFlushingTime = time.Now() + logger.Log.Info("flushing post batch", "count", len(postBatch)) err := postCollection.Insert(ctx, true, postBatch...) if err != nil { return fmt.Errorf("mongodb post insert error: %v", err) } postBatch = []*collections.Post{} // Clear batch after insert + } else { + // If we haven't seen any data for 25 seconds, cancel the consumer connection + if consumerLastFlushingTime.Add(time.Second*25).Before(time.Now()) { + cancel() + } } } }