From 26071eaad07c40f3f624e7b10253e81b4d016b11 Mon Sep 17 00:00:00 2001 From: Yaroslav Kolomiiets Date: Wed, 15 Apr 2020 13:02:34 +0100 Subject: [PATCH] PEX: integrate with send throttling --- peerconn.go | 4 +++- pexconn.go | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/peerconn.go b/peerconn.go index 1be12891..052dcdc0 100644 --- a/peerconn.go +++ b/peerconn.go @@ -564,7 +564,9 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) { cn.requestsLowWater = len(cn.requests) / 2 } if cn.pex.IsEnabled() { - cn.pex.Share(msg) // gated internally + if flow := cn.pex.Share(msg); !flow { + return + } } cn.upload(msg) } diff --git a/pexconn.go b/pexconn.go index 51aac7e9..e70ed105 100644 --- a/pexconn.go +++ b/pexconn.go @@ -68,20 +68,22 @@ func (s *pexConnState) genmsg() *pp.PexMsg { } // Share is called from the writer goroutine if when it is woken up with the write buffers empty -func (s *pexConnState) Share(postfn messageWriter) { +// Returns whether there's more room on the send buffer to write to. +func (s *pexConnState) Share(postfn messageWriter) bool { select { case <-s.gate: if tx := s.genmsg(); tx != nil { s.dbg.Print("sending PEX message: ", tx) - postfn(tx.Message(s.xid)) + flow := postfn(tx.Message(s.xid)) s.sched(pexInterval) + return flow } else { // no PEX to send this time - try again shortly s.sched(pexRetryDelay) } default: - return } + return true } // Recv is called from the reader goroutine