diff --git a/floodsub_test.go b/floodsub_test.go
index dfd4e7e..6541638 100644
--- a/floodsub_test.go
+++ b/floodsub_test.go
@@ -1210,3 +1210,177 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
 		t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
 	}
 }
+
+// TestSubscriptionNotificationOverflow checks that peer join/leave events are
+// not lost when more peers change state than the event channel can buffer.
+func TestSubscriptionNotificationOverflow(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const topic = "foobar"
+
+	const numHosts = 35
+	hosts := getNetHosts(t, ctx, numHosts)
+
+	psubs := getPubsubs(ctx, hosts)
+
+	msgs := make([]*Subscription, numHosts)
+	subPeersFound := make([]map[peer.ID]struct{}, numHosts)
+
+	// Subscribe all peers except one and wait until they've all been found
+	for i := 1; i < numHosts; i++ {
+		subch, err := psubs[i].Subscribe(topic)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		msgs[i] = subch
+	}
+
+	connectAll(t, hosts)
+
+	time.Sleep(time.Millisecond * 100)
+
+	wg := sync.WaitGroup{}
+	for i := 1; i < numHosts; i++ {
+		peersFound := make(map[peer.ID]struct{})
+		subPeersFound[i] = peersFound
+		sub := msgs[i]
+		wg.Add(1)
+		go func(peersFound map[peer.ID]struct{}) {
+			defer wg.Done()
+			for len(peersFound) < numHosts-2 {
+				event, err := sub.NextPeerEvent(ctx)
+				if err != nil {
+					// t.Fatal may only be called from the goroutine running the test
+					t.Error(err)
+					return
+				}
+				if event.Type == PEER_JOIN {
+					peersFound[event.Peer] = struct{}{}
+				}
+			}
+		}(peersFound)
+	}
+
+	wg.Wait()
+	for _, peersFound := range subPeersFound[1:] {
+		if len(peersFound) != numHosts-2 {
+			t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2)
+		}
+	}
+
+	// Wait for remaining peer to find other peers
+	for len(psubs[0].ListPeers(topic)) < numHosts-1 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// Subscribe the remaining peer and check that all the events came through
+	sub, err := psubs[0].Subscribe(topic)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	msgs[0] = sub
+
+	peerState := readAllQueuedEvents(ctx, t, sub)
+
+	if len(peerState) != numHosts-1 {
+		t.Fatal("incorrect number of peers found")
+	}
+
+	for _, e := range peerState {
+		if e != PEER_JOIN {
+			t.Fatal("non JOIN event occurred")
+		}
+	}
+
+	// Unsubscribe all peers except one and check that all the events came through
+	for i := 1; i < numHosts; i++ {
+		msgs[i].Cancel()
+	}
+
+	// Wait for the remaining peer to see all other peers leave
+	for len(psubs[0].ListPeers(topic)) != 0 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	peerState = readAllQueuedEvents(ctx, t, sub)
+
+	if len(peerState) != numHosts-1 {
+		t.Fatal("incorrect number of peers found")
+	}
+
+	for _, e := range peerState {
+		if e != PEER_LEAVE {
+			t.Fatal("non LEAVE event occurred")
+		}
+	}
+
+	// Resubscribe and unsubscribe peers and check the state for consistency
+	notifSubThenUnSub(ctx, t, topic, psubs, msgs, 10)
+	notifSubThenUnSub(ctx, t, topic, psubs, msgs, numHosts-1)
+}
+
+// notifSubThenUnSub subscribes peers 1..checkSize to topic, waits until
+// psubs[0] lists at least checkSize peers, cancels those subscriptions again
+// and fails the test unless the events queued on msgs[0] net out to nothing.
+func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string,
+	psubs []*PubSub, msgs []*Subscription, checkSize int) {
+
+	ps := psubs[0]
+	sub := msgs[0]
+
+	var err error
+
+	for i := 1; i < checkSize+1; i++ {
+		msgs[i], err = psubs[i].Subscribe(topic)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for len(ps.ListPeers(topic)) < checkSize {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	for i := 1; i < checkSize+1; i++ {
+		msgs[i].Cancel()
+	}
+
+	// Wait for the unsubscriptions to propagate
+	for len(ps.ListPeers(topic)) != 0 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	peerState := readAllQueuedEvents(ctx, t, sub)
+
+	if len(peerState) != 0 {
+		t.Fatal("Received incorrect events")
+	}
+}
+
+// readAllQueuedEvents drains peer events from sub until none arrives within
+// 100ms and returns the net state per peer: two events of different type for
+// the same peer cancel each other out.
+func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType {
+	peerState := make(map[peer.ID]EventType)
+	for {
+		evtCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
+		event, err := sub.NextPeerEvent(evtCtx)
+		cancel()
+		if err == context.DeadlineExceeded {
+			break
+		} else if err != nil {
+			t.Fatal(err)
+		}
+
+		e, ok := peerState[event.Peer]
+		if !ok {
+			peerState[event.Peer] = event.Type
+		} else if e != event.Type {
+			delete(peerState, event.Peer)
+		}
+	}
+	return peerState
+}
diff --git a/pubsub.go b/pubsub.go
index 580d4fa..37391cd 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -454,20 +454,13 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
 	}
 
 	tmap := p.topics[sub.topic]
-	inboundBufSize := len(tmap)
-	if inboundBufSize < 32 {
-		inboundBufSize = 32
-	}
 
-	sub.ch = make(chan *Message, 32)
-	sub.joinCh = make(chan peer.ID, inboundBufSize)
-	sub.leaveCh = make(chan peer.ID, 32)
+	// Seed the backlog with a join event for every peer already in the topic
+	for pid := range tmap {
+		sub.evtBacklog[pid] = PEER_JOIN
+	}
 	sub.cancelCh = p.cancelCh
 
-	for pid := range tmap {
-		sub.joinCh <- pid
-	}
-
 	p.myTopics[sub.topic][sub] = struct{}{}
 
 	req.resp <- sub
@@ -581,11 +574,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
 func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
 	if subs, ok := p.myTopics[topic]; ok {
 		for s := range subs {
-			select {
-			case s.leaveCh <- pid:
-			default:
-				log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic)
-			}
+			s.sendNotification(PeerEvent{Type: PEER_LEAVE, Peer: pid})
 		}
 	}
 }
