Fixed some unnecessary Leave notifications.
Combined Join and Leave events into a single API with a struct that specifies whether the event is a Join or a Leave.
This commit is contained in:
parent
817651a6d1
commit
be69856a1d
|
@ -1111,12 +1111,14 @@ func TestSubscriptionJoinNotification(t *testing.T) {
|
|||
wg.Add(1)
|
||||
go func(peersFound map[peer.ID]struct{}) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < numHosts-1; i++ {
|
||||
pid, err := sub.NextPeerJoin(ctx)
|
||||
for len(peersFound) < numHosts-1 {
|
||||
event, err := sub.NextPeerEvent(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peersFound[pid] = struct{}{}
|
||||
if event.Type == PEER_JOIN {
|
||||
peersFound[event.Peer] = struct{}{}
|
||||
}
|
||||
}
|
||||
}(peersFound)
|
||||
}
|
||||
|
@ -1163,12 +1165,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
|
|||
wg.Add(1)
|
||||
go func(peersFound map[peer.ID]struct{}) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < numHosts-1; i++ {
|
||||
pid, err := sub.NextPeerJoin(ctx)
|
||||
for len(peersFound) < numHosts-1 {
|
||||
event, err := sub.NextPeerEvent(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
peersFound[pid] = struct{}{}
|
||||
if event.Type == PEER_JOIN {
|
||||
peersFound[event.Peer] = struct{}{}
|
||||
}
|
||||
}
|
||||
}(peersFound)
|
||||
}
|
||||
|
@ -1186,12 +1190,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
|
|||
psubs[0].BlacklistPeer(hosts[3].ID())
|
||||
|
||||
leavingPeers := make(map[peer.ID]struct{})
|
||||
for i := 0; i < 3; i++ {
|
||||
pid, err := msgs[0].NextPeerLeave(ctx)
|
||||
for len(leavingPeers) < 3 {
|
||||
event, err := msgs[0].NextPeerEvent(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
leavingPeers[pid] = struct{}{}
|
||||
if event.Type == PEER_LEAVE {
|
||||
leavingPeers[event.Peer] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := leavingPeers[hosts[1].ID()]; !ok {
|
||||
|
|
35
pubsub.go
35
pubsub.go
|
@ -337,8 +337,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
|
||||
delete(p.peers, pid)
|
||||
for t, tmap := range p.topics {
|
||||
delete(tmap, pid)
|
||||
p.notifySubscriberLeft(t, pid)
|
||||
if _, ok := tmap[pid]; ok {
|
||||
delete(tmap, pid)
|
||||
p.notifyLeave(t, pid)
|
||||
}
|
||||
}
|
||||
|
||||
p.rt.RemovePeer(pid)
|
||||
|
@ -397,8 +399,10 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||
close(ch)
|
||||
delete(p.peers, pid)
|
||||
for t, tmap := range p.topics {
|
||||
delete(tmap, pid)
|
||||
p.notifySubscriberLeft(t, pid)
|
||||
if _, ok := tmap[pid]; ok {
|
||||
delete(tmap, pid)
|
||||
p.notifyLeave(t, pid)
|
||||
}
|
||||
}
|
||||
p.rt.RemovePeer(pid)
|
||||
}
|
||||
|
@ -423,8 +427,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
|
|||
|
||||
sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
|
||||
close(sub.ch)
|
||||
close(sub.inboundSubs)
|
||||
close(sub.leavingSubs)
|
||||
close(sub.joinCh)
|
||||
close(sub.leaveCh)
|
||||
delete(subs, sub)
|
||||
|
||||
if len(subs) == 0 {
|
||||
|
@ -461,12 +465,12 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
|
|||
}
|
||||
|
||||
sub.ch = make(chan *Message, 32)
|
||||
sub.inboundSubs = make(chan peer.ID, inboundBufSize)
|
||||
sub.leavingSubs = make(chan peer.ID, 32)
|
||||
sub.joinCh = make(chan peer.ID, inboundBufSize)
|
||||
sub.leaveCh = make(chan peer.ID, 32)
|
||||
sub.cancelCh = p.cancelCh
|
||||
|
||||
for pid := range tmap {
|
||||
sub.inboundSubs <- pid
|
||||
sub.joinCh <- pid
|
||||
}
|
||||
|
||||
p.myTopics[sub.topic][sub] = struct{}{}
|
||||
|
@ -579,11 +583,11 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (p *PubSub) notifySubscriberLeft(topic string, pid peer.ID) {
|
||||
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
|
||||
if subs, ok := p.myTopics[topic]; ok {
|
||||
for s := range subs {
|
||||
select {
|
||||
case s.leavingSubs <- pid:
|
||||
case s.leaveCh <- pid:
|
||||
default:
|
||||
log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic)
|
||||
}
|
||||
|
@ -607,7 +611,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
|||
inboundPeer := rpc.from
|
||||
for s := range subs {
|
||||
select {
|
||||
case s.inboundSubs <- inboundPeer:
|
||||
case s.joinCh <- inboundPeer:
|
||||
default:
|
||||
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
|
||||
}
|
||||
|
@ -619,8 +623,11 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
|||
if !ok {
|
||||
continue
|
||||
}
|
||||
delete(tmap, rpc.from)
|
||||
p.notifySubscriberLeft(t, rpc.from)
|
||||
|
||||
if _, ok := tmap[rpc.from]; ok {
|
||||
delete(tmap, rpc.from)
|
||||
p.notifyLeave(t, rpc.from)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,19 +5,33 @@ import (
|
|||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
type EventType uint8
|
||||
|
||||
const (
|
||||
UNKNOWN EventType = iota
|
||||
PEER_JOIN
|
||||
PEER_LEAVE
|
||||
)
|
||||
|
||||
type Subscription struct {
|
||||
topic string
|
||||
ch chan *Message
|
||||
cancelCh chan<- *Subscription
|
||||
inboundSubs chan peer.ID
|
||||
leavingSubs chan peer.ID
|
||||
err error
|
||||
topic string
|
||||
ch chan *Message
|
||||
cancelCh chan<- *Subscription
|
||||
joinCh chan peer.ID
|
||||
leaveCh chan peer.ID
|
||||
err error
|
||||
}
|
||||
|
||||
type PeerEvent struct {
|
||||
Type EventType
|
||||
Peer peer.ID
|
||||
}
|
||||
|
||||
func (sub *Subscription) Topic() string {
|
||||
return sub.topic
|
||||
}
|
||||
|
||||
// Next returns the next message in our subscription
|
||||
func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
|
||||
select {
|
||||
case msg, ok := <-sub.ch:
|
||||
|
@ -35,28 +49,26 @@ func (sub *Subscription) Cancel() {
|
|||
sub.cancelCh <- sub
|
||||
}
|
||||
|
||||
func (sub *Subscription) NextPeerJoin(ctx context.Context) (peer.ID, error) {
|
||||
// NextPeerEvent returns the next event regarding subscribed peers
|
||||
// Note: There is no guarantee that the Peer Join event will fire before
|
||||
// the related Peer Leave event for a given peer
|
||||
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
|
||||
select {
|
||||
case newPeer, ok := <-sub.inboundSubs:
|
||||
case newPeer, ok := <-sub.joinCh:
|
||||
event := PeerEvent{Type: PEER_JOIN, Peer: newPeer}
|
||||
if !ok {
|
||||
return newPeer, sub.err
|
||||
return event, sub.err
|
||||
}
|
||||
|
||||
return newPeer, nil
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *Subscription) NextPeerLeave(ctx context.Context) (peer.ID, error) {
|
||||
select {
|
||||
case leavingPeer, ok := <-sub.leavingSubs:
|
||||
return event, nil
|
||||
case leavingPeer, ok := <-sub.leaveCh:
|
||||
event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer}
|
||||
if !ok {
|
||||
return leavingPeer, sub.err
|
||||
return event, sub.err
|
||||
}
|
||||
|
||||
return leavingPeer, nil
|
||||
return event, nil
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
return PeerEvent{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue