feature/start-protocol-command (#209)

* started working

* fixes

* rename start -> mount

* started logging

* mounting relay

* Update wakunode2.nim

* start of fixe

* fix

* fix

* fixes

* Update waku/node/v2/wakunode2.nim

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

* removed comment

* fix

* update

* changed the default

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Dean Eigenmann 2020-10-20 04:36:27 +02:00 committed by GitHub
parent e6949da918
commit 8e87323eef
8 changed files with 108 additions and 55 deletions

View File

@ -24,7 +24,7 @@ RUN make -j$(nproc) $MAKE_TARGET NIM_PARAMS="$NIM_PARAMS"
FROM alpine:3.12
ARG MAKE_TARGET=wakunode
ARG MAKE_TARGET=wakunode2
LABEL maintainer="jakub@status.im"
LABEL source="https://github.com/status-im/nim-waku"
@ -45,6 +45,6 @@ COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/
# Symlink the correct wakunode binary
RUN ln -sv /usr/local/bin/$MAKE_TARGET /usr/bin/wakunode
ENTRYPOINT ["/usr/bin/wakunode"]
ENTRYPOINT ["/usr/bin/wakunode", "--store:true", "--relay:true", "--filter:true"]
# By default just show help if called without arguments
CMD ["--help"]

View File

@ -25,6 +25,7 @@ proc runBackground() {.async.} =
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
await node.start()
await node.mountRelay()
# Subscribe to a topic
let topic = cast[Topic]("foobar")

View File

@ -160,10 +160,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
node = WakuNode.init(conf.nodeKey, conf.libp2pAddress,
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" "))
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
# waitFor vs await
await node.start()
await node.mountRelay(conf.topics.split(" "))
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
@ -175,6 +176,8 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
echo &"Listening on\n {listenStr}"
if conf.storenode != "":
node.mountStore()
node.wakuStore.setPeer(parsePeer(conf.storenode))
proc storeHandler(response: HistoryResponse) {.gcsafe.} =

View File

@ -22,10 +22,12 @@ suite "Waku v2 Remote Procedure Calls":
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port), @["waku"])
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
waitFor node.start()
waitFor node.mountRelay(@["waku"])
# RPC server setup
let
rpcPort = Port(8545)

View File

@ -78,11 +78,15 @@ procSuite "FloodSub":
let
nodes = generateNodes(2)
nodesFut = await allFinished(
nodes[0].start(),
nodes[1].start()
)
for node in nodes:
await node.mountRelay()
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)

View File

@ -44,12 +44,13 @@ procSuite "WakuNode":
await node.start()
await node.mountRelay()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
# node2.wakuFilter.setPeer(node1.peerInfo)
await node.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
@ -95,6 +96,12 @@ procSuite "WakuNode":
await allFutures([node1.start(), node2.start()])
await node1.mountRelay()
await node2.mountRelay()
node1.mountFilter()
node2.mountFilter()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
@ -132,7 +139,9 @@ procSuite "WakuNode":
var completionFut = newFuture[bool]()
await node1.start()
node1.mountStore()
await node2.start()
node2.mountStore()
await node2.subscriptions.notify("waku", message)
@ -166,7 +175,9 @@ procSuite "WakuNode":
var completionFut = newFuture[bool]()
await node1.start()
node1.mountFilter()
await node2.start()
node2.mountFilter()
node1.wakuFilter.setPeer(node2.peerInfo)

View File

