From 48c9847240e04eff525b653ec630fce3e7cd1ca4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 2 Aug 2019 00:46:49 -0400 Subject: [PATCH] oops forgot a return. separated out and added more comments to the new tests. --- floodsub_test.go | 66 ++++++++++++++++++++++++++++++++++++++++-------- subscription.go | 8 +++--- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 6541638..70265b0 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1211,7 +1211,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { } } -func TestSubscriptionNotificationOverflow(t *testing.T) { +func TestSubscriptionNotificationOverflowSimple(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1296,7 +1296,7 @@ func TestSubscriptionNotificationOverflow(t *testing.T) { msgs[i].Cancel() } - // Wait for remaining peer to find other peers + // Wait for remaining peer to disconnect from the other peers for len(psubs[0].ListPeers(topic)) != 0 { time.Sleep(time.Millisecond * 100) } @@ -1312,44 +1312,88 @@ func TestSubscriptionNotificationOverflow(t *testing.T) { t.Fatal("non LEAVE event occurred") } } +} +func TestSubscriptionNotificationSubUnSub(t *testing.T) { // Resubscribe and Unsubscribe a peers and check the state for consistency - notifSubThenUnSub(ctx, t, topic, psubs, msgs, 10) - notifSubThenUnSub(ctx, t, topic, psubs, msgs, numHosts-1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + psubs := getPubsubs(ctx, hosts) + + for i := 1; i < numHosts; i++ { + connect(t, hosts[0], hosts[i]) + } + time.Sleep(time.Millisecond * 100) + + notifSubThenUnSub(ctx, t, topic, psubs[:11]) +} + +func TestSubscriptionNotificationOverflowSubUnSub(t *testing.T) { + // Resubscribe and Unsubscribe a peers and check the state for consistency + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + psubs := getPubsubs(ctx, hosts) + + for i := 1; i < numHosts; i++ { + connect(t, hosts[0], hosts[i]) + } + time.Sleep(time.Millisecond * 100) + + notifSubThenUnSub(ctx, t, topic, psubs) } func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string, - psubs []*PubSub, msgs []*Subscription, checkSize int) { + psubs []*PubSub) { ps := psubs[0] - sub := msgs[0] + msgs := make([]*Subscription, len(psubs)) + checkSize := len(psubs) - 1 + // Subscribe all peers to the topic var err error - - for i := 1; i < checkSize+1; i++ { - msgs[i], err = psubs[i].Subscribe(topic) + for i, ps := range psubs { + msgs[i], err = ps.Subscribe(topic) if err != nil { t.Fatal(err) } } + sub := msgs[0] + + // Wait for the primary peer to be connected to the other peers for len(ps.ListPeers(topic)) < checkSize { time.Sleep(time.Millisecond * 100) } + // Unsubscribe all peers except the primary for i := 1; i < checkSize+1; i++ { msgs[i].Cancel() } - // Wait for subscriptions to register + // Wait for the unsubscribe messages to reach the primary peer for len(ps.ListPeers(topic)) < 0 { time.Sleep(time.Millisecond * 100) } + // read all available events and verify that there are no events to process + // this is because every peer that joined also left peerState := readAllQueuedEvents(ctx, t, sub) if len(peerState) != 0 { - t.Fatal("Received incorrect events") + for p, s := range peerState { + fmt.Println(p, s) + } + t.Fatalf("Received incorrect events. %d extra events", len(peerState)) } } diff --git a/subscription.go b/subscription.go index ae33cde..5ed1778 100644 --- a/subscription.go +++ b/subscription.go @@ -60,9 +60,11 @@ func (sub *Subscription) sendNotification(evt PeerEvent) { sub.eventMx.Lock() defer sub.eventMx.Unlock() - e, ok := sub.evtBacklog[evt.Peer] - if ok && e != evt.Type { - delete(sub.evtBacklog, evt.Peer) + if e, ok := sub.evtBacklog[evt.Peer]; ok { + if e != evt.Type { + delete(sub.evtBacklog, evt.Peer) + } + return } select {