diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim new file mode 100644 index 000000000..edbfc0427 --- /dev/null +++ b/tests/v2/test_waku_lightpush.nim @@ -0,0 +1,76 @@ +{.used.} + +import + std/[options, tables, sets], + testutils/unittests, chronos, chronicles, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/multistream, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/protocol/message_notifier, + ../../waku/v2/protocol/waku_lightpush/waku_lightpush, + ../test_helpers, ./utils + +procSuite "Waku Light Push": + + asyncTest "handle light push request": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + contentTopic = ContentTopic("/waku/2/default-content/proto") + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseRequestIdFuture = newFuture[string]() + var completionFut = newFuture[bool]() + + proc handle(requestId: string, msg: PushRequest) {.gcsafe, closure.} = + # TODO Success return here + debug "handle push req" + check: + 1 == 0 + responseRequestIdFuture.complete(requestId) + + # FIXME Unclear how we want to use subscriptions, if at all + let + proto = WakuLightPush.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + wm = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + rpc = PushRequest(pubSubTopic: defaultTopic, message: wm) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo) + + + # TODO Can possibly get rid of this if it isn't dynamic + proc requestHandle(requestId: string, msg: PushRequest) {.gcsafe, closure.} = + debug "push request handler" + # TODO: Also relay message + # TODO: Here we want to send back response with is_success true + discard + + let + proto2 = WakuLightPush.init(PeerManager.new(listenSwitch), crypto.newRng(), requestHandle) + + listenSwitch.mount(proto2) + + # FIXME Don't think this will be hit yet + proc handler(response: PushResponse) {.gcsafe, closure.} = + debug "push response handler, expecting false" + check: + response.isSuccess == false + completionFut.complete(true) + + await proto.request(rpc, handler) + await sleepAsync(2.seconds) + + check: + (await completionFut.withTimeout(5.seconds)) == true diff --git a/waku/v2/protocol/waku_lightpush/README.md b/waku/v2/protocol/waku_lightpush/README.md new file mode 100644 index 000000000..d88517319 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/README.md @@ -0,0 +1 @@ +# Waku Light Push diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim new file mode 100644 index 000000000..70723a262 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush.nim @@ -0,0 +1,167 @@ +import + std/[tables, sequtils, options], + bearssl, + chronos, chronicles, metrics, stew/results, + libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection, + libp2p/crypto/crypto, + ../message_notifier, + waku_lightpush_types, + ../../utils/requests, + ../../node/peer_manager/peer_manager + +export waku_lightpush_types + +declarePublicGauge waku_lightpush_peers, "number of lightpush peers" +declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"] + +logScope: + topics = "wakulightpush" + +const + WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-alpha1" + +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + +# Encoding and decoding ------------------------------------------------------- +proc encode*(rpc: PushRequest): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, rpc.pubSubTopic) + result.write(2, rpc.message.encode()) + +proc init*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] = + #var rpc = PushRequest(pubSubTopic: "", message: WakuMessage()) + var rpc = PushRequest() + let pb = initProtoBuffer(buffer) + + var pubSubTopic: string + discard ? pb.getField(1, pubSubTopic) + rpc.pubSubTopic = pubSubTopic + + var buf: seq[byte] + discard ? pb.getField(2, buf) + rpc.message = ? WakuMessage.init(buf) + + ok(rpc) + +proc encode*(rpc: PushResponse): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, uint64(rpc.isSuccess)) + result.write(2, rpc.info) + +proc init*(T: type PushResponse, buffer: seq[byte]): ProtoResult[T] = + var rpc = PushResponse(isSuccess: false, info: "") + let pb = initProtoBuffer(buffer) + + var isSuccess: uint64 + if ? pb.getField(1, isSuccess): + rpc.isSuccess = bool(isSuccess) + + var info: string + discard ? pb.getField(2, info) + rpc.info = info + + ok(rpc) + +proc encode*(rpc: PushRPC): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, rpc.requestId) + result.write(2, rpc.request.encode()) + result.write(3, rpc.response.encode()) + +proc init*(T: type PushRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = PushRPC() + let pb = initProtoBuffer(buffer) + + discard ? pb.getField(1, rpc.requestId) + + var requestBuffer: seq[byte] + discard ? pb.getField(2, requestBuffer) + + rpc.request = ? PushRequest.init(requestBuffer) + + var pushBuffer: seq[byte] + discard ? pb.getField(3, pushBuffer) + + rpc.response = ? PushResponse.init(pushBuffer) + + ok(rpc) + +# Protocol ------------------------------------------------------- +proc init*(T: type WakuLightPush, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: PushRequestHandler): T = + new result + result.rng = crypto.newRng() + result.peerManager = peerManager + result.requestHandler = handler + result.init() + +proc setPeer*(wlp: WakuLightPush, peer: PeerInfo) = + wlp.peerManager.addPeer(peer, WakuLightPushCodec) + waku_lightpush_peers.inc() + +method init*(wlp: WakuLightPush) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + var message = await conn.readLp(64*1024) + var res = PushRPC.init(message) + if res.isErr: + error "failed to decode rpc" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return + + info "lightpush message received" + + let value = res.value + if value.request != PushRequest(): + info "lightpush push request" + # TODO Relay messages here + var response = PushResponse(is_success: false, info: "NYI") + await conn.writeLp(PushRPC(requestId: value.requestId, + response: response).encode().buffer) + #wlp.requestHandler(value.requestId, value.request) + if value.response != PushResponse(): + if value.response.isSuccess: + info "lightpush message success" + else: + info "lightpush message failure", info=value.response.info + + wlp.handler = handle + wlp.codec = WakuLightPushCodec + +proc request*(w: WakuLightPush, request: PushRequest, handler: PushResponseHandler) {.async, gcsafe.} = + let peerOpt = w.peerManager.selectPeer(WakuLightPushCodec) + + if peerOpt.isNone(): + error "no suitable remote peers" + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return + + let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuLightPushCodec) + + if connOpt.isNone(): + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_lightpush_errors.inc(labelValues = [dialFailure]) + return + + await connOpt.get().writeLP(PushRPC(requestId: generateRequestId(w.rng), + request: request).encode().buffer) + + var message = await connOpt.get().readLp(64*1024) + let response = PushRPC.init(message) + + if response.isErr: + error "failed to decode response" + waku_lightpush_errors.inc(labelValues = [decodeRpcFailure]) + return + + handler(response.value.response) diff --git a/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim new file mode 100644 index 000000000..7459eae78 --- /dev/null +++ b/waku/v2/protocol/waku_lightpush/waku_lightpush_types.nim @@ -0,0 +1,32 @@ +import + std/[tables], + bearssl, + libp2p/peerinfo, + libp2p/protocols/protocol, + ../../node/peer_manager/peer_manager, + ../waku_message + +export waku_message + +type + PushRequest* = object + pubSubTopic*: string + message*: WakuMessage + + PushResponse* = object + isSuccess*: bool + info*: string + + PushRPC* = object + requestId*: string + request*: PushRequest + response*: PushResponse + + PushResponseHandler* = proc(response: PushResponse) {.gcsafe, closure.} + + PushRequestHandler* = proc(requestId: string, msg: PushRequest) {.gcsafe, closure.} + + WakuLightPush* = ref object of LPProtocol + rng*: ref BrHmacDrbgContext + peerManager*: PeerManager + requestHandler*: PushRequestHandler