mirror of https://github.com/status-im/nim-eth.git
Implement quick Waku - Whisper bridge by sharing the queue + adjust test
This commit is contained in:
parent
8d45b22033
commit
a8a55f16dc
|
@ -52,6 +52,7 @@ proc runP2pTests() =
|
||||||
"test_shh",
|
"test_shh",
|
||||||
"test_shh_config",
|
"test_shh_config",
|
||||||
"test_shh_connect",
|
"test_shh_connect",
|
||||||
|
"test_waku_bridge",
|
||||||
"test_protocol_handlers",
|
"test_protocol_handlers",
|
||||||
]:
|
]:
|
||||||
runTest("tests/p2p/" & filename)
|
runTest("tests/p2p/" & filename)
|
||||||
|
|
|
@ -38,6 +38,8 @@ import
|
||||||
options, tables, times, chronos, chronicles,
|
options, tables, times, chronos, chronicles,
|
||||||
eth/[keys, async_utils, p2p], whisper/whisper_types
|
eth/[keys, async_utils, p2p], whisper/whisper_types
|
||||||
|
|
||||||
|
import eth/p2p/rlpx_protocols/whisper_protocol
|
||||||
|
|
||||||
export
|
export
|
||||||
whisper_types
|
whisper_types
|
||||||
|
|
||||||
|
@ -72,7 +74,7 @@ type
|
||||||
received: HashSet[Message]
|
received: HashSet[Message]
|
||||||
|
|
||||||
WakuNetwork = ref object
|
WakuNetwork = ref object
|
||||||
queue*: Queue
|
queue*: ref Queue
|
||||||
filters*: Filters
|
filters*: Filters
|
||||||
config*: WakuConfig
|
config*: WakuConfig
|
||||||
|
|
||||||
|
@ -98,7 +100,11 @@ proc run(peer: Peer) {.gcsafe, async.}
|
||||||
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
|
proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
|
||||||
|
|
||||||
proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
|
proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
|
||||||
network.queue = initQueue(defaultQueueCapacity)
|
if node.protocolState(Whisper).isNil:
|
||||||
|
new(network.queue)
|
||||||
|
network.queue[] = initQueue(defaultQueueCapacity)
|
||||||
|
else:
|
||||||
|
network.queue = node.protocolState(Whisper).queue
|
||||||
network.filters = initTable[string, Filter]()
|
network.filters = initTable[string, Filter]()
|
||||||
network.config.bloom = fullBloom()
|
network.config.bloom = fullBloom()
|
||||||
network.config.powRequirement = defaultMinPow
|
network.config.powRequirement = defaultMinPow
|
||||||
|
@ -195,7 +201,7 @@ p2pProtocol Waku(version = wakuVersion,
|
||||||
|
|
||||||
# This can still be a duplicate message, but from another peer than
|
# This can still be a duplicate message, but from another peer than
|
||||||
# the peer who send the message.
|
# the peer who send the message.
|
||||||
if peer.networkState.queue.add(msg):
|
if peer.networkState.queue[].add(msg):
|
||||||
# notify filters of this message
|
# notify filters of this message
|
||||||
peer.networkState.filters.notify(msg)
|
peer.networkState.filters.notify(msg)
|
||||||
|
|
||||||
|
@ -294,7 +300,7 @@ proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
# prune message queue every second
|
# prune message queue every second
|
||||||
# TTL unit is in seconds, so this should be sufficient?
|
# TTL unit is in seconds, so this should be sufficient?
|
||||||
network.queue.prune()
|
network.queue[].prune()
|
||||||
# pruning the received sets is not necessary for correct workings
|
# pruning the received sets is not necessary for correct workings
|
||||||
# but simply from keeping the sets growing indefinitely
|
# but simply from keeping the sets growing indefinitely
|
||||||
node.pruneReceived()
|
node.pruneReceived()
|
||||||
|
@ -317,7 +323,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
|
||||||
return false
|
return false
|
||||||
|
|
||||||
trace "Adding message to queue"
|
trace "Adding message to queue"
|
||||||
if wakuNet.queue.add(msg):
|
if wakuNet.queue[].add(msg):
|
||||||
# Also notify our own filters of the message we are sending,
|
# Also notify our own filters of the message we are sending,
|
||||||
# e.g. msg from local Dapp to Dapp
|
# e.g. msg from local Dapp to Dapp
|
||||||
wakuNet.filters.notify(msg)
|
wakuNet.filters.notify(msg)
|
||||||
|
@ -459,4 +465,4 @@ proc resetMessageQueue*(node: EthereumNode) =
|
||||||
## Full reset of the message queue.
|
## Full reset of the message queue.
|
||||||
##
|
##
|
||||||
## NOTE: Not something that should be run in normal circumstances.
|
## NOTE: Not something that should be run in normal circumstances.
|
||||||
node.protocolState(Waku).queue = initQueue(defaultQueueCapacity)
|
node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)
|
||||||
|
|
|
@ -70,7 +70,7 @@ type
|
||||||
received: HashSet[Message]
|
received: HashSet[Message]
|
||||||
|
|
||||||
WhisperNetwork = ref object
|
WhisperNetwork = ref object
|
||||||
queue*: Queue
|
queue*: ref Queue
|
||||||
filters*: Filters
|
filters*: Filters
|
||||||
config*: WhisperConfig
|
config*: WhisperConfig
|
||||||
|
|
||||||
|
@ -95,7 +95,8 @@ proc run(peer: Peer) {.gcsafe, async.}
|
||||||
proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.}
|
proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.}
|
||||||
|
|
||||||
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
|
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
|
||||||
network.queue = initQueue(defaultQueueCapacity)
|
new(network.queue)
|
||||||
|
network.queue[] = initQueue(defaultQueueCapacity)
|
||||||
network.filters = initTable[string, Filter]()
|
network.filters = initTable[string, Filter]()
|
||||||
network.config.bloom = fullBloom()
|
network.config.bloom = fullBloom()
|
||||||
network.config.powRequirement = defaultMinPow
|
network.config.powRequirement = defaultMinPow
|
||||||
|
@ -192,7 +193,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
|
|
||||||
# This can still be a duplicate message, but from another peer than
|
# This can still be a duplicate message, but from another peer than
|
||||||
# the peer who send the message.
|
# the peer who send the message.
|
||||||
if peer.networkState.queue.add(msg):
|
if peer.networkState.queue[].add(msg):
|
||||||
# notify filters of this message
|
# notify filters of this message
|
||||||
peer.networkState.filters.notify(msg)
|
peer.networkState.filters.notify(msg)
|
||||||
|
|
||||||
|
@ -291,7 +292,7 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
# prune message queue every second
|
# prune message queue every second
|
||||||
# TTL unit is in seconds, so this should be sufficient?
|
# TTL unit is in seconds, so this should be sufficient?
|
||||||
network.queue.prune()
|
network.queue[].prune()
|
||||||
# pruning the received sets is not necessary for correct workings
|
# pruning the received sets is not necessary for correct workings
|
||||||
# but simply from keeping the sets growing indefinitely
|
# but simply from keeping the sets growing indefinitely
|
||||||
node.pruneReceived()
|
node.pruneReceived()
|
||||||
|
@ -314,7 +315,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
|
||||||
return false
|
return false
|
||||||
|
|
||||||
trace "Adding message to queue"
|
trace "Adding message to queue"
|
||||||
if whisperNet.queue.add(msg):
|
if whisperNet.queue[].add(msg):
|
||||||
# Also notify our own filters of the message we are sending,
|
# Also notify our own filters of the message we are sending,
|
||||||
# e.g. msg from local Dapp to Dapp
|
# e.g. msg from local Dapp to Dapp
|
||||||
whisperNet.filters.notify(msg)
|
whisperNet.filters.notify(msg)
|
||||||
|
@ -456,4 +457,4 @@ proc resetMessageQueue*(node: EthereumNode) =
|
||||||
## Full reset of the message queue.
|
## Full reset of the message queue.
|
||||||
##
|
##
|
||||||
## NOTE: Not something that should be run in normal circumstances.
|
## NOTE: Not something that should be run in normal circumstances.
|
||||||
node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)
|
node.protocolState(Whisper).queue[] = initQueue(defaultQueueCapacity)
|
||||||
|
|
|
@ -1,416 +0,0 @@
|
||||||
#
|
|
||||||
# Ethereum P2P
|
|
||||||
# (c) Copyright 2018
|
|
||||||
# Status Research & Development GmbH
|
|
||||||
#
|
|
||||||
# Licensed under either of
|
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
||||||
# MIT license (LICENSE-MIT)
|
|
||||||
|
|
||||||
import
|
|
||||||
sequtils, options, unittest, times, tables,
|
|
||||||
nimcrypto/hash,
|
|
||||||
eth/[keys, rlp],
|
|
||||||
eth/p2p/rlpx_protocols/waku_protocol as waku
|
|
||||||
|
|
||||||
suite "Waku payload":
|
|
||||||
test "should roundtrip without keys":
|
|
||||||
let payload = Payload(payload: @[byte 0, 1, 2])
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().src.isNone()
|
|
||||||
decoded.get().padding.get().len == 251 # 256 -1 -1 -3
|
|
||||||
|
|
||||||
test "should roundtrip with symmetric encryption":
|
|
||||||
var symKey: SymKey
|
|
||||||
let payload = Payload(symKey: some(symKey), payload: @[byte 0, 1, 2])
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get(), symKey = some(symKey))
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().src.isNone()
|
|
||||||
decoded.get().padding.get().len == 251 # 256 -1 -1 -3
|
|
||||||
|
|
||||||
test "should roundtrip with signature":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
|
|
||||||
let payload = Payload(src: some(privKey), payload: @[byte 0, 1, 2])
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
privKey.getPublicKey() == decoded.get().src.get()
|
|
||||||
decoded.get().padding.get().len == 186 # 256 -1 -1 -3 -65
|
|
||||||
|
|
||||||
test "should roundtrip with asymmetric encryption":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
|
|
||||||
let payload = Payload(dst: some(privKey.getPublicKey()),
|
|
||||||
payload: @[byte 0, 1, 2])
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get(), dst = some(privKey))
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().src.isNone()
|
|
||||||
decoded.get().padding.get().len == 251 # 256 -1 -1 -3
|
|
||||||
|
|
||||||
test "should return specified bloom":
|
|
||||||
# Geth test: https://github.com/ethersphere/go-ethereum/blob/d3441ebb563439bac0837d70591f92e2c6080303/waku/wakuv6/waku_test.go#L834
|
|
||||||
let top0 = [byte 0, 0, 255, 6]
|
|
||||||
var x: Bloom
|
|
||||||
x[0] = byte 1
|
|
||||||
x[32] = byte 1
|
|
||||||
x[^1] = byte 128
|
|
||||||
check @(top0.topicBloom) == @x
|
|
||||||
|
|
||||||
suite "Waku payload padding":
|
|
||||||
test "should do max padding":
|
|
||||||
let payload = Payload(payload: repeat(byte 1, 254))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
decoded.get().padding.get().len == 256 # as dataLen == 256
|
|
||||||
|
|
||||||
test "should do max padding with signature":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
|
|
||||||
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 189))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
privKey.getPublicKey() == decoded.get().src.get()
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
decoded.get().padding.get().len == 256 # as dataLen == 256
|
|
||||||
|
|
||||||
test "should do min padding":
|
|
||||||
let payload = Payload(payload: repeat(byte 1, 253))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
decoded.get().padding.get().len == 1 # as dataLen == 255
|
|
||||||
|
|
||||||
test "should do min padding with signature":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
|
|
||||||
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 188))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
privKey.getPublicKey() == decoded.get().src.get()
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
decoded.get().padding.get().len == 1 # as dataLen == 255
|
|
||||||
|
|
||||||
test "should roundtrip custom padding":
|
|
||||||
let payload = Payload(payload: repeat(byte 1, 10),
|
|
||||||
padding: some(repeat(byte 2, 100)))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
payload.padding.get() == decoded.get().padding.get()
|
|
||||||
|
|
||||||
test "should roundtrip custom 0 padding":
|
|
||||||
let padding: seq[byte] = @[]
|
|
||||||
let payload = Payload(payload: repeat(byte 1, 10),
|
|
||||||
padding: some(padding))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
decoded.get().padding.isNone()
|
|
||||||
|
|
||||||
test "should roundtrip custom padding with signature":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10),
|
|
||||||
padding: some(repeat(byte 2, 100)))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
privKey.getPublicKey() == decoded.get().src.get()
|
|
||||||
decoded.get().padding.isSome()
|
|
||||||
payload.padding.get() == decoded.get().padding.get()
|
|
||||||
|
|
||||||
test "should roundtrip custom 0 padding with signature":
|
|
||||||
let padding: seq[byte] = @[]
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
let payload = Payload(src: some(privKey), payload: repeat(byte 1, 10),
|
|
||||||
padding: some(padding))
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
|
|
||||||
let decoded = waku.decode(encoded.get())
|
|
||||||
check:
|
|
||||||
decoded.isSome()
|
|
||||||
payload.payload == decoded.get().payload
|
|
||||||
privKey.getPublicKey() == decoded.get().src.get()
|
|
||||||
decoded.get().padding.isNone()
|
|
||||||
|
|
||||||
# example from https://github.com/paritytech/parity-ethereum/blob/93e1040d07e385d1219d00af71c46c720b0a1acf/waku/src/message.rs#L439
|
|
||||||
let
|
|
||||||
env0 = Envelope(
|
|
||||||
expiry:100000, ttl: 30, topic: [byte 0, 0, 0, 0],
|
|
||||||
data: repeat(byte 9, 256), nonce: 1010101)
|
|
||||||
env1 = Envelope(
|
|
||||||
expiry:100000, ttl: 30, topic: [byte 0, 0, 0, 0],
|
|
||||||
data: repeat(byte 9, 256), nonce: 1010102)
|
|
||||||
|
|
||||||
suite "Waku envelope":
|
|
||||||
|
|
||||||
proc hashAndPow(env: Envelope): (string, float64) =
|
|
||||||
# This is the current implementation of go-ethereum
|
|
||||||
let size = env.toShortRlp().len().uint32
|
|
||||||
# This is our current implementation in `waku_protocol.nim`
|
|
||||||
# let size = env.len().uint32
|
|
||||||
# This is the EIP-627 specification
|
|
||||||
# let size = env.toRlp().len().uint32
|
|
||||||
let hash = env.calcPowHash()
|
|
||||||
($hash, calcPow(size, env.ttl, hash))
|
|
||||||
|
|
||||||
test "PoW calculation leading zeroes tests":
|
|
||||||
# Test values from Parity, in message.rs
|
|
||||||
let testHashes = [
|
|
||||||
# 256 leading zeroes
|
|
||||||
"0x0000000000000000000000000000000000000000000000000000000000000000",
|
|
||||||
# 255 leading zeroes
|
|
||||||
"0x0000000000000000000000000000000000000000000000000000000000000001",
|
|
||||||
# no leading zeroes
|
|
||||||
"0xff00000000000000000000000000000000000000000000000000000000000000"
|
|
||||||
]
|
|
||||||
check:
|
|
||||||
calcPow(1, 1, Hash.fromHex(testHashes[0])) ==
|
|
||||||
115792089237316200000000000000000000000000000000000000000000000000000000000000.0
|
|
||||||
calcPow(1, 1, Hash.fromHex(testHashes[1])) ==
|
|
||||||
57896044618658100000000000000000000000000000000000000000000000000000000000000.0
|
|
||||||
calcPow(1, 1, Hash.fromHex(testHashes[2])) == 1.0
|
|
||||||
|
|
||||||
# Test values from go-ethereum wakuv6 in envelope_test
|
|
||||||
var env = Envelope(ttl: 1, data: @[byte 0xde, 0xad, 0xbe, 0xef])
|
|
||||||
# PoW calculation with no leading zeroes
|
|
||||||
env.nonce = 100000
|
|
||||||
check hashAndPoW(env) == ("A788E02A95BFC673709E97CA81E39CA903BAD5638D3388964C51EB64952172D6",
|
|
||||||
0.07692307692307693)
|
|
||||||
# PoW calculation with 8 leading zeroes
|
|
||||||
env.nonce = 276
|
|
||||||
check hashAndPoW(env) == ("00E2374C6353C243E4073E209A7F2ACB2506522AF318B3B78CF9A88310A2A11C",
|
|
||||||
19.692307692307693)
|
|
||||||
|
|
||||||
test "should validate and allow envelope according to config":
|
|
||||||
let ttl = 1'u32
|
|
||||||
let topic = [byte 1, 2, 3, 4]
|
|
||||||
let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(),
|
|
||||||
isLightNode: false, maxMsgSize: defaultMaxMsgSize)
|
|
||||||
|
|
||||||
let env = Envelope(expiry:epochTime().uint32 + ttl, ttl: ttl, topic: topic,
|
|
||||||
data: repeat(byte 9, 256), nonce: 0)
|
|
||||||
check env.valid()
|
|
||||||
|
|
||||||
let msg = initMessage(env)
|
|
||||||
check msg.allowed(config)
|
|
||||||
|
|
||||||
test "should invalidate envelope due to ttl 0":
|
|
||||||
let ttl = 0'u32
|
|
||||||
let topic = [byte 1, 2, 3, 4]
|
|
||||||
let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(),
|
|
||||||
isLightNode: false, maxMsgSize: defaultMaxMsgSize)
|
|
||||||
|
|
||||||
let env = Envelope(expiry:epochTime().uint32 + ttl, ttl: ttl, topic: topic,
|
|
||||||
data: repeat(byte 9, 256), nonce: 0)
|
|
||||||
check env.valid() == false
|
|
||||||
|
|
||||||
test "should invalidate envelope due to expired":
|
|
||||||
let ttl = 1'u32
|
|
||||||
let topic = [byte 1, 2, 3, 4]
|
|
||||||
let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(),
|
|
||||||
isLightNode: false, maxMsgSize: defaultMaxMsgSize)
|
|
||||||
|
|
||||||
let env = Envelope(expiry:epochTime().uint32, ttl: ttl, topic: topic,
|
|
||||||
data: repeat(byte 9, 256), nonce: 0)
|
|
||||||
check env.valid() == false
|
|
||||||
|
|
||||||
test "should invalidate envelope due to in the future":
|
|
||||||
let ttl = 1'u32
|
|
||||||
let topic = [byte 1, 2, 3, 4]
|
|
||||||
let config = WakuConfig(powRequirement: 0, bloom: topic.topicBloom(),
|
|
||||||
isLightNode: false, maxMsgSize: defaultMaxMsgSize)
|
|
||||||
|
|
||||||
# there is currently a 2 second tolerance, hence the + 3
|
|
||||||
let env = Envelope(expiry:epochTime().uint32 + ttl + 3, ttl: ttl, topic: topic,
|
|
||||||
data: repeat(byte 9, 256), nonce: 0)
|
|
||||||
check env.valid() == false
|
|
||||||
|
|
||||||
test "should not allow envelope due to bloom filter":
|
|
||||||
let topic = [byte 1, 2, 3, 4]
|
|
||||||
let wrongTopic = [byte 9, 8, 7, 6]
|
|
||||||
let config = WakuConfig(powRequirement: 0, bloom: wrongTopic.topicBloom(),
|
|
||||||
isLightNode: false, maxMsgSize: defaultMaxMsgSize)
|
|
||||||
|
|
||||||
let env = Envelope(expiry:100000 , ttl: 30, topic: topic,
|
|
||||||
data: repeat(byte 9, 256), nonce: 0)
|
|
||||||
|
|
||||||
let msg = initMessage(env)
|
|
||||||
check msg.allowed(config) == false
|
|
||||||
|
|
||||||
|
|
||||||
suite "Waku queue":
|
|
||||||
test "should throw out lower proof-of-work item when full":
|
|
||||||
var queue = initQueue(1)
|
|
||||||
|
|
||||||
let msg0 = initMessage(env0)
|
|
||||||
let msg1 = initMessage(env1)
|
|
||||||
|
|
||||||
discard queue.add(msg0)
|
|
||||||
discard queue.add(msg1)
|
|
||||||
|
|
||||||
check:
|
|
||||||
queue.items.len() == 1
|
|
||||||
queue.items[0].env.nonce ==
|
|
||||||
(if msg0.pow > msg1.pow: msg0.env.nonce else: msg1.env.nonce)
|
|
||||||
|
|
||||||
test "should not throw out messages as long as there is capacity":
|
|
||||||
var queue = initQueue(2)
|
|
||||||
|
|
||||||
check:
|
|
||||||
queue.add(initMessage(env0)) == true
|
|
||||||
queue.add(initMessage(env1)) == true
|
|
||||||
|
|
||||||
queue.items.len() == 2
|
|
||||||
|
|
||||||
test "check field order against expected rlp order":
|
|
||||||
check rlp.encode(env0) ==
|
|
||||||
rlp.encodeList(env0.expiry, env0.ttl, env0.topic, env0.data, env0.nonce)
|
|
||||||
|
|
||||||
# To test filters we do not care if the msg is valid or allowed
|
|
||||||
proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](),
|
|
||||||
src = none[PrivateKey](), topic: Topic,
|
|
||||||
padding = none[seq[byte]]()): Message =
|
|
||||||
let payload = Payload(dst: pubKey, symKey: symKey, src: src,
|
|
||||||
payload: @[byte 0, 1, 2], padding: padding)
|
|
||||||
let encoded = waku.encode(payload)
|
|
||||||
let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(),
|
|
||||||
nonce: 0)
|
|
||||||
result = initMessage(env)
|
|
||||||
|
|
||||||
suite "Waku filter":
|
|
||||||
test "should notify filter on message with symmetric encryption":
|
|
||||||
var symKey: SymKey
|
|
||||||
let topic = [byte 0, 0, 0, 0]
|
|
||||||
let msg = prepFilterTestMsg(symKey = some(symKey), topic = topic)
|
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
|
||||||
let filter = newFilter(symKey = some(symKey), topics = @[topic])
|
|
||||||
let filterId = filters.subscribeFilter(filter)
|
|
||||||
|
|
||||||
notify(filters, msg)
|
|
||||||
|
|
||||||
let messages = filters.getFilterMessages(filterId)
|
|
||||||
check:
|
|
||||||
messages.len == 1
|
|
||||||
messages[0].decoded.src.isNone()
|
|
||||||
messages[0].dst.isNone()
|
|
||||||
|
|
||||||
test "should notify filter on message with asymmetric encryption":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
let topic = [byte 0, 0, 0, 0]
|
|
||||||
let msg = prepFilterTestMsg(pubKey = some(privKey.getPublicKey()),
|
|
||||||
topic = topic)
|
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
|
||||||
let filter = newFilter(privateKey = some(privKey), topics = @[topic])
|
|
||||||
let filterId = filters.subscribeFilter(filter)
|
|
||||||
|
|
||||||
notify(filters, msg)
|
|
||||||
|
|
||||||
let messages = filters.getFilterMessages(filterId)
|
|
||||||
check:
|
|
||||||
messages.len == 1
|
|
||||||
messages[0].decoded.src.isNone()
|
|
||||||
messages[0].dst.isSome()
|
|
||||||
|
|
||||||
test "should notify filter on message with signature":
|
|
||||||
let privKey = keys.newPrivateKey()
|
|
||||||
let topic = [byte 0, 0, 0, 0]
|
|
||||||
let msg = prepFilterTestMsg(src = some(privKey), topic = topic)
|
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
|
||||||
let filter = newFilter(src = some(privKey.getPublicKey()),
|
|
||||||
topics = @[topic])
|
|
||||||
let filterId = filters.subscribeFilter(filter)
|
|
||||||
|
|
||||||
notify(filters, msg)
|
|
||||||
|
|
||||||
let messages = filters.getFilterMessages(filterId)
|
|
||||||
check:
|
|
||||||
messages.len == 1
|
|
||||||
messages[0].decoded.src.isSome()
|
|
||||||
messages[0].dst.isNone()
|
|
||||||
|
|
||||||
test "test notify of filter against PoW requirement":
|
|
||||||
let topic = [byte 0, 0, 0, 0]
|
|
||||||
let padding = some(repeat(byte 0, 251))
|
|
||||||
# this message has a PoW of 0.02962962962962963, number should be updated
|
|
||||||
# in case PoW algorithm changes or contents of padding, payload, topic, etc.
|
|
||||||
# update: now with NON rlp encoded envelope size the PoW of this message is
|
|
||||||
# 0.014492753623188406
|
|
||||||
let msg = prepFilterTestMsg(topic = topic, padding = padding)
|
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
|
||||||
let
|
|
||||||
filterId1 = filters.subscribeFilter(
|
|
||||||
newFilter(topics = @[topic], powReq = 0.014492753623188406))
|
|
||||||
filterId2 = filters.subscribeFilter(
|
|
||||||
newFilter(topics = @[topic], powReq = 0.014492753623188407))
|
|
||||||
|
|
||||||
notify(filters, msg)
|
|
||||||
|
|
||||||
check:
|
|
||||||
filters.getFilterMessages(filterId1).len == 1
|
|
||||||
filters.getFilterMessages(filterId2).len == 0
|
|
||||||
|
|
||||||
test "test notify of filter on message with certain topic":
|
|
||||||
let
|
|
||||||
topic1 = [byte 0xAB, 0x12, 0xCD, 0x34]
|
|
||||||
topic2 = [byte 0, 0, 0, 0]
|
|
||||||
|
|
||||||
let msg = prepFilterTestMsg(topic = topic1)
|
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
|
||||||
let
|
|
||||||
filterId1 = filters.subscribeFilter(newFilter(topics = @[topic1]))
|
|
||||||
filterId2 = filters.subscribeFilter(newFilter(topics = @[topic2]))
|
|
||||||
|
|
||||||
notify(filters, msg)
|
|
||||||
|
|
||||||
check:
|
|
||||||
filters.getFilterMessages(filterId1).len == 1
|
|
||||||
filters.getFilterMessages(filterId2).len == 0
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
#
|
||||||
|
# Ethereum P2P
|
||||||
|
# (c) Copyright 2018
|
||||||
|
# Status Research & Development GmbH
|
||||||
|
#
|
||||||
|
# Licensed under either of
|
||||||
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
import
|
||||||
|
sequtils, unittest, tables, chronos, eth/p2p, eth/p2p/peer_pool,
|
||||||
|
./p2p_test_helper
|
||||||
|
|
||||||
|
import eth/p2p/rlpx_protocols/waku_protocol as waku
|
||||||
|
import eth/p2p/rlpx_protocols/whisper_protocol as whisper
|
||||||
|
|
||||||
|
let safeTTL = 5'u32
|
||||||
|
let waitInterval = waku.messageInterval + 150.milliseconds
|
||||||
|
|
||||||
|
suite "Waku - Whisper bridge tests":
|
||||||
|
# Waku Whisper node has both capabilities, listens to Whisper and Waku and
|
||||||
|
# relays traffic between the two.
|
||||||
|
var
|
||||||
|
nodeWakuWhisper = setupTestNode(Whisper, Waku) # This will be the bridge
|
||||||
|
nodeWhisper = setupTestNode(Whisper)
|
||||||
|
nodeWaku = setupTestNode(Waku)
|
||||||
|
|
||||||
|
nodeWakuWhisper.startListening()
|
||||||
|
let bridgeNode = newNode(initENode(nodeWakuWhisper.keys.pubKey,
|
||||||
|
nodeWakuWhisper.address))
|
||||||
|
waitFor nodeWhisper.peerPool.connectToNode(bridgeNode)
|
||||||
|
waitFor nodeWaku.peerPool.connectToNode(bridgeNode)
|
||||||
|
|
||||||
|
asyncTest "WakuWhisper and Whisper peers connected":
|
||||||
|
check:
|
||||||
|
nodeWakuWhisper.peerPool.connectedNodes.len() == 2
|
||||||
|
|
||||||
|
asyncTest "Whisper - Waku communcation via bridge":
|
||||||
|
# topic whisper node subscribes to, waku node posts to
|
||||||
|
let topic1 = [byte 0x12, 0, 0, 0]
|
||||||
|
# topic waku node subscribes to, whisper node posts to
|
||||||
|
let topic2 = [byte 0x34, 0, 0, 0]
|
||||||
|
var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
|
||||||
|
var futures = [newFuture[int](), newFuture[int]()]
|
||||||
|
|
||||||
|
proc handler1(msg: whisper.ReceivedMessage) =
|
||||||
|
check msg.decoded.payload == payloads[0]
|
||||||
|
futures[0].complete(1)
|
||||||
|
proc handler2(msg: waku.ReceivedMessage) =
|
||||||
|
check msg.decoded.payload == payloads[1]
|
||||||
|
futures[1].complete(1)
|
||||||
|
|
||||||
|
var filter1 = whisper.subscribeFilter(nodeWhisper,
|
||||||
|
whisper.newFilter(topics = @[topic1]), handler1)
|
||||||
|
var filter2 = waku.subscribeFilter(nodeWaku,
|
||||||
|
waku.newFilter(topics = @[topic2]), handler2)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Message should also end up in the Whisper node its queue via the bridge
|
||||||
|
waku.postMessage(nodeWaku, ttl = safeTTL + 1, topic = topic1,
|
||||||
|
payload = payloads[0]) == true
|
||||||
|
# Message should also end up in the Waku node its queue via the bridge
|
||||||
|
whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2,
|
||||||
|
payload = payloads[1]) == true
|
||||||
|
nodeWhisper.protocolState(Whisper).queue.items.len == 1
|
||||||
|
nodeWaku.protocolState(Waku).queue.items.len == 1
|
||||||
|
|
||||||
|
# waitInterval*2 as messages have to pass the bridge also (2 hops)
|
||||||
|
await allFutures(futures).withTimeout(waitInterval*2)
|
||||||
|
|
||||||
|
# Relay can receive Whisper & Waku messages
|
||||||
|
nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2
|
||||||
|
nodeWakuWhisper.protocolState(Waku).queue.items.len == 2
|
||||||
|
|
||||||
|
# Whisper node can receive Waku messages (via bridge)
|
||||||
|
nodeWhisper.protocolState(Whisper).queue.items.len == 2
|
||||||
|
# Waku node can receive Whisper messages (via bridge)
|
||||||
|
nodeWaku.protocolState(Waku).queue.items.len == 2
|
||||||
|
|
||||||
|
whisper.unsubscribeFilter(nodeWhisper, filter1) == true
|
||||||
|
waku.unsubscribeFilter(nodeWaku, filter2) == true
|
||||||
|
|
||||||
|
# XXX: This reads a bit weird, but eh
|
||||||
|
waku.resetMessageQueue(nodeWaku)
|
||||||
|
whisper.resetMessageQueue(nodeWhisper)
|
||||||
|
# shared queue so Waku and Whisper should be set to 0
|
||||||
|
waku.resetMessageQueue(nodeWakuWhisper)
|
||||||
|
|
||||||
|
check:
|
||||||
|
nodeWhisper.protocolState(Whisper).queue.items.len == 0
|
||||||
|
nodeWaku.protocolState(Waku).queue.items.len == 0
|
||||||
|
nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0
|
||||||
|
nodeWakuWhisper.protocolState(Waku).queue.items.len == 0
|
|
@ -1,390 +0,0 @@
|
||||||
#
|
|
||||||
# Ethereum P2P
|
|
||||||
# (c) Copyright 2018
|
|
||||||
# Status Research & Development GmbH
|
|
||||||
#
|
|
||||||
# Licensed under either of
|
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
||||||
# MIT license (LICENSE-MIT)
|
|
||||||
|
|
||||||
import
|
|
||||||
sequtils, options, unittest, tables, chronos, eth/[keys, p2p],
|
|
||||||
eth/p2p/peer_pool, ./p2p_test_helper
|
|
||||||
|
|
||||||
import eth/p2p/rlpx_protocols/waku_protocol as waku
|
|
||||||
import eth/p2p/rlpx_protocols/whisper_protocol as whisper
|
|
||||||
|
|
||||||
# proc resetMessageQueues(nodes: varargs[EthereumNode]) =
|
|
||||||
# for node in nodes:
|
|
||||||
# resetMessageQueue(node)
|
|
||||||
|
|
||||||
let safeTTL = 5'u32
|
|
||||||
let waitInterval = waku.messageInterval + 150.milliseconds
|
|
||||||
|
|
||||||
suite "Waku connections":
|
|
||||||
var node1 = setupTestNode(Waku)
|
|
||||||
var node2 = setupTestNode(Waku)
|
|
||||||
node2.startListening()
|
|
||||||
waitFor node1.peerPool.connectToNode(newNode(initENode(node2.keys.pubKey,
|
|
||||||
node2.address)))
|
|
||||||
|
|
||||||
# Waku Whisper has both capabilities and listens to Whisper, then relays traffic
|
|
||||||
var nodeWakuWhisper = setupTestNode(Waku, Whisper)
|
|
||||||
# XXX: Assuming we added Whisper capability here
|
|
||||||
var nodeWhisper = setupTestNode(Whisper)
|
|
||||||
# TODO: Connect them
|
|
||||||
nodeWakuWhisper.startListening()
|
|
||||||
waitFor nodeWhisper.peerPool.connectToNode(newNode(initENode(nodeWakuWhisper.keys.pubKey,
|
|
||||||
nodeWakuWhisper.address)))
|
|
||||||
|
|
||||||
# NOTE: Commented out Whisper equivalent tests
|
|
||||||
# To enable, fully qualify nodes
|
|
||||||
|
|
||||||
# asyncTest "Two peers connected":
|
|
||||||
# check:
|
|
||||||
# node1.peerPool.connectedNodes.len() == 1
|
|
||||||
|
|
||||||
# asyncTest "Filters with encryption and signing":
|
|
||||||
# let encryptKeyPair = newKeyPair()
|
|
||||||
# let signKeyPair = newKeyPair()
|
|
||||||
# var symKey: SymKey
|
|
||||||
# let topic = [byte 0x12, 0, 0, 0]
|
|
||||||
# var filters: seq[string] = @[]
|
|
||||||
# var payloads = [repeat(byte 1, 10), repeat(byte 2, 10),
|
|
||||||
# repeat(byte 3, 10), repeat(byte 4, 10)]
|
|
||||||
# var futures = [newFuture[int](), newFuture[int](),
|
|
||||||
# newFuture[int](), newFuture[int]()]
|
|
||||||
|
|
||||||
# proc handler1(msg: ReceivedMessage) =
|
|
||||||
# var count {.global.}: int
|
|
||||||
# check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1]
|
|
||||||
# count += 1
|
|
||||||
# if count == 2: futures[0].complete(1)
|
|
||||||
# proc handler2(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payloads[1]
|
|
||||||
# futures[1].complete(1)
|
|
||||||
# proc handler3(msg: ReceivedMessage) =
|
|
||||||
# var count {.global.}: int
|
|
||||||
# check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3]
|
|
||||||
# count += 1
|
|
||||||
# if count == 2: futures[2].complete(1)
|
|
||||||
# proc handler4(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payloads[3]
|
|
||||||
# futures[3].complete(1)
|
|
||||||
|
|
||||||
# # Filters
|
|
||||||
# # filter for encrypted asym
|
|
||||||
# filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
|
|
||||||
# topics = @[topic]), handler1))
|
|
||||||
# # filter for encrypted asym + signed
|
|
||||||
# filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
|
||||||
# privateKey = some(encryptKeyPair.seckey),
|
|
||||||
# topics = @[topic]), handler2))
|
|
||||||
# # filter for encrypted sym
|
|
||||||
# filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
|
|
||||||
# topics = @[topic]), handler3))
|
|
||||||
# # filter for encrypted sym + signed
|
|
||||||
# filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
|
||||||
# symKey = some(symKey),
|
|
||||||
# topics = @[topic]), handler4))
|
|
||||||
# # Messages
|
|
||||||
# check:
|
|
||||||
# # encrypted asym
|
|
||||||
# node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL,
|
|
||||||
# topic = topic, payload = payloads[0]) == true
|
|
||||||
# # encrypted asym + signed
|
|
||||||
# node2.postMessage(some(encryptKeyPair.pubkey),
|
|
||||||
# src = some(signKeyPair.seckey), ttl = safeTTL,
|
|
||||||
# topic = topic, payload = payloads[1]) == true
|
|
||||||
# # encrypted sym
|
|
||||||
# node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic,
|
|
||||||
# payload = payloads[2]) == true
|
|
||||||
# # encrypted sym + signed
|
|
||||||
# node2.postMessage(symKey = some(symKey),
|
|
||||||
# src = some(signKeyPair.seckey),
|
|
||||||
# ttl = safeTTL, topic = topic,
|
|
||||||
# payload = payloads[3]) == true
|
|
||||||
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 4
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# await allFutures(futures).withTimeout(waitInterval)
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 4
|
|
||||||
|
|
||||||
# for filter in filters:
|
|
||||||
# check node1.unsubscribeFilter(filter) == true
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Filters with topics":
|
|
||||||
# let topic1 = [byte 0x12, 0, 0, 0]
|
|
||||||
# let topic2 = [byte 0x34, 0, 0, 0]
|
|
||||||
# var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
|
|
||||||
# var futures = [newFuture[int](), newFuture[int]()]
|
|
||||||
# proc handler1(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payloads[0]
|
|
||||||
# futures[0].complete(1)
|
|
||||||
# proc handler2(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payloads[1]
|
|
||||||
# futures[1].complete(1)
|
|
||||||
|
|
||||||
# var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
|
|
||||||
# var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL + 1, topic = topic1,
|
|
||||||
# payload = payloads[0]) == true
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = topic2,
|
|
||||||
# payload = payloads[1]) == true
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 2
|
|
||||||
|
|
||||||
# await allFutures(futures).withTimeout(waitInterval)
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 2
|
|
||||||
|
|
||||||
# node1.unsubscribeFilter(filter1) == true
|
|
||||||
# node1.unsubscribeFilter(filter2) == true
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Filters with PoW":
|
|
||||||
# let topic = [byte 0x12, 0, 0, 0]
|
|
||||||
# var payload = repeat(byte 0, 10)
|
|
||||||
# var futures = [newFuture[int](), newFuture[int]()]
|
|
||||||
# proc handler1(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payload
|
|
||||||
# futures[0].complete(1)
|
|
||||||
# proc handler2(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payload
|
|
||||||
# futures[1].complete(1)
|
|
||||||
|
|
||||||
# var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
|
|
||||||
# handler1)
|
|
||||||
# var filter2 = node1.subscribeFilter(newFilter(topics = @[topic],
|
|
||||||
# powReq = 1_000_000), handler2)
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
|
||||||
|
|
||||||
# (await futures[0].withTimeout(waitInterval)) == true
|
|
||||||
# (await futures[1].withTimeout(waitInterval)) == false
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 1
|
|
||||||
|
|
||||||
# node1.unsubscribeFilter(filter1) == true
|
|
||||||
# node1.unsubscribeFilter(filter2) == true
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Filters with queues":
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
# let payload = repeat(byte 0, 10)
|
|
||||||
|
|
||||||
# var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
|
|
||||||
# for i in countdown(10, 1):
|
|
||||||
# check node2.postMessage(ttl = safeTTL, topic = topic,
|
|
||||||
# payload = payload) == true
|
|
||||||
|
|
||||||
# await sleepAsync(waitInterval)
|
|
||||||
# check:
|
|
||||||
# node1.getFilterMessages(filter).len() == 10
|
|
||||||
# node1.getFilterMessages(filter).len() == 0
|
|
||||||
# node1.unsubscribeFilter(filter) == true
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Local filter notify":
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
|
|
||||||
# var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
|
|
||||||
# check:
|
|
||||||
# node1.postMessage(ttl = safeTTL, topic = topic,
|
|
||||||
# payload = repeat(byte 4, 10)) == true
|
|
||||||
# node1.getFilterMessages(filter).len() == 1
|
|
||||||
# node1.unsubscribeFilter(filter) == true
|
|
||||||
|
|
||||||
# await sleepAsync(waitInterval)
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Bloomfilter blocking":
|
|
||||||
# let sendTopic1 = [byte 0x12, 0, 0, 0]
|
|
||||||
# let sendTopic2 = [byte 0x34, 0, 0, 0]
|
|
||||||
# let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]]
|
|
||||||
# let payload = repeat(byte 0, 10)
|
|
||||||
# var f: Future[int] = newFuture[int]()
|
|
||||||
# proc handler(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == payload
|
|
||||||
# f.complete(1)
|
|
||||||
# var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
|
|
||||||
# await node1.setBloomFilter(node1.filtersToBloom())
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = sendTopic1,
|
|
||||||
# payload = payload) == true
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 1
|
|
||||||
|
|
||||||
# (await f.withTimeout(waitInterval)) == false
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 0
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# f = newFuture[int]()
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = sendTopic2,
|
|
||||||
# payload = payload) == true
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 1
|
|
||||||
|
|
||||||
# await f.withTimeout(waitInterval)
|
|
||||||
# f.read() == 1
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 1
|
|
||||||
|
|
||||||
# node1.unsubscribeFilter(filter) == true
|
|
||||||
|
|
||||||
# await node1.setBloomFilter(fullBloom())
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "PoW blocking":
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
# let payload = repeat(byte 0, 10)
|
|
||||||
|
|
||||||
# await node1.setPowRequirement(1_000_000)
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 1
|
|
||||||
# await sleepAsync(waitInterval)
|
|
||||||
# check:
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 0
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# await node1.setPowRequirement(0.0)
|
|
||||||
# check:
|
|
||||||
# node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 1
|
|
||||||
# await sleepAsync(waitInterval)
|
|
||||||
# check:
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 1
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "Queue pruning":
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
# let payload = repeat(byte 0, 10)
|
|
||||||
# # We need a minimum TTL of 2 as when set to 1 there is a small chance that
|
|
||||||
# # it is already expired after messageInterval due to rounding down of float
|
|
||||||
# # to uint32 in postMessage()
|
|
||||||
# let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire
|
|
||||||
# for i in countdown(10, 1):
|
|
||||||
# check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true
|
|
||||||
# check node2.protocolState(Waku).queue.items.len == 10
|
|
||||||
|
|
||||||
# await sleepAsync(waitInterval)
|
|
||||||
# check node1.protocolState(Waku).queue.items.len == 10
|
|
||||||
|
|
||||||
# await sleepAsync(milliseconds((lowerTTL+1)*1000))
|
|
||||||
# check node1.protocolState(Waku).queue.items.len == 0
|
|
||||||
# check node2.protocolState(Waku).queue.items.len == 0
|
|
||||||
|
|
||||||
# resetMessageQueues(node1, node2)
|
|
||||||
|
|
||||||
# asyncTest "P2P post":
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
# var f: Future[int] = newFuture[int]()
|
|
||||||
# proc handler(msg: ReceivedMessage) =
|
|
||||||
# check msg.decoded.payload == repeat(byte 4, 10)
|
|
||||||
# f.complete(1)
|
|
||||||
|
|
||||||
# var filter = node1.subscribeFilter(newFilter(topics = @[topic],
|
|
||||||
# allowP2P = true), handler)
|
|
||||||
# check:
|
|
||||||
# node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true
|
|
||||||
# node2.postMessage(ttl = 10, topic = topic,
|
|
||||||
# payload = repeat(byte 4, 10),
|
|
||||||
# targetPeer = some(toNodeId(node1.keys.pubkey))) == true
|
|
||||||
|
|
||||||
# await f.withTimeout(waitInterval)
|
|
||||||
# f.read() == 1
|
|
||||||
# node1.protocolState(Waku).queue.items.len == 0
|
|
||||||
# node2.protocolState(Waku).queue.items.len == 0
|
|
||||||
|
|
||||||
# node1.unsubscribeFilter(filter) == true
|
|
||||||
|
|
||||||
# asyncTest "Light node posting":
|
|
||||||
# var ln1 = setupTestNode(Waku)
|
|
||||||
# ln1.setLightNode(true)
|
|
||||||
|
|
||||||
# await ln1.peerPool.connectToNode(newNode(initENode(node2.keys.pubKey,
|
|
||||||
# node2.address)))
|
|
||||||
|
|
||||||
# let topic = [byte 0, 0, 0, 0]
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# # normal post
|
|
||||||
# ln1.postMessage(ttl = safeTTL, topic = topic,
|
|
||||||
# payload = repeat(byte 0, 10)) == false
|
|
||||||
# ln1.protocolState(Waku).queue.items.len == 0
|
|
||||||
# # P2P post
|
|
||||||
# ln1.postMessage(ttl = safeTTL, topic = topic,
|
|
||||||
# payload = repeat(byte 0, 10),
|
|
||||||
# targetPeer = some(toNodeId(node2.keys.pubkey))) == true
|
|
||||||
# ln1.protocolState(Waku).queue.items.len == 0
|
|
||||||
|
|
||||||
# asyncTest "Connect two light nodes":
|
|
||||||
# var ln1 = setupTestNode(Waku)
|
|
||||||
# var ln2 = setupTestNode(Waku)
|
|
||||||
|
|
||||||
# ln1.setLightNode(true)
|
|
||||||
# ln2.setLightNode(true)
|
|
||||||
|
|
||||||
# ln2.startListening()
|
|
||||||
# let peer = await ln1.rlpxConnect(newNode(initENode(ln2.keys.pubKey,
|
|
||||||
# ln2.address)))
|
|
||||||
# check peer.isNil == true
|
|
||||||
|
|
||||||
asyncTest "WakuWhisper and Whisper peers connected":
|
|
||||||
check:
|
|
||||||
nodeWakuWhisper.peerPool.connectedNodes.len() == 1
|
|
||||||
|
|
||||||
asyncTest "WhisperWaku and Whisper filters with topics":
|
|
||||||
let topic1 = [byte 0x12, 0, 0, 0]
|
|
||||||
let topic2 = [byte 0x34, 0, 0, 0]
|
|
||||||
var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
|
|
||||||
var futures = [newFuture[int](), newFuture[int]()]
|
|
||||||
|
|
||||||
proc handler1(msg: whisper.ReceivedMessage) =
|
|
||||||
check msg.decoded.payload == payloads[0]
|
|
||||||
futures[0].complete(1)
|
|
||||||
proc handler2(msg: whisper.ReceivedMessage) =
|
|
||||||
check msg.decoded.payload == payloads[1]
|
|
||||||
futures[1].complete(1)
|
|
||||||
|
|
||||||
var filter1 = nodeWakuWhisper.subscribeFilter(whisper.newFilter(topics = @[topic1]), handler1)
|
|
||||||
var filter2 = nodeWakuWhisper.subscribeFilter(whisper.newFilter(topics = @[topic2]), handler2)
|
|
||||||
|
|
||||||
check:
|
|
||||||
whisper.postMessage(nodeWhisper, ttl = safeTTL + 1, topic = topic1,
|
|
||||||
payload = payloads[0]) == true
|
|
||||||
whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2,
|
|
||||||
payload = payloads[1]) == true
|
|
||||||
nodeWhisper.protocolState(Whisper).queue.items.len == 2
|
|
||||||
|
|
||||||
await allFutures(futures).withTimeout(waitInterval)
|
|
||||||
|
|
||||||
# This shows WakuWhisper can receive Whisper messages
|
|
||||||
# TODO: This should also make its way to Waku state! Where?
|
|
||||||
nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2
|
|
||||||
|
|
||||||
# XXX: How does this look with protocol state for waku and whisper?
|
|
||||||
whisper.unsubscribeFilter(nodeWakuWhisper, filter1) == true
|
|
||||||
whisper.unsubscribeFilter(nodeWakuWhisper, filter2) == true
|
|
||||||
|
|
||||||
# XXX: This reads a bit weird, but eh
|
|
||||||
waku.resetMessageQueue(nodeWakuWhisper)
|
|
||||||
whisper.resetMessageQueue(nodeWakuWhisper)
|
|
||||||
whisper.resetMessageQueue(nodeWhisper)
|
|
||||||
|
|
||||||
check:
|
|
||||||
nodeWhisper.protocolState(Whisper).queue.items.len == 0
|
|
||||||
nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0
|
|
||||||
|
|
||||||
# TODO: Add test for Waku node also listening on Whisper topic
|
|
Loading…
Reference in New Issue