From c8a616688c5dee6a75d5d716f0db512c2a0f688f Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 21 Jan 2019 16:00:10 +0200 Subject: [PATCH] Add request with retries api call --- params/config.go | 3 + services/shhext/api.go | 52 ++++++++++ services/shhext/mailservers/connmanager.go | 15 ++- .../shhext/mailservers/connmanager_test.go | 79 ++++++++++++--- services/shhext/service.go | 7 +- services/shhext/service_test.go | 95 +++++++++++++++++++ 6 files changed, 234 insertions(+), 17 deletions(-) diff --git a/params/config.go b/params/config.go index 30d06d15a..112ad033e 100644 --- a/params/config.go +++ b/params/config.go @@ -319,6 +319,9 @@ type ShhextConfig struct { ConnectionTarget int // RequestsDelay used to ensure that no similar requests are sent within short periods of time. RequestsDelay time.Duration + + // MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one. + MaxServerFailures int } // Validate validates the ShhextConfig struct and returns an error if inconsistent values are found diff --git a/services/shhext/api.go b/services/shhext/api.go index ce2edec29..3dcd148e2 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -180,6 +180,58 @@ func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) { return enode.ParseV4(rawurl) } +// RetryConfig specifies configuration for retries with timeout and max amount of retries. +type RetryConfig struct { + BaseTimeout time.Duration + // StepTimeout defines duration increase per each retry. + StepTimeout time.Duration + MaxRetries int +} + +// RequestMessagesSync repeats MessagesRequest using configuration in retry conf. 
+func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) error { + shh := api.service.w + events := make(chan whisper.EnvelopeEvent, 10) + sub := shh.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + var ( + requestID hexutil.Bytes + err error + retries int + ) + for retries <= conf.MaxRetries { + r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries) + // FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration + r.Timeout = time.Duration(int(r.Timeout.Seconds())) + requestID, err = api.RequestMessages(context.Background(), r) + if err != nil { + return err + } + err = waitForExpiredOrCompleted(common.BytesToHash(requestID), events) + if err == nil { + return nil + } + retries++ + api.log.Error("History request failed with %s. Making retry #%d", retries) + } + return fmt.Errorf("failed to request messages after %d retries", retries) +} + +func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent) error { + for { + ev := <-events + if ev.Hash != requestID { + continue + } + switch ev.Event { + case whisper.EventMailServerRequestCompleted: + return nil + case whisper.EventMailServerRequestExpired: + return errors.New("request expired") + } + } +} + // RequestMessages sends a request for historic messages to a MailServer. func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) { api.log.Info("RequestMessages", "request", r) diff --git a/services/shhext/mailservers/connmanager.go b/services/shhext/mailservers/connmanager.go index 2f93f9a50..e8b07bc4f 100644 --- a/services/shhext/mailservers/connmanager.go +++ b/services/shhext/mailservers/connmanager.go @@ -39,11 +39,12 @@ type p2pServer interface { } // NewConnectionManager creates an instance of ConnectionManager. 
-func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int, timeout time.Duration) *ConnectionManager { +func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target, maxFailures int, timeout time.Duration) *ConnectionManager { return &ConnectionManager{ server: server, whisper: whisper, connectedTarget: target, + maxFailures: maxFailures, notifications: make(chan []*enode.Node), timeoutWaitAdded: timeout, } @@ -60,6 +61,7 @@ type ConnectionManager struct { notifications chan []*enode.Node connectedTarget int timeoutWaitAdded time.Duration + maxFailures int } // Notify sends a non-blocking notification about new nodes. @@ -81,12 +83,12 @@ func (ps *ConnectionManager) Start() { ps.wg.Add(1) go func() { state := newInternalState(ps.server, ps.connectedTarget, ps.timeoutWaitAdded) - events := make(chan *p2p.PeerEvent, peerEventsBuffer) sub := ps.server.SubscribeEvents(events) whisperEvents := make(chan whisper.EnvelopeEvent, whisperEventsBuffer) whisperSub := ps.whisper.SubscribeEnvelopeEvents(whisperEvents) requests := map[common.Hash]struct{}{} + failuresPerServer := map[enode.ID]int{} defer sub.Unsubscribe() defer whisperSub.Unsubscribe() @@ -106,19 +108,24 @@ func (ps *ConnectionManager) Start() { case ev := <-events: processPeerEvent(state, ev) case ev := <-whisperEvents: - // TODO what about completed but with error? what about expired envelopes? 
+ // TODO treat failed requests the same way as expired switch ev.Event { case whisper.EventMailServerRequestSent: requests[ev.Hash] = struct{}{} case whisper.EventMailServerRequestCompleted: + // reset failures count on first success + failuresPerServer[ev.Peer] = 0 delete(requests, ev.Hash) case whisper.EventMailServerRequestExpired: _, exist := requests[ev.Hash] if !exist { continue } + failuresPerServer[ev.Peer]++ log.Debug("request to a mail server expired, disconncet a peer", "address", ev.Peer) - state.nodeDisconnected(ev.Peer) + if failuresPerServer[ev.Peer] >= ps.maxFailures { + state.nodeDisconnected(ev.Peer) + } } } } diff --git a/services/shhext/mailservers/connmanager_test.go b/services/shhext/mailservers/connmanager_test.go index ad2eb64ca..64d959a6f 100644 --- a/services/shhext/mailservers/connmanager_test.go +++ b/services/shhext/mailservers/connmanager_test.go @@ -86,7 +86,7 @@ type fakeEnvelopeEvents struct { input chan whisper.EnvelopeEvent } -func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription { +func (f *fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { for { select { @@ -100,8 +100,8 @@ func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.Envelo }) } -func newFakeEnvelopesEvents() fakeEnvelopeEvents { - return fakeEnvelopeEvents{ +func newFakeEnvelopesEvents() *fakeEnvelopeEvents { + return &fakeEnvelopeEvents{ input: make(chan whisper.EnvelopeEvent), } } @@ -193,7 +193,7 @@ func TestConnectionManagerAddDrop(t *testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 - connmanager := NewConnectionManager(server, whisper, target, 0) + connmanager := NewConnectionManager(server, whisper, target, 1, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} @@ -235,7 +235,7 @@ func TestConnectionManagerReplace(t 
*testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 - connmanager := NewConnectionManager(server, whisper, target, 0) + connmanager := NewConnectionManager(server, whisper, target, 1, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} @@ -273,13 +273,9 @@ func TestConnectionManagerReplace(t *testing.T) { }, time.Second, 100*time.Millisecond)) } -func TestConnectionChangedAfterExpiry(t *testing.T) { - server := newFakeServer() - whisperMock := newFakeEnvelopesEvents() - target := 1 - connmanager := NewConnectionManager(server, whisperMock, target, 0) +func setupTestConnectionAfterExpiry(t *testing.T, server *fakePeerEvents, whisperMock *fakeEnvelopeEvents, target, maxFailures int, hash common.Hash) (*ConnectionManager, enode.ID) { + connmanager := NewConnectionManager(server, whisperMock, target, maxFailures, 0) connmanager.Start() - defer connmanager.Stop() nodes := []*enode.Node{} for _, n := range getMapWithRandomNodes(t, 2) { nodes = append(nodes, n) @@ -296,7 +292,6 @@ func TestConnectionChangedAfterExpiry(t *testing.T) { initial = nodes[0] return nil }, time.Second, 100*time.Millisecond)) - hash := common.Hash{1} // Send event that history request for connected peer was sent. select { case whisperMock.input <- whisper.EnvelopeEvent{ @@ -304,6 +299,18 @@ func TestConnectionChangedAfterExpiry(t *testing.T) { case <-time.After(time.Second): require.FailNow(t, "can't send a 'sent' event") } + return connmanager, initial +} + +func TestConnectionChangedAfterExpiry(t *testing.T) { + server := newFakeServer() + whisperMock := newFakeEnvelopesEvents() + target := 1 + maxFailures := 1 + hash := common.Hash{1} + connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash) + defer connmanager.Stop() + // And eventually expired. 
select { case whisperMock.input <- whisper.EnvelopeEvent{ @@ -323,6 +330,54 @@ func TestConnectionChangedAfterExpiry(t *testing.T) { }, time.Second, 100*time.Millisecond)) } +func TestConnectionChangedAfterSecondExpiry(t *testing.T) { + server := newFakeServer() + whisperMock := newFakeEnvelopesEvents() + target := 1 + maxFailures := 2 + hash := common.Hash{1} + connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash) + defer connmanager.Stop() + + // First expired is sent. Nothing should happen. + select { + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}: + case <-time.After(time.Second): + require.FailNow(t, "can't send an 'expiry' event") + } + + // we use 'eventually' as 'consistently' because this function will retry for a given timeout while error is received + require.EqualError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + if nodes[0] == initial { + return fmt.Errorf("connected node wasn't changed from %s", initial) + } + return nil + }, time.Second, 100*time.Millisecond), fmt.Sprintf("connected node wasn't changed from %s", initial)) + + // second expiry event + select { + case whisperMock.input <- whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}: + case <-time.After(time.Second): + require.FailNow(t, "can't send an 'expiry' event") + } + require.NoError(t, utils.Eventually(func() error { + nodes := server.Nodes() + if len(nodes) != target { + return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) + } + if nodes[0] == initial { + return fmt.Errorf("connected node wasn't changed from %s", initial) + } + return nil + }, time.Second, 100*time.Millisecond)) +} + func TestProcessReplacementWaitsForConnections(t *testing.T) { srv := 
newFakePeerAdderRemover() target := 1 diff --git a/services/shhext/service.go b/services/shhext/service.go index 5d5b514a0..b47e3333a 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -234,7 +234,12 @@ func (s *Service) Start(server *p2p.Server) error { if connectionsTarget == 0 { connectionsTarget = defaultConnectionsTarget } - s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, defaultTimeoutWaitAdded) + maxFailures := s.config.MaxServerFailures + // if not defined change server on first expired event + if maxFailures == 0 { + maxFailures = 1 + } + s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, maxFailures, defaultTimeoutWaitAdded) s.connManager.Start() if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil { return err diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 13b34f3a5..7ac5db181 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -6,11 +6,13 @@ import ( "fmt" "io/ioutil" "math" + "net" "os" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -22,6 +24,12 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) +const ( + // internal whisper protocol codes + statusCode = 0 + p2pRequestCompleteCode = 125 +) + func newHandlerMock(buf int) handlerMock { return handlerMock{ confirmations: make(chan common.Hash, buf), @@ -449,3 +457,90 @@ func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState, } } } + +func TestRequestMessagesSync(t *testing.T) { + suite.Run(t, new(RequestMessagesSyncSuite)) +} + +type RequestMessagesSyncSuite struct { + suite.Suite + + localAPI *PublicAPI + localNode *enode.Node + remoteRW *p2p.MsgPipeRW +} + +func (s *RequestMessagesSyncSuite) 
SetupTest() { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + s.Require().NoError(err) + conf := &whisper.Config{ + MinimumAcceptedPOW: 0, + MaxMessageSize: 100 << 10, + } + w := whisper.New(conf) + pkey, err := crypto.GenerateKey() + s.Require().NoError(err) + node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1) + peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}}) + rw1, rw2 := p2p.MsgPipe() + errorc := make(chan error, 1) + go func() { + err := w.HandlePeer(peer, rw2) + errorc <- err + }() + s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true})) + s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true)) + + service := New(w, nil, db, params.ShhextConfig{}) + + s.localAPI = NewPublicAPI(service) + s.localNode = node + s.remoteRW = rw1 +} + +func (s *RequestMessagesSyncSuite) TestExpired() { + // intentionally discarding all requests, so that request will timeout + go func() { + msg, err := s.remoteRW.ReadMsg() + s.Require().NoError(err) + s.Require().NoError(msg.Discard()) + }() + s.Require().EqualError(s.localAPI.RequestMessagesSync(RetryConfig{ + BaseTimeout: time.Second, + }, MessagesRequest{ + MailServerPeer: s.localNode.String(), + }), "failed to request messages after 1 retries") +} + +func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) { + go func() { + attempt := 0 + for { + attempt++ + msg, err := s.remoteRW.ReadMsg() + s.Require().NoError(err) + if attempt < target { + s.Require().NoError(msg.Discard()) + continue + } + var e whisper.Envelope + s.Require().NoError(msg.Decode(&e)) + s.Require().NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, whisper.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, []byte{}))) + } + }() + s.Require().NoError(s.localAPI.RequestMessagesSync(RetryConfig{ 
+ BaseTimeout: time.Second, + MaxRetries: target, + }, MessagesRequest{ + MailServerPeer: s.localNode.String(), + Force: true, // force true is convenient here because timeout is less than the default delay (3s) + })) +} + +func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() { + s.testCompletedFromAttempt(1) +} + +func (s *RequestMessagesSyncSuite) TestCompletedFromSecondAttempt() { + s.testCompletedFromAttempt(2) +}