Lightpush protocol cont (#506)

* lightpush conf and mount with relay from node

* mount lightpush after relay

* lightpush relay integration wip

* lightpush node integrate and test node

* clean
This commit is contained in:
Oskar Thorén 2021-04-24 12:56:37 +08:00 committed by GitHub
parent a04643e9a2
commit 13ac035e5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 128 additions and 7 deletions

View File

@ -15,7 +15,8 @@ import
procSuite "Waku Light Push":
asyncTest "handle light push request":
# NOTE See test_wakunode for light push request success
asyncTest "handle light push request fail":
const defaultTopic = "/waku/2/default-waku/proto"
let
@ -62,11 +63,11 @@ procSuite "Waku Light Push":
listenSwitch.mount(proto2)
# FIXME Don't think this will be hit yet
proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting false"
check:
response.isSuccess == false
debug "Additional info", info=response.info
completionFut.complete(true)
await proto.request(rpc, handler)

View File

@ -15,6 +15,8 @@ import
../../waku/v2/protocol/[waku_relay, waku_message, message_notifier],
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/node/wakunode2,
../test_helpers
@ -566,4 +568,67 @@ procSuite "WakuNode":
await allFutures([node1.stop(), node2.stop()])
asyncTest "Lightpush message return success":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60003))
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
# Light node, only lightpush
await node1.start()
node1.mountLightPush()
# Intermediate node
await node2.start()
node2.mountRelay(@[pubSubTopic])
node2.mountLightPush()
# Receiving node
await node3.start()
node3.mountRelay(@[pubSubTopic])
discard await node1.peerManager.dialPeer(node2.peerInfo, WakuLightPushCodec)
await sleepAsync(5.seconds)
await node3.connectToNodes(@[node2.peerInfo])
var completionFutLightPush = newFuture[bool]()
var completionFutRelay = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFutRelay.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
proc handler(response: PushResponse) {.gcsafe, closure.} =
debug "push response handler, expecting true"
check:
response.isSuccess == true
completionFutLightPush.complete(true)
# Publishing with lightpush
await node1.lightpush(pubSubTopic, message, handler)
await sleepAsync(2000.millis)
check:
(await completionFutRelay.withTimeout(5.seconds)) == true
(await completionFutLightPush.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
await node3.stop()

View File

@ -78,6 +78,11 @@ type
defaultValue: false
name: "swap" }: bool
lightpush* {.
desc: "Enable lightpush protocol: true|false",
defaultValue: false
name: "lightpush" }: bool
filternode* {.
desc: "Peer multiaddr to request content filtering of messages.",
defaultValue: ""

View File

@ -17,6 +17,7 @@ import
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
../protocol/waku_lightpush/waku_lightpush,
../utils/peers,
./storage/message/message_store,
./storage/peer/peer_storage,
@ -60,6 +61,7 @@ type
wakuFilter*: WakuFilter
wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
@ -295,6 +297,19 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl
discard await wakuRelay.publish(topic, data)
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage, handler: PushResponseHandler) {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not in `handler`.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality. This field may be also be omitted.
##
## Status: Implemented.
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
let rpc = PushRequest(pubSubTopic: topic, message: message)
await node.wakuLightPush.request(rpc, handler)
proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
## Queries known nodes for historical messages. Triggers the handler whenever a response is received.
## QueryHandlerFunc is a method that takes a HistoryResponse.
@ -445,6 +460,19 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "relay mounted and started successfully"
proc mountLightPush*(node: WakuNode) =
info "mounting light push"
if node.wakuRelay.isNil:
debug "mounting lightpush without relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil)
else:
debug "mounting lightpush with relay"
node.wakuLightPush = WakuLightPush.init(node.peerManager, node.rng, nil, node.wakuRelay)
node.switch.mount(node.wakuLightPush)
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address
@ -640,6 +668,10 @@ when isMainModule:
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)
# NOTE Must be mounted after relay
if conf.lightpush:
mountLightPush(node)
if conf.rpc:
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)

View File

@ -12,7 +12,8 @@ import
../message_notifier,
waku_lightpush_types,
../../utils/requests,
../../node/peer_manager/peer_manager
../../node/peer_manager/peer_manager,
../waku_relay
export waku_lightpush_types
@ -98,11 +99,13 @@ proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] =
ok(rpc)
# Protocol -------------------------------------------------------
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler): T =
proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T =
debug "init"
new result
result.rng = crypto.newRng()
result.peerManager = peerManager
result.requestHandler = handler
result.relayReference = relay
result.init()
proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
@ -110,6 +113,7 @@ proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) =
waku_lightpush_peers.inc()
method init*(wlp: WakuLightPush) =
debug "init"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var res = PushRPC.init(message)
@ -123,8 +127,20 @@ method init*(wlp: WakuLightPush) =
let value = res.value
if value.request != PushRequest():
info "lightpush push request"
# TODO Relay messages here
var response = PushResponse(is_success: false, info: "NYI")
let
pubSubTopic = value.request.pubSubTopic
message = value.request.message
debug "PushRequest", pubSubTopic=pubSubTopic, msg=message
var response: PushResponse
if wlp.relayReference != nil:
let wakuRelay = wlp.relayReference
let data = message.encode().buffer
# XXX Assumes success, should probably be extended to check for network, peers, etc
discard wakuRelay.publish(pubSubTopic, data)
response = PushResponse(is_success: true, info: "Totally.")
else:
debug "No relay protocol present, unsuccesssful push"
response = PushResponse(is_success: false, info: "No relay protocol")
await conn.writeLp(PushRPC(requestId: value.requestId,
response: response).encode().buffer)
#wlp.requestHandler(value.requestId, value.request)

View File

@ -4,7 +4,8 @@ import
libp2p/peerinfo,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message
../waku_message,
../waku_relay
export waku_message
@ -30,3 +31,4 @@ type
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
requestHandler*: PushRequestHandler
relayReference*: WakuRelay