Quick implementation for multiple mail requests by using cursor

This commit is contained in:
kdeme 2019-12-10 21:49:54 +01:00 committed by zah
parent 9bd042c265
commit f0582a084c
4 changed files with 144 additions and 41 deletions

View File

@ -162,3 +162,7 @@ proc randomPeerWith*(node: EthereumNode, Protocol: type): Peer =
if candidates.len > 0: if candidates.len > 0:
return candidates.rand() return candidates.rand()
proc getPeer*(node: EthereumNode, peerId: NodeId, Protocol: type): Option[Peer] =
for peer in node.peers(Protocol):
if peer.remote.id == peerId:
return some(peer)

View File

@ -0,0 +1,82 @@
#
# Waku Mail Client & Server
# (c) Copyright 2019
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
import
chronos,
eth/[p2p, async_utils], eth/p2p/rlpx_protocols/waku_protocol
const
requestCompleteTimeout = chronos.seconds(5)
type
Cursor = Bytes
MailRequest* = object
lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time
upper*: uint32 ## Unix timestamp; newest requested envelope's creation time
bloom*: Bytes ## Bloom filter to apply on the envelopes
limit*: uint32 ## Maximum amount of envelopes to return
cursor*: Cursor ## Optional cursor
proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest,
symKey: SymKey, requests = 10): Future[Option[Cursor]] {.async.} =
## Send p2p mail request and check request complete.
## If result is none, and error occured. If result is a none empty cursor,
## more envelopes are available.
# TODO: Perhaps don't go the recursive route or could use the actual response
# proc to implement this (via a handler) and store the necessary data in the
# WakuPeer object.
# TODO: move this check out of requestMail?
let peer = node.getPeer(peerId, Waku)
if not peer.isSome():
error "Invalid peer"
return result
elif not peer.get().state(Waku).trusted:
return result
var writer = initRlpWriter()
writer.append(request)
let payload = writer.finish()
let data = encode(Payload(payload: payload, symKey: some(symKey)))
if not data.isSome():
error "Encoding of payload failed"
return result
# TODO: should this envelope be valid in terms of ttl, PoW, etc.?
let env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0)
# Send the request
traceAsyncErrors peer.get().p2pRequest(env)
# Wait for the Request Complete packet
var f = peer.get().nextMsg(Waku.p2pRequestComplete)
if await f.withTimeout(requestCompleteTimeout):
let response = f.read()
# TODO: I guess the idea is to check requestId (Hash) also?
let requests = requests - 1
# If there is cursor data, do another request
if response.data.cursor.len > 0 and requests > 0:
var newRequest = request
newRequest.cursor = response.data.cursor
return await requestMail(node, peerId, newRequest, symKey, requests)
else:
return some(response.data.cursor)
else:
error "p2pRequestComplete timeout"
return result
proc p2pRequestHandler(peer: Peer, envelope: Envelope) =
# Mail server p2p request implementation
discard
proc enableMailServer*(node: EthereumNode, customHandler: P2PRequestHandler) =
node.protocolState(Waku).p2pRequestHandler = customHandler
proc enableMailServer*(node: EthereumNode) =
node.protocolState(Waku).p2pRequestHandler = p2pRequestHandler

View File