@@ -605,11 +594,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
 				if subs, ok := p.myTopics[t]; ok {
 					peer := rpc.from
 					for s := range subs {
-						select {
-						case s.joinCh <- peer:
-						default:
-							log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
-						}
+						s.sendNotification(PeerEvent{Type: PEER_JOIN, Peer: peer})
 					}
 				}
 			}
@@ -712,6 +697,10 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
 
 	sub := &Subscription{
 		topic: td.GetName(),
+
+		ch:         make(chan *Message, 32),
+		peerEvtCh:  make(chan PeerEvent, 32),
+		evtBacklog: make(map[peer.ID]EventType),
 	}
 
 	for _, opt := range opts {
diff --git a/subscription.go b/subscription.go
index 7c4bdbe..ae33cde 100644
--- a/subscription.go
+++ b/subscription.go
@@ -3,6 +3,7 @@ package pubsub
 import (
 	"context"
 	"github.com/libp2p/go-libp2p-core/peer"
+	"sync"
 )
 
 type EventType int
@@ -16,9 +17,11 @@ type Subscription struct {
 	topic    string
 	ch       chan *Message
 	cancelCh chan<- *Subscription
-	joinCh   chan peer.ID
-	leaveCh  chan peer.ID
 	err      error
+
+	peerEvtCh  chan PeerEvent
+	eventMx    sync.Mutex
+	evtBacklog map[peer.ID]EventType
 }
 
 type PeerEvent struct {
@@ -50,29 +53,90 @@ func (sub *Subscription) Cancel() {
 
 func (sub *Subscription) close() {
 	close(sub.ch)
-	close(sub.joinCh)
-	close(sub.leaveCh)
+}
+
+// sendNotification queues a peer event for delivery through NextPeerEvent.
+// Events go to peerEvtCh while it has room; once it is full, the queued
+// events are folded into evtBacklog, where two events of different type for
+// the same peer cancel each other out.
+func (sub *Subscription) sendNotification(evt PeerEvent) {
+	sub.eventMx.Lock()
+	defer sub.eventMx.Unlock()
+
+	// The pending backlog entry for this peer has not been handed out yet, so
+	// fold the new event into it (opposite events cancel, duplicates are
+	// dropped) rather than also queueing it on the channel.
+	if _, ok := sub.evtBacklog[evt.Peer]; ok {
+		sub.addToBacklog(evt)
+		return
+	}
+
+	select {
+	case sub.peerEvtCh <- evt:
+	default:
+		// Empty event queue into backlog
+	emptyqueue:
+		for {
+			select {
+			case e := <-sub.peerEvtCh:
+				sub.addToBacklog(e)
+			default:
+				break emptyqueue
+			}
+		}
+		sub.addToBacklog(evt)
+		// The channel was just drained while holding eventMx, and the sends in
+		// this function are made under that lock, so this send has room.
+		if e, ok := sub.pullFromBacklog(); ok {
+			sub.peerEvtCh <- e
+		}
+	}
+}
+
+// addToBacklog records evt in the backlog: the first event for a peer is
+// stored, a pending event of the other type is cancelled, a duplicate is ignored.
+// It assumes a lock has been taken to protect the backlog
+func (sub *Subscription) addToBacklog(evt PeerEvent) {
+	e, ok := sub.evtBacklog[evt.Peer]
+	if !ok {
+		sub.evtBacklog[evt.Peer] = evt.Type
+	} else if e != evt.Type {
+		delete(sub.evtBacklog, evt.Peer)
+	}
+}
+
+// pullFromBacklog removes and returns one backlog entry (map iteration order),
+// or reports false when the backlog is empty.
+// It assumes a lock has been taken to protect the backlog
+func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) {
+	for k, v := range sub.evtBacklog {
+		evt := PeerEvent{Peer: k, Type: v}
+		delete(sub.evtBacklog, k)
+		return evt, true
+	}
+	return PeerEvent{}, false
 }
 
 // NextPeerEvent returns the next event regarding subscribed peers
-// Note: There is no guarantee that the Peer Join event will fire before
-// the related Peer Leave event for a given peer
+// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
+// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
+// all events will eventually be received from NextPeerEvent.
 func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
+	sub.eventMx.Lock()
+	evt, ok := sub.pullFromBacklog()
+	sub.eventMx.Unlock()
+
+	if ok {
+		return evt, nil
+	}
+
 	select {
-	case newPeer, ok := <-sub.joinCh:
-		event := PeerEvent{Type: PEER_JOIN, Peer: newPeer}
+	case evt, ok := <-sub.peerEvtCh:
 		if !ok {
-			return event, sub.err
+			return PeerEvent{}, sub.err
 		}
 
-		return event, nil
-	case leavingPeer, ok := <-sub.leaveCh:
-		event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer}
-		if !ok {
-			return event, sub.err
-		}
-
-		return event, nil
+		return evt, nil
 	case <-ctx.Done():
 		return PeerEvent{}, ctx.Err()
 	}