From aa09d854ca997fe6e2cbf08296748d053234a520 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 15 Jan 2019 11:21:33 +0200 Subject: [PATCH] Prevent frequent requests with same topics --- params/config.go | 3 + services/shhext/api.go | 11 ++- services/shhext/requests.go | 83 +++++++++++++++++++ services/shhext/requests_test.go | 44 ++++++++++ services/shhext/service.go | 50 ++++++----- services/shhext/service_test.go | 22 +++++ services/shhext/tracker.go | 6 +- services/shhext/tracker_test.go | 7 +- .../status-im/whisper/whisperv6/message.go | 1 + 9 files changed, 200 insertions(+), 27 deletions(-) create mode 100644 services/shhext/requests.go create mode 100644 services/shhext/requests_test.go diff --git a/params/config.go b/params/config.go index 64c4e54c6..2afc355ab 100644 --- a/params/config.go +++ b/params/config.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -315,6 +316,8 @@ type ShhextConfig struct { EnableLastUsedMonitor bool // ConnectionTarget will be used by connection manager. It will ensure that we connected with configured number of servers. ConnectionTarget int + // RequestsDelay used to ensure that no similar requests are sent within short periods of time. + RequestsDelay time.Duration } // Validate validates the ShhextConfig struct and returns an error if inconsistent values are found diff --git a/services/shhext/api.go b/services/shhext/api.go index e9ff9fd2c..ce2edec29 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -80,6 +80,9 @@ type MessagesRequest struct { // Timeout is the time to live of the request specified in seconds. // Default is 10 seconds Timeout time.Duration `json:"timeout"` + + // Force ensures that requests will bypass enforced delay. + Force bool `json:"force"` } func (r *MessagesRequest) setDefaults(now time.Time) { @@ -223,11 +226,17 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex if err != nil { return nil, err } + hash := envelope.Hash() + if !r.Force { + err = api.service.requestsRegistry.Register(hash, r.Topics) + if err != nil { + return nil, err + } + } if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { return nil, err } - hash := envelope.Hash() return hash[:], nil } diff --git a/services/shhext/requests.go b/services/shhext/requests.go new file mode 100644 index 000000000..ec537d0b0 --- /dev/null +++ b/services/shhext/requests.go @@ -0,0 +1,83 @@ +package shhext + +import ( + "fmt" + "hash/fnv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + whisper "github.com/status-im/whisper/whisperv6" +) + +const ( + // defaultRequestsDelay will be used in RequestsRegistry if no other was provided. + defaultRequestsDelay = 3 * time.Second +) + +type requestMeta struct { + timestamp time.Time + lastUID common.Hash +} + +// NewRequestsRegistry creates instance of the RequestsRegistry and returns pointer to it. +func NewRequestsRegistry(delay time.Duration) *RequestsRegistry { + return &RequestsRegistry{ + delay: delay, + uidToTopics: map[common.Hash]common.Hash{}, + byTopicsHash: map[common.Hash]requestMeta{}, + } +} + +// RequestsRegistry keeps map for all requests with timestamp when they were made. +type RequestsRegistry struct { + mu sync.Mutex + delay time.Duration + uidToTopics map[common.Hash]common.Hash + byTopicsHash map[common.Hash]requestMeta +} + +// Register request with given topics. If request with same topics was made in less then configured delay then error +// will be returned. +func (r *RequestsRegistry) Register(uid common.Hash, topics []whisper.TopicType) error { + r.mu.Lock() + defer r.mu.Unlock() + topicsHash := topicsToHash(topics) + if meta, exist := r.byTopicsHash[topicsHash]; exist { + if time.Since(meta.timestamp) < r.delay { + return fmt.Errorf("another request with the same topics was sent less than %s ago. Please wait for a bit longer, or set `force` to true in request parameters", r.delay) + } + } + newMeta := requestMeta{ + timestamp: time.Now(), + lastUID: uid, + } + r.uidToTopics[uid] = topicsHash + r.byTopicsHash[topicsHash] = newMeta + return nil +} + +// Unregister removes request with given UID from registry. +func (r *RequestsRegistry) Unregister(uid common.Hash) { + r.mu.Lock() + defer r.mu.Unlock() + topicsHash, exist := r.uidToTopics[uid] + if !exist { + return + } + delete(r.uidToTopics, uid) + meta := r.byTopicsHash[topicsHash] + // remove topicsHash only if we are trying to unregister last request with this topic. + if meta.lastUID == uid { + delete(r.byTopicsHash, topicsHash) + } +} + +// topicsToHash returns non-cryptographic hash of the topics. +func topicsToHash(topics []whisper.TopicType) common.Hash { + hash := fnv.New32() + for i := range topics { + _, _ = hash.Write(topics[i][:]) // never returns error per documentation + } + return common.BytesToHash(hash.Sum(nil)) +} diff --git a/services/shhext/requests_test.go b/services/shhext/requests_test.go new file mode 100644 index 000000000..574d0a1b9 --- /dev/null +++ b/services/shhext/requests_test.go @@ -0,0 +1,44 @@ +package shhext + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" +) + +func TestRegisterSameRequests(t *testing.T) { + registry := NewRequestsRegistry(10 * time.Second) + topics := []whisper.TopicType{{1}} + require.NoError(t, registry.Register(common.Hash{1}, topics)) + require.Error(t, registry.Register(common.Hash{2}, topics)) +} + +func TestRegisterSameRequestsWithoutDelay(t *testing.T) { + registry := NewRequestsRegistry(0) + topics := []whisper.TopicType{{1}} + require.NoError(t, registry.Register(common.Hash{1}, topics)) + require.NoError(t, registry.Register(common.Hash{2}, topics)) +} + +func TestRegisterDifferentRequests(t *testing.T) { + registry := NewRequestsRegistry(10 * time.Second) + require.NoError(t, registry.Register(common.Hash{1}, []whisper.TopicType{{1}})) + require.NoError(t, registry.Register(common.Hash{2}, []whisper.TopicType{{2}})) +} + +func TestUnregisterReplacedRequest(t *testing.T) { + registry := NewRequestsRegistry(0) + unreg := common.Hash{1} + topics := []whisper.TopicType{{1}} + require.NoError(t, registry.Register(unreg, topics)) + replacement := common.Hash{2} + require.NoError(t, registry.Register(replacement, topics)) + // record should be replaced with common.Hash{2}, so when we will remove unreg it will not affect topics map + registry.Unregister(unreg) + record, exist := registry.uidToTopics[replacement] + require.True(t, exist, "replaced record should exist") + require.Equal(t, replacement, registry.byTopicsHash[record].lastUID) +} diff --git a/services/shhext/service.go b/services/shhext/service.go index 22213d197..461468158 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -41,17 +41,18 @@ type EnvelopeEventsHandler interface { // Service is a service that provides some additional Whisper API. type Service struct { - w *whisper.Whisper - config params.ShhextConfig - tracker *tracker - server *p2p.Server - nodeID *ecdsa.PrivateKey - deduplicator *dedup.Deduplicator - protocol *chat.ProtocolService - debug bool - dataDir string - installationID string - pfsEnabled bool + w *whisper.Whisper + config params.ShhextConfig + tracker *tracker + requestsRegistry *RequestsRegistry + server *p2p.Server + nodeID *ecdsa.PrivateKey + deduplicator *dedup.Deduplicator + protocol *chat.ProtocolService + debug bool + dataDir string + installationID string + pfsEnabled bool peerStore *mailservers.PeerStore cache *mailservers.Cache @@ -66,6 +67,11 @@ var _ node.Service = (*Service)(nil) func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config params.ShhextConfig) *Service { cache := mailservers.NewCache(db) ps := mailservers.NewPeerStore(cache) + delay := defaultRequestsDelay + if config.RequestsDelay != 0 { + delay = config.RequestsDelay + } + requestsRegistry := NewRequestsRegistry(delay) track := &tracker{ w: w, handler: handler, @@ -73,18 +79,20 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf batches: map[common.Hash]map[common.Hash]struct{}{}, mailPeers: ps, mailServerConfirmation: config.MailServerConfirmations, + requestsRegistry: requestsRegistry, } return &Service{ - w: w, - config: config, - tracker: track, - deduplicator: dedup.NewDeduplicator(w, db), - debug: config.DebugAPIEnabled, - dataDir: config.BackupDisabledDataDir, - installationID: config.InstallationID, - pfsEnabled: config.PFSEnabled, - peerStore: ps, - cache: cache, + w: w, + config: config, + tracker: track, + requestsRegistry: requestsRegistry, + deduplicator: dedup.NewDeduplicator(w, db), + debug: config.DebugAPIEnabled, + dataDir: config.BackupDisabledDataDir, + installationID: config.InstallationID, + pfsEnabled: config.PFSEnabled, + peerStore: ps, + cache: cache, } } diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 481da0700..13b34f3a5 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -242,6 +242,26 @@ func (s *ShhExtSuite) TestRequestMessagesErrors() { s.Contains(err.Error(), "Query range is invalid: from > to (10 > 5)") } +func (s *ShhExtSuite) TestMultipleRequestMessagesWithoutForce() { + waitErr := helpers.WaitForPeerAsync(s.nodes[0].Server(), s.nodes[1].Server().Self().String(), p2p.PeerEventTypeAdd, time.Second) + s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) + s.Require().NoError(<-waitErr) + client, err := s.nodes[0].Attach() + s.NoError(err) + s.NoError(client.Call(nil, "shhext_requestMessages", MessagesRequest{ + MailServerPeer: s.nodes[1].Server().Self().String(), + Topics: []whisper.TopicType{{1}}, + })) + s.EqualError(client.Call(nil, "shhext_requestMessages", MessagesRequest{ + MailServerPeer: s.nodes[1].Server().Self().String(), + Topics: []whisper.TopicType{{1}}, + }), "another request with the same topics was sent less than 3s ago. Please wait for a bit longer, or set `force` to true in request parameters") + s.NoError(client.Call(nil, "shhext_requestMessages", MessagesRequest{ + MailServerPeer: s.nodes[1].Server().Self().String(), + Topics: []whisper.TopicType{{2}}, + })) +} + func (s *ShhExtSuite) TestRequestMessagesSuccess() { var err error @@ -301,6 +321,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailNode.Server().Self().String(), SymKeyID: symKeyID, + Force: true, }) s.Require().NoError(err) s.Require().NotNil(hash) @@ -309,6 +330,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { // a public key extracted from MailServerPeer will be used. hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailNode.Server().Self().String(), + Force: true, }) s.Require().NoError(err) s.Require().NotNil(hash) diff --git a/services/shhext/tracker.go b/services/shhext/tracker.go index 2f95752a7..5656a67ce 100644 --- a/services/shhext/tracker.go +++ b/services/shhext/tracker.go @@ -37,6 +37,8 @@ type tracker struct { mailPeers *mailservers.PeerStore + requestsRegistry *RequestsRegistry + wg sync.WaitGroup quit chan struct{} } @@ -194,7 +196,7 @@ func (t *tracker) handleRequestSent(event whisper.EnvelopeEvent) { func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) { t.mu.Lock() defer t.mu.Unlock() - + t.requestsRegistry.Unregister(event.Hash) state, ok := t.cache[event.Hash] if !ok || state != MailServerRequestSent { return @@ -211,7 +213,7 @@ func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEv func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) { t.mu.Lock() defer t.mu.Unlock() - + t.requestsRegistry.Unregister(event.Hash) state, ok := t.cache[event.Hash] if !ok || state != MailServerRequestSent { return diff --git a/services/shhext/tracker_test.go b/services/shhext/tracker_test.go index 5150b9d1e..0ef5416c4 100644 --- a/services/shhext/tracker_test.go +++ b/services/shhext/tracker_test.go @@ -33,9 +33,10 @@ func (s *TrackerSuite) SetupTest() { db, err := leveldb.Open(storage.NewMemStorage(), nil) s.Require().NoError(err) s.tracker = &tracker{ - cache: map[common.Hash]EnvelopeState{}, - batches: map[common.Hash]map[common.Hash]struct{}{}, - mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)), + cache: map[common.Hash]EnvelopeState{}, + batches: map[common.Hash]map[common.Hash]struct{}{}, + mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)), + requestsRegistry: NewRequestsRegistry(0), } } diff --git a/vendor/github.com/status-im/whisper/whisperv6/message.go b/vendor/github.com/status-im/whisper/whisperv6/message.go index a12b445e2..3cd3cf6a0 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/message.go +++ b/vendor/github.com/status-im/whisper/whisperv6/message.go @@ -75,6 +75,7 @@ type ReceivedMessage struct { SymKeyHash common.Hash // The Keccak256Hash of the key EnvelopeHash common.Hash // Message envelope hash to act as a unique id + History bool } func isMessageSigned(flags byte) bool {