fix: Select ctx.Done() when preprocessing to avoid blocking on cancel (#635)

Close #636 

The PR updates the send logic to use a select with ctx.Done() and
t.p.ctx.Done(), ensuring the operation terminates gracefully. A test
case reproducing the issue is included in the PR for verification.
This commit is contained in:
Dat Duong 2025-08-22 07:01:05 +07:00 committed by GitHub
parent ee9c8434f9
commit ab876fc71c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 24 additions and 1 deletions

View File

@ -349,8 +349,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes
}
msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local}
t.p.eval <- func() {
select {
case t.p.eval <- func() {
t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg})
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
}
err := t.p.val.ValidateLocal(msg)
if err != nil {

View File

@ -951,6 +951,23 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) {
})
}
func TestTopicPublishWithContextCanceled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
const numHosts = 5
hosts := getDefaultHosts(t, numHosts)
topics := getTopics(getPubsubs(ctx, hosts), topic)
cancel()
err := topics[0].Publish(ctx, []byte("buff"))
if err != context.Canceled {
t.Fatal("error should have been of type context.Canceled", err)
}
}
func TestTopicRelayPublishWithKey(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()