mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 15:00:57 +00:00
Extract MessageStore interface (#350)
This commit is contained in:
parent
76c0d2b839
commit
e8dd014079
@ -2,16 +2,15 @@
|
|||||||
import
|
import
|
||||||
std/[unittest, options, tables, sets],
|
std/[unittest, options, tables, sets],
|
||||||
chronos, chronicles,
|
chronos, chronicles,
|
||||||
../../waku/v2/node/message_store,
|
../../waku/v2/node/message_store/waku_message_store,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
./utils,
|
./utils
|
||||||
../../waku/v2/node/sqlite
|
|
||||||
|
|
||||||
suite "Message Store":
|
suite "Message Store":
|
||||||
test "set and get works":
|
test "set and get works":
|
||||||
let
|
let
|
||||||
database = SqliteDatabase.init("", inMemory = true)[]
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
store = MessageStore.init(database)[]
|
store = WakuMessageStore.init(database)[]
|
||||||
topic = ContentTopic(1)
|
topic = ContentTopic(1)
|
||||||
|
|
||||||
var msgs = @[
|
var msgs = @[
|
||||||
|
@ -10,7 +10,7 @@ import
|
|||||||
libp2p/protocols/pubsub/rpc/message,
|
libp2p/protocols/pubsub/rpc/message,
|
||||||
../../waku/v2/protocol/[waku_message, message_notifier],
|
../../waku/v2/protocol/[waku_message, message_notifier],
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
../../waku/v2/node/[message_store, sqlite],
|
../../waku/v2/node/message_store/waku_message_store,
|
||||||
../test_helpers, ./utils
|
../test_helpers, ./utils
|
||||||
|
|
||||||
procSuite "Waku Store":
|
procSuite "Waku Store":
|
||||||
@ -62,7 +62,7 @@ procSuite "Waku Store":
|
|||||||
peer = PeerInfo.init(key)
|
peer = PeerInfo.init(key)
|
||||||
topic = ContentTopic(1)
|
topic = ContentTopic(1)
|
||||||
database = SqliteDatabase.init("", inMemory = true)[]
|
database = SqliteDatabase.init("", inMemory = true)[]
|
||||||
store = MessageStore.init(database)[]
|
store = WakuMessageStore.init(database)[]
|
||||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ import
|
|||||||
eth/keys,
|
eth/keys,
|
||||||
../../../v1/node/rpc/hexstrings,
|
../../../v1/node/rpc/hexstrings,
|
||||||
../../protocol/waku_store/waku_store_types,
|
../../protocol/waku_store/waku_store_types,
|
||||||
../wakunode2, ../waku_payload,
|
../waku_payload,
|
||||||
./jsonrpc_types
|
./jsonrpc_types
|
||||||
|
|
||||||
export hexstrings
|
export hexstrings
|
||||||
|
19
waku/v2/node/message_store/message_store.nim
Normal file
19
waku/v2/node/message_store/message_store.nim
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import
|
||||||
|
stew/results,
|
||||||
|
../../protocol/waku_message,
|
||||||
|
../../utils/pagination
|
||||||
|
|
||||||
|
## This module defines a message store interface. Implementations of
|
||||||
|
## MessageStore are used by the `WakuStore` protocol to store and re-
|
||||||
|
## trieve historical messages
|
||||||
|
|
||||||
|
type
|
||||||
|
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
|
||||||
|
|
||||||
|
MessageStoreResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
MessageStore* = ref object of RootObj
|
||||||
|
|
||||||
|
# MessageStore interface
|
||||||
|
method put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] {.base.} = discard
|
||||||
|
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
@ -1,14 +1,17 @@
|
|||||||
import
|
import
|
||||||
sqlite3_abi,
|
sqlite3_abi,
|
||||||
chronos, metrics, stew/results,
|
chronos, metrics,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
stew/results, metrics,
|
stew/results, metrics,
|
||||||
./sqlite,
|
./sqlite,
|
||||||
../protocol/waku_message,
|
./message_store,
|
||||||
../utils/pagination
|
../../protocol/waku_message,
|
||||||
|
../../utils/pagination
|
||||||
|
|
||||||
|
export sqlite
|
||||||
|
|
||||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||||
@ -16,14 +19,10 @@ import
|
|||||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||||
|
|
||||||
type
|
type
|
||||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
|
WakuMessageStore* = ref object of MessageStore
|
||||||
|
|
||||||
MessageStoreResult*[T] = Result[T, string]
|
|
||||||
|
|
||||||
MessageStore* = ref object of RootObj
|
|
||||||
database*: SqliteDatabase
|
database*: SqliteDatabase
|
||||||
|
|
||||||
proc init*(T: type MessageStore, db: SqliteDatabase): MessageStoreResult[T] =
|
proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] =
|
||||||
## Table is the SQL query for creating the messages Table.
|
## Table is the SQL query for creating the messages Table.
|
||||||
## It contains:
|
## It contains:
|
||||||
## - 4-Byte ContentTopic stored as an Integer
|
## - 4-Byte ContentTopic stored as an Integer
|
||||||
@ -44,9 +43,9 @@ proc init*(T: type MessageStore, db: SqliteDatabase): MessageStoreResult[T] =
|
|||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed to exec")
|
return err("failed to exec")
|
||||||
|
|
||||||
ok(MessageStore(database: db))
|
ok(WakuMessageStore(database: db))
|
||||||
|
|
||||||
proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
|
method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
|
||||||
## Adds a message to the storage.
|
## Adds a message to the storage.
|
||||||
##
|
##
|
||||||
## **Example:**
|
## **Example:**
|
||||||
@ -71,7 +70,7 @@ proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreRe
|
|||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] =
|
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
||||||
## Retreives all messages from the storage.
|
## Retreives all messages from the storage.
|
||||||
##
|
##
|
||||||
## **Example:**
|
## **Example:**
|
||||||
@ -100,6 +99,6 @@ proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] =
|
|||||||
|
|
||||||
ok gotMessages
|
ok gotMessages
|
||||||
|
|
||||||
proc close*(db: MessageStore) =
|
proc close*(db: WakuMessageStore) =
|
||||||
## Closes the database.
|
## Closes the database.
|
||||||
db.database.close()
|
db.database.close()
|
@ -14,8 +14,7 @@ import
|
|||||||
../protocol/waku_store/waku_store,
|
../protocol/waku_store/waku_store,
|
||||||
../protocol/waku_swap/waku_swap,
|
../protocol/waku_swap/waku_swap,
|
||||||
../protocol/waku_filter/waku_filter,
|
../protocol/waku_filter/waku_filter,
|
||||||
./message_store,
|
./message_store/message_store,
|
||||||
./sqlite,
|
|
||||||
../utils/requests
|
../utils/requests
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
@ -406,6 +405,7 @@ when isMainModule:
|
|||||||
private_api,
|
private_api,
|
||||||
relay_api,
|
relay_api,
|
||||||
store_api],
|
store_api],
|
||||||
|
./message_store/waku_message_store,
|
||||||
../../common/utils/nat
|
../../common/utils/nat
|
||||||
|
|
||||||
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
|
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
|
||||||
@ -473,16 +473,16 @@ when isMainModule:
|
|||||||
# TODO Set swap peer, for now should be same as store peer
|
# TODO Set swap peer, for now should be same as store peer
|
||||||
|
|
||||||
if conf.store:
|
if conf.store:
|
||||||
var store: MessageStore
|
var store: WakuMessageStore
|
||||||
|
|
||||||
if conf.dbpath != "":
|
if conf.dbpath != "":
|
||||||
let dbRes = SqliteDatabase.init(conf.dbpath)
|
let dbRes = SqliteDatabase.init(conf.dbpath)
|
||||||
if dbRes.isErr:
|
if dbRes.isErr:
|
||||||
warn "failed to init database", err = dbRes.error
|
warn "failed to init database", err = dbRes.error
|
||||||
|
|
||||||
let res = MessageStore.init(dbRes.value)
|
let res = WakuMessageStore.init(dbRes.value)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to init MessageStore", err = res.error
|
warn "failed to init WakuMessageStore", err = res.error
|
||||||
else:
|
else:
|
||||||
store = res.value
|
store = res.value
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ import
|
|||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
libp2p/stream/connection,
|
libp2p/stream/connection,
|
||||||
../message_notifier,
|
../message_notifier,
|
||||||
../../node/message_store,
|
../../node/message_store/message_store,
|
||||||
../waku_swap/waku_swap,
|
../waku_swap/waku_swap,
|
||||||
./waku_store_types,
|
./waku_store_types,
|
||||||
../../utils/requests
|
../../utils/requests
|
||||||
|
@ -6,7 +6,7 @@ import
|
|||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
../waku_swap/waku_swap_types,
|
../waku_swap/waku_swap_types,
|
||||||
../waku_message,
|
../waku_message,
|
||||||
../../node/message_store,
|
../../node/message_store/message_store,
|
||||||
../../utils/pagination
|
../../utils/pagination
|
||||||
|
|
||||||
export waku_message
|
export waku_message
|
||||||
|
Loading…
x
Reference in New Issue
Block a user