From 14c513bd5ac5037e7242d8f947c0c9ed1c0caf73 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Mon, 6 May 2019 09:33:19 +0300 Subject: [PATCH] Execute writes atomically only after request was processed without errors (#1454) * Replace request ID when same request is restarted * Remove unnecessary changes * Execute all writes atomically only if request was processed succesfully * Fix linter * Fix shadowed errors * Fix spelling * Do not append same reference to a byte slice --- db/db.go | 14 +++--- db/history_store.go | 7 ++- db/history_store_test.go | 2 +- db/storage.go | 75 +++++++++++++++++++++++++++++++++ db/tx.go | 40 ++++++++++++++++++ db/tx_test.go | 23 ++++++++++ services/shhext/api.go | 48 +++++++++++++++------ services/shhext/context.go | 60 ++++++++++++++++++++++++++ services/shhext/history.go | 38 ++++++----------- services/shhext/history_test.go | 61 +++++++++++++++------------ services/shhext/service.go | 4 +- services/shhext/service_test.go | 12 +++--- 12 files changed, 303 insertions(+), 81 deletions(-) create mode 100644 db/storage.go create mode 100644 db/tx.go create mode 100644 db/tx_test.go create mode 100644 services/shhext/context.go diff --git a/db/db.go b/db/db.go index 87d89f766..9e9a3b1da 100644 --- a/db/db.go +++ b/db/db.go @@ -34,7 +34,7 @@ func NewMemoryDB() (*leveldb.DB, error) { } // NewDBNamespace returns instance that ensures isolated operations. -func NewDBNamespace(db *leveldb.DB, prefix storagePrefix) LevelDBNamespace { +func NewDBNamespace(db Storage, prefix storagePrefix) LevelDBNamespace { return LevelDBNamespace{ db: db, prefix: prefix, @@ -48,7 +48,7 @@ func NewMemoryDBNamespace(prefix storagePrefix) (pdb LevelDBNamespace, err error if err != nil { return pdb, err } - return NewDBNamespace(db, prefix), nil + return NewDBNamespace(LevelDBStorage{db: db}, prefix), nil } // Key creates a DB key for a specified service with specified data @@ -91,7 +91,7 @@ func Open(path string, opts *opt.Options) (db *leveldb.DB, err error) { // LevelDBNamespace database where all operations will be prefixed with a certain bucket. type LevelDBNamespace struct { - db *leveldb.DB + db Storage prefix storagePrefix } @@ -103,11 +103,11 @@ func (db LevelDBNamespace) prefixedKey(key []byte) []byte { } func (db LevelDBNamespace) Put(key, value []byte) error { - return db.db.Put(db.prefixedKey(key), value, nil) + return db.db.Put(db.prefixedKey(key), value) } func (db LevelDBNamespace) Get(key []byte) ([]byte, error) { - return db.db.Get(db.prefixedKey(key), nil) + return db.db.Get(db.prefixedKey(key)) } // Range returns leveldb util.Range prefixed with a single byte. @@ -121,12 +121,12 @@ func (db LevelDBNamespace) Range(prefix, limit []byte) *util.Range { // Delete removes key from database. func (db LevelDBNamespace) Delete(key []byte) error { - return db.db.Delete(db.prefixedKey(key), nil) + return db.db.Delete(db.prefixedKey(key)) } // NewIterator returns iterator for a given slice. func (db LevelDBNamespace) NewIterator(slice *util.Range) NamespaceIterator { - return NamespaceIterator{db.db.NewIterator(slice, nil)} + return NamespaceIterator{db.db.NewIterator(slice)} } // NamespaceIterator wraps leveldb iterator, works mostly the same way. diff --git a/db/history_store.go b/db/history_store.go index d2173dafd..2597feb0c 100644 --- a/db/history_store.go +++ b/db/history_store.go @@ -5,15 +5,14 @@ import ( "github.com/ethereum/go-ethereum/common" whisper "github.com/status-im/whisper/whisperv6" - "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" ) // NewHistoryStore returns HistoryStore instance. -func NewHistoryStore(db *leveldb.DB) HistoryStore { +func NewHistoryStore(storage Storage) HistoryStore { return HistoryStore{ - topicDB: NewDBNamespace(db, TopicHistoryBucket), - requestDB: NewDBNamespace(db, HistoryRequestBucket), + topicDB: NewDBNamespace(storage, TopicHistoryBucket), + requestDB: NewDBNamespace(storage, HistoryRequestBucket), } } diff --git a/db/history_store_test.go b/db/history_store_test.go index 2bf8cdb2d..48228b3c2 100644 --- a/db/history_store_test.go +++ b/db/history_store_test.go @@ -12,7 +12,7 @@ import ( func createInMemStore(t *testing.T) HistoryStore { db, err := NewMemoryDB() require.NoError(t, err) - return NewHistoryStore(db) + return NewHistoryStore(LevelDBStorage{db: db}) } func TestGetNewHistory(t *testing.T) { diff --git a/db/storage.go b/db/storage.go new file mode 100644 index 000000000..f95587f29 --- /dev/null +++ b/db/storage.go @@ -0,0 +1,75 @@ +package db + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// Storage is an interface for common db operations. +type Storage interface { + Put([]byte, []byte) error + Delete([]byte) error + Get([]byte) ([]byte, error) + NewIterator(*util.Range) iterator.Iterator +} + +// CommitStorage allows to write all tx/batched values atomically. +type CommitStorage interface { + Storage + Commit() error +} + +// TransactionalStorage adds transaction features on top of regular storage. +type TransactionalStorage interface { + Storage + NewTx() CommitStorage +} + +// NewMemoryLevelDBStorage returns LevelDBStorage instance with in memory leveldb backend. +func NewMemoryLevelDBStorage() (LevelDBStorage, error) { + mdb, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return LevelDBStorage{}, err + } + return NewLevelDBStorage(mdb), nil +} + +// NewLevelDBStorage creates new LevelDBStorage instance. +func NewLevelDBStorage(db *leveldb.DB) LevelDBStorage { + return LevelDBStorage{db: db} +} + +// LevelDBStorage wrapper around leveldb.DB. +type LevelDBStorage struct { + db *leveldb.DB +} + +// Put upserts given key/value pair. +func (db LevelDBStorage) Put(key, buf []byte) error { + return db.db.Put(key, buf, nil) +} + +// Delete removes given key from database.. +func (db LevelDBStorage) Delete(key []byte) error { + return db.db.Delete(key, nil) +} + +// Get returns value for a given key. +func (db LevelDBStorage) Get(key []byte) ([]byte, error) { + return db.db.Get(key, nil) +} + +// NewIterator returns new leveldb iterator.Iterator instance for a given range. +func (db LevelDBStorage) NewIterator(slice *util.Range) iterator.Iterator { + return db.db.NewIterator(slice, nil) +} + +// NewTx is a wrapper around leveldb.Batch that allows to write atomically. +func (db LevelDBStorage) NewTx() CommitStorage { + return LevelDBTx{ + batch: &leveldb.Batch{}, + db: db, + } +} diff --git a/db/tx.go b/db/tx.go new file mode 100644 index 000000000..1830b5f76 --- /dev/null +++ b/db/tx.go @@ -0,0 +1,40 @@ +package db + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/util" +) + +// LevelDBTx doesn't provide any read isolation. It allows committing all writes atomically (put/delete). +type LevelDBTx struct { + batch *leveldb.Batch + db LevelDBStorage +} + +// Put adds key/value to associated batch. +func (tx LevelDBTx) Put(key, buf []byte) error { + tx.batch.Put(key, buf) + return nil +} + +// Delete adds delete operation to associated batch. +func (tx LevelDBTx) Delete(key []byte) error { + tx.batch.Delete(key) + return nil +} + +// Get reads from currently committed state. +func (tx LevelDBTx) Get(key []byte) ([]byte, error) { + return tx.db.Get(key) +} + +// NewIterator returns iterator.Iterator that will read from currently committed state. +func (tx LevelDBTx) NewIterator(slice *util.Range) iterator.Iterator { + return tx.db.NewIterator(slice) +} + +// Commit writes batch atomically. +func (tx LevelDBTx) Commit() error { + return tx.db.db.Write(tx.batch, nil) +} diff --git a/db/tx_test.go b/db/tx_test.go new file mode 100644 index 000000000..6732d4301 --- /dev/null +++ b/db/tx_test.go @@ -0,0 +1,23 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTxWritesOnCommit(t *testing.T) { + storage, err := NewMemoryLevelDBStorage() + tx := storage.NewTx() + require.NoError(t, err) + key := []byte{1} + val := []byte{1, 1} + require.NoError(t, tx.Put(key, val)) + result, err := storage.Get(key) + require.Error(t, err) + require.Nil(t, result) + require.NoError(t, tx.Commit()) + result, err = storage.Get(key) + require.NoError(t, err) + require.Equal(t, val, result) +} diff --git a/services/shhext/api.go b/services/shhext/api.go index ea5babdc8..0d5b8fe72 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -427,16 +427,24 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate // ConfirmMessagesProcessed is a method to confirm that messages was consumed by // the client side. -func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) error { +func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) (err error) { + tx := api.service.storage.NewTx() + defer func() { + if err == nil { + err = tx.Commit() + } + }() + ctx := NewContextFromService(context.Background(), api.service, tx) for _, msg := range messages { if msg.P2P { - err := api.service.historyUpdates.UpdateTopicHistory(msg.Topic, time.Unix(int64(msg.Timestamp), 0)) + err = api.service.historyUpdates.UpdateTopicHistory(ctx, msg.Topic, time.Unix(int64(msg.Timestamp), 0)) if err != nil { return err } } } - return api.service.deduplicator.AddMessages(messages) + err = api.service.deduplicator.AddMessages(messages) + return err } // ConfirmMessagesProcessedByID is a method to confirm that messages was consumed by @@ -575,32 +583,48 @@ func (api *PublicAPI) requestMessagesUsingPayload(request db.HistoryRequest, pee // - Topic // - Duration in nanoseconds. Will be used to determine starting time for history request. // After that status-go will guarantee that request for this topic and date will be performed. -func (api *PublicAPI) InitiateHistoryRequests(request InitiateHistoryRequestParams) ([]hexutil.Bytes, error) { - rst := []hexutil.Bytes{} - requests, err := api.service.historyUpdates.CreateRequests(request.Requests) +func (api *PublicAPI) InitiateHistoryRequests(parent context.Context, request InitiateHistoryRequestParams) (rst []hexutil.Bytes, err error) { + tx := api.service.storage.NewTx() + defer func() { + if err == nil { + err = tx.Commit() + } + }() + ctx := NewContextFromService(parent, api.service, tx) + requests, err := api.service.historyUpdates.CreateRequests(ctx, request.Requests) if err != nil { return nil, err } + var ( + payload []byte + hash common.Hash + ) for i := range requests { req := requests[i] options := CreateTopicOptionsFromRequest(req) bloom := options.ToBloomFilterOption() - payload, err := bloom.ToMessagesRequestPayload() + payload, err = bloom.ToMessagesRequestPayload() if err != nil { return rst, err } - hash, err := api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics()) + hash, err = api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics()) if err != nil { return rst, err } - rst = append(rst, hash[:]) + rst = append(rst, hash.Bytes()) } - return rst, nil + return rst, err } // CompleteRequest client must mark request completed when all envelopes were processed. -func (api *PublicAPI) CompleteRequest(ctx context.Context, hex string) error { - return api.service.historyUpdates.UpdateFinishedRequest(common.HexToHash(hex)) +func (api *PublicAPI) CompleteRequest(parent context.Context, hex string) (err error) { + tx := api.service.storage.NewTx() + ctx := NewContextFromService(parent, api.service, tx) + err = api.service.historyUpdates.UpdateFinishedRequest(ctx, common.HexToHash(hex)) + if err == nil { + return tx.Commit() + } + return err } // DEPRECATED: use SendDirectMessage with DH flag diff --git a/services/shhext/context.go b/services/shhext/context.go new file mode 100644 index 000000000..107506ed6 --- /dev/null +++ b/services/shhext/context.go @@ -0,0 +1,60 @@ +package shhext + +import ( + "context" + "time" + + "github.com/status-im/status-go/db" +) + +// ContextKey is a type used for keys in shhext Context. +type ContextKey struct { + Name string +} + +// NewContextKey returns new ContextKey instance. +func NewContextKey(name string) ContextKey { + return ContextKey{Name: name} +} + +var ( + historyDBKey = NewContextKey("history_db") + requestRegistryKey = NewContextKey("request_registry") + timeKey = NewContextKey("time") +) + +// NewContextFromService creates new context instance using Service fileds directly and Storage. +func NewContextFromService(ctx context.Context, service *Service, storage db.Storage) Context { + return NewContext(ctx, service.w.GetCurrentTime, service.requestsRegistry, storage) +} + +// NewContext creates Context with all required fields. +func NewContext(ctx context.Context, source TimeSource, registry *RequestsRegistry, storage db.Storage) Context { + ctx = context.WithValue(ctx, historyDBKey, db.NewHistoryStore(storage)) + ctx = context.WithValue(ctx, timeKey, source) + ctx = context.WithValue(ctx, requestRegistryKey, registry) + return Context{ctx} +} + +// TimeSource is a type used for current time. +type TimeSource func() time.Time + +// Context provides access to request-scoped values. +type Context struct { + context.Context +} + +// HistoryStore returns db.HistoryStore instance associated with this request. +func (c Context) HistoryStore() db.HistoryStore { + return c.Value(historyDBKey).(db.HistoryStore) +} + +// Time returns current time using time function associated with this request. +func (c Context) Time() time.Time { + return c.Value(timeKey).(TimeSource)() +} + +// RequestRegistry returns RequestRegistry that tracks each request life-span. +func (c Context) RequestRegistry() *RequestsRegistry { + return c.Value(requestRegistryKey).(*RequestsRegistry) +} diff --git a/services/shhext/history.go b/services/shhext/history.go index 8bbf3bc37..430cde598 100644 --- a/services/shhext/history.go +++ b/services/shhext/history.go @@ -20,16 +20,9 @@ const ( WhisperTimeAllowance = 20 * time.Second ) -// TimeSource is a function that returns current time. -type TimeSource func() time.Time - // NewHistoryUpdateReactor creates HistoryUpdateReactor instance. -func NewHistoryUpdateReactor(store db.HistoryStore, registry *RequestsRegistry, timeSource TimeSource) *HistoryUpdateReactor { - return &HistoryUpdateReactor{ - store: store, - registry: registry, - timeSource: timeSource, - } +func NewHistoryUpdateReactor() *HistoryUpdateReactor { + return &HistoryUpdateReactor{} } // HistoryUpdateReactor responsible for tracking progress for all history requests. @@ -38,18 +31,15 @@ func NewHistoryUpdateReactor(store db.HistoryStore, registry *RequestsRegistry, // - when confirmation for request completion is received - we will set last envelope timestamp as the last timestamp // for all TopicLists in current request. type HistoryUpdateReactor struct { - mu sync.Mutex - store db.HistoryStore - registry *RequestsRegistry - timeSource TimeSource + mu sync.Mutex } // UpdateFinishedRequest removes successfully finished request and updates every topic // attached to the request. -func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(id common.Hash) error { +func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(ctx Context, id common.Hash) error { reactor.mu.Lock() defer reactor.mu.Unlock() - req, err := reactor.store.GetRequest(id) + req, err := ctx.HistoryStore().GetRequest(id) if err != nil { return err } @@ -66,10 +56,10 @@ func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(id common.Hash) error } // UpdateTopicHistory updates Current timestamp for the TopicHistory with a given timestamp. -func (reactor *HistoryUpdateReactor) UpdateTopicHistory(topic whisper.TopicType, timestamp time.Time) error { +func (reactor *HistoryUpdateReactor) UpdateTopicHistory(ctx Context, topic whisper.TopicType, timestamp time.Time) error { reactor.mu.Lock() defer reactor.mu.Unlock() - histories, err := reactor.store.GetHistoriesByTopic(topic) + histories, err := ctx.HistoryStore().GetHistoriesByTopic(topic) if err != nil { return err } @@ -103,7 +93,7 @@ type TopicRequest struct { // CreateRequests receives list of topic with desired timestamps and initiates both pending requests and requests // that cover new topics. -func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest) ([]db.HistoryRequest, error) { +func (reactor *HistoryUpdateReactor) CreateRequests(ctx Context, topicRequests []TopicRequest) ([]db.HistoryRequest, error) { reactor.mu.Lock() defer reactor.mu.Unlock() seen := map[whisper.TopicType]struct{}{} @@ -115,13 +105,13 @@ func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest } histories := map[whisper.TopicType]db.TopicHistory{} for i := range topicRequests { - th, err := reactor.store.GetHistory(topicRequests[i].Topic, topicRequests[i].Duration) + th, err := ctx.HistoryStore().GetHistory(topicRequests[i].Topic, topicRequests[i].Duration) if err != nil { return nil, err } histories[th.Topic] = th } - requests, err := reactor.store.GetAllRequests() + requests, err := ctx.HistoryStore().GetAllRequests() if err != nil { return nil, err } @@ -133,17 +123,17 @@ func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest delete(histories, th.Topic) } } - if !reactor.registry.Has(req.ID) { + if !ctx.RequestRegistry().Has(req.ID) { filtered = append(filtered, req) } } - adjusted, err := adjustRequestedHistories(reactor.store, mapToList(histories)) + adjusted, err := adjustRequestedHistories(ctx.HistoryStore(), mapToList(histories)) if err != nil { return nil, err } filtered = append(filtered, - GroupHistoriesByRequestTimespan(reactor.store, adjusted)...) - return RenewRequests(filtered, reactor.timeSource()), nil + GroupHistoriesByRequestTimespan(ctx.HistoryStore(), adjusted)...) + return RenewRequests(filtered, ctx.Time()), nil } // for every history that is not included in any request check if there are other ranges with such topic in db diff --git a/services/shhext/history_test.go b/services/shhext/history_test.go index 904cdf486..a75537a93 100644 --- a/services/shhext/history_test.go +++ b/services/shhext/history_test.go @@ -1,6 +1,7 @@ package shhext import ( + "context" "testing" "time" @@ -13,10 +14,16 @@ import ( "github.com/stretchr/testify/require" ) +func newTestContext(t *testing.T) Context { + mdb, err := db.NewMemoryDB() + require.NoError(t, err) + return NewContext(context.Background(), time.Now, NewRequestsRegistry(0), db.NewLevelDBStorage(mdb)) +} + func createInMemStore(t *testing.T) db.HistoryStore { mdb, err := db.NewMemoryDB() require.NoError(t, err) - return db.NewHistoryStore(mdb) + return db.NewHistoryStore(db.NewLevelDBStorage(mdb)) } func TestRenewRequest(t *testing.T) { @@ -95,11 +102,9 @@ func TestBloomFilterToMessageRequestPayload(t *testing.T) { } func TestCreateRequestsEmptyState(t *testing.T) { - now := time.Now() - reactor := NewHistoryUpdateReactor( - createInMemStore(t), NewRequestsRegistry(0), - func() time.Time { return now }) - requests, err := reactor.CreateRequests([]TopicRequest{ + ctx := newTestContext(t) + reactor := NewHistoryUpdateReactor() + requests, err := reactor.CreateRequests(ctx, []TopicRequest{ {Topic: whisper.TopicType{1}, Duration: time.Hour}, {Topic: whisper.TopicType{2}, Duration: time.Hour}, {Topic: whisper.TopicType{3}, Duration: 10 * time.Hour}, @@ -120,14 +125,15 @@ func TestCreateRequestsEmptyState(t *testing.T) { } func TestCreateRequestsWithExistingRequest(t *testing.T) { - store := createInMemStore(t) + ctx := newTestContext(t) + store := ctx.HistoryStore() req := store.NewRequest() req.ID = common.Hash{1} th := store.NewHistory(whisper.TopicType{1}, time.Hour) req.AddHistory(th) require.NoError(t, req.Save()) - reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now) - requests, err := reactor.CreateRequests([]TopicRequest{ + reactor := NewHistoryUpdateReactor() + requests, err := reactor.CreateRequests(ctx, []TopicRequest{ {Topic: whisper.TopicType{1}, Duration: time.Hour}, {Topic: whisper.TopicType{2}, Duration: time.Hour}, {Topic: whisper.TopicType{3}, Duration: time.Hour}, @@ -148,12 +154,11 @@ func TestCreateRequestsWithExistingRequest(t *testing.T) { } func TestCreateMultiRequestsWithSameTopic(t *testing.T) { - now := time.Now() - reactor := NewHistoryUpdateReactor( - createInMemStore(t), NewRequestsRegistry(0), - func() time.Time { return now }) + ctx := newTestContext(t) + store := ctx.HistoryStore() + reactor := NewHistoryUpdateReactor() topic := whisper.TopicType{1} - requests, err := reactor.CreateRequests([]TopicRequest{ + requests, err := reactor.CreateRequests(ctx, []TopicRequest{ {Topic: topic, Duration: time.Hour}, }) require.NoError(t, err) @@ -162,7 +167,7 @@ func TestCreateMultiRequestsWithSameTopic(t *testing.T) { require.NoError(t, requests[0].Save()) // duration changed. request wasn't finished - requests, err = reactor.CreateRequests([]TopicRequest{ + requests, err = reactor.CreateRequests(ctx, []TopicRequest{ {Topic: topic, Duration: 10 * time.Hour}, }) require.NoError(t, err) @@ -180,25 +185,26 @@ func TestCreateMultiRequestsWithSameTopic(t *testing.T) { require.Equal(t, requests[longest].Histories()[0].End, requests[longest^1].Histories()[0].First) for _, r := range requests { - require.NoError(t, reactor.UpdateFinishedRequest(r.ID)) + require.NoError(t, reactor.UpdateFinishedRequest(ctx, r.ID)) } - requests, err = reactor.CreateRequests([]TopicRequest{ + requests, err = reactor.CreateRequests(ctx, []TopicRequest{ {Topic: topic, Duration: 10 * time.Hour}, }) require.NoError(t, err) require.Len(t, requests, 1) - topics, err := reactor.store.GetHistoriesByTopic(topic) + topics, err := store.GetHistoriesByTopic(topic) require.NoError(t, err) require.Len(t, topics, 1) require.Equal(t, 10*time.Hour, topics[0].Duration) } func TestRequestFinishedUpdate(t *testing.T) { - store := createInMemStore(t) + ctx := newTestContext(t) + store := ctx.HistoryStore() req := store.NewRequest() req.ID = common.Hash{1} - now := time.Now() + now := ctx.Time() thOne := store.NewHistory(whisper.TopicType{1}, time.Hour) thOne.End = now thTwo := store.NewHistory(whisper.TopicType{2}, time.Hour) @@ -207,9 +213,9 @@ func TestRequestFinishedUpdate(t *testing.T) { req.AddHistory(thTwo) require.NoError(t, req.Save()) - reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now) - require.NoError(t, reactor.UpdateTopicHistory(thOne.Topic, now.Add(-time.Minute))) - require.NoError(t, reactor.UpdateFinishedRequest(req.ID)) + reactor := NewHistoryUpdateReactor() + require.NoError(t, reactor.UpdateTopicHistory(ctx, thOne.Topic, now.Add(-time.Minute))) + require.NoError(t, reactor.UpdateFinishedRequest(ctx, req.ID)) _, err := store.GetRequest(req.ID) require.EqualError(t, err, "leveldb: not found") @@ -220,8 +226,9 @@ func TestRequestFinishedUpdate(t *testing.T) { } func TestTopicHistoryUpdate(t *testing.T) { + ctx := newTestContext(t) + store := ctx.HistoryStore() reqID := common.Hash{1} - store := createInMemStore(t) request := store.NewRequest() request.ID = reqID now := time.Now() @@ -230,14 +237,14 @@ func TestTopicHistoryUpdate(t *testing.T) { th.RequestID = request.ID th.End = now require.NoError(t, th.Save()) - reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now) + reactor := NewHistoryUpdateReactor() timestamp := now.Add(-time.Minute) - require.NoError(t, reactor.UpdateTopicHistory(th.Topic, timestamp)) + require.NoError(t, reactor.UpdateTopicHistory(ctx, th.Topic, timestamp)) require.NoError(t, th.Load()) require.Equal(t, timestamp.Unix(), th.Current.Unix()) - require.NoError(t, reactor.UpdateTopicHistory(th.Topic, now)) + require.NoError(t, reactor.UpdateTopicHistory(ctx, th.Topic, now)) require.NoError(t, th.Load()) require.Equal(t, timestamp.Unix(), th.Current.Unix()) } diff --git a/services/shhext/service.go b/services/shhext/service.go index 9a1066e16..f27d3a520 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -42,6 +42,7 @@ type EnvelopeEventsHandler interface { // Service is a service that provides some additional Whisper API. type Service struct { + storage db.TransactionalStorage w *whisper.Whisper config params.ShhextConfig envelopesMonitor *EnvelopesMonitor @@ -73,7 +74,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con delay = config.RequestsDelay } requestsRegistry := NewRequestsRegistry(delay) - historyUpdates := NewHistoryUpdateReactor(db.NewHistoryStore(ldb), requestsRegistry, w.GetCurrentTime) + historyUpdates := NewHistoryUpdateReactor() mailMonitor := &MailRequestMonitor{ w: w, handler: handler, @@ -82,6 +83,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con } envelopesMonitor := NewEnvelopesMonitor(w, handler, config.MailServerConfirmations, ps, config.MaxMessageDeliveryAttempts) return &Service{ + storage: db.NewLevelDBStorage(ldb), w: w, config: config, envelopesMonitor: envelopesMonitor, diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 43dc87fed..43d397c50 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -698,6 +698,7 @@ type RequestWithTrackingHistorySuite struct { localWhisperAPI *whisper.PublicWhisperAPI localAPI *PublicAPI localService *Service + localContext Context mailSymKey string remoteMailserver *mailserver.WMailServer @@ -717,6 +718,7 @@ func (s *RequestWithTrackingHistorySuite) SetupTest() { s.localWhisperAPI = whisper.NewPublicWhisperAPI(local) s.localService = New(local, nil, db, params.ShhextConfig{}) + s.localContext = NewContextFromService(context.Background(), s.localService, s.localService.storage) localPkey, err := crypto.GenerateKey() s.Require().NoError(err) s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}})) @@ -799,7 +801,7 @@ func (s *RequestWithTrackingHistorySuite) createEmptyFilter(topics ...whisper.To } func (s *RequestWithTrackingHistorySuite) initiateHistoryRequest(topics ...TopicRequest) []hexutil.Bytes { - requests, err := s.localAPI.InitiateHistoryRequests(InitiateHistoryRequestParams{ + requests, err := s.localAPI.InitiateHistoryRequests(context.Background(), InitiateHistoryRequestParams{ Peer: s.remoteNode.String(), SymKeyID: s.mailSymKey, Timeout: 10 * time.Second, @@ -826,7 +828,7 @@ func (s *RequestWithTrackingHistorySuite) waitMessagesDelivered(filterid string, } func (s *RequestWithTrackingHistorySuite) waitNoRequests() { - store := s.localService.historyUpdates.store + store := s.localContext.HistoryStore() s.Require().NoError(utils.Eventually(func() error { reqs, err := store.GetAllRequests() if err != nil { @@ -856,9 +858,9 @@ func (s *RequestWithTrackingHistorySuite) TestMultipleMergeIntoOne() { s.Require().Len(requests, 2) s.waitMessagesDelivered(filterid, hexes...) - s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic1, time.Now())) - s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic2, time.Now())) - s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic3, time.Now())) + s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic1, time.Now())) + s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic2, time.Now())) + s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic3, time.Now())) for _, r := range requests { s.Require().NoError(s.localAPI.CompleteRequest(context.TODO(), r.String())) }