[WIP] Address feedback

This commit is contained in:
Andrea Maria Piana 2023-04-05 14:01:41 +01:00
parent a83876e163
commit 2d287ec290
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
6 changed files with 125 additions and 134 deletions

View File

@ -47,7 +47,8 @@ import
../../waku/v2/protocol/waku_relay/validators,
../../waku/v2/utils/peers,
./wakunode2_setup_rest,
./wakunode2_setup_rpc
./wakunode2_setup_rpc,
./wakunode2_archive_driver
when defined(rln):
import
@ -392,9 +393,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.store:
# Archive setup
let messageValidator: MessageValidator = DefaultMessageValidator()
let mountArchiveRes = mountArchive(node, conf, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
if mountArchiveRes.isErr():
return err("failed to mount archive protocol")
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
# Store setup
try:

View File

@ -0,0 +1,94 @@
import
std/strutils,
chronicles,
stew/results,
../../waku/v2/protocol/waku_archive,
../../waku/v2/config,
../../waku/v2/protocol/waku_archive/driver/sqlite_driver,
../../waku/v2/protocol/waku_archive/driver/postgres_driver,
../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
../../waku/common/sqlite
type SetupResult[T] = Result[T, string]
proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):
debug "starting sqlite database vacuuming"
let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()
ok((pageSize, pageCount, freelistCount))
proc setupSqliteDriver(conf: WakuNodeConf, path: string): SetupResult[ArchiveDriver] =
let res = SqliteDatabase.new(path)
if res.isErr():
return err("could not create sqlite database")
let database = res.get()
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(database)
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
if conf.storeMessageDbVacuum and (pageCount > 0 and freelistCount > 0):
?performSqliteVacuum(database)
# Database migration
if conf.storeMessageDbMigration:
?archive_driver_sqlite_migrations.migrate(database)
debug "setting up sqlite waku archive driver"
let sqliteDriverRes = SqliteDriver.new(database)
if sqliteDriverRes.isErr():
return err("failed to init sqlite archive driver: " & res.error)
ok(sqliteDriverRes.value)
proc setupPostgresDriver(conf: WakuNodeConf): SetupResult[ArchiveDriver] =
let res = PostgresDriver.new(conf)
if res.isErr():
return err("could not create postgres driver")
ok(res.value)
proc setupWakuArchiveDriver*(conf: WakuNodeConf): SetupResult[ArchiveDriver] =
if conf.storeMessageDbUrl == "" or conf.storeMessageDbUrl == "none":
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
return ok(driver)
let dbUrlParts = conf.storeMessageDbUrl.split("://", 1)
let
engine = dbUrlParts[0]
path = dbUrlParts[1]
let connRes = case engine
of "sqlite":
setupSqliteDriver(conf, path)
of "postgres":
setupPostgresDriver(conf)
else:
return err("unknown database engine")
if connRes.isErr():
return err("failed to init connection" & connRes.error)
ok(connRes.get())

View File

@ -15,7 +15,6 @@ import
import
../../../waku/common/sqlite,
../../../waku/v2/node/peer_manager,
../../../waku/v2/config,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/sqlite_driver,
@ -27,10 +26,9 @@ import
../testlib/waku2
proc defaultConf : WakuNodeConf =
return WakuNodeConf(
storeMessageDbUrl: "sqlite://:memory:",
listenAddress: ValidIpAddress.init("127.0.0.1"), rpcAddress: ValidIpAddress.init("127.0.0.1"), restAddress: ValidIpAddress.init("127.0.0.1"), metricsServerAddress: ValidIpAddress.init("127.0.0.1"))
proc newTestArchiveDriver(): ArchiveDriver =
let database = SqliteDatabase.new(":memory:").tryGet()
SqliteDriver.new(database).tryGet()
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryCursor =
HistoryCursor(
@ -56,10 +54,14 @@ procSuite "WakuNode - Store":
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
]
proc insertFixtures(node: WakuNode) : void =
for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg)
require node.wakuArchive.driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk()
let archiveA = block:
let driver = newTestArchiveDriver()
for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg)
require driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk()
driver
asyncTest "Store protocol returns expected messages":
## Setup
@ -71,10 +73,7 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start())
let mountArchiveRes = server.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
insertFixtures(server)
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
client.mountStoreClient()
@ -106,9 +105,7 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start())
let mountArchiveRes = server.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
insertFixtures(server)
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
client.mountStoreClient()
@ -157,10 +154,7 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start())
let mountArchiveRes = server.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
insertFixtures(server)
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
client.mountStoreClient()
@ -213,12 +207,8 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start(), filterSource.start())
await filterSource.mountFilter()
let mountArchiveRes = server.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
insertFixtures(server)
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
await server.mountFilterClient()
client.mountStoreClient()

View File

