mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 15:23:08 +00:00
batch process dead peer notifications
This commit is contained in:
parent
9d86090f42
commit
b289ec5017
35
comm.go
35
comm.go
@ -88,21 +88,31 @@ func (p *PubSub) handleNewStream(s network.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) notifyPeerDead(pid peer.ID) {
|
||||
p.peerDeadPrioLk.RLock()
|
||||
p.peerDeadMx.Lock()
|
||||
p.peerDeadPend[pid] = struct{}{}
|
||||
p.peerDeadMx.Unlock()
|
||||
p.peerDeadPrioLk.RUnlock()
|
||||
|
||||
select {
|
||||
case p.peerDead <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
|
||||
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
|
||||
if err != nil {
|
||||
log.Debug("opening new stream to peer: ", err, pid)
|
||||
|
||||
var ch chan peer.ID
|
||||
if err == ms.ErrNotSupported {
|
||||
ch = p.newPeerError
|
||||
select {
|
||||
case p.newPeerError <- pid:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
} else {
|
||||
ch = p.peerDead
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- pid:
|
||||
case <-ctx.Done():
|
||||
p.notifyPeerDead(pid)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -116,18 +126,17 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
|
||||
pid := s.Conn().RemotePeer()
|
||||
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
|
||||
rpc := new(RPC)
|
||||
for {
|
||||
err := r.ReadMsg(&rpc.RPC)
|
||||
if err != nil {
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
p.notifyPeerDead(pid)
|
||||
return
|
||||
}
|
||||
log.Debugf("unexpected message from %s", s.Conn().RemotePeer())
|
||||
|
||||
log.Debugf("unexpected message from %s", pid)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
80
pubsub.go
80
pubsub.go
@ -102,7 +102,10 @@ type PubSub struct {
|
||||
newPeerError chan peer.ID
|
||||
|
||||
// a notification channel for when our peers die
|
||||
peerDead chan peer.ID
|
||||
peerDead chan struct{}
|
||||
peerDeadPrioLk sync.RWMutex
|
||||
peerDeadMx sync.Mutex
|
||||
peerDeadPend map[peer.ID]struct{}
|
||||
|
||||
// The set of topics we are subscribed to
|
||||
mySubs map[string]map[*Subscription]struct{}
|
||||
@ -238,7 +241,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
newPeersPend: make(map[peer.ID]struct{}),
|
||||
newPeerStream: make(chan network.Stream),
|
||||
newPeerError: make(chan peer.ID),
|
||||
peerDead: make(chan peer.ID),
|
||||
peerDead: make(chan struct{}, 1),
|
||||
peerDeadPend: make(map[peer.ID]struct{}),
|
||||
cancelCh: make(chan *Subscription),
|
||||
getPeers: make(chan *listPeerReq),
|
||||
addSub: make(chan *addSubReq),
|
||||
@ -509,34 +513,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
case pid := <-p.newPeerError:
|
||||
delete(p.peers, pid)
|
||||
|
||||
case pid := <-p.peerDead:
|
||||
ch, ok := p.peers[pid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
close(ch)
|
||||
|
||||
if p.host.Network().Connectedness(pid) == network.Connected {
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is a stream active
|
||||
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
|
||||
messages := make(chan *RPC, p.peerOutboundQueueSize)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
continue
|
||||
}
|
||||
|
||||
delete(p.peers, pid)
|
||||
for t, tmap := range p.topics {
|
||||
if _, ok := tmap[pid]; ok {
|
||||
delete(tmap, pid)
|
||||
p.notifyLeave(t, pid)
|
||||
}
|
||||
}
|
||||
|
||||
p.rt.RemovePeer(pid)
|
||||
case <-p.peerDead:
|
||||
p.handleDeadPeers()
|
||||
|
||||
case treq := <-p.getTopics:
|
||||
var out []string
|
||||
@ -645,6 +623,48 @@ func (p *PubSub) handlePendingPeers() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleDeadPeers() {
|
||||
p.peerDeadPrioLk.Lock()
|
||||
defer p.peerDeadPrioLk.Unlock()
|
||||
|
||||
if len(p.peerDeadPend) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
deadPeers := p.peerDeadPend
|
||||
p.peerDeadPend = make(map[peer.ID]struct{})
|
||||
|
||||
for pid := range deadPeers {
|
||||
ch, ok := p.peers[pid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
close(ch)
|
||||
|
||||
if p.host.Network().Connectedness(pid) == network.Connected {
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is a stream active
|
||||
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
|
||||
messages := make(chan *RPC, p.peerOutboundQueueSize)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(p.ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
continue
|
||||
}
|
||||
|
||||
delete(p.peers, pid)
|
||||
for t, tmap := range p.topics {
|
||||
if _, ok := tmap[pid]; ok {
|
||||
delete(tmap, pid)
|
||||
p.notifyLeave(t, pid)
|
||||
}
|
||||
}
|
||||
|
||||
p.rt.RemovePeer(pid)
|
||||
}
|
||||
}
|
||||
|
||||
// handleAddTopic adds a tracker for a particular topic.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleAddTopic(req *addTopicReq) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user