feature/persistence-simple (#268)

* implements

* add

* fix

* eol

* rebase

* added tests

* minor cleanup

* errors

* Update store.md

* fix
This commit is contained in:
Dean Eigenmann 2020-11-16 09:38:52 +01:00 committed by GitHub
parent a1bb2c4954
commit 1568fa6e6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 384 additions and 14 deletions

5
.gitmodules vendored
View File

@ -98,3 +98,8 @@
url = https://github.com/status-im/nim-bearssl.git
ignore = dirty
branch = master
[submodule "vendor/nim-sqlite3-abi"]
path = vendor/nim-sqlite3-abi
url = https://github.com/arnetheduck/nim-sqlite3-abi.git
ignore = dirty
branch = master

View File

@ -20,6 +20,9 @@ Run two nodes and connect them:
./build/wakunode2 --ports-shift:1 --staticnode:/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmF4tuht6fmna6uDqoSMgFqhUrdaVR6VQRyGr6sCpfS2jp --storenode:/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmF4tuht6fmna6uDqoSMgFqhUrdaVR6VQRyGr6sCpfS2jp
```
When passing the flag `dbpath` with a path, messages are persisted and stored in a database called `store` under the specified path.
When none is passed, messages are not persisted and are only stored in-memory.
You should see your nodes connecting.
Do basic RPC calls:

View File

@ -8,4 +8,5 @@ import
./v2/test_waku_pagination,
./v2/test_waku_payload,
./v2/test_rpc_waku,
./v2/test_waku_swap
./v2/test_waku_swap,
./v2/test_message_store

View File

@ -0,0 +1,34 @@
import
std/[unittest, options, tables, sets],
chronos, chronicles,
../../waku/node/v2/[waku_types, message_store],
../test_helpers, ./utils
suite "Message Store":
test "set and get works":
let
store = MessageStore.init("", inMemory = true)[]
topic = ContentTopic(1)
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic),
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic),
]
defer: store.close()
for msg in msgs:
discard store.put(computeIndex(msg), msg)
var responseCount = 0
proc data(timestamp: uint64, msg: WakuMessage) =
responseCount += 1
check msg in msgs
let res = store.getAll(data)
check:
res.isErr == false
responseCount == 3

View File

@ -12,7 +12,7 @@ import
libp2p/transports/transport,
libp2p/transports/tcptransport,
../../waku/protocol/v2/[waku_store, message_notifier],
../../waku/node/v2/waku_types,
../../waku/node/v2/[waku_types, message_store],
../test_helpers, ./utils
@ -58,7 +58,73 @@ procSuite "Waku Store":
check:
(await completionFut.withTimeout(5.seconds)) == true
asyncTest "handle query with store and restarts":
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
topic = ContentTopic(1)
store = MessageStore.init("/foo", inMemory = true)[]
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
let
proto = WakuStore.init(dialSwitch, crypto.newRng(), store)
subscription = proto.subscription()
rpc = HistoryQuery(topics: @[topic])
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto)
await subscriptions.notify("foo", msg)
await subscriptions.notify("foo", msg2)
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 1
response.messages[0] == msg
completionFut.complete(true)
await proto.query(rpc, handler)
check:
(await completionFut.withTimeout(5.seconds)) == true
let
proto2 = WakuStore.init(dialSwitch, crypto.newRng(), store)
key2 = PrivateKey.random(ECDSA, rng[]).get()
var listenSwitch2 = newStandardSwitch(some(key2))
discard await listenSwitch2.start()
proto2.setPeer(listenSwitch2.peerInfo)
listenSwitch2.mount(proto2)
var completionFut2 = newFuture[bool]()
proc handler2(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 1
response.messages[0] == msg
completionFut2.complete(true)
await proto2.query(rpc, handler2)
check:
(await completionFut2.withTimeout(5.seconds)) == true
asyncTest "handle query with forward pagination":
let
key = PrivateKey.random(ECDSA, rng[]).get()

1
vendor/nim-sqlite3-abi vendored Submodule

@ -0,0 +1 @@
Subproject commit 068ff3593c1582bf3d96b75dcf40fa72e3739b29

View File

@ -67,6 +67,11 @@ type
desc: "Enode URL to filter.",
defaultValue: ""
name: "filternode" }: string
dbpath* {.
desc: "The database path for the store protocol.",
defaultValue: ""
name: "dbpath" }: string
topics* {.
desc: "Default topics to subscribe to (space separated list)."

View File

@ -0,0 +1,222 @@
import
os,
sqlite3_abi,
waku_types,
chronos, chronicles, metrics, stew/results,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
stew/results, metrics
{.push raises: [Defect].}
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
#
# Most of it is a direct copy, the only unique functions being `get` and `put`.
type
RawStmtPtr = ptr sqlite3_stmt
AutoDisposed[T: ptr|ref] = object
val: T
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
template dispose(db: Sqlite) =
discard sqlite3_close(db)
template dispose(db: RawStmtPtr) =
discard sqlite3_finalize(db)
proc release[T](x: var AutoDisposed[T]): T =
result = x.val
x.val = nil
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
mixin dispose
if x.val != nil:
dispose(x.release)
template checkErr(op, cleanup: untyped) =
if (let v = (op); v != SQLITE_OK):
cleanup
return err($sqlite3_errstr(v))
template checkErr(op) =
checkErr(op): discard
proc init*(
T: type MessageStore,
basePath: string,
name: string = "store",
readOnly = false,
inMemory = false): MessageStoreResult[T] =
var env: AutoDisposed[ptr sqlite3]
defer: disposeIfUnreleased(env)
let
name =
if inMemory: ":memory:"
else: basepath / name & ".sqlite3"
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
if not inMemory:
try:
createDir(basePath)
except OSError, IOError:
return err("`sqlite: cannot create database directory")
checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil)
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
cleanup
s
template checkExec(s: ptr sqlite3_stmt) =
if (let x = sqlite3_step(s); x != SQLITE_DONE):
discard sqlite3_finalize(s)
return err($sqlite3_errstr(x))
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
return err($sqlite3_errstr(x))
template checkExec(q: string) =
let s = prepare(q): discard
checkExec(s)
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
discard sqlite3_finalize(journalModePragma)
return err($sqlite3_errstr(x))
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
discard sqlite3_finalize(journalModePragma)
return err($sqlite3_errstr(x))
if (let x = sqlite3_column_text(journalModePragma, 0);
x != "memory" and x != "wal"):
discard sqlite3_finalize(journalModePragma)
return err("Invalid pragma result: " & $x)
# TODO: check current version and implement schema versioning
checkExec "PRAGMA user_version = 1;"
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
checkWalPragmaResult(journalModePragma)
checkExec(journalModePragma)
## Table is the SQL query for creating the messages Table.
## It contains:
## - 4-Byte ContentTopic stored as an Integer
## - Payload stored as a blob
checkExec """
CREATE TABLE IF NOT EXISTS messages (
id BLOB PRIMARY KEY,
timestamp INTEGER NOT NULL,
contentTopic INTEGER NOT NULL,
payload BLOB
) WITHOUT ROWID;
"""
ok(MessageStore(
env: env.release
))
template prepare(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
cleanup
s
proc bindParam(s: RawStmtPtr, n: int, val: auto): cint =
when val is openarray[byte]|seq[byte]:
if val.len > 0:
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
else:
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
elif val is int32:
sqlite3_bind_int(s, n.cint, val)
elif val is uint32:
sqlite3_bind_int(s, int(n).cint, int(val).cint)
elif val is int64:
sqlite3_bind_int64(s, n.cint, val)
else:
{.fatal: "Please add support for the 'kek' type".}
proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
## Adds a message to the storage.
##
## **Example:**
##
## .. code-block::
## let res = db.put(message)
## if res.isErr:
## echo "error"
let s = prepare(db.env, "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);"): discard
checkErr bindParam(s, 1, @(cursor.digest.data))
checkErr bindParam(s, 2, int64(cursor.receivedTime))
checkErr bindParam(s, 3, message.contentTopic)
checkErr bindParam(s, 4, message.payload)
let res =
if (let v = sqlite3_step(s); v != SQLITE_DONE):
err($sqlite3_errstr(v))
else:
ok()
# release implict transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
res
proc close*(db: MessageStore) =
discard sqlite3_close(db.env)
db[] = MessageStore()[]
proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] =
## Retreives all messages from the storage.
##
## **Example:**
##
## .. code-block::
## proc data(timestamp: uint64, msg: WakuMessage) =
## echo cast[string](msg.payload)
##
## let res = db.get(data)
## if res.isErr:
## echo "error"
let query = "SELECT timestamp, contentTopic, payload FROM messages"
var s = prepare(db.env, query): discard
try:
var gotResults = false
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
let
timestamp = sqlite3_column_int64(s, 0)
topic = sqlite3_column_int(s, 1)
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
l = sqlite3_column_bytes(s, 2)
onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1))))
gotResults = true
of SQLITE_DONE:
break
else:
return err($sqlite3_errstr(v))
return ok gotResults
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible

