diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 50d007a52..a505616e4 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -37,6 +37,15 @@ export type RpcServer* = RpcHttpServer + EventBus* = object + blocksQueue*: AsyncEventQueue[ForkedTrustedSignedBeaconBlock] + headQueue*: AsyncEventQueue[HeadChangeInfoObject] + reorgQueue*: AsyncEventQueue[ReorgInfoObject] + attestQueue*: AsyncEventQueue[Attestation] + contribQueue*: AsyncEventQueue[SignedContributionAndProof] + exitQueue*: AsyncEventQueue[SignedVoluntaryExit] + finalQueue*: AsyncEventQueue[FinalizationInfoObject] + BeaconNode* = ref object nickname*: string graffitiBytes*: GraffitiBytes @@ -57,7 +66,7 @@ type restServer*: RestServerRef keymanagerServer*: RestServerRef keymanagerToken*: Option[string] - eventBus*: AsyncEventBus + eventBus*: EventBus vcProcess*: Process requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index ada70428b..583df70f0 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -149,17 +149,17 @@ proc loadChainDag( config: BeaconNodeConf, cfg: RuntimeConfig, db: BeaconChainDB, - eventBus: AsyncEventBus, + eventBus: EventBus, validatorMonitor: ref ValidatorMonitor, networkGenesisValidatorsRoot: Option[Eth2Digest]): ChainDAGRef = info "Loading block DAG from database", path = config.databaseDir proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) = - eventBus.emit("signed-beacon-block", data) + eventBus.blocksQueue.emit(data) proc onHeadChanged(data: HeadChangeInfoObject) = - eventBus.emit("head-change", data) + eventBus.headQueue.emit(data) proc onChainReorg(data: ReorgInfoObject) = - eventBus.emit("chain-reorg", data) + eventBus.reorgQueue.emit(data) proc onLightClientFinalityUpdate(data: altair.LightClientFinalityUpdate) = discard proc onLightClientOptimisticUpdate(data: altair.LightClientOptimisticUpdate) = @@ -220,17 +220,17 @@ proc initFullNode( template config(): auto = node.config proc onAttestationReceived(data: Attestation) = - node.eventBus.emit("attestation-received", data) + node.eventBus.attestQueue.emit(data) proc onSyncContribution(data: SignedContributionAndProof) = - node.eventBus.emit("sync-contribution-and-proof", data) + node.eventBus.contribQueue.emit(data) proc onVoluntaryExitAdded(data: SignedVoluntaryExit) = - node.eventBus.emit("voluntary-exit", data) + node.eventBus.exitQueue.emit(data) proc makeOnFinalizationCb( # This `nimcall` functions helps for keeping track of what # needs to be captured by the onFinalization closure. - eventBus: AsyncEventBus, + eventBus: EventBus, eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} = - static: doAssert (eventBus is ref) and (eth1Monitor is ref) + static: doAssert (eth1Monitor is ref) return proc(dag: ChainDAGRef, data: FinalizationInfoObject) = if eth1Monitor != nil: let finalizedEpochRef = dag.getFinalizedEpochRef() @@ -238,7 +238,7 @@ proc initFullNode( finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index) node.updateLightClientFromDag() - eventBus.emit("finalization", data) + eventBus.finalQueue.emit(data) func getLocalHeadSlot(): Slot = dag.head.slot @@ -370,7 +370,15 @@ proc init*(T: type BeaconNode, raise newException(Defect, "Failure in taskpool initialization.") let - eventBus = newAsyncEventBus() + eventBus = EventBus( + blocksQueue: newAsyncEventQueue[ForkedTrustedSignedBeaconBlock](), + headQueue: newAsyncEventQueue[HeadChangeInfoObject](), + reorgQueue: newAsyncEventQueue[ReorgInfoObject](), + attestQueue: newAsyncEventQueue[Attestation](), + contribQueue: newAsyncEventQueue[SignedContributionAndProof](), + exitQueue: newAsyncEventQueue[SignedVoluntaryExit](), + finalQueue: newAsyncEventQueue[FinalizationInfoObject]() + ) db = BeaconChainDB.new(config.databaseDir, inMemory = false) var diff --git a/beacon_chain/rpc/rest_event_api.nim b/beacon_chain/rpc/rest_event_api.nim index 9086156c6..e44991472 100644 --- a/beacon_chain/rpc/rest_event_api.nim +++ b/beacon_chain/rpc/rest_event_api.nim @@ -53,46 +53,56 @@ proc validateEventTopics(events: seq[EventTopic]): Result[EventTopics, else: ok(res) -proc eventHandler*(response: HttpResponseRef, node: BeaconNode, - T: typedesc, event: string, - serverEvent: string) {.async.} = - var fut = node.eventBus.waitEvent(T, event) +proc eventHandler*[T](response: HttpResponseRef, + eventQueue: AsyncEventQueue[T], + serverEvent: string) {.async.} = + var empty: seq[T] + let key = eventQueue.register() + while true: - let jsonRes = + var exitLoop = false + + let events = try: - let res = await fut + let res = await eventQueue.waitEvents(key) + res + except CancelledError: + empty + + for event in events: + let jsonRes = when T is ForkedTrustedSignedBeaconBlock: - let blockInfo = RestBlockInfo.init(res) - some(RestApiResponse.prepareJsonStringResponse(blockInfo)) + let blockInfo = RestBlockInfo.init(event) + RestApiResponse.prepareJsonStringResponse(blockInfo) else: - some(RestApiResponse.prepareJsonStringResponse(res)) - except CancelledError: - none[string]() - if jsonRes.isNone() or (response.state != HttpResponseState.Sending): - # Cancellation happened or connection with remote peer has been lost. - break - # Initiating new event waiting to avoid race conditions and event misses. - fut = node.eventBus.waitEvent(T, event) - # Sending event and payload over wire. - let exitLoop = - try: - await response.sendEvent(serverEvent, jsonRes.get()) - false - except CancelledError: - true - except HttpError as exc: - debug "Unable to deliver event to remote peer", error_name = $exc.name, - error_msg = $exc.msg - true - except CatchableError as exc: - debug "Unexpected error encountered", error_name = $exc.name, - error_msg = $exc.msg - true - if exitLoop: - if not(fut.finished()): - await fut.cancelAndWait() + RestApiResponse.prepareJsonStringResponse(event) + + exitLoop = + if response.state != HttpResponseState.Sending: + true + else: + try: + await response.sendEvent(serverEvent, jsonRes) + false + except CancelledError: + true + except HttpError as exc: + debug "Unable to deliver event to remote peer", + error_name = $exc.name, error_msg = $exc.msg + true + except CatchableError as exc: + debug "Unexpected error encountered, while trying to deliver event", + error_name = $exc.name, error_msg = $exc.msg + true + + if exitLoop: + break + + if exitLoop or len(events) == 0: break + eventQueue.unregister(key) + proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) = # https://ethereum.github.io/beacon-APIs/#/Events/eventstream router.api(MethodGet, "/eth/v1/events") do ( @@ -127,36 +137,31 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) = block: var res: seq[Future[void]] if EventTopic.Head in eventTopics: - let handler = response.eventHandler(node, HeadChangeInfoObject, - "head-change", "head") + let handler = response.eventHandler(node.eventBus.headQueue, + "head") res.add(handler) if EventTopic.Block in eventTopics: - let handler = response.eventHandler(node, - ForkedTrustedSignedBeaconBlock, - "signed-beacon-block", "block") + let handler = response.eventHandler(node.eventBus.blocksQueue, + "block") res.add(handler) if EventTopic.Attestation in eventTopics: - let handler = response.eventHandler(node, Attestation, - "attestation-received", + let handler = response.eventHandler(node.eventBus.attestQueue, "attestation") res.add(handler) if EventTopic.VoluntaryExit in eventTopics: - let handler = response.eventHandler(node, SignedVoluntaryExit, - "voluntary-exit", + let handler = response.eventHandler(node.eventBus.exitQueue, "voluntary_exit") res.add(handler) if EventTopic.FinalizedCheckpoint in eventTopics: - let handler = response.eventHandler(node, FinalizationInfoObject, - "finalization", + let handler = response.eventHandler(node.eventBus.finalQueue, "finalized_checkpoint") res.add(handler) if EventTopic.ChainReorg in eventTopics: - let handler = response.eventHandler(node, ReorgInfoObject, - "chain-reorg", "chain_reorg") + let handler = response.eventHandler(node.eventBus.reorgQueue, + "chain_reorg") res.add(handler) if EventTopic.ContributionAndProof in eventTopics: - let handler = response.eventHandler(node, SignedContributionAndProof, - "sync-contribution-and-proof", + let handler = response.eventHandler(node.eventBus.contribQueue, "contribution_and_proof") res.add(handler) res diff --git a/vendor/nim-chronos b/vendor/nim-chronos index b3548583f..61fbbc551 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit b3548583fcc768d93654685e7ea55126c1752c29 +Subproject commit 61fbbc551208aca182ff810661bdf37b08a377cd