Support Mail Server data synchronization (#1302)

These changes add a support for syncing data between two Mail Servers. It does not give an easy way to start syncing. This will be solved in the next PR.
This commit is contained in:
Adam Babik 2018-12-06 10:48:28 +01:00 committed by GitHub
parent d51761f83e
commit 913dbfca9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 482 additions and 132 deletions

6
Gopkg.lock generated
View File

@ -822,12 +822,12 @@
revision = "fbcc46a78cd43fef95a110df664aab513116a850" revision = "fbcc46a78cd43fef95a110df664aab513116a850"
[[projects]] [[projects]]
digest = "1:5c62af344925b846377386dec72e06eb3e1e15222542b3d22fe0f0da75c7f090" digest = "1:7f2aeb661efc22a59fff9f7e223c59844e32433c18b3567ad24d23758ecf2f64"
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"
packages = ["whisperv6"] packages = ["whisperv6"]
pruneopts = "NUT" pruneopts = "NUT"
revision = "96d2199ed511430c642d877afe7bacaac5f37426" revision = "6e5af097a1a80e2e407ff097d4dd22e747768d3b"
version = "v1.4.1" version = "v1.4.2"
[[projects]] [[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -29,7 +29,7 @@
[[constraint]] [[constraint]]
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"
version = "=v1.4.1" version = "=v1.4.2"
[[override]] [[override]]
name = "github.com/golang/protobuf" name = "github.com/golang/protobuf"

View File

@ -181,6 +181,13 @@ func verifyMailserverBehavior(mailserverNode *enode.Node) {
} }
requestID := common.BytesToHash(requestIDBytes) requestID := common.BytesToHash(requestIDBytes)
// wait for mailserver request sent event
err = waitForMailServerRequestSent(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second)
if err != nil {
logger.Error("Error waiting for mailserver request sent event", "error", err)
os.Exit(3)
}
// wait for mailserver response // wait for mailserver response
resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second)
if err != nil { if err != nil {
@ -313,6 +320,21 @@ func joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (str
return keyID, topic, filterID, err return keyID, topic, filterID, err
} }
func waitForMailServerRequestSent(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) error {
timeoutTimer := time.NewTimer(timeout)
for {
select {
case event := <-events:
if event.Hash == requestID && event.Event == whisper.EventMailServerRequestSent {
timeoutTimer.Stop()
return nil
}
case <-timeoutTimer.C:
return errors.New("timed out waiting for mailserver request sent")
}
}
}
func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) (*whisper.MailServerResponse, error) { func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) (*whisper.MailServerResponse, error) {
timeoutTimer := time.NewTimer(timeout) timeoutTimer := time.NewTimer(timeout)
for { for {
@ -345,7 +367,7 @@ func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerR
case whisper.EventMailServerRequestExpired: case whisper.EventMailServerRequestExpired:
return nil, errors.New("no messages available from mailserver") return nil, errors.New("no messages available from mailserver")
default: default:
return nil, errors.New("unexpected event type") return nil, fmt.Errorf("unexpected event type: %v", event.Event)
} }
} }

View File

@ -62,6 +62,7 @@ var (
requestValidationErrorsCounter = metrics.NewRegisteredCounter("mailserver/requestValidationErrors", nil) requestValidationErrorsCounter = metrics.NewRegisteredCounter("mailserver/requestValidationErrors", nil)
processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil) processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil)
historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil) historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil)
syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil)
) )
const ( const (
@ -242,7 +243,10 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
// DeliverMail sends mail to specified whisper peer. // DeliverMail sends mail to specified whisper peer.
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
defer recoverLevelDBPanics("DeliverMail")
log.Info("Delivering mail", "peerID", peerIDString(peer)) log.Info("Delivering mail", "peerID", peerIDString(peer))
requestsMeter.Mark(1) requestsMeter.Mark(1)
if peer == nil { if peer == nil {
@ -257,8 +261,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
return return
} }
defer recoverLevelDBPanics("DeliverMail")
var ( var (
lower, upper uint32 lower, upper uint32
bloom []byte bloom []byte
@ -288,24 +290,56 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
} }
log.Debug("Processing request", log.Debug("Processing request",
"lower", lower, "upper", upper, "lower", lower,
"upper", upper,
"bloom", bloom, "bloom", bloom,
"limit", limit, "limit", limit,
"cursor", cursor, "cursor", cursor,
"batch", batch) "batch", batch,
)
if batch { if batch {
requestsBatchedCounter.Inc(1) requestsBatchedCounter.Inc(1)
} }
_, lastEnvelopeHash, nextPageCursor, err := s.processRequest( iter := s.createIterator(lower, upper, cursor)
peer, defer iter.Release()
lower, upper,
bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error)
go func() {
for bundle := range bundles {
if err := s.sendEnvelopes(peer, bundle, batch); err != nil {
errCh <- err
break
}
}
close(errCh)
}()
start := time.Now()
nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
iter,
bloom, bloom,
limit, int(limit),
cursor, bundles,
batch) )
if err != nil { requestProcessTimer.UpdateSince(start)
// bundles channel can be closed now because the processing finished.
close(bundles)
// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err)
return
}
// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer)) log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err) s.trySendHistoricMessageErrorResponse(peer, request, err)
@ -322,6 +356,80 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
} }
} }
// SyncMail syncs mail servers between two Mail Servers.
func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error {
log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request)
defer recoverLevelDBPanics("SyncMail")
syncRequestsMeter.Mark(1)
// Check rate limiting for a requesting peer.
if s.exceedsPeerRequests(peer.ID()) {
requestErrorsCounter.Inc(1)
log.Error("Peer exceeded request per seconds limit", "peerID", peerIDString(peer))
return fmt.Errorf("requests per seconds limit exceeded")
}
iter := s.createIterator(request.Lower, request.Upper, request.Cursor)
defer iter.Release()
bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error)
go func() {
for bundle := range bundles {
resp := whisper.SyncResponse{Envelopes: bundle}
if err := s.w.SendSyncResponse(peer, resp); err != nil {
errCh <- fmt.Errorf("failed to send sync response: %v", err)
break
}
}
close(errCh)
}()
start := time.Now()
nextCursor, _ := s.processRequestInBundles(
iter,
request.Bloom,
int(request.Limit),
bundles,
)
requestProcessTimer.UpdateSince(start)
// bundles channel can be closed now because the processing finished.
close(bundles)
// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
_ = s.w.SendSyncResponse(
peer,
whisper.SyncResponse{Error: "failed to send a response"},
)
return err
}
// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
_ = s.w.SendSyncResponse(
peer,
whisper.SyncResponse{Error: "failed to process all envelopes"},
)
return fmt.Errorf("levelDB iterator failed: %v", err)
}
log.Info("Finished syncing envelopes", "peer", peerIDString(peer))
if err := s.w.SendSyncResponse(peer, whisper.SyncResponse{
Cursor: nextCursor,
Final: true,
}); err != nil {
return fmt.Errorf("failed to send the final sync response: %v", err)
}
return nil
}
// exceedsPeerRequests in case limit its been setup on the current server and limit // exceedsPeerRequests in case limit its been setup on the current server and limit
// allows the query, it will store/update new query time for the current peer. // allows the query, it will store/update new query time for the current peer.
func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
@ -360,45 +468,24 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) ite
return i return i
} }
// processRequest processes the current request and re-sends all stored messages // processRequestInBundles processes envelopes using an iterator and passes them
// accomplishing lower and upper limits. The limit parameter determines the maximum number of // to the output channel in bundles.
// messages to be sent back for the current request. func (s *WMailServer) processRequestInBundles(
// The cursor parameter is used for pagination. iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope,
// After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to ) (cursorType, common.Hash) {
// the peer.
func (s *WMailServer) processRequest(
peer *whisper.Peer,
lower, upper uint32,
bloom []byte,
limit uint32,
cursor cursorType,
batch bool,
) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) {
// Recover from possible goleveldb panics
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("recovered from panic in processRequest: %v", r)
}
}()
var (
sentEnvelopes uint32
sentEnvelopesSize int64
)
i := s.createIterator(lower, upper, cursor)
defer i.Release()
var ( var (
bundle []*whisper.Envelope bundle []*whisper.Envelope
bundleSize uint32 bundleSize uint32
processedEnvelopes int
processedEnvelopesSize int64
nextCursor cursorType
lastEnvelopeHash common.Hash
) )
start := time.Now() for iter.Prev() {
for i.Prev() {
var envelope whisper.Envelope var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(i.Value(), &envelope)
decodeErr := rlp.DecodeBytes(iter.Value(), &envelope)
if decodeErr != nil { if decodeErr != nil {
log.Error("failed to decode RLP", "err", decodeErr) log.Error("failed to decode RLP", "err", decodeErr)
continue continue
@ -409,7 +496,7 @@ func (s *WMailServer) processRequest(
} }
newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
limitReached := limit != noLimits && (int(sentEnvelopes)+len(bundle)) == int(limit) limitReached := limit != noLimits && (processedEnvelopes+len(bundle)) == limit
if !limitReached && newSize < s.w.MaxMessageSize() { if !limitReached && newSize < s.w.MaxMessageSize() {
bundle = append(bundle, &envelope) bundle = append(bundle, &envelope)
bundleSize = newSize bundleSize = newSize
@ -417,30 +504,22 @@ func (s *WMailServer) processRequest(
continue continue
} }
if peer == nil { output <- bundle
// used for test purposes bundle = bundle[:0]
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
sentEnvelopes += uint32(len(bundle))
sentEnvelopesSize += int64(bundleSize)
if limitReached {
bundle = nil
bundleSize = 0 bundleSize = 0
processedEnvelopes += len(bundle)
processedEnvelopesSize += int64(bundleSize)
if limitReached {
// When the limit is reached, the current retrieved envelope // When the limit is reached, the current retrieved envelope
// is not included in the response. // is not included in the response.
// The nextPageCursor is a key used as a limit in a range and // The nextCursor is a key used as a limit in a range and
// is not included in the range, hence, we need to get // is not included in the range, hence, we need to get
// the previous iterator key. // the previous iterator key.
i.Next() // Because we iterate backwards, we use Next().
nextPageCursor = i.Key() iter.Next()
nextCursor = iter.Key()
break break
} else { } else {
// Reset bundle information and add the last read envelope // Reset bundle information and add the last read envelope
@ -452,28 +531,15 @@ func (s *WMailServer) processRequest(
lastEnvelopeHash = envelope.Hash() lastEnvelopeHash = envelope.Hash()
} }
// Send any outstanding envelopes. // There might be some outstanding elements in the bundle.
if len(bundle) > 0 && bundleSize > 0 { if len(bundle) > 0 && bundleSize > 0 {
if peer == nil { output <- bundle
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
} }
requestProcessTimer.UpdateSince(start) sentEnvelopesMeter.Mark(int64(processedEnvelopes))
sentEnvelopesMeter.Mark(int64(sentEnvelopes)) sentEnvelopesSizeMeter.Mark(processedEnvelopesSize)
sentEnvelopesSizeMeter.Mark(sentEnvelopesSize)
err = i.Error() return nextCursor, lastEnvelopeHash
if err != nil {
err = fmt.Errorf("levelDB iterator error: %v", err)
}
return
} }
func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error { func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error {

View File

@ -54,9 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() {
func (s *MailServerDBPanicSuite) TestDeliverMail() { func (s *MailServerDBPanicSuite) TestDeliverMail() {
defer s.testPanicRecover("DeliverMail") defer s.testPanicRecover("DeliverMail")
_, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil, false) s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{})
s.Error(err)
s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error())
} }
func (s *MailServerDBPanicSuite) testPanicRecover(method string) { func (s *MailServerDBPanicSuite) testPanicRecover(method string) {

View File

@ -330,31 +330,26 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
s.Nil(cursor) s.Nil(cursor)
s.Equal(params.limit, limit) s.Equal(params.limit, limit)
envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil, false) receivedHashes, cursor, _ = processRequestAndCollectHashes(
s.NoError(err) s.server, lower, upper, cursor, bloom, int(limit),
for _, env := range envelopes { )
receivedHashes = append(receivedHashes, env.Hash())
}
// 10 envelopes sent // 10 envelopes sent
s.Equal(count, uint32(len(sentEnvelopes))) s.Equal(count, uint32(len(sentEnvelopes)))
// 6 envelopes received // 6 envelopes received
s.Equal(limit, uint32(len(receivedHashes))) s.Equal(int(limit), len(receivedHashes))
// the 6 envelopes received should be in descending order // the 6 envelopes received should be in descending order
s.Equal(reverseSentHashes[:limit], receivedHashes) s.Equal(reverseSentHashes[:limit], receivedHashes)
// cursor should be the key of the last envelope of the last page // cursor should be the key of the last envelope of the last page
s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor)) s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor))
// second page // second page
receivedHashes = []common.Hash{} receivedHashes, cursor, _ = processRequestAndCollectHashes(
envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor, false) s.server, lower, upper, cursor, bloom, int(limit),
s.NoError(err) )
for _, env := range envelopes {
receivedHashes = append(receivedHashes, env.Hash())
}
// 4 envelopes received // 4 envelopes received
s.Equal(count-limit, uint32(len(receivedHashes))) s.Equal(int(count-limit), len(receivedHashes))
// cursor is nil because there are no other pages // cursor is nil because there are no other pages
s.Nil(cursor) s.Nil(cursor)
} }
@ -476,16 +471,15 @@ func (s *MailserverSuite) TestDecodeRequest() {
} }
func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool { func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool {
var exist bool receivedHashes, _, _ := processRequestAndCollectHashes(
mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil, false) s.server, low, upp, nil, bloom, int(limit),
s.NoError(err) )
for _, msg := range mail { for _, hash := range receivedHashes {
if msg.Hash() == envelope.Hash() { if hash == envelope.Hash() {
exist = true return true
break
} }
} }
return exist return false
} }
func (s *MailserverSuite) TestBloomFromReceivedMessage() { func (s *MailserverSuite) TestBloomFromReceivedMessage() {
@ -640,6 +634,32 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) {
return generateEnvelopeWithKeys(sentTime, h[:], nil) return generateEnvelopeWithKeys(sentTime, h[:], nil)
} }
func processRequestAndCollectHashes(
server *WMailServer, lower, upper uint32, cursor cursorType, bloom []byte, limit int,
) ([]common.Hash, cursorType, common.Hash) {
iter := server.createIterator(lower, upper, cursor)
defer iter.Release()
bundles := make(chan []*whisper.Envelope, 10)
done := make(chan struct{})
var hashes []common.Hash
go func() {
for bundle := range bundles {
for _, env := range bundle {
hashes = append(hashes, env.Hash())
}
}
close(done)
}()
cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles)
close(bundles)
<-done
return hashes, cursor, lastHash
}
// mockPeerWithID is a struct that conforms to peerWithID interface. // mockPeerWithID is a struct that conforms to peerWithID interface.
type mockPeerWithID struct { type mockPeerWithID struct {
id []byte id []byte

View File

@ -19,6 +19,8 @@ import (
"github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/api" "github.com/status-im/status-go/api"
"github.com/status-im/status-go/mailserver"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/t/helpers"
. "github.com/status-im/status-go/t/utils" . "github.com/status-im/status-go/t/utils"
@ -39,7 +41,7 @@ func TestWhisperMailboxTestSuite(t *testing.T) {
func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
var err error var err error
// Start mailbox and status node. // Start mailbox and status node.
mailboxBackend, stop := s.startMailboxBackend() mailboxBackend, stop := s.startMailboxBackend("")
defer stop() defer stop()
s.Require().True(mailboxBackend.IsNodeRunning()) s.Require().True(mailboxBackend.IsNodeRunning())
mailboxNode := mailboxBackend.StatusNode().GethNode() mailboxNode := mailboxBackend.StatusNode().GethNode()
@ -150,7 +152,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
var err error var err error
// Start mailbox, alice, bob, charlie node. // Start mailbox, alice, bob, charlie node.
mailboxBackend, stop := s.startMailboxBackend() mailboxBackend, stop := s.startMailboxBackend("")
defer stop() defer stop()
aliceBackend, stop := s.startBackend("alice") aliceBackend, stop := s.startBackend("alice")
@ -333,7 +335,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() { func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
// Start mailbox // Start mailbox
mailbox, stop := s.startMailboxBackend() mailbox, stop := s.startMailboxBackend("")
defer stop() defer stop()
s.Require().True(mailbox.IsNodeRunning()) s.Require().True(mailbox.IsNodeRunning())
mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode
@ -456,6 +458,133 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
s.Equal(sentEnvelopesHashes, allReceivedHashes) s.Equal(sentEnvelopesHashes, allReceivedHashes)
} }
// TestSyncBetweenTwoMailServers tests syncing state between two mail servers
// using `shhext_requestMessages`.
func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() {
var (
chatName = "some-public-chat"
)
// Start mailbox but with mail server disabled.
// MailServer will be registered below manually.
mailbox, stop := s.startMailboxBackendWithCallback("", func(c *params.NodeConfig) {
c.WhisperConfig.EnableMailServer = false
})
defer stop()
s.Require().True(mailbox.IsNodeRunning())
mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode
// Whisper services
mailboxWhisperService, err := mailbox.StatusNode().WhisperService()
s.Require().NoError(err)
// symmetric key for public chats
symKeyID, err := mailboxWhisperService.AddSymKeyFromPassword(chatName)
s.Require().NoError(err)
publicChatSymKey, err := mailboxWhisperService.GetSymKey(symKeyID)
s.Require().NoError(err)
var mailServer mailserver.WMailServer
err = mailServer.Init(mailboxWhisperService, &mailbox.StatusNode().Config().WhisperConfig)
s.Require().NoError(err)
mailboxWhisperService.RegisterServer(&mailServer)
// envelopes to archive
envelopesCount := 5
topic := s.createPublicChatTopic(chatName)
for i := 0; i < envelopesCount; i++ {
params := whisper.MessageParams{
Topic: topic,
WorkTime: 10,
PoW: 2.0,
Payload: []byte("some-payload"),
KeySym: publicChatSymKey,
}
sentMessage, err := whisper.NewSentMessage(&params)
s.Require().NoError(err)
envelope, err := sentMessage.Wrap(&params, time.Now())
s.Require().NoError(err)
mailServer.Archive(envelope)
}
// Start a second mail server which would like to sync the state.
emptyMailbox, stopEmptyMailbox := s.startMailboxBackend("empty")
defer stopEmptyMailbox()
s.Require().True(emptyMailbox.IsNodeRunning())
emptyMailboxEnode := emptyMailbox.StatusNode().Server().Self().String()
errCh := helpers.WaitForPeerAsync(emptyMailbox.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
s.Require().NoError(emptyMailbox.StatusNode().AddPeer(mailboxEnode))
s.Require().NoError(<-errCh)
emptyMailboxWhisperService, err := emptyMailbox.StatusNode().WhisperService()
s.Require().NoError(err)
err = emptyMailboxWhisperService.AllowP2PMessagesFromPeer(
mailbox.StatusNode().Server().Self().ID().Bytes(),
)
s.Require().NoError(err)
err = emptyMailboxWhisperService.SyncMessages(
mailbox.StatusNode().Server().Self().ID().Bytes(),
whisper.SyncMailRequest{
Lower: 0,
Upper: uint32(time.Now().Unix()),
Bloom: whisper.MakeFullNodeBloom(),
},
)
s.Require().NoError(err)
// create and start a client
client, stop := s.startBackend("client")
defer stop()
s.Require().True(client.IsNodeRunning())
clientWhisperService, err := client.StatusNode().WhisperService()
s.Require().NoError(err)
clientRPCClient := client.StatusNode().RPCPrivateClient()
// create filter for messages
messagesKeyID, err := clientWhisperService.AddSymKeyFromPassword(chatName)
s.Require().NoError(err)
messageFilterID := s.createGroupChatMessageFilter(clientRPCClient, messagesKeyID, topic.String())
// add mailbox password
mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword)
s.Require().NoError(err)
// add the second mail server as a peer
errCh = helpers.WaitForPeerAsync(
client.StatusNode().Server(),
emptyMailboxEnode,
p2p.PeerEventTypeAdd,
5*time.Second)
s.Require().NoError(client.StatusNode().AddPeer(emptyMailboxEnode))
s.Require().NoError(<-errCh)
// request for messages
mailboxResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub := clientWhisperService.SubscribeEnvelopeEvents(mailboxResponseWatcher)
defer sub.Unsubscribe()
requestID := s.requestHistoricMessagesFromLast12Hours(
clientWhisperService,
clientRPCClient,
emptyMailboxEnode,
mailServerKeyID,
topic.String(),
0,
"",
)
response := s.waitForMailServerResponse(mailboxResponseWatcher, requestID)
s.Require().NotEqual(common.Hash{}.Bytes(), response.LastEnvelopeHash.Bytes())
// get messages
messages := s.getMessagesByMessageFilterID(clientRPCClient, messageFilterID)
s.Require().NotEmpty(messages)
}
func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) { func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) {
check := make(map[string]struct{}) check := make(map[string]struct{})
for _, hash := range hashes { for _, hash := range hashes {
@ -558,11 +687,23 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun
} }
// Start mailbox node. // Start mailbox node.
func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) { func (s *WhisperMailboxSuite) startMailboxBackend(name string) (*api.StatusBackend, func()) {
mailboxBackend := api.NewStatusBackend() return s.startMailboxBackendWithCallback(name, nil)
}
func (s *WhisperMailboxSuite) startMailboxBackendWithCallback(
name string,
callback func(*params.NodeConfig),
) (*api.StatusBackend, func()) {
if name == "" {
name = "mailserver"
}
mailboxConfig, err := MakeTestNodeConfig(GetNetworkID()) mailboxConfig, err := MakeTestNodeConfig(GetNetworkID())
s.Require().NoError(err) s.Require().NoError(err)
datadir := filepath.Join(RootDir, ".ethereumtest/mailbox/mailserver")
mailboxBackend := api.NewStatusBackend()
datadir := filepath.Join(RootDir, ".ethereumtest/mailbox", name)
mailboxConfig.LightEthConfig.Enabled = false mailboxConfig.LightEthConfig.Enabled = false
mailboxConfig.WhisperConfig.Enabled = true mailboxConfig.WhisperConfig.Enabled = true
@ -572,6 +713,10 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func())
mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data") mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data")
mailboxConfig.DataDir = datadir mailboxConfig.DataDir = datadir
if callback != nil {
callback(mailboxConfig)
}
s.Require().False(mailboxBackend.IsNodeRunning()) s.Require().False(mailboxBackend.IsNodeRunning())
s.Require().NoError(mailboxBackend.StartNode(mailboxConfig)) s.Require().NoError(mailboxBackend.StartNode(mailboxConfig))
s.Require().True(mailboxBackend.IsNodeRunning()) s.Require().True(mailboxBackend.IsNodeRunning())
@ -579,7 +724,7 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func())
s.True(mailboxBackend.IsNodeRunning()) s.True(mailboxBackend.IsNodeRunning())
s.NoError(mailboxBackend.StopNode()) s.NoError(mailboxBackend.StopNode())
s.False(mailboxBackend.IsNodeRunning()) s.False(mailboxBackend.IsNodeRunning())
err = os.RemoveAll(datadir) err := os.RemoveAll(datadir)
s.Require().NoError(err) s.Require().NoError(err)
} }
} }
@ -608,7 +753,7 @@ func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, s
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "shh_newMessageFilter", "params": [ "method": "shh_newMessageFilter", "params": [
{"symKeyID": "` + symkeyID + `", "topics": [ "` + topic + `"], "allowP2P":true} {"symKeyID": "` + symkeyID + `", "topics": ["` + topic + `"], "allowP2P": true}
], ],
"id": 1 "id": 1
}`) }`)
@ -745,17 +890,20 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
return common.Hash{} return common.Hash{}
} }
func (s *WhisperMailboxSuite) createPublicChatTopic(name string) whisper.TopicType {
h := sha3.NewKeccak256()
_, err := h.Write([]byte(name))
if err != nil {
s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err)
}
return whisper.BytesToTopic(h.Sum(nil))
}
func (s *WhisperMailboxSuite) joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (string, whisper.TopicType, string) { func (s *WhisperMailboxSuite) joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (string, whisper.TopicType, string) {
keyID, err := w.AddSymKeyFromPassword(name) keyID, err := w.AddSymKeyFromPassword(name)
s.Require().NoError(err) s.Require().NoError(err)
h := sha3.NewKeccak256() topic := s.createPublicChatTopic(name)
_, err = h.Write([]byte(name))
if err != nil {
s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err)
}
fullTopic := h.Sum(nil)
topic := whisper.BytesToTopic(fullTopic)
filterID := s.createGroupChatMessageFilter(rpcClient, keyID, topic.String()) filterID := s.createGroupChatMessageFilter(rpcClient, keyID, topic.String())

View File

@ -48,6 +48,8 @@ const (
powRequirementCode = 2 // PoW requirement powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange bloomFilterExCode = 3 // bloom filter exchange
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers
p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
@ -89,4 +91,36 @@ const (
type MailServer interface { type MailServer interface {
Archive(env *Envelope) Archive(env *Envelope)
DeliverMail(whisperPeer *Peer, request *Envelope) DeliverMail(whisperPeer *Peer, request *Envelope)
SyncMail(*Peer, SyncMailRequest) error
}
// SyncMailRequest contains details which envelopes should be synced
// between Mail Servers.
type SyncMailRequest struct {
// Lower is a lower bound of time range for which messages are requested.
Lower uint32
// Upper is a lower bound of time range for which messages are requested.
Upper uint32
// Bloom is a bloom filter to filter envelopes.
Bloom []byte
// Limit is the max number of envelopes to return.
Limit uint32
// Cursor is used for pagination of the results.
Cursor []byte
}
// SyncResponse is a struct representing a response sent to the peer
// asking for syncing archived envelopes.
type SyncResponse struct {
Envelopes []*Envelope
Cursor []byte
Final bool // if true it means all envelopes were processed
Error string
}
// IsFinal returns true if it's the final response for the request.
// It might be a successful final response (r.Final being true)
// or an error occured (r.Error being not empty).
func (r SyncResponse) IsFinal() bool {
return r.Final || r.Error != ""
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
) )
const ( const (
@ -40,22 +41,22 @@ func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []by
// * request completed successfully // * request completed successfully
// * request failed // * request failed
// If the payload is unknown/unparseable, it returns `nil` // If the payload is unknown/unparseable, it returns `nil`
func CreateMailServerEvent(payload []byte) (*EnvelopeEvent, error) { func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength { if len(payload) < common.HashLength {
return nil, invalidResponseSizeError(len(payload)) return nil, invalidResponseSizeError(len(payload))
} }
event, err := tryCreateMailServerRequestFailedEvent(payload) event, err := tryCreateMailServerRequestFailedEvent(nodeID, payload)
if err != nil || event != nil { if err != nil || event != nil {
return event, err return event, err
} }
return tryCreateMailServerRequestCompletedEvent(payload) return tryCreateMailServerRequestCompletedEvent(nodeID, payload)
} }
func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, error) { func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength+len(mailServerFailedPayloadPrefix) { if len(payload) < common.HashLength+len(mailServerFailedPayloadPrefix) {
return nil, nil return nil, nil
} }
@ -75,6 +76,7 @@ func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, erro
errorMsg = string(remainder) errorMsg = string(remainder)
event := EnvelopeEvent{ event := EnvelopeEvent{
Peer: nodeID,
Hash: requestID, Hash: requestID,
Event: EventMailServerRequestCompleted, Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{ Data: &MailServerResponse{
@ -86,7 +88,7 @@ func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, erro
} }
func tryCreateMailServerRequestCompletedEvent(payload []byte) (*EnvelopeEvent, error) { func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) {
// check if payload is // check if payload is
// - requestID or // - requestID or
// - requestID + lastEnvelopeHash or // - requestID + lastEnvelopeHash or
@ -115,6 +117,7 @@ func tryCreateMailServerRequestCompletedEvent(payload []byte) (*EnvelopeEvent, e
} }
event := EnvelopeEvent{ event := EnvelopeEvent{
Peer: nodeID,
Hash: requestID, Hash: requestID,
Event: EventMailServerRequestCompleted, Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{ Data: &MailServerResponse{

View File

@ -452,6 +452,25 @@ func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte)
return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
} }
// SyncMessages can be sent between two Mail Servers and syncs envelopes between them.
func (whisper *Whisper) SyncMessages(peerID []byte, req SyncMailRequest) error {
if whisper.mailServer == nil {
return errors.New("can not sync messages if Mail Server is not configured")
}
p, err := whisper.getPeer(peerID)
if err != nil {
return err
}
return p2p.Send(p.ws, p2pSyncRequestCode, req)
}
// SendSyncResponse sends a response to a Mail Server with a slice of envelopes.
func (whisper *Whisper) SendSyncResponse(p *Peer, data SyncResponse) error {
return p2p.Send(p.ws, p2pSyncResponseCode, data)
}
// SendP2PMessage sends a peer-to-peer message to a specific peer. // SendP2PMessage sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error { func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error {
p, err := whisper.getPeer(peerID) p, err := whisper.getPeer(peerID)
@ -952,6 +971,46 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return fmt.Errorf("invalid direct message: %v", err) return fmt.Errorf("invalid direct message: %v", err)
} }
} }
case p2pSyncRequestCode:
// TODO(adam): should we limit who can send this request?
if whisper.mailServer != nil {
var request SyncMailRequest
if err = packet.Decode(&request); err != nil {
return fmt.Errorf("failed to decode p2pSyncRequestCode payload: %v", err)
}
if err := whisper.mailServer.SyncMail(p, request); err != nil {
log.Error("failed to sync envelopes", "peer", p.peer.ID().String())
}
} else {
log.Debug("requested to sync messages but mail servers is not registered", "peer", p.peer.ID().String())
}
case p2pSyncResponseCode:
// TODO(adam): currently, there is no feedback when a sync response
// is received. An idea to fix this:
// 1. Sending a request contains an ID,
// 2. Each sync reponse contains this ID,
// 3. There is a way to call whisper.SyncMessages() and wait for the response.Final to be received for that particular request ID.
// 4. If Cursor is not empty, another p2pSyncRequestCode should be sent.
if p.trusted && whisper.mailServer != nil {
var resp SyncResponse
if err = packet.Decode(&resp); err != nil {
return fmt.Errorf("failed to decode p2pSyncResponseCode payload: %v", err)
}
log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error)
for _, envelope := range resp.Envelopes {
whisper.mailServer.Archive(envelope)
}
if resp.Error != "" {
log.Error("failed to sync envelopes", "err", resp.Error)
}
if resp.Final {
log.Info("finished to sync envelopes successfully")
}
}
case p2pRequestCode: case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore. // Must be processed if mail server is implemented. Otherwise ignore.
if whisper.mailServer != nil { if whisper.mailServer != nil {
@ -971,7 +1030,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return errors.New("invalid request response message") return errors.New("invalid request response message")
} }
event, err := CreateMailServerEvent(payload) event, err := CreateMailServerEvent(p.peer.ID(), payload)
if err != nil { if err != nil {
log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err)