@ -36,7 +36,7 @@
import import
options, tables, times, chronos, chronicles, options, tables, times, chronos, chronicles,
eth/[keys, async_utils, p2p], whisper/whisper_types eth/[keys, async_utils, p2p], whisper/whisper_types, eth/trie/trie_defs
export export
whisper_types whisper_types
@ -71,19 +71,22 @@ type
trusted*: bool trusted*: bool
received: HashSet[Message] received: HashSet[Message]
P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
WakuNetwork = ref object WakuNetwork = ref object
queue*: ref Queue queue*: ref Queue
filters*: Filters filters*: Filters
config*: WakuConfig config*: WakuConfig
p2pRequestHandler*: P2PRequestHandler
MailRequest* = object # TODO: In the current specification this is not wrapped in a regular envelope
lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time # as is done for the P2P Request packet. If we could alter this in the spec it
upper*: uint32 ## Unix timestamp; newest requested envelope's creation time # would be a cleaner separation between Waku and Mail server / client and then
bloom*: Bytes ## Bloom filter to apply on the envelopes # this could be moved to waku_mail.nim
limit*: uint32 ## Maximum amount of envelopes to return P2PRequestCompleteObject* = object
# TODO: Not sure how cursor is supposed to work. The server is supposed requestId*: Hash
# to also send a P2P request back? Strange. And why is it an array? lastEnvelopeHash*: Hash
# cursor*: Option[Bytes] cursor*: Bytes
proc allowed*(msg: Message, config: WakuConfig): bool = proc allowed*(msg: Message, config: WakuConfig): bool =
# Check max msg size, already happens in RLPx but there is a specific waku # Check max msg size, already happens in RLPx but there is a specific waku
@ -226,9 +229,8 @@ p2pProtocol Waku(version = wakuVersion,
nextID 126 nextID 126
proc p2pRequest(peer: Peer, envelope: Envelope) = proc p2pRequest(peer: Peer, envelope: Envelope) =
# TODO: here we would have to allow to insert some specific implementation if not peer.networkState.p2pRequestHandler.isNil():
# such as e.g. Waku Mail Server peer.networkState.p2pRequestHandler(peer, envelope)
discard
proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) = proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) =
if peer.state.trusted: if peer.state.trusted:
@ -248,7 +250,11 @@ p2pProtocol Waku(version = wakuVersion,
proc p2pSyncRequest(peer: Peer) = discard proc p2pSyncRequest(peer: Peer) = discard
proc p2pSyncResponse(peer: Peer) = discard proc p2pSyncResponse(peer: Peer) = discard
proc p2pRequestComplete(peer: Peer) = discard proc p2pRequestComplete(peer: Peer, data: P2PRequestCompleteObject) = discard
# This is actually rather a requestResponse in combination with p2pRequest
# However, we can't use that system due to the unfortunate fact that the
# packet IDs are not consecutive, and nextID is not recognized in between
# these.
# 'Runner' calls --------------------------------------------------------------- # 'Runner' calls ---------------------------------------------------------------
@ -320,12 +326,6 @@ proc sendP2PMessage(node: EthereumNode, peerId: NodeId,
asyncCheck peer.p2pMessage(envelopes) asyncCheck peer.p2pMessage(envelopes)
return true return true
proc sendP2PRequest(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
for peer in node.peers(Waku):
if peer.remote.id == peerId:
asyncCheck peer.p2pRequest(env)
return true
proc queueMessage(node: EthereumNode, msg: Message): bool = proc queueMessage(node: EthereumNode, msg: Message): bool =
var wakuNet = node.protocolState(Waku) var wakuNet = node.protocolState(Waku)
@ -478,20 +478,3 @@ proc resetMessageQueue*(node: EthereumNode) =
## ##
## NOTE: Not something that should be run in normal circumstances. ## NOTE: Not something that should be run in normal circumstances.
node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity) node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)
proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest,
symKey: SymKey): bool =
var writer = initRlpWriter()
writer.append(request)
let payload = writer.finish()
let data = encode(Payload(payload: payload, symKey: some(symKey)))
if data.isSome():
# TODO: should this envelope be valid in terms of ttl, PoW, etc.?
var env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0)
return node.sendP2PRequest(peerId, env)
else:
error "Encoding of payload failed"
return false

View File

@ -1,6 +1,7 @@
import import
unittest, chronos, tables, sequtils, times, eth/p2p, eth/p2p/peer_pool, unittest, chronos, tables, sequtils, times,
eth/p2p/rlpx_protocols/waku_protocol, chronicles, eth/[p2p, async_utils], eth/p2p/peer_pool,
eth/p2p/rlpx_protocols/[waku_protocol, waku_mail],
./p2p_test_helper ./p2p_test_helper
suite "Waku Mail Client": suite "Waku Mail Client":
@ -17,7 +18,7 @@ suite "Waku Mail Client":
check: check:
client.peerPool.connectedNodes.len() == 1 client.peerPool.connectedNodes.len() == 1
asyncTest "Test Mail Request": asyncTest "Mail Request and Request Complete":
let let
topic = [byte 0, 0, 0, 0] topic = [byte 0, 0, 0, 0]
bloom = toBloom(@[topic]) bloom = toBloom(@[topic])
@ -28,7 +29,8 @@ suite "Waku Mail Client":
limit: limit) limit: limit)
var symKey: SymKey var symKey: SymKey
check client.requestMail(simpleServerNode.id, request, symKey) check client.setPeerTrusted(simpleServerNode.id)
var cursorFut = client.requestMail(simpleServerNode.id, request, symKey, 1)
# Simple mailserver part # Simple mailserver part
let peer = simpleServer.peerPool.connectedNodes[clientNode] let peer = simpleServer.peerPool.connectedNodes[clientNode]
@ -46,7 +48,12 @@ suite "Waku Mail Client":
output.bloom == bloom output.bloom == bloom
output.limit == limit output.limit == limit
asyncTest "Test Mail Send": var test: P2PRequestCompleteObject
await peer.p2pRequestComplete(test)
check await cursorFut.withTimeout(chronos.milliseconds(100))
asyncTest "Mail Send":
let topic = [byte 0x12, 0x34, 0x56, 0x78] let topic = [byte 0x12, 0x34, 0x56, 0x78]
let payload = repeat(byte 0, 10) let payload = repeat(byte 0, 10)
var f = newFuture[int]() var f = newFuture[int]()
@ -69,3 +76,30 @@ suite "Waku Mail Client":
await f.withTimeout(chronos.milliseconds(100)) await f.withTimeout(chronos.milliseconds(100))
client.unsubscribeFilter(filter) client.unsubscribeFilter(filter)
asyncTest "Multiple Client Request and Complete":
var count = 5
proc customHandler(peer: Peer, envelope: Envelope)=
var envelopes: seq[Envelope]
traceAsyncErrors peer.p2pMessage(envelopes)
var test: P2PRequestCompleteObject
count = count - 1
if count == 0:
test.cursor = @[]
else:
test.cursor = @[byte count]
traceAsyncErrors peer.p2pRequestComplete(test)
simpleServer.enableMailServer(customHandler)
check client.setPeerTrusted(simpleServerNode.id)
var request: MailRequest
var symKey: SymKey
let cursor =
await client.requestMail(simpleServerNode.id, request, symKey, 5)
require cursor.isSome()
check:
cursor.get().len == 0
count == 0
# TODO: Also check for received envelopes.