From 23453cbc65120e49bf7e6175308a47ea9a885088 Mon Sep 17 00:00:00 2001 From: Sergei Tikhomirov Date: Mon, 17 Jun 2024 16:59:41 +0200 Subject: [PATCH] feat: add i13n PoC with dummy request-response protocol --- tests/incentivization/test_all.nim | 4 +- tests/incentivization/test_eligibility.nim | 24 ++++++ tests/incentivization/test_poc.nim | 86 ++++++++++++++++++++++ tests/incentivization/test_rpc_codec.nim | 36 ++++++--- waku/incentivization/client.nim | 55 ++++++++++++++ waku/incentivization/common.nim | 83 +++++++++++++++++++++ waku/incentivization/protocol.nim | 56 ++++++++++++++ waku/incentivization/rpc.nim | 10 +++ waku/incentivization/rpc_codec.nim | 46 +++++++++++- 9 files changed, 386 insertions(+), 14 deletions(-) create mode 100644 tests/incentivization/test_eligibility.nim create mode 100644 tests/incentivization/test_poc.nim create mode 100644 waku/incentivization/client.nim create mode 100644 waku/incentivization/common.nim create mode 100644 waku/incentivization/protocol.nim diff --git a/tests/incentivization/test_all.nim b/tests/incentivization/test_all.nim index 4a38f788b..84c8e0456 100644 --- a/tests/incentivization/test_all.nim +++ b/tests/incentivization/test_all.nim @@ -1,2 +1,4 @@ import - ./test_rpc_codec \ No newline at end of file + ./test_rpc_codec, + ./test_eligibility, + ./test_poc \ No newline at end of file diff --git a/tests/incentivization/test_eligibility.nim b/tests/incentivization/test_eligibility.nim new file mode 100644 index 000000000..a20617ca2 --- /dev/null +++ b/tests/incentivization/test_eligibility.nim @@ -0,0 +1,24 @@ +import + std/options, + testutils/unittests, + chronos + +import + ../../../waku/incentivization/[ + rpc,common + ] + + +suite "Waku Incentivization Eligibility Testing": + + asyncTest "check eligibility success": + var byteSequence: seq[byte] = @[1, 2, 3, 4, 5, 6, 7, 8] + let eligibilityProof = EligibilityProof(proofOfPayment: some(byteSequence)) + check: + isEligible(eligibilityProof) + + asyncTest "check eligibility failure": + var byteSequence: seq[byte] = @[0, 2, 3, 4, 5, 6, 7, 8] + let eligibilityProof = EligibilityProof(proofOfPayment: some(byteSequence)) + check: + not isEligible(eligibilityProof) diff --git a/tests/incentivization/test_poc.nim b/tests/incentivization/test_poc.nim new file mode 100644 index 000000000..6677e7595 --- /dev/null +++ b/tests/incentivization/test_poc.nim @@ -0,0 +1,86 @@ +{.used.} + +import + std/[options, strscans], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import + ../../../waku/[ + node/peer_manager, + waku_core, + ], + ../testlib/[assertions, wakucore, testasync, futures, testutils], + ../../../waku/incentivization/[ + rpc, + rpc_codec, + common, + client, + protocol, + ] + + +proc newTestDummyProtocolNode*( + switch: Switch, + handler: DummyHandler + ): Future[DummyProtocol] {.async.} = + let + peerManager = PeerManager.new(switch) + dummyProtocol = DummyProtocol.new(peerManager, handler) + + await dummyProtocol.start() + switch.mount(dummyProtocol) + + return dummyProtocol + + +suite "Waku Incentivization PoC Dummy Protocol": + + var + serverSwitch {.threadvar.}: Switch + serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + handlerFuture {.threadvar.}: Future[DummyRequest] + handler {.threadvar.}: DummyHandler + server {.threadvar.}: DummyProtocol + clientSwitch {.threadvar.}: Switch + client {.threadvar.}: WakuDummyClient + clientPeerId {.threadvar.}: PeerId + + asyncSetup: + + # setting up a server + serverSwitch = newTestSwitch() + handlerFuture = newFuture[DummyRequest]() + handler = proc( + peer: PeerId, dummyRequest: DummyRequest + ): Future[DummyResult[void]] {.async.} = + handlerFuture.complete(dummyRequest) + return ok() + server = await newTestDummyProtocolNode(serverSwitch, handler) + + # setting up a client + clientSwitch = newTestSwitch() + let peerManager = PeerManager.new(clientSwitch) + client = WakuDummyClient.new(peerManager) + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + clientPeerId = clientSwitch.peerInfo.peerId + serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + + asyncTeardown: + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "incentivization PoC: dummy protocol with a valid eligibility proof": + let request = genDummyRequestWithEligibilityProof(true) + let response = await client.sendRequest(request, serverRemotePeerInfo) + check: + response.isOk() + + asyncTest "incentivization PoC: dummy protocol client with an invalid eligibility proof": + let request = genDummyRequestWithEligibilityProof(false) + let response = await client.sendRequest(request, serverRemotePeerInfo) + check: + response.isErr() diff --git a/tests/incentivization/test_rpc_codec.nim b/tests/incentivization/test_rpc_codec.nim index 11c804016..08daa1032 100644 --- a/tests/incentivization/test_rpc_codec.nim +++ b/tests/incentivization/test_rpc_codec.nim @@ -8,25 +8,37 @@ import import ../../../waku/incentivization/rpc, - ../../../waku/incentivization/rpc_codec - + ../../../waku/incentivization/rpc_codec, + ../../../waku/incentivization/common suite "Waku Incentivization Eligibility Codec": asyncTest "encode eligibility proof": - var byteSequence: seq[byte] = @[1, 2, 3, 4, 5, 6, 7, 8] - let epRpc = EligibilityProof(proofOfPayment: some(byteSequence)) - let encoded = encode(epRpc) + let eligibilityProof = genEligibilityProof(true) + let encoded = encode(eligibilityProof) let decoded = EligibilityProof.decode(encoded.buffer).get() check: - epRpc == decoded + eligibilityProof == decoded asyncTest "encode eligibility status": - let esRpc = EligibilityStatus( - statusCode: uint32(200), - statusDesc: some("OK") - ) - let encoded = encode(esRpc) + let eligibilityStatus = genEligibilityStatus(true) + let encoded = encode(eligibilityStatus) let decoded = EligibilityStatus.decode(encoded.buffer).get() check: - esRpc == decoded + eligibilityStatus == decoded + + asyncTest "encode dummy request": + let dummyRequest = genDummyRequestWithEligibilityProof(true) + let encoded = encode(dummyRequest) + let decoded = DummyRequest.decode(encoded.buffer).get() + check: + dummyRequest == decoded + + asyncTest "encode dummy response": + var dummyResponse = genDummyResponseWithEligibilityStatus(true) + let encoded = encode(dummyResponse) + let decoded = DummyResponse.decode(encoded.buffer).get() + check: + dummyResponse == decoded + + \ No newline at end of file diff --git a/waku/incentivization/client.nim b/waku/incentivization/client.nim new file mode 100644 index 000000000..4fe8170ff --- /dev/null +++ b/waku/incentivization/client.nim @@ -0,0 +1,55 @@ +import std/options, chronicles, chronos, libp2p/protocols/protocol +import + ../node/peer_manager, ../waku_core, ./common, ./rpc_codec, ./rpc + +logScope: + topics = "waku incentivization PoC client" + +type WakuDummyClient* = ref object + peerManager*: PeerManager + +proc new*( + T: type WakuDummyClient, peerManager: PeerManager): T = + WakuDummyClient(peerManager: peerManager) + +proc sendDummyRequest( + dummyClient: WakuDummyClient, dummyRequest: DummyRequest, peer: PeerId | RemotePeerInfo +): Future[DummyResult[void]] {.async, gcsafe.} = + let connOpt = await dummyClient.peerManager.dialPeer(peer, DummyCodec) + if connOpt.isNone(): + return err("dialFailure") + let connection = connOpt.get() + await connection.writeLP(dummyRequest.encode().buffer) + + var buffer: seq[byte] + try: + buffer = await connection.readLp(DefaultMaxRpcSize.int) + except LPStreamRemoteClosedError: + return err("Exception reading: " & getCurrentExceptionMsg()) + + let decodeRespRes = DummyResponse.decode(buffer) + if decodeRespRes.isErr(): + return err("decodeRpcFailure") + + let dummyResponse = decodeRespRes.get() + + let requestId = dummyResponse.requestId + let eligibilityStatus = dummyResponse.eligibilityStatus + let statusCode = eligibilityStatus.statusCode + # status description is optional + var statusDesc = "" + let statusDescRes = eligibilityStatus.statusDesc + if statusDescRes.isSome(): + statusDesc = statusDescRes.get() + + if statusCode == 200: + return ok() + else: + return err(statusDesc) + +proc sendRequest*( + dummyClient: WakuDummyClient, + dummyRequest: DummyRequest, + peer: PeerId | RemotePeerInfo, +): Future[DummyResult[void]] {.async, gcsafe.} = + return await dummyClient.sendDummyRequest(dummyRequest, peer) diff --git a/waku/incentivization/common.nim b/waku/incentivization/common.nim new file mode 100644 index 000000000..515caadf7 --- /dev/null +++ b/waku/incentivization/common.nim @@ -0,0 +1,83 @@ +import + std/options, + std/strscans, + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import stew/results, chronos, libp2p/peerid + +import + ../../../waku/incentivization/rpc, + ../../../waku/incentivization/rpc_codec + +const DummyCodec* = "/vac/waku/dummy/0.0.1" + +type DummyResult*[T] = Result[T, string] + +type DummyHandler* = proc( + peer: PeerId, + dummyRequest: DummyRequest +): Future[DummyResult[void]] {.async.} + +type + DummyProtocolErrorKind* {.pure.} = enum + UNKNOWN = uint32(000) + BAD_RESPONSE = uint32(300) + BAD_REQUEST = uint32(400) + PAYMENT_REQUIRED = uint(402) # error type specific for incentivization + NOT_FOUND = uint32(404) + SERVICE_UNAVAILABLE = uint32(503) + PEER_DIAL_FAILURE = uint32(504) + + DummyProtocolError* = object + case kind*: DummyProtocolErrorKind + of PEER_DIAL_FAILURE: + address*: string + of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE, PAYMENT_REQUIRED: + cause*: string + else: + discard + + DummyProtocolResult* = Result[void, DummyProtocolError] + + +proc genEligibilityProof*(startsWithOne: bool): EligibilityProof = + let byteSequence: seq[byte] = ( + if startsWithOne: + @[1, 2, 3, 4, 5, 6, 7, 8] + else: + @[0, 2, 3, 4, 5, 6, 7, 8]) + EligibilityProof(proofOfPayment: some(byteSequence)) + +proc genEligibilityStatus*(isEligible: bool): EligibilityStatus = + if isEligible: + EligibilityStatus( + statusCode: uint32(200), + statusDesc: some("OK")) + else: + EligibilityStatus( + statusCode: uint32(402), + statusDesc: some("Payment Required")) + +proc genDummyRequestWithEligibilityProof*(proofValid: bool, requestId: string = ""): DummyRequest = + let eligibilityProof = genEligibilityProof(proofValid) + result.requestId = requestId + result.eligibilityProof = eligibilityProof + +proc genDummyResponseWithEligibilityStatus*(proofValid: bool, requestId: string = ""): DummyResponse = + let eligibilityStatus = genEligibilityStatus(proofValid) + result.requestId = requestId + result.eligibilityStatus = eligibilityStatus + +proc dummyEligibilityCriteriaMet(eligibilityProof: EligibilityProof): bool = + # a dummy criterion: the first element of the proof byte array equals 1 + let proofOfPayment = eligibilityProof.proofOfPayment + if proofOfPayment.isSome: + return (proofOfPayment.get()[0] == 1) + else: + return false + +proc isEligible*(eligibilityProof: EligibilityProof): bool = + dummyEligibilityCriteriaMet(eligibilityProof) diff --git a/waku/incentivization/protocol.nim b/waku/incentivization/protocol.nim new file mode 100644 index 000000000..8dc16c7aa --- /dev/null +++ b/waku/incentivization/protocol.nim @@ -0,0 +1,56 @@ +import + std/[options, sequtils, sets, strutils, tables], + stew/byteutils, + chronicles, + chronos, + libp2p/peerid, + libp2p/protocols/protocol +import + ../node/peer_manager, + ../waku_core, + ./common, + ./rpc_codec, + ./rpc + +logScope: + topics = "waku incentivization PoC" + +type DummyProtocol* = ref object of LPProtocol + peerManager*: PeerManager + dummyHandler*: DummyHandler + +proc handleRequest*( + dummyProtocol: DummyProtocol, peerId: PeerId, buffer: seq[byte] + ): Future[DummyResponse] {.async.} = + let reqDecodeRes = DummyRequest.decode(buffer) + var isProofValid = false + var requestId = "" + if reqDecodeRes.isOk(): + let dummyRequest = reqDecodeRes.get() + let eligibilityProof = dummyRequest.eligibilityProof + requestId = dummyRequest.requestId + isProofValid = isEligible(eligibilityProof) + let response = genDummyResponseWithEligibilityStatus(isProofValid, requestId) + return response + +proc initProtocolHandler(dummyProtocol: DummyProtocol) = + proc handle(conn: Connection, proto: string) {.async.} = + let buffer = await conn.readLp(DefaultMaxRpcSize) + var dummyResponse = await handleRequest(dummyProtocol, conn.peerId, buffer) + await conn.writeLp(dummyResponse.encode().buffer) + + dummyProtocol.handler = handle + dummyProtocol.codec = DummyCodec + +proc new*( + T: type DummyProtocol, + peerManager: PeerManager, + dummyHandler: DummyHandler, + ): T = + let dummyProtocol = DummyProtocol( + peerManager: peerManager, + dummyHandler: dummyHandler, + ) + dummyProtocol.initProtocolHandler() + return dummyProtocol + diff --git a/waku/incentivization/rpc.nim b/waku/incentivization/rpc.nim index 5b0d0e99b..154861359 100644 --- a/waku/incentivization/rpc.nim +++ b/waku/incentivization/rpc.nim @@ -15,3 +15,13 @@ type EligibilityStatus* = object statusCode*: uint32 statusDesc*: Option[string] + + DummyRequest* = object + requestId*: string + # request content goes here + eligibilityProof*: EligibilityProof + + DummyResponse* = object + requestId*: string + # response content goes here + eligibilityStatus*: EligibilityStatus diff --git a/waku/incentivization/rpc_codec.nim b/waku/incentivization/rpc_codec.nim index dbe6e9daf..ce1d8d189 100644 --- a/waku/incentivization/rpc_codec.nim +++ b/waku/incentivization/rpc_codec.nim @@ -5,6 +5,7 @@ import ../waku_core, ./rpc +const DefaultMaxRpcSize* = -1 # Codec for EligibilityProof @@ -28,7 +29,6 @@ proc decode*(T: type EligibilityProof, buffer: seq[byte]): ProtobufResult[T] = epRpc.proofOfPayment = some(proofOfPayment) ok(epRpc) - # Codec for EligibilityStatus proc encode*(esRpc: EligibilityStatus): ProtoBuffer = @@ -57,3 +57,47 @@ proc decode*(T: type EligibilityStatus, buffer: seq[byte]): ProtobufResult[T] = ok(esRpc) +# Codec for DummyRequest + +proc encode*(request: DummyRequest): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, request.requestId) + pb.write3(10, request.eligibilityProof.encode()) + pb + +proc decode*(T: type DummyRequest, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var request = DummyRequest() + + if not ?pb.getField(1,request.requestId): + return err(ProtobufError.missingRequiredField("requestId")) + + var eligibilityProofBytes: seq[byte] + if not ?pb.getField(10, eligibilityProofBytes): + return err(ProtobufError.missingRequiredField("eligibilityProof")) + else: + request.eligibilityProof = ?EligibilityProof.decode(eligibilityProofBytes) + ok(request) + + +# Codec for DummyResponse + +proc encode*(response: DummyResponse): ProtoBuffer = + var pb = initProtoBuffer() + pb.write3(1, response.requestId) + pb.write3(5, response.eligibilityStatus.encode()) + pb + +proc decode*(T: type DummyResponse, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var response = DummyResponse() + + if not ?pb.getField(1,response.requestId): + return err(ProtobufError.missingRequiredField("requestId")) + + var eligibilityStatusBytes: seq[byte] + if not ?pb.getField(5, eligibilityStatusBytes): + return err(ProtobufError.missingRequiredField("eligibilityStatus")) + else: + response.eligibilityStatus = ?EligibilityStatus.decode(eligibilityStatusBytes) + ok(response)