mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-13 15:55:20 +00:00
AsyncEventBus missing attestations fix. (#3664)
This commit is contained in:
parent
b0e46686f1
commit
5e7b28cd66
@ -37,6 +37,15 @@ export
|
||||
type
|
||||
RpcServer* = RpcHttpServer
|
||||
|
||||
EventBus* = object
|
||||
blocksQueue*: AsyncEventQueue[ForkedTrustedSignedBeaconBlock]
|
||||
headQueue*: AsyncEventQueue[HeadChangeInfoObject]
|
||||
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
|
||||
attestQueue*: AsyncEventQueue[Attestation]
|
||||
contribQueue*: AsyncEventQueue[SignedContributionAndProof]
|
||||
exitQueue*: AsyncEventQueue[SignedVoluntaryExit]
|
||||
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
|
||||
|
||||
BeaconNode* = ref object
|
||||
nickname*: string
|
||||
graffitiBytes*: GraffitiBytes
|
||||
@ -57,7 +66,7 @@ type
|
||||
restServer*: RestServerRef
|
||||
keymanagerServer*: RestServerRef
|
||||
keymanagerToken*: Option[string]
|
||||
eventBus*: AsyncEventBus
|
||||
eventBus*: EventBus
|
||||
vcProcess*: Process
|
||||
requestManager*: RequestManager
|
||||
syncManager*: SyncManager[Peer, PeerId]
|
||||
|
@ -149,17 +149,17 @@ proc loadChainDag(
|
||||
config: BeaconNodeConf,
|
||||
cfg: RuntimeConfig,
|
||||
db: BeaconChainDB,
|
||||
eventBus: AsyncEventBus,
|
||||
eventBus: EventBus,
|
||||
validatorMonitor: ref ValidatorMonitor,
|
||||
networkGenesisValidatorsRoot: Option[Eth2Digest]): ChainDAGRef =
|
||||
info "Loading block DAG from database", path = config.databaseDir
|
||||
|
||||
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
|
||||
eventBus.emit("signed-beacon-block", data)
|
||||
eventBus.blocksQueue.emit(data)
|
||||
proc onHeadChanged(data: HeadChangeInfoObject) =
|
||||
eventBus.emit("head-change", data)
|
||||
eventBus.headQueue.emit(data)
|
||||
proc onChainReorg(data: ReorgInfoObject) =
|
||||
eventBus.emit("chain-reorg", data)
|
||||
eventBus.reorgQueue.emit(data)
|
||||
proc onLightClientFinalityUpdate(data: altair.LightClientFinalityUpdate) =
|
||||
discard
|
||||
proc onLightClientOptimisticUpdate(data: altair.LightClientOptimisticUpdate) =
|
||||
@ -220,17 +220,17 @@ proc initFullNode(
|
||||
template config(): auto = node.config
|
||||
|
||||
proc onAttestationReceived(data: Attestation) =
|
||||
node.eventBus.emit("attestation-received", data)
|
||||
node.eventBus.attestQueue.emit(data)
|
||||
proc onSyncContribution(data: SignedContributionAndProof) =
|
||||
node.eventBus.emit("sync-contribution-and-proof", data)
|
||||
node.eventBus.contribQueue.emit(data)
|
||||
proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
|
||||
node.eventBus.emit("voluntary-exit", data)
|
||||
node.eventBus.exitQueue.emit(data)
|
||||
proc makeOnFinalizationCb(
|
||||
# This `nimcall` functions helps for keeping track of what
|
||||
# needs to be captured by the onFinalization closure.
|
||||
eventBus: AsyncEventBus,
|
||||
eventBus: EventBus,
|
||||
eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} =
|
||||
static: doAssert (eventBus is ref) and (eth1Monitor is ref)
|
||||
static: doAssert (eth1Monitor is ref)
|
||||
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
|
||||
if eth1Monitor != nil:
|
||||
let finalizedEpochRef = dag.getFinalizedEpochRef()
|
||||
@ -238,7 +238,7 @@ proc initFullNode(
|
||||
finalizedEpochRef.eth1_data,
|
||||
finalizedEpochRef.eth1_deposit_index)
|
||||
node.updateLightClientFromDag()
|
||||
eventBus.emit("finalization", data)
|
||||
eventBus.finalQueue.emit(data)
|
||||
|
||||
func getLocalHeadSlot(): Slot =
|
||||
dag.head.slot
|
||||
@ -370,7 +370,15 @@ proc init*(T: type BeaconNode,
|
||||
raise newException(Defect, "Failure in taskpool initialization.")
|
||||
|
||||
let
|
||||
eventBus = newAsyncEventBus()
|
||||
eventBus = EventBus(
|
||||
blocksQueue: newAsyncEventQueue[ForkedTrustedSignedBeaconBlock](),
|
||||
headQueue: newAsyncEventQueue[HeadChangeInfoObject](),
|
||||
reorgQueue: newAsyncEventQueue[ReorgInfoObject](),
|
||||
attestQueue: newAsyncEventQueue[Attestation](),
|
||||
contribQueue: newAsyncEventQueue[SignedContributionAndProof](),
|
||||
exitQueue: newAsyncEventQueue[SignedVoluntaryExit](),
|
||||
finalQueue: newAsyncEventQueue[FinalizationInfoObject]()
|
||||
)
|
||||
db = BeaconChainDB.new(config.databaseDir, inMemory = false)
|
||||
|
||||
var
|
||||
|
@ -53,46 +53,56 @@ proc validateEventTopics(events: seq[EventTopic]): Result[EventTopics,
|
||||
else:
|
||||
ok(res)
|
||||
|
||||
proc eventHandler*(response: HttpResponseRef, node: BeaconNode,
|
||||
T: typedesc, event: string,
|
||||
serverEvent: string) {.async.} =
|
||||
var fut = node.eventBus.waitEvent(T, event)
|
||||
proc eventHandler*[T](response: HttpResponseRef,
|
||||
eventQueue: AsyncEventQueue[T],
|
||||
serverEvent: string) {.async.} =
|
||||
var empty: seq[T]
|
||||
let key = eventQueue.register()
|
||||
|
||||
while true:
|
||||
let jsonRes =
|
||||
var exitLoop = false
|
||||
|
||||
let events =
|
||||
try:
|
||||
let res = await fut
|
||||
let res = await eventQueue.waitEvents(key)
|
||||
res
|
||||
except CancelledError:
|
||||
empty
|
||||
|
||||
for event in events:
|
||||
let jsonRes =
|
||||
when T is ForkedTrustedSignedBeaconBlock:
|
||||
let blockInfo = RestBlockInfo.init(res)
|
||||
some(RestApiResponse.prepareJsonStringResponse(blockInfo))
|
||||
let blockInfo = RestBlockInfo.init(event)
|
||||
RestApiResponse.prepareJsonStringResponse(blockInfo)
|
||||
else:
|
||||
some(RestApiResponse.prepareJsonStringResponse(res))
|
||||
except CancelledError:
|
||||
none[string]()
|
||||
if jsonRes.isNone() or (response.state != HttpResponseState.Sending):
|
||||
# Cancellation happened or connection with remote peer has been lost.
|
||||
break
|
||||
# Initiating new event waiting to avoid race conditions and event misses.
|
||||
fut = node.eventBus.waitEvent(T, event)
|
||||
# Sending event and payload over wire.
|
||||
let exitLoop =
|
||||
try:
|
||||
await response.sendEvent(serverEvent, jsonRes.get())
|
||||
false
|
||||
except CancelledError:
|
||||
true
|
||||
except HttpError as exc:
|
||||
debug "Unable to deliver event to remote peer", error_name = $exc.name,
|
||||
error_msg = $exc.msg
|
||||
true
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected error encountered", error_name = $exc.name,
|
||||
error_msg = $exc.msg
|
||||
true
|
||||
if exitLoop:
|
||||
if not(fut.finished()):
|
||||
await fut.cancelAndWait()
|
||||
RestApiResponse.prepareJsonStringResponse(event)
|
||||
|
||||
exitLoop =
|
||||
if response.state != HttpResponseState.Sending:
|
||||
true
|
||||
else:
|
||||
try:
|
||||
await response.sendEvent(serverEvent, jsonRes)
|
||||
false
|
||||
except CancelledError:
|
||||
true
|
||||
except HttpError as exc:
|
||||
debug "Unable to deliver event to remote peer",
|
||||
error_name = $exc.name, error_msg = $exc.msg
|
||||
true
|
||||
except CatchableError as exc:
|
||||
debug "Unexpected error encountered, while trying to deliver event",
|
||||
error_name = $exc.name, error_msg = $exc.msg
|
||||
true
|
||||
|
||||
if exitLoop:
|
||||
break
|
||||
|
||||
if exitLoop or len(events) == 0:
|
||||
break
|
||||
|
||||
eventQueue.unregister(key)
|
||||
|
||||
proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
||||
# https://ethereum.github.io/beacon-APIs/#/Events/eventstream
|
||||
router.api(MethodGet, "/eth/v1/events") do (
|
||||
@ -127,36 +137,31 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
||||
block:
|
||||
var res: seq[Future[void]]
|
||||
if EventTopic.Head in eventTopics:
|
||||
let handler = response.eventHandler(node, HeadChangeInfoObject,
|
||||
"head-change", "head")
|
||||
let handler = response.eventHandler(node.eventBus.headQueue,
|
||||
"head")
|
||||
res.add(handler)
|
||||
if EventTopic.Block in eventTopics:
|
||||
let handler = response.eventHandler(node,
|
||||
ForkedTrustedSignedBeaconBlock,
|
||||
"signed-beacon-block", "block")
|
||||
let handler = response.eventHandler(node.eventBus.blocksQueue,
|
||||
"block")
|
||||
res.add(handler)
|
||||
if EventTopic.Attestation in eventTopics:
|
||||
let handler = response.eventHandler(node, Attestation,
|
||||
"attestation-received",
|
||||
let handler = response.eventHandler(node.eventBus.attestQueue,
|
||||
"attestation")
|
||||
res.add(handler)
|
||||
if EventTopic.VoluntaryExit in eventTopics:
|
||||
let handler = response.eventHandler(node, SignedVoluntaryExit,
|
||||
"voluntary-exit",
|
||||
let handler = response.eventHandler(node.eventBus.exitQueue,
|
||||
"voluntary_exit")
|
||||
res.add(handler)
|
||||
if EventTopic.FinalizedCheckpoint in eventTopics:
|
||||
let handler = response.eventHandler(node, FinalizationInfoObject,
|
||||
"finalization",
|
||||
let handler = response.eventHandler(node.eventBus.finalQueue,
|
||||
"finalized_checkpoint")
|
||||
res.add(handler)
|
||||
if EventTopic.ChainReorg in eventTopics:
|
||||
let handler = response.eventHandler(node, ReorgInfoObject,
|
||||
"chain-reorg", "chain_reorg")
|
||||
let handler = response.eventHandler(node.eventBus.reorgQueue,
|
||||
"chain_reorg")
|
||||
res.add(handler)
|
||||
if EventTopic.ContributionAndProof in eventTopics:
|
||||
let handler = response.eventHandler(node, SignedContributionAndProof,
|
||||
"sync-contribution-and-proof",
|
||||
let handler = response.eventHandler(node.eventBus.contribQueue,
|
||||
"contribution_and_proof")
|
||||
res.add(handler)
|
||||
res
|
||||
|
2
vendor/nim-chronos
vendored
2
vendor/nim-chronos
vendored
@ -1 +1 @@
|
||||
Subproject commit b3548583fcc768d93654685e7ea55126c1752c29
|
||||
Subproject commit 61fbbc551208aca182ff810661bdf37b08a377cd
|
Loading…
x
Reference in New Issue
Block a user