diff --git a/pubsub.go b/pubsub.go index 7faded5..580d4fa 100644 --- a/pubsub.go +++ b/pubsub.go @@ -78,9 +78,6 @@ type PubSub struct { // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} - // a set of notification channels for newly subscribed peers - newSubs map[string]chan peer.ID - // sendMsg handles messages that have been validated sendMsg chan *sendReq @@ -426,9 +423,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { } sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") - close(sub.ch) - close(sub.joinCh) - close(sub.leaveCh) + sub.close() delete(subs, sub) if len(subs) == 0 { @@ -608,10 +603,10 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if _, ok = tmap[rpc.from]; !ok { tmap[rpc.from] = struct{}{} if subs, ok := p.myTopics[t]; ok { - inboundPeer := rpc.from + peer := rpc.from for s := range subs { select { - case s.joinCh <- inboundPeer: + case s.joinCh <- peer: default: log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) } diff --git a/subscription.go b/subscription.go index 0090804..56128af 100644 --- a/subscription.go +++ b/subscription.go @@ -5,7 +5,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -type EventType uint8 +type EventType int8 const ( UNKNOWN EventType = iota @@ -49,6 +49,12 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } +func (sub *Subscription) close(){ + close(sub.ch) + close(sub.joinCh) + close(sub.leaveCh) +} + // NextPeerEvent returns the next event regarding subscribed peers // Note: There is no guarantee that the Peer Join event will fire before // the related Peer Leave event for a given peer