From 9bd042c265f3380bf9302b8d73aa36a3046b8369 Mon Sep 17 00:00:00 2001 From: kdeme Date: Fri, 6 Dec 2019 00:45:14 +0100 Subject: [PATCH] First go on Waku mail client addition --- eth.nimble | 1 + eth/p2p/rlpx_protocols/waku_protocol.nim | 44 +++++++++++++-- tests/p2p/test_waku_mail.nim | 71 ++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 tests/p2p/test_waku_mail.nim diff --git a/eth.nimble b/eth.nimble index 6215f14..455a7c6 100644 --- a/eth.nimble +++ b/eth.nimble @@ -53,6 +53,7 @@ proc runP2pTests() = "test_shh_config", "test_shh_connect", "test_waku_bridge", + "test_waku_mail", "test_protocol_handlers", ]: runTest("tests/p2p/" & filename) diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index d180f24..695c474 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -76,6 +76,14 @@ type filters*: Filters config*: WakuConfig + MailRequest* = object + lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time + upper*: uint32 ## Unix timestamp; newest requested envelope's creation time + bloom*: Bytes ## Bloom filter to apply on the envelopes + limit*: uint32 ## Maximum amount of envelopes to return + # TODO: Not sure how cursor is supposed to work. The server is supposed + # to also send a P2P request back? Strange. And why is it an array? + # cursor*: Option[Bytes] proc allowed*(msg: Message, config: WakuConfig): bool = # Check max msg size, already happens in RLPx but there is a specific waku @@ -222,11 +230,12 @@ p2pProtocol Waku(version = wakuVersion, # such as e.g. Waku Mail Server discard - proc p2pMessage(peer: Peer, envelope: Envelope) = + proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) = if peer.state.trusted: # when trusted we can bypass any checks on envelope - let msg = Message(env: envelope, isP2P: true) - peer.networkState.filters.notify(msg) + for envelope in envelopes: + let msg = Message(env: envelope, isP2P: true) + peer.networkState.filters.notify(msg) # Following message IDs are not part of EIP-627, but are added and used by # the Status application, we ignore them for now. @@ -304,10 +313,17 @@ proc run(node: EthereumNode, network: WakuNetwork) {.async.} = # Private EthereumNode calls --------------------------------------------------- -proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool = +proc sendP2PMessage(node: EthereumNode, peerId: NodeId, + envelopes: openarray[Envelope]): bool = for peer in node.peers(Waku): if peer.remote.id == peerId: - asyncCheck peer.p2pMessage(env) + asyncCheck peer.p2pMessage(envelopes) + return true + +proc sendP2PRequest(node: EthereumNode, peerId: NodeId, env: Envelope): bool = + for peer in node.peers(Waku): + if peer.remote.id == peerId: + asyncCheck peer.p2pRequest(env) return true proc queueMessage(node: EthereumNode, msg: Message): bool = @@ -347,7 +363,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](), # Allow lightnode to post only direct p2p messages if targetPeer.isSome(): - return node.sendP2PMessage(targetPeer.get(), env) + return node.sendP2PMessage(targetPeer.get(), @[env]) elif not node.protocolState(Waku).config.isLightNode: # non direct p2p message can not have ttl of 0 if env.ttl == 0: @@ -463,3 +479,19 @@ proc resetMessageQueue*(node: EthereumNode) = ## NOTE: Not something that should be run in normal circumstances. node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity) +proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest, + symKey: SymKey): bool = + var writer = initRlpWriter() + writer.append(request) + let payload = writer.finish() + + let data = encode(Payload(payload: payload, symKey: some(symKey))) + + if data.isSome(): + # TODO: should this envelope be valid in terms of ttl, PoW, etc.? + var env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0) + + return node.sendP2PRequest(peerId, env) + else: + error "Encoding of payload failed" + return false diff --git a/tests/p2p/test_waku_mail.nim b/tests/p2p/test_waku_mail.nim new file mode 100644 index 0000000..ae17d8c --- /dev/null +++ b/tests/p2p/test_waku_mail.nim @@ -0,0 +1,71 @@ +import + unittest, chronos, tables, sequtils, times, eth/p2p, eth/p2p/peer_pool, + eth/p2p/rlpx_protocols/waku_protocol, chronicles, + ./p2p_test_helper + +suite "Waku Mail Client": + var client = setupTestNode(Waku) + var simpleServer = setupTestNode(Waku) + + simpleServer.startListening() + let simpleServerNode = newNode(initENode(simpleServer.keys.pubKey, + simpleServer.address)) + let clientNode = newNode(initENode(client.keys.pubKey, client.address)) + waitFor client.peerPool.connectToNode(simpleServerNode) + + asyncTest "Two peers connected": + check: + client.peerPool.connectedNodes.len() == 1 + + asyncTest "Test Mail Request": + let + topic = [byte 0, 0, 0, 0] + bloom = toBloom(@[topic]) + lower = 0'u32 + upper = epochTime().uint32 + limit = 100'u32 + request = MailRequest(lower: lower, upper: upper, bloom: @bloom, + limit: limit) + + var symKey: SymKey + check client.requestMail(simpleServerNode.id, request, symKey) + + # Simple mailserver part + let peer = simpleServer.peerPool.connectedNodes[clientNode] + var f = peer.nextMsg(Waku.p2pRequest) + require await f.withTimeout(chronos.milliseconds(100)) + let response = f.read() + let decoded = decode(response.envelope.data, symKey = some(symKey)) + require decoded.isSome() + + var rlp = rlpFromBytes(decoded.get().payload.toRange) + let output = rlp.read(MailRequest) + check: + output.lower == lower + output.upper == upper + output.bloom == bloom + output.limit == limit + + asyncTest "Test Mail Send": + let topic = [byte 0x12, 0x34, 0x56, 0x78] + let payload = repeat(byte 0, 10) + var f = newFuture[int]() + + proc handler(msg: ReceivedMessage) = + check msg.decoded.payload == payload + f.complete(1) + + let filter = subscribeFilter(client, + newFilter(topics = @[topic], allowP2P = true), handler) + + check: + client.setPeerTrusted(simpleServerNode.id) + # ttl 0 to show that ttl should be ignored + # TODO: perhaps not the best way to test this, means no PoW calculation + # may be done, and not sure if that is OK? + simpleServer.postMessage(ttl = 0, topic = topic, payload = payload, + targetPeer = some(clientNode.id)) + + await f.withTimeout(chronos.milliseconds(100)) + + client.unsubscribeFilter(filter)