Waku v2 JSON-RPC REST API: Store protocol proof of concept (#263)

* Waku V2 history query POC

* Fix folder structure

* Improve test clarity

* Improve imports, returns and some naming

* Changed naming conventions. Refactor & improve.

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Hanno Cornelius 2020-11-24 05:44:37 +02:00 committed by GitHub
parent e875027be4
commit 135eaae9fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 1 deletions

View File

@ -9,4 +9,5 @@ import
./v2/test_waku_payload,
./v2/test_rpc_waku,
./v2/test_waku_swap,
./v2/test_message_store
./v2/test_message_store,
./v2/test_jsonrpc_waku

View File

@ -0,0 +1,94 @@
import
std/[unittest, options, sets, tables, os, strutils],
stew/shims/net as stewNet,
json_rpc/[rpcserver, rpcclient],
libp2p/standard_setup,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/rpc/message,
../../waku/v2/waku_types,
../../waku/v2/node/wakunode2,
../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api],
../../waku/v2/protocol/[waku_store, message_notifier],
../test_helpers
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
createRpcSigs(RpcHttpClient, sigPath)
suite "Waku v2 JSON-RPC API":
asyncTest "get_waku_v2_store_v1_messages":
const defaultTopic = "/waku/2/default-waku/proto"
const testCodec = "/waku/2/default-waku/codec"
# WakuNode setup
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
waitFor node.start()
waitFor node.mountRelay(@[defaultTopic])
# RPC server setup
let
rpcPort = Port(8545)
ta = initTAddress(bindIp, rpcPort)
server = newRpcHttpServer([ta])
setupWakuJSONRPC(node, server)
server.start()
# WakuStore setup
let
key = wakunode2.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
node.mountStore()
let
subscription = node.wakuStore.subscription()
var listenSwitch = newStandardSwitch(some(key))
discard waitFor listenSwitch.start()
node.wakuStore.setPeer(listenSwitch.peerInfo)
listenSwitch.mount(node.wakuStore)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions[testCodec] = subscription
# Now prime it with some history before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic(2)),
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 2], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 6], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic(1)),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic(2))]
for wakuMsg in msgList:
waitFor subscriptions.notify(defaultTopic, wakuMsg)
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort)
let response = await client.get_waku_v2_store_v1_messages(@[ContentTopic(1)], some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isNone
server.stop()
server.close()
waitfor node.stop()

View File

@ -45,3 +45,7 @@ suite "Waku v2 Remote Procedure Calls":
await client.connect("127.0.0.1", rpcPort)
check await(client.waku_version()) == WakuRelayCodec
server.stop()
server.close()
waitfor node.stop()

View File

@ -0,0 +1 @@
proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse

View File

@ -0,0 +1,14 @@
import
../../waku_types,
std/options
type
StoreResponse* = object
messages*: seq[WakuMessage]
pagingOptions*: Option[StorePagingOptions]
StorePagingOptions* = object
## This type holds some options for pagination
pageSize*: uint64
cursor*: Option[Index]
forward*: bool

View File

@ -0,0 +1,23 @@
import
std/options,
../../waku_types,
../wakunode2,
./jsonrpc_types
## Conversion tools
## Since the Waku v2 JSON-RPC API has its own defined types,
## we need to convert between these and the types for the Nim API
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo =
PagingInfo(pageSize: pagingOptions.pageSize,
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: Index(),
direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD)
proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions =
StorePagingOptions(pageSize: pagingInfo.pageSize,
cursor: some(pagingInfo.cursor),
forward: if pagingInfo.direction == PagingDirection.FORWARD: true else: false)
proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse =
StoreResponse(messages: historyResponse.messages,
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions))

View File

@ -0,0 +1,33 @@
import
std/options,
json_rpc/rpcserver,
../../waku_types,
../wakunode2,
./jsonrpc_types, ./jsonrpc_utils
proc setupWakuJSONRPC*(node: WakuNode, rpcsrv: RpcServer) =
const futTimeout = 5.seconds
## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
var responseFut = newFuture[StoreResponse]()
proc queryFuncHandler(response: HistoryResponse) {.gcsafe, closure.} =
debug "get_waku_v2_store_v1_messages response"
responseFut.complete(response.toStoreResponse())
let historyQuery = HistoryQuery(topics: topics,
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
await node.query(historyQuery, queryFuncHandler)
if (await responseFut.withTimeout(futTimeout)):
# Future completed
return responseFut.read()
else:
# Future failed to complete
raise newException(ValueError, "No history response received")