From ad97d9bf1768426436864f1ca1bf74b7c39a56ef Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 1 Nov 2019 17:18:39 -0400 Subject: [PATCH] fixed closed Topic handles still being able to perform some actions on the topic --- discovery_test.go | 15 +++++++ topic.go | 47 +++++++++++++++++++- topic_test.go | 109 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 162 insertions(+), 9 deletions(-) diff --git a/discovery_test.go b/discovery_test.go index 59e5181..0444c54 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -108,6 +108,21 @@ func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ... return d.server.FindPeers(ns, options.Limit) } +type dummyDiscovery struct{} + +func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + return time.Hour, nil +} + +func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + retCh := make(chan peer.AddrInfo) + go func() { + time.Sleep(time.Second) + close(retCh) + }() + return retCh, nil +} + func TestSimpleDiscovery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/topic.go b/topic.go index bbaa448..651ae11 100644 --- a/topic.go +++ b/topic.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "errors" "fmt" "sync" @@ -10,6 +11,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +// ErrTopicClosed is returned if a Topic is utilized after it has been closed +var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one") + // Topic is the handle for a pubsub topic type Topic struct { p *PubSub @@ -17,14 +21,23 @@ type Topic struct { evtHandlerMux sync.RWMutex evtHandlers map[*TopicEventHandler]struct{} + + mux sync.RWMutex + closed bool } // EventHandler creates a handle for topic specific events // Multiple event handlers may be created and will operate independently of each other func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return nil, ErrTopicClosed + } + h := &TopicEventHandler{ topic: t, - err: nil, + err: nil, evtLog: make(map[peer.ID]EventType), evtLogCh: make(chan struct{}, 1), @@ -68,6 +81,12 @@ func (t *Topic) sendNotification(evt PeerEvent) { // Note that subscription is not an instanteneous operation. It may take some time // before the subscription is processed by the pubsub main loop and propagated to our peers. func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return nil, ErrTopicClosed + } + sub := &Subscription{ topic: t.topic, ch: make(chan *Message, 32), @@ -104,6 +123,12 @@ type PubOpt func(pub *PublishOptions) error // Publish publishes data to topic. func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return ErrTopicClosed + } + seqno := t.p.nextSeqno() id := t.p.host.ID() m := &pb.Message{ @@ -149,13 +174,31 @@ func WithReadiness(ready RouterReady) PubOpt { // Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. // Does not error if the topic is already closed. func (t *Topic) Close() error { + t.mux.Lock() + defer t.mux.Unlock() + if t.closed { + return nil + } + req := &rmTopicReq{t, make(chan error, 1)} t.p.rmTopic <- req - return <-req.resp + err := <-req.resp + + if err == nil { + t.closed = true + } + + return err } // ListPeers returns a list of peers we are connected to in the given topic. func (t *Topic) ListPeers() []peer.ID { + t.mux.RLock() + defer t.mux.RUnlock() + if t.closed { + return []peer.ID{} + } + return t.p.ListPeers(t.topic) } diff --git a/topic_test.go b/topic_test.go index da2b9d3..1361817 100644 --- a/topic_test.go +++ b/topic_test.go @@ -1,6 +1,7 @@ package pubsub import ( + "bytes" "context" "fmt" "sync" @@ -42,13 +43,13 @@ func TestTopicCloseWithOpenSubscription(t *testing.T) { var sub *Subscription var err error testTopicCloseWithOpenResource(t, - func (topic *Topic) { - sub , err = topic.Subscribe() + func(topic *Topic) { + sub, err = topic.Subscribe() if err != nil { t.Fatal(err) } }, - func (){ + func() { sub.Cancel() }, ) @@ -58,13 +59,13 @@ func TestTopicCloseWithOpenEventHandler(t *testing.T) { var evts *TopicEventHandler var err error testTopicCloseWithOpenResource(t, - func (topic *Topic) { - evts , err = topic.EventHandler() + func(topic *Topic) { + evts, err = topic.EventHandler() if err != nil { t.Fatal(err) } }, - func (){ + func() { evts.Cancel() }, ) @@ -110,6 +111,100 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic } } +func TestTopicReuse(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 2 + topicID := "foobar" + hosts := getNetHosts(t, ctx, numHosts) + + sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{})) + receiver := getPubsub(ctx, hosts[1]) + + connectAll(t, hosts) + + // Sender creates topic + sendTopic, err := sender.Join(topicID) + if err != nil { + t.Fatal(err) + } + + // Receiver creates and subscribes to the topic + receiveTopic, err := receiver.Join(topicID) + if err != nil { + t.Fatal(err) + } + + sub, err := receiveTopic.Subscribe() + if err != nil { + t.Fatal(err) + } + + firstMsg := []byte("1") + if err := sendTopic.Publish(ctx, firstMsg, WithReadiness(MinTopicSize(1))); err != nil { + t.Fatal(err) + } + + msg, err := sub.Next(ctx) + if err != nil { + t.Fatal(err) + } + if bytes.Compare(msg.GetData(), firstMsg) != 0 { + t.Fatal("received incorrect message") + } + + if err := sendTopic.Close(); err != nil { + t.Fatal(err) + } + + // Recreate the same topic + newSendTopic, err := sender.Join(topicID) + if err != nil { + t.Fatal(err) + } + + // Try sending data with original topic + illegalSend := []byte("illegal") + if err := sendTopic.Publish(ctx, illegalSend); err != ErrTopicClosed { + t.Fatal(err) + } + + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2) + defer timeoutCancel() + msg, err = sub.Next(timeoutCtx) + if err != context.DeadlineExceeded { + if err != nil { + t.Fatal(err) + } + if bytes.Compare(msg.GetData(), illegalSend) != 0 { + t.Fatal("received incorrect message from illegal topic") + } + t.Fatal("received message sent by illegal topic") + } + timeoutCancel() + + // Try cancelling the new topic by using the original topic + if err := sendTopic.Close(); err != nil { + t.Fatal(err) + } + + secondMsg := []byte("2") + if err := newSendTopic.Publish(ctx, secondMsg); err != nil { + t.Fatal(err) + } + + timeoutCtx, timeoutCancel = context.WithTimeout(ctx, time.Second*2) + defer timeoutCancel() + msg, err = sub.Next(ctx) + if err != nil { + t.Fatal(err) + } + if bytes.Compare(msg.GetData(), secondMsg) != 0 { + t.Fatal("received incorrect message") + } +} + func TestTopicEventHandlerCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -130,7 +225,7 @@ func TestTopicEventHandlerCancel(t *testing.T) { t.Fatal(err) } evts.Cancel() - timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second * 2) + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2) defer timeoutCancel() connectAll(t, hosts) _, err = evts.NextPeerEvent(timeoutCtx)