From 899f9cd62ba3850590439a0749b585171deb547a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 1 Nov 2019 17:12:23 -0400 Subject: [PATCH] fixed Topic close error --- topic.go | 1 + topic_test.go | 79 +++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/topic.go b/topic.go index 2ccc4a4..bbaa448 100644 --- a/topic.go +++ b/topic.go @@ -23,6 +23,7 @@ type Topic struct { // Multiple event handlers may be created and will operate independently of each other func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) { h := &TopicEventHandler{ + topic: t, err: nil, evtLog: make(map[peer.ID]EventType), diff --git a/topic_test.go b/topic_test.go index 8ea6d1e..da2b9d3 100644 --- a/topic_test.go +++ b/topic_test.go @@ -38,7 +38,39 @@ func getTopicEvts(topics []*Topic, opts ...TopicEventHandlerOpt) []*TopicEventHa return handlers } -func TestTopicClose(t *testing.T) { +func TestTopicCloseWithOpenSubscription(t *testing.T) { + var sub *Subscription + var err error + testTopicCloseWithOpenResource(t, + func (topic *Topic) { + sub , err = topic.Subscribe() + if err != nil { + t.Fatal(err) + } + }, + func (){ + sub.Cancel() + }, + ) +} + +func TestTopicCloseWithOpenEventHandler(t *testing.T) { + var evts *TopicEventHandler + var err error + testTopicCloseWithOpenResource(t, + func (topic *Topic) { + evts , err = topic.EventHandler() + if err != nil { + t.Fatal(err) + } + }, + func (){ + evts.Cancel() + }, + ) +} + +func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic), closeResource func()) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -57,23 +89,20 @@ func TestTopicClose(t *testing.T) { t.Fatal(err) } - // Try create and cancel topic while there's an outstanding subscription + // Try create and cancel topic while there's an outstanding subscription/event handler topic, err = ps.Join(topicID) if err != nil { t.Fatal(err) } - sub, err := topic.Subscribe() - if err != nil { - t.Fatal(err) - } + openResource(topic) if err := topic.Close(); err == nil { - t.Fatal("expected an error closing a topic with an open subscription") + t.Fatal("expected an error closing a topic with an open resource") } - // Check if the topic closes properly after canceling the outstanding subscription - sub.Cancel() + // Check if the topic closes properly after closing the resource + closeResource() time.Sleep(time.Millisecond * 100) if err := topic.Close(); err != nil { @@ -81,6 +110,38 @@ func TestTopicClose(t *testing.T) { } } +func TestTopicEventHandlerCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 5 + topicID := "foobar" + hosts := getNetHosts(t, ctx, numHosts) + ps := getPubsub(ctx, hosts[0]) + + // Try create and cancel topic + topic, err := ps.Join(topicID) + if err != nil { + t.Fatal(err) + } + + evts, err := topic.EventHandler() + if err != nil { + t.Fatal(err) + } + evts.Cancel() + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second * 2) + defer timeoutCancel() + connectAll(t, hosts) + _, err = evts.NextPeerEvent(timeoutCtx) + if err != context.DeadlineExceeded { + if err != nil { + t.Fatal(err) + } + t.Fatal("received event after cancel") + } +} + func TestSubscriptionJoinNotification(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()