From 238bd82a7f394f02e307502a74a33773d6abff87 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 15 Sep 2017 16:28:46 -0700 Subject: [PATCH] use reset where appropriate (especially when walking away from read-only streams) --- comm.go | 12 +++++++++--- notify.go | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/comm.go b/comm.go index 5ada91f..5657abb 100644 --- a/comm.go +++ b/comm.go @@ -26,15 +26,18 @@ func (p *PubSub) getHelloPacket() *RPC { } func (p *PubSub) handleNewStream(s inet.Stream) { - defer s.Close() - r := ggio.NewDelimitedReader(s, 1<<20) for { rpc := new(RPC) err := r.ReadMsg(&rpc.RPC) if err != nil { if err != io.EOF { + s.Reset() log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) + } else { + // Just be nice. They probably won't read this + // but it doesn't hurt to send it. + s.Close() } return } @@ -43,6 +46,8 @@ func (p *PubSub) handleNewStream(s inet.Stream) { select { case p.incoming <- rpc: case <-p.ctx.Done(): + // Close is useless because the other side isn't reading. + s.Reset() return } } @@ -62,7 +67,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo return bufw.Flush() } - defer wc.Close() + defer s.Close() for { select { case rpc, ok := <-outgoing: @@ -76,6 +81,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo err := writeMsg(&rpc.RPC) if err != nil { + s.Reset() log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err) dead = true go func() { diff --git a/notify.go b/notify.go index fdcaf39..11cb4e5 100644 --- a/notify.go +++ b/notify.go @@ -26,7 +26,7 @@ func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) { select { case p.newPeers <- s: case <-p.ctx.Done(): - s.Close() + s.Reset() } }() }