Add discovery query with forkId and attnets filter (#2420)
This commit is contained in:
parent
36311bfc05
commit
a4a2c1c0e1
|
@ -868,6 +868,38 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
|
|||
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
|
||||
ok(peerAddr)
|
||||
|
||||
proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
|
||||
attnets: BitArray[ATTESTATION_SUBNET_COUNT]):
|
||||
Future[seq[PeerAddr]] {.async, raises:[Exception, Defect].} =
|
||||
## Perform a discovery query for a random target matching the eth2 field
|
||||
## (forkId) and matching at least one of the attestation subnets.
|
||||
let nodes = await d.queryRandom()
|
||||
let eth2Field = SSZ.encode(forkId)
|
||||
|
||||
var filtered: seq[PeerAddr]
|
||||
for n in nodes:
|
||||
if n.record.contains(("eth2", eth2Field)):
|
||||
let res = n.record.tryGet("attnets", seq[byte])
|
||||
|
||||
if res.isSome():
|
||||
let attnetsNode =
|
||||
try:
|
||||
SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT])
|
||||
except SszError as e:
|
||||
debug "Could not decode attestation subnet bitfield of peer",
|
||||
peer = n.record.toURI(), exception = e.name, msg = e.msg
|
||||
continue
|
||||
|
||||
for i in 0..<attnetsNode.bytes.len:
|
||||
if (attnets.bytes[i] and attnetsNode.bytes[i]) > 0:
|
||||
# we have at least one subnet match
|
||||
let peerAddr = n.toPeerAddr()
|
||||
if peerAddr.isOk():
|
||||
filtered.add(peerAddr.get())
|
||||
break
|
||||
|
||||
return filtered
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
let enrField = ("eth2", SSZ.encode(node.forkId))
|
||||
|
|
|
@ -18,6 +18,7 @@ import # Unit test
|
|||
./test_bitseqs,
|
||||
./test_block_pool,
|
||||
./test_datatypes,
|
||||
./test_discovery,
|
||||
./test_eth1_monitor,
|
||||
./test_exit_pool,
|
||||
./test_helpers,
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
import
|
||||
std/unittest,
|
||||
chronos, stew/shims/net, eth/keys, eth/p2p/discoveryv5/enr,
|
||||
../beacon_chain/conf,
|
||||
../beacon_chain/spec/datatypes,
|
||||
../beacon_chain/networking/[eth2_network, eth2_discovery],
|
||||
./testutil
|
||||
|
||||
template timedAsyncTest*(name, body: untyped) =
|
||||
timedTest name:
|
||||
proc scenario {.async.} = body
|
||||
waitFor scenario()
|
||||
|
||||
proc new*(T: type Eth2DiscoveryProtocol,
|
||||
pk: keys.PrivateKey,
|
||||
enrIp: Option[ValidIpAddress], enrTcpPort, enrUdpPort: Option[Port],
|
||||
bindPort: Port, bindIp: ValidIpAddress,
|
||||
enrFields: openArray[(string, seq[byte])] = [],
|
||||
rng: ref BrHmacDrbgContext):
|
||||
T {.raises: [Exception, Defect].} =
|
||||
|
||||
newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields,
|
||||
bindPort = bindPort, bindIp = bindIp, rng = rng)
|
||||
|
||||
proc generateNode(rng: ref BrHmacDrbgContext, port: Port,
|
||||
enrFields: openArray[(string, seq[byte])] = []): Eth2DiscoveryProtocol =
|
||||
let ip = ValidIpAddress.init("127.0.0.1")
|
||||
Eth2DiscoveryProtocol.new(keys.PrivateKey.random(rng[]),
|
||||
some(ip), some(port), some(port), port, ip, enrFields, rng = rng)
|
||||
|
||||
suiteReport "Eth2 specific discovery tests":
|
||||
let
|
||||
rng = keys.newRng()
|
||||
enrForkId = ENRForkID(
|
||||
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
|
||||
next_fork_version: Version([byte 0, 0, 0, 0]),
|
||||
next_fork_epoch: Epoch(0))
|
||||
|
||||
timedAsyncTest "Subnet query":
|
||||
var attnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnets.setBit(34)
|
||||
|
||||
let
|
||||
node1 = generateNode(rng, Port(5000))
|
||||
node2 = generateNode(rng, Port(5001),
|
||||
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(attnets)})
|
||||
|
||||
node1.open()
|
||||
node2.open()
|
||||
|
||||
# ping in one direction to add node2 to routing table of node1
|
||||
check (await node2.ping(node1.localNode)).isOk()
|
||||
|
||||
var attnetsSelected: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnetsSelected.setBit(42)
|
||||
attnetsSelected.setBit(34)
|
||||
|
||||
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
|
||||
check discovered.len == 1
|
||||
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
|
||||
timedAsyncTest "Invalid attnets field":
|
||||
var invalidAttnets: BitArray[ATTESTATION_SUBNET_COUNT div 2]
|
||||
invalidAttnets.setBit(15)
|
||||
# TODO: This doesn't fail actually.
|
||||
# var invalidAttnets2: BitArray[ATTESTATION_SUBNET_COUNT * 2]
|
||||
# invalidAttnets2.setBit(15)
|
||||
var attnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnets.setBit(15)
|
||||
|
||||
let
|
||||
node1 = generateNode(rng, Port(5000))
|
||||
node2 = generateNode(rng, Port(5001),
|
||||
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(invalidAttnets)})
|
||||
node3 = generateNode(rng, Port(5002),
|
||||
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(attnets)})
|
||||
|
||||
node1.open()
|
||||
node2.open()
|
||||
node3.open()
|
||||
|
||||
check (await node2.ping(node1.localNode)).isOk()
|
||||
check (await node3.ping(node1.localNode)).isOk()
|
||||
|
||||
var attnetsSelected: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnetsSelected.setBit(15)
|
||||
attnetsSelected.setBit(42)
|
||||
|
||||
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
|
||||
check discovered.len == 1
|
||||
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
await node3.closeWait()
|
||||
|
||||
timedAsyncTest "Subnet query after ENR update":
|
||||
var attnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnets.setBit(1)
|
||||
|
||||
let
|
||||
node1 = generateNode(rng, Port(5000))
|
||||
node2 = generateNode(rng, Port(5001),
|
||||
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(attnets)})
|
||||
|
||||
node1.open()
|
||||
node2.open()
|
||||
|
||||
check (await node2.ping(node1.localNode)).isOk()
|
||||
|
||||
var attnetsSelected: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
attnetsSelected.setBit(2)
|
||||
|
||||
block:
|
||||
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
|
||||
check discovered.len == 0
|
||||
|
||||
block:
|
||||
attnets.setBit(2)
|
||||
check node2.updateRecord(
|
||||
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(attnets)}).isOk()
|
||||
|
||||
let nodes = await node1.findNode(node2.localNode, @[0'u32])
|
||||
check nodes.isOk() and nodes[].len > 0
|
||||
discard node1.addNode(nodes[][0])
|
||||
|
||||
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
|
||||
check discovered.len == 1
|
||||
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
Loading…
Reference in New Issue