From fc7795c38f3a578e7e7439262ace3ae9a7055ed3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 13 Dec 2018 22:49:35 +0200 Subject: [PATCH] simplify peer disconnect detection logic to ensure a single outgoing stream --- comm.go | 17 +++++++++++++++++ notify.go | 6 ------ pubsub.go | 3 +-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/comm.go b/comm.go index 9bdcd7c..c900efc 100644 --- a/comm.go +++ b/comm.go @@ -66,12 +66,29 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } go p.handleSendingMessages(ctx, s, outgoing) + go p.handlePeerEOF(ctx, s) select { case p.newPeerStream <- s: case <-ctx.Done(): } } +func (p *PubSub) handlePeerEOF(ctx context.Context, s inet.Stream) { + r := ggio.NewDelimitedReader(s, 1<<20) + rpc := new(RPC) + for { + err := r.ReadMsg(&rpc.RPC) + if err != nil { + select { + case p.peerDead <- s.Conn().RemotePeer(): + case <-ctx.Done(): + } + return + } + log.Warning("unexpected message from ", s.Conn().RemotePeer()) + } +} + func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) { bufw := bufio.NewWriter(s) wc := ggio.NewDelimitedWriter(bufw) diff --git a/notify.go b/notify.go index efe89b7..35f02cc 100644 --- a/notify.go +++ b/notify.go @@ -25,12 +25,6 @@ func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) { } func (p *PubSubNotif) Disconnected(n inet.Network, c inet.Conn) { - go func() { - select { - case p.peerDead <- c.RemotePeer(): - case <-p.ctx.Done(): - } - }() } func (p *PubSubNotif) Listen(n inet.Network, _ ma.Multiaddr) { diff --git a/pubsub.go b/pubsub.go index a59b388..cc0ce3f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -318,8 +318,7 @@ func (p *PubSub) processLoop(ctx context.Context) { if p.host.Network().Connectedness(pid) == inet.Connected { // still connected, must be a duplicate connection being closed. - // we respawn the writer as we need to ensure there is at leat one active - // at worst we can end with two writers pushing messages from the same channel. + // we respawn the writer as we need to ensure there is a stream active log.Warning("peer declared dead but still connected; respawning writer: ", pid) go p.handleNewPeer(ctx, pid, ch) continue