diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index b36ce4861..ab6d0de70 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -53,11 +53,11 @@ method publish*(w: WakuNode, topic: Topic, message: WakuMessage) ## ## Status: Implemented. -method query*(w: WakuNode, query: HistoryQuery): HistoryResponse - ## Queries for historical messages. +method query*(w: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = + ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. + ## QueryHandlerFunc is a method that takes a HistoryResponse. ## - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_store` and send RPC. + ## Status: Implemented. ``` ## JSON RPC diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index f2d3a5270..b1880ee92 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -17,13 +17,6 @@ import procSuite "Waku Store": asyncTest "handle query": - let - proto = WakuStore.init() - subscription = proto.subscription() - - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) @@ -34,22 +27,33 @@ procSuite "Waku Store": discard await dialSwitch.start() var listenSwitch = newStandardSwitch(some(key)) - listenSwitch.mount(proto) discard await listenSwitch.start() + let + proto = WakuStore.init(dialSwitch) + subscription = proto.subscription() + rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) + + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + await subscriptions.notify("foo", msg) await subscriptions.notify("foo", msg2) - let conn = await dialSwitch.dial(listenSwitch.peerInfo.peerId, listenSwitch.peerInfo.addrs, WakuStoreCodec) + var completionFut = newFuture[bool]() - var rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) - await conn.writeLP(rpc.encode().buffer) + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.uuid == rpc.uuid + response.messages.len() == 1 + response.messages[0] == msg + completionFut.complete(true) - var message = await conn.readLp(64*1024) - let response = HistoryResponse.init(message) + await proto.query(rpc, handler) check: - response.isErr == false - response.value.uuid == rpc.uuid - response.value.messages.len() == 1 - response.value.messages[0] == msg + (await completionFut.withTimeout(5.seconds)) == true diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index f7d09a073..c1a3d99aa 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -7,7 +7,7 @@ import libp2p/crypto/secp, libp2p/switch, eth/keys, - ../../waku/protocol/v2/waku_relay, + ../../waku/protocol/v2/[waku_relay, waku_store, message_notifier], ../../waku/node/v2/[wakunode2, waku_types], ../test_helpers @@ -108,4 +108,41 @@ procSuite "WakuNode": check: (await completionFut.withTimeout(5.seconds)) == true - await allFutures([node1.stop(), node2.stop()]) + await node1.stop() + await node2.stop() + + asyncTest "Store protocol returns expected message": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + contentTopic = "foobar" + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + uuid = "123" + + var completionFut = newFuture[bool]() + + await node1.start() + await node2.start() + + await node2.subscriptions.notify("waku", message) + + await sleepAsync(2000.millis) + + node1.wakuStore.setPeer(node2.peerInfo) + + proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.uuid == uuid + response.messages[0] == message + completionFut.complete(true) + + await node1.query(HistoryQuery(uuid: uuid, topics: @[contentTopic]), storeHandler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() diff --git a/waku/node/v2/rpc/wakucallsigs.nim b/waku/node/v2/rpc/wakucallsigs.nim index 3ee5c9d75..072f4a3c8 100644 --- a/waku/node/v2/rpc/wakucallsigs.nim +++ b/waku/node/v2/rpc/wakucallsigs.nim @@ -5,6 +5,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool # TODO This should be properly done with rpc types, etc. proc waku_publish2(topic: string, message: seq[byte]): bool proc waku_subscribe(topic: string): bool +proc waku_query(uuid: string, topics: seq[string]): bool #proc waku_subscribe(topic: string, handler: Topichandler): bool # NYI diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 2e656d01e..6c862fed7 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -65,3 +65,13 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = return true #if not result: # raise newException(ValueError, "Message could not be posted") + + rpcsrv.rpc("waku_query") do(uuid: string, topics: seq[string]) -> bool: + debug "waku_query", uuid=uuid + + # XXX: Hacky in-line handler + proc handler(response: HistoryResponse) {.gcsafe.} = + info "Hit response handler", uuid=response.uuid, messages=response.messages + + await node.query(HistoryQuery(uuid: uuid, topics: topics), handler) + return true diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 68546fe47..a140fc5ba 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -8,6 +8,7 @@ import libp2p/[switch, peerinfo, multiaddress, crypto/crypto], libp2p/protobuf/minprotobuf, libp2p/protocols/protocol, + libp2p/switch, libp2p/stream/connection, libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub] @@ -43,6 +44,8 @@ type topics*: seq[string] # @TODO TOPIC handler*: MessageNotificationHandler + QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} + HistoryQuery* = object uuid*: string topics*: seq[string] @@ -51,7 +54,12 @@ type uuid*: string messages*: seq[WakuMessage] + HistoryPeer* = object + peerInfo*: PeerInfo + WakuStore* = ref object of LPProtocol + switch*: Switch + peers*: seq[HistoryPeer] messages*: seq[WakuMessage] FilterRequest* = object diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index fded29b12..661dfb4db 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -102,7 +102,8 @@ proc start*(node: WakuNode) {.async.} = node.libp2pTransportLoops = await node.switch.start() # NOTE WakuRelay is being instantiated as part of initing node - node.wakuStore = WakuStore.init() + + node.wakuStore = WakuStore.init(node.switch) node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) @@ -185,15 +186,12 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = # XXX Consider awaiting here discard wakuRelay.publish(topic, data) -proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = - ## Queries for historical messages. +proc query*(w: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = + ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. + ## QueryHandlerFunc is a method that takes a HistoryResponse. ## - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_store` and send RPC. - - # XXX Unclear how this should be hooked up, Message or WakuMessage? - # result.messages.insert(msg[1]) - discard + ## Status: Implemented. + await w.wakuStore.query(query, handler) when isMainModule: import diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index b756fe36f..bb46c4e7f 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -1,6 +1,7 @@ import std/tables, chronos, chronicles, metrics, stew/results, + libp2p/switch, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, @@ -55,7 +56,7 @@ proc encode*(response: HistoryResponse): ProtoBuffer = for msg in response.messages: result.write(2, msg.encode()) -proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = +proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]()) for msg in w.messages: if msg.contentTopic in query.topics: @@ -70,17 +71,22 @@ method init*(ws: WakuStore) = info "received query" - let res = ws.query(rpc.value) + let res = ws.findMessages(rpc.value) await conn.writeLp(res.encode().buffer) ws.handler = handle ws.codec = WakuStoreCodec -proc init*(T: type WakuStore): T = +proc init*(T: type WakuStore, switch: Switch): T = new result + result.switch = switch result.init() +# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY +proc setPeer*(ws: WakuStore, peer: PeerInfo) = + ws.peers.add(HistoryPeer(peerInfo: peer)) + proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## The filter function returns the pubsub filter for the node. ## This is used to pipe messages into the storage, therefore @@ -90,3 +96,25 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = proto.messages.add(msg) MessageNotificationSubscription.init(@[], handle) + +proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = + # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. + # Ideally depending on the query and our set of peers we take a subset of ideal peers. + # This will require us to check for various factors such as: + # - which topics they track + # - latency? + # - default store peer? + + let peer = w.peers[0] + let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) + + await conn.writeLP(query.encode().buffer) + + var message = await conn.readLp(64*1024) + let response = HistoryResponse.init(message) + + if response.isErr: + error "failed to decode response" + return + + handler(response.value)