diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index fb35832cd..e1e1a91ec 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -1,76 +1,85 @@ {.used.} import - std/[options, tables, sets], - testutils/unittests, chronos, chronicles, + std/options, + testutils/unittests, + chronicles, + chronos, libp2p/switch, - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], - libp2p/crypto/crypto, - libp2p/multistream, + libp2p/crypto/crypto +import ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/waku_lightpush/waku_lightpush, - ../test_helpers, ./utils + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_lightpush, + ../test_helpers -procSuite "Waku Light Push": - # NOTE See test_wakunode for light push request success +const + DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto" + DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto") + + +# TODO: Extend lightpush protocol test coverage +procSuite "Waku Lightpush": + + asyncTest "handle light push request success": + # TODO: Move here the test case at test_wakunode: light push request success + discard + asyncTest "handle light push request fail": - const defaultTopic = "/waku/2/default-waku/proto" - let key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.new(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) - - var dialSwitch = newStandardSwitch() - await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) + listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() - var responseRequestIdFuture = newFuture[string]() - var completionFut = newFuture[bool]() + let dialSwitch = newStandardSwitch() + await dialSwitch.start() - proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} = + + proc requestHandler(requestId: string, msg: PushRequest) {.gcsafe, closure.} = # TODO Success return here debug "handle push req" check: 1 == 0 - responseRequestIdFuture.complete(requestId) # FIXME Unclear how we want to use subscriptions, if at all let - proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) - rpc = PushRequest(pubSubTopic: defaultTopic, message: wm) + peerManager = PeerManager.new(dialSwitch) + rng = crypto.newRng() + proto = WakuLightPush.init(peerManager, rng, requestHandler) - dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) + dialSwitch.mount(proto) # TODO Can possibly get rid of this if it isn't dynamic - proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} = + proc requestHandler2(requestId: string, msg: PushRequest) {.gcsafe, closure.} = debug "push request handler" # TODO: Also relay message # TODO: Here we want to send back response with is_success true discard let - proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle) + peerManager2 = PeerManager.new(listenSwitch) + rng2 = crypto.newRng() + proto2 = WakuLightPush.init(peerManager2, rng2, requestHandler2) listenSwitch.mount(proto2) - proc handler(response: PushResponse) {.gcsafe, closure.} = - debug "push response handler, expecting false" - check: - response.isSuccess == false - debug "Additional info", info=response.info - completionFut.complete(true) - await proto.request(rpc, handler) - await sleepAsync(2.seconds) + ## Given + let + msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC) + rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC) + ## When + let res = await proto.request(rpc) + + ## Then + check res.isOk() + let response = res.get() check: - (await completionFut.withTimeout(5.seconds)) == true + not response.isSuccess + + ## Cleanup + await allFutures(listenSwitch.stop(), dialSwitch.stop()) \ No newline at end of file diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index f8b37f2bf..39c7fe9e6 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -19,7 +19,7 @@ import ../../waku/v2/protocol/[waku_relay, waku_message], ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, - ../../waku/v2/protocol/waku_lightpush/waku_lightpush, + ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, ../../waku/v2/utils/time, diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index e7c57710f..dab637c3e 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -20,7 +20,7 @@ import ../protocol/waku_store, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, - ../protocol/waku_lightpush/waku_lightpush, + ../protocol/waku_lightpush, ../protocol/waku_rln_relay/[waku_rln_relay_types], ../utils/[peers, requests, wakuswitch, wakuenr], ./peer_manager/peer_manager, diff --git a/waku/v2/node/wakunode2_types.nim b/waku/v2/node/wakunode2_types.nim index e22e46c34..26a1a0945 100644 --- a/waku/v2/node/wakunode2_types.nim +++ b/waku/v2/node/wakunode2_types.nim @@ -6,7 +6,7 @@ import ../protocol/waku_store, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, - ../protocol/waku_lightpush/waku_lightpush, + ../protocol/waku_lightpush, ../protocol/waku_rln_relay/waku_rln_relay_types, ./peer_manager/peer_manager, ./discv5/waku_discv5 diff --git a/waku/v2/protocol/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush.nim new file mode 100644 index 000000000..100b45cf3 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush.nim @@ -0,0 +1,9 @@ +import + ./waku_lightpush/protocol, + ./waku_lightpush/rpc, + ./waku_lightpush/rpc_codec + +export + protocol, + rpc, + rpc_codec \ No newline at end of file diff --git a/waku/v2/protocol/waku_lightpush/protocol.nim b/waku/v2/protocol/waku_lightpush/protocol.nim new file mode 100644 index 000000000..628614575 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/protocol.nim @@ -0,0 +1,156 @@ +{.push raises: [Defect].} + +import + std/options, + stew/results, + chronicles, + chronos, + metrics, + bearssl, + libp2p/crypto/crypto + +import + ../waku_message, + ../waku_relay, + ../../node/peer_manager/peer_manager, + ../../utils/requests, + ./rpc, + ./rpc_codec + + +logScope: + topics = "wakulightpush" + +declarePublicGauge waku_lightpush_peers, "number of lightpush peers" +declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"] +declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"] + + +const + WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" + +const + MaxRpcSize* = MaxWakuMessageSize + 64 * 1024 # We add a 64kB safety buffer for protocol overhead + +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + + +type + PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.} + + PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.} + + WakuLightPushResult*[T] = Result[T, string] + + WakuLightPush* = ref object of LPProtocol + rng*: ref BrHmacDrbgContext + peerManager*: PeerManager + requestHandler*: PushRequestHandler + relayReference*: WakuRelay + + +proc init*(wl: WakuLightPush) = + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let message = await conn.readLp(MaxRpcSize.int) + let res = PushRPC.init(message) + if res.isErr(): + error "failed to decode rpc" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return + + let rpc = res.get() + + if rpc.request != PushRequest(): + info "lightpush push request" + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + + let + pubSubTopic = rpc.request.pubSubTopic + message = rpc.request.message + debug "PushRequest", pubSubTopic=pubSubTopic, msg=message + + var response: PushResponse + if not wl.relayReference.isNil(): + let data = message.encode().buffer + + # Assumimng success, should probably be extended to check for network, peers, etc + discard wl.relayReference.publish(pubSubTopic, data) + response = PushResponse(is_success: true, info: "Totally.") + else: + debug "No relay protocol present, unsuccesssful push" + response = PushResponse(is_success: false, info: "No relay protocol") + + + let rpc = PushRPC(requestId: rpc.requestId, response: response) + await conn.writeLp(rpc.encode().buffer) + + if rpc.response != PushResponse(): + waku_lightpush_messages.inc(labelValues = ["PushResponse"]) + if rpc.response.isSuccess: + info "lightpush message success" + else: + info "lightpush message failure", info=rpc.response.info + + wl.handler = handle + wl.codec = WakuLightPushCodec + +proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T = + debug "init" + let rng = crypto.newRng() + let wl = WakuLightPush(rng: rng, + peerManager: peerManager, + requestHandler: handler, + relayReference: relay) + wl.init() + + return wl + + +proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = + wlp.peerManager.addPeer(peer, WakuLightPushCodec) + waku_lightpush_peers.inc() + + +proc request(wl: WakuLightPush, req: PushRequest, peer: RemotePeerInfo): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} = + let connOpt = await wl.peerManager.dialPeer(peer, WakuLightPushCodec) + if connOpt.isNone(): + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + + let connection = connOpt.get() + + let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: req) + await connection.writeLP(rpc.encode().buffer) + + var message = await connection.readLp(MaxRpcSize.int) + let res = PushRPC.init(message) + + if res.isErr(): + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return err(decodeRpcFailure) + + let rpcRes = res.get() + if rpcRes.response == PushResponse(): + return err("empty response body") + + return ok(rpcRes.response) + +proc request*(wl: WakuLightPush, req: PushRequest): Future[WakuLightPushResult[PushResponse]] {.async, gcsafe.} = + let peerOpt = wl.peerManager.selectPeer(WakuLightPushCodec) + if peerOpt.isNone(): + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return err(dialFailure) + + return await wl.request(req, peerOpt.get()) + +proc request*(wl: WakuLightPush, req: PushRequest, handler: PushResponseHandler) {.async, gcsafe, + deprecated: "Use the no-callback request() procedure".} = + let res = await wl.request(req) + if res.isErr(): + return + + handler(res.get()) \ No newline at end of file diff --git a/waku/v2/protocol/waku_lightpush/rpc.nim b/waku/v2/protocol/waku_lightpush/rpc.nim new file mode 100644 index 000000000..3d4e145b8 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/rpc.nim @@ -0,0 +1,18 @@ +{.push raises: [Defec].} + +import + ../waku_message + +type + PushRequest* = object + pubSubTopic*: string + message*: WakuMessage + + PushResponse* = object + isSuccess*: bool + info*: string + + PushRPC* = object + requestId*: string + request*: PushRequest + response*: PushResponse \ No newline at end of file diff --git a/waku/v2/protocol/waku_lightpush/rpc_codec.nim b/waku/v2/protocol/waku_lightpush/rpc_codec.nim new file mode 100644 index 000000000..7d4c0e21b --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/rpc_codec.nim @@ -0,0 +1,86 @@ +{.push raises: [Defect].} + + +import + libp2p/protobuf/minprotobuf +import + ../waku_message, + ../../utils/protobuf, + ./rpc + + +proc encode*(rpc: PushRequest): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, rpc.pubSubTopic) + output.write3(2, rpc.message.encode()) + output.finish3() + + return output + +proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PushRequest() + + var pubSubTopic: string + discard ?pb.getField(1, pubSubTopic) + rpc.pubSubTopic = pubSubTopic + + var buf: seq[byte] + discard ?pb.getField(2, buf) + rpc.message = ?WakuMessage.init(buf) + + return ok(rpc) + + +proc encode*(rpc: PushResponse): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, uint64(rpc.isSuccess)) + output.write3(2, rpc.info) + output.finish3() + + return output + +proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PushResponse(isSuccess: false, info: "") + + var isSuccess: uint64 + if ?pb.getField(1, isSuccess): + rpc.isSuccess = bool(isSuccess) + + var info: string + discard ?pb.getField(2, info) + rpc.info = info + + return ok(rpc) + + +proc encode*(rpc: PushRPC): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, rpc.requestId) + output.write3(2, rpc.request.encode()) + output.write3(3, rpc.response.encode()) + output.finish3() + + return output + +proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PushRPC() + + var requestId: string + discard ?pb.getField(1, requestId) + rpc.requestId = requestId + + var requestBuffer: seq[byte] + discard ?pb.getField(2, requestBuffer) + rpc.request = ?PushRequest.init(requestBuffer) + + var pushBuffer: seq[byte] + discard ?pb.getField(3, pushBuffer) + rpc.response = ?PushResponse.init(pushBuffer) + + return ok(rpc) \ No newline at end of file diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim deleted file mode 100644 index 2134c5f8f..000000000 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim +++ /dev/null @@ -1,202 +0,0 @@ -{.push raises: [Defect].} - -import - std/[tables, options], - bearssl, - chronos, chronicles, metrics, stew/results, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/protocol, - libp2p/protobuf/minprotobuf, - libp2p/stream/connection, - libp2p/crypto/crypto, - waku_lightpush_types, - ../../utils/requests, - ../../utils/protobuf, - ../../node/peer_manager/peer_manager, - ../waku_relay - -export waku_lightpush_types - -declarePublicGauge waku_lightpush_peers, "number of lightpush peers" -declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"] -declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"] - -logScope: - topics = "wakulightpush" - -const - WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" - -# Error types (metric label values) -const - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" - -# Encoding and decoding ------------------------------------------------------- -proc encode*(rpc: PushRequest): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, rpc.pubSubTopic) - output.write3(2, rpc.message.encode()) - - output.finish3() - - return output - -proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = - #var rpc = PushRequest(pubSubTopic: "", message: WakuMessage()) - var rpc = PushRequest() - let pb = initProtoBuffer(buffer) - - var pubSubTopic: string - discard ? pb.getField(1, pubSubTopic) - rpc.pubSubTopic = pubSubTopic - - var buf: seq[byte] - discard ? pb.getField(2, buf) - rpc.message = ? WakuMessage.init(buf) - - return ok(rpc) - -proc encode*(rpc: PushResponse): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, uint64(rpc.isSuccess)) - output.write3(2, rpc.info) - - output.finish3() - - return output - -proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = - var rpc = PushResponse(isSuccess: false, info: "") - let pb = initProtoBuffer(buffer) - - var isSuccess: uint64 - if ? pb.getField(1, isSuccess): - rpc.isSuccess = bool(isSuccess) - - var info: string - discard ? pb.getField(2, info) - rpc.info = info - - return ok(rpc) - -proc encode*(rpc: PushRPC): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, rpc.requestId) - output.write3(2, rpc.request.encode()) - output.write3(3, rpc.response.encode()) - - output.finish3() - - return output - -proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = PushRPC() - let pb = initProtoBuffer(buffer) - - discard ? pb.getField(1, rpc.requestId) - - var requestBuffer: seq[byte] - discard ? pb.getField(2, requestBuffer) - - rpc.request = ? PushRequest.init(requestBuffer) - - var pushBuffer: seq[byte] - discard ? pb.getField(3, pushBuffer) - - rpc.response = ? PushResponse.init(pushBuffer) - - return ok(rpc) - -# Protocol ------------------------------------------------------- -proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler, relay: WakuRelay = nil): T = - debug "init" - let rng = crypto.newRng() - var wl = WakuLightPush(rng: rng, - peerManager: peerManager, - requestHandler: handler, - relayReference: relay) - wl.init() - - return wl - -proc setPeer*(wlp: WakuLightPush, peer: RemotePeerInfo) = - wlp.peerManager.addPeer(peer, WakuLightPushCodec) - waku_lightpush_peers.inc() - -method init*(wlp: WakuLightPush) = - debug "init" - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var message = await conn.readLp(MaxRpcSize.int) - var res = PushRPC.init(message) - if res.isErr: - error "failed to decode rpc" - waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) - return - - info "lightpush message received" - - let value = res.value - if value.request != PushRequest(): - info "lightpush push request" - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - let - pubSubTopic = value.request.pubSubTopic - message = value.request.message - debug "PushRequest", pubSubTopic=pubSubTopic, msg=message - var response: PushResponse - if wlp.relayReference != nil: - let wakuRelay = wlp.relayReference - let data = message.encode().buffer - # XXX Assumes success, should probably be extended to check for network, peers, etc - discard wakuRelay.publish(pubSubTopic, data) - response = PushResponse(is_success: true, info: "Totally.") - else: - debug "No relay protocol present, unsuccesssful push" - response = PushResponse(is_success: false, info: "No relay protocol") - await conn.writeLp(PushRPC(requestId: value.requestId, - response: response).encode().buffer) - #wlp.requestHandler(value.requestId, value.request) - if value.response != PushResponse(): - waku_lightpush_messages.inc(labelValues = ["PushResponse"]) - if value.response.isSuccess: - info "lightpush message success" - else: - info "lightpush message failure", info=value.response.info - - wlp.handler = handle - wlp.codec = WakuLightPushCodec - -proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} = - let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec) - - if peerOpt.isNone(): - error "no suitable remote peers" - waku_lightpush_errors.inc(labelValues = [dialFailure]) - return - - let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec) - - if connOpt.isNone(): - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_lightpush_errors.inc(labelValues = [dialFailure]) - return - - await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng), - request: request).encode().buffer) - - var message = await connOpt.get().readLp(64*1024) - let response = PushRPC.init(message) - - if response.isErr: - error "failed to decode response" - waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) - return - - handler(response.value.response) diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim deleted file mode 100644 index 2ae01f5cb..000000000 --- a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim +++ /dev/null @@ -1,36 +0,0 @@ -import - std/[tables], - bearssl, - libp2p/protocols/protocol, - ../../node/peer_manager/peer_manager, - ../waku_message, - ../waku_relay - -export waku_message - -const - MaxRpcSize* = MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead - -type - PushRequest* = object - pubSubTopic*: string - message*: WakuMessage - - PushResponse* = object - isSuccess*: bool - info*: string - - PushRPC* = object - requestId*: string - request*: PushRequest - response*: PushResponse - - PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.} - - PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.} - - WakuLightPush* = ref object of LPProtocol - rng*: ref BrHmacDrbgContext - peerManager*: PeerManager - requestHandler*: PushRequestHandler - relayReference*: WakuRelay