feature/historic-api (#95)

* started working on historic api

* very rough code

* moved

* also publishing

* todos

* added tests

* added

* fix
This commit is contained in:
Dean Eigenmann 2020-07-29 15:24:01 +02:00 committed by GitHub
parent c018be8977
commit c3f57c79f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 103 additions and 59 deletions

View File

@ -1,3 +1,4 @@
import import
# Waku v2 tests # Waku v2 tests
./v2/test_waku ./v2/test_waku,
./v2/test_wakunode

View File

@ -1,5 +1,5 @@
import import
unittest, chronos, strutils, unittest, chronos,
eth/[keys, p2p] eth/[keys, p2p]
var nextPort = 30303 var nextPort = 30303
@ -30,5 +30,3 @@ template procSuite*(name, body: untyped) =
body body
suitePayload() suitePayload()
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]

View File

@ -1,11 +1,11 @@
import import
unittest, options, os, stew/byteutils, unittest, options, os, stew/byteutils, strutils,
json_rpc/[rpcserver, rpcclient], json_rpc/[rpcserver, rpcclient],
eth/common as eth_common, eth/[rlp, keys, p2p], eth/common as eth_common, eth/[rlp, keys, p2p],
../../waku/protocol/v1/waku_protocol, ../../waku/protocol/v1/waku_protocol,
../../waku/node/v1/rpc/[hexstrings, rpc_types, waku, key_storage], ../../waku/node/v1/rpc/[hexstrings, rpc_types, waku, key_storage]
./test_helpers
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
## Generate client convenience marshalling wrappers from forward declarations ## Generate client convenience marshalling wrappers from forward declarations
## For testing, ethcallsigs needs to be kept in sync with ../waku/node/v1/rpc/waku ## For testing, ethcallsigs needs to be kept in sync with ../waku/node/v1/rpc/waku
const sigPath = sourceDir / ParDir / ParDir / "waku" / "node" / "v1" / "rpc" / "wakucallsigs.nim" const sigPath = sourceDir / ParDir / ParDir / "waku" / "node" / "v1" / "rpc" / "wakucallsigs.nim"

View File

@ -12,7 +12,7 @@ import
eth/p2p/rlpx_protocols/whisper_protocol as whisper, eth/p2p/rlpx_protocols/whisper_protocol as whisper,
../../waku/protocol/v1/waku_protocol as waku, ../../waku/protocol/v1/waku_protocol as waku,
../../waku/protocol/v1/waku_bridge, ../../waku/protocol/v1/waku_bridge,
./test_helpers ../test_helpers
let safeTTL = 5'u32 let safeTTL = 5'u32
let waitInterval = waku.messageInterval + 150.milliseconds let waitInterval = waku.messageInterval + 150.milliseconds

View File

@ -10,7 +10,7 @@
import import
sequtils, tables, unittest, chronos, eth/[keys, p2p], eth/p2p/peer_pool, sequtils, tables, unittest, chronos, eth/[keys, p2p], eth/p2p/peer_pool,
../../waku/protocol/v1/waku_protocol, ../../waku/protocol/v1/waku_protocol,
./test_helpers ../test_helpers
const const
safeTTL = 5'u32 safeTTL = 5'u32

View File

@ -2,7 +2,7 @@ import
unittest, chronos, tables, sequtils, times, unittest, chronos, tables, sequtils, times,
eth/[p2p, async_utils], eth/p2p/peer_pool, eth/[p2p, async_utils], eth/p2p/peer_pool,
../../waku/protocol/v1/[waku_protocol, waku_mail], ../../waku/protocol/v1/[waku_protocol, waku_mail],
./test_helpers ../test_helpers
const const
transmissionTimeout = chronos.milliseconds(100) transmissionTimeout = chronos.milliseconds(100)

View File

