diff --git a/p2p/peer.go b/p2p/peer.go index af019d0..cfd63af 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -23,6 +23,7 @@ import ( "net" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common/mclock" @@ -44,7 +45,10 @@ const ( snappyProtocolVersion = 5 - pingInterval = 15 * time.Second + pingInterval = 1 * time.Second + // watchdogInterval intentionally lower than ping interval. + // this way we reduce potential flaky window size. + watchdogInterval = 200 * time.Millisecond ) const ( @@ -106,6 +110,7 @@ type Peer struct { log log.Logger created mclock.AbsTime + flaky int32 wg sync.WaitGroup protoErr chan error closed chan struct{} @@ -125,6 +130,11 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer { return peer } +// IsFlaky returns true if there was no incoming traffic recently. +func (p *Peer) IsFlaky() bool { + return atomic.LoadInt32(&p.flaky) == 1 +} + // ID returns the node's public key. func (p *Peer) ID() enode.ID { return p.rw.node.ID() @@ -201,8 +211,10 @@ func (p *Peer) run() (remoteRequested bool, err error) { readErr = make(chan error, 1) reason DiscReason // sent to the peer ) - p.wg.Add(2) - go p.readLoop(readErr) + p.wg.Add(3) + reads := make(chan struct{}, 10) // channel for reads + go p.readLoop(readErr, reads) + go p.watchdogLoop(reads) go p.pingLoop() // Start all protocol handlers. @@ -262,7 +274,24 @@ func (p *Peer) pingLoop() { } } -func (p *Peer) readLoop(errc chan<- error) { +func (p *Peer) watchdogLoop(reads <-chan struct{}) { + defer p.wg.Done() + hb := time.NewTimer(watchdogInterval) + defer hb.Stop() + for { + select { + case <-reads: + atomic.StoreInt32(&p.flaky, 0) + case <-hb.C: + atomic.StoreInt32(&p.flaky, 1) + case <-p.closed: + return + } + hb.Reset(watchdogInterval) + } +} + +func (p *Peer) readLoop(errc chan<- error, reads chan<- struct{}) { defer p.wg.Done() for { msg, err := p.rw.ReadMsg() @@ -275,6 +304,7 @@ func (p *Peer) readLoop(errc chan<- error) { errc <- err return } + reads <- struct{}{} } } diff --git a/p2p/server.go b/p2p/server.go index 40db758..8546b02 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -49,7 +49,7 @@ const ( // Maximum time allowed for reading a complete message. // This is effectively the amount of time a connection can be idle. - frameReadTimeout = 30 * time.Second + frameReadTimeout = 10 * time.Second // Maximum amount of time allowed for writing a complete message. frameWriteTimeout = 20 * time.Second diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index eb17d2d..2b7687e 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -195,6 +195,10 @@ func (peer *Peer) expire() { // broadcast iterates over the collection of envelopes and transmits yet unknown // ones over the network. func (peer *Peer) broadcast() error { + if peer.peer.IsFlaky() { + log.Trace("Waiting for a peer to restore communication", "ID", peer.peer.ID()) + return nil + } envelopes := peer.host.Envelopes() bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes {