Prevent sending messages to flaky peers (#917)

This commit is contained in:
Dmitry Shulyak 2018-05-15 20:08:31 +03:00 committed by GitHub
parent ee60b7e727
commit 5aae87aba8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 238 additions and 7 deletions

View File

@ -0,0 +1,119 @@
diff --git c/p2p/peer.go w/p2p/peer.go
index 73e33418e..322268b28 100644
--- c/p2p/peer.go
+++ w/p2p/peer.go
@@ -22,6 +22,7 @@ import (
"net"
"sort"
"sync"
+ "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
@@ -38,7 +39,10 @@ const (
snappyProtocolVersion = 5
- pingInterval = 15 * time.Second
+ pingInterval = 1 * time.Second
+ // watchdogInterval intentionally lower than ping interval.
+ // this way we reduce potential flaky window size.
+ watchdogInterval = 200 * time.Millisecond
)
const (
@@ -100,6 +104,7 @@ type Peer struct {
log log.Logger
created mclock.AbsTime
+ flaky int32
wg sync.WaitGroup
protoErr chan error
closed chan struct{}
@@ -118,6 +123,11 @@ func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
return peer
}
+// IsFlaky returns true if there was no incoming traffic recently.
+func (p *Peer) IsFlaky() bool {
+ return atomic.LoadInt32(&p.flaky) == 1
+}
+
// ID returns the node's public key.
func (p *Peer) ID() discover.NodeID {
return p.rw.id
@@ -188,8 +198,10 @@ func (p *Peer) run() (remoteRequested bool, err error) {
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
- p.wg.Add(2)
- go p.readLoop(readErr)
+ p.wg.Add(3)
+ reads := make(chan struct{}, 10) // channel for reads
+ go p.readLoop(readErr, reads)
+ go p.watchdogLoop(reads)
go p.pingLoop()
// Start all protocol handlers.
@@ -248,7 +260,24 @@ func (p *Peer) pingLoop() {
}
}
-func (p *Peer) readLoop(errc chan<- error) {
+func (p *Peer) watchdogLoop(reads <-chan struct{}) {
+ defer p.wg.Done()
+ hb := time.NewTimer(watchdogInterval)
+ defer hb.Stop()
+ for {
+ select {
+ case <-reads:
+ atomic.StoreInt32(&p.flaky, 0)
+ case <-hb.C:
+ atomic.StoreInt32(&p.flaky, 1)
+ case <-p.closed:
+ return
+ }
+ hb.Reset(watchdogInterval)
+ }
+}
+
+func (p *Peer) readLoop(errc chan<- error, reads chan<- struct{}) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
@@ -261,6 +290,7 @@ func (p *Peer) readLoop(errc chan<- error) {
errc <- err
return
}
+ reads <- struct{}{}
}
}
diff --git c/p2p/server.go w/p2p/server.go
index c41d1dc15..04c6f7147 100644
--- c/p2p/server.go
+++ w/p2p/server.go
@@ -45,7 +45,7 @@ const (
// Maximum time allowed for reading a complete message.
// This is effectively the amount of time a connection can be idle.
- frameReadTimeout = 30 * time.Second
+ frameReadTimeout = 10 * time.Second
// Maximum amount of time allowed for writing a complete message.
frameWriteTimeout = 20 * time.Second
diff --git c/whisper/whisperv6/peer.go w/whisper/whisperv6/peer.go
index 427127290..c30e92d1c 100644
--- c/whisper/whisperv6/peer.go
+++ w/whisper/whisperv6/peer.go
@@ -187,6 +187,10 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (peer *Peer) broadcast() error {
+ if peer.peer.IsFlaky() {
+ log.Debug("Waiting for a peer to restore communication", "ID", peer.peer.ID())
+ return nil
+ }
envelopes := peer.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {

View File

@ -5,7 +5,9 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/api"
. "github.com/status-im/status-go/t/utils"
@ -57,6 +59,79 @@ func consumeUntil(events <-chan *p2p.PeerEvent, f func(ev *p2p.PeerEvent) bool,
}
}
func (s *PeersTestSuite) TestSentEnvelope() {
node := s.backend.StatusNode()
w, err := node.WhisperService()
s.NoError(err)
client, _ := node.GethNode().Attach()
s.NotNil(client)
var symID string
s.NoError(client.Call(&symID, "shh_newSymKey"))
msg := whisperv6.NewMessage{
SymKeyID: symID,
PowTarget: whisperv6.DefaultMinimumPoW,
PowTime: 200,
TTL: 10,
Topic: whisperv6.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
stop := make(chan struct{})
defer close(stop)
go func() {
ticker := time.NewTicker(2 * time.Second)
for {
select {
case <-stop:
return
case <-ticker.C:
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", msg))
}
}
}()
events := make(chan whisperv6.EnvelopeEvent, 100)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
waitAtLeastOneSent := func(timelimit time.Duration) {
timeout := time.After(timelimit)
for {
select {
case ev := <-events:
if ev.Event == whisperv6.EventEnvelopeSent {
return
}
case <-timeout:
s.FailNow("failed waiting for at least one envelope SENT")
return
}
}
}
waitAtLeastOneSent(60 * time.Second)
s.Require().NoError(s.controller.Enable())
waitEnvelopes := func(timelimit time.Duration, expect bool) {
timeout := time.After(timelimit)
for {
select {
case ev := <-events:
if ev.Event == whisperv6.EventEnvelopeSent {
if !expect {
s.FailNow("Unexpected SENT event")
}
}
case <-timeout:
return
}
}
}
// we verify that during this time no SENT events were fired
// must be less then 10s (current read socket deadline) to avoid reconnect
waitEnvelopes(9*time.Second, false)
s.Require().NoError(s.controller.Disable())
waitAtLeastOneSent(3 * time.Second)
}
// TestStaticPeersReconnect : it tests how long it takes to reconnect with
// peers after losing connection. This is something we will have to support
// in order for mobile devices to reconnect fast if network connectivity

View File

@ -61,6 +61,7 @@ func (s *WhisperExtensionSuite) TestSentSignal() {
confirmed <- event.Hash
}
})
defer signal.ResetDefaultNodeNotificationHandler()
client := s.nodes[0].RPCClient()
s.NotNil(client)
var symID string
@ -71,6 +72,7 @@ func (s *WhisperExtensionSuite) TestSentSignal() {
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
TTL: 5,
}
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", msg))
@ -78,7 +80,7 @@ func (s *WhisperExtensionSuite) TestSentSignal() {
select {
case conf := <-confirmed:
s.Equal(hash, conf)
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
s.Fail("timed out while waiting for confirmation")
}
}
@ -99,6 +101,7 @@ func (s *WhisperExtensionSuite) TestExpiredSignal() {
expired <- event.Hash
}
})
defer signal.ResetDefaultNodeNotificationHandler()
client := s.nodes[0].RPCClient()
s.NotNil(client)
var symID string
@ -117,7 +120,7 @@ func (s *WhisperExtensionSuite) TestExpiredSignal() {
select {
case exp := <-expired:
s.Equal(hash, exp)
case <-time.After(3 * time.Second):
case <-time.After(5 * time.Second):
s.Fail("timed out while waiting for expiration")
}
}

View File

@ -22,6 +22,7 @@ import (
"net"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
@ -38,7 +39,10 @@ const (
snappyProtocolVersion = 5
pingInterval = 15 * time.Second
pingInterval = 1 * time.Second
// watchdogInterval intentionally lower than ping interval.
// this way we reduce potential flaky window size.
watchdogInterval = 200 * time.Millisecond
)
const (
@ -100,6 +104,7 @@ type Peer struct {
log log.Logger
created mclock.AbsTime
flaky int32
wg sync.WaitGroup
protoErr chan error
closed chan struct{}
@ -118,6 +123,11 @@ func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
return peer
}
// IsFlaky returns true if there was no incoming traffic recently.
func (p *Peer) IsFlaky() bool {
return atomic.LoadInt32(&p.flaky) == 1
}
// ID returns the node's public key.
func (p *Peer) ID() discover.NodeID {
return p.rw.id
@ -188,8 +198,10 @@ func (p *Peer) run() (remoteRequested bool, err error) {
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
p.wg.Add(3)
reads := make(chan struct{}, 10) // channel for reads
go p.readLoop(readErr, reads)
go p.watchdogLoop(reads)
go p.pingLoop()
// Start all protocol handlers.
@ -248,7 +260,24 @@ func (p *Peer) pingLoop() {
}
}
func (p *Peer) readLoop(errc chan<- error) {
func (p *Peer) watchdogLoop(reads <-chan struct{}) {
defer p.wg.Done()
hb := time.NewTimer(watchdogInterval)
defer hb.Stop()
for {
select {
case <-reads:
atomic.StoreInt32(&p.flaky, 0)
case <-hb.C:
atomic.StoreInt32(&p.flaky, 1)
case <-p.closed:
return
}
hb.Reset(watchdogInterval)
}
}
func (p *Peer) readLoop(errc chan<- error, reads chan<- struct{}) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
@ -261,6 +290,7 @@ func (p *Peer) readLoop(errc chan<- error) {
errc <- err
return
}
reads <- struct{}{}
}
}

View File

@ -45,7 +45,7 @@ const (
// Maximum time allowed for reading a complete message.
// This is effectively the amount of time a connection can be idle.
frameReadTimeout = 30 * time.Second
frameReadTimeout = 10 * time.Second
// Maximum amount of time allowed for writing a complete message.
frameWriteTimeout = 20 * time.Second

View File

@ -187,6 +187,10 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (peer *Peer) broadcast() error {
if peer.peer.IsFlaky() {
log.Debug("Waiting for a peer to restore communication", "ID", peer.peer.ID())
return nil
}
envelopes := peer.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {