diff --git a/Gopkg.toml b/Gopkg.toml index de2597431..2a62a1c2c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -27,6 +27,10 @@ version = "=v1.8.16" source = "github.com/status-im/go-ethereum" +[[constraint]] + name = "github.com/status-im/whisper" + version = "=v1.2.0" + [[override]] name = "github.com/golang/protobuf" version = "1.1.0" @@ -161,7 +165,3 @@ [[constraint]] name = "github.com/status-im/migrate" branch = "master" - -[[constraint]] - version = "v1.2.0" - name = "github.com/status-im/whisper" diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 10c82c49c..a402f1f71 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/status-im/status-go/db" "github.com/status-im/status-go/params" + "github.com/status-im/status-go/services/shhext" whisper "github.com/status-im/whisper/whisperv6" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -220,6 +221,8 @@ func recoverLevelDBPanics(calleMethodName string) { func (s *WMailServer) Archive(env *whisper.Envelope) { defer recoverLevelDBPanics("Archive") + log.Debug("Archiving envelope", "hash", env.Hash().Hex()) + key := NewDbKey(env.Expiry-env.TTL, env.Hash()) rawEnvelope, err := rlp.EncodeToBytes(env) if err != nil { @@ -237,7 +240,7 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { // DeliverMail sends mail to specified whisper peer. func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { - log.Info("Delivering mail", "peer", peer.ID) + log.Info("Delivering mail", "peer", peer.ID()) requestsMeter.Mark(1) if peer == nil { @@ -254,26 +257,62 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) defer recoverLevelDBPanics("DeliverMail") - if lower, upper, bloom, limit, cursor, err := s.validateRequest(peer.ID(), request); err != nil { + var ( + lower, upper uint32 + bloom []byte + limit uint32 + cursor cursorType + batch bool + err error + ) + + payload, err := s.decodeRequest(peer.ID(), request) + if err == nil { + lower, upper = payload.Lower, payload.Upper + bloom = payload.Bloom + cursor = cursorType(payload.Cursor) + limit = payload.Limit + batch = payload.Batch + } else { + log.Debug("Failed to decode request", "err", err, "peerID", peer.ID()) + lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request) + } + + if err != nil { requestValidationErrorsCounter.Inc(1) log.Error("Mailserver request failed validaton", "peerID", peer.ID()) s.trySendHistoricMessageErrorResponse(peer, request, err) - } else { - _, lastEnvelopeHash, nextPageCursor, err := s.processRequest(peer, lower, upper, bloom, limit, cursor) + return + } - if err != nil { - processRequestErrorsCounter.Inc(1) - log.Error("Error while delivering mail to the peer", "err", err, "peerID", peer.ID()) - s.trySendHistoricMessageErrorResponse(peer, request, err) - return - } + log.Debug("Processing request", + "lower", lower, "upper", upper, + "bloom", bloom, + "limit", limit, + "cursor", cursor, + "batch", batch) - if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { - historicResponseErrorsCounter.Inc(1) - log.Error("Error while sending historic message response", "err", err, "peerID", peer.ID()) - // we still want to try to report error even it it is a p2p error and it is unlikely - s.trySendHistoricMessageErrorResponse(peer, request, err) - } + _, lastEnvelopeHash, nextPageCursor, err := s.processRequest( + peer, + lower, upper, + bloom, + limit, + cursor, + batch) + if err != nil { + processRequestErrorsCounter.Inc(1) + log.Error("Error while processing mail server request", "err", err, "peerID", peer.ID()) + s.trySendHistoricMessageErrorResponse(peer, request, err) + return + } + + log.Debug("Sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor) + + if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { + historicResponseErrorsCounter.Inc(1) + log.Error("Error sending historic message response", "err", err, "peerID", peer.ID()) + // we still want to try to report error even it it is a p2p error and it is unlikely + s.trySendHistoricMessageErrorResponse(peer, request, err) } } @@ -294,13 +333,41 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { return false } +func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) iterator.Iterator { + var ( + emptyHash common.Hash + ku []byte + kl []byte + ) + + kl = NewDbKey(lower, emptyHash).raw + if len(cursor) == dbKeyLength { + ku = cursor + } else { + ku = NewDbKey(upper+1, emptyHash).raw + } + + i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil) + // seek to the end as we want to return envelopes in a descending order + i.Seek(ku) + + return i +} + // processRequest processes the current request and re-sends all stored messages // accomplishing lower and upper limits. The limit parameter determines the maximum number of // messages to be sent back for the current request. // The cursor parameter is used for pagination. // After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to // the peer. -func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte, limit uint32, cursor cursorType) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) { +func (s *WMailServer) processRequest( + peer *whisper.Peer, + lower, upper uint32, + bloom []byte, + limit uint32, + cursor cursorType, + batch bool, +) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) { // Recover from possible goleveldb panics defer func() { if r := recover(); r != nil { @@ -311,50 +378,82 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl var ( sentEnvelopes uint32 sentEnvelopesSize int64 - zero common.Hash - ku []byte - kl []byte ) - kl = NewDbKey(lower, zero).raw - if cursor != nil { - ku = cursor - } else { - ku = NewDbKey(upper+1, zero).raw - } - - i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil) - i.Seek(ku) + i := s.createIterator(lower, upper, cursor) defer i.Release() + var ( + bundle []*whisper.Envelope + bundleSize uint32 + ) + start := time.Now() for i.Prev() { var envelope whisper.Envelope decodeErr := rlp.DecodeBytes(i.Value(), &envelope) if decodeErr != nil { - log.Error(fmt.Sprintf("failed to decode RLP: %s", decodeErr)) + log.Error("failed to decode RLP", "err", decodeErr) continue } - if whisper.BloomFilterMatch(bloom, envelope.Bloom()) { - if peer == nil { - // used for test purposes - ret = append(ret, &envelope) - } else { - err = s.w.SendP2PDirect(peer, &envelope) - if err != nil { - log.Error(fmt.Sprintf("failed to send direct message to peer: %s", err)) - return - } - lastEnvelopeHash = envelope.Hash() - } - sentEnvelopes++ - sentEnvelopesSize += whisper.EnvelopeHeaderLength + int64(len(envelope.Data)) + if !whisper.BloomFilterMatch(bloom, envelope.Bloom()) { + continue + } - if limit != noLimits && sentEnvelopes == limit { - nextPageCursor = i.Key() - break + newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + limitReached := limit != noLimits && (int(sentEnvelopes)+len(bundle)) == int(limit) + if !limitReached && newSize < s.w.MaxMessageSize() { + bundle = append(bundle, &envelope) + bundleSize = newSize + lastEnvelopeHash = envelope.Hash() + continue + } + + if peer == nil { + // used for test purposes + ret = append(ret, bundle...) + } else { + err = s.sendEnvelopes(peer, bundle, batch) + if err != nil { + return + } + } + + sentEnvelopes += uint32(len(bundle)) + sentEnvelopesSize += int64(bundleSize) + + if limitReached { + bundle = nil + bundleSize = 0 + + // When the limit is reached, the current retrieved envelope + // is not included in the response. + // The nextPageCursor is a key used as a limit in a range and + // is not included in the range, hence, we need to get + // the previous iterator key. + i.Next() + nextPageCursor = i.Key() + break + } else { + // Reset bundle information and add the last read envelope + // which did not make in the last batch. + bundle = []*whisper.Envelope{&envelope} + bundleSize = whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + } + + lastEnvelopeHash = envelope.Hash() + } + + // Send any outstanding envelopes. + if len(bundle) > 0 && bundleSize > 0 { + if peer == nil { + ret = append(ret, bundle...) + } else { + err = s.sendEnvelopes(peer, bundle, batch) + if err != nil { + return } } } @@ -365,12 +464,26 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl err = i.Error() if err != nil { - log.Error(fmt.Sprintf("Level DB iterator error: %s", err)) + err = fmt.Errorf("levelDB iterator error: %v", err) } return } +func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error { + if batch { + return s.w.SendP2PDirect(peer, envelopes...) + } + + for _, env := range envelopes { + if err := s.w.SendP2PDirect(peer, env); err != nil { + return err + } + } + + return nil +} + func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error { payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor) return s.w.SendHistoricMessageResponse(peer, payload) @@ -404,8 +517,49 @@ func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedM return nil } +func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (shhext.MessagesRequestPayload, error) { + var payload shhext.MessagesRequestPayload + + if s.pow > 0.0 && request.PoW() < s.pow { + return payload, errors.New("PoW too low") + } + + decrypted := s.openEnvelope(request) + if decrypted == nil { + log.Warn("Failed to decrypt p2p request") + return payload, errors.New("failed to decrypt p2p request") + } + + if err := s.checkMsgSignature(decrypted, peerID); err != nil { + log.Warn("Check message signature failed: %s", "err", err.Error()) + return payload, fmt.Errorf("check message signature failed: %v", err) + } + + if err := rlp.DecodeBytes(decrypted.Payload, &payload); err != nil { + return payload, fmt.Errorf("failed to decode data: %v", err) + } + + if payload.Upper < payload.Lower { + log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper) + return payload, errors.New("query range is invalid: lower > upper") + } + + lowerTime := time.Unix(int64(payload.Lower), 0) + upperTime := time.Unix(int64(payload.Upper), 0) + if upperTime.Sub(lowerTime) > maxQueryRange { + log.Warn("Query range too long", "peerID", peerID, "length", upperTime.Sub(lowerTime), "max", maxQueryRange) + return payload, fmt.Errorf("query range must be shorted than %d", maxQueryRange) + } + + return payload, nil +} + // validateRequest runs different validations on the current request. -func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (uint32, uint32, []byte, uint32, cursorType, error) { +// DEPRECATED +func (s *WMailServer) validateRequest( + peerID []byte, + request *whisper.Envelope, +) (uint32, uint32, []byte, uint32, cursorType, error) { if s.pow > 0.0 && request.PoW() < s.pow { return 0, 0, nil, 0, nil, fmt.Errorf("PoW() is too low") } diff --git a/mailserver/mailserver_db_panic_test.go b/mailserver/mailserver_db_panic_test.go index 026efe518..e316d2bc4 100644 --- a/mailserver/mailserver_db_panic_test.go +++ b/mailserver/mailserver_db_panic_test.go @@ -54,7 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() { func (s *MailServerDBPanicSuite) TestDeliverMail() { defer s.testPanicRecover("DeliverMail") - _, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil) + _, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil, false) s.Error(err) s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error()) } diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 90f16bd8a..d6ccf3092 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/status-im/status-go/params" + "github.com/status-im/status-go/services/shhext" whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" ) @@ -327,7 +328,7 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { s.Nil(cursor) s.Equal(params.limit, limit) - envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil) + envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil, false) s.NoError(err) for _, env := range envelopes { receivedHashes = append(receivedHashes, env.Hash()) @@ -339,12 +340,12 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { s.Equal(limit, uint32(len(receivedHashes))) // the 6 envelopes received should be in descending order s.Equal(reverseSentHashes[:limit], receivedHashes) - // cursor should be the key of the first envelope of the next page + // cursor should be the key of the last envelope of the last page s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor)) // second page receivedHashes = []common.Hash{} - envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor) + envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor, false) s.NoError(err) for _, env := range envelopes { receivedHashes = append(receivedHashes, env.Hash()) @@ -445,9 +446,36 @@ func (s *MailserverSuite) TestMailServer() { } } +func (s *MailserverSuite) TestDecodeRequest() { + s.setupServer(s.server) + defer s.server.Close() + + payload := shhext.MessagesRequestPayload{ + Lower: 50, + Upper: 100, + Bloom: []byte{0x01}, + Limit: 10, + Cursor: []byte{}, + Batch: true, + } + data, err := rlp.EncodeToBytes(payload) + s.Require().NoError(err) + + id, err := s.shh.NewKeyPair() + s.Require().NoError(err) + srcKey, err := s.shh.GetPrivateKey(id) + s.Require().NoError(err) + + env := s.createEnvelope(whisper.TopicType{0x01}, data, srcKey) + + decodedPayload, err := s.server.decodeRequest(nil, env) + s.Require().NoError(err) + s.Equal(payload, decodedPayload) +} + func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool { var exist bool - mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil) + mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil, false) s.NoError(err) for _, msg := range mail { if msg.Hash() == envelope.Hash() { @@ -549,6 +577,10 @@ func (s *MailserverSuite) createRequest(p *ServerTestParams) *whisper.Envelope { data = append(data, limitData...) } + return s.createEnvelope(p.topic, data, p.key) +} + +func (s *MailserverSuite) createEnvelope(topic whisper.TopicType, data []byte, srcKey *ecdsa.PrivateKey) *whisper.Envelope { key, err := s.shh.GetSymKey(keyID) if err != nil { s.T().Fatalf("failed to retrieve sym key with seed %d: %s.", seed, err) @@ -556,17 +588,18 @@ func (s *MailserverSuite) createRequest(p *ServerTestParams) *whisper.Envelope { params := &whisper.MessageParams{ KeySym: key, - Topic: p.topic, + Topic: topic, Payload: data, PoW: powRequirement * 2, WorkTime: 2, - Src: p.key, + Src: srcKey, } msg, err := whisper.NewSentMessage(params) if err != nil { s.T().Fatalf("failed to create new message with seed %d: %s.", seed, err) } + env, err := msg.Wrap(params, time.Now()) if err != nil { s.T().Fatalf("failed to wrap with seed %d: %s.", seed, err) diff --git a/node/node.go b/node/node.go index 96ac20adb..4b4fdce6d 100644 --- a/node/node.go +++ b/node/node.go @@ -277,7 +277,14 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { whisperServiceConfig := &whisper.Config{ MaxMessageSize: whisper.DefaultMaxMessageSize, - MinimumAcceptedPOW: 0.001, + MinimumAcceptedPOW: params.WhisperMinimumPoW, + } + + if config.WhisperConfig.MaxMessageSize > 0 { + whisperServiceConfig.MaxMessageSize = config.WhisperConfig.MaxMessageSize + } + if config.WhisperConfig.MinimumPoW > 0 { + whisperServiceConfig.MinimumAcceptedPOW = config.WhisperConfig.MinimumPoW } whisperService := whisper.New(whisperServiceConfig) diff --git a/params/config.go b/params/config.go index facf675e9..c0857de02 100644 --- a/params/config.go +++ b/params/config.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/params" "github.com/status-im/status-go/static" + whisper "github.com/status-im/whisper/whisperv6" validator "gopkg.in/go-playground/validator.v9" ) @@ -72,6 +73,10 @@ type WhisperConfig struct { // EnableNTPSync enables NTP synchronizations EnableNTPSync bool + + // MaxMessageSize is a maximum size of a devp2p packet handled by the Whisper protocol, + // not only the size of envelopes sent in that packet. + MaxMessageSize uint32 } // String dumps config object as nicely indented JSON @@ -430,9 +435,10 @@ func NewNodeConfig(dataDir string, networkID uint64) (*NodeConfig, error) { DatabaseCache: 16, }, WhisperConfig: WhisperConfig{ - DataDir: wnodeDir, - MinimumPoW: WhisperMinimumPoW, - TTL: WhisperTTL, + DataDir: wnodeDir, + MinimumPoW: WhisperMinimumPoW, + TTL: WhisperTTL, + MaxMessageSize: whisper.DefaultMaxMessageSize, }, SwarmConfig: SwarmConfig{}, RegisterTopics: []discv5.Topic{}, diff --git a/services/shhext/api.go b/services/shhext/api.go index 897cc2f54..c0e764bfd 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -3,13 +3,14 @@ package shhext import ( "context" "crypto/ecdsa" - "encoding/binary" "encoding/hex" "errors" "fmt" "math/big" "time" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -102,6 +103,22 @@ func (r *MessagesRequest) setDefaults(now time.Time) { } } +// MessagesRequestPayload is a payload sent to the Mail Server. +type MessagesRequestPayload struct { + // Lower is a lower bound of time range for which messages are requested. + Lower uint32 + // Upper is a lower bound of time range for which messages are requested. + Upper uint32 + // Bloom is a bloom filter to filter envelopes. + Bloom []byte + // Limit is the max number of envelopes to return. + Limit uint32 + // Cursor is used for pagination of the results. + Cursor []byte + // Batch set to true indicates that the client supports batched response. + Batch bool +} + // ----- // PUBLIC API // ----- @@ -168,8 +185,13 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex } } + payload, err := makePayload(r) + if err != nil { + return nil, err + } + envelope, err := makeEnvelop( - makePayload(r), + payload, symKey, publicKey, api.service.nodeID, @@ -461,33 +483,25 @@ func makeEnvelop( } // makePayload makes a specific payload for MailServer to request historic messages. -func makePayload(r MessagesRequest) []byte { - // Payload format: - // 4 bytes for lower - // 4 bytes for upper - // 64 bytes for the bloom filter - // 4 bytes for limit - // 36 bytes for the cursor. optional. - data := make([]byte, 12+whisper.BloomFilterSize) - - // from - binary.BigEndian.PutUint32(data, r.From) - // to - binary.BigEndian.PutUint32(data[4:], r.To) - // bloom - copy(data[8:], createBloomFilter(r)) - // limit - binary.BigEndian.PutUint32(data[8+whisper.BloomFilterSize:], r.Limit) - - // cursor is the key of an envelope in leveldb. - // it's 36 bytes. 4 bytes for the timestamp + 32 bytes for the envelope hash +func makePayload(r MessagesRequest) ([]byte, error) { expectedCursorSize := common.HashLength + 4 - cursorBytes, err := hex.DecodeString(r.Cursor) - if err != nil || len(cursorBytes) != expectedCursorSize { - return data + cursor, err := hex.DecodeString(r.Cursor) + if err != nil || len(cursor) != expectedCursorSize { + cursor = nil } - return append(data, cursorBytes...) + payload := MessagesRequestPayload{ + Lower: r.From, + Upper: r.To, + Bloom: createBloomFilter(r), + Limit: r.Limit, + Cursor: cursor, + // Client must tell the MailServer if it supports batch responses. + // This can be removed in the future. + Batch: true, + } + + return rlp.EncodeToBytes(payload) } func createBloomFilter(r MessagesRequest) []byte {