From 13ac035e5a4981ced3fc55b012c5be5079644041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Sat, 24 Apr 2021 12:56:37 +0800 Subject: [PATCH] Lightpush protocol cont (#506) * lightpush conf and mount with relay from node * mount lightpush after relay * lightpush relay integration wip * lightpush node integrate and test node * clean --- tests/v2/test_waku_lightpush.nim | 5 +- tests/v2/test_wakunode.nim | 65 +++++++++++++++++++ waku/v2/node/config.nim | 5 ++ waku/v2/node/wakunode2.nim | 32 +++++++++ .../waku_lightpush/waku_lightpush.nim | 24 +++++-- .../waku_lightpush/waku_lightpush_types.nim | 4 +- 6 files changed, 128 insertions(+), 7 deletions(-) diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index edbfc0427..2316b8bcf 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -15,7 +15,8 @@ import procSuite "Waku Light Push": - asyncTest "handle light push request": + # NOTE See test_wakunode for light push request success + asyncTest "handle light push request fail": const defaultTopic = "/waku/2/default-waku/proto" let @@ -62,11 +63,11 @@ procSuite "Waku Light Push": listenSwitch.mount(proto2) - # FIXME Don't think this will be hit yet proc handler(response: PushResponse) {.gcsafe, closure.} = debug "push response handler, expecting false" check: response.isSuccess == false + debug "Additional info", info=response.info completionFut.complete(true) await proto.request(rpc, handler) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 42ee4ec1f..290f5cb9b 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -15,6 +15,8 @@ import ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_lightpush/waku_lightpush, + ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, ../../waku/v2/node/wakunode2, ../test_helpers @@ -566,4 +568,67 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop()]) + asyncTest "Lightpush message return success": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), + Port(60003)) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + # Light node, only lightpush + await node1.start() + node1.mountLightPush() + + # Intermediate node + await node2.start() + node2.mountRelay(@[pubSubTopic]) + node2.mountLightPush() + + # Receiving node + await node3.start() + node3.mountRelay(@[pubSubTopic]) + + discard await node1.peerManager.dialPeer(node2.peerInfo, WakuLightPushCodec) + await sleepAsync(5.seconds) + await node3.connectToNodes(@[node2.peerInfo]) + + var completionFutLightPush = newFuture[bool]() + var completionFutRelay = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFutRelay.complete(true) + + node3.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + proc handler(response: PushResponse) {.gcsafe, closure.} = + debug "push response handler, expecting true" + check: + response.isSuccess == true + completionFutLightPush.complete(true) + + # Publishing with lightpush + await node1.lightpush(pubSubTopic, message, handler) + await sleepAsync(2000.millis) + + check: + (await completionFutRelay.withTimeout(5.seconds)) == true + (await completionFutLightPush.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + await node3.stop() diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 824008fa6..29eff6d43 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -78,6 +78,11 @@ type defaultValue: false name: "swap" }: bool + lightpush* {. + desc: "Enable lightpush protocol: true|false", + defaultValue: false + name: "lightpush" }: bool + filternode* {. desc: "Peer multiaddr to request content filtering of messages.", defaultValue: "" diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 7b0cd3bfc..a9c9ce2bd 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -17,6 +17,7 @@ import ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, ../protocol/waku_rln_relay/[rln,waku_rln_relay_utils], + ../protocol/waku_lightpush/waku_lightpush, ../utils/peers, ./storage/message/message_store, ./storage/peer/peer_storage, @@ -60,6 +61,7 @@ type wakuFilter*: WakuFilter wakuSwap*: WakuSwap wakuRlnRelay*: WakuRLNRelay + wakuLightPush*: WakuLightPush peerInfo*: PeerInfo libp2pTransportLoops*: seq[Future[void]] # TODO Revist messages field indexing as well as if this should be Message or WakuMessage @@ -295,6 +297,19 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl discard await wakuRelay.publish(topic, data) +proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe.} = + ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. + ## Returns whether relaying was successful or not in `handler`. + ## `WakuMessage` should contain a `contentTopic` field for light node + ## functionality. This field may be also be omitted. + ## + ## Status: Implemented. + + debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic + + let rpc = PushRequest(pubSubTopic: topic, message: message) + await node.wakuLightPush.request(rpc, handler) + proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = ## Queries known nodes for historical messages. Triggers the handler whenever a response is received. ## QueryHandlerFunc is a method that takes a HistoryResponse. @@ -445,6 +460,19 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela info "relay mounted and started successfully" +proc mountLightPush*(node: WakuNode) = + info "mounting light push" + + if node.wakuRelay.isNil: + debug "mounting lightpush without relay" + node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil) + else: + debug "mounting lightpush with relay" + node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay) + + node.switch.mount(node.wakuLightPush) + + ## Helpers proc dialPeer*(n: WakuNode, address: string) {.async.} = info "dialPeer", address = address @@ -640,6 +668,10 @@ when isMainModule: if conf.staticnodes.len > 0: waitFor connectToNodes(node, conf.staticnodes) + # NOTE Must be mounted after relay + if conf.lightpush: + mountLightPush(node) + if conf.rpc: startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim index 70723a262..b7e7cd01f 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim @@ -12,7 +12,8 @@ import ../message_notifier, waku_lightpush_types, ../../utils/requests, - ../../node/peer_manager/peer_manager + ../../node/peer_manager/peer_manager, + ../waku_relay export waku_lightpush_types @@ -98,11 +99,13 @@ proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = ok(rpc) # Protocol ------------------------------------------------------- -proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler): T = +proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T = + debug "init" new result result.rng = crypto.newRng() result.peerManager = peerManager result.requestHandler = handler + result.relayReference = relay result.init() proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) = @@ -110,6 +113,7 @@ proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) = waku_lightpush_peers.inc() method init*(wlp: WakuLightPush) = + debug "init" proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = var message = await conn.readLp(64*1024) var res = PushRPC.init(message) @@ -123,8 +127,20 @@ method init*(wlp: WakuLightPush) = let value = res.value if value.request != PushRequest(): info "lightpush push request" - # TODO Relay messages here - var response = PushResponse(is_success: false, info: "NYI") + let + pubSubTopic = value.request.pubSubTopic + message = value.request.message + debug "PushRequest", pubSubTopic=pubSubTopic, msg=message + var response: PushResponse + if wlp.relayReference != nil: + let wakuRelay = wlp.relayReference + let data = message.encode().buffer + # XXX Assumes success, should probably be extended to check for network, peers, etc + discard wakuRelay.publish(pubSubTopic, data) + response = PushResponse(is_success: true, info: "Totally.") + else: + debug "No relay protocol present, unsuccesssful push" + response = PushResponse(is_success: false, info: "No relay protocol") await conn.writeLp(PushRPC(requestId: value.requestId, response: response).encode().buffer) #wlp.requestHandler(value.requestId, value.request) diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim index 7459eae78..bb3602e8e 100644 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim @@ -4,7 +4,8 @@ import libp2p/peerinfo, libp2p/protocols/protocol, ../../node/peer_manager/peer_manager, - ../waku_message + ../waku_message, + ../waku_relay export waku_message @@ -30,3 +31,4 @@ type rng*: ref BrHmacDrbgContext peerManager*: PeerManager requestHandler*: PushRequestHandler + relayReference*: WakuRelay