View File

@ -11,7 +11,8 @@ import
libp2p/switch,
libp2p/stream/connection,
libp2p/protocols/pubsub/[pubsub, gossipsub],
nimcrypto/sha2
nimcrypto/sha2,
sqlite3_abi
# Constants required for pagination -------------------------------------------
const MaxPageSize* = 100 # Maximum number of waku messages in each page
@ -77,12 +78,19 @@ type
HistoryPeer* = object
peerInfo*: PeerInfo
MessageStoreResult*[T] = Result[T, string]
Sqlite* = ptr sqlite3
MessageStore* = ref object of RootObj
env*: Sqlite
WakuStore* = ref object of LPProtocol
switch*: Switch
rng*: ref BrHmacDrbgContext
peers*: seq[HistoryPeer]
messages*: seq[IndexedWakuMessage]
store*: MessageStore
FilterRequest* = object
contentFilters*: seq[ContentFilter]
@ -212,6 +220,6 @@ proc computeIndex*(msg: WakuMessage): Index =
ctx.update(msg.payload)
let digest = ctx.finish() # computes the hash
ctx.clear()
result.digest = digest
result.receivedTime = epochTime() # gets the unix timestamp

View File

@ -11,7 +11,7 @@ import
libp2p/peerinfo,
libp2p/standard_setup,
../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier],
./waku_types
./waku_types, ./message_store
export waku_types
@ -243,9 +243,9 @@ proc mountFilter*(node: WakuNode) =
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
proc mountStore*(node: WakuNode) =
proc mountStore*(node: WakuNode, store: MessageStore = nil) =
info "mounting store"
node.wakuStore = WakuStore.init(node.switch, node.rng)
node.wakuStore = WakuStore.init(node.switch, node.rng, store)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
@ -379,7 +379,16 @@ when isMainModule:
waitFor node.start()
if conf.store:
mountStore(node)
var store: MessageStore
if conf.dbpath != "":
let res = MessageStore.init(conf.dbpath)
if res.isErr:
warn "failed to init MessageStore", err = res.error
else:
store = res.value
mountStore(node, store)
if conf.filter:
mountFilter(node)

