nimbus-eth2/beacon_chain/rpc/rest_event_api.nim
Agnish Ghosh c0108c2f2a
introduced custody groups, and renamed csc to cgc (#6789)
* introduced custody groups, and renamed csc to cgc

* accomodate tests

* revert to naming columns

* applied review changes

* updated all tests file

* addressed review 2

* merged in typo fixes by airdop farmers/other spam PRs

* handle lint ci

* shift to iterators, avoid redundant copying
2024-12-28 22:07:12 +01:00

186 lines
7.0 KiB
Nim

# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/sequtils,
chronicles,
chronos/apps/http/httpserver,
./rest_utils,
../beacon_node
export rest_utils
logScope: topics = "rest_eventapi"
proc validateEventTopics(events: seq[EventTopic],
withLightClient: bool): Result[EventTopics, cstring] =
const NonUniqueError = cstring("Event topics must be unique")
const UnsupportedError = cstring("Unsupported event topic value")
var res: set[EventTopic]
for item in events:
if item in res:
return err(NonUniqueError)
if not withLightClient and item in [
EventTopic.LightClientFinalityUpdate,
EventTopic.LightClientOptimisticUpdate]:
return err(UnsupportedError)
res.incl(item)
if res == {}:
err("Empty topics list")
else:
ok(res)
proc eventHandler*[T](response: HttpResponseRef,
eventQueue: AsyncEventQueue[T],
serverEvent: string) {.async.} =
var empty: seq[T]
let key = eventQueue.register()
while true:
var exitLoop = false
let events =
try:
let res = await eventQueue.waitEvents(key)
res
except CancelledError:
empty
for event in events:
let jsonRes = RestApiResponse.prepareJsonStringResponse(event)
exitLoop =
if response.state != HttpResponseState.Sending:
true
else:
try:
await response.sendEvent(serverEvent, jsonRes)
false
except CancelledError:
true
except HttpError as exc:
debug "Unable to deliver event to remote peer",
error_name = $exc.name, error_msg = $exc.msg
true
except CatchableError as exc:
debug "Unexpected error encountered, while trying to deliver event",
error_name = $exc.name, error_msg = $exc.msg
true
if exitLoop:
break
if exitLoop or len(events) == 0:
break
eventQueue.unregister(key)
proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
# https://ethereum.github.io/beacon-APIs/#/Events/eventstream
router.api2(MethodGet, "/eth/v1/events") do (
topics: seq[EventTopic]) -> RestApiResponse:
let eventTopics =
block:
if topics.isErr():
return RestApiResponse.jsonError(Http400, "Invalid topics value",
$topics.error())
let res = validateEventTopics(topics.get(),
node.dag.lcDataStore.serve)
if res.isErr():
return RestApiResponse.jsonError(Http400, "Invalid topics value",
$res.error())
res.get()
let res = preferredContentType(textEventStreamMediaType)
if res.isErr():
return RestApiResponse.jsonError(Http406, ContentNotAcceptableError)
if res.get() != textEventStreamMediaType:
return RestApiResponse.jsonError(Http500, InvalidAcceptError)
var response = request.getResponse()
response.keepAlive = false
try:
await response.prepareSSE()
except HttpError:
# It means that server failed to send HTTP response to the remote client
# so there no need to respond with HTTP error response.
return
let handlers =
block:
var res: seq[Future[void]]
if EventTopic.Head in eventTopics:
let handler = response.eventHandler(node.eventBus.headQueue,
"head")
res.add(handler)
if EventTopic.Block in eventTopics:
let handler = response.eventHandler(node.eventBus.blocksQueue,
"block")
res.add(handler)
if EventTopic.Attestation in eventTopics:
let handler = response.eventHandler(node.eventBus.attestQueue,
"attestation")
res.add(handler)
if EventTopic.VoluntaryExit in eventTopics:
let handler = response.eventHandler(node.eventBus.exitQueue,
"voluntary_exit")
res.add(handler)
if EventTopic.BLSToExecutionChange in eventTopics:
let handler = response.eventHandler(node.eventBus.blsToExecQueue,
"bls_to_execution_change")
res.add(handler)
if EventTopic.ProposerSlashing in eventTopics:
let handler = response.eventHandler(node.eventBus.propSlashQueue,
"proposer_slashing")
res.add(handler)
if EventTopic.AttesterSlashing in eventTopics:
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
res.add(handler)
if EventTopic.FinalizedCheckpoint in eventTopics:
let handler = response.eventHandler(node.eventBus.finalQueue,
"finalized_checkpoint")
res.add(handler)
if EventTopic.ChainReorg in eventTopics:
let handler = response.eventHandler(node.eventBus.reorgQueue,
"chain_reorg")
res.add(handler)
if EventTopic.ContributionAndProof in eventTopics:
let handler = response.eventHandler(node.eventBus.contribQueue,
"contribution_and_proof")
res.add(handler)
if EventTopic.LightClientFinalityUpdate in eventTopics:
doAssert node.dag.lcDataStore.serve
let handler = response.eventHandler(node.eventBus.finUpdateQueue,
"light_client_finality_update")
res.add(handler)
if EventTopic.LightClientOptimisticUpdate in eventTopics:
doAssert node.dag.lcDataStore.serve
let handler = response.eventHandler(node.eventBus.optUpdateQueue,
"light_client_optimistic_update")
res.add(handler)
res
try:
discard await race(handlers)
except ValueError:
raiseAssert "There should be more than one event handler at this point!"
# One of the handlers finished, it means that connection has been dropped, so
# we cancelling all other handlers.
let pending =
handlers.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
return