Small code cleanup + refactor
This commit is contained in:
parent
be69856a1d
commit
e26e489bdd
11
pubsub.go
11
pubsub.go
|
@ -78,9 +78,6 @@ type PubSub struct {
|
||||||
// topics tracks which topics each of our peers are subscribed to
|
// topics tracks which topics each of our peers are subscribed to
|
||||||
topics map[string]map[peer.ID]struct{}
|
topics map[string]map[peer.ID]struct{}
|
||||||
|
|
||||||
// a set of notification channels for newly subscribed peers
|
|
||||||
newSubs map[string]chan peer.ID
|
|
||||||
|
|
||||||
// sendMsg handles messages that have been validated
|
// sendMsg handles messages that have been validated
|
||||||
sendMsg chan *sendReq
|
sendMsg chan *sendReq
|
||||||
|
|
||||||
|
@ -426,9 +423,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
|
sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
|
||||||
close(sub.ch)
|
sub.close()
|
||||||
close(sub.joinCh)
|
|
||||||
close(sub.leaveCh)
|
|
||||||
delete(subs, sub)
|
delete(subs, sub)
|
||||||
|
|
||||||
if len(subs) == 0 {
|
if len(subs) == 0 {
|
||||||
|
@ -608,10 +603,10 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
||||||
if _, ok = tmap[rpc.from]; !ok {
|
if _, ok = tmap[rpc.from]; !ok {
|
||||||
tmap[rpc.from] = struct{}{}
|
tmap[rpc.from] = struct{}{}
|
||||||
if subs, ok := p.myTopics[t]; ok {
|
if subs, ok := p.myTopics[t]; ok {
|
||||||
inboundPeer := rpc.from
|
peer := rpc.from
|
||||||
for s := range subs {
|
for s := range subs {
|
||||||
select {
|
select {
|
||||||
case s.joinCh <- inboundPeer:
|
case s.joinCh <- peer:
|
||||||
default:
|
default:
|
||||||
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
|
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventType uint8
|
type EventType int8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
UNKNOWN EventType = iota
|
UNKNOWN EventType = iota
|
||||||
|
@ -49,6 +49,12 @@ func (sub *Subscription) Cancel() {
|
||||||
sub.cancelCh <- sub
|
sub.cancelCh <- sub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sub *Subscription) close(){
|
||||||
|
close(sub.ch)
|
||||||
|
close(sub.joinCh)
|
||||||
|
close(sub.leaveCh)
|
||||||
|
}
|
||||||
|
|
||||||
// NextPeerEvent returns the next event regarding subscribed peers
|
// NextPeerEvent returns the next event regarding subscribed peers
|
||||||
// Note: There is no guarantee that the Peer Join event will fire before
|
// Note: There is no guarantee that the Peer Join event will fire before
|
||||||
// the related Peer Leave event for a given peer
|
// the related Peer Leave event for a given peer
|
||||||
|
|
Loading…
Reference in New Issue