View File

@ -8,7 +8,7 @@ import
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
./message_notifier,
./../../node/v2/waku_types
./../../node/v2/[waku_types, message_store]
logScope:
topics = "wakustore"
@ -124,7 +124,7 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getRepeatedField(1, messages)
for buf in messages:
msg.messages.add( ? WakuMessage.init(buf))
msg.messages.add(? WakuMessage.init(buf))
var pagingInfoBuffer: seq[byte]
discard ? pb.getField(2,pagingInfoBuffer)
@ -292,10 +292,21 @@ method init*(ws: WakuStore) =
ws.handler = handle
ws.codec = WakuStoreCodec
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext): T =
if ws.store.isNil:
return
proc onData(timestamp: uint64, msg: WakuMessage) =
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex()))
let res = ws.store.getAll(onData)
if res.isErr:
warn "failed to load messages from store", err = res.error
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, store: MessageStore = nil): T =
new result
result.rng = rng
result.switch = switch
result.store = store
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
@ -310,7 +321,12 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
proc handle(topic: string, msg: WakuMessage) {.async.} =
let index = msg.computeIndex()
proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
if proto.store.isNil:
return
let res = proto.store.put(index, msg)
if res.isErr:
warn "failed to store messages", err = res.error
MessageNotificationSubscription.init(@[], handle)