add shhext_requestMessagesNew (#1412)

This commit is contained in:
Adam Babik 2019-03-15 14:27:08 +01:00 committed by GitHub
parent 9441f798bb
commit 4b790adf34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 22 deletions

View File

@ -106,6 +106,17 @@ func (r *MessagesRequest) setDefaults(now time.Time) {
} }
} }
// MessagesResponse is a response for shhext_requestMessages2 method.
type MessagesResponse struct {
// Cursor from the response can be used to retrieve more messages
// for the previous request.
Cursor string `json:"cursor"`
// Error indicates that something wrong happened when sending messages
// to the requester.
Error error `json:"error"`
}
// SyncMessagesRequest is a SyncMessages() request payload. // SyncMessagesRequest is a SyncMessages() request payload.
type SyncMessagesRequest struct { type SyncMessagesRequest struct {
// MailServerPeer is MailServer's enode address. // MailServerPeer is MailServer's enode address.
@ -191,7 +202,9 @@ type RetryConfig struct {
} }
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf. // RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) error { func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) (MessagesResponse, error) {
var resp MessagesResponse
shh := api.service.w shh := api.service.w
events := make(chan whisper.EnvelopeEvent, 10) events := make(chan whisper.EnvelopeEvent, 10)
sub := shh.SubscribeEnvelopeEvents(events) sub := shh.SubscribeEnvelopeEvents(events)
@ -207,19 +220,21 @@ func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) e
r.Timeout = time.Duration(int(r.Timeout.Seconds())) r.Timeout = time.Duration(int(r.Timeout.Seconds()))
requestID, err = api.RequestMessages(context.Background(), r) requestID, err = api.RequestMessages(context.Background(), r)
if err != nil { if err != nil {
return err return resp, err
} }
err = waitForExpiredOrCompleted(common.BytesToHash(requestID), events) mailServerResp, err := waitForExpiredOrCompleted(common.BytesToHash(requestID), events)
if err == nil { if err == nil {
return nil resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
resp.Error = mailServerResp.Error
return resp, nil
} }
retries++ retries++
api.log.Error("History request failed with %s. Making retry #%d", retries) api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries)
} }
return fmt.Errorf("failed to request messages after %d retries", retries) return resp, fmt.Errorf("failed to request messages after %d retries", retries)
} }
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent) error { func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent) (*whisper.MailServerResponse, error) {
for { for {
ev := <-events ev := <-events
if ev.Hash != requestID { if ev.Hash != requestID {
@ -227,9 +242,13 @@ func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.Envelo
} }
switch ev.Event { switch ev.Event {
case whisper.EventMailServerRequestCompleted: case whisper.EventMailServerRequestCompleted:
return nil data, ok := ev.Data.(*whisper.MailServerResponse)
if ok {
return data, nil
}
return nil, errors.New("invalid event data type")
case whisper.EventMailServerRequestExpired: case whisper.EventMailServerRequestExpired:
return errors.New("request expired") return nil, errors.New("request expired")
} }
} }
} }

View File

@ -2,6 +2,7 @@ package shhext
import ( import (
"context" "context"
"encoding/hex"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math" "math"
@ -473,14 +474,22 @@ func (s *RequestMessagesSyncSuite) TestExpired() {
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NoError(msg.Discard()) s.Require().NoError(msg.Discard())
}() }()
s.Require().EqualError(s.localAPI.RequestMessagesSync(RetryConfig{ _, err := s.localAPI.RequestMessagesSync(
BaseTimeout: time.Second, RetryConfig{
}, MessagesRequest{ BaseTimeout: time.Second,
MailServerPeer: s.localNode.String(), },
}), "failed to request messages after 1 retries") MessagesRequest{
MailServerPeer: s.localNode.String(),
},
)
s.Require().EqualError(err, "failed to request messages after 1 retries")
} }
func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) { func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) {
const cursorSize = 36 // taken from mailserver_response.go from whisperv6 package
cursor := [cursorSize]byte{}
cursor[0] = 0x01
go func() { go func() {
attempt := 0 attempt := 0
for { for {
@ -493,16 +502,21 @@ func (s *RequestMessagesSyncSuite) testCompletedFromAttempt(target int) {
} }
var e whisper.Envelope var e whisper.Envelope
s.Require().NoError(msg.Decode(&e)) s.Require().NoError(msg.Decode(&e))
s.Require().NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, whisper.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, []byte{}))) s.Require().NoError(p2p.Send(s.remoteRW, p2pRequestCompleteCode, whisper.CreateMailServerRequestCompletedPayload(e.Hash(), common.Hash{}, cursor[:])))
} }
}() }()
s.Require().NoError(s.localAPI.RequestMessagesSync(RetryConfig{ resp, err := s.localAPI.RequestMessagesSync(
BaseTimeout: time.Second, RetryConfig{
MaxRetries: target, BaseTimeout: time.Second,
}, MessagesRequest{ MaxRetries: target,
MailServerPeer: s.localNode.String(), },
Force: true, // force true is convenient here because timeout is less then default delay (3s) MessagesRequest{
})) MailServerPeer: s.localNode.String(),
Force: true, // force true is convenient here because timeout is less then default delay (3s)
},
)
s.Require().NoError(err)
s.Require().Equal(MessagesResponse{Cursor: hex.EncodeToString(cursor[:])}, resp)
} }
func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() { func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() {