mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 16:25:00 +00:00
feature/store-query-node-api (#174)
* changes * fix * changes * made the query function work * added rpc * whoops * Update wakurpc.nim * minor * fixes * update docs * fix * error handling * end-to-end waku node test * Update test_wakunode.nim * Update waku/node/v2/rpc/wakurpc.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update test_wakunode.nim * Update wakunode2.nim * fix * fix * shorter * rm echo * fix * added history peer Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
parent
e1414ac922
commit
1f68e63185
@ -53,11 +53,11 @@ method publish*(w: WakuNode, topic: Topic, message: WakuMessage)
|
||||
##
|
||||
## Status: Implemented.
|
||||
|
||||
method query*(w: WakuNode, query: HistoryQuery): HistoryResponse
|
||||
## Queries for historical messages.
|
||||
method query*(w: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
|
||||
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
|
||||
## QueryHandlerFunc is a method that takes a HistoryResponse.
|
||||
##
|
||||
## Status: Not yet implemented.
|
||||
## TODO Implement as wrapper around `waku_store` and send RPC.
|
||||
## Status: Implemented.
|
||||
```
|
||||
|
||||
## JSON RPC
|
||||
|
@ -17,13 +17,6 @@ import
|
||||
|
||||
procSuite "Waku Store":
|
||||
asyncTest "handle query":
|
||||
let
|
||||
proto = WakuStore.init()
|
||||
subscription = proto.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
@ -34,22 +27,33 @@ procSuite "Waku Store":
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
listenSwitch.mount(proto)
|
||||
discard await listenSwitch.start()
|
||||
|
||||
let
|
||||
proto = WakuStore.init(dialSwitch)
|
||||
subscription = proto.subscription()
|
||||
rpc = HistoryQuery(uuid: "1234", topics: @["topic"])
|
||||
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
|
||||
listenSwitch.mount(proto)
|
||||
|
||||
await subscriptions.notify("foo", msg)
|
||||
await subscriptions.notify("foo", msg2)
|
||||
|
||||
let conn = await dialSwitch.dial(listenSwitch.peerInfo.peerId, listenSwitch.peerInfo.addrs, WakuStoreCodec)
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
var rpc = HistoryQuery(uuid: "1234", topics: @["topic"])
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.uuid == rpc.uuid
|
||||
response.messages.len() == 1
|
||||
response.messages[0] == msg
|
||||
completionFut.complete(true)
|
||||
|
||||
var message = await conn.readLp(64*1024)
|
||||
let response = HistoryResponse.init(message)
|
||||
await proto.query(rpc, handler)
|
||||
|
||||
check:
|
||||
response.isErr == false
|
||||
response.value.uuid == rpc.uuid
|
||||
response.value.messages.len() == 1
|
||||
response.value.messages[0] == msg
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
@ -7,7 +7,7 @@ import
|
||||
libp2p/crypto/secp,
|
||||
libp2p/switch,
|
||||
eth/keys,
|
||||
../../waku/protocol/v2/waku_relay,
|
||||
../../waku/protocol/v2/[waku_relay, waku_store, message_notifier],
|
||||
../../waku/node/v2/[wakunode2, waku_types],
|
||||
../test_helpers
|
||||
|
||||
@ -108,4 +108,41 @@ procSuite "WakuNode":
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await allFutures([node1.stop(), node2.stop()])
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Store protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = "foobar"
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
uuid = "123"
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
await node2.start()
|
||||
|
||||
await node2.subscriptions.notify("waku", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.peerInfo)
|
||||
|
||||
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||
check:
|
||||
response.uuid == uuid
|
||||
response.messages[0] == message
|
||||
completionFut.complete(true)
|
||||
|
||||
await node1.query(HistoryQuery(uuid: uuid, topics: @[contentTopic]), storeHandler)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
@ -5,6 +5,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool
|
||||
# TODO This should be properly done with rpc types, etc.
|
||||
proc waku_publish2(topic: string, message: seq[byte]): bool
|
||||
proc waku_subscribe(topic: string): bool
|
||||
proc waku_query(uuid: string, topics: seq[string]): bool
|
||||
#proc waku_subscribe(topic: string, handler: Topichandler): bool
|
||||
|
||||
# NYI
|
||||
|
@ -65,3 +65,13 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
return true
|
||||
#if not result:
|
||||
# raise newException(ValueError, "Message could not be posted")
|
||||
|
||||
rpcsrv.rpc("waku_query") do(uuid: string, topics: seq[string]) -> bool:
|
||||
debug "waku_query", uuid=uuid
|
||||
|
||||
# XXX: Hacky in-line handler
|
||||
proc handler(response: HistoryResponse) {.gcsafe.} =
|
||||
info "Hit response handler", uuid=response.uuid, messages=response.messages
|
||||
|
||||
await node.query(HistoryQuery(uuid: uuid, topics: topics), handler)
|
||||
return true
|
||||
|
@ -8,6 +8,7 @@ import
|
||||
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/switch,
|
||||
libp2p/stream/connection,
|
||||
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub]
|
||||
|
||||
@ -43,6 +44,8 @@ type
|
||||
topics*: seq[string] # @TODO TOPIC
|
||||
handler*: MessageNotificationHandler
|
||||
|
||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||
|
||||
HistoryQuery* = object
|
||||
uuid*: string
|
||||
topics*: seq[string]
|
||||
@ -51,7 +54,12 @@ type
|
||||
uuid*: string
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
HistoryPeer* = object
|
||||
peerInfo*: PeerInfo
|
||||
|
||||
WakuStore* = ref object of LPProtocol
|
||||
switch*: Switch
|
||||
peers*: seq[HistoryPeer]
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
FilterRequest* = object
|
||||
|
@ -102,7 +102,8 @@ proc start*(node: WakuNode) {.async.} =
|
||||
node.libp2pTransportLoops = await node.switch.start()
|
||||
|
||||
# NOTE WakuRelay is being instantiated as part of initing node
|
||||
node.wakuStore = WakuStore.init()
|
||||
|
||||
node.wakuStore = WakuStore.init(node.switch)
|
||||
node.switch.mount(node.wakuStore)
|
||||
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
|
||||
|
||||
@ -185,15 +186,12 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
|
||||
# XXX Consider awaiting here
|
||||
discard wakuRelay.publish(topic, data)
|
||||
|
||||
proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
|
||||
## Queries for historical messages.
|
||||
proc query*(w: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
|
||||
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
|
||||
## QueryHandlerFunc is a method that takes a HistoryResponse.
|
||||
##
|
||||
## Status: Not yet implemented.
|
||||
## TODO Implement as wrapper around `waku_store` and send RPC.
|
||||
|
||||
# XXX Unclear how this should be hooked up, Message or WakuMessage?
|
||||
# result.messages.insert(msg[1])
|
||||
discard
|
||||
## Status: Implemented.
|
||||
await w.wakuStore.query(query, handler)
|
||||
|
||||
when isMainModule:
|
||||
import
|
||||
|
@ -1,6 +1,7 @@
|
||||
import
|
||||
std/tables,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/switch,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
@ -55,7 +56,7 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||
for msg in response.messages:
|
||||
result.write(2, msg.encode())
|
||||
|
||||
proc query(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]())
|
||||
for msg in w.messages:
|
||||
if msg.contentTopic in query.topics:
|
||||
@ -70,17 +71,22 @@ method init*(ws: WakuStore) =
|
||||
|
||||
info "received query"
|
||||
|
||||
let res = ws.query(rpc.value)
|
||||
let res = ws.findMessages(rpc.value)
|
||||
|
||||
await conn.writeLp(res.encode().buffer)
|
||||
|
||||
ws.handler = handle
|
||||
ws.codec = WakuStoreCodec
|
||||
|
||||
proc init*(T: type WakuStore): T =
|
||||
proc init*(T: type WakuStore, switch: Switch): T =
|
||||
new result
|
||||
result.switch = switch
|
||||
result.init()
|
||||
|
||||
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
|
||||
ws.peers.add(HistoryPeer(peerInfo: peer))
|
||||
|
||||
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
||||
## The filter function returns the pubsub filter for the node.
|
||||
## This is used to pipe messages into the storage, therefore
|
||||
@ -90,3 +96,25 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
||||
proto.messages.add(msg)
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
|
||||
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
|
||||
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||
# This will require us to check for various factors such as:
|
||||
# - which topics they track
|
||||
# - latency?
|
||||
# - default store peer?
|
||||
|
||||
let peer = w.peers[0]
|
||||
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
|
||||
|
||||
await conn.writeLP(query.encode().buffer)
|
||||
|
||||
var message = await conn.readLp(64*1024)
|
||||
let response = HistoryResponse.init(message)
|
||||
|
||||
if response.isErr:
|
||||
error "failed to decode response"
|
||||
return
|
||||
|
||||
handler(response.value)
|
||||
|
Loading…
x
Reference in New Issue
Block a user