diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 50afde191..fc91e4e02 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -37,7 +37,8 @@ export defaultEth2TcpPort, enabledLogLevel, ValidIpAddress, defs, parseCmdArg, completeCmdArg, network_metadata, el_conf, network, BlockHashOrNumber, - confTomlDefs, confTomlNet, confTomlUri + confTomlDefs, confTomlNet, confTomlUri, + LightClientDataImportMode declareGauge network_name, "network name", ["name"] @@ -165,6 +166,11 @@ type desc: "Remote Web3Signer URL that will be used as a source of validators" name: "validators-source"}: Option[string] + validatorsSourceInverval* {. + desc: "Number of minutes between validator list updates" + name: "validators-source-interval" + defaultValue: 60 .}: Natural + secretsDirFlag* {. desc: "A directory containing validator keystore passwords" name: "secrets-dir" .}: Option[InputDir] @@ -883,6 +889,11 @@ type desc: "Remote Web3Signer URL that will be used as a source of validators" name: "validators-source"}: Option[string] + validatorsSourceInverval* {. + desc: "Number of minutes between validator list updates" + name: "validators-source-interval" + defaultValue: 60 .}: Natural + secretsDirFlag* {. desc: "A directory containing validator keystore passwords" name: "secrets-dir" .}: Option[InputDir] diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index c5e685706..f480e112d 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1617,6 +1617,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = waitFor node.updateGossipStatus(wallSlot) + asyncSpawn pollForDynamicValidators(node) asyncSpawn runSlotLoop(node, wallTime, onSlotStart) asyncSpawn runOnSecondLoop(node) asyncSpawn runQueueProcessingLoop(node.blockProcessor) diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index 946fec1d6..fdd2b65be 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -90,10 +90,12 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} = var duplicates: seq[ValidatorPubKey] for keystore in listLoadableKeystores(vc.config, vc.keystoreCache): vc.addValidator(keystore) - let dynamicKeystores = await queryValidatorsSource(vc.config) - for keystore in dynamicKeystores: - vc.addValidator(keystore) - return true + let res = await queryValidatorsSource(vc.config) + if res.isOk(): + let dynamicKeystores = res.get() + for keystore in dynamicKeystores: + vc.addValidator(keystore) + true proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} = # This procedure performs initialization of BeaconClock using current genesis diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 00e621d16..eb69c249d 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -19,7 +19,7 @@ logScope: service = ServiceName type DutiesServiceLoop* = enum AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, - ProposerPreparationLoop, ValidatorRegisterLoop + ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop chronicles.formatIt(DutiesServiceLoop): case it @@ -29,6 +29,7 @@ chronicles.formatIt(DutiesServiceLoop): of SyncCommitteeLoop: "sync_committee_loop" of ProposerPreparationLoop: "proposer_prepare_loop" of ValidatorRegisterLoop: "validator_register_loop" + of DynamicValidatorsLoop: "dynamic_validators_loop" proc checkDuty(duty: RestAttesterDuty): bool = (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and @@ -588,6 +589,38 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} = await service.pollForValidatorIndices() await service.waitForNextSlot() +proc dynamicValidatorsLoop*(service: DutiesServiceRef) {.async.} = + let vc = service.client + doAssert(vc.config.validatorsSourceInverval > 0) + + proc addValidatorProc(data: KeystoreData) = + vc.addValidator(data) + + var + timeout = minutes(vc.config.validatorsSourceInverval) + exitLoop = false + + while not(exitLoop): + exitLoop = + try: + await sleepAsync(timeout) + timeout = + block: + let res = await vc.config.queryValidatorsSource() + if res.isOk(): + let keystores = res.get() + debug "Validators source has been polled for validators", + keystores_found = len(keystores), + validators_source = vc.config.validatorsSource + vc.attachedValidators.updateDynamicValidators(keystores, + addValidatorProc) + minutes(vc.config.validatorsSourceInverval) + else: + seconds(5) + false + except CancelledError: + true + proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} = let vc = service.client debug "Beacon proposer preparation loop is waiting for initialization" @@ -661,6 +694,12 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = service.validatorRegisterLoop() else: nil + dynamicFut = + if vc.config.validatorsSourceInverval > 0: + service.dynamicValidatorsLoop() + else: + debug "Dynamic validators update loop disabled" + nil while true: # This loop could look much more nicer/better, when @@ -673,7 +712,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = FutureBase(proposeFut), FutureBase(indicesFut), FutureBase(syncFut), - FutureBase(prepareFut) + FutureBase(prepareFut), + FutureBase(dynamicFut) ] if not(isNil(registerFut)): futures.add(FutureBase(registerFut)) discard await race(futures) @@ -687,6 +727,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = if not(isNil(registerFut)): checkAndRestart(ValidatorRegisterLoop, registerFut, service.validatorRegisterLoop()) + if not(isNil(dynamicFut)): + checkAndRestart(DynamicValidatorsLoop, dynamicFut, + service.dynamicValidatorsLoop()) false except CancelledError: debug "Service interrupted" @@ -703,6 +746,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = pending.add(prepareFut.cancelAndWait()) if not(isNil(registerFut)) and not(registerFut.finished()): pending.add(registerFut.cancelAndWait()) + if not(isNil(dynamicFut)) and not(dynamicFut.finished()): + pending.add(dynamicFut.cancelAndWait()) if not(isNil(service.pollingAttesterDutiesTask)) and not(service.pollingAttesterDutiesTask.finished()): pending.add(service.pollingAttesterDutiesTask.cancelAndWait()) diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 65d3b8db1..359b33595 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -138,7 +138,12 @@ proc addValidators*(node: BeaconNode) = let dynamicStores = try: - waitFor(queryValidatorsSource(node.config)) + let res = waitFor(queryValidatorsSource(node.config)) + if res.isErr(): + # Error is already reported via log warning. + default(seq[KeystoreData]) + else: + res.get() except CatchableError as exc: warn "Unexpected error happens while polling validator's source", error = $exc.name, reason = $exc.msg @@ -161,6 +166,48 @@ proc addValidators*(node: BeaconNode) = gasLimit) v.updateValidator(data) +proc pollForDynamicValidators*(node: BeaconNode) {.async.} = + if node.config.validatorsSourceInverval == 0: + return + + proc addValidatorProc(keystore: KeystoreData) = + let + epoch = node.currentSlot().epoch + index = Opt.none(ValidatorIndex) + feeRecipient = + node.consensusManager[].getFeeRecipient(keystore.pubkey, index, epoch) + gasLimit = + node.consensusManager[].getGasLimit(keystore.pubkey) + discard node.attachedValidators[].addValidator(keystore, feeRecipient, + gasLimit) + + var + timeout = minutes(node.config.validatorsSourceInverval) + exitLoop = false + + while not(exitLoop): + exitLoop = + try: + await sleepAsync(timeout) + timeout = + block: + let res = await node.config.queryValidatorsSource() + if res.isOk(): + let keystores = res.get() + debug "Validators source has been polled for validators", + keystores_found = len(keystores), + validators_source = node.config.validatorsSource + node.attachedValidators.updateDynamicValidators(keystores, + addValidatorProc) + minutes(node.config.validatorsSourceInverval) + else: + # In case of error we going to repeat our call with much smaller + # interval. + seconds(5) + false + except CancelledError: + true + proc getValidator*(node: BeaconNode, idx: ValidatorIndex): Opt[AttachedValidator] = let key = ? node.dag.validatorKey(idx) node.attachedValidators[].getValidator(key.toPubKey()) diff --git a/beacon_chain/validators/keystore_management.nim b/beacon_chain/validators/keystore_management.nim index b91b650e4..177fbe32a 100644 --- a/beacon_chain/validators/keystore_management.nim +++ b/beacon_chain/validators/keystore_management.nim @@ -95,6 +95,8 @@ type MultipleKeystoresDecryptor* = object previouslyUsedPassword*: string + QueryResult = Result[seq[KeystoreData], string] + const minPasswordLen = 12 minPasswordEntropy = 60.0 @@ -627,12 +629,11 @@ proc existsKeystore(keystoreDir: string, return true false -proc queryValidatorsSource*(config: AnyConf): Future[seq[KeystoreData]] {. - async.} = +proc queryValidatorsSource*(config: AnyConf): Future[QueryResult] {.async.} = var keystores: seq[KeystoreData] if config.validatorsSource.isNone() or len(config.validatorsSource.get()) == 0: - return keystores + return QueryResult.ok(keystores) let vsource = config.validatorsSource.get() logScope: @@ -647,9 +648,9 @@ proc queryValidatorsSource*(config: AnyConf): Future[seq[KeystoreData]] {. let res = RestClientRef.new(vsource, prestoFlags, httpFlags, socketFlags = socketFlags) if res.isErr(): - # TODO keep trying in case of temporary network failure - warn "Unable to resolve validator's source distributed signer address" - return keystores + warn "Unable to resolve validator's source distributed signer " & + "address", reason = $res.error + return QueryResult.err($res.error) res.get() keys = try: @@ -657,26 +658,27 @@ proc queryValidatorsSource*(config: AnyConf): Future[seq[KeystoreData]] {. if response.status != 200: warn "Remote validator's source responded with error", error = response.status - return keystores + return QueryResult.err( + "Remote validator's source responded with error [" & + $response.status & "]") let res = decodeBytes(Web3SignerKeysResponse, response.data, response.contentType) if res.isErr(): warn "Unable to obtain validator's source response", reason = res.error - return keystores - + return QueryResult.err($res.error) res.get() except RestError as exc: warn "Unable to poll validator's source", reason = $exc.msg - return keystores + return QueryResult.err($exc.msg) except CancelledError as exc: debug "The polling of validator's source was interrupted" raise exc except CatchableError as exc: warn "Unexpected error occured while polling validator's source", error = $exc.name, reason = $exc.msg - return keystores + return QueryResult.err($exc.msg) for pubkey in keys: keystores.add(KeystoreData( @@ -689,7 +691,7 @@ proc queryValidatorsSource*(config: AnyConf): Future[seq[KeystoreData]] {. flags: {RemoteKeystoreFlag.DynamicKeystore}, remoteType: RemoteSignerType.Web3Signer)) - keystores + QueryResult.ok(keystores) iterator listLoadableKeys*(validatorsDir, secretsDir: string, keysMask: set[KeystoreKind]): CookedPubKey = diff --git a/beacon_chain/validators/validator_pool.nim b/beacon_chain/validators/validator_pool.nim index f4bd0a9be..c6fc5ed67 100644 --- a/beacon_chain/validators/validator_pool.nim +++ b/beacon_chain/validators/validator_pool.nim @@ -87,6 +87,8 @@ type slashingProtection*: SlashingProtectionDB doppelgangerDetectionEnabled*: bool + AddValidatorProc* = proc(keystore: KeystoreData) {.gcsafe, raises: [].} + template pubkey*(v: AttachedValidator): ValidatorPubKey = v.data.pubkey @@ -221,8 +223,12 @@ proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) = of ValidatorKind.Local: notice "Local validator detached", pubkey, validator = shortLog(validator) of ValidatorKind.Remote: - notice "Remote validator detached", pubkey, - validator = shortLog(validator) + if RemoteKeystoreFlag.DynamicKeystore in validator.data.flags: + notice "Dynamic remote validator detached", pubkey, + validator = shortLog(validator) + else: + notice "Remote validator detached", pubkey, + validator = shortLog(validator) validators.set(pool.count().int64) func needsUpdate*(validator: AttachedValidator): bool = @@ -373,6 +379,46 @@ func triggersDoppelganger*( let v = pool.getValidator(pubkey) v.isSome() and v[].triggersDoppelganger(epoch) +proc updateDynamicValidators*(pool: ref ValidatorPool, + keystores: openArray[KeystoreData], + addProc: AddValidatorProc) = + var + keystoresTable: Table[ValidatorPubKey, Opt[KeystoreData]] + deleteValidators: seq[ValidatorPubKey] + + for keystore in keystores: + keystoresTable[keystore.pubkey] = Opt.some(keystore) + + # We preserve `Local` and `Remote` keystores which are not from dynamic set, + # and also we removing all the dynamic keystores which are not part of new + # dynamic set. + for validator in pool[].items(): + if validator.kind == ValidatorKind.Remote: + if RemoteKeystoreFlag.DynamicKeystore in validator.data.flags: + let keystore = keystoresTable.getOrDefault(validator.pubkey) + if keystore.isSome(): + # Just update validator's `data` field with new data from keystore. + validator.data = keystore.get() + else: + deleteValidators.add(validator.pubkey) + + for pubkey in deleteValidators: + pool[].removeValidator(pubkey) + + # Adding new dynamic keystores. + for keystore in keystores.items(): + let res = pool[].getValidator(keystore.pubkey) + if res.isSome(): + let validator = res.get() + if validator.kind != ValidatorKind.Remote or + RemoteKeystoreFlag.DynamicKeystore notin validator.data.flags: + warn "Attempt to replace local validator with dynamic remote validator", + pubkey = validator.pubkey, validator = shortLog(validator), + remote_signer = $keystore.remotes, + local_validator_kind = validator.kind + else: + addProc(keystore) + proc signWithDistributedKey(v: AttachedValidator, request: Web3SignerRequest): Future[SignatureResult] {.async.} = diff --git a/tests/test_validator_pool.nim b/tests/test_validator_pool.nim index 76c2438fc..036c59406 100644 --- a/tests/test_validator_pool.nim +++ b/tests/test_validator_pool.nim @@ -8,8 +8,26 @@ {.used.} import - unittest2, - ../beacon_chain/validators/validator_pool + std/[algorithm, sequtils], + chronos/unittest2/asynctests, + presto, confutils, + ../beacon_chain/validators/[validator_pool, keystore_management], + ../beacon_chain/[conf, beacon_node] + +func createPubKey(number: int8): ValidatorPubKey = + var res = ValidatorPubKey() + res.blob[0] = uint8(number) + res + +func createLocal(pubkey: ValidatorPubKey): KeystoreData = + KeystoreData(kind: KeystoreKind.Local, pubkey: pubkey) + +func createRemote(pubkey: ValidatorPubKey): KeystoreData = + KeystoreData(kind: KeystoreKind.Remote, pubkey: pubkey) + +func createDynamic(pubkey: ValidatorPubKey): KeystoreData = + KeystoreData(kind: KeystoreKind.Remote, pubkey: pubkey, + flags: {RemoteKeystoreFlag.DynamicKeystore}) func makeValidatorAndIndex( index: ValidatorIndex, activation_epoch: Epoch): Opt[ValidatorAndIndex] = @@ -18,6 +36,33 @@ func makeValidatorAndIndex( validator: Validator(activation_epoch: activation_epoch) ) +func cmp(a, b: array[48, byte]): int = + for index, ch in a.pairs(): + if ch < b[index]: + return -1 + elif ch > b[index]: + return 1 + 0 + +func cmp(a, b: KeystoreData): int = + if (a.kind == b.kind) and (a.pubkey == b.pubkey): + if a.kind == KeystoreKind.Remote: + if a.flags == b.flags: + 0 + else: + card(a.flags) - card(b.flags) + else: + 0 + else: + cmp(a.pubkey.blob, b.pubkey.blob) + +func checkResponse(a, b: openArray[KeystoreData]): bool = + if len(a) != len(b): return false + for index, item in a.pairs(): + if cmp(item, b[index]) != 0: + return false + true + suite "Validator pool": test "Doppelganger for genesis validator": let @@ -80,3 +125,171 @@ suite "Validator pool": not v.doppelgangerReady(GENESIS_EPOCH.start_slot) v.doppelgangerReady(now) + + asyncTest "Dynamic validator set: queryValidatorsSource() test": + proc makeJson(keys: openArray[ValidatorPubKey]): string = + var res = "[" + res.add(keys.mapIt("\"0x" & it.toHex() & "\"").join(",")) + res.add("]") + res + + var testStage = 0 + proc testValidate(pattern: string, value: string): int = 0 + var router = RestRouter.init(testValidate) + router.api(MethodGet, "/api/v1/eth2/publicKeys") do () -> RestApiResponse: + case testStage + of 0: + let data = [createPubKey(1), createPubKey(2)].makeJson() + return RestApiResponse.response(data, Http200, "application/json") + of 1: + let data = [createPubKey(1)].makeJson() + return RestApiResponse.response(data, Http200, "application/json") + of 2: + var data: seq[ValidatorPubKey] + return RestApiResponse.response(data.makeJson(), Http200, + "application/json") + else: + return RestApiResponse.response("INCORRECT TEST STAGE", Http400, + "text/plain") + + var sres = RestServerRef.new(router, initTAddress("127.0.0.1:0")) + let + server = sres.get() + serverAddress = server.server.instance.localAddress() + config = + try: + BeaconNodeConf.load(cmdLine = + mapIt(["--validators-source=http://" & $serverAddress], it)) + except Exception as exc: + raiseAssert exc.msg + + server.start() + try: + block: + testStage = 0 + let res = await queryValidatorsSource(config) + check: + res.isOk() + checkResponse( + res.get(), + [createDynamic(createPubKey(1)), createDynamic(createPubKey(2))]) + block: + testStage = 1 + let res = await queryValidatorsSource(config) + check: + res.isOk() + checkResponse(res.get(), [createDynamic(createPubKey(1))]) + block: + testStage = 2 + let res = await queryValidatorsSource(config) + check: + res.isOk() + len(res.get()) == 0 + block: + testStage = 3 + let res = await queryValidatorsSource(config) + check: + res.isErr() + finally: + await server.closeWait() + + test "Dynamic validator set: updateDynamicValidators() test": + let + fee = default(Eth1Address) + gas = 30000000'u64 + + proc checkPool(pool: ValidatorPool, expected: openArray[KeystoreData]) = + let + attachedKeystores = + block: + var res: seq[KeystoreData] + for validator in pool: + res.add(validator.data) + sorted(res, cmp) + sortedExpected = sorted(expected, cmp) + + for index, value in attachedKeystores: + check cmp(value, sortedExpected[index]) == 0 + + var pool = (ref ValidatorPool)() + discard pool[].addValidator(createLocal(createPubKey(1)), fee, gas) + discard pool[].addValidator(createRemote(createPubKey(2)), fee, gas) + discard pool[].addValidator(createDynamic(createPubKey(3)), fee, gas) + + proc addValidator(data: KeystoreData) {.gcsafe.} = + discard pool[].addValidator(data, fee, gas) + + # Adding new dynamic keystores. + block: + let + expected = [ + createLocal(createPubKey(1)), + createRemote(createPubKey(2)), + createDynamic(createPubKey(3)), + createDynamic(createPubKey(4)), + createDynamic(createPubKey(5)) + ] + keystores = [ + createDynamic(createPubKey(3)), + createDynamic(createPubKey(4)), + createDynamic(createPubKey(5)) + ] + pool.updateDynamicValidators(keystores, addValidator) + pool[].checkPool(expected) + + # Removing dynamic keystores. + block: + let + expected = [ + createLocal(createPubKey(1)), + createRemote(createPubKey(2)), + createDynamic(createPubKey(3)) + ] + keystores = [ + createDynamic(createPubKey(3)), + ] + pool.updateDynamicValidators(keystores, addValidator) + pool[].checkPool(expected) + + # Adding and removing keystores at same time. + block: + let + expected = [ + createLocal(createPubKey(1)), + createRemote(createPubKey(2)), + createDynamic(createPubKey(4)), + createDynamic(createPubKey(5)) + ] + keystores = [ + createDynamic(createPubKey(4)), + createDynamic(createPubKey(5)) + ] + pool.updateDynamicValidators(keystores, addValidator) + pool[].checkPool(expected) + + # Adding dynamic keystores with keys which are static. + block: + let + expected = [ + createLocal(createPubKey(1)), + createRemote(createPubKey(2)), + createDynamic(createPubKey(3)) + ] + keystores = [ + createDynamic(createPubKey(1)), + createDynamic(createPubKey(2)), + createDynamic(createPubKey(3)), + ] + pool.updateDynamicValidators(keystores, addValidator) + pool[].checkPool(expected) + + # Empty response + block: + let + expected = [ + createLocal(createPubKey(1)), + createRemote(createPubKey(2)) + ] + var keystores: seq[KeystoreData] + pool.updateDynamicValidators(keystores, addValidator) + pool[].checkPool(expected)