nimbus-eth2/beacon_chain/rpc/rest_event_api.nim
Eugene Kabanov 7c9a6b7170
Improve chronos.Future tracking. (#2988)
* Add `child_id` field.

* Fix json-rpc api call and bump chronos.

* Bump chronos master and fix compilation warnings.

* One more bump of `chronos`.

* add random import

* export rest_utils a bit more

Co-authored-by: Jacek Sieka <jacek@status.im>
2021-10-27 14:01:11 +02:00

183 lines
6.8 KiB
Nim

# Copyright (c) 2018-2020 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
stew/results,
chronicles,
./rest_utils,
../beacon_node
export rest_utils
logScope: topics = "rest_eventapi"
proc validateEventTopics(events: seq[EventTopic]): Result[EventTopics,
                                                          cstring] =
  ## Collects the requested `events` into a set of topics.
  ##
  ## Returns an error when a topic is listed more than once or when `events`
  ## holds no topics at all; otherwise returns the set of requested topics.
  const NonUniqueError = cstring("Event topics must be unique")
  var res: set[EventTopic]
  for item in events:
    # Every topic is handled the same way, so a single membership test
    # replaces one hand-written branch per enum value.
    if item in res:
      return err(NonUniqueError)
    res.incl(item)
  if res == {}:
    err("Empty topics list")
  else:
    ok(res)
proc eventHandler*(response: HttpResponseRef, node: BeaconNode,
                   T: typedesc, event: string,
                   serverEvent: string) {.async.} =
  ## Repeatedly waits for an event of type `T` named `event` on the node's
  ## event bus and sends its JSON representation through `response` as an
  ## event named `serverEvent`.
  ##
  ## The loop ends when waiting is cancelled, when `response` is no longer in
  ## the `Sending` state, or when sending an event fails.
  var pendingEvent = node.eventBus.waitEvent(T, event)
  while true:
    let payload =
      try:
        let value = await pendingEvent
        when T is ForkedTrustedSignedBeaconBlock:
          # Blocks are converted to `RestBlockInfo` before being serialized.
          some(RestApiResponse.prepareJsonStringResponse(
            RestBlockInfo.init(value)))
        else:
          some(RestApiResponse.prepareJsonStringResponse(value))
      except CancelledError:
        none[string]()

    # Waiting was cancelled.
    if payload.isNone():
      break
    # Connection with the remote peer has been lost.
    if response.state != HttpResponseState.Sending:
      break

    # Start waiting for the next event before sending the current one, to
    # avoid race conditions and missed events.
    pendingEvent = node.eventBus.waitEvent(T, event)

    # Send the event and its payload over the wire.
    var stop = false
    try:
      await response.sendEvent(serverEvent, payload.get())
    except CancelledError:
      stop = true
    except HttpError as exc:
      debug "Unable to deliver event to remote peer", error_name = $exc.name,
            error_msg = $exc.msg
      stop = true
    except CatchableError as exc:
      debug "Unexpected error encountered", error_name = $exc.name,
            error_msg = $exc.msg
      stop = true

    if stop:
      # Do not leave the freshly started wait pending behind us.
      if not(pendingEvent.finished()):
        await pendingEvent.cancelAndWait()
      break
proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
  ## Registers the event stream endpoint `/api/eth/v1/events` on `router`,
  ## together with a redirect from `/eth/v1/events` to it.
  # https://ethereum.github.io/beacon-APIs/#/Events/eventstream
  router.api(MethodGet, "/api/eth/v1/events") do (
    topics: seq[EventTopic]) -> RestApiResponse:
    # Reject requests whose topic list failed to decode or to validate.
    if topics.isErr():
      return RestApiResponse.jsonError(Http400, "Invalid topics value",
                                       $topics.error())
    let validated = validateEventTopics(topics.get())
    if validated.isErr():
      return RestApiResponse.jsonError(Http400, "Invalid topics value",
                                       $validated.error())
    let eventTopics = validated.get()

    # Only `text/event-stream` can be served by this endpoint.
    let accepted = preferredContentType("text/event-stream")
    if accepted.isErr():
      return RestApiResponse.jsonError(Http406, ContentNotAcceptableError)
    if accepted.get() != "text/event-stream":
      return RestApiResponse.jsonError(Http500, InvalidAcceptError)

    var stream = request.getResponse()
    stream.keepAlive = false
    try:
      await stream.prepareSSE()
    except HttpError:
      # The server failed to send the HTTP response to the remote client, so
      # there is no point in answering with an HTTP error response.
      return

    # Start one handler per requested topic.
    var handlers: seq[Future[void]]
    if EventTopic.Head in eventTopics:
      handlers.add(stream.eventHandler(node, HeadChangeInfoObject,
                                       "head-change", "head"))
    if EventTopic.Block in eventTopics:
      handlers.add(stream.eventHandler(node, ForkedTrustedSignedBeaconBlock,
                                       "signed-beacon-block", "block"))
    if EventTopic.Attestation in eventTopics:
      handlers.add(stream.eventHandler(node, Attestation,
                                       "attestation-received",
                                       "attestation"))
    if EventTopic.VoluntaryExit in eventTopics:
      handlers.add(stream.eventHandler(node, SignedVoluntaryExit,
                                       "voluntary-exit", "voluntary_exit"))
    if EventTopic.FinalizedCheckpoint in eventTopics:
      handlers.add(stream.eventHandler(node, FinalizationInfoObject,
                                       "finalization",
                                       "finalized_checkpoint"))
    if EventTopic.ChainReorg in eventTopics:
      handlers.add(stream.eventHandler(node, ReorgInfoObject,
                                       "chain-reorg", "chain_reorg"))
    if EventTopic.ContributionAndProof in eventTopics:
      handlers.add(stream.eventHandler(node, SignedContributionAndProof,
                                       "sync-contribution-and-proof",
                                       "contribution_and_proof"))

    discard await one(handlers)

    # One of the handlers has finished, which means the connection has been
    # dropped; cancel every handler that is still running and wait for them.
    var pending: seq[Future[void]]
    for handler in handlers:
      if not(handler.finished()):
        handler.cancel()
        pending.add(handler)
    await allFutures(pending)
    return

  router.redirect(
    MethodGet,
    "/eth/v1/events",
    "/api/eth/v1/events"
  )