From 1568fa6e6ff82981979904f5301dc8ab2a52ea03 Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Mon, 16 Nov 2020 09:38:52 +0100 Subject: [PATCH] feature/persistence-simple (#268) * implements * add * fix * eol * rebase * added tests * minor cleanup * errors * Update store.md * fix --- .gitmodules | 5 + docs/tutorial/store.md | 3 + tests/all_tests_v2.nim | 3 +- tests/v2/test_message_store.nim | 34 +++++ tests/v2/test_waku_store.nim | 70 +++++++++- vendor/nim-sqlite3-abi | 1 + waku/node/v2/config.nim | 5 + waku/node/v2/message_store.nim | 222 ++++++++++++++++++++++++++++++++ waku/node/v2/waku_types.nim | 14 +- waku/node/v2/wakunode2.nim | 17 ++- waku/protocol/v2/waku_store.nim | 24 +++- 11 files changed, 384 insertions(+), 14 deletions(-) create mode 100644 tests/v2/test_message_store.nim create mode 160000 vendor/nim-sqlite3-abi create mode 100644 waku/node/v2/message_store.nim diff --git a/.gitmodules b/.gitmodules index 16ff5ddfb..52acd0f18 100644 --- a/.gitmodules +++ b/.gitmodules @@ -98,3 +98,8 @@ url = https://github.com/status-im/nim-bearssl.git ignore = dirty branch = master +[submodule "vendor/nim-sqlite3-abi"] + path = vendor/nim-sqlite3-abi + url = https://github.com/arnetheduck/nim-sqlite3-abi.git + ignore = dirty + branch = master diff --git a/docs/tutorial/store.md b/docs/tutorial/store.md index 0f17852c0..5951103b2 100644 --- a/docs/tutorial/store.md +++ b/docs/tutorial/store.md @@ -20,6 +20,9 @@ Run two nodes and connect them: ./build/wakunode2 --ports-shift:1 --staticnode:/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmF4tuht6fmna6uDqoSMgFqhUrdaVR6VQRyGr6sCpfS2jp --storenode:/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmF4tuht6fmna6uDqoSMgFqhUrdaVR6VQRyGr6sCpfS2jp ``` +When passing the flag `dbpath` with a path, messages are persisted and stored in a database called `store` under the specified path. +When none is passed, messages are not persisted and are only stored in-memory. + You should see your nodes connecting. Do basic RPC calls: diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 5106b93eb..d306e8838 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -8,4 +8,5 @@ import ./v2/test_waku_pagination, ./v2/test_waku_payload, ./v2/test_rpc_waku, - ./v2/test_waku_swap + ./v2/test_waku_swap, + ./v2/test_message_store diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim new file mode 100644 index 000000000..95dc1bf0e --- /dev/null +++ b/tests/v2/test_message_store.nim @@ -0,0 +1,34 @@ + +import + std/[unittest, options, tables, sets], + chronos, chronicles, + ../../waku/node/v2/[waku_types, message_store], + ../test_helpers, ./utils + +suite "Message Store": + test "set and get works": + let + store = MessageStore.init("", inMemory = true)[] + topic = ContentTopic(1) + + var msgs = @[ + WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic), + WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic), + WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic), + ] + + defer: store.close() + + for msg in msgs: + discard store.put(computeIndex(msg), msg) + + var responseCount = 0 + proc data(timestamp: uint64, msg: WakuMessage) = + responseCount += 1 + check msg in msgs + + let res = store.getAll(data) + + check: + res.isErr == false + responseCount == 3 diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index be535d890..1aa2c0412 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -12,7 +12,7 @@ import libp2p/transports/transport, libp2p/transports/tcptransport, ../../waku/protocol/v2/[waku_store, message_notifier], - ../../waku/node/v2/waku_types, + ../../waku/node/v2/[waku_types, message_store], ../test_helpers, ./utils @@ -58,7 +58,73 @@ procSuite "Waku Store": check: (await completionFut.withTimeout(5.seconds)) == true - + + asyncTest "handle query with store and restarts": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + topic = ContentTopic(1) + store = MessageStore.init("/foo", inMemory = true)[] + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2)) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(dialSwitch, crypto.newRng(), store) + subscription = proto.subscription() + rpc = HistoryQuery(topics: @[topic]) + + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + await subscriptions.notify("foo", msg) + await subscriptions.notify("foo", msg2) + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 1 + response.messages[0] == msg + completionFut.complete(true) + + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + let + proto2 = WakuStore.init(dialSwitch, crypto.newRng(), store) + key2 = PrivateKey.random(ECDSA, rng[]).get() + + var listenSwitch2 = newStandardSwitch(some(key2)) + discard await listenSwitch2.start() + + proto2.setPeer(listenSwitch2.peerInfo) + + listenSwitch2.mount(proto2) + + var completionFut2 = newFuture[bool]() + proc handler2(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 1 + response.messages[0] == msg + completionFut2.complete(true) + + await proto2.query(rpc, handler2) + + check: + (await completionFut2.withTimeout(5.seconds)) == true + asyncTest "handle query with forward pagination": let key = PrivateKey.random(ECDSA, rng[]).get() diff --git a/vendor/nim-sqlite3-abi b/vendor/nim-sqlite3-abi new file mode 160000 index 000000000..068ff3593 --- /dev/null +++ b/vendor/nim-sqlite3-abi @@ -0,0 +1 @@ +Subproject commit 068ff3593c1582bf3d96b75dcf40fa72e3739b29 diff --git a/waku/node/v2/config.nim b/waku/node/v2/config.nim index 38a6e5e6f..06b8558de 100644 --- a/waku/node/v2/config.nim +++ b/waku/node/v2/config.nim @@ -67,6 +67,11 @@ type desc: "Enode URL to filter.", defaultValue: "" name: "filternode" }: string + + dbpath* {. + desc: "The database path for the store protocol.", + defaultValue: "" + name: "dbpath" }: string topics* {. desc: "Default topics to subscribe to (space separated list)." diff --git a/waku/node/v2/message_store.nim b/waku/node/v2/message_store.nim new file mode 100644 index 000000000..6ad646b5f --- /dev/null +++ b/waku/node/v2/message_store.nim @@ -0,0 +1,222 @@ +import + os, + sqlite3_abi, + waku_types, + chronos, chronicles, metrics, stew/results, + libp2p/crypto/crypto, + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection, + stew/results, metrics + +{.push raises: [Defect].} + +# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. +# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim +# +# Most of it is a direct copy, the only unique functions being `get` and `put`. + +type + RawStmtPtr = ptr sqlite3_stmt + + AutoDisposed[T: ptr|ref] = object + val: T + + DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.} + +template dispose(db: Sqlite) = + discard sqlite3_close(db) + +template dispose(db: RawStmtPtr) = + discard sqlite3_finalize(db) + +proc release[T](x: var AutoDisposed[T]): T = + result = x.val + x.val = nil + +proc disposeIfUnreleased[T](x: var AutoDisposed[T]) = + mixin dispose + if x.val != nil: + dispose(x.release) + +template checkErr(op, cleanup: untyped) = + if (let v = (op); v != SQLITE_OK): + cleanup + return err($sqlite3_errstr(v)) + +template checkErr(op) = + checkErr(op): discard + +proc init*( + T: type MessageStore, + basePath: string, + name: string = "store", + readOnly = false, + inMemory = false): MessageStoreResult[T] = + var env: AutoDisposed[ptr sqlite3] + defer: disposeIfUnreleased(env) + + let + name = + if inMemory: ":memory:" + else: basepath / name & ".sqlite3" + flags = + if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE + + if not inMemory: + try: + createDir(basePath) + except OSError, IOError: + return err("`sqlite: cannot create database directory") + + checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) + + template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil): + cleanup + s + + template checkExec(s: ptr sqlite3_stmt) = + if (let x = sqlite3_step(s); x != SQLITE_DONE): + discard sqlite3_finalize(s) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_finalize(s); x != SQLITE_OK): + return err($sqlite3_errstr(x)) + + template checkExec(q: string) = + let s = prepare(q): discard + checkExec(s) + + template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) = + if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW): + discard sqlite3_finalize(journalModePragma) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT): + discard sqlite3_finalize(journalModePragma) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_column_text(journalModePragma, 0); + x != "memory" and x != "wal"): + discard sqlite3_finalize(journalModePragma) + return err("Invalid pragma result: " & $x) + + # TODO: check current version and implement schema versioning + checkExec "PRAGMA user_version = 1;" + + let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard + checkWalPragmaResult(journalModePragma) + checkExec(journalModePragma) + + ## Table is the SQL query for creating the messages Table. + ## It contains: + ## - 4-Byte ContentTopic stored as an Integer + ## - Payload stored as a blob + checkExec """ + CREATE TABLE IF NOT EXISTS messages ( + id BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic INTEGER NOT NULL, + payload BLOB + ) WITHOUT ROWID; + """ + + ok(MessageStore( + env: env.release + )) + +template prepare(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil): + cleanup + s + +proc bindParam(s: RawStmtPtr, n: int, val: auto): cint = + when val is openarray[byte]|seq[byte]: + if val.len > 0: + sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil) + else: + sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil) + elif val is int32: + sqlite3_bind_int(s, n.cint, val) + elif val is uint32: + sqlite3_bind_int(s, int(n).cint, int(val).cint) + elif val is int64: + sqlite3_bind_int64(s, n.cint, val) + else: + {.fatal: "Please add support for the 'kek' type".} + +proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] = + ## Adds a message to the storage. + ## + ## **Example:** + ## + ## .. code-block:: + ## let res = db.put(message) + ## if res.isErr: + ## echo "error" + let s = prepare(db.env, "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);"): discard + checkErr bindParam(s, 1, @(cursor.digest.data)) + checkErr bindParam(s, 2, int64(cursor.receivedTime)) + checkErr bindParam(s, 3, message.contentTopic) + checkErr bindParam(s, 4, message.payload) + + let res = + if (let v = sqlite3_step(s); v != SQLITE_DONE): + err($sqlite3_errstr(v)) + else: + ok() + + # release implict transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + + res + +proc close*(db: MessageStore) = + discard sqlite3_close(db.env) + + db[] = MessageStore()[] + +proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] = + ## Retreives all messages from the storage. + ## + ## **Example:** + ## + ## .. code-block:: + ## proc data(timestamp: uint64, msg: WakuMessage) = + ## echo cast[string](msg.payload) + ## + ## let res = db.get(data) + ## if res.isErr: + ## echo "error" + + let query = "SELECT timestamp, contentTopic, payload FROM messages" + var s = prepare(db.env, query): discard + + try: + var gotResults = false + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + let + timestamp = sqlite3_column_int64(s, 0) + topic = sqlite3_column_int(s, 1) + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) + l = sqlite3_column_bytes(s, 2) + + onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1)))) + gotResults = true + of SQLITE_DONE: + break + else: + return err($sqlite3_errstr(v)) + return ok gotResults + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index d1604d805..d52820306 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -11,7 +11,8 @@ import libp2p/switch, libp2p/stream/connection, libp2p/protocols/pubsub/[pubsub, gossipsub], - nimcrypto/sha2 + nimcrypto/sha2, + sqlite3_abi # Constants required for pagination ------------------------------------------- const MaxPageSize* = 100 # Maximum number of waku messages in each page @@ -77,12 +78,19 @@ type HistoryPeer* = object peerInfo*: PeerInfo + MessageStoreResult*[T] = Result[T, string] + + Sqlite* = ptr sqlite3 + + MessageStore* = ref object of RootObj + env*: Sqlite + WakuStore* = ref object of LPProtocol switch*: Switch rng*: ref BrHmacDrbgContext peers*: seq[HistoryPeer] messages*: seq[IndexedWakuMessage] - + store*: MessageStore FilterRequest* = object contentFilters*: seq[ContentFilter] @@ -212,6 +220,6 @@ proc computeIndex*(msg: WakuMessage): Index = ctx.update(msg.payload) let digest = ctx.finish() # computes the hash ctx.clear() - result.digest = digest result.receivedTime = epochTime() # gets the unix timestamp + diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 579f36800..5dc6542a1 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -11,7 +11,7 @@ import libp2p/peerinfo, libp2p/standard_setup, ../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier], - ./waku_types + ./waku_types, ./message_store export waku_types @@ -243,9 +243,9 @@ proc mountFilter*(node: WakuNode) = node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) -proc mountStore*(node: WakuNode) = +proc mountStore*(node: WakuNode, store: MessageStore = nil) = info "mounting store" - node.wakuStore = WakuStore.init(node.switch, node.rng) + node.wakuStore = WakuStore.init(node.switch, node.rng, store) node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) @@ -379,7 +379,16 @@ when isMainModule: waitFor node.start() if conf.store: - mountStore(node) + var store: MessageStore + + if conf.dbpath != "": + let res = MessageStore.init(conf.dbpath) + if res.isErr: + warn "failed to init MessageStore", err = res.error + else: + store = res.value + + mountStore(node, store) if conf.filter: mountFilter(node) diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index e779a7b07..c7d4ef315 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -8,7 +8,7 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, ./message_notifier, - ./../../node/v2/waku_types + ./../../node/v2/[waku_types, message_store] logScope: topics = "wakustore" @@ -124,7 +124,7 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(1, messages) for buf in messages: - msg.messages.add( ? WakuMessage.init(buf)) + msg.messages.add(? WakuMessage.init(buf)) var pagingInfoBuffer: seq[byte] discard ? pb.getField(2,pagingInfoBuffer) @@ -292,10 +292,21 @@ method init*(ws: WakuStore) = ws.handler = handle ws.codec = WakuStoreCodec -proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext): T = + if ws.store.isNil: + return + + proc onData(timestamp: uint64, msg: WakuMessage) = + ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex())) + + let res = ws.store.getAll(onData) + if res.isErr: + warn "failed to load messages from store", err = res.error + +proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, store: MessageStore = nil): T = new result result.rng = rng result.switch = switch + result.store = store result.init() # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY @@ -310,7 +321,12 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = proc handle(topic: string, msg: WakuMessage) {.async.} = let index = msg.computeIndex() proto.messages.add(IndexedWakuMessage(msg: msg, index: index)) - + if proto.store.isNil: + return + + let res = proto.store.put(index, msg) + if res.isErr: + warn "failed to store messages", err = res.error MessageNotificationSubscription.init(@[], handle)