@ -10,23 +10,19 @@ import
libp2p/crypto/crypto,
json_rpc/[rpcserver, rpcclient]
import
../../../waku/v2/config,
../../../waku/v2/node/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/admin/handlers as admin_api,
../../../waku/v2/node/jsonrpc/admin/client as admin_api_client,
../../../waku/v2/protocol/waku_relay,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../../waku/v2/protocol/waku_store,
../../../waku/v2/protocol/waku_filter,
../../../waku/v2/utils/peers,
../testlib/waku2
proc defaultConf : WakuNodeConf =
return WakuNodeConf(
listenAddress: ValidIpAddress.init("127.0.0.1"), rpcAddress: ValidIpAddress.init("127.0.0.1"), restAddress: ValidIpAddress.init("127.0.0.1"), metricsServerAddress: ValidIpAddress.init("127.0.0.1"))
procSuite "Waku v2 JSON-RPC API - Admin":
let
bindIp = ValidIpAddress.init("0.0.0.0")
@ -158,11 +154,8 @@ procSuite "Waku v2 JSON-RPC API - Admin":
await node.mountFilter()
await node.mountFilterClient()
await node.mountSwap()
let mountArchiveRes = node.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()

View File

@ -1,4 +1,5 @@
{.used.}
import
std/[options, times],
stew/shims/net as stewNet,
@ -12,7 +13,6 @@ import
../../../waku/v2/node/waku_node,
../../../waku/v2/node/jsonrpc/store/handlers as store_api,
../../../waku/v2/node/jsonrpc/store/client as store_api_client,
../../../waku/v2/config,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
@ -23,10 +23,6 @@ import
../../v2/testlib/common,
../../v2/testlib/waku2
proc defaultConf : WakuNodeConf =
return WakuNodeConf(
storeMessageDbUrl: "sqlite://:memory:",
listenAddress: ValidIpAddress.init("127.0.0.1"), rpcAddress: ValidIpAddress.init("127.0.0.1"), restAddress: ValidIpAddress.init("127.0.0.1"), metricsServerAddress: ValidIpAddress.init("127.0.0.1"))
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
@ -63,9 +59,8 @@ procSuite "Waku v2 JSON-RPC API - Store":
key = generateEcdsaKey()
peer = PeerInfo.new(key)
let mountArchiveRes = node.mountArchive(defaultConf(), none(MessageValidator), none(RetentionPolicy))
require mountArchiveRes.isOk()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await node.mountStore()
node.mountStoreClient()
@ -92,7 +87,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
]
for msg in msgList:
require node.wakuArchive.driver.put(DefaultPubsubTopic, msg).isOk()
require driver.put(DefaultPubsubTopic, msg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

View File

@ -657,100 +657,20 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
## Waku archive
type SetupResult[T] = Result[T, string]
proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):
debug "starting sqlite database vacuuming"
let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()
ok((pageSize, pageCount, freelistCount))
proc setupSqliteDriver(conf: WakuNodeConf, path: string): SetupResult[ArchiveDriver] =
let res = SqliteDatabase.new(path)
if res.isErr():
return err("could not create sqlite database")
let database = res.get()
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(database)
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
if conf.storeMessageDbVacuum and (pageCount > 0 and freelistCount > 0):
?performSqliteVacuum(database)
# Database migration
if conf.storeMessageDbMigration:
?archive_driver_sqlite_migrations.migrate(database)
debug "setting up sqlite waku archive driver"
let sqliteDriverRes = SqliteDriver.new(database)
if sqliteDriverRes.isErr():
return err("failed to init sqlite archive driver: " & res.error)
ok(sqliteDriverRes.value)
proc setupPostgresDriver(conf: WakuNodeConf): SetupResult[ArchiveDriver] =
let res = PostgresDriver.new(conf)
if res.isErr():
return err("could not create postgres driver")
ok(res.value)
proc setupWakuArchiveDriver*(conf: WakuNodeConf): SetupResult[ArchiveDriver] =
if conf.storeMessageDbUrl == "" or conf.storeMessageDbUrl == "none":
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
return ok(driver)
let dbUrlParts = conf.storeMessageDbUrl.split("://", 1)
let
engine = dbUrlParts[0]
path = dbUrlParts[1]
let connRes = case engine
of "sqlite":
setupSqliteDriver(conf, path)
of "postgres":
setupPostgresDriver(conf)
else:
return err("unknown database engine")
if connRes.isErr():
return err("failed to init connection" & connRes.error)
ok(connRes.get())
type MountArchiveResult[T] = Result[T, string]
proc mountArchive*(node: WakuNode,
conf: WakuNodeConf,
driver: Option[ArchiveDriver],
messageValidator: Option[MessageValidator],
retentionPolicy: Option[RetentionPolicy]): MountArchiveResult[void] =
retentionPolicy: Option[RetentionPolicy]) =
if driver.isNone():
error "failed to mount waku archive protocol", error="archive driver not set"
return
let driver = setupWakuArchiveDriver(conf)
if driver.isErr():
return err("failed to setup archive driver")
node.wakuArchive = WakuArchive.new(driver.get(), messageValidator, retentionPolicy)
ok()
# TODO: Review this periodic task. Maybe, move it to the appplication code
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes