mirror of
https://github.com/status-im/nim-eth.git
synced 2025-01-25 22:10:16 +00:00
07cf801b97
* Rebrand asyncdispatch2 to chronos * fix nimble path to chronos
386 lines
13 KiB
Nim
386 lines
13 KiB
Nim
#
|
|
# Ethereum P2P
|
|
# (c) Copyright 2018
|
|
# Status Research & Development GmbH
|
|
#
|
|
# Licensed under either of
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
# MIT license (LICENSE-MIT)
|
|
|
|
import
|
|
sequtils, options, unittest, tables, chronos, eth/[rlp, keys, p2p],
|
|
eth/p2p/rlpx_protocols/[whisper_protocol], eth/p2p/[discovery, enode]
|
|
|
|
# Compile-time switch: true only when the tests are built with
# `-d:useSnappy`. `prepTestNode` forwards it to `newEthereumNode`.
const useCompression = defined(useSnappy)

# Port handed to the next node created by `prepTestNode`, which increments
# it after every use.
var nextPort: int = 30303
|
|
|
|
proc localAddress(port: int): Address =
  ## Returns a loopback (127.0.0.1) `Address` whose UDP and TCP ports are
  ## both set to `port`.
  let sharedPort = Port(port)
  Address(
    ip: parseIpAddress("127.0.0.1"),
    udpPort: sharedPort,
    tcpPort: sharedPort)
|
|
|
|
proc startDiscoveryNode(privKey: PrivateKey, address: Address,
                        bootnodes: seq[ENode]): Future[DiscoveryProtocol] {.async.} =
  ## Creates a discovery protocol instance for `privKey` / `address`, opens
  ## it, awaits its bootstrap against `bootnodes` and then returns it.
  let discovery = newDiscoveryProtocol(privKey, address, bootnodes)
  discovery.open()
  await discovery.bootstrap()
  return discovery
|
|
|
|
proc setupBootNode(): Future[ENode] {.async.} =
  ## Starts a discovery node on 127.0.0.1:30301 with a freshly generated
  ## private key and no bootnodes of its own, and returns the `ENode`
  ## (public key + address) that other nodes can use as their bootnode.
  let
    bootNodeKey = newPrivateKey()
    bootNodeAddr = localAddress(30301)
  # Only the side effect of starting the node is needed here, so the returned
  # protocol object is explicitly discarded instead of being bound to a name
  # that is never read.
  discard await startDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
  result = initENode(bootNodeKey.getPublicKey, bootNodeAddr)
|
|
|
|
template asyncTest(name, body: untyped) =
  ## Declares a unittest `test` called `name` whose `body` is wrapped in an
  ## async proc and driven to completion with `waitFor`, so that `await` can
  ## be used directly inside the test body.
  test name:
    proc scenario() {.async.} =
      body
    waitFor scenario()
|
|
|
|
proc resetMessageQueues(nodes: varargs[EthereumNode]) =
  ## Calls `resetMessageQueue` on every node passed in, in argument order.
  for idx in 0 ..< nodes.len:
    nodes[idx].resetMessageQueue()
|
|
|
|
proc prepTestNode(): EthereumNode =
  ## Creates an Ethereum node with a fresh key pair on the local address at
  ## port `nextPort`, registers only the Whisper capability on it and then
  ## advances `nextPort`. Compression follows the `useCompression` constant.
  let nodeKeys = newKeyPair()
  # NOTE(review): the positional `1` and `nil` are presumably the network id
  # and the chain backend - confirm against `newEthereumNode`.
  result = newEthereumNode(
    nodeKeys, localAddress(nextPort), 1, nil,
    addAllCapabilities = false,
    useCompression = useCompression)
  inc nextPort
  result.addCapability(Whisper)
|
|
|
|
# Shared fixtures for every test below: one boot node plus two Whisper nodes
# that discover each other through it.
let bootENode = waitFor setupBootNode()

var
  node1 = prepTestNode()
  node2 = prepTestNode()

# Only node2 listens (second argument `true`); node1 does not, to avoid many
# incoming vs outgoing connections between the two.
var
  node1Connected = node1.connectToNetwork(@[bootENode], false, true)
  node2Connected = node2.connectToNetwork(@[bootENode], true, true)

# Both connection attempts are started above before either one is awaited.
waitFor node1Connected
waitFor node2Connected
|
|
|
|
suite "Whisper connections":
  asyncTest "Two peers connected":
    # After the module-level connection setup each node is expected to see
    # exactly one connected peer.
    check node1.peerPool.connectedNodes.len() == 1
    check node2.peerPool.connectedNodes.len() == 1
|
|
|
|
asyncTest "Filters with encryption and signing":
  # node1 subscribes four filters on one topic (asymmetric and symmetric
  # encryption, each with and without a required signer); node2 posts one
  # message per combination and every handler must see what it expects.
  let
    encryptKeyPair = newKeyPair()
    signKeyPair = newKeyPair()
    topic = [byte 0x12, 0, 0, 0]
    payloads = [repeat(byte 1, 10), repeat(byte 2, 10),
                repeat(byte 3, 10), repeat(byte 4, 10)]
    # One future per handler, completed once that handler has received all
    # the messages it is expected to get.
    futures = [newFuture[int](), newFuture[int](),
               newFuture[int](), newFuture[int]()]
  var symKey: SymKey  # never assigned: the default-initialised key is used
  var filters: seq[string] = @[]

  proc handler1(msg: ReceivedMessage) =
    # Accepts the unsigned and the signed asym payload; done after two hits.
    var count {.global.}: int
    check msg.decoded.payload == payloads[0] or
          msg.decoded.payload == payloads[1]
    count += 1
    if count == 2: futures[0].complete(1)
  proc handler2(msg: ReceivedMessage) =
    check msg.decoded.payload == payloads[1]
    futures[1].complete(1)
  proc handler3(msg: ReceivedMessage) =
    # Accepts the unsigned and the signed sym payload; done after two hits.
    var count {.global.}: int
    check msg.decoded.payload == payloads[2] or
          msg.decoded.payload == payloads[3]
    count += 1
    if count == 2: futures[2].complete(1)
  proc handler4(msg: ReceivedMessage) =
    check msg.decoded.payload == payloads[3]
    futures[3].complete(1)

  # Filters
  # filter for encrypted asym
  filters.add(node1.subscribeFilter(
    newFilter(privateKey = some(encryptKeyPair.seckey), topics = @[topic]),
    handler1))
  # filter for encrypted asym + signed
  filters.add(node1.subscribeFilter(
    newFilter(some(signKeyPair.pubkey),
              privateKey = some(encryptKeyPair.seckey),
              topics = @[topic]),
    handler2))
  # filter for encrypted sym
  filters.add(node1.subscribeFilter(
    newFilter(symKey = some(symKey), topics = @[topic]),
    handler3))
  # filter for encrypted sym + signed
  filters.add(node1.subscribeFilter(
    newFilter(some(signKeyPair.pubkey),
              symKey = some(symKey),
              topics = @[topic]),
    handler4))

  let safeTTL = 5'u32
  # Messages
  check:
    # encrypted asym
    node2.postMessage(some(encryptKeyPair.pubkey), ttl = safeTTL,
                      topic = topic, payload = payloads[0]) == true
    # encrypted asym + signed
    node2.postMessage(some(encryptKeyPair.pubkey),
                      src = some(signKeyPair.seckey), ttl = safeTTL,
                      topic = topic, payload = payloads[1]) == true
    # encrypted sym
    node2.postMessage(symKey = some(symKey), ttl = safeTTL, topic = topic,
                      payload = payloads[2]) == true
    # encrypted sym + signed
    node2.postMessage(symKey = some(symKey),
                      src = some(signKeyPair.seckey),
                      ttl = safeTTL, topic = topic,
                      payload = payloads[3]) == true

    node2.protocolState(Whisper).queue.items.len == 4

  # Wait for all handlers, but never longer than one message interval.
  let f = all(futures)
  await f or sleepAsync(messageInterval)
  check:
    f.finished == true
    node1.protocolState(Whisper).queue.items.len == 4

  for filter in filters:
    check node1.unsubscribeFilter(filter) == true

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Filters with topics":
  # Two filters on different topics: each handler must only be called with
  # the payload that node2 posted on its own topic.
  let
    topic1 = [byte 0x12, 0, 0, 0]
    topic2 = [byte 0x34, 0, 0, 0]
    payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
    futures = [newFuture[int](), newFuture[int]()]
  proc handler1(msg: ReceivedMessage) =
    check msg.decoded.payload == payloads[0]
    futures[0].complete(1)
  proc handler2(msg: ReceivedMessage) =
    check msg.decoded.payload == payloads[1]
    futures[1].complete(1)

  let
    filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
    filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)

  let safeTTL = 3'u32
  check:
    node2.postMessage(ttl = safeTTL + 1, topic = topic1,
                      payload = payloads[0]) == true
    node2.postMessage(ttl = safeTTL, topic = topic2,
                      payload = payloads[1]) == true
    node2.protocolState(Whisper).queue.items.len == 2

  # Wait for both handlers, but never longer than one message interval.
  let f = all(futures)
  await f or sleepAsync(messageInterval)
  check:
    f.finished == true
    node1.protocolState(Whisper).queue.items.len == 2

    node1.unsubscribeFilter(filter1) == true
    node1.unsubscribeFilter(filter2) == true

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Filters with PoW":
  # Two filters on the same topic that differ only in `powReq`: the one with
  # requirement 0 must fire, the one requiring 1_000_000 must not.
  let
    topic = [byte 0x12, 0, 0, 0]
    payload = repeat(byte 0, 10)
    futures = [newFuture[int](), newFuture[int]()]
  proc handler1(msg: ReceivedMessage) =
    check msg.decoded.payload == payload
    futures[0].complete(1)
  proc handler2(msg: ReceivedMessage) =
    check msg.decoded.payload == payload
    futures[1].complete(1)

  let
    filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
                                    handler1)
    filter2 = node1.subscribeFilter(newFilter(topics = @[topic],
                                    powReq = 1_000_000), handler2)

  let safeTTL = 2'u32
  check:
    node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true

  # Each wait is bounded by one message interval; the second future is
  # expected to still be pending afterwards.
  await futures[0] or sleepAsync(messageInterval)
  await futures[1] or sleepAsync(messageInterval)
  check:
    futures[0].finished == true
    futures[1].finished == false
    node1.protocolState(Whisper).queue.items.len == 1

    node1.unsubscribeFilter(filter1) == true
    node1.unsubscribeFilter(filter2) == true

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Filters with queues":
  # A filter subscribed without a handler: its messages are fetched with
  # `getFilterMessages` instead of being delivered through a callback.
  let
    topic = [byte 0, 0, 0, 0]
    payload = repeat(byte 0, 10)

  let filter = node1.subscribeFilter(newFilter(topics = @[topic]))
  # Ten posts that differ only in their TTL (10 down to 1).
  for i in countdown(10, 1):
    check node2.postMessage(ttl = i.uint32, topic = topic,
                            payload = payload) == true

  await sleepAsync(messageInterval)
  check:
    node1.getFilterMessages(filter).len() == 10
    # a second fetch is expected to return nothing
    node1.getFilterMessages(filter).len() == 0
    node1.unsubscribeFilter(filter) == true

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Local filter notify":
  # A message posted by node1 itself must show up in node1's own filter
  # right away, without waiting for a message interval first.
  let topic = [byte 0, 0, 0, 0]

  let filter = node1.subscribeFilter(newFilter(topics = @[topic]))
  let safeTTL = 2'u32
  check:
    node1.postMessage(ttl = safeTTL, topic = topic,
                      payload = repeat(byte 4, 10)) == true
    node1.getFilterMessages(filter).len() == 1
    node1.unsubscribeFilter(filter) == true

  await sleepAsync(messageInterval)
  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Bloomfilter blocking":
  # node1 sets its bloom filter from its subscribed topics. A message on a
  # topic outside that set must not reach node1; one on a subscribed topic
  # must.
  let
    sendTopic1 = [byte 0x12, 0, 0, 0]
    sendTopic2 = [byte 0x34, 0, 0, 0]
    filterTopics = @[[byte 0x34, 0, 0, 0],[byte 0x56, 0, 0, 0]]
    payload = repeat(byte 0, 10)
  # Stays `var`: it is replaced with a fresh future for the second round, and
  # the handler completes whichever future `f` refers to when it runs.
  var f: Future[int] = newFuture[int]()
  proc handler(msg: ReceivedMessage) =
    check msg.decoded.payload == payload
    f.complete(1)
  let filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
  await node1.setBloomFilter(node1.filtersToBloom())

  let safeTTL = 2'u32
  # Round 1: sendTopic1 is not among the filter topics.
  check:
    node2.postMessage(ttl = safeTTL, topic = sendTopic1,
                      payload = payload) == true
    node2.protocolState(Whisper).queue.items.len == 1

  await f or sleepAsync(messageInterval)
  check:
    f.finished == false
    node1.protocolState(Whisper).queue.items.len == 0

  resetMessageQueues(node1, node2)

  f = newFuture[int]()

  # Round 2: sendTopic2 equals the first filter topic.
  check:
    node2.postMessage(ttl = safeTTL, topic = sendTopic2,
                      payload = payload) == true
    node2.protocolState(Whisper).queue.items.len == 1

  await f or sleepAsync(messageInterval)
  check:
    f.finished == true
    f.read() == 1
    node1.protocolState(Whisper).queue.items.len == 1

    node1.unsubscribeFilter(filter) == true

  # Put node1 back on the full bloom filter for the tests that follow.
  await node1.setBloomFilter(fullBloom())

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "PoW blocking":
  # node1 first sets a PoW requirement of 1_000_000, then of 0.0. The same
  # post from node2 is expected in node1's queue only in the second round.
  let
    topic = [byte 0, 0, 0, 0]
    payload = repeat(byte 0, 10)
    safeTTL = 2'u32

  # Round 1: requirement of 1_000_000.
  await node1.setPowRequirement(1_000_000)
  check node2.postMessage(ttl = safeTTL, topic = topic,
                          payload = payload) == true
  check node2.protocolState(Whisper).queue.items.len == 1
  await sleepAsync(messageInterval)
  check node1.protocolState(Whisper).queue.items.len == 0

  resetMessageQueues(node1, node2)

  # Round 2: requirement of 0.0.
  await node1.setPowRequirement(0.0)
  check node2.postMessage(ttl = safeTTL, topic = topic,
                          payload = payload) == true
  check node2.protocolState(Whisper).queue.items.len == 1
  await sleepAsync(messageInterval)
  check node1.protocolState(Whisper).queue.items.len == 1

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "Queue pruning":
  # Ten messages with TTLs from minTTL + 9 down to minTTL are posted; after
  # the second sleep both queues are expected to be empty.
  let
    topic = [byte 0, 0, 0, 0]
    payload = repeat(byte 0, 10)
    # A TTL of 1 is too small: because postMessage() rounds a float down to
    # uint32, such a message has a small chance of already being expired
    # after messageInterval. Hence a minimum TTL of 2.
    minTTL = 2'u32
  for ttl in countdown(minTTL + 9, minTTL):
    check node2.postMessage(ttl = ttl, topic = topic,
                            payload = payload) == true
  check node2.protocolState(Whisper).queue.items.len == 10

  await sleepAsync(messageInterval)
  check node1.protocolState(Whisper).queue.items.len == 10

  await sleepAsync(int(minTTL*1000))
  check:
    node1.protocolState(Whisper).queue.items.len == 0
    node2.protocolState(Whisper).queue.items.len == 0

  resetMessageQueues(node1, node2)
|
|
|
|
asyncTest "P2P post":
  # node2 posts directly to node1 (`targetPeer`), which trusts node2 and has
  # a filter with `allowP2P = true`. The handler must fire while both
  # message queues stay empty.
  let topic = [byte 0, 0, 0, 0]
  let f: Future[int] = newFuture[int]()
  proc handler(msg: ReceivedMessage) =
    check msg.decoded.payload == repeat(byte 4, 10)
    f.complete(1)

  let filter = node1.subscribeFilter(newFilter(topics = @[topic],
                                     allowP2P = true), handler)
  check:
    node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true
    node2.postMessage(ttl = 10, topic = topic,
                      payload = repeat(byte 4, 10),
                      targetPeer = some(toNodeId(node1.keys.pubkey))) == true

  # Wait for the handler, but never longer than one message interval.
  await f or sleepAsync(messageInterval)
  check:
    f.finished == true
    f.read() == 1
    node1.protocolState(Whisper).queue.items.len == 0
    node2.protocolState(Whisper).queue.items.len == 0

    node1.unsubscribeFilter(filter) == true
|
|
|
|
test "Light node posting":
  # A node in light mode: a normal post is expected to be refused, a P2P post
  # to node2 to succeed, and its own queue to stay empty in both cases.
  var ln1 = prepTestNode()
  ln1.setLightNode(true)

  # ln1 does not listen itself, so it will only connect to others that are
  # listening (node2).
  waitFor ln1.connectToNetwork(@[bootENode], false, true)

  let
    topic = [byte 0, 0, 0, 0]
    safeTTL = 2'u32

  # normal post
  check ln1.postMessage(ttl = safeTTL, topic = topic,
                        payload = repeat(byte 0, 10)) == false
  check ln1.protocolState(Whisper).queue.items.len == 0
  # P2P post
  check ln1.postMessage(ttl = safeTTL, topic = topic,
                        payload = repeat(byte 0, 10),
                        targetPeer = some(toNodeId(node2.keys.pubkey))) == true
  check ln1.protocolState(Whisper).queue.items.len == 0
|
|
|
|
test "Connect two light nodes":
  # An RLPx connection attempt between two nodes that are both in light mode
  # is expected to yield no peer (nil).
  var
    ln1 = prepTestNode()
    ln2 = prepTestNode()

  ln1.setLightNode(true)
  ln2.setLightNode(true)

  ln2.startListening()
  let ln2Node = newNode(initENode(ln2.keys.pubkey, ln2.address))
  let peer = waitFor ln1.rlpxConnect(ln2Node)
  check peer.isNil == true
|