@ -73,6 +73,21 @@ type
defaultValue: ""
name: "storenode" }: string
store* {.
desc: "Flag whether to start store protocol",
defaultValue: false
name: "store" }: bool
filter* {.
desc: "Flag whether to start filter protocol",
defaultValue: false
name: "filter" }: bool
relay* {.
desc: "Flag whether to start relay protocol",
defaultValue: true
name: "relay" }: bool
filternode* {.
desc: "Enode URL to filter.",
defaultValue: ""

View File

@ -53,7 +53,7 @@ template tcpEndPoint(address, port): auto =
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress, bindPort: Port,
extIp = none[ValidIpAddress](), extPort = none[Port](), topics = newSeq[string]()): T =
extIp = none[ValidIpAddress](), extPort = none[Port]()): T =
## Creates a Waku Node.
##
## Status: Implemented.
@ -78,35 +78,14 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
# msgIdProvider = msgIdProvider,
# triggerSelf = true, sign = false,
# verifySignature = false).PubSub
let wakuRelay = WakuRelay.init(
switch = switch,
# Use default
#msgIdProvider = msgIdProvider,
triggerSelf = true,
sign = false,
verifySignature = false)
# This gets messy with: .PubSub
switch.mount(wakuRelay)
result = WakuNode(
switch: switch,
rng: rng,
peerInfo: peerInfo,
wakuRelay: wakuRelay,
subscriptions: newTable[string, MessageNotificationSubscription](),
filters: initTable[string, Filter]()
)
# TODO This is _not_ safe. Subscribe should happen in start and after things settled down.
# Otherwise GRAFT message isn't sent to a node.
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
# XXX: Is using discard here fine? Not sure if we want init to be async?
# Can also move this to the start proc, possibly wiser?
discard result.subscribe(topic, handler)
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node.
##
@ -114,29 +93,6 @@ proc start*(node: WakuNode) {.async.} =
##
node.libp2pTransportLoops = await node.switch.start()
# NOTE WakuRelay is being instantiated as part of initing node
node.wakuStore = WakuStore.init(node.switch, node.rng)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId)
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
await node.wakuRelay.subscribe("waku", relayHandler)
# TODO Get this from WakuNode obj
let peerInfo = node.peerInfo
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
@ -145,8 +101,8 @@ proc start*(node: WakuNode) {.async.} =
info "Listening on", full = listenStr
proc stop*(node: WakuNode) {.async.} =
let wakuRelay = node.wakuRelay
await wakuRelay.stop()
if not node.wakuRelay.isNil:
await node.wakuRelay.stop()
await node.switch.stop()
@ -168,8 +124,10 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
## Status: Implemented.
info "subscribe content", filter=request
# @TODO: ERROR HANDLING
let id = await node.wakuFilter.subscribe(request)
var id = generateRequestId(node.rng)
if node.wakuFilter.isNil == false:
# @TODO: ERROR HANDLING
id = await node.wakuFilter.subscribe(request)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
proc unsubscribe*(w: WakuNode, topic: Topic) =
@ -223,6 +181,56 @@ proc info*(node: WakuNode): WakuInfo =
let wakuInfo = WakuInfo(listenStr: listenStr)
return wakuInfo
proc mountFilter*(node: WakuNode) =
info "mounting filter"
proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId)
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
proc mountStore*(node: WakuNode) =
info "mounting store"
node.wakuStore = WakuStore.init(node.switch, node.rng)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} =
let wakuRelay = WakuRelay.init(
switch = node.switch,
# Use default
#msgIdProvider = msgIdProvider,
triggerSelf = true,
sign = false,
verifySignature = false
)
node.wakuRelay = wakuRelay
node.switch.mount(wakuRelay)
await sleepAsync(5.seconds)
info "mounting relay"
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
await node.wakuRelay.subscribe("waku", relayHandler)
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
# XXX: Is using discard here fine? Not sure if we want init to be async?
# Can also move this to the start proc, possibly wiser?
discard node.subscribe(topic, handler)
when isMainModule:
import
std/strutils,
@ -300,10 +308,19 @@ when isMainModule:
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
node = WakuNode.init(conf.nodeKey, conf.libp2pAddress,
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" "))
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
waitFor node.start()
if conf.store:
mountStore(node)
if conf.filter:
mountFilter(node)
if conf.relay:
waitFor mountRelay(node, conf.topics.split(" "))
if conf.staticnodes.len > 0:
connectToNodes(node, conf.staticnodes)