Prevent frequent requests with same topics

This commit is contained in:
Dmitry 2019-01-15 11:21:33 +02:00 committed by Dmitry Shulyak
parent 4939268edf
commit aa09d854ca
9 changed files with 200 additions and 27 deletions

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -315,6 +316,8 @@ type ShhextConfig struct {
EnableLastUsedMonitor bool EnableLastUsedMonitor bool
// ConnectionTarget will be used by connection manager. It will ensure that we connected with configured number of servers. // ConnectionTarget will be used by connection manager. It will ensure that we connected with configured number of servers.
ConnectionTarget int ConnectionTarget int
// RequestsDelay used to ensure that no similar requests are sent within short periods of time.
RequestsDelay time.Duration
} }
// Validate validates the ShhextConfig struct and returns an error if inconsistent values are found // Validate validates the ShhextConfig struct and returns an error if inconsistent values are found

View File

@ -80,6 +80,9 @@ type MessagesRequest struct {
// Timeout is the time to live of the request specified in seconds. // Timeout is the time to live of the request specified in seconds.
// Default is 10 seconds // Default is 10 seconds
Timeout time.Duration `json:"timeout"` Timeout time.Duration `json:"timeout"`
// Force ensures that requests will bypass enforced delay.
Force bool `json:"force"`
} }
func (r *MessagesRequest) setDefaults(now time.Time) { func (r *MessagesRequest) setDefaults(now time.Time) {
@ -223,11 +226,17 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
if err != nil { if err != nil {
return nil, err return nil, err
} }
hash := envelope.Hash()
if !r.Force {
err = api.service.requestsRegistry.Register(hash, r.Topics)
if err != nil {
return nil, err
}
}
if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
return nil, err return nil, err
} }
hash := envelope.Hash()
return hash[:], nil return hash[:], nil
} }

View File

@ -0,0 +1,83 @@
package shhext
import (
"fmt"
"hash/fnv"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
)
const (
// defaultRequestsDelay will be used in RequestsRegistry if no other was provided.
defaultRequestsDelay = 3 * time.Second
)
type requestMeta struct {
timestamp time.Time
lastUID common.Hash
}
// NewRequestsRegistry creates instance of the RequestsRegistry and returns pointer to it.
func NewRequestsRegistry(delay time.Duration) *RequestsRegistry {
return &RequestsRegistry{
delay: delay,
uidToTopics: map[common.Hash]common.Hash{},
byTopicsHash: map[common.Hash]requestMeta{},
}
}
// RequestsRegistry keeps map for all requests with timestamp when they were made.
type RequestsRegistry struct {
mu sync.Mutex
delay time.Duration
uidToTopics map[common.Hash]common.Hash
byTopicsHash map[common.Hash]requestMeta
}
// Register request with given topics. If request with same topics was made in less then configured delay then error
// will be returned.
func (r *RequestsRegistry) Register(uid common.Hash, topics []whisper.TopicType) error {
r.mu.Lock()
defer r.mu.Unlock()
topicsHash := topicsToHash(topics)
if meta, exist := r.byTopicsHash[topicsHash]; exist {
if time.Since(meta.timestamp) < r.delay {
return fmt.Errorf("another request with the same topics was sent less than %s ago. Please wait for a bit longer, or set `force` to true in request parameters", r.delay)
}
}
newMeta := requestMeta{
timestamp: time.Now(),
lastUID: uid,
}
r.uidToTopics[uid] = topicsHash
r.byTopicsHash[topicsHash] = newMeta
return nil
}
// Unregister removes request with given UID from registry.
func (r *RequestsRegistry) Unregister(uid common.Hash) {
r.mu.Lock()
defer r.mu.Unlock()
topicsHash, exist := r.uidToTopics[uid]
if !exist {
return
}
delete(r.uidToTopics, uid)
meta := r.byTopicsHash[topicsHash]
// remove topicsHash only if we are trying to unregister last request with this topic.
if meta.lastUID == uid {
delete(r.byTopicsHash, topicsHash)
}
}
// topicsToHash returns non-cryptographic hash of the topics.
func topicsToHash(topics []whisper.TopicType) common.Hash {
hash := fnv.New32()
for i := range topics {
_, _ = hash.Write(topics[i][:]) // never returns error per documentation
}
return common.BytesToHash(hash.Sum(nil))
}

View File

@ -0,0 +1,44 @@
package shhext
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
)
func TestRegisterSameRequests(t *testing.T) {
registry := NewRequestsRegistry(10 * time.Second)
topics := []whisper.TopicType{{1}}
require.NoError(t, registry.Register(common.Hash{1}, topics))
require.Error(t, registry.Register(common.Hash{2}, topics))
}
func TestRegisterSameRequestsWithoutDelay(t *testing.T) {
registry := NewRequestsRegistry(0)
topics := []whisper.TopicType{{1}}
require.NoError(t, registry.Register(common.Hash{1}, topics))
require.NoError(t, registry.Register(common.Hash{2}, topics))
}
func TestRegisterDifferentRequests(t *testing.T) {
registry := NewRequestsRegistry(10 * time.Second)
require.NoError(t, registry.Register(common.Hash{1}, []whisper.TopicType{{1}}))
require.NoError(t, registry.Register(common.Hash{2}, []whisper.TopicType{{2}}))
}
func TestUnregisterReplacedRequest(t *testing.T) {
registry := NewRequestsRegistry(0)
unreg := common.Hash{1}
topics := []whisper.TopicType{{1}}
require.NoError(t, registry.Register(unreg, topics))
replacement := common.Hash{2}
require.NoError(t, registry.Register(replacement, topics))
// record should be replaced with common.Hash{2}, so when we will remove unreg it will not affect topics map
registry.Unregister(unreg)
record, exist := registry.uidToTopics[replacement]
require.True(t, exist, "replaced record should exist")
require.Equal(t, replacement, registry.byTopicsHash[record].lastUID)
}

View File

@ -44,6 +44,7 @@ type Service struct {
w *whisper.Whisper w *whisper.Whisper
config params.ShhextConfig config params.ShhextConfig
tracker *tracker tracker *tracker
requestsRegistry *RequestsRegistry
server *p2p.Server server *p2p.Server
nodeID *ecdsa.PrivateKey nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator deduplicator *dedup.Deduplicator
@ -66,6 +67,11 @@ var _ node.Service = (*Service)(nil)
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config params.ShhextConfig) *Service { func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config params.ShhextConfig) *Service {
cache := mailservers.NewCache(db) cache := mailservers.NewCache(db)
ps := mailservers.NewPeerStore(cache) ps := mailservers.NewPeerStore(cache)
delay := defaultRequestsDelay
if config.RequestsDelay != 0 {
delay = config.RequestsDelay
}
requestsRegistry := NewRequestsRegistry(delay)
track := &tracker{ track := &tracker{
w: w, w: w,
handler: handler, handler: handler,
@ -73,11 +79,13 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
batches: map[common.Hash]map[common.Hash]struct{}{}, batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: ps, mailPeers: ps,
mailServerConfirmation: config.MailServerConfirmations, mailServerConfirmation: config.MailServerConfirmations,
requestsRegistry: requestsRegistry,
} }
return &Service{ return &Service{
w: w, w: w,
config: config, config: config,
tracker: track, tracker: track,
requestsRegistry: requestsRegistry,
deduplicator: dedup.NewDeduplicator(w, db), deduplicator: dedup.NewDeduplicator(w, db),
debug: config.DebugAPIEnabled, debug: config.DebugAPIEnabled,
dataDir: config.BackupDisabledDataDir, dataDir: config.BackupDisabledDataDir,

View File

@ -242,6 +242,26 @@ func (s *ShhExtSuite) TestRequestMessagesErrors() {
s.Contains(err.Error(), "Query range is invalid: from > to (10 > 5)") s.Contains(err.Error(), "Query range is invalid: from > to (10 > 5)")
} }
func (s *ShhExtSuite) TestMultipleRequestMessagesWithoutForce() {
waitErr := helpers.WaitForPeerAsync(s.nodes[0].Server(), s.nodes[1].Server().Self().String(), p2p.PeerEventTypeAdd, time.Second)
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
s.Require().NoError(<-waitErr)
client, err := s.nodes[0].Attach()
s.NoError(err)
s.NoError(client.Call(nil, "shhext_requestMessages", MessagesRequest{
MailServerPeer: s.nodes[1].Server().Self().String(),
Topics: []whisper.TopicType{{1}},
}))
s.EqualError(client.Call(nil, "shhext_requestMessages", MessagesRequest{
MailServerPeer: s.nodes[1].Server().Self().String(),
Topics: []whisper.TopicType{{1}},
}), "another request with the same topics was sent less than 3s ago. Please wait for a bit longer, or set `force` to true in request parameters")
s.NoError(client.Call(nil, "shhext_requestMessages", MessagesRequest{
MailServerPeer: s.nodes[1].Server().Self().String(),
Topics: []whisper.TopicType{{2}},
}))
}
func (s *ShhExtSuite) TestRequestMessagesSuccess() { func (s *ShhExtSuite) TestRequestMessagesSuccess() {
var err error var err error
@ -301,6 +321,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailNode.Server().Self().String(), MailServerPeer: mailNode.Server().Self().String(),
SymKeyID: symKeyID, SymKeyID: symKeyID,
Force: true,
}) })
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NotNil(hash) s.Require().NotNil(hash)
@ -309,6 +330,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
// a public key extracted from MailServerPeer will be used. // a public key extracted from MailServerPeer will be used.
hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailNode.Server().Self().String(), MailServerPeer: mailNode.Server().Self().String(),
Force: true,
}) })
s.Require().NoError(err) s.Require().NoError(err)
s.Require().NotNil(hash) s.Require().NotNil(hash)

View File

@ -37,6 +37,8 @@ type tracker struct {
mailPeers *mailservers.PeerStore mailPeers *mailservers.PeerStore
requestsRegistry *RequestsRegistry
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -194,7 +196,7 @@ func (t *tracker) handleRequestSent(event whisper.EnvelopeEvent) {
func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) { func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.requestsRegistry.Unregister(event.Hash)
state, ok := t.cache[event.Hash] state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent { if !ok || state != MailServerRequestSent {
return return
@ -211,7 +213,7 @@ func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEv
func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) { func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.requestsRegistry.Unregister(event.Hash)
state, ok := t.cache[event.Hash] state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent { if !ok || state != MailServerRequestSent {
return return

View File

@ -36,6 +36,7 @@ func (s *TrackerSuite) SetupTest() {
cache: map[common.Hash]EnvelopeState{}, cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{}, batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)), mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)),
requestsRegistry: NewRequestsRegistry(0),
} }
} }

View File

@ -75,6 +75,7 @@ type ReceivedMessage struct {
SymKeyHash common.Hash // The Keccak256Hash of the key SymKeyHash common.Hash // The Keccak256Hash of the key
EnvelopeHash common.Hash // Message envelope hash to act as a unique id EnvelopeHash common.Hash // Message envelope hash to act as a unique id
History bool
} }
func isMessageSigned(flags byte) bool { func isMessageSigned(flags byte) bool {