Execute writes atomically only after request was processed without errors (#1454)
* Replace request ID when same request is restarted * Remove unnecessary changes * Execute all writes atomically only if request was processed succesfully * Fix linter * Fix shadowed errors * Fix spelling * Do not append same reference to a byte slice
This commit is contained in:
parent
cba00153e2
commit
14c513bd5a
14
db/db.go
14
db/db.go
|
@ -34,7 +34,7 @@ func NewMemoryDB() (*leveldb.DB, error) {
|
|||
}
|
||||
|
||||
// NewDBNamespace returns instance that ensures isolated operations.
|
||||
func NewDBNamespace(db *leveldb.DB, prefix storagePrefix) LevelDBNamespace {
|
||||
func NewDBNamespace(db Storage, prefix storagePrefix) LevelDBNamespace {
|
||||
return LevelDBNamespace{
|
||||
db: db,
|
||||
prefix: prefix,
|
||||
|
@ -48,7 +48,7 @@ func NewMemoryDBNamespace(prefix storagePrefix) (pdb LevelDBNamespace, err error
|
|||
if err != nil {
|
||||
return pdb, err
|
||||
}
|
||||
return NewDBNamespace(db, prefix), nil
|
||||
return NewDBNamespace(LevelDBStorage{db: db}, prefix), nil
|
||||
}
|
||||
|
||||
// Key creates a DB key for a specified service with specified data
|
||||
|
@ -91,7 +91,7 @@ func Open(path string, opts *opt.Options) (db *leveldb.DB, err error) {
|
|||
|
||||
// LevelDBNamespace database where all operations will be prefixed with a certain bucket.
|
||||
type LevelDBNamespace struct {
|
||||
db *leveldb.DB
|
||||
db Storage
|
||||
prefix storagePrefix
|
||||
}
|
||||
|
||||
|
@ -103,11 +103,11 @@ func (db LevelDBNamespace) prefixedKey(key []byte) []byte {
|
|||
}
|
||||
|
||||
func (db LevelDBNamespace) Put(key, value []byte) error {
|
||||
return db.db.Put(db.prefixedKey(key), value, nil)
|
||||
return db.db.Put(db.prefixedKey(key), value)
|
||||
}
|
||||
|
||||
func (db LevelDBNamespace) Get(key []byte) ([]byte, error) {
|
||||
return db.db.Get(db.prefixedKey(key), nil)
|
||||
return db.db.Get(db.prefixedKey(key))
|
||||
}
|
||||
|
||||
// Range returns leveldb util.Range prefixed with a single byte.
|
||||
|
@ -121,12 +121,12 @@ func (db LevelDBNamespace) Range(prefix, limit []byte) *util.Range {
|
|||
|
||||
// Delete removes key from database.
|
||||
func (db LevelDBNamespace) Delete(key []byte) error {
|
||||
return db.db.Delete(db.prefixedKey(key), nil)
|
||||
return db.db.Delete(db.prefixedKey(key))
|
||||
}
|
||||
|
||||
// NewIterator returns iterator for a given slice.
|
||||
func (db LevelDBNamespace) NewIterator(slice *util.Range) NamespaceIterator {
|
||||
return NamespaceIterator{db.db.NewIterator(slice, nil)}
|
||||
return NamespaceIterator{db.db.NewIterator(slice)}
|
||||
}
|
||||
|
||||
// NamespaceIterator wraps leveldb iterator, works mostly the same way.
|
||||
|
|
|
@ -5,15 +5,14 @@ import (
|
|||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
)
|
||||
|
||||
// NewHistoryStore returns HistoryStore instance.
|
||||
func NewHistoryStore(db *leveldb.DB) HistoryStore {
|
||||
func NewHistoryStore(storage Storage) HistoryStore {
|
||||
return HistoryStore{
|
||||
topicDB: NewDBNamespace(db, TopicHistoryBucket),
|
||||
requestDB: NewDBNamespace(db, HistoryRequestBucket),
|
||||
topicDB: NewDBNamespace(storage, TopicHistoryBucket),
|
||||
requestDB: NewDBNamespace(storage, HistoryRequestBucket),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
func createInMemStore(t *testing.T) HistoryStore {
|
||||
db, err := NewMemoryDB()
|
||||
require.NoError(t, err)
|
||||
return NewHistoryStore(db)
|
||||
return NewHistoryStore(LevelDBStorage{db: db})
|
||||
}
|
||||
|
||||
func TestGetNewHistory(t *testing.T) {
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/storage"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
// Storage is an interface for common db operations.
|
||||
type Storage interface {
|
||||
Put([]byte, []byte) error
|
||||
Delete([]byte) error
|
||||
Get([]byte) ([]byte, error)
|
||||
NewIterator(*util.Range) iterator.Iterator
|
||||
}
|
||||
|
||||
// CommitStorage allows to write all tx/batched values atomically.
|
||||
type CommitStorage interface {
|
||||
Storage
|
||||
Commit() error
|
||||
}
|
||||
|
||||
// TransactionalStorage adds transaction features on top of regular storage.
|
||||
type TransactionalStorage interface {
|
||||
Storage
|
||||
NewTx() CommitStorage
|
||||
}
|
||||
|
||||
// NewMemoryLevelDBStorage returns LevelDBStorage instance with in memory leveldb backend.
|
||||
func NewMemoryLevelDBStorage() (LevelDBStorage, error) {
|
||||
mdb, err := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
if err != nil {
|
||||
return LevelDBStorage{}, err
|
||||
}
|
||||
return NewLevelDBStorage(mdb), nil
|
||||
}
|
||||
|
||||
// NewLevelDBStorage creates new LevelDBStorage instance.
|
||||
func NewLevelDBStorage(db *leveldb.DB) LevelDBStorage {
|
||||
return LevelDBStorage{db: db}
|
||||
}
|
||||
|
||||
// LevelDBStorage wrapper around leveldb.DB.
|
||||
type LevelDBStorage struct {
|
||||
db *leveldb.DB
|
||||
}
|
||||
|
||||
// Put upserts given key/value pair.
|
||||
func (db LevelDBStorage) Put(key, buf []byte) error {
|
||||
return db.db.Put(key, buf, nil)
|
||||
}
|
||||
|
||||
// Delete removes given key from database..
|
||||
func (db LevelDBStorage) Delete(key []byte) error {
|
||||
return db.db.Delete(key, nil)
|
||||
}
|
||||
|
||||
// Get returns value for a given key.
|
||||
func (db LevelDBStorage) Get(key []byte) ([]byte, error) {
|
||||
return db.db.Get(key, nil)
|
||||
}
|
||||
|
||||
// NewIterator returns new leveldb iterator.Iterator instance for a given range.
|
||||
func (db LevelDBStorage) NewIterator(slice *util.Range) iterator.Iterator {
|
||||
return db.db.NewIterator(slice, nil)
|
||||
}
|
||||
|
||||
// NewTx is a wrapper around leveldb.Batch that allows to write atomically.
|
||||
func (db LevelDBStorage) NewTx() CommitStorage {
|
||||
return LevelDBTx{
|
||||
batch: &leveldb.Batch{},
|
||||
db: db,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
// LevelDBTx doesn't provide any read isolation. It allows committing all writes atomically (put/delete).
|
||||
type LevelDBTx struct {
|
||||
batch *leveldb.Batch
|
||||
db LevelDBStorage
|
||||
}
|
||||
|
||||
// Put adds key/value to associated batch.
|
||||
func (tx LevelDBTx) Put(key, buf []byte) error {
|
||||
tx.batch.Put(key, buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete adds delete operation to associated batch.
|
||||
func (tx LevelDBTx) Delete(key []byte) error {
|
||||
tx.batch.Delete(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get reads from currently committed state.
|
||||
func (tx LevelDBTx) Get(key []byte) ([]byte, error) {
|
||||
return tx.db.Get(key)
|
||||
}
|
||||
|
||||
// NewIterator returns iterator.Iterator that will read from currently committed state.
|
||||
func (tx LevelDBTx) NewIterator(slice *util.Range) iterator.Iterator {
|
||||
return tx.db.NewIterator(slice)
|
||||
}
|
||||
|
||||
// Commit writes batch atomically.
|
||||
func (tx LevelDBTx) Commit() error {
|
||||
return tx.db.db.Write(tx.batch, nil)
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTxWritesOnCommit(t *testing.T) {
|
||||
storage, err := NewMemoryLevelDBStorage()
|
||||
tx := storage.NewTx()
|
||||
require.NoError(t, err)
|
||||
key := []byte{1}
|
||||
val := []byte{1, 1}
|
||||
require.NoError(t, tx.Put(key, val))
|
||||
result, err := storage.Get(key)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, result)
|
||||
require.NoError(t, tx.Commit())
|
||||
result, err = storage.Get(key)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, val, result)
|
||||
}
|
|
@ -427,16 +427,24 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate
|
|||
|
||||
// ConfirmMessagesProcessed is a method to confirm that messages was consumed by
|
||||
// the client side.
|
||||
func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) error {
|
||||
func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) (err error) {
|
||||
tx := api.service.storage.NewTx()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
}
|
||||
}()
|
||||
ctx := NewContextFromService(context.Background(), api.service, tx)
|
||||
for _, msg := range messages {
|
||||
if msg.P2P {
|
||||
err := api.service.historyUpdates.UpdateTopicHistory(msg.Topic, time.Unix(int64(msg.Timestamp), 0))
|
||||
err = api.service.historyUpdates.UpdateTopicHistory(ctx, msg.Topic, time.Unix(int64(msg.Timestamp), 0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return api.service.deduplicator.AddMessages(messages)
|
||||
err = api.service.deduplicator.AddMessages(messages)
|
||||
return err
|
||||
}
|
||||
|
||||
// ConfirmMessagesProcessedByID is a method to confirm that messages was consumed by
|
||||
|
@ -575,32 +583,48 @@ func (api *PublicAPI) requestMessagesUsingPayload(request db.HistoryRequest, pee
|
|||
// - Topic
|
||||
// - Duration in nanoseconds. Will be used to determine starting time for history request.
|
||||
// After that status-go will guarantee that request for this topic and date will be performed.
|
||||
func (api *PublicAPI) InitiateHistoryRequests(request InitiateHistoryRequestParams) ([]hexutil.Bytes, error) {
|
||||
rst := []hexutil.Bytes{}
|
||||
requests, err := api.service.historyUpdates.CreateRequests(request.Requests)
|
||||
func (api *PublicAPI) InitiateHistoryRequests(parent context.Context, request InitiateHistoryRequestParams) (rst []hexutil.Bytes, err error) {
|
||||
tx := api.service.storage.NewTx()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tx.Commit()
|
||||
}
|
||||
}()
|
||||
ctx := NewContextFromService(parent, api.service, tx)
|
||||
requests, err := api.service.historyUpdates.CreateRequests(ctx, request.Requests)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
payload []byte
|
||||
hash common.Hash
|
||||
)
|
||||
for i := range requests {
|
||||
req := requests[i]
|
||||
options := CreateTopicOptionsFromRequest(req)
|
||||
bloom := options.ToBloomFilterOption()
|
||||
payload, err := bloom.ToMessagesRequestPayload()
|
||||
payload, err = bloom.ToMessagesRequestPayload()
|
||||
if err != nil {
|
||||
return rst, err
|
||||
}
|
||||
hash, err := api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics())
|
||||
hash, err = api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics())
|
||||
if err != nil {
|
||||
return rst, err
|
||||
}
|
||||
rst = append(rst, hash[:])
|
||||
rst = append(rst, hash.Bytes())
|
||||
}
|
||||
return rst, nil
|
||||
return rst, err
|
||||
}
|
||||
|
||||
// CompleteRequest client must mark request completed when all envelopes were processed.
|
||||
func (api *PublicAPI) CompleteRequest(ctx context.Context, hex string) error {
|
||||
return api.service.historyUpdates.UpdateFinishedRequest(common.HexToHash(hex))
|
||||
func (api *PublicAPI) CompleteRequest(parent context.Context, hex string) (err error) {
|
||||
tx := api.service.storage.NewTx()
|
||||
ctx := NewContextFromService(parent, api.service, tx)
|
||||
err = api.service.historyUpdates.UpdateFinishedRequest(ctx, common.HexToHash(hex))
|
||||
if err == nil {
|
||||
return tx.Commit()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// DEPRECATED: use SendDirectMessage with DH flag
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package shhext
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/status-im/status-go/db"
|
||||
)
|
||||
|
||||
// ContextKey is a type used for keys in shhext Context.
|
||||
type ContextKey struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
// NewContextKey returns new ContextKey instance.
|
||||
func NewContextKey(name string) ContextKey {
|
||||
return ContextKey{Name: name}
|
||||
}
|
||||
|
||||
var (
|
||||
historyDBKey = NewContextKey("history_db")
|
||||
requestRegistryKey = NewContextKey("request_registry")
|
||||
timeKey = NewContextKey("time")
|
||||
)
|
||||
|
||||
// NewContextFromService creates new context instance using Service fileds directly and Storage.
|
||||
func NewContextFromService(ctx context.Context, service *Service, storage db.Storage) Context {
|
||||
return NewContext(ctx, service.w.GetCurrentTime, service.requestsRegistry, storage)
|
||||
}
|
||||
|
||||
// NewContext creates Context with all required fields.
|
||||
func NewContext(ctx context.Context, source TimeSource, registry *RequestsRegistry, storage db.Storage) Context {
|
||||
ctx = context.WithValue(ctx, historyDBKey, db.NewHistoryStore(storage))
|
||||
ctx = context.WithValue(ctx, timeKey, source)
|
||||
ctx = context.WithValue(ctx, requestRegistryKey, registry)
|
||||
return Context{ctx}
|
||||
}
|
||||
|
||||
// TimeSource is a type used for current time.
|
||||
type TimeSource func() time.Time
|
||||
|
||||
// Context provides access to request-scoped values.
|
||||
type Context struct {
|
||||
context.Context
|
||||
}
|
||||
|
||||
// HistoryStore returns db.HistoryStore instance associated with this request.
|
||||
func (c Context) HistoryStore() db.HistoryStore {
|
||||
return c.Value(historyDBKey).(db.HistoryStore)
|
||||
}
|
||||
|
||||
// Time returns current time using time function associated with this request.
|
||||
func (c Context) Time() time.Time {
|
||||
return c.Value(timeKey).(TimeSource)()
|
||||
}
|
||||
|
||||
// RequestRegistry returns RequestRegistry that tracks each request life-span.
|
||||
func (c Context) RequestRegistry() *RequestsRegistry {
|
||||
return c.Value(requestRegistryKey).(*RequestsRegistry)
|
||||
}
|
|
@ -20,16 +20,9 @@ const (
|
|||
WhisperTimeAllowance = 20 * time.Second
|
||||
)
|
||||
|
||||
// TimeSource is a function that returns current time.
|
||||
type TimeSource func() time.Time
|
||||
|
||||
// NewHistoryUpdateReactor creates HistoryUpdateReactor instance.
|
||||
func NewHistoryUpdateReactor(store db.HistoryStore, registry *RequestsRegistry, timeSource TimeSource) *HistoryUpdateReactor {
|
||||
return &HistoryUpdateReactor{
|
||||
store: store,
|
||||
registry: registry,
|
||||
timeSource: timeSource,
|
||||
}
|
||||
func NewHistoryUpdateReactor() *HistoryUpdateReactor {
|
||||
return &HistoryUpdateReactor{}
|
||||
}
|
||||
|
||||
// HistoryUpdateReactor responsible for tracking progress for all history requests.
|
||||
|
@ -38,18 +31,15 @@ func NewHistoryUpdateReactor(store db.HistoryStore, registry *RequestsRegistry,
|
|||
// - when confirmation for request completion is received - we will set last envelope timestamp as the last timestamp
|
||||
// for all TopicLists in current request.
|
||||
type HistoryUpdateReactor struct {
|
||||
mu sync.Mutex
|
||||
store db.HistoryStore
|
||||
registry *RequestsRegistry
|
||||
timeSource TimeSource
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// UpdateFinishedRequest removes successfully finished request and updates every topic
|
||||
// attached to the request.
|
||||
func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(id common.Hash) error {
|
||||
func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(ctx Context, id common.Hash) error {
|
||||
reactor.mu.Lock()
|
||||
defer reactor.mu.Unlock()
|
||||
req, err := reactor.store.GetRequest(id)
|
||||
req, err := ctx.HistoryStore().GetRequest(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -66,10 +56,10 @@ func (reactor *HistoryUpdateReactor) UpdateFinishedRequest(id common.Hash) error
|
|||
}
|
||||
|
||||
// UpdateTopicHistory updates Current timestamp for the TopicHistory with a given timestamp.
|
||||
func (reactor *HistoryUpdateReactor) UpdateTopicHistory(topic whisper.TopicType, timestamp time.Time) error {
|
||||
func (reactor *HistoryUpdateReactor) UpdateTopicHistory(ctx Context, topic whisper.TopicType, timestamp time.Time) error {
|
||||
reactor.mu.Lock()
|
||||
defer reactor.mu.Unlock()
|
||||
histories, err := reactor.store.GetHistoriesByTopic(topic)
|
||||
histories, err := ctx.HistoryStore().GetHistoriesByTopic(topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -103,7 +93,7 @@ type TopicRequest struct {
|
|||
|
||||
// CreateRequests receives list of topic with desired timestamps and initiates both pending requests and requests
|
||||
// that cover new topics.
|
||||
func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest) ([]db.HistoryRequest, error) {
|
||||
func (reactor *HistoryUpdateReactor) CreateRequests(ctx Context, topicRequests []TopicRequest) ([]db.HistoryRequest, error) {
|
||||
reactor.mu.Lock()
|
||||
defer reactor.mu.Unlock()
|
||||
seen := map[whisper.TopicType]struct{}{}
|
||||
|
@ -115,13 +105,13 @@ func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest
|
|||
}
|
||||
histories := map[whisper.TopicType]db.TopicHistory{}
|
||||
for i := range topicRequests {
|
||||
th, err := reactor.store.GetHistory(topicRequests[i].Topic, topicRequests[i].Duration)
|
||||
th, err := ctx.HistoryStore().GetHistory(topicRequests[i].Topic, topicRequests[i].Duration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
histories[th.Topic] = th
|
||||
}
|
||||
requests, err := reactor.store.GetAllRequests()
|
||||
requests, err := ctx.HistoryStore().GetAllRequests()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -133,17 +123,17 @@ func (reactor *HistoryUpdateReactor) CreateRequests(topicRequests []TopicRequest
|
|||
delete(histories, th.Topic)
|
||||
}
|
||||
}
|
||||
if !reactor.registry.Has(req.ID) {
|
||||
if !ctx.RequestRegistry().Has(req.ID) {
|
||||
filtered = append(filtered, req)
|
||||
}
|
||||
}
|
||||
adjusted, err := adjustRequestedHistories(reactor.store, mapToList(histories))
|
||||
adjusted, err := adjustRequestedHistories(ctx.HistoryStore(), mapToList(histories))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filtered = append(filtered,
|
||||
GroupHistoriesByRequestTimespan(reactor.store, adjusted)...)
|
||||
return RenewRequests(filtered, reactor.timeSource()), nil
|
||||
GroupHistoriesByRequestTimespan(ctx.HistoryStore(), adjusted)...)
|
||||
return RenewRequests(filtered, ctx.Time()), nil
|
||||
}
|
||||
|
||||
// for every history that is not included in any request check if there are other ranges with such topic in db
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package shhext
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -13,10 +14,16 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newTestContext(t *testing.T) Context {
|
||||
mdb, err := db.NewMemoryDB()
|
||||
require.NoError(t, err)
|
||||
return NewContext(context.Background(), time.Now, NewRequestsRegistry(0), db.NewLevelDBStorage(mdb))
|
||||
}
|
||||
|
||||
func createInMemStore(t *testing.T) db.HistoryStore {
|
||||
mdb, err := db.NewMemoryDB()
|
||||
require.NoError(t, err)
|
||||
return db.NewHistoryStore(mdb)
|
||||
return db.NewHistoryStore(db.NewLevelDBStorage(mdb))
|
||||
}
|
||||
|
||||
func TestRenewRequest(t *testing.T) {
|
||||
|
@ -95,11 +102,9 @@ func TestBloomFilterToMessageRequestPayload(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCreateRequestsEmptyState(t *testing.T) {
|
||||
now := time.Now()
|
||||
reactor := NewHistoryUpdateReactor(
|
||||
createInMemStore(t), NewRequestsRegistry(0),
|
||||
func() time.Time { return now })
|
||||
requests, err := reactor.CreateRequests([]TopicRequest{
|
||||
ctx := newTestContext(t)
|
||||
reactor := NewHistoryUpdateReactor()
|
||||
requests, err := reactor.CreateRequests(ctx, []TopicRequest{
|
||||
{Topic: whisper.TopicType{1}, Duration: time.Hour},
|
||||
{Topic: whisper.TopicType{2}, Duration: time.Hour},
|
||||
{Topic: whisper.TopicType{3}, Duration: 10 * time.Hour},
|
||||
|
@ -120,14 +125,15 @@ func TestCreateRequestsEmptyState(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCreateRequestsWithExistingRequest(t *testing.T) {
|
||||
store := createInMemStore(t)
|
||||
ctx := newTestContext(t)
|
||||
store := ctx.HistoryStore()
|
||||
req := store.NewRequest()
|
||||
req.ID = common.Hash{1}
|
||||
th := store.NewHistory(whisper.TopicType{1}, time.Hour)
|
||||
req.AddHistory(th)
|
||||
require.NoError(t, req.Save())
|
||||
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
|
||||
requests, err := reactor.CreateRequests([]TopicRequest{
|
||||
reactor := NewHistoryUpdateReactor()
|
||||
requests, err := reactor.CreateRequests(ctx, []TopicRequest{
|
||||
{Topic: whisper.TopicType{1}, Duration: time.Hour},
|
||||
{Topic: whisper.TopicType{2}, Duration: time.Hour},
|
||||
{Topic: whisper.TopicType{3}, Duration: time.Hour},
|
||||
|
@ -148,12 +154,11 @@ func TestCreateRequestsWithExistingRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCreateMultiRequestsWithSameTopic(t *testing.T) {
|
||||
now := time.Now()
|
||||
reactor := NewHistoryUpdateReactor(
|
||||
createInMemStore(t), NewRequestsRegistry(0),
|
||||
func() time.Time { return now })
|
||||
ctx := newTestContext(t)
|
||||
store := ctx.HistoryStore()
|
||||
reactor := NewHistoryUpdateReactor()
|
||||
topic := whisper.TopicType{1}
|
||||
requests, err := reactor.CreateRequests([]TopicRequest{
|
||||
requests, err := reactor.CreateRequests(ctx, []TopicRequest{
|
||||
{Topic: topic, Duration: time.Hour},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -162,7 +167,7 @@ func TestCreateMultiRequestsWithSameTopic(t *testing.T) {
|
|||
require.NoError(t, requests[0].Save())
|
||||
|
||||
// duration changed. request wasn't finished
|
||||
requests, err = reactor.CreateRequests([]TopicRequest{
|
||||
requests, err = reactor.CreateRequests(ctx, []TopicRequest{
|
||||
{Topic: topic, Duration: 10 * time.Hour},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -180,25 +185,26 @@ func TestCreateMultiRequestsWithSameTopic(t *testing.T) {
|
|||
require.Equal(t, requests[longest].Histories()[0].End, requests[longest^1].Histories()[0].First)
|
||||
|
||||
for _, r := range requests {
|
||||
require.NoError(t, reactor.UpdateFinishedRequest(r.ID))
|
||||
require.NoError(t, reactor.UpdateFinishedRequest(ctx, r.ID))
|
||||
}
|
||||
requests, err = reactor.CreateRequests([]TopicRequest{
|
||||
requests, err = reactor.CreateRequests(ctx, []TopicRequest{
|
||||
{Topic: topic, Duration: 10 * time.Hour},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, requests, 1)
|
||||
|
||||
topics, err := reactor.store.GetHistoriesByTopic(topic)
|
||||
topics, err := store.GetHistoriesByTopic(topic)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, topics, 1)
|
||||
require.Equal(t, 10*time.Hour, topics[0].Duration)
|
||||
}
|
||||
|
||||
func TestRequestFinishedUpdate(t *testing.T) {
|
||||
store := createInMemStore(t)
|
||||
ctx := newTestContext(t)
|
||||
store := ctx.HistoryStore()
|
||||
req := store.NewRequest()
|
||||
req.ID = common.Hash{1}
|
||||
now := time.Now()
|
||||
now := ctx.Time()
|
||||
thOne := store.NewHistory(whisper.TopicType{1}, time.Hour)
|
||||
thOne.End = now
|
||||
thTwo := store.NewHistory(whisper.TopicType{2}, time.Hour)
|
||||
|
@ -207,9 +213,9 @@ func TestRequestFinishedUpdate(t *testing.T) {
|
|||
req.AddHistory(thTwo)
|
||||
require.NoError(t, req.Save())
|
||||
|
||||
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
|
||||
require.NoError(t, reactor.UpdateTopicHistory(thOne.Topic, now.Add(-time.Minute)))
|
||||
require.NoError(t, reactor.UpdateFinishedRequest(req.ID))
|
||||
reactor := NewHistoryUpdateReactor()
|
||||
require.NoError(t, reactor.UpdateTopicHistory(ctx, thOne.Topic, now.Add(-time.Minute)))
|
||||
require.NoError(t, reactor.UpdateFinishedRequest(ctx, req.ID))
|
||||
_, err := store.GetRequest(req.ID)
|
||||
require.EqualError(t, err, "leveldb: not found")
|
||||
|
||||
|
@ -220,8 +226,9 @@ func TestRequestFinishedUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTopicHistoryUpdate(t *testing.T) {
|
||||
ctx := newTestContext(t)
|
||||
store := ctx.HistoryStore()
|
||||
reqID := common.Hash{1}
|
||||
store := createInMemStore(t)
|
||||
request := store.NewRequest()
|
||||
request.ID = reqID
|
||||
now := time.Now()
|
||||
|
@ -230,14 +237,14 @@ func TestTopicHistoryUpdate(t *testing.T) {
|
|||
th.RequestID = request.ID
|
||||
th.End = now
|
||||
require.NoError(t, th.Save())
|
||||
reactor := NewHistoryUpdateReactor(store, NewRequestsRegistry(0), time.Now)
|
||||
reactor := NewHistoryUpdateReactor()
|
||||
timestamp := now.Add(-time.Minute)
|
||||
|
||||
require.NoError(t, reactor.UpdateTopicHistory(th.Topic, timestamp))
|
||||
require.NoError(t, reactor.UpdateTopicHistory(ctx, th.Topic, timestamp))
|
||||
require.NoError(t, th.Load())
|
||||
require.Equal(t, timestamp.Unix(), th.Current.Unix())
|
||||
|
||||
require.NoError(t, reactor.UpdateTopicHistory(th.Topic, now))
|
||||
require.NoError(t, reactor.UpdateTopicHistory(ctx, th.Topic, now))
|
||||
require.NoError(t, th.Load())
|
||||
require.Equal(t, timestamp.Unix(), th.Current.Unix())
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ type EnvelopeEventsHandler interface {
|
|||
|
||||
// Service is a service that provides some additional Whisper API.
|
||||
type Service struct {
|
||||
storage db.TransactionalStorage
|
||||
w *whisper.Whisper
|
||||
config params.ShhextConfig
|
||||
envelopesMonitor *EnvelopesMonitor
|
||||
|
@ -73,7 +74,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con
|
|||
delay = config.RequestsDelay
|
||||
}
|
||||
requestsRegistry := NewRequestsRegistry(delay)
|
||||
historyUpdates := NewHistoryUpdateReactor(db.NewHistoryStore(ldb), requestsRegistry, w.GetCurrentTime)
|
||||
historyUpdates := NewHistoryUpdateReactor()
|
||||
mailMonitor := &MailRequestMonitor{
|
||||
w: w,
|
||||
handler: handler,
|
||||
|
@ -82,6 +83,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con
|
|||
}
|
||||
envelopesMonitor := NewEnvelopesMonitor(w, handler, config.MailServerConfirmations, ps, config.MaxMessageDeliveryAttempts)
|
||||
return &Service{
|
||||
storage: db.NewLevelDBStorage(ldb),
|
||||
w: w,
|
||||
config: config,
|
||||
envelopesMonitor: envelopesMonitor,
|
||||
|
|
|
@ -698,6 +698,7 @@ type RequestWithTrackingHistorySuite struct {
|
|||
localWhisperAPI *whisper.PublicWhisperAPI
|
||||
localAPI *PublicAPI
|
||||
localService *Service
|
||||
localContext Context
|
||||
mailSymKey string
|
||||
|
||||
remoteMailserver *mailserver.WMailServer
|
||||
|
@ -717,6 +718,7 @@ func (s *RequestWithTrackingHistorySuite) SetupTest() {
|
|||
|
||||
s.localWhisperAPI = whisper.NewPublicWhisperAPI(local)
|
||||
s.localService = New(local, nil, db, params.ShhextConfig{})
|
||||
s.localContext = NewContextFromService(context.Background(), s.localService, s.localService.storage)
|
||||
localPkey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
s.Require().NoError(s.localService.Start(&p2p.Server{Config: p2p.Config{PrivateKey: localPkey}}))
|
||||
|
@ -799,7 +801,7 @@ func (s *RequestWithTrackingHistorySuite) createEmptyFilter(topics ...whisper.To
|
|||
}
|
||||
|
||||
func (s *RequestWithTrackingHistorySuite) initiateHistoryRequest(topics ...TopicRequest) []hexutil.Bytes {
|
||||
requests, err := s.localAPI.InitiateHistoryRequests(InitiateHistoryRequestParams{
|
||||
requests, err := s.localAPI.InitiateHistoryRequests(context.Background(), InitiateHistoryRequestParams{
|
||||
Peer: s.remoteNode.String(),
|
||||
SymKeyID: s.mailSymKey,
|
||||
Timeout: 10 * time.Second,
|
||||
|
@ -826,7 +828,7 @@ func (s *RequestWithTrackingHistorySuite) waitMessagesDelivered(filterid string,
|
|||
}
|
||||
|
||||
func (s *RequestWithTrackingHistorySuite) waitNoRequests() {
|
||||
store := s.localService.historyUpdates.store
|
||||
store := s.localContext.HistoryStore()
|
||||
s.Require().NoError(utils.Eventually(func() error {
|
||||
reqs, err := store.GetAllRequests()
|
||||
if err != nil {
|
||||
|
@ -856,9 +858,9 @@ func (s *RequestWithTrackingHistorySuite) TestMultipleMergeIntoOne() {
|
|||
s.Require().Len(requests, 2)
|
||||
s.waitMessagesDelivered(filterid, hexes...)
|
||||
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic1, time.Now()))
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic2, time.Now()))
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(topic3, time.Now()))
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic1, time.Now()))
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic2, time.Now()))
|
||||
s.Require().NoError(s.localService.historyUpdates.UpdateTopicHistory(s.localContext, topic3, time.Now()))
|
||||
for _, r := range requests {
|
||||
s.Require().NoError(s.localAPI.CompleteRequest(context.TODO(), r.String()))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue