refactor(store): remove deprecated waku message store type

This commit is contained in:
Lorenzo Delgado 2022-09-13 13:36:04 +02:00 committed by GitHub
parent f9784b908a
commit 62cfc2abc1
8 changed files with 64 additions and 68 deletions

View File

@ -7,7 +7,7 @@ import
chronicles, chronicles,
sqlite3_abi sqlite3_abi
import import
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
@ -43,12 +43,13 @@ suite "SQLite message store - init store":
test "init store": test "init store":
## Given ## Given
const storeCapacity = 20 const storeCapacity = 20
const retentionTime = days(20).seconds
let database = newTestDatabase() let database = newTestDatabase()
## When ## When
let resStore = WakuMessageStore.init(database, capacity=storeCapacity, retentionTime=retentionTime) let
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
resStore = SqliteStore.init(database, retentionPolicy=some(retentionPolicy))
## Then ## Then
check: check:
@ -78,7 +79,7 @@ suite "SQLite message store - insert messages":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let message = fakeWakuMessage(contentTopic=contentTopic) let message = fakeWakuMessage(contentTopic=contentTopic)
let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic) let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic)
@ -108,7 +109,8 @@ suite "SQLite message store - insert messages":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -146,7 +148,7 @@ suite "Message Store":
## Given ## Given
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database).get() store = SqliteStore.init(database).get()
topic = DefaultContentTopic topic = DefaultContentTopic
pubsubTopic = DefaultPubsubTopic pubsubTopic = DefaultPubsubTopic
@ -229,7 +231,7 @@ suite "Message Store":
## Given ## Given
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database).get() store = SqliteStore.init(database).get()
## When ## When
let resSetVersion = database.setUserVersion(5) let resSetVersion = database.setUserVersion(5)
@ -250,7 +252,7 @@ suite "Message Store":
test "migration": test "migration":
let let
database = SqliteDatabase.init("", inMemory = true)[] database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[] store = SqliteStore.init(database)[]
defer: store.close() defer: store.close()
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
@ -267,11 +269,14 @@ suite "Message Store":
test "number of messages retrieved by getAll is bounded by storeCapacity": test "number of messages retrieved by getAll is bounded by storeCapacity":
let let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto") contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto" pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10 capacity = 10
store = WakuMessageStore.init(database, capacity)[]
let
database = SqliteDatabase.init("", inMemory = true)[]
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
for i in 1..capacity: for i in 1..capacity:
@ -299,13 +304,15 @@ suite "Message Store":
test "DB store capacity": test "DB store capacity":
let let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto") contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto" pubsubTopic = "/waku/2/default-waku/proto"
capacity = 100 capacity = 100
overload = 65 overload = 65
store = WakuMessageStore.init(database, capacity)[]
let
database = SqliteDatabase.init("", inMemory = true)[]
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
defer: store.close() defer: store.close()
for i in 1..capacity+overload: for i in 1..capacity+overload:
@ -319,11 +326,11 @@ suite "Message Store":
var numMessages: int64 var numMessages: int64
proc handler(s: ptr sqlite3_stmt) = proc handler(s: ptr sqlite3_stmt) =
numMessages = sqlite3_column_int64(s, 0) numMessages = sqlite3_column_int64(s, 0)
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in sqlite_store
discard database.query(countQuery, handler) discard database.query(countQuery, handler)
check: check:
# expected number of messages is 120 because # expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store
numMessages == 120 numMessages == 120

View File

@ -6,7 +6,7 @@ import
unittest2, unittest2,
chronos, chronos,
chronicles, chronicles,
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
@ -44,7 +44,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -95,7 +96,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -148,7 +150,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -200,7 +203,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages1 = @[ let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -257,7 +261,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -311,7 +316,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -366,7 +372,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages1 = @[ let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
@ -425,7 +432,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2),
@ -468,7 +476,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
@ -519,7 +528,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
@ -563,7 +573,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
@ -611,7 +622,8 @@ suite "message store - history query":
let let
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
store = SqliteStore.init(database, retentionPolicy=some(retentionPolicy)).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),

View File

@ -11,9 +11,10 @@ import
import import
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/message_store, ../../waku/v2/node/storage/message/message_store,
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/pagination, ../../waku/v2/utils/pagination,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
@ -53,7 +54,7 @@ proc newTestWakuStore(switch: Switch): WakuStore =
peerManager = PeerManager.new(switch) peerManager = PeerManager.new(switch)
rng = crypto.newRng() rng = crypto.newRng()
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
proto = WakuStore.init(peerManager, rng, store) proto = WakuStore.init(peerManager, rng, store)
waitFor proto.start() waitFor proto.start()
@ -499,7 +500,7 @@ procSuite "Waku Store - fault tolerant store":
peerManager = PeerManager.new(dialsWitch) peerManager = PeerManager.new(dialsWitch)
rng = crypto.newRng() rng = crypto.newRng()
database = newTestDatabase() database = newTestDatabase()
store = WakuMessageStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
proto = WakuStore.init(peerManager, rng, store) proto = WakuStore.init(peerManager, rng, store)
let storePeer = peer.get(listenSwitch.peerInfo.toRemotePeerInfo()) let storePeer = peer.get(listenSwitch.peerInfo.toRemotePeerInfo())

View File

@ -16,7 +16,7 @@ import
eth/keys eth/keys
import import
../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,
@ -176,7 +176,7 @@ procSuite "WakuNode - Store":
# setup sqlite database for node1 # setup sqlite database for node1
let let
database = SqliteDatabase.init("", inMemory = true)[] database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[] store = SqliteStore.init(database).tryGet()
await node1.start() await node1.start()
await node1.mountStore(persistMessages = true, store = store) await node1.mountStore(persistMessages = true, store = store)

View File

@ -46,7 +46,7 @@ type SqliteStore* = ref object of MessageStore
retentionPolicy: Option[MessageRetentionPolicy] retentionPolicy: Option[MessageRetentionPolicy]
insertStmt: SqliteStmt[InsertMessageParams, void] insertStmt: SqliteStmt[InsertMessageParams, void]
proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy: Option[MessageRetentionPolicy]): MessageStoreResult[T] = proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(MessageRetentionPolicy)): MessageStoreResult[T] =
# Database initialization # Database initialization
let resInit = init(db) let resInit = init(db)

View File

@ -1,25 +0,0 @@
import
std/options,
stew/results
import
../sqlite,
./message_store,
./sqlite_store
export
sqlite,
sqlite_store
{.deprecated: "import sqlite_store".}
type WakuMessageStore* {.deprecated: "use SqliteStore".} = SqliteStore
proc init*(T: type WakuMessageStore, db: SqliteDatabase,
capacity: int = StoreDefaultCapacity,
isSqliteOnly = false,
retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] {.deprecated: "use SqliteStore.init()".} =
let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime)
else: CapacityRetentionPolicy.init(capacity)
SqliteStore.init(db, retentionPolicy=some(retentionPolicy))

View File

@ -822,7 +822,7 @@ when isMainModule:
./wakunode2_setup_metrics, ./wakunode2_setup_metrics,
./wakunode2_setup_rpc, ./wakunode2_setup_rpc,
./wakunode2_setup_sql_migrations, ./wakunode2_setup_sql_migrations,
./storage/message/waku_message_store, ./storage/message/sqlite_store,
./storage/peer/waku_peer_storage ./storage/peer/waku_peer_storage
logScope: logScope:
@ -834,7 +834,7 @@ when isMainModule:
# 1/7 Setup storage # 1/7 Setup storage
proc setupStorage(conf: WakuNodeConf): proc setupStorage(conf: WakuNodeConf):
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] = SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]] =
## Setup a SQLite Database for a wakunode based on a supplied ## Setup a SQLite Database for a wakunode based on a supplied
## configuration file and perform all necessary migration. ## configuration file and perform all necessary migration.
@ -844,7 +844,7 @@ when isMainModule:
var var
sqliteDatabase: SqliteDatabase sqliteDatabase: SqliteDatabase
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore] storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: SqliteStore]
# Setup DB # Setup DB
if conf.dbPath != "": if conf.dbPath != "":
@ -872,10 +872,11 @@ when isMainModule:
if conf.persistMessages: if conf.persistMessages:
# Historical message persistence enable. Set up Message table in storage # Historical message persistence enable. Set up Message table in storage
let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity, conf.sqliteStore, conf.sqliteRetentionTime) let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
else: CapacityRetentionPolicy.init(conf.storeCapacity)
if res.isErr: let res = SqliteStore.init(sqliteDatabase, retentionPolicy=some(retentionPolicy))
warn "failed to init WakuMessageStore", err = res.error if res.isErr():
warn "failed to init SqliteStore", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"]) waku_node_errors.inc(labelValues = ["init_store_failure"])
else: else:
storeTuple.mStorage = res.value storeTuple.mStorage = res.value
@ -1012,7 +1013,7 @@ when isMainModule:
# 4/7 Mount and initialize configured protocols # 4/7 Mount and initialize configured protocols
proc setupProtocols(node: WakuNode, proc setupProtocols(node: WakuNode,
conf: WakuNodeConf, conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] = mStorage: SqliteStore = nil): SetupResult[bool] =
## Setup configured protocols on an existing Waku v2 node. ## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage. ## Optionally include persistent message storage.
@ -1159,7 +1160,7 @@ when isMainModule:
var var
pStorage: WakuPeerStorage pStorage: WakuPeerStorage
mStorage: WakuMessageStore mStorage: SqliteStore
let setupStorageRes = setupStorage(conf) let setupStorageRes = setupStorage(conf)

View File

@ -70,7 +70,7 @@ type
store*: MessageStore # sqlite DB handle store*: MessageStore # sqlite DB handle
wakuSwap*: WakuSwap wakuSwap*: WakuSwap
persistMessages*: bool persistMessages*: bool
#TODO: WakuMessageStore currenly also holds isSqliteOnly; put it in single place. #TODO: SqliteStore currenly also holds isSqliteOnly; put it in single place.
isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB