From ab876fc71c34e89a7f0c8f4e361720ca9fa8588a Mon Sep 17 00:00:00 2001 From: Dat Duong Date: Fri, 22 Aug 2025 07:01:05 +0700 Subject: [PATCH] fix: Select ctx.Done() when preprocessing to avoid blocking on cancel (#635) Close #636 The PR updates the send logic to use a select with ctx.Done() and t.p.ctx.Done(), ensuring the operation terminates gracefully. A test case reproducing the issue is included in the PR for verification. --- topic.go | 8 +++++++- topic_test.go | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/topic.go b/topic.go index 3a65052..dd094ea 100644 --- a/topic.go +++ b/topic.go @@ -349,8 +349,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes } msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local} - t.p.eval <- func() { + select { + case t.p.eval <- func() { t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg}) + }: + case <-t.p.ctx.Done(): + return nil, t.p.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() } err := t.p.val.ValidateLocal(msg) if err != nil { diff --git a/topic_test.go b/topic_test.go index ef05feb..aa96cf5 100644 --- a/topic_test.go +++ b/topic_test.go @@ -951,6 +951,23 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) { }) } +func TestTopicPublishWithContextCanceled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + const numHosts = 5 + + hosts := getDefaultHosts(t, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + cancel() + + err := topics[0].Publish(ctx, []byte("buff")) + if err != context.Canceled { + t.Fatal("error should have been of type context.Canceled", err) + } +} + func TestTopicRelayPublishWithKey(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()