use reset where appropriate

(especially when walking away from read-only streams)
This commit is contained in:
Steven Allen 2017-09-15 16:28:46 -07:00
parent 33a57ac760
commit 238bd82a7f
2 changed files with 10 additions and 4 deletions

12
comm.go
View File

@ -26,15 +26,18 @@ func (p *PubSub) getHelloPacket() *RPC {
} }
func (p *PubSub) handleNewStream(s inet.Stream) { func (p *PubSub) handleNewStream(s inet.Stream) {
defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20) r := ggio.NewDelimitedReader(s, 1<<20)
for { for {
rpc := new(RPC) rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC) err := r.ReadMsg(&rpc.RPC)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
s.Reset()
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
} else {
// Just be nice. They probably won't read this
// but it doesn't hurt to send it.
s.Close()
} }
return return
} }
@ -43,6 +46,8 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
select { select {
case p.incoming <- rpc: case p.incoming <- rpc:
case <-p.ctx.Done(): case <-p.ctx.Done():
// Close is useless because the other side isn't reading.
s.Reset()
return return
} }
} }
@ -62,7 +67,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
return bufw.Flush() return bufw.Flush()
} }
defer wc.Close() defer s.Close()
for { for {
select { select {
case rpc, ok := <-outgoing: case rpc, ok := <-outgoing:
@ -76,6 +81,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
err := writeMsg(&rpc.RPC) err := writeMsg(&rpc.RPC)
if err != nil { if err != nil {
s.Reset()
log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err) log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err)
dead = true dead = true
go func() { go func() {

View File

@ -26,7 +26,7 @@ func (p *PubSubNotif) Connected(n inet.Network, c inet.Conn) {
select { select {
case p.newPeers <- s: case p.newPeers <- s:
case <-p.ctx.Done(): case <-p.ctx.Done():
s.Close() s.Reset()
} }
}() }()
} }