VC: Use scoring function to select best attestation data when using multiple BNs. (#5101)
* Initial commit. * Move score selection log statement to debug level. * Fix proper float64 log format. * Cleanup imports and legacy code. * Address review comments. * Address review comments. * Fix scoring function. * Address review comments. * Address review comments 2. Fix registerBlock post-rebase issues. * Simplify innerLoop decision making. * Make getAttestationDataScore() more testable. Add tests for getAttestationDataScore(). * Add modified AllTests copy.
This commit is contained in:
parent
1e227b2704
commit
ac1b02698a
|
@ -575,9 +575,10 @@ OK: 24/24 Fail: 0/24 Skip: 0/24
|
||||||
OK: 1/1 Fail: 0/1 Skip: 0/1
|
OK: 1/1 Fail: 0/1 Skip: 0/1
|
||||||
## Validator Client test suite
|
## Validator Client test suite
|
||||||
```diff
|
```diff
|
||||||
|
+ getAttestationDataScore() test vectors OK
|
||||||
+ normalizeUri() test vectors OK
|
+ normalizeUri() test vectors OK
|
||||||
```
|
```
|
||||||
OK: 1/1 Fail: 0/1 Skip: 0/1
|
OK: 2/2 Fail: 0/2 Skip: 0/2
|
||||||
## Validator change pool testing suite
|
## Validator change pool testing suite
|
||||||
```diff
|
```diff
|
||||||
+ addValidatorChangeMessage/getAttesterSlashingMessage OK
|
+ addValidatorChangeMessage/getAttesterSlashingMessage OK
|
||||||
|
@ -688,4 +689,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
|
||||||
OK: 9/9 Fail: 0/9 Skip: 0/9
|
OK: 9/9 Fail: 0/9 Skip: 0/9
|
||||||
|
|
||||||
---TOTAL---
|
---TOTAL---
|
||||||
OK: 389/394 Fail: 0/394 Skip: 5/394
|
OK: 390/395 Fail: 0/395 Skip: 5/395
|
||||||
|
|
|
@ -5,10 +5,11 @@
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import chronicles
|
import std/strutils
|
||||||
import ../spec/eth2_apis/eth2_rest_serialization,
|
import chronicles, stew/base10
|
||||||
../spec/datatypes/[phase0, altair]
|
import ".."/spec/eth2_apis/eth2_rest_serialization,
|
||||||
import common, fallback_service
|
".."/spec/datatypes/[phase0, altair]
|
||||||
|
import "."/[common, fallback_service, scoring]
|
||||||
|
|
||||||
export eth2_rest_serialization, common
|
export eth2_rest_serialization, common
|
||||||
|
|
||||||
|
@ -35,12 +36,47 @@ type
|
||||||
status*: ApiOperation
|
status*: ApiOperation
|
||||||
data*: seq[ApiNodeResponse[T]]
|
data*: seq[ApiNodeResponse[T]]
|
||||||
|
|
||||||
|
ApiScore* = object
|
||||||
|
index*: int
|
||||||
|
score*: Opt[float64]
|
||||||
|
|
||||||
|
BestNodeResponse*[T] = object
|
||||||
|
node*: BeaconNodeServerRef
|
||||||
|
data*: ApiResponse[T]
|
||||||
|
score*: float64
|
||||||
|
|
||||||
const
|
const
|
||||||
ViableNodeStatus = {RestBeaconNodeStatus.Compatible,
|
ViableNodeStatus = {RestBeaconNodeStatus.Compatible,
|
||||||
RestBeaconNodeStatus.NotSynced,
|
RestBeaconNodeStatus.NotSynced,
|
||||||
RestBeaconNodeStatus.OptSynced,
|
RestBeaconNodeStatus.OptSynced,
|
||||||
RestBeaconNodeStatus.Synced}
|
RestBeaconNodeStatus.Synced}
|
||||||
|
|
||||||
|
proc `$`*(s: ApiScore): string =
|
||||||
|
var res = Base10.toString(uint64(s.index))
|
||||||
|
res.add(": ")
|
||||||
|
if s.score.isSome():
|
||||||
|
res.add(shortScore(s.score.get()))
|
||||||
|
else:
|
||||||
|
res.add("<n/a>")
|
||||||
|
res
|
||||||
|
|
||||||
|
proc `$`*(ss: openArray[ApiScore]): string =
|
||||||
|
"[" & ss.mapIt($it).join(",") & "]"
|
||||||
|
|
||||||
|
chronicles.formatIt(seq[ApiScore]):
|
||||||
|
$it
|
||||||
|
|
||||||
|
func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef,
|
||||||
|
score: float64): ApiScore =
|
||||||
|
ApiScore(index: node.index, score: Opt.some(score))
|
||||||
|
|
||||||
|
func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef): ApiScore =
|
||||||
|
ApiScore(index: node.index, score: Opt.none(float64))
|
||||||
|
|
||||||
|
func init*[T](t: typedesc[BestNodeResponse], node: BeaconNodeServerRef,
|
||||||
|
data: ApiResponse[T], score: float64): BestNodeResponse[T] =
|
||||||
|
BestNodeResponse[T](node: node, data: data, score: score)
|
||||||
|
|
||||||
proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase,
|
proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase,
|
||||||
requestName: string, strategy: ApiStrategyKind) {.async.} =
|
requestName: string, strategy: ApiStrategyKind) {.async.} =
|
||||||
try:
|
try:
|
||||||
|
@ -78,6 +114,18 @@ proc lazyWait(nodes: seq[BeaconNodeServerRef], requests: seq[FutureBase],
|
||||||
else:
|
else:
|
||||||
await allFutures(futures)
|
await allFutures(futures)
|
||||||
|
|
||||||
|
proc apiResponseOr[T](future: FutureBase, timerFut: Future[void],
|
||||||
|
message: string): ApiResponse[T] =
|
||||||
|
if future.finished():
|
||||||
|
doAssert(not(future.cancelled()))
|
||||||
|
if future.failed():
|
||||||
|
ApiResponse[T].err($future.error.msg)
|
||||||
|
else:
|
||||||
|
ApiResponse[T].ok(Future[T](future).read())
|
||||||
|
else:
|
||||||
|
doAssert(timerFut.finished())
|
||||||
|
ApiResponse[T].err(message)
|
||||||
|
|
||||||
template firstSuccessParallel*(
|
template firstSuccessParallel*(
|
||||||
vc: ValidatorClientRef,
|
vc: ValidatorClientRef,
|
||||||
responseType: typedesc,
|
responseType: typedesc,
|
||||||
|
@ -118,8 +166,7 @@ template firstSuccessParallel*(
|
||||||
# This case could not be happened.
|
# This case could not be happened.
|
||||||
error "Unexpected exception while waiting for beacon nodes",
|
error "Unexpected exception while waiting for beacon nodes",
|
||||||
err_name = $exc.name, err_msg = $exc.msg
|
err_name = $exc.name, err_msg = $exc.msg
|
||||||
var default: seq[BeaconNodeServerRef]
|
default(seq[BeaconNodeServerRef])
|
||||||
default
|
|
||||||
|
|
||||||
if len(onlineNodes) == 0:
|
if len(onlineNodes) == 0:
|
||||||
retRes = ApiResponse[handlerType].err("No online beacon node(s)")
|
retRes = ApiResponse[handlerType].err("No online beacon node(s)")
|
||||||
|
@ -183,15 +230,8 @@ template firstSuccessParallel*(
|
||||||
let
|
let
|
||||||
node {.inject.} = beaconNode
|
node {.inject.} = beaconNode
|
||||||
apiResponse {.inject.} =
|
apiResponse {.inject.} =
|
||||||
if timerFut.finished():
|
apiResponseOr[responseType](requestFut, timerFut,
|
||||||
ApiResponse[responseType].err(
|
|
||||||
"Timeout exceeded while awaiting for the response")
|
"Timeout exceeded while awaiting for the response")
|
||||||
else:
|
|
||||||
if requestFut.failed():
|
|
||||||
ApiResponse[responseType].err($requestFut.error.msg)
|
|
||||||
else:
|
|
||||||
ApiResponse[responseType].ok(
|
|
||||||
Future[responseType](requestFut).read())
|
|
||||||
handlerResponse =
|
handlerResponse =
|
||||||
try:
|
try:
|
||||||
body2
|
body2
|
||||||
|
@ -200,7 +240,7 @@ template firstSuccessParallel*(
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raiseAssert("Response handler must not raise exceptions")
|
raiseAssert("Response handler must not raise exceptions")
|
||||||
|
|
||||||
if apiResponse.isOk() and handlerResponse.isOk():
|
if handlerResponse.isOk():
|
||||||
retRes = handlerResponse
|
retRes = handlerResponse
|
||||||
resultReady = true
|
resultReady = true
|
||||||
asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut,
|
asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut,
|
||||||
|
@ -213,7 +253,7 @@ template firstSuccessParallel*(
|
||||||
pendingCancel.add(raceFut.cancelAndWait())
|
pendingCancel.add(raceFut.cancelAndWait())
|
||||||
if not(isNil(timerFut)) and not(timerFut.finished()):
|
if not(isNil(timerFut)) and not(timerFut.finished()):
|
||||||
pendingCancel.add(timerFut.cancelAndWait())
|
pendingCancel.add(timerFut.cancelAndWait())
|
||||||
for index, future in pendingRequests.pairs():
|
for future in pendingRequests.items():
|
||||||
if not(future.finished()):
|
if not(future.finished()):
|
||||||
pendingCancel.add(future.cancelAndWait())
|
pendingCancel.add(future.cancelAndWait())
|
||||||
await allFutures(pendingCancel)
|
await allFutures(pendingCancel)
|
||||||
|
@ -236,13 +276,16 @@ template firstSuccessParallel*(
|
||||||
template bestSuccess*(
|
template bestSuccess*(
|
||||||
vc: ValidatorClientRef,
|
vc: ValidatorClientRef,
|
||||||
responseType: typedesc,
|
responseType: typedesc,
|
||||||
|
handlerType: typedesc,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
statuses: set[RestBeaconNodeStatus],
|
statuses: set[RestBeaconNodeStatus],
|
||||||
roles: set[BeaconNodeRole],
|
roles: set[BeaconNodeRole],
|
||||||
bodyRequest,
|
bodyRequest,
|
||||||
bodyScore: untyped): ApiResponse[responseType] =
|
bodyScore,
|
||||||
var it {.inject.}: RestClientRef
|
bodyHandler: untyped): ApiResponse[handlerType] =
|
||||||
type BodyType = typeof(bodyRequest)
|
var
|
||||||
|
it {.inject.}: RestClientRef
|
||||||
|
iterations = 0
|
||||||
|
|
||||||
var timerFut =
|
var timerFut =
|
||||||
if timeout != InfiniteDuration:
|
if timeout != InfiniteDuration:
|
||||||
|
@ -250,9 +293,22 @@ template bestSuccess*(
|
||||||
else:
|
else:
|
||||||
nil
|
nil
|
||||||
|
|
||||||
|
var
|
||||||
|
retRes: ApiResponse[handlerType]
|
||||||
|
scores: seq[ApiScore]
|
||||||
|
bestResponse: Opt[BestNodeResponse[handlerType]]
|
||||||
|
|
||||||
|
block mainLoop:
|
||||||
|
while true:
|
||||||
let onlineNodes =
|
let onlineNodes =
|
||||||
try:
|
try:
|
||||||
|
if iterations == 0:
|
||||||
|
# We are not going to wait for BNs if there some available.
|
||||||
await vc.waitNodes(timerFut, statuses, roles, false)
|
await vc.waitNodes(timerFut, statuses, roles, false)
|
||||||
|
else:
|
||||||
|
# We get here only, if all the requests are failed. To avoid requests
|
||||||
|
# spam we going to wait for changes in BNs statuses.
|
||||||
|
await vc.waitNodes(timerFut, statuses, roles, true)
|
||||||
vc.filterNodes(statuses, roles)
|
vc.filterNodes(statuses, roles)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
if not(isNil(timerFut)) and not(timerFut.finished()):
|
if not(isNil(timerFut)) and not(timerFut.finished()):
|
||||||
|
@ -262,102 +318,141 @@ template bestSuccess*(
|
||||||
# This case could not be happened.
|
# This case could not be happened.
|
||||||
error "Unexpected exception while waiting for beacon nodes",
|
error "Unexpected exception while waiting for beacon nodes",
|
||||||
err_name = $exc.name, err_msg = $exc.msg
|
err_name = $exc.name, err_msg = $exc.msg
|
||||||
var default: seq[BeaconNodeServerRef]
|
default(seq[BeaconNodeServerRef])
|
||||||
default
|
|
||||||
|
|
||||||
if len(onlineNodes) == 0:
|
if len(onlineNodes) == 0:
|
||||||
ApiResponse[responseType].err("No online beacon node(s)")
|
retRes = ApiResponse[handlerType].err("No online beacon node(s)")
|
||||||
|
break mainLoop
|
||||||
else:
|
else:
|
||||||
let
|
var
|
||||||
(pendingRequests, pendingNodes) =
|
(pendingRequests, pendingNodes) =
|
||||||
block:
|
block:
|
||||||
var requests: seq[BodyType]
|
var requests: seq[FutureBase]
|
||||||
var nodes: seq[BeaconNodeServerRef]
|
var nodes: seq[BeaconNodeServerRef]
|
||||||
for node {.inject.} in onlineNodes:
|
for node {.inject.} in onlineNodes:
|
||||||
it = node.client
|
it = node.client
|
||||||
let fut = bodyRequest
|
let fut = FutureBase(bodyRequest)
|
||||||
requests.add(fut)
|
requests.add(fut)
|
||||||
nodes.add(node)
|
nodes.add(node)
|
||||||
(requests, nodes)
|
(requests, nodes)
|
||||||
|
perfectScoreFound = false
|
||||||
|
|
||||||
status =
|
block innerLoop:
|
||||||
|
while len(pendingRequests) > 0:
|
||||||
|
var
|
||||||
|
finishedRequests: seq[FutureBase]
|
||||||
|
finishedNodes: seq[BeaconNodeServerRef]
|
||||||
|
raceFut: Future[FutureBase]
|
||||||
try:
|
try:
|
||||||
|
raceFut = race(pendingRequests)
|
||||||
|
|
||||||
if isNil(timerFut):
|
if isNil(timerFut):
|
||||||
await allFutures(pendingRequests)
|
await raceFut or timerFut
|
||||||
ApiOperation.Success
|
|
||||||
else:
|
else:
|
||||||
let waitFut = allFutures(pendingRequests)
|
await allFutures(raceFut)
|
||||||
discard await race(waitFut, timerFut)
|
|
||||||
if not(waitFut.finished()):
|
for index, future in pendingRequests.pairs():
|
||||||
await waitFut.cancelAndWait()
|
if future.finished() or
|
||||||
ApiOperation.Timeout
|
(not(isNil(timerFut)) and timerFut.finished()):
|
||||||
else:
|
finishedRequests.add(future)
|
||||||
if not(timerFut.finished()):
|
finishedNodes.add(pendingNodes[index])
|
||||||
await timerFut.cancelAndWait()
|
let
|
||||||
ApiOperation.Success
|
node {.inject.} = pendingNodes[index]
|
||||||
|
apiResponse {.inject.} =
|
||||||
|
apiResponseOr[responseType](future, timerFut,
|
||||||
|
"Timeout exceeded while awaiting for the response")
|
||||||
|
handlerResponse =
|
||||||
|
try:
|
||||||
|
bodyHandler
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
# We should cancel all the pending requests and timer before we return
|
|
||||||
# result.
|
|
||||||
var pendingCancel: seq[Future[void]]
|
|
||||||
for future in pendingRequests:
|
|
||||||
if not(fut.finished()):
|
|
||||||
pendingCancel.add(fut.cancelAndWait())
|
|
||||||
if not(isNil(timerFut)) and not(timerFut.finished()):
|
|
||||||
pendingCancel.add(timerFut.cancelAndWait())
|
|
||||||
await allFutures(pendingCancel)
|
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
# This should not be happened, because allFutures() and race() did not
|
raiseAssert(
|
||||||
# raise any exceptions.
|
"Response handler must not raise exceptions")
|
||||||
ApiOperation.Failure
|
|
||||||
|
|
||||||
apiResponses {.inject.} =
|
if handlerResponse.isOk():
|
||||||
block:
|
let
|
||||||
var res: seq[ApiNodeResponse[responseType]]
|
itresponse {.inject.} = handlerResponse.get()
|
||||||
for requestFut, pnode in pendingRequests.pairs():
|
score =
|
||||||
let beaconNode = pendingNodes[index]
|
try:
|
||||||
if requestFut.finished():
|
bodyScore
|
||||||
if requestFut.failed():
|
except CancelledError as exc:
|
||||||
let exc = requestFut.readError()
|
raise exc
|
||||||
debug "One of operation requests has been failed",
|
except CatchableError:
|
||||||
node = beaconNode, err_name = $exc.name,
|
raiseAssert("Score handler must not raise exceptions")
|
||||||
err_msg = $exc.msg
|
scores.add(ApiScore.init(node, score))
|
||||||
beaconNode.status.updateStatus(RestBeaconNodeStatus.Offline)
|
if bestResponse.isNone() or
|
||||||
elif future.cancelled():
|
(score > bestResponse.get().score):
|
||||||
debug "One of operation requests has been interrupted",
|
bestResponse = Opt.some(
|
||||||
node = beaconNode
|
BestNodeResponse.init(node, handlerResponse, score))
|
||||||
|
if perfectScore(score):
|
||||||
|
perfectScoreFound = true
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
res.add(
|
scores.add(ApiScore.init(node))
|
||||||
ApiNodeResponse(
|
|
||||||
node: beaconNode,
|
|
||||||
data: ApiResponse[responseType].ok(future.read())
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
case status
|
|
||||||
of ApiOperation.Timeout:
|
|
||||||
debug "One of operation requests has been timed out",
|
|
||||||
node = beaconNode
|
|
||||||
pendingNodes[index].status = RestBeaconNodeStatus.Offline
|
|
||||||
of ApiOperation.Success, ApiOperation.Failure,
|
|
||||||
ApiOperation.Interrupt:
|
|
||||||
# This should not be happened, because all Futures should be
|
|
||||||
# finished.
|
|
||||||
debug "One of operation requests failed unexpectedly",
|
|
||||||
node = beaconNode
|
|
||||||
pendingNodes[index].status = RestBeaconNodeStatus.Offline
|
|
||||||
res
|
|
||||||
|
|
||||||
if len(apiResponses) == 0:
|
if perfectScoreFound:
|
||||||
ApiResponse[responseType].err("No successful responses available")
|
# lazyWait will cancel `pendingRequests` on timeout.
|
||||||
|
asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut,
|
||||||
|
RequestName, strategy)
|
||||||
|
break innerLoop
|
||||||
|
|
||||||
|
if not(isNil(timerFut)) and timerFut.finished():
|
||||||
|
# If timeout is exceeded we need to cancel all the tasks which
|
||||||
|
# are still running.
|
||||||
|
var pendingCancel: seq[Future[void]]
|
||||||
|
for future in pendingRequests.items():
|
||||||
|
if not(future.finished()):
|
||||||
|
pendingCancel.add(future.cancelAndWait())
|
||||||
|
await allFutures(pendingCancel)
|
||||||
|
break innerLoop
|
||||||
|
|
||||||
|
pendingRequests.keepItIf(it notin finishedRequests)
|
||||||
|
pendingNodes.keepItIf(it notin finishedNodes)
|
||||||
|
|
||||||
|
except CancelledError as exc:
|
||||||
|
var pendingCancel: seq[Future[void]]
|
||||||
|
# `or` operation does not cancelling Futures passed as arguments.
|
||||||
|
if not(isNil(raceFut)) and not(raceFut.finished()):
|
||||||
|
pendingCancel.add(raceFut.cancelAndWait())
|
||||||
|
if not(isNil(timerFut)) and not(timerFut.finished()):
|
||||||
|
pendingCancel.add(timerFut.cancelAndWait())
|
||||||
|
# We should cancel all the requests which are still pending.
|
||||||
|
for future in pendingRequests.items():
|
||||||
|
if not(future.finished()):
|
||||||
|
pendingCancel.add(future.cancelAndWait())
|
||||||
|
# Awaiting cancellations.
|
||||||
|
await allFutures(pendingCancel)
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
# This should not be happened, because allFutures() and race()
|
||||||
|
# did not raise any exceptions.
|
||||||
|
error "Unexpected exception while processing request",
|
||||||
|
err_name = $exc.name, err_msg = $exc.msg
|
||||||
|
retRes = ApiResponse[handlerType].err("Unexpected error")
|
||||||
|
break mainLoop
|
||||||
|
|
||||||
|
if bestResponse.isSome():
|
||||||
|
retRes = bestResponse.get().data
|
||||||
|
break mainLoop
|
||||||
else:
|
else:
|
||||||
let index = bestScore
|
if timerFut.finished():
|
||||||
if index >= 0:
|
retRes = ApiResponse[handlerType].err(
|
||||||
debug "Operation request result was selected",
|
"Timeout exceeded while awaiting for responses")
|
||||||
node = apiResponses[index].node
|
break mainLoop
|
||||||
apiResponses[index].data
|
|
||||||
else:
|
else:
|
||||||
ApiResponse[responseType].err("Unable to get best response")
|
# When all requests failed
|
||||||
|
discard
|
||||||
|
|
||||||
|
inc(iterations)
|
||||||
|
|
||||||
|
if retRes.isOk():
|
||||||
|
debug "Best score result selected",
|
||||||
|
request = RequestName, available_scores = scores,
|
||||||
|
best_score = shortScore(bestResponse.get().score),
|
||||||
|
best_node = bestResponse.get().node
|
||||||
|
|
||||||
|
retRes
|
||||||
|
|
||||||
template onceToAll*(
|
template onceToAll*(
|
||||||
vc: ValidatorClientRef,
|
vc: ValidatorClientRef,
|
||||||
|
@ -1225,7 +1320,7 @@ proc produceAttestationData*(
|
||||||
var failures: seq[ApiNodeFailure]
|
var failures: seq[ApiNodeFailure]
|
||||||
|
|
||||||
case strategy
|
case strategy
|
||||||
of ApiStrategyKind.First, ApiStrategyKind.Best:
|
of ApiStrategyKind.First:
|
||||||
let res = vc.firstSuccessParallel(
|
let res = vc.firstSuccessParallel(
|
||||||
RestPlainResponse,
|
RestPlainResponse,
|
||||||
ProduceAttestationDataResponse,
|
ProduceAttestationDataResponse,
|
||||||
|
@ -1266,6 +1361,47 @@ proc produceAttestationData*(
|
||||||
raise (ref ValidatorApiError)(msg: res.error, data: failures)
|
raise (ref ValidatorApiError)(msg: res.error, data: failures)
|
||||||
return res.get().data
|
return res.get().data
|
||||||
|
|
||||||
|
of ApiStrategyKind.Best:
|
||||||
|
let res = vc.bestSuccess(
|
||||||
|
RestPlainResponse,
|
||||||
|
ProduceAttestationDataResponse,
|
||||||
|
OneThirdDuration,
|
||||||
|
ViableNodeStatus,
|
||||||
|
{BeaconNodeRole.AttestationData},
|
||||||
|
produceAttestationDataPlain(it, slot, committee_index),
|
||||||
|
getAttestationDataScore(vc, itresponse)):
|
||||||
|
if apiResponse.isErr():
|
||||||
|
handleCommunicationError()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err(apiResponse.error)
|
||||||
|
else:
|
||||||
|
let response = apiResponse.get()
|
||||||
|
case response.status
|
||||||
|
of 200:
|
||||||
|
let res = decodeBytes(ProduceAttestationDataResponse, response.data,
|
||||||
|
response.contentType)
|
||||||
|
if res.isErr():
|
||||||
|
handleUnexpectedData()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err($res.error)
|
||||||
|
else:
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].ok(res.get())
|
||||||
|
of 400:
|
||||||
|
handle400()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err(ResponseInvalidError)
|
||||||
|
of 500:
|
||||||
|
handle500()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err(ResponseInternalError)
|
||||||
|
of 503:
|
||||||
|
handle503()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err(
|
||||||
|
ResponseNoSyncError)
|
||||||
|
else:
|
||||||
|
handleUnexpectedCode()
|
||||||
|
ApiResponse[ProduceAttestationDataResponse].err(
|
||||||
|
ResponseUnexpectedError)
|
||||||
|
if res.isErr():
|
||||||
|
raise (ref ValidatorApiError)(msg: res.error, data: failures)
|
||||||
|
return res.get().data
|
||||||
|
|
||||||
of ApiStrategyKind.Priority:
|
of ApiStrategyKind.Priority:
|
||||||
vc.firstSuccessSequential(
|
vc.firstSuccessSequential(
|
||||||
RestPlainResponse,
|
RestPlainResponse,
|
||||||
|
|
|
@ -208,6 +208,7 @@ type
|
||||||
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
|
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
|
||||||
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
|
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
|
||||||
blocksSeen*: Table[Slot, BlockDataItem]
|
blocksSeen*: Table[Slot, BlockDataItem]
|
||||||
|
rootsSeen*: Table[Eth2Digest, Slot]
|
||||||
processingDelay*: Opt[Duration]
|
processingDelay*: Opt[Duration]
|
||||||
rng*: ref HmacDrbgContext
|
rng*: ref HmacDrbgContext
|
||||||
|
|
||||||
|
@ -1207,23 +1208,25 @@ proc expectBlock*(vc: ValidatorClientRef, slot: Slot,
|
||||||
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
|
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
|
||||||
retFuture
|
retFuture
|
||||||
|
|
||||||
proc registerBlock*(vc: ValidatorClientRef, data: EventBeaconBlockObject,
|
proc registerBlock*(vc: ValidatorClientRef, eblck: EventBeaconBlockObject,
|
||||||
node: BeaconNodeServerRef) =
|
node: BeaconNodeServerRef) =
|
||||||
let
|
let
|
||||||
wallTime = vc.beaconClock.now()
|
wallTime = vc.beaconClock.now()
|
||||||
delay = wallTime - data.slot.start_beacon_time()
|
delay = wallTime - eblck.slot.start_beacon_time()
|
||||||
|
|
||||||
debug "Block received", slot = data.slot,
|
debug "Block received", slot = eblck.slot,
|
||||||
block_root = shortLog(data.block_root), optimistic = data.optimistic,
|
block_root = shortLog(eblck.block_root), optimistic = eblck.optimistic,
|
||||||
node = node, delay = delay
|
node = node, delay = delay
|
||||||
|
|
||||||
proc scheduleCallbacks(data: var BlockDataItem,
|
proc scheduleCallbacks(data: var BlockDataItem,
|
||||||
blck: EventBeaconBlockObject) =
|
blck: EventBeaconBlockObject) =
|
||||||
|
vc.rootsSeen[blck.block_root] = blck.slot
|
||||||
data.blocks.add(blck.block_root)
|
data.blocks.add(blck.block_root)
|
||||||
for mitem in data.waiters.mitems():
|
for mitem in data.waiters.mitems():
|
||||||
if mitem.count >= len(data.blocks):
|
if mitem.count >= len(data.blocks):
|
||||||
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
|
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
|
||||||
vc.blocksSeen.mgetOrPut(data.slot, BlockDataItem()).scheduleCallbacks(data)
|
|
||||||
|
vc.blocksSeen.mgetOrPut(eblck.slot, BlockDataItem()).scheduleCallbacks(eblck)
|
||||||
|
|
||||||
proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
|
proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
|
||||||
var blocksSeen: Table[Slot, BlockDataItem]
|
var blocksSeen: Table[Slot, BlockDataItem]
|
||||||
|
@ -1231,6 +1234,7 @@ proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
|
||||||
if (slot.epoch() + HISTORICAL_DUTIES_EPOCHS) >= epoch:
|
if (slot.epoch() + HISTORICAL_DUTIES_EPOCHS) >= epoch:
|
||||||
blocksSeen[slot] = item
|
blocksSeen[slot] = item
|
||||||
else:
|
else:
|
||||||
|
for root in item.blocks: vc.rootsSeen.del(root)
|
||||||
let blockRoot =
|
let blockRoot =
|
||||||
if len(item.blocks) == 0:
|
if len(item.blocks) == 0:
|
||||||
"<missing>"
|
"<missing>"
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
# beacon_chain
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
import std/strutils
|
||||||
|
import "."/common
|
||||||
|
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
func perfectScore*(score: float64): bool =
|
||||||
|
score == Inf
|
||||||
|
|
||||||
|
proc shortScore*(score: float64): string =
|
||||||
|
if score == Inf: "<perfect>" else: formatFloat(score, ffDecimal, 4)
|
||||||
|
|
||||||
|
proc getAttestationDataScore*(rootsSeen: Table[Eth2Digest, Slot],
|
||||||
|
adata: ProduceAttestationDataResponse): float64 =
|
||||||
|
let
|
||||||
|
slot = rootsSeen.getOrDefault(
|
||||||
|
adata.data.beacon_block_root, FAR_FUTURE_SLOT)
|
||||||
|
|
||||||
|
let res =
|
||||||
|
if (slot == adata.data.slot) and
|
||||||
|
(adata.data.source.epoch + 1 == adata.data.target.epoch):
|
||||||
|
# Perfect score
|
||||||
|
Inf
|
||||||
|
else:
|
||||||
|
let score = float64(adata.data.source.epoch) +
|
||||||
|
float64(adata.data.target.epoch)
|
||||||
|
if slot == FAR_FUTURE_SLOT:
|
||||||
|
score
|
||||||
|
else:
|
||||||
|
if adata.data.slot + 1 == slot:
|
||||||
|
# To avoid `DivizionByZero` defect.
|
||||||
|
score
|
||||||
|
else:
|
||||||
|
score + float64(1) / (float64(adata.data.slot) + float64(1) -
|
||||||
|
float64(slot))
|
||||||
|
|
||||||
|
debug "Attestation score", attestation_data = shortLog(adata.data),
|
||||||
|
block_slot = slot, score = shortScore(res)
|
||||||
|
res
|
||||||
|
|
||||||
|
proc getAttestationDataScore*(vc: ValidatorClientRef,
|
||||||
|
adata: ProduceAttestationDataResponse): float64 =
|
||||||
|
getAttestationDataScore(vc.rootsSeen, adata)
|
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
import std/strutils
|
import std/strutils
|
||||||
import unittest2
|
import unittest2
|
||||||
import ../beacon_chain/validator_client/common
|
import ../beacon_chain/validator_client/[common, scoring]
|
||||||
|
|
||||||
const
|
const
|
||||||
HostNames = [
|
HostNames = [
|
||||||
|
@ -143,9 +143,110 @@ const
|
||||||
("", "err(Missing hostname)")
|
("", "err(Missing hostname)")
|
||||||
]
|
]
|
||||||
|
|
||||||
|
type
|
||||||
|
AttestationDataTuple* = tuple[
|
||||||
|
slot: uint64,
|
||||||
|
index: uint64,
|
||||||
|
beacon_block_root: string,
|
||||||
|
source: uint64,
|
||||||
|
target: uint64
|
||||||
|
]
|
||||||
|
|
||||||
|
const
|
||||||
|
AttestationDataVectors = [
|
||||||
|
# Attestation score with block monitoring enabled (perfect).
|
||||||
|
((6002798'u64, 10'u64, "22242212", 187586'u64, 187587'u64),
|
||||||
|
("22242212", 6002798'u64), "<perfect>"),
|
||||||
|
((6002811'u64, 24'u64, "26ec78d6", 187586'u64, 187587'u64),
|
||||||
|
("26ec78d6", 6002811'u64), "<perfect>"),
|
||||||
|
((6002821'u64, 11'u64, "10c6d1a2", 187587'u64, 187588'u64),
|
||||||
|
("10c6d1a2", 6002821'u64), "<perfect>"),
|
||||||
|
((6002836'u64, 15'u64, "42354ded", 187587'u64, 187588'u64),
|
||||||
|
("42354ded", 6002836'u64), "<perfect>"),
|
||||||
|
((6002859'u64, 10'u64, "97d8ac69", 187588'u64, 187589'u64),
|
||||||
|
("97d8ac69", 6002859'u64), "<perfect>"),
|
||||||
|
# Attestation score with block monitoring enabled #1 (not perfect).
|
||||||
|
((6002871'u64, 25'u64, "524a9e2b", 187588'u64, 187589'u64),
|
||||||
|
("524a9e2b", 6002870'u64), "375177.5000"),
|
||||||
|
((6002871'u64, 25'u64, "524a9e2b", 187588'u64, 187589'u64),
|
||||||
|
("524a9e2b", 6002869'u64), "375177.3333"),
|
||||||
|
((6002871'u64, 25'u64, "524a9e2b", 187588'u64, 187589'u64),
|
||||||
|
("524a9e2b", 6002868'u64), "375177.2500"),
|
||||||
|
((6002871'u64, 25'u64, "524a9e2b", 187588'u64, 187589'u64),
|
||||||
|
("524a9e2b", 6002867'u64), "375177.2000"),
|
||||||
|
((6002871'u64, 25'u64, "524a9e2b", 187588'u64, 187589'u64),
|
||||||
|
("524a9e2b", 6002866'u64), "375177.1667"),
|
||||||
|
# Attestation score with block monitoring enabled #2 (not perfect).
|
||||||
|
((6002962'u64, 14'u64, "22a19d87", 187591'u64, 187592'u64),
|
||||||
|
("22a19d87", 6002961'u64), "375183.5000"),
|
||||||
|
((6002962'u64, 14'u64, "22a19d87", 187591'u64, 187592'u64),
|
||||||
|
("22a19d87", 6002960'u64), "375183.3333"),
|
||||||
|
((6002962'u64, 14'u64, "22a19d87", 187591'u64, 187592'u64),
|
||||||
|
("22a19d87", 6002959'u64), "375183.2500"),
|
||||||
|
((6002962'u64, 14'u64, "22a19d87", 187591'u64, 187592'u64),
|
||||||
|
("22a19d87", 6002958'u64), "375183.2000"),
|
||||||
|
((6002962'u64, 14'u64, "22a19d87", 187591'u64, 187592'u64),
|
||||||
|
("22a19d87", 6002957'u64), "375183.1667"),
|
||||||
|
# Attestation score with block monitoring disabled #1.
|
||||||
|
((6003217'u64, 52'u64, "5e945218", 187599'u64, 187600'u64),
|
||||||
|
("00000000", 0'u64), "375199.0000"),
|
||||||
|
((6003217'u64, 52'u64, "5e945218", 187598'u64, 187600'u64),
|
||||||
|
("00000000", 0'u64), "375198.0000"),
|
||||||
|
((6003217'u64, 52'u64, "5e945218", 187597'u64, 187600'u64),
|
||||||
|
("00000000", 0'u64), "375197.0000"),
|
||||||
|
((6003217'u64, 52'u64, "5e945218", 187596'u64, 187600'u64),
|
||||||
|
("00000000", 0'u64), "375196.0000"),
|
||||||
|
((6003217'u64, 52'u64, "5e945218", 187595'u64, 187600'u64),
|
||||||
|
("00000000", 0'u64), "375195.0000"),
|
||||||
|
# Attestation score with block monitoring disabled #2.
|
||||||
|
((6003257'u64, 9'u64, "7bfa464e", 187600'u64, 187601'u64),
|
||||||
|
("00000000", 0'u64), "375201.0000"),
|
||||||
|
((6003257'u64, 9'u64, "7bfa464e", 187599'u64, 187601'u64),
|
||||||
|
("00000000", 0'u64), "375200.0000"),
|
||||||
|
((6003257'u64, 9'u64, "7bfa464e", 187598'u64, 187601'u64),
|
||||||
|
("00000000", 0'u64), "375199.0000"),
|
||||||
|
((6003257'u64, 9'u64, "7bfa464e", 187597'u64, 187601'u64),
|
||||||
|
("00000000", 0'u64), "375198.0000"),
|
||||||
|
((6003257'u64, 9'u64, "7bfa464e", 187596'u64, 187601'u64),
|
||||||
|
("00000000", 0'u64), "375197.0000"),
|
||||||
|
]
|
||||||
|
|
||||||
|
proc init(t: typedesc[Eth2Digest], data: string): Eth2Digest =
|
||||||
|
let length = len(data)
|
||||||
|
var dst = Eth2Digest()
|
||||||
|
try:
|
||||||
|
hexToByteArray(data.toOpenArray(0, len(data) - 1),
|
||||||
|
dst.data.toOpenArray(0, (length div 2) - 1))
|
||||||
|
except ValueError:
|
||||||
|
discard
|
||||||
|
dst
|
||||||
|
|
||||||
|
proc init*(t: typedesc[ProduceAttestationDataResponse],
|
||||||
|
ad: AttestationDataTuple): ProduceAttestationDataResponse =
|
||||||
|
ProduceAttestationDataResponse(data: AttestationData(
|
||||||
|
slot: Slot(ad.slot), index: ad.index,
|
||||||
|
beacon_block_root: Eth2Digest.init(ad.beacon_block_root),
|
||||||
|
source: Checkpoint(epoch: Epoch(ad.source)),
|
||||||
|
target: Checkpoint(epoch: Epoch(ad.target))
|
||||||
|
))
|
||||||
|
|
||||||
|
proc createRootsSeen(
|
||||||
|
root: tuple[root: string, slot: uint64]): Table[Eth2Digest, Slot] =
|
||||||
|
var res: Table[Eth2Digest, Slot]
|
||||||
|
res[Eth2Digest.init(root.root)] = Slot(root.slot)
|
||||||
|
res
|
||||||
|
|
||||||
suite "Validator Client test suite":
|
suite "Validator Client test suite":
|
||||||
test "normalizeUri() test vectors":
|
test "normalizeUri() test vectors":
|
||||||
for hostname in HostNames:
|
for hostname in HostNames:
|
||||||
for vector in GoodTestVectors:
|
for vector in GoodTestVectors:
|
||||||
let expect = vector[1] % (hostname)
|
let expect = vector[1] % (hostname)
|
||||||
check $normalizeUri(parseUri(vector[0] % (hostname))) == expect
|
check $normalizeUri(parseUri(vector[0] % (hostname))) == expect
|
||||||
|
|
||||||
|
test "getAttestationDataScore() test vectors":
|
||||||
|
for vector in AttestationDataVectors:
|
||||||
|
let
|
||||||
|
adata = ProduceAttestationDataResponse.init(vector[0])
|
||||||
|
roots = createRootsSeen(vector[1])
|
||||||
|
score = shortScore(roots.getAttestationDataScore(adata))
|
||||||
|
check score == vector[2]
|
||||||
|
|
Loading…
Reference in New Issue