Fix, improve and enable tshh_connect + other improvements:
- minePow fix - random padding - random IV (phew!) - other small changes + comments
This commit is contained in:
parent
c4c596a90f
commit
8cad437112
|
@ -32,5 +32,5 @@ task test, "Runs the test suite":
|
||||||
runTest "tdiscovery"
|
runTest "tdiscovery"
|
||||||
runTest "tserver"
|
runTest "tserver"
|
||||||
runTest "tserver", "-d:useSnappy"
|
runTest "tserver", "-d:useSnappy"
|
||||||
# runTest "tshh_connect"
|
runTest "tshh_connect"
|
||||||
runTest "tshh_connect_mocked"
|
runTest "tshh_connect_mocked"
|
||||||
|
|
|
@ -25,6 +25,8 @@ const
|
||||||
whisperVersion* = 6
|
whisperVersion* = 6
|
||||||
defaultMinPow* = 0.001'f64
|
defaultMinPow* = 0.001'f64
|
||||||
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
|
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
|
||||||
|
messageInterval* = 300 ## Interval at which messages are send to peers, in ms
|
||||||
|
pruneInterval* = 1000 ## Interval at which message queue is pruned, in ms
|
||||||
|
|
||||||
type
|
type
|
||||||
Hash* = MDigest[256]
|
Hash* = MDigest[256]
|
||||||
|
@ -181,13 +183,9 @@ proc `or`(a, b: Bloom): Bloom =
|
||||||
|
|
||||||
proc bytesCopy(bloom: var Bloom, b: Bytes) =
|
proc bytesCopy(bloom: var Bloom, b: Bytes) =
|
||||||
assert b.len == bloomSize
|
assert b.len == bloomSize
|
||||||
# memcopy?
|
copyMem(addr bloom[0], unsafeAddr b[0], bloomSize)
|
||||||
for i in 0..<bloom.len:
|
|
||||||
bloom[i] = b[i]
|
|
||||||
|
|
||||||
proc toBloom*(topics: openArray[Topic]): Bloom =
|
proc toBloom*(topics: openArray[Topic]): Bloom =
|
||||||
#if topics.len == 0:
|
|
||||||
# XXX: should we set the bloom here the all 1's ?
|
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
result = result or topicBloom(topic)
|
result = result or topicBloom(topic)
|
||||||
|
|
||||||
|
@ -198,13 +196,10 @@ proc bloomFilterMatch(filter, sample: Bloom): bool =
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc fullBloom*(): Bloom =
|
proc fullBloom*(): Bloom =
|
||||||
|
# There is no setMem exported in system, assume compiler is smart enough?
|
||||||
for i in 0..<result.len:
|
for i in 0..<result.len:
|
||||||
result[i] = 0xFF
|
result[i] = 0xFF
|
||||||
|
|
||||||
proc emptyBloom*(): Bloom =
|
|
||||||
for i in 0..<result.len:
|
|
||||||
result[i] = 0x00
|
|
||||||
|
|
||||||
proc encryptAesGcm(plain: openarray[byte], key: SymKey,
|
proc encryptAesGcm(plain: openarray[byte], key: SymKey,
|
||||||
iv: array[gcmIVLen, byte]): Bytes =
|
iv: array[gcmIVLen, byte]): Bytes =
|
||||||
## Encrypt using AES-GCM, making sure to append tag and iv, in that order
|
## Encrypt using AES-GCM, making sure to append tag and iv, in that order
|
||||||
|
@ -297,7 +292,12 @@ proc encode*(self: Payload): Option[Bytes] =
|
||||||
if self.padding.isSome():
|
if self.padding.isSome():
|
||||||
plain.add self.padding.get()
|
plain.add self.padding.get()
|
||||||
else:
|
else:
|
||||||
plain.add repeat(0'u8, padLen) # XXX: should be random
|
var padding = newSeq[byte](padLen)
|
||||||
|
if randomBytes(padding) != padLen:
|
||||||
|
notice "Generation of random padding failed"
|
||||||
|
return
|
||||||
|
|
||||||
|
plain.add padding
|
||||||
|
|
||||||
if self.src.isSome(): # Private key present - signature requested
|
if self.src.isSome(): # Private key present - signature requested
|
||||||
let hash = keccak256.digest(plain)
|
let hash = keccak256.digest(plain)
|
||||||
|
@ -318,7 +318,11 @@ proc encode*(self: Payload): Option[Bytes] =
|
||||||
return some(res)
|
return some(res)
|
||||||
|
|
||||||
if self.symKey.isSome(): # Symmetric key present - encryption requested
|
if self.symKey.isSome(): # Symmetric key present - encryption requested
|
||||||
var iv: array[gcmIVLen, byte] # XXX: random!
|
var iv: array[gcmIVLen, byte]
|
||||||
|
if randomBytes(iv) != gcmIVLen:
|
||||||
|
notice "Generation of random IV failed"
|
||||||
|
return
|
||||||
|
|
||||||
return some(encryptAesGcm(plain, self.symKey.get(), iv))
|
return some(encryptAesGcm(plain, self.symKey.get(), iv))
|
||||||
|
|
||||||
# No encryption!
|
# No encryption!
|
||||||
|
@ -449,13 +453,14 @@ proc minePow*(self: Envelope, seconds: float): uint64 =
|
||||||
while epochTime() < mineEnd or bestPow == 0: # At least one round
|
while epochTime() < mineEnd or bestPow == 0: # At least one round
|
||||||
var tmp = ctx # copy hash calculated so far - we'll reuse that for each iter
|
var tmp = ctx # copy hash calculated so far - we'll reuse that for each iter
|
||||||
tmp.update(i.toBE())
|
tmp.update(i.toBE())
|
||||||
i.inc
|
|
||||||
# XXX:a random nonce here would not leak number of iters
|
# XXX:a random nonce here would not leak number of iters
|
||||||
let pow = calcPow(1, 1, tmp.finish())
|
let pow = calcPow(1, 1, tmp.finish())
|
||||||
if pow > bestPow: # XXX: could also compare hashes as numbers instead
|
if pow > bestPow: # XXX: could also compare hashes as numbers instead
|
||||||
bestPow = pow
|
bestPow = pow
|
||||||
result = i.uint64
|
result = i.uint64
|
||||||
|
|
||||||
|
i.inc
|
||||||
|
|
||||||
proc calcPowHash*(self: Envelope): Hash =
|
proc calcPowHash*(self: Envelope): Hash =
|
||||||
## Calculate the message hash, as done during mining - this can be used to
|
## Calculate the message hash, as done during mining - this can be used to
|
||||||
## verify proof-of-work
|
## verify proof-of-work
|
||||||
|
@ -555,6 +560,9 @@ proc add*(self: var Queue, msg: Message): bool =
|
||||||
proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](),
|
proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](),
|
||||||
symKey = none[SymKey](), topics: seq[Topic] = @[],
|
symKey = none[SymKey](), topics: seq[Topic] = @[],
|
||||||
powReq = 0.0, allowP2P = false): Filter =
|
powReq = 0.0, allowP2P = false): Filter =
|
||||||
|
# Zero topics will give an empty bloom filter which is fine as this bloom
|
||||||
|
# filter is only used to `or` with existing/other bloom filters. Not to do
|
||||||
|
# matching.
|
||||||
Filter(src: src, privateKey: privateKey, symKey: symKey, topics: topics,
|
Filter(src: src, privateKey: privateKey, symKey: symKey, topics: topics,
|
||||||
powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics))
|
powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics))
|
||||||
|
|
||||||
|
@ -689,7 +697,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
whisperNet.config.isLightNode))
|
whisperNet.config.isLightNode))
|
||||||
|
|
||||||
if m.protocolVersion == whisperVersion:
|
if m.protocolVersion == whisperVersion:
|
||||||
debug "Suitable Whisper peer", peer, whisperVersion
|
debug "Whisper peer", peer, whisperVersion
|
||||||
else:
|
else:
|
||||||
raise newException(UselessPeerError, "Incompatible Whisper version")
|
raise newException(UselessPeerError, "Incompatible Whisper version")
|
||||||
|
|
||||||
|
@ -827,7 +835,7 @@ proc run(peer: Peer) {.async.} =
|
||||||
whisperPeer.running = true
|
whisperPeer.running = true
|
||||||
while whisperPeer.running:
|
while whisperPeer.running:
|
||||||
peer.processQueue()
|
peer.processQueue()
|
||||||
await sleepAsync(300)
|
await sleepAsync(messageInterval)
|
||||||
|
|
||||||
proc pruneReceived(node: EthereumNode) =
|
proc pruneReceived(node: EthereumNode) =
|
||||||
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
|
if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
|
||||||
|
@ -850,7 +858,7 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
|
||||||
# pruning the received sets is not necessary for correct workings
|
# pruning the received sets is not necessary for correct workings
|
||||||
# but simply from keeping the sets growing indefinitely
|
# but simply from keeping the sets growing indefinitely
|
||||||
node.pruneReceived()
|
node.pruneReceived()
|
||||||
await sleepAsync(1000)
|
await sleepAsync(pruneInterval)
|
||||||
|
|
||||||
# Public EthereumNode calls ----------------------------------------------------
|
# Public EthereumNode calls ----------------------------------------------------
|
||||||
|
|
||||||
|
@ -964,3 +972,6 @@ proc setLightNode*(node: EthereumNode, isLightNode: bool) =
|
||||||
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
|
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
|
||||||
node.protocolState(Whisper).config = config
|
node.protocolState(Whisper).config = config
|
||||||
|
|
||||||
|
# Not something that should be run in normal circumstances
|
||||||
|
proc resetMessageQueue*(node: EthereumNode) =
|
||||||
|
node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)
|
||||||
|
|
|
@ -277,9 +277,10 @@ suite "Whisper queue":
|
||||||
|
|
||||||
# To test filters we do not care if the msg is valid or allowed
|
# To test filters we do not care if the msg is valid or allowed
|
||||||
proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](),
|
proc prepFilterTestMsg(pubKey = none[PublicKey](), symKey = none[SymKey](),
|
||||||
src = none[PrivateKey](), topic: Topic): Message =
|
src = none[PrivateKey](), topic: Topic,
|
||||||
|
padding = none[seq[byte]]()): Message =
|
||||||
let payload = Payload(dst: pubKey, symKey: symKey, src: src,
|
let payload = Payload(dst: pubKey, symKey: symKey, src: src,
|
||||||
payload: @[byte 0, 1, 2])
|
payload: @[byte 0, 1, 2], padding: padding)
|
||||||
let encoded = whisper.encode(payload)
|
let encoded = whisper.encode(payload)
|
||||||
let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(),
|
let env = Envelope(expiry: 1, ttl: 1, topic: topic, data: encoded.get(),
|
||||||
nonce: 0)
|
nonce: 0)
|
||||||
|
@ -332,9 +333,10 @@ suite "Whisper filter":
|
||||||
|
|
||||||
test "test notify of filter against PoW requirement":
|
test "test notify of filter against PoW requirement":
|
||||||
let topic = [byte 0, 0, 0, 0]
|
let topic = [byte 0, 0, 0, 0]
|
||||||
|
let padding = some(repeat(byte 0, 251))
|
||||||
# this message has a PoW of 0.02962962962962963, number should be updated
|
# this message has a PoW of 0.02962962962962963, number should be updated
|
||||||
# in case PoW algorithm changes
|
# in case PoW algorithm changes or contents of padding, payload, topic, etc.
|
||||||
let msg = prepFilterTestMsg(topic = topic)
|
let msg = prepFilterTestMsg(topic = topic, padding = padding)
|
||||||
|
|
||||||
var filters = initTable[string, Filter]()
|
var filters = initTable[string, Filter]()
|
||||||
let
|
let
|
||||||
|
|
|
@ -11,6 +11,11 @@ import
|
||||||
sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys,
|
sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys,
|
||||||
eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode]
|
eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode]
|
||||||
|
|
||||||
|
const
|
||||||
|
useCompression = defined(useSnappy)
|
||||||
|
|
||||||
|
var nextPort = 30303
|
||||||
|
|
||||||
proc localAddress(port: int): Address =
|
proc localAddress(port: int): Address =
|
||||||
let port = Port(port)
|
let port = Port(port)
|
||||||
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
|
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
|
||||||
|
@ -33,298 +38,348 @@ template asyncTest(name, body: untyped) =
|
||||||
proc scenario {.async.} = body
|
proc scenario {.async.} = body
|
||||||
waitFor scenario()
|
waitFor scenario()
|
||||||
|
|
||||||
const useCompression = defined(useSnappy)
|
proc resetMessageQueues(nodes: varargs[EthereumNode]) =
|
||||||
let
|
for node in nodes:
|
||||||
keys1 = newKeyPair()
|
node.resetMessageQueue()
|
||||||
keys2 = newKeyPair()
|
|
||||||
var node1 = newEthereumNode(keys1, localAddress(30303), 1, nil,
|
|
||||||
addAllCapabilities = false,
|
|
||||||
useCompression = useCompression)
|
|
||||||
node1.addCapability Whisper
|
|
||||||
|
|
||||||
var node2 = newEthereumNode(keys2, localAddress(30304), 1, nil,
|
proc prepTestNode(): EthereumNode =
|
||||||
addAllCapabilities = false,
|
let keys1 = newKeyPair()
|
||||||
useCompression = useCompression)
|
result = newEthereumNode(keys1, localAddress(nextPort), 1, nil,
|
||||||
node2.addCapability Whisper
|
addAllCapabilities = false,
|
||||||
|
useCompression = useCompression)
|
||||||
|
nextPort.inc
|
||||||
|
result.addCapability Whisper
|
||||||
|
|
||||||
template waitForEmptyQueues() =
|
let bootENode = waitFor setupBootNode()
|
||||||
while node1.protocolState(Whisper).queue.items.len != 0 or
|
|
||||||
node2.protocolState(Whisper).queue.items.len != 0: poll()
|
|
||||||
|
|
||||||
when not defined(directConnect):
|
var node1 = prepTestNode()
|
||||||
let bootENode = waitFor setupBootNode()
|
var node2 = prepTestNode()
|
||||||
|
# node2 listening and node1 not, to avoid many incoming vs outgoing
|
||||||
# node2 listening and node1 not, to avoid many incoming vs outgoing
|
var node1Connected = node1.connectToNetwork(@[bootENode], false, true)
|
||||||
var node1Connected = node1.connectToNetwork(@[bootENode], false, true)
|
var node2Connected = node2.connectToNetwork(@[bootENode], true, true)
|
||||||
var node2Connected = node2.connectToNetwork(@[bootENode], true, true)
|
waitFor node1Connected
|
||||||
waitFor node1Connected
|
waitFor node2Connected
|
||||||
waitFor node2Connected
|
|
||||||
|
|
||||||
|
suite "Whisper connections":
|
||||||
asyncTest "Two peers connected":
|
asyncTest "Two peers connected":
|
||||||
check:
|
check:
|
||||||
node1.peerPool.connectedNodes.len() == 1
|
node1.peerPool.connectedNodes.len() == 1
|
||||||
node2.peerPool.connectedNodes.len() == 1
|
node2.peerPool.connectedNodes.len() == 1
|
||||||
else: # XXX: tricky without peerPool
|
|
||||||
node2.startListening()
|
|
||||||
discard waitFor node1.rlpxConnect(newNode(initENode(node2.keys.pubKey,
|
|
||||||
node2.address)))
|
|
||||||
|
|
||||||
asyncTest "Filters with encryption and signing":
|
asyncTest "Filters with encryption and signing":
|
||||||
let encryptKeyPair = newKeyPair()
|
let encryptKeyPair = newKeyPair()
|
||||||
let signKeyPair = newKeyPair()
|
let signKeyPair = newKeyPair()
|
||||||
var symKey: SymKey
|
var symKey: SymKey
|
||||||
let topic = [byte 0x12, 0, 0, 0]
|
let topic = [byte 0x12, 0, 0, 0]
|
||||||
var filters: seq[string] = @[]
|
var filters: seq[string] = @[]
|
||||||
var payloads = [repeat(byte 1, 10), repeat(byte 2, 10),
|
var payloads = [repeat(byte 1, 10), repeat(byte 2, 10),
|
||||||
repeat(byte 3, 10), repeat(byte 4, 10)]
|
repeat(byte 3, 10), repeat(byte 4, 10)]
|
||||||
var futures = [newFuture[int](), newFuture[int](),
|
var futures = [newFuture[int](), newFuture[int](),
|
||||||
newFuture[int](), newFuture[int]()]
|
newFuture[int](), newFuture[int]()]
|
||||||
|
|
||||||
proc handler1(msg: ReceivedMessage) =
|
proc handler1(msg: ReceivedMessage) =
|
||||||
var count {.global.}: int
|
var count {.global.}: int
|
||||||
check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1]
|
check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1]
|
||||||
count += 1
|
count += 1
|
||||||
if count == 2: futures[0].complete(1)
|
if count == 2: futures[0].complete(1)
|
||||||
proc handler2(msg: ReceivedMessage) =
|
proc handler2(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payloads[1]
|
check msg.decoded.payload == payloads[1]
|
||||||
futures[1].complete(1)
|
futures[1].complete(1)
|
||||||
proc handler3(msg: ReceivedMessage) =
|
proc handler3(msg: ReceivedMessage) =
|
||||||
var count {.global.}: int
|
var count {.global.}: int
|
||||||
check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3]
|
check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3]
|
||||||
count += 1
|
count += 1
|
||||||
if count == 2: futures[2].complete(1)
|
if count == 2: futures[2].complete(1)
|
||||||
proc handler4(msg: ReceivedMessage) =
|
proc handler4(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payloads[3]
|
check msg.decoded.payload == payloads[3]
|
||||||
futures[3].complete(1)
|
futures[3].complete(1)
|
||||||
|
|
||||||
# Filters
|
# Filters
|
||||||
# filter for encrypted asym
|
# filter for encrypted asym
|
||||||
filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
|
filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
|
||||||
topics = @[topic]), handler1))
|
topics = @[topic]), handler1))
|
||||||
# filter for encrypted asym + signed
|
# filter for encrypted asym + signed
|
||||||
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
||||||
privateKey = some(encryptKeyPair.seckey),
|
privateKey = some(encryptKeyPair.seckey),
|
||||||
topics = @[topic]), handler2))
|
topics = @[topic]), handler2))
|
||||||
# filter for encrypted sym
|
# filter for encrypted sym
|
||||||
filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
|
filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
|
||||||
topics = @[topic]), handler3))
|
topics = @[topic]), handler3))
|
||||||
# filter for encrypted sym + signed
|
# filter for encrypted sym + signed
|
||||||
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
|
||||||
symKey = some(symKey),
|
symKey = some(symKey),
|
||||||
topics = @[topic]), handler4))
|
topics = @[topic]), handler4))
|
||||||
# Messages
|
var safeTTL = 5'u32
|
||||||
# encrypted asym
|
# Messages
|
||||||
check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5,
|
check:
|
||||||
topic = topic, payload = payloads[0])
|
# encrypted asym
|
||||||
# encrypted asym + signed
|
node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL,
|
||||||
check true == node2.postMessage(some(encryptKeyPair.pubkey),
|
topic = topic, payload = payloads[0]) == true
|
||||||
src = some(signKeyPair.seckey), ttl = 4,
|
# encrypted asym + signed
|
||||||
topic = topic, payload = payloads[1])
|
node2.postMessage(some(encryptKeyPair.pubkey),
|
||||||
# encrypted sym
|
src = some(signKeyPair.seckey), ttl = safeTTL,
|
||||||
check true == node2.postMessage(symKey = some(symKey), ttl = 3, topic = topic,
|
topic = topic, payload = payloads[1]) == true
|
||||||
payload = payloads[2])
|
# encrypted sym
|
||||||
# encrypted sym + signed
|
node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic,
|
||||||
check true == node2.postMessage(symKey = some(symKey),
|
payload = payloads[2]) == true
|
||||||
src = some(signKeyPair.seckey), ttl = 2,
|
# encrypted sym + signed
|
||||||
topic = topic, payload = payloads[3])
|
node2.postMessage(symKey = some(symKey),
|
||||||
|
src = some(signKeyPair.seckey),
|
||||||
|
ttl = safeTTL, topic = topic,
|
||||||
|
payload = payloads[3]) == true
|
||||||
|
|
||||||
check node2.protocolState(Whisper).queue.items.len == 4
|
node2.protocolState(Whisper).queue.items.len == 4
|
||||||
|
|
||||||
var f = all(futures)
|
var f = all(futures)
|
||||||
await f or sleepAsync(300)
|
await f or sleepAsync(messageInterval)
|
||||||
check:
|
check:
|
||||||
f.finished == true
|
f.finished == true
|
||||||
node1.protocolState(Whisper).queue.items.len == 4
|
node1.protocolState(Whisper).queue.items.len == 4
|
||||||
|
|
||||||
for filter in filters:
|
for filter in filters:
|
||||||
check node1.unsubscribeFilter(filter) == true
|
check node1.unsubscribeFilter(filter) == true
|
||||||
|
|
||||||
waitForEmptyQueues()
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
asyncTest "Filters with topics":
|
asyncTest "Filters with topics":
|
||||||
let topic1 = [byte 0x12, 0, 0, 0]
|
let topic1 = [byte 0x12, 0, 0, 0]
|
||||||
let topic2 = [byte 0x34, 0, 0, 0]
|
let topic2 = [byte 0x34, 0, 0, 0]
|
||||||
var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
|
var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
|
||||||
var futures = [newFuture[int](), newFuture[int]()]
|
var futures = [newFuture[int](), newFuture[int]()]
|
||||||
proc handler1(msg: ReceivedMessage) =
|
proc handler1(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payloads[0]
|
check msg.decoded.payload == payloads[0]
|
||||||
futures[0].complete(1)
|
futures[0].complete(1)
|
||||||
proc handler2(msg: ReceivedMessage) =
|
proc handler2(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payloads[1]
|
check msg.decoded.payload == payloads[1]
|
||||||
futures[1].complete(1)
|
futures[1].complete(1)
|
||||||
|
|
||||||
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
|
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
|
||||||
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)
|
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)
|
||||||
|
|
||||||
check:
|
var safeTTL = 3'u32
|
||||||
true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0])
|
check:
|
||||||
true == node2.postMessage(ttl = 2, topic = topic2, payload = payloads[1])
|
node2.postMessage(ttl = safeTTL + 1, topic = topic1,
|
||||||
|
payload = payloads[0]) == true
|
||||||
|
node2.postMessage(ttl = safeTTL, topic = topic2,
|
||||||
|
payload = payloads[1]) == true
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 2
|
||||||
|
|
||||||
var f = all(futures)
|
var f = all(futures)
|
||||||
await f or sleepAsync(300)
|
await f or sleepAsync(messageInterval)
|
||||||
check:
|
check:
|
||||||
f.finished == true
|
f.finished == true
|
||||||
node1.protocolState(Whisper).queue.items.len == 2
|
node1.protocolState(Whisper).queue.items.len == 2
|
||||||
|
|
||||||
node1.unsubscribeFilter(filter1) == true
|
node1.unsubscribeFilter(filter1) == true
|
||||||
node1.unsubscribeFilter(filter2) == true
|
node1.unsubscribeFilter(filter2) == true
|
||||||
|
|
||||||
waitForEmptyQueues()
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
asyncTest "Filters with PoW":
|
asyncTest "Filters with PoW":
|
||||||
let topic = [byte 0x12, 0, 0, 0]
|
let topic = [byte 0x12, 0, 0, 0]
|
||||||
var payload = repeat(byte 0, 10)
|
var payload = repeat(byte 0, 10)
|
||||||
var futures = [newFuture[int](), newFuture[int]()]
|
var futures = [newFuture[int](), newFuture[int]()]
|
||||||
proc handler1(msg: ReceivedMessage) =
|
proc handler1(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payload
|
check msg.decoded.payload == payload
|
||||||
futures[0].complete(1)
|
futures[0].complete(1)
|
||||||
proc handler2(msg: ReceivedMessage) =
|
proc handler2(msg: ReceivedMessage) =
|
||||||
check msg.decoded.payload == payload
|
check msg.decoded.payload == payload
|
||||||
futures[1].complete(1)
|
futures[1].complete(1)
|
||||||
|
|
||||||
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
|
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
|
||||||
handler1)
|
handler1)
|
||||||
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10),
|
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic],
|
||||||
handler2)
|
powReq = 1_000_000), handler2)
|
||||||
|
|
||||||
check:
|
let safeTTL = 2'u32
|
||||||
true == node2.postMessage(ttl = 2, topic = topic, payload = payload)
|
check:
|
||||||
|
node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
||||||
|
|
||||||
await futures[0] or sleepAsync(300)
|
await futures[0] or sleepAsync(messageInterval)
|
||||||
await futures[1] or sleepAsync(300)
|
await futures[1] or sleepAsync(messageInterval)
|
||||||
check:
|
check:
|
||||||
futures[0].finished == true
|
futures[0].finished == true
|
||||||
futures[1].finished == false
|
futures[1].finished == false
|
||||||
node1.protocolState(Whisper).queue.items.len == 1
|
node1.protocolState(Whisper).queue.items.len == 1
|
||||||
|
|
||||||
node1.unsubscribeFilter(filter1) == true
|
node1.unsubscribeFilter(filter1) == true
|
||||||
node1.unsubscribeFilter(filter2) == true
|
node1.unsubscribeFilter(filter2) == true
|
||||||
|
|
||||||
waitForEmptyQueues()
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
asyncTest "Filters with queues":
|
asyncTest "Filters with queues":
|
||||||
let topic = [byte 0, 0, 0, 0]
|
let topic = [byte 0, 0, 0, 0]
|
||||||
let payload = repeat(byte 0, 10)
|
let payload = repeat(byte 0, 10)
|
||||||
|
|
||||||
var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
|
var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
|
||||||
for i in countdown(10, 1):
|
for i in countdown(10, 1):
|
||||||
check true == node2.postMessage(ttl = i.uint32, topic = topic,
|
check node2.postMessage(ttl = i.uint32, topic = topic,
|
||||||
payload = payload)
|
payload = payload) == true
|
||||||
|
|
||||||
await sleepAsync(300)
|
await sleepAsync(messageInterval)
|
||||||
check:
|
check:
|
||||||
node1.getFilterMessages(filter).len() == 10
|
node1.getFilterMessages(filter).len() == 10
|
||||||
node1.getFilterMessages(filter).len() == 0
|
node1.getFilterMessages(filter).len() == 0
|
||||||
node1.unsubscribeFilter(filter) == true
|
node1.unsubscribeFilter(filter) == true
|
||||||
|
|
||||||
waitForEmptyQueues()
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
asyncTest "Bloomfilter blocking":
|
asyncTest "Local filter notify":
|
||||||
let sendTopic1 = [byte 0x12, 0, 0, 0]
|
let topic = [byte 0, 0, 0, 0]
|
||||||
let sendTopic2 = [byte 0x34, 0, 0, 0]
|
|
||||||
let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]]
|
|
||||||
let payload = repeat(byte 0, 10)
|
|
||||||
var f: Future[int] = newFuture[int]()
|
|
||||||
proc handler(msg: ReceivedMessage) =
|
|
||||||
check msg.decoded.payload == payload
|
|
||||||
f.complete(1)
|
|
||||||
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
|
|
||||||
await node1.setBloomFilter(node1.filtersToBloom())
|
|
||||||
|
|
||||||
check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload)
|
var filter = node1.subscribeFilter(newFilter(topics = @[topic]))
|
||||||
|
let safeTTL = 2'u32
|
||||||
|
check:
|
||||||
|
node1.postMessage(ttl = safeTTL, topic = topic,
|
||||||
|
payload = repeat(byte 4, 10)) == true
|
||||||
|
node1.getFilterMessages(filter).len() == 1
|
||||||
|
node1.unsubscribeFilter(filter) == true
|
||||||
|
|
||||||
await f or sleepAsync(300)
|
await sleepAsync(messageInterval)
|
||||||
check:
|
resetMessageQueues(node1, node2)
|
||||||
f.finished == false
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 0
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 1
|
|
||||||
|
|
||||||
f = newFuture[int]()
|
asyncTest "Bloomfilter blocking":
|
||||||
waitForEmptyQueues()
|
let sendTopic1 = [byte 0x12, 0, 0, 0]
|
||||||
|
let sendTopic2 = [byte 0x34, 0, 0, 0]
|
||||||
|
let filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]]
|
||||||
|
let payload = repeat(byte 0, 10)
|
||||||
|
var f: Future[int] = newFuture[int]()
|
||||||
|
proc handler(msg: ReceivedMessage) =
|
||||||
|
check msg.decoded.payload == payload
|
||||||
|
f.complete(1)
|
||||||
|
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
|
||||||
|
await node1.setBloomFilter(node1.filtersToBloom())
|
||||||
|
|
||||||
check true == node2.postMessage(ttl = 1, topic = sendTopic2, payload = payload)
|
let safeTTL = 2'u32
|
||||||
|
check:
|
||||||
|
node2.postMessage(ttl = safeTTL, topic = sendTopic1,
|
||||||
|
payload = payload) == true
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 1
|
||||||
|
|
||||||
await f or sleepAsync(300)
|
await f or sleepAsync(messageInterval)
|
||||||
check:
|
check:
|
||||||
f.finished == true
|
f.finished == false
|
||||||
f.read() == 1
|
node1.protocolState(Whisper).queue.items.len == 0
|
||||||
node1.protocolState(Whisper).queue.items.len == 1
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 1
|
|
||||||
|
|
||||||
node1.unsubscribeFilter(filter) == true
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
await node1.setBloomFilter(fullBloom())
|
f = newFuture[int]()
|
||||||
|
|
||||||
waitForEmptyQueues()
|
check:
|
||||||
|
node2.postMessage(ttl = safeTTL, topic = sendTopic2,
|
||||||
|
payload = payload) == true
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 1
|
||||||
|
|
||||||
asyncTest "PoW blocking":
|
await f or sleepAsync(messageInterval)
|
||||||
let topic = [byte 0, 0, 0, 0]
|
check:
|
||||||
let payload = repeat(byte 0, 10)
|
f.finished == true
|
||||||
await node1.setPowRequirement(1.0)
|
f.read() == 1
|
||||||
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
|
node1.protocolState(Whisper).queue.items.len == 1
|
||||||
await sleepAsync(300)
|
|
||||||
check:
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 0
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 1
|
|
||||||
|
|
||||||
waitForEmptyQueues()
|
node1.unsubscribeFilter(filter) == true
|
||||||
|
|
||||||
await node1.setPowRequirement(0.0)
|
await node1.setBloomFilter(fullBloom())
|
||||||
check true == node2.postMessage(ttl = 1, topic = topic, payload = payload)
|
|
||||||
await sleepAsync(300)
|
|
||||||
check:
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 1
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 1
|
|
||||||
|
|
||||||
waitForEmptyQueues()
|
resetMessageQueues(node1, node2)
|
||||||
|
|
||||||
asyncTest "Queue pruning":
|
asyncTest "PoW blocking":
|
||||||
let topic = [byte 0, 0, 0, 0]
|
let topic = [byte 0, 0, 0, 0]
|
||||||
let payload = repeat(byte 0, 10)
|
let payload = repeat(byte 0, 10)
|
||||||
for i in countdown(10, 1):
|
let safeTTL = 2'u32
|
||||||
check true == node2.postMessage(ttl = i.uint32, topic = topic,
|
|
||||||
payload = payload)
|
|
||||||
check node2.protocolState(Whisper).queue.items.len == 10
|
|
||||||
|
|
||||||
await sleepAsync(300)
|
await node1.setPowRequirement(1_000_000)
|
||||||
check:
|
check:
|
||||||
node1.protocolState(Whisper).queue.items.len == 10
|
node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 1
|
||||||
|
await sleepAsync(messageInterval)
|
||||||
|
check:
|
||||||
|
node1.protocolState(Whisper).queue.items.len == 0
|
||||||
|
|
||||||
await sleepAsync(1000)
|
resetMessageQueues(node1, node2)
|
||||||
check:
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 0
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 0
|
|
||||||
|
|
||||||
asyncTest "Light node posting":
|
await node1.setPowRequirement(0.0)
|
||||||
let topic = [byte 0, 0, 0, 0]
|
check:
|
||||||
node1.setLightNode(true)
|
node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 1
|
||||||
|
await sleepAsync(messageInterval)
|
||||||
|
check:
|
||||||
|
node1.protocolState(Whisper).queue.items.len == 1
|
||||||
|
|
||||||
check:
|
resetMessageQueues(node1, node2)
|
||||||
node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) == false
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 0
|
|
||||||
|
|
||||||
node1.setLightNode(false)
|
asyncTest "Queue pruning":
|
||||||
|
let topic = [byte 0, 0, 0, 0]
|
||||||
|
let payload = repeat(byte 0, 10)
|
||||||
|
# We need a minimum TTL of 2 as when set to 1 there is a small chance that
|
||||||
|
# it is already expired after messageInterval due to rounding down of float
|
||||||
|
# to uint32 in postMessage()
|
||||||
|
let minTTL = 2'u32
|
||||||
|
for i in countdown(minTTL + 9, minTTL):
|
||||||
|
check node2.postMessage(ttl = i, topic = topic, payload = payload) == true
|
||||||
|
check node2.protocolState(Whisper).queue.items.len == 10
|
||||||
|
|
||||||
asyncTest "P2P":
|
await sleepAsync(messageInterval)
|
||||||
let topic = [byte 0, 0, 0, 0]
|
check node1.protocolState(Whisper).queue.items.len == 10
|
||||||
var f: Future[int] = newFuture[int]()
|
|
||||||
proc handler(msg: ReceivedMessage) =
|
|
||||||
check msg.decoded.payload == repeat(byte 4, 10)
|
|
||||||
f.complete(1)
|
|
||||||
|
|
||||||
var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true),
|
await sleepAsync(int(minTTL*1000))
|
||||||
handler)
|
check node1.protocolState(Whisper).queue.items.len == 0
|
||||||
check:
|
check node2.protocolState(Whisper).queue.items.len == 0
|
||||||
true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
|
|
||||||
true == node2.postMessage(ttl = 2, topic = topic,
|
|
||||||
payload = repeat(byte 4, 10),
|
|
||||||
targetPeer = some(toNodeId(node1.keys.pubkey)))
|
|
||||||
|
|
||||||
await f or sleepAsync(300)
|
resetMessageQueues(node1, node2)
|
||||||
check:
|
|
||||||
f.finished == true
|
|
||||||
f.read() == 1
|
|
||||||
node1.protocolState(Whisper).queue.items.len == 0
|
|
||||||
node2.protocolState(Whisper).queue.items.len == 0
|
|
||||||
|
|
||||||
node1.unsubscribeFilter(filter) == true
|
asyncTest "P2P post":
|
||||||
|
let topic = [byte 0, 0, 0, 0]
|
||||||
|
var f: Future[int] = newFuture[int]()
|
||||||
|
proc handler(msg: ReceivedMessage) =
|
||||||
|
check msg.decoded.payload == repeat(byte 4, 10)
|
||||||
|
f.complete(1)
|
||||||
|
|
||||||
|
var filter = node1.subscribeFilter(newFilter(topics = @[topic],
|
||||||
|
allowP2P = true), handler)
|
||||||
|
check:
|
||||||
|
node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true
|
||||||
|
node2.postMessage(ttl = 10, topic = topic,
|
||||||
|
payload = repeat(byte 4, 10),
|
||||||
|
targetPeer = some(toNodeId(node1.keys.pubkey))) == true
|
||||||
|
|
||||||
|
await f or sleepAsync(messageInterval)
|
||||||
|
check:
|
||||||
|
f.finished == true
|
||||||
|
f.read() == 1
|
||||||
|
node1.protocolState(Whisper).queue.items.len == 0
|
||||||
|
node2.protocolState(Whisper).queue.items.len == 0
|
||||||
|
|
||||||
|
node1.unsubscribeFilter(filter) == true
|
||||||
|
|
||||||
|
test "Light node posting":
|
||||||
|
var ln1 = prepTestNode()
|
||||||
|
ln1.setLightNode(true)
|
||||||
|
|
||||||
|
# not listening, so will only connect to others that are listening (node2)
|
||||||
|
waitFor ln1.connectToNetwork(@[bootENode], false, true)
|
||||||
|
|
||||||
|
let topic = [byte 0, 0, 0, 0]
|
||||||
|
|
||||||
|
let safeTTL = 2'u32
|
||||||
|
check:
|
||||||
|
# normal post
|
||||||
|
ln1.postMessage(ttl = safeTTL, topic = topic,
|
||||||
|
payload = repeat(byte 0, 10)) == false
|
||||||
|
ln1.protocolState(Whisper).queue.items.len == 0
|
||||||
|
# P2P post
|
||||||
|
ln1.postMessage(ttl = safeTTL, topic = topic,
|
||||||
|
payload = repeat(byte 0, 10),
|
||||||
|
targetPeer = some(toNodeId(node2.keys.pubkey))) == true
|
||||||
|
ln1.protocolState(Whisper).queue.items.len == 0
|
||||||
|
|
||||||
|
test "Connect two light nodes":
|
||||||
|
var ln1 = prepTestNode()
|
||||||
|
var ln2 = prepTestNode()
|
||||||
|
|
||||||
|
ln1.setLightNode(true)
|
||||||
|
ln2.setLightNode(true)
|
||||||
|
|
||||||
|
ln2.startListening()
|
||||||
|
let peer = waitFor ln1.rlpxConnect(newNode(initENode(ln2.keys.pubKey,
|
||||||
|
ln2.address)))
|
||||||
|
check peer.isNil == true
|
||||||
|
|
Loading…
Reference in New Issue