Improve discovery during fork (#2913)

This commit is contained in:
Tanguy 2021-09-29 13:06:16 +02:00 committed by GitHub
parent 1dc94aa36f
commit 06df40454f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 9 deletions

View File

@ -75,6 +75,7 @@ type
connWorkers: seq[Future[void]]
connTable: HashSet[PeerID]
forkId*: ENRForkID
discoveryForkId*: ENRForkID
forkDigests*: ForkDigestsRef
rng*: ref BrHmacDrbgContext
peers*: Table[PeerID, Peer]
@ -902,6 +903,22 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr)
proc isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool =
if discoveryForkId.fork_digest == peerForkId.fork_digest:
if discoveryForkId.next_fork_version < peerForkId.next_fork_version:
# Peer knows about a fork and we don't
true
elif discoveryForkId.next_fork_version == peerForkId.next_fork_version:
# We should have the same next_fork_epoch
discoveryForkId.next_fork_epoch == peerForkId.next_fork_epoch
else:
# Our next fork version is bigger than the peer's one
false
else:
# Wrong fork digest
false
proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
@ -910,13 +927,26 @@ proc queryRandom*(
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
let eth2Field = SSZ.encode(forkId)
let nodes = await d.queryRandom((enrForkIdField, eth2Field))
let nodes = await d.queryRandom()
var filtered: seq[(int, Node)]
for n in nodes:
var score: int = 0
let eth2FieldBytes = n.record.tryGet(enrForkIdField, seq[byte])
if eth2FieldBytes.isNone():
continue
let peerForkId =
try:
SSZ.decode(eth2FieldBytes.get(), ENRForkID)
except SszError as e:
debug "Could not decode the eth2 field of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
if not forkId.isCompatibleForkId(peerForkId):
continue
let attnetsBytes = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isSome():
let attnetsNode =
@ -1067,7 +1097,7 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(
node.forkId, wantedAttnets, wantedSyncnets)
node.discoveryForkId, wantedAttnets, wantedSyncnets)
let newPeers = block:
var np = newSeq[PeerAddr]()
@ -1276,7 +1306,7 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
peer.connectionState = Disconnected
proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
enrForkId: ENRForkID, forkDigests: ForkDigestsRef,
enrForkId: ENRForkID, discoveryForkId: ENRForkId, forkDigests: ForkDigestsRef,
getBeaconTime: GetBeaconTimeFn, switch: Switch,
pubsub: GossipSub, ip: Option[ValidIpAddress], tcpPort,
udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
@ -1305,6 +1335,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
# the previous netkey.
metadata: metadata,
forkId: enrForkId,
discoveryForkId: discoveryForkId,
forkDigests: forkDigests,
getBeaconTime: getBeaconTime,
discovery: Eth2DiscoveryProtocol.new(
@ -1810,6 +1841,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
enrForkId = getENRForkID(
cfg, getBeaconTime().slotOrZero.epoch, genesisValidatorsRoot)
discoveryForkId = getDiscoveryForkID(
cfg, getBeaconTime().slotOrZero.epoch, genesisValidatorsRoot)
(extIp, extTcpPort, extUdpPort) = try: setupAddress(
config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId)
except CatchableError as exc: raise exc
@ -1892,7 +1926,7 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
switch.mount(pubsub)
Eth2Node.new(
config, cfg, enrForkId, forkDigests, getBeaconTime, switch, pubsub, extIp,
config, cfg, enrForkId, discoveryForkId, forkDigests, getBeaconTime, switch, pubsub, extIp,
extTcpPort, extUdpPort, netKeys.seckey.asEthKey,
discovery = config.discv5Enabled, rng = rng)
@ -2083,7 +2117,7 @@ proc updateSyncnetsMetadata*(
else:
debug "Sync committees changed; updated ENR syncnets", syncnets
proc updateForkId*(node: Eth2Node, value: ENRForkID) =
proc updateForkId(node: Eth2Node, value: ENRForkID) =
node.forkId = value
let res = node.discovery.updateRecord({enrForkIdField: SSZ.encode value})
if res.isErr():
@ -2093,6 +2127,10 @@ proc updateForkId*(node: Eth2Node, value: ENRForkID) =
else:
debug "ENR fork id changed", value
proc updateForkId*(node: Eth2Node, epoch: Epoch, genesisValidatorsRoot: Eth2Digest) =
node.updateForkId(getENRForkId(node.cfg, epoch, genesisValidatorsRoot))
node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesisValidatorsRoot)
# https://github.com/ethereum/consensus-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
func getStabilitySubnetLength*(node: Eth2Node): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +

View File

@ -1096,9 +1096,9 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
next_action_wait.set(nextActionWaitTime.toFloatSeconds)
let epoch = slot.epoch
if epoch >= node.network.forkId.next_fork_epoch:
node.network.updateForkId(
node.dag.cfg.getENRForkID(epoch, node.dag.genesisValidatorsRoot))
if epoch + 1 >= node.network.forkId.next_fork_epoch:
# Update 1 epoch early to block non-fork-ready peers
node.network.updateForkId(epoch, node.dag.genesisValidatorsRoot)
node.updateGossipStatus(slot)

View File

@ -763,6 +763,8 @@ func toGaugeValue*(x: uint64 | Epoch | Slot): int64 =
# TODO where's borrow support when you need it
func `==`*(a, b: ForkDigest | Version): bool =
array[4, byte](a) == array[4, byte](b)
func `<`*(a, b: ForkDigest | Version): bool =
uint32.fromBytesBE(array[4, byte](a)) < uint32.fromBytesBE(array[4, byte](b))
func len*(v: ForkDigest | Version): int = sizeof(v)
func low*(v: ForkDigest | Version): int = 0
func high*(v: ForkDigest | Version): int = len(v) - 1

View File

@ -105,3 +105,19 @@ func getENRForkID*(cfg: RuntimeConfig,
fork_digest: fork_digest,
next_fork_version: next_fork_version,
next_fork_epoch: cfg.nextForkEpochAtEpoch(epoch))
func getDiscoveryForkID*(cfg: RuntimeConfig,
epoch: Epoch,
genesis_validators_root: Eth2Digest): ENRForkID =
# Until 1 epoch from fork, returns pre-fork value
if epoch + 1 >= cfg.ALTAIR_FORK_EPOCH:
getENRForkID(cfg, epoch, genesis_validators_root)
else:
let
current_fork_version = cfg.forkVersionAtEpoch(epoch)
fork_digest = compute_fork_digest(current_fork_version,
genesis_validators_root)
ENRForkID(
fork_digest: fork_digest,
next_fork_version: current_fork_version,
next_fork_epoch: FAR_FUTURE_EPOCH)

View File

@ -149,3 +149,60 @@ suite "Eth2 specific discovery tests":
await node1.closeWait()
await node2.closeWait()
suite "Fork id compatibility test":
test "Digest check":
check false == isCompatibleForkId(
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)),
ENRForkID(
fork_digest: ForkDigest([byte 9, 9, 9, 9]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)))
check true == isCompatibleForkId(
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)),
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)))
test "Fork check":
# Future fork should work
check true == isCompatibleForkId(
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)),
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 2, 2, 2, 2]),
next_fork_epoch: Epoch(2)))
# Past fork should fail
check false == isCompatibleForkId(
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 1]),
next_fork_epoch: Epoch(0)),
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)))
test "Next fork epoch check":
# Same fork should check next_fork_epoch
check false == isCompatibleForkId(
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(0)),
ENRForkID(
fork_digest: ForkDigest([byte 0, 1, 2, 3]),
next_fork_version: Version([byte 0, 0, 0, 0]),
next_fork_epoch: Epoch(2)))