mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-03-19 09:43:08 +00:00
simplify peer disconnect detection logic to ensure a single outgoing stream
This commit is contained in:
parent
2621f893e6
commit
fc7795c38f
17
comm.go
17
comm.go
@ -66,12 +66,29 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
|
||||
}
|
||||
|
||||
go p.handleSendingMessages(ctx, s, outgoing)
|
||||
go p.handlePeerEOF(ctx, s)
|
||||
select {
|
||||
case p.newPeerStream <- s:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePeerEOF(ctx context.Context, s inet.Stream) {
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
rpc := new(RPC)
|
||||
for {
|
||||
err := r.ReadMsg(&rpc.RPC)
|
||||
if err != nil {
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Warning("unexpected message from ", s.Conn().RemotePeer())
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
|
||||
bufw := bufio.NewWriter(s)
|
||||
wc := ggio.NewDelimitedWriter(bufw)
|
||||
|
||||
@ -25,12 +25,6 @@ func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
|
||||
}
|
||||
|
||||
func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) {
|
||||
go func() {
|
||||
select {
|
||||
case p.peerDead <- c.RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) {
|
||||
|
||||
@ -318,8 +318,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
|
||||
if p.host.Network().Connectedness(pid) == inet.Connected {
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is at leat one active
|
||||
// at worst we can end with two writers pushing messages from the same channel.
|
||||
// we respawn the writer as we need to ensure there is a stream active
|
||||
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
|
||||
go p.handleNewPeer(ctx, pid, ch)
|
||||
continue
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user