@ -18,6 +18,8 @@ import utils,
libp2p/protocols/pubsub/floodsub libp2p/protocols/pubsub/floodsub
import ../../waku/protocol/v2/waku_protocol2 import ../../waku/protocol/v2/waku_protocol2
import ../test_helpers
const const
StreamTransportTrackerName = "stream.transport" StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server" StreamServerTrackerName = "stream.server"
@ -49,7 +51,7 @@ proc decodeMessage(data: seq[byte]): string =
result = "" result = ""
let res = pb.getField(1, result) let res = pb.getField(1, result)
suite "FloodSub": procSuite "FloodSub":
teardown: teardown:
let let
trackers = [ trackers = [
@ -64,41 +66,41 @@ suite "FloodSub":
if not isNil(tracker): if not isNil(tracker):
check tracker.isLeaked() == false check tracker.isLeaked() == false
test "FloodSub basic publish/subscribe A -> B": asyncTest "FloodSub basic publish/subscribe A -> B":
proc runTests(): Future[bool] {.async.} = var completionFut = newFuture[bool]()
var completionFut = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = debug "Hit handler", topic
debug "Hit handler", topic let msg = decodeMessage(data)
let msg = decodeMessage(data) check topic == "foobar"
check topic == "foobar" check msg == "hello"
check msg == "hello" completionFut.complete(true)
completionFut.complete(true)
let let
nodes = generateNodes(2) nodes = generateNodes(2)
nodesFut = await allFinished( nodesFut = await allFinished(
nodes[0].start(), nodes[0].start(),
nodes[1].start() nodes[1].start()
)
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
# TODO: you might want to check the value here
let msg = message()
discard await nodes[0].publish("foobar", msg)
result = await completionFut.wait(5.seconds)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
) )
for fut in nodesFut: await subscribeNodes(nodes)
let res = fut.read()
await allFuturesThrowing(res) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
# TODO: you might want to check the value here
let msg = message()
discard await nodes[0].publish("foobar", msg)
let result = await completionFut.wait(5.seconds)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
for fut in nodesFut:
let res = fut.read()
await allFuturesThrowing(res)
check: check:
waitFor(runTests()) == true result == true

View File

@ -0,0 +1,28 @@
import unittest
import confutils, chronicles, chronos, os
import stew/shims/net as stewNet
import libp2p/crypto/crypto
import libp2p/crypto/secp
import eth/keys
import json_rpc/[rpcclient, rpcserver]
import ../../waku/node/v2/[config, wakunode2]
import ../test_helpers
procSuite "WakuNode":
asyncTest "Message published with content filter is retrievable":
let conf = WakuNodeConf.load()
let node = await WakuNode.init(conf)
let topic = "foobar"
let message = cast[seq[byte]]("hello world")
node.publish(topic, ContentFilter(contentTopic: topic), message)
let response = node.query(HistoryQuery(topics: @[topic]))
check:
response.messages.len == 1
response.messages[0] == message

View File

@ -1,5 +1,5 @@
import import
confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, sequtils,
chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird. chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird.
eth/[keys, p2p], eth/net/nat, eth/[keys, p2p], eth/net/nat,
eth/p2p/[discovery, enode], eth/p2p/[discovery, enode],
@ -22,12 +22,24 @@ type
PublicKey* = crypto.PublicKey PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey PrivateKey* = crypto.PrivateKey
Topic* = string
Message* = seq[byte]
ContentFilter* = object
contentTopic*: string
HistoryQuery* = object
topics*: seq[string]
HistoryResponse* = object
messages*: seq[Message]
# NOTE: based on Eth2Node in NBC eth2_network.nim # NOTE: based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj WakuNode* = ref object of RootObj
switch*: Switch switch*: Switch
# XXX: Unclear if we need this # XXX: Unclear if we need this
peerInfo*: PeerInfo peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
messages: seq[(Topic, Message)]
const clientId = "Nimbus waku node" const clientId = "Nimbus waku node"
@ -230,11 +242,6 @@ method init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} =
await node.start(conf) await node.start(conf)
return node return node
type Topic* = string
type Message* = seq[byte]
type ContentFilter* = object
contentTopic*: string
# TODO Update TopicHandler to take Message, not seq[byte] data # TODO Update TopicHandler to take Message, not seq[byte] data
#type TopicHandler* = proc(topic: Topic, message: Message) #type TopicHandler* = proc(topic: Topic, message: Message)
# Currently this is using the one in pubsub.nim, roughly: # Currently this is using the one in pubsub.nim, roughly:
@ -242,12 +249,6 @@ type ContentFilter* = object
type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message) type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message)
type HistoryQuery = object
xxx*: seq[byte]
type HistoryResponse = object
xxx*: seq[byte]
method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and a `Message`. ## this topic. TopicHandler is a method that takes a topic and a `Message`.
@ -295,7 +296,6 @@ method publish*(w: WakuNode, topic: Topic, message: Message) =
discard wakuSub.publish(topic, message) discard wakuSub.publish(topic, message)
method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) = method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) =
echo "NYI"
## Publish a `Message` to a PubSub topic with a specific content filter. ## Publish a `Message` to a PubSub topic with a specific content filter.
## Currently this means a `contentTopic`. ## Currently this means a `contentTopic`.
## ##
@ -303,13 +303,28 @@ method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message
## TODO Implement as wrapper around `waku_protocol` and `publish`, and ensure ## TODO Implement as wrapper around `waku_protocol` and `publish`, and ensure
## Message is passed, not `data` field. Also ensure content filter is in ## Message is passed, not `data` field. Also ensure content filter is in
## Message. ## Message.
##
w.messages.insert((contentFilter.contentTopic, message))
let wakuSub = w.switch.pubSub.get()
# XXX Consider awaiting here
# @TODO MAKE SURE WE PASS CONTENT FILTER
discard wakuSub.publish(topic, message)
method query*(w: WakuNode, query: HistoryQuery): HistoryResponse = method query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
echo "NYI"
## Queries for historical messages. ## Queries for historical messages.
## ##
## Status: Not yet implemented. ## Status: Not yet implemented.
## TODO Implement as wrapper around `waku_protocol` and send `RPCMsg`. ## TODO Implement as wrapper around `waku_protocol` and send `RPCMsg`.
result.messages = newSeq[Message]()
for msg in w.messages:
if msg[0] notin query.topics:
continue
result.messages.insert(msg[1])
when isMainModule: when isMainModule:
let conf = WakuNodeConf.load() let conf = WakuNodeConf.load()