From 135eaae9fb614c6de597ed445f5299691053fccd Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Tue, 24 Nov 2020 05:44:37 +0200 Subject: [PATCH] Waku v2 JSON-RPC REST API: Store protocol proof of concept (#263) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Waku V2 history query POC * Fix folder structure * Improve test clarity * Improve imports, returns and some naming * Changed naming conventions. Refactor & improve. Co-authored-by: Oskar Thorén --- tests/all_tests_v2.nim | 3 +- tests/v2/test_jsonrpc_waku.nim | 94 +++++++++++++++++++++++ tests/v2/test_rpc_waku.nim | 4 + waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 1 + waku/v2/node/jsonrpc/jsonrpc_types.nim | 14 ++++ waku/v2/node/jsonrpc/jsonrpc_utils.nim | 23 ++++++ waku/v2/node/jsonrpc/store_api.nim | 33 ++++++++ 7 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 tests/v2/test_jsonrpc_waku.nim create mode 100644 waku/v2/node/jsonrpc/jsonrpc_callsigs.nim create mode 100644 waku/v2/node/jsonrpc/jsonrpc_types.nim create mode 100644 waku/v2/node/jsonrpc/jsonrpc_utils.nim create mode 100644 waku/v2/node/jsonrpc/store_api.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index d306e8838..17cdc2f76 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -9,4 +9,5 @@ import ./v2/test_waku_payload, ./v2/test_rpc_waku, ./v2/test_waku_swap, - ./v2/test_message_store + ./v2/test_message_store, + ./v2/test_jsonrpc_waku diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim new file mode 100644 index 000000000..413eb9d5b --- /dev/null +++ b/tests/v2/test_jsonrpc_waku.nim @@ -0,0 +1,94 @@ +import + std/[unittest, options, sets, tables, os, strutils], + stew/shims/net as stewNet, + json_rpc/[rpcserver, rpcclient], + libp2p/standard_setup, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/protocols/pubsub/rpc/message, + ../../waku/v2/waku_types, + ../../waku/v2/node/wakunode2, + ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api], + ../../waku/v2/protocol/[waku_store, message_notifier], + ../test_helpers + +template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] +const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim" +createRpcSigs(RpcHttpClient, sigPath) + +suite "Waku v2 JSON-RPC API": + + asyncTest "get_waku_v2_store_v1_messages": + const defaultTopic = "/waku/2/default-waku/proto" + const testCodec = "/waku/2/default-waku/codec" + + # WakuNode setup + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port)) + + waitFor node.start() + + waitFor node.mountRelay(@[defaultTopic]) + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + setupWakuJSONRPC(node, server) + server.start() + + # WakuStore setup + let + key = wakunode2.PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + + node.mountStore() + let + subscription = node.wakuStore.subscription() + + var listenSwitch = newStandardSwitch(some(key)) + discard waitFor listenSwitch.start() + + node.wakuStore.setPeer(listenSwitch.peerInfo) + + listenSwitch.mount(node.wakuStore) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions[testCodec] = subscription + + # Now prime it with some history before tests + var + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)), + WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)), + WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))] + + for wakuMsg in msgList: + waitFor subscriptions.notify(defaultTopic, wakuMsg) + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + let response = await client.get_waku_v2_store_v1_messages(@[ContentTopic(1)], some(StorePagingOptions())) + check: + response.messages.len() == 8 + response.pagingOptions.isNone + + server.stop() + server.close() + waitfor node.stop() diff --git a/tests/v2/test_rpc_waku.nim b/tests/v2/test_rpc_waku.nim index 0de1df90b..d75194cac 100644 --- a/tests/v2/test_rpc_waku.nim +++ b/tests/v2/test_rpc_waku.nim @@ -45,3 +45,7 @@ suite "Waku v2 Remote Procedure Calls": await client.connect("127.0.0.1", rpcPort) check await(client.waku_version()) == WakuRelayCodec + + server.stop() + server.close() + waitfor node.stop() diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim new file mode 100644 index 000000000..c6b5d97f2 --- /dev/null +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -0,0 +1 @@ +proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim new file mode 100644 index 000000000..f1363ebda --- /dev/null +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -0,0 +1,14 @@ +import + ../../waku_types, + std/options + +type + StoreResponse* = object + messages*: seq[WakuMessage] + pagingOptions*: Option[StorePagingOptions] + + StorePagingOptions* = object + ## This type holds some options for pagination + pageSize*: uint64 + cursor*: Option[Index] + forward*: bool diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim new file mode 100644 index 000000000..e41288a93 --- /dev/null +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -0,0 +1,23 @@ +import + std/options, + ../../waku_types, + ../wakunode2, + ./jsonrpc_types + +## Conversion tools +## Since the Waku v2 JSON-RPC API has its own defined types, +## we need to convert between these and the types for the Nim API + +proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo = + PagingInfo(pageSize: pagingOptions.pageSize, + cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: Index(), + direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD) + +proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions = + StorePagingOptions(pageSize: pagingInfo.pageSize, + cursor: some(pagingInfo.cursor), + forward: if pagingInfo.direction == PagingDirection.FORWARD: true else: false) + +proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = + StoreResponse(messages: historyResponse.messages, + pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim new file mode 100644 index 000000000..edf3bccd1 --- /dev/null +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -0,0 +1,33 @@ +import + std/options, + json_rpc/rpcserver, + ../../waku_types, + ../wakunode2, + ./jsonrpc_types, ./jsonrpc_utils + +proc setupWakuJSONRPC*(node: WakuNode, rpcsrv: RpcServer) = + const futTimeout = 5.seconds + + ## Store API version 1 definitions + + rpcsrv.rpc("get_waku_v2_store_v1_messages") do(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + ## Returns history for a list of content topics with optional paging + debug "get_waku_v2_store_v1_messages" + + var responseFut = newFuture[StoreResponse]() + + proc queryFuncHandler(response: HistoryResponse) {.gcsafe, closure.} = + debug "get_waku_v2_store_v1_messages response" + responseFut.complete(response.toStoreResponse()) + + let historyQuery = HistoryQuery(topics: topics, + pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) + + await node.query(historyQuery, queryFuncHandler) + + if (await responseFut.withTimeout(futTimeout)): + # Future completed + return responseFut.read() + else: + # Future failed to complete + raise newException(ValueError, "No history response received")