mirror of https://github.com/status-im/nim-eth.git
First go on Waku mail client addition
This commit is contained in:
parent
655fc43751
commit
9bd042c265
|
@ -53,6 +53,7 @@ proc runP2pTests() =
|
|||
"test_shh_config",
|
||||
"test_shh_connect",
|
||||
"test_waku_bridge",
|
||||
"test_waku_mail",
|
||||
"test_protocol_handlers",
|
||||
]:
|
||||
runTest("tests/p2p/" & filename)
|
||||
|
|
|
@ -76,6 +76,14 @@ type
|
|||
filters*: Filters
|
||||
config*: WakuConfig
|
||||
|
||||
MailRequest* = object
|
||||
lower*: uint32 ## Unix timestamp; oldest requested envelope's creation time
|
||||
upper*: uint32 ## Unix timestamp; newest requested envelope's creation time
|
||||
bloom*: Bytes ## Bloom filter to apply on the envelopes
|
||||
limit*: uint32 ## Maximum amount of envelopes to return
|
||||
# TODO: Not sure how cursor is supposed to work. The server is supposed
|
||||
# to also send a P2P request back? Strange. And why is it an array?
|
||||
# cursor*: Option[Bytes]
|
||||
|
||||
proc allowed*(msg: Message, config: WakuConfig): bool =
|
||||
# Check max msg size, already happens in RLPx but there is a specific waku
|
||||
|
@ -222,9 +230,10 @@ p2pProtocol Waku(version = wakuVersion,
|
|||
# such as e.g. Waku Mail Server
|
||||
discard
|
||||
|
||||
proc p2pMessage(peer: Peer, envelope: Envelope) =
|
||||
proc p2pMessage(peer: Peer, envelopes: openarray[Envelope]) =
|
||||
if peer.state.trusted:
|
||||
# when trusted we can bypass any checks on envelope
|
||||
for envelope in envelopes:
|
||||
let msg = Message(env: envelope, isP2P: true)
|
||||
peer.networkState.filters.notify(msg)
|
||||
|
||||
|
@ -304,10 +313,17 @@ proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
|
|||
|
||||
# Private EthereumNode calls ---------------------------------------------------
|
||||
|
||||
proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
|
||||
proc sendP2PMessage(node: EthereumNode, peerId: NodeId,
|
||||
envelopes: openarray[Envelope]): bool =
|
||||
for peer in node.peers(Waku):
|
||||
if peer.remote.id == peerId:
|
||||
asyncCheck peer.p2pMessage(env)
|
||||
asyncCheck peer.p2pMessage(envelopes)
|
||||
return true
|
||||
|
||||
proc sendP2PRequest(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
|
||||
for peer in node.peers(Waku):
|
||||
if peer.remote.id == peerId:
|
||||
asyncCheck peer.p2pRequest(env)
|
||||
return true
|
||||
|
||||
proc queueMessage(node: EthereumNode, msg: Message): bool =
|
||||
|
@ -347,7 +363,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
|
|||
|
||||
# Allow lightnode to post only direct p2p messages
|
||||
if targetPeer.isSome():
|
||||
return node.sendP2PMessage(targetPeer.get(), env)
|
||||
return node.sendP2PMessage(targetPeer.get(), @[env])
|
||||
elif not node.protocolState(Waku).config.isLightNode:
|
||||
# non direct p2p message can not have ttl of 0
|
||||
if env.ttl == 0:
|
||||
|
@ -463,3 +479,19 @@ proc resetMessageQueue*(node: EthereumNode) =
|
|||
## NOTE: Not something that should be run in normal circumstances.
|
||||
node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)
|
||||
|
||||
proc requestMail*(node: EthereumNode, peerId: NodeId, request: MailRequest,
|
||||
symKey: SymKey): bool =
|
||||
var writer = initRlpWriter()
|
||||
writer.append(request)
|
||||
let payload = writer.finish()
|
||||
|
||||
let data = encode(Payload(payload: payload, symKey: some(symKey)))
|
||||
|
||||
if data.isSome():
|
||||
# TODO: should this envelope be valid in terms of ttl, PoW, etc.?
|
||||
var env = Envelope(expiry:0, ttl: 0, data: data.get(), nonce: 0)
|
||||
|
||||
return node.sendP2PRequest(peerId, env)
|
||||
else:
|
||||
error "Encoding of payload failed"
|
||||
return false
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
import
|
||||
unittest, chronos, tables, sequtils, times, eth/p2p, eth/p2p/peer_pool,
|
||||
eth/p2p/rlpx_protocols/waku_protocol, chronicles,
|
||||
./p2p_test_helper
|
||||
|
||||
suite "Waku Mail Client":
|
||||
var client = setupTestNode(Waku)
|
||||
var simpleServer = setupTestNode(Waku)
|
||||
|
||||
simpleServer.startListening()
|
||||
let simpleServerNode = newNode(initENode(simpleServer.keys.pubKey,
|
||||
simpleServer.address))
|
||||
let clientNode = newNode(initENode(client.keys.pubKey, client.address))
|
||||
waitFor client.peerPool.connectToNode(simpleServerNode)
|
||||
|
||||
asyncTest "Two peers connected":
|
||||
check:
|
||||
client.peerPool.connectedNodes.len() == 1
|
||||
|
||||
asyncTest "Test Mail Request":
|
||||
let
|
||||
topic = [byte 0, 0, 0, 0]
|
||||
bloom = toBloom(@[topic])
|
||||
lower = 0'u32
|
||||
upper = epochTime().uint32
|
||||
limit = 100'u32
|
||||
request = MailRequest(lower: lower, upper: upper, bloom: @bloom,
|
||||
limit: limit)
|
||||
|
||||
var symKey: SymKey
|
||||
check client.requestMail(simpleServerNode.id, request, symKey)
|
||||
|
||||
# Simple mailserver part
|
||||
let peer = simpleServer.peerPool.connectedNodes[clientNode]
|
||||
var f = peer.nextMsg(Waku.p2pRequest)
|
||||
require await f.withTimeout(chronos.milliseconds(100))
|
||||
let response = f.read()
|
||||
let decoded = decode(response.envelope.data, symKey = some(symKey))
|
||||
require decoded.isSome()
|
||||
|
||||
var rlp = rlpFromBytes(decoded.get().payload.toRange)
|
||||
let output = rlp.read(MailRequest)
|
||||
check:
|
||||
output.lower == lower
|
||||
output.upper == upper
|
||||
output.bloom == bloom
|
||||
output.limit == limit
|
||||
|
||||
asyncTest "Test Mail Send":
|
||||
let topic = [byte 0x12, 0x34, 0x56, 0x78]
|
||||
let payload = repeat(byte 0, 10)
|
||||
var f = newFuture[int]()
|
||||
|
||||
proc handler(msg: ReceivedMessage) =
|
||||
check msg.decoded.payload == payload
|
||||
f.complete(1)
|
||||
|
||||
let filter = subscribeFilter(client,
|
||||
newFilter(topics = @[topic], allowP2P = true), handler)
|
||||
|
||||
check:
|
||||
client.setPeerTrusted(simpleServerNode.id)
|
||||
# ttl 0 to show that ttl should be ignored
|
||||
# TODO: perhaps not the best way to test this, means no PoW calculation
|
||||
# may be done, and not sure if that is OK?
|
||||
simpleServer.postMessage(ttl = 0, topic = topic, payload = payload,
|
||||
targetPeer = some(clientNode.id))
|
||||
|
||||
await f.withTimeout(chronos.milliseconds(100))
|
||||
|
||||
client.unsubscribeFilter(filter)
|
Loading…
Reference in New Issue