Add request with retries api call

This commit is contained in:
Dmitry 2019-01-21 16:00:10 +02:00 committed by Dmitry Shulyak
parent d20b5dc3b3
commit c8a616688c
6 changed files with 234 additions and 17 deletions

View File

@ -319,6 +319,9 @@ type ShhextConfig struct {
ConnectionTarget int
// RequestsDelay used to ensure that no similar requests are sent within short periods of time.
RequestsDelay time.Duration
// MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one.
MaxServerFailures int
}
// Validate validates the ShhextConfig struct and returns an error if inconsistent values are found

View File

@ -180,6 +180,58 @@ func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) {
return enode.ParseV4(rawurl)
}
// RetryConfig specifies configuration for retries with timeout and max amount of retries.
type RetryConfig struct {
BaseTimeout time.Duration
// StepTimeout defines duration increase per each retry.
StepTimeout time.Duration
MaxRetries int
}
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) error {
shh := api.service.w
events := make(chan whisper.EnvelopeEvent, 10)
sub := shh.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
var (
requestID hexutil.Bytes
err error
retries int
)
for retries <= conf.MaxRetries {
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
requestID, err = api.RequestMessages(context.Background(), r)
if err != nil {
return err
}
err = waitForExpiredOrCompleted(common.BytesToHash(requestID), events)
if err == nil {
return nil
}
retries++
api.log.Error("History request failed with %s. Making retry #%d", retries)
}
return fmt.Errorf("failed to request messages after %d retries", retries)
}
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent) error {
for {
ev := <-events
if ev.Hash != requestID {
continue
}
switch ev.Event {
case whisper.EventMailServerRequestCompleted:
return nil
case whisper.EventMailServerRequestExpired:
return errors.New("request expired")
}
}
}
// RequestMessages sends a request for historic messages to a MailServer.
func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) {
api.log.Info("RequestMessages", "request", r)

View File

@ -39,11 +39,12 @@ type p2pServer interface {
}
// NewConnectionManager creates an instance of ConnectionManager.
func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target int, timeout time.Duration) *ConnectionManager {
func NewConnectionManager(server p2pServer, whisper EnvelopeEventSubscbriber, target, maxFailures int, timeout time.Duration) *ConnectionManager {
return &ConnectionManager{
server: server,
whisper: whisper,
connectedTarget: target,
maxFailures: maxFailures,
notifications: make(chan []*enode.Node),
timeoutWaitAdded: timeout,
}
@ -60,6 +61,7 @@ type ConnectionManager struct {
notifications chan []*enode.Node
connectedTarget int
timeoutWaitAdded time.Duration
maxFailures int
}
// Notify sends a non-blocking notification about new nodes.
@ -81,12 +83,12 @@ func (ps *ConnectionManager) Start() {
ps.wg.Add(1)
go func() {
state := newInternalState(ps.server, ps.connectedTarget, ps.timeoutWaitAdded)
events := make(chan *p2p.PeerEvent, peerEventsBuffer)
sub := ps.server.SubscribeEvents(events)
whisperEvents := make(chan whisper.EnvelopeEvent, whisperEventsBuffer)
whisperSub := ps.whisper.SubscribeEnvelopeEvents(whisperEvents)
requests := map[common.Hash]struct{}{}
failuresPerServer := map[enode.ID]int{}
defer sub.Unsubscribe()
defer whisperSub.Unsubscribe()
@ -106,22 +108,27 @@ func (ps *ConnectionManager) Start() {
case ev := <-events:
processPeerEvent(state, ev)
case ev := <-whisperEvents:
// TODO what about completed but with error? what about expired envelopes?
// TODO treat failed requests the same way as expired
switch ev.Event {
case whisper.EventMailServerRequestSent:
requests[ev.Hash] = struct{}{}
case whisper.EventMailServerRequestCompleted:
// reset failures count on first success
failuresPerServer[ev.Peer] = 0
delete(requests, ev.Hash)
case whisper.EventMailServerRequestExpired:
_, exist := requests[ev.Hash]
if !exist {
continue
}
failuresPerServer[ev.Peer]++
log.Debug("request to a mail server expired, disconncet a peer", "address", ev.Peer)
if failuresPerServer[ev.Peer] >= ps.maxFailures {
state.nodeDisconnected(ev.Peer)
}
}
}
}
}()
}

View File

@ -86,7 +86,7 @@ type fakeEnvelopeEvents struct {
input chan whisper.EnvelopeEvent
}
func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription {
func (f *fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
for {
select {
@ -100,8 +100,8 @@ func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.Envelo
})
}
func newFakeEnvelopesEvents() fakeEnvelopeEvents {
return fakeEnvelopeEvents{
func newFakeEnvelopesEvents() *fakeEnvelopeEvents {
return &fakeEnvelopeEvents{
input: make(chan whisper.EnvelopeEvent),
}
}
@ -193,7 +193,7 @@ func TestConnectionManagerAddDrop(t *testing.T) {
server := newFakeServer()
whisper := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisper, target, 0)
connmanager := NewConnectionManager(server, whisper, target, 1, 0)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
@ -235,7 +235,7 @@ func TestConnectionManagerReplace(t *testing.T) {
server := newFakeServer()
whisper := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisper, target, 0)
connmanager := NewConnectionManager(server, whisper, target, 1, 0)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
@ -273,13 +273,9 @@ func TestConnectionManagerReplace(t *testing.T) {
}, time.Second, 100*time.Millisecond))
}
func TestConnectionChangedAfterExpiry(t *testing.T) {
server := newFakeServer()
whisperMock := newFakeEnvelopesEvents()
target := 1
connmanager := NewConnectionManager(server, whisperMock, target, 0)
func setupTestConnectionAfterExpiry(t *testing.T, server *fakePeerEvents, whisperMock *fakeEnvelopeEvents, target, maxFailures int, hash common.Hash) (*ConnectionManager, enode.ID) {
connmanager := NewConnectionManager(server, whisperMock, target, maxFailures, 0)
connmanager.Start()
defer connmanager.Stop()
nodes := []*enode.Node{}
for _, n := range getMapWithRandomNodes(t, 2) {
nodes = append(nodes, n)
@ -296,7 +292,6 @@ func TestConnectionChangedAfterExpiry(t *testing.T) {
initial = nodes[0]
return nil
}, time.Second, 100*time.Millisecond))
hash := common.Hash{1}
// Send event that history request for connected peer was sent.
select {
case whisperMock.input <- whisper.EnvelopeEvent{
@ -304,6 +299,18 @@ func TestConnectionChangedAfterExpiry(t *testing.T) {
case <-time.After(time.Second):
require.FailNow(t, "can't send a 'sent' event")
}
return connmanager, initial
}
func TestConnectionChangedAfterExpiry(t *testing.T) {
server := newFakeServer()
whisperMock := newFakeEnvelopesEvents()
target := 1
maxFailures := 1
hash := common.Hash{1}
connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash)
defer connmanager.Stop()
// And eventually expired.
select {
case whisperMock.input <- whisper.EnvelopeEvent{
@ -323,6 +330,54 @@ func TestConnectionChangedAfterExpiry(t *testing.T) {
}, time.Second, 100*time.Millisecond))
}
func TestConnectionChangedAfterSecondExpiry(t *testing.T) {
server := newFakeServer()
whisperMock := newFakeEnvelopesEvents()
target := 1
maxFailures := 2
hash := common.Hash{1}
connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash)
defer connmanager.Stop()
// First expired is sent. Nothing should happen.
select {
case whisperMock.input <- whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
case <-time.After(time.Second):
require.FailNow(t, "can't send an 'expiry' event")
}
// we use 'eventually' as 'consistently' because this function will retry for a given timeout while error is received
require.EqualError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
if nodes[0] == initial {
return fmt.Errorf("connected node wasn't changed from %s", initial)
}
return nil
}, time.Second, 100*time.Millisecond), fmt.Sprintf("connected node wasn't changed from %s", initial))
// second expiry event
select {
case whisperMock.input <- whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
case <-time.After(time.Second):
require.FailNow(t, "can't send an 'expiry' event")
}
require.NoError(t, utils.Eventually(func() error {
nodes := server.Nodes()
if len(nodes) != target {
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
}
if nodes[0] == initial {
return fmt.Errorf("connected node wasn't changed from %s", initial)
}
return nil
}, time.Second, 100*time.Millisecond))
}
func TestProcessReplacementWaitsForConnections(t *testing.T) {
srv := newFakePeerAdderRemover()
target := 1

View File

@ -234,7 +234,12 @@ func (s *Service) Start(server *p2p.Server) error {
if connectionsTarget == 0 {
connectionsTarget = defaultConnectionsTarget
}
s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, defaultTimeoutWaitAdded)
maxFailures := s.config.MaxServerFailures
// if not defined change server on first expired event
if maxFailures == 0 {
maxFailures = 1
}
s.connManager = mailservers.NewConnectionManager(server, s.w, connectionsTarget, maxFailures, defaultTimeoutWaitAdded)
s.connManager.Start()
if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil {
return err

View File

@ -6,11 +6,13 @@ import (
"fmt"
"io/ioutil"
"math"
"net"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -22,6 +24,12 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
const (
// internal whisper protocol codes
statusCode = 0
p2pRequestCompleteCode = 125
)
func newHandlerMock(buf int) handlerMock {
return handlerMock{
confirmations: make(chan common.Hash, buf),
@ -449,3 +457,90 @@ func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState,
}
}
}
func TestRequestMessagesSync(t *testing.T) {
suite.Run(t, new(RequestMessagesSyncSuite))
}
type RequestMessagesSyncSuite struct {
suite.Suite
localAPI *PublicAPI
localNode *enode.Node
remoteRW *p2p.MsgPipeRW
}
func (s *RequestMessagesSyncSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
conf := &whisper.Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 100 << 10,
}
w := whisper.New(conf)
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)
peer := p2p.NewPeer(node.ID(), "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(peer, rw2)
errorc <- err
}()
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
service := New(w, nil, db, params.ShhextConfig{})
s.localAPI = NewPublicAPI(service)
s.localNode = node
s.remoteRW = rw1
}
func (s *RequestMessagesSyncSuite) TestExpired() {
// intentionally discarding all requests, so that request will timeout
go func() {
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
s.Require().NoError(msg.Discard())
}()
s.Require().EqualError(s.localAPI.RequestMessagesSync(RetryConfig{
BaseTimeout: time.Second,
}, MessagesRequest{
MailServerPeer: s.localNode.String(),
}), "failed to request messages after 1 retries")
}
func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) {
go func() {
attempt := 0
for {
attempt++
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
if attempt < target {
s.Require().NoError(msg.Discard())
continue
}
var e whisper.Envelope
s.Require().NoError(msg.Decode(&e))
s.Require().NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, whisper.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, []byte{})))
}
}()
s.Require().NoError(s.localAPI.RequestMessagesSync(RetryConfig{
BaseTimeout: time.Second,
MaxRetries: target,
}, MessagesRequest{
MailServerPeer: s.localNode.String(),
Force: true, // force true is convenient here because timeout is less then default delay (3s)
}))
}
func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() {
s.testCompletedFromAttempt(1)
}
func (s *RequestMessagesSyncSuite) TestCompletedFromSecondAttempt() {
s.testCompletedFromAttempt(2)
}