diff --git a/topic.go b/topic.go index 3a65052..dd094ea 100644 --- a/topic.go +++ b/topic.go @@ -349,8 +349,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes } msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local} - t.p.eval <- func() { + select { + case t.p.eval <- func() { t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg}) + }: + case <-t.p.ctx.Done(): + return nil, t.p.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() } err := t.p.val.ValidateLocal(msg) if err != nil { diff --git a/topic_test.go b/topic_test.go index ef05feb..aa96cf5 100644 --- a/topic_test.go +++ b/topic_test.go @@ -951,6 +951,23 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) { }) } +func TestTopicPublishWithContextCanceled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + const numHosts = 5 + + hosts := getDefaultHosts(t, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + cancel() + + err := topics[0].Publish(ctx, []byte("buff")) + if err != context.Canceled { + t.Fatal("error should have been of type context.Canceled", err) + } +} + func TestTopicRelayPublishWithKey(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()