diff --git a/eth/p2p.nim b/eth/p2p.nim index 9bdf775..9fb2330 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -162,3 +162,7 @@ proc randomPeerWith*(node: EthereumNode, Protocol: type): Peer = if candidates.len > 0: return candidates.rand() +proc getPeer*(node: EthereumNode, peerId: NodeId, Protocol: type): Option[Peer] = + for peer in node.peers(Protocol): + if peer.remote.id == peerId: + return some(peer) diff --git a/eth/p2p/rlpx_protocols/waku_mail.nim b/eth/p2p/rlpx_protocols/waku_mail.nim new file mode 100644 index 0000000..425f399 --- /dev/null +++ b/eth/p2p/rlpx_protocols/waku_mail.nim @@ -0,0 +1,82 @@ +# +# Waku Mail Client & Server +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# +import + chronos, + eth/[p2p, async_utils], eth/p2p/rlpx_protocols/waku_protocol + +const + requestCompleteTimeout = chronos.seconds(5) + +type + Cursor = Bytes + + MailRequest* = object + lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time + upper*: uint32 ## Unix timestamp; newest requested envelope's creation time + bloom*: Bytes ## Bloom filter to apply on the envelopes + limit*: uint32 ## Maximum amount of envelopes to return + cursor*: Cursor ## Optional cursor + +proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest, + symKey: SymKey, requests = 10): Future[Option[Cursor]] {.async.} = + ## Send p2p mail request and check request complete. + ## If result is none, and error occured. If result is a none empty cursor, + ## more envelopes are available. + # TODO: Perhaps don't go the recursive route or could use the actual response + # proc to implement this (via a handler) and store the necessary data in the + # WakuPeer object. + + # TODO: move this check out of requestMail? + let peer = node.getPeer(peerId, Waku) + if not peer.isSome(): + error "Invalid peer" + return result + elif not peer.get().state(Waku).trusted: + return result + + var writer = initRlpWriter() + writer.append(request) + let payload = writer.finish() + let data = encode(Payload(payload: payload, symKey: some(symKey))) + if not data.isSome(): + error "Encoding of payload failed" + return result + + # TODO: should this envelope be valid in terms of ttl, PoW, etc.? + let env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0) + # Send the request + traceAsyncErrors peer.get().p2pRequest(env) + + # Wait for the Request Complete packet + var f = peer.get().nextMsg(Waku.p2pRequestComplete) + if await f.withTimeout(requestCompleteTimeout): + let response = f.read() + # TODO: I guess the idea is to check requestId (Hash) also? + let requests = requests - 1 + # If there is cursor data, do another request + if response.data.cursor.len > 0 and requests > 0: + var newRequest = request + newRequest.cursor = response.data.cursor + return await requestMail(node, peerId, newRequest, symKey, requests) + else: + return some(response.data.cursor) + else: + error "p2pRequestComplete timeout" + return result + +proc p2pRequestHandler(peer: Peer, envelope: Envelope) = + # Mail server p2p request implementation + discard + +proc enableMailServer*(node: EthereumNode, customHandler: P2PRequestHandler) = + node.protocolState(Waku).p2pRequestHandler = customHandler + +proc enableMailServer*(node: EthereumNode) = + node.protocolState(Waku).p2pRequestHandler = p2pRequestHandler diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 695c474..295ba10 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -36,7 +36,7 @@ import options, tables, times, chronos, chronicles, - eth/[keys, async_utils, p2p], whisper/whisper_types + eth/[keys, async_utils, p2p], whisper/whisper_types, eth/trie/trie_defs export whisper_types @@ -71,19 +71,22 @@ type trusted*: bool received: HashSet[Message] + P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.} + WakuNetwork = ref object queue*: ref Queue filters*: Filters config*: WakuConfig + p2pRequestHandler*: P2PRequestHandler - MailRequest* = object - lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time - upper*: uint32 ## Unix timestamp; newest requested envelope's creation time - bloom*: Bytes ## Bloom filter to apply on the envelopes - limit*: uint32 ## Maximum amount of envelopes to return - # TODO: Not sure how cursor is supposed to work. The server is supposed - # to also send a P2P request back? Strange. And why is it an array? - # cursor*: Option[Bytes] + # TODO: In the current specification this is not wrapped in a regular envelope + # as is done for the P2P Request packet. If we could alter this in the spec it + # would be a cleaner separation between Waku and Mail server / client and then + # this could be moved to waku_mail.nim + P2PRequestCompleteObject* = object + requestId*: Hash + lastEnvelopeHash*: Hash + cursor*: Bytes proc allowed*(msg: Message, config: WakuConfig): bool = # Check max msg size, already happens in RLPx but there is a specific waku @@ -226,9 +229,8 @@ p2pProtocol Waku(version = wakuVersion, nextID 126 proc p2pRequest(peer: Peer, envelope: Envelope) = - # TODO: here we would have to allow to insert some specific implementation - # such as e.g. Waku Mail Server - discard + if not peer.networkState.p2pRequestHandler.isNil(): + peer.networkState.p2pRequestHandler(peer, envelope) proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) = if peer.state.trusted: @@ -248,7 +250,11 @@ p2pProtocol Waku(version = wakuVersion, proc p2pSyncRequest(peer: Peer) = discard proc p2pSyncResponse(peer: Peer) = discard - proc p2pRequestComplete(peer: Peer) = discard + proc p2pRequestComplete(peer: Peer, data: P2PRequestCompleteObject) = discard + # This is actually rather a requestResponse in combination with p2pRequest + # However, we can't use that system due to the unfortunate fact that the + # packet IDs are not consecutive, and nextID is not recognized in between + # these. # 'Runner' calls --------------------------------------------------------------- @@ -320,12 +326,6 @@ proc sendP2PMessage(node: EthereumNode, peerId: NodeId, asyncCheck peer.p2pMessage(envelopes) return true -proc sendP2PRequest(node: EthereumNode, peerId: NodeId, env: Envelope): bool = - for peer in node.peers(Waku): - if peer.remote.id == peerId: - asyncCheck peer.p2pRequest(env) - return true - proc queueMessage(node: EthereumNode, msg: Message): bool = var wakuNet = node.protocolState(Waku) @@ -478,20 +478,3 @@ proc resetMessageQueue*(node: EthereumNode) = ## ## NOTE: Not something that should be run in normal circumstances. node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity) - -proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest, - symKey: SymKey): bool = - var writer = initRlpWriter() - writer.append(request) - let payload = writer.finish() - - let data = encode(Payload(payload: payload, symKey: some(symKey))) - - if data.isSome(): - # TODO: should this envelope be valid in terms of ttl, PoW, etc.? - var env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0) - - return node.sendP2PRequest(peerId, env) - else: - error "Encoding of payload failed" - return false diff --git a/tests/p2p/test_waku_mail.nim b/tests/p2p/test_waku_mail.nim index ae17d8c..5c94c2a 100644 --- a/tests/p2p/test_waku_mail.nim +++ b/tests/p2p/test_waku_mail.nim @@ -1,6 +1,7 @@ import - unittest, chronos, tables, sequtils, times, eth/p2p, eth/p2p/peer_pool, - eth/p2p/rlpx_protocols/waku_protocol, chronicles, + unittest, chronos, tables, sequtils, times, + eth/[p2p, async_utils], eth/p2p/peer_pool, + eth/p2p/rlpx_protocols/[waku_protocol, waku_mail], ./p2p_test_helper suite "Waku Mail Client": @@ -17,7 +18,7 @@ suite "Waku Mail Client": check: client.peerPool.connectedNodes.len() == 1 - asyncTest "Test Mail Request": + asyncTest "Mail Request and Request Complete": let topic = [byte 0, 0, 0, 0] bloom = toBloom(@[topic]) @@ -28,7 +29,8 @@ suite "Waku Mail Client": limit: limit) var symKey: SymKey - check client.requestMail(simpleServerNode.id, request, symKey) + check client.setPeerTrusted(simpleServerNode.id) + var cursorFut = client.requestMail(simpleServerNode.id, request, symKey, 1) # Simple mailserver part let peer = simpleServer.peerPool.connectedNodes[clientNode] @@ -46,7 +48,12 @@ suite "Waku Mail Client": output.bloom == bloom output.limit == limit - asyncTest "Test Mail Send": + var test: P2PRequestCompleteObject + await peer.p2pRequestComplete(test) + + check await cursorFut.withTimeout(chronos.milliseconds(100)) + + asyncTest "Mail Send": let topic = [byte 0x12, 0x34, 0x56, 0x78] let payload = repeat(byte 0, 10) var f = newFuture[int]() @@ -69,3 +76,30 @@ suite "Waku Mail Client": await f.withTimeout(chronos.milliseconds(100)) client.unsubscribeFilter(filter) + + asyncTest "Multiple Client Request and Complete": + var count = 5 + proc customHandler(peer: Peer, envelope: Envelope)= + var envelopes: seq[Envelope] + traceAsyncErrors peer.p2pMessage(envelopes) + + var test: P2PRequestCompleteObject + count = count - 1 + if count == 0: + test.cursor = @[] + else: + test.cursor = @[byte count] + traceAsyncErrors peer.p2pRequestComplete(test) + + simpleServer.enableMailServer(customHandler) + check client.setPeerTrusted(simpleServerNode.id) + var request: MailRequest + var symKey: SymKey + let cursor = + await client.requestMail(simpleServerNode.id, request, symKey, 5) + require cursor.isSome() + check: + cursor.get().len == 0 + count == 0 + + # TODO: Also check for received envelopes.