deploy: a505a2e78928fdac89edc21c7aa0454a998b7524

This commit is contained in:
oskarth 2021-04-24 05:18:53 +00:00
parent e1d7b739bb
commit b4fd2ca3a0
6 changed files with 128 additions and 7 deletions

View File

@ -15,7 +15,8 @@ import
procSuite "Waku Light Push": procSuite "Waku Light Push":
asyncTest "handle light push request": # NOTE See test_wakunode for light push request success
asyncTest "handle light push request fail":
const defaultTopic = "/waku/2/default-waku/proto" const defaultTopic = "/waku/2/default-waku/proto"
let let
@ -62,11 +63,11 @@ procSuite "Waku Light Push":
listenSwitch.mount(proto2) listenSwitch.mount(proto2)
# FIXME Don't think this will be hit yet
proc handler(response: PushResponse) {.gcsafe, closure.} = proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting false" debug "push response handler, expecting false"
check: check:
response.isSuccess == false response.isSuccess == false
debug "Additional info", info=response.info
completionFut.complete(true) completionFut.complete(true)
await proto.request(rpc, handler) await proto.request(rpc, handler)

View File

@ -15,6 +15,8 @@ import
../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier],
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers, ../../waku/v2/utils/peers,
../../waku/v2/node/wakunode2, ../../waku/v2/node/wakunode2,
../test_helpers ../test_helpers
@ -566,4 +568,67 @@ procSuite "WakuNode":
await allFutures([node1.stop(), node2.stop()]) await allFutures([node1.stop(), node2.stop()])
asyncTest "Lightpush message return success":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60003))
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
# Light node, only lightpush
await node1.start()
node1.mountLightPush()
# Intermediate node
await node2.start()
node2.mountRelay(@[pubSubTopic])
node2.mountLightPush()
# Receiving node
await node3.start()
node3.mountRelay(@[pubSubTopic])
discard await node1.peerManager.dialPeer(node2.peerInfo, WakuLightPushCodec)
await sleepAsync(5.seconds)
await node3.connectToNodes(@[node2.peerInfo])
var completionFutLightPush = newFuture[bool]()
var completionFutRelay = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFutRelay.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting true"
check:
response.isSuccess == true
completionFutLightPush.complete(true)
# Publishing with lightpush
await node1.lightpush(pubSubTopic, message, handler)
await sleepAsync(2000.millis)
check:
(await completionFutRelay.withTimeout(5.seconds)) == true
(await completionFutLightPush.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
await node3.stop()

View File

@ -78,6 +78,11 @@ type
defaultValue: false defaultValue: false
name: "swap" }: bool name: "swap" }: bool
lightpush* {.
desc: "Enable lightpush protocol: true|false",
defaultValue: false
name: "lightpush" }: bool
filternode* {. filternode* {.
desc: "Peer multiaddr to request content filtering of messages.", desc: "Peer multiaddr to request content filtering of messages.",
defaultValue: "" defaultValue: ""

View File

@ -17,6 +17,7 @@ import
../protocol/waku_swap/waku_swap, ../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter, ../protocol/waku_filter/waku_filter,
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils], ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
../protocol/waku_lightpush/waku_lightpush,
../utils/peers, ../utils/peers,
./storage/message/message_store, ./storage/message/message_store,
./storage/peer/peer_storage, ./storage/peer/peer_storage,
@ -60,6 +61,7 @@ type
wakuFilter*: WakuFilter wakuFilter*: WakuFilter
wakuSwap*: WakuSwap wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
peerInfo*: PeerInfo peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage
@ -295,6 +297,19 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl
discard await wakuRelay.publish(topic, data) discard await wakuRelay.publish(topic, data)
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not in `handler`.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality. This field may be also be omitted.
##
## Status: Implemented.
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
let rpc = PushRequest(pubSubTopic: topic, message: message)
await node.wakuLightPush.request(rpc, handler)
proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
## Queries known nodes for historical messages. Triggers the handler whenever a response is received. ## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
## QueryHandlerFunc is a method that takes a HistoryResponse. ## QueryHandlerFunc is a method that takes a HistoryResponse.
@ -445,6 +460,19 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "relay mounted and started successfully" info "relay mounted and started successfully"
proc mountLightPush*(node: WakuNode) =
info "mounting light push"
if node.wakuRelay.isNil:
debug "mounting lightpush without relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
else:
debug "mounting lightpush with relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
node.switch.mount(node.wakuLightPush)
## Helpers ## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} = proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address
@ -640,6 +668,10 @@ when isMainModule:
if conf.staticnodes.len > 0: if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes) waitFor connectToNodes(node, conf.staticnodes)
# NOTE Must be mounted after relay
if conf.lightpush:
mountLightPush(node)
if conf.rpc: if conf.rpc:
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)

View File

@ -12,7 +12,8 @@ import
../message_notifier, ../message_notifier,
waku_lightpush_types, waku_lightpush_types,
../../utils/requests, ../../utils/requests,
../../node/peer_manager/peer_manager ../../node/peer_manager/peer_manager,
../waku_relay
export waku_lightpush_types export waku_lightpush_types
@ -98,11 +99,13 @@ proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
ok(rpc) ok(rpc)
# Protocol ------------------------------------------------------- # Protocol -------------------------------------------------------
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler): T = proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
debug "init"
new result new result
result.rng = crypto.newRng() result.rng = crypto.newRng()
result.peerManager = peerManager result.peerManager = peerManager
result.requestHandler = handler result.requestHandler = handler
result.relayReference = relay
result.init() result.init()
proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) = proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
@ -110,6 +113,7 @@ proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
waku_lightpush_peers.inc() waku_lightpush_peers.inc()
method init*(wlp: WakuLightPush) = method init*(wlp: WakuLightPush) =
debug "init"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024) var message = await conn.readLp(64*1024)
var res = PushRPC.init(message) var res = PushRPC.init(message)
@ -123,8 +127,20 @@ method init*(wlp: WakuLightPush) =
let value = res.value let value = res.value
if value.request != PushRequest(): if value.request != PushRequest():
info "lightpush push request" info "lightpush push request"
# TODO Relay messages here let
var response = PushResponse(is_success: false, info: "NYI") pubSubTopic = value.request.pubSubTopic
message = value.request.message
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
var response: PushResponse
if wlp.relayReference != nil:
let wakuRelay = wlp.relayReference
let data = message.encode().buffer
# XXX Assumes success, should probably be extended to check for network, peers, etc
discard wakuRelay.publish(pubSubTopic, data)
response = PushResponse(is_success: true, info: "Totally.")
else:
debug "No relay protocol present, unsuccesssful push"
response = PushResponse(is_success: false, info: "No relay protocol")
await conn.writeLp(PushRPC(requestId: value.requestId, await conn.writeLp(PushRPC(requestId: value.requestId,
response: response).encode().buffer) response: response).encode().buffer)
#wlp.requestHandler(value.requestId, value.request) #wlp.requestHandler(value.requestId, value.request)

View File

@ -4,7 +4,8 @@ import
libp2p/peerinfo, libp2p/peerinfo,
libp2p/protocols/protocol, libp2p/protocols/protocol,
../../node/peer_manager/peer_manager, ../../node/peer_manager/peer_manager,
../waku_message ../waku_message,
../waku_relay
export waku_message export waku_message
@ -30,3 +31,4 @@ type
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
peerManager*: PeerManager peerManager*: PeerManager
requestHandler*: PushRequestHandler requestHandler*: PushRequestHandler
relayReference*: WakuRelay