2022-06-19 07:57:52 +02:00
|
|
|
# beacon_chain
|
2023-01-20 14:14:37 +00:00
|
|
|
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
2021-03-24 00:50:18 +02:00
|
|
|
# Licensed and distributed under either of
|
|
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
|
2023-01-20 14:14:37 +00:00
|
|
|
{.push raises: [].}
|
2022-06-19 07:57:52 +02:00
|
|
|
|
2021-03-24 00:50:18 +02:00
|
|
|
import
|
2023-09-22 14:06:27 +03:00
|
|
|
std/sequtils,
|
2021-03-24 00:50:18 +02:00
|
|
|
stew/results,
|
|
|
|
chronicles,
|
2021-08-03 17:17:11 +02:00
|
|
|
./rest_utils,
|
2021-10-19 16:09:26 +02:00
|
|
|
../beacon_node
|
2021-03-24 00:50:18 +02:00
|
|
|
|
2021-10-27 15:01:11 +03:00
|
|
|
export rest_utils
|
|
|
|
|
2021-03-24 00:50:18 +02:00
|
|
|
logScope: topics = "rest_eventapi"
|
|
|
|
|
2022-06-19 07:57:52 +02:00
|
|
|
proc validateEventTopics(events: seq[EventTopic],
|
|
|
|
withLightClient: bool): Result[EventTopics, cstring] =
|
2021-03-24 00:50:18 +02:00
|
|
|
const NonUniqueError = cstring("Event topics must be unique")
|
2022-06-19 07:57:52 +02:00
|
|
|
const UnsupportedError = cstring("Unsupported event topic value")
|
2021-03-24 00:50:18 +02:00
|
|
|
var res: set[EventTopic]
|
|
|
|
for item in events:
|
2022-06-19 07:57:52 +02:00
|
|
|
if item in res:
|
|
|
|
return err(NonUniqueError)
|
|
|
|
if not withLightClient and item in [
|
|
|
|
EventTopic.LightClientFinalityUpdate,
|
|
|
|
EventTopic.LightClientOptimisticUpdate]:
|
|
|
|
return err(UnsupportedError)
|
|
|
|
res.incl(item)
|
|
|
|
|
2021-03-24 00:50:18 +02:00
|
|
|
if res == {}:
|
|
|
|
err("Empty topics list")
|
|
|
|
else:
|
|
|
|
ok(res)
|
|
|
|
|
2022-06-17 18:27:28 +03:00
|
|
|
proc eventHandler*[T](response: HttpResponseRef,
|
|
|
|
eventQueue: AsyncEventQueue[T],
|
|
|
|
serverEvent: string) {.async.} =
|
|
|
|
var empty: seq[T]
|
|
|
|
let key = eventQueue.register()
|
|
|
|
|
2021-09-22 15:17:15 +03:00
|
|
|
while true:
|
2022-06-17 18:27:28 +03:00
|
|
|
var exitLoop = false
|
|
|
|
|
|
|
|
let events =
|
2021-09-22 15:17:15 +03:00
|
|
|
try:
|
2022-06-17 18:27:28 +03:00
|
|
|
let res = await eventQueue.waitEvents(key)
|
|
|
|
res
|
|
|
|
except CancelledError:
|
|
|
|
empty
|
|
|
|
|
|
|
|
for event in events:
|
2022-06-20 08:53:39 +03:00
|
|
|
let jsonRes = RestApiResponse.prepareJsonStringResponse(event)
|
2022-06-17 18:27:28 +03:00
|
|
|
|
|
|
|
exitLoop =
|
|
|
|
if response.state != HttpResponseState.Sending:
|
|
|
|
true
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
await response.sendEvent(serverEvent, jsonRes)
|
|
|
|
false
|
|
|
|
except CancelledError:
|
|
|
|
true
|
|
|
|
except HttpError as exc:
|
|
|
|
debug "Unable to deliver event to remote peer",
|
|
|
|
error_name = $exc.name, error_msg = $exc.msg
|
|
|
|
true
|
|
|
|
except CatchableError as exc:
|
|
|
|
debug "Unexpected error encountered, while trying to deliver event",
|
|
|
|
error_name = $exc.name, error_msg = $exc.msg
|
|
|
|
true
|
|
|
|
|
|
|
|
if exitLoop:
|
|
|
|
break
|
|
|
|
|
|
|
|
if exitLoop or len(events) == 0:
|
2021-09-22 15:17:15 +03:00
|
|
|
break
|
|
|
|
|
2022-06-17 18:27:28 +03:00
|
|
|
eventQueue.unregister(key)
|
|
|
|
|
2021-03-24 00:50:18 +02:00
|
|
|
proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
2021-08-23 13:41:48 +03:00
|
|
|
# https://ethereum.github.io/beacon-APIs/#/Events/eventstream
|
2022-01-06 08:38:40 +01:00
|
|
|
router.api(MethodGet, "/eth/v1/events") do (
|
2021-03-24 00:50:18 +02:00
|
|
|
topics: seq[EventTopic]) -> RestApiResponse:
|
|
|
|
let eventTopics =
|
|
|
|
block:
|
|
|
|
if topics.isErr():
|
|
|
|
return RestApiResponse.jsonError(Http400, "Invalid topics value",
|
|
|
|
$topics.error())
|
2022-06-19 07:57:52 +02:00
|
|
|
let res = validateEventTopics(topics.get(),
|
2022-06-24 16:57:50 +02:00
|
|
|
node.dag.lcDataStore.serve)
|
2021-03-24 00:50:18 +02:00
|
|
|
if res.isErr():
|
|
|
|
return RestApiResponse.jsonError(Http400, "Invalid topics value",
|
|
|
|
$res.error())
|
|
|
|
res.get()
|
|
|
|
|
2022-01-21 18:52:34 +02:00
|
|
|
let res = preferredContentType(textEventStreamMediaType)
|
2022-08-19 13:30:07 +03:00
|
|
|
|
2021-09-22 15:17:15 +03:00
|
|
|
if res.isErr():
|
|
|
|
return RestApiResponse.jsonError(Http406, ContentNotAcceptableError)
|
2022-01-21 18:52:34 +02:00
|
|
|
if res.get() != textEventStreamMediaType:
|
2021-09-22 15:17:15 +03:00
|
|
|
return RestApiResponse.jsonError(Http500, InvalidAcceptError)
|
|
|
|
|
|
|
|
var response = request.getResponse()
|
|
|
|
response.keepAlive = false
|
|
|
|
try:
|
|
|
|
await response.prepareSSE()
|
|
|
|
except HttpError:
|
|
|
|
# It means that server failed to send HTTP response to the remote client
|
|
|
|
# so there no need to respond with HTTP error response.
|
|
|
|
return
|
|
|
|
|
|
|
|
let handlers =
|
|
|
|
block:
|
|
|
|
var res: seq[Future[void]]
|
|
|
|
if EventTopic.Head in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.headQueue,
|
|
|
|
"head")
|
2021-09-22 15:17:15 +03:00
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.Block in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.blocksQueue,
|
|
|
|
"block")
|
2021-09-22 15:17:15 +03:00
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.Attestation in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.attestQueue,
|
2021-09-22 15:17:15 +03:00
|
|
|
"attestation")
|
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.VoluntaryExit in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.exitQueue,
|
2021-09-22 15:17:15 +03:00
|
|
|
"voluntary_exit")
|
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.FinalizedCheckpoint in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.finalQueue,
|
2021-09-22 15:17:15 +03:00
|
|
|
"finalized_checkpoint")
|
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.ChainReorg in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.reorgQueue,
|
|
|
|
"chain_reorg")
|
2021-09-22 15:17:15 +03:00
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.ContributionAndProof in eventTopics:
|
2022-06-17 18:27:28 +03:00
|
|
|
let handler = response.eventHandler(node.eventBus.contribQueue,
|
2021-09-22 15:17:15 +03:00
|
|
|
"contribution_and_proof")
|
|
|
|
res.add(handler)
|
2022-06-19 07:57:52 +02:00
|
|
|
if EventTopic.LightClientFinalityUpdate in eventTopics:
|
2022-06-24 16:57:50 +02:00
|
|
|
doAssert node.dag.lcDataStore.serve
|
2022-06-19 07:57:52 +02:00
|
|
|
let handler = response.eventHandler(node.eventBus.finUpdateQueue,
|
2022-10-12 19:16:49 -05:00
|
|
|
"light_client_finality_update")
|
2022-06-19 07:57:52 +02:00
|
|
|
res.add(handler)
|
|
|
|
if EventTopic.LightClientOptimisticUpdate in eventTopics:
|
2022-06-24 16:57:50 +02:00
|
|
|
doAssert node.dag.lcDataStore.serve
|
2022-06-19 07:57:52 +02:00
|
|
|
let handler = response.eventHandler(node.eventBus.optUpdateQueue,
|
2022-10-12 19:16:49 -05:00
|
|
|
"light_client_optimistic_update")
|
2022-06-19 07:57:52 +02:00
|
|
|
res.add(handler)
|
2021-09-22 15:17:15 +03:00
|
|
|
res
|
|
|
|
|
|
|
|
discard await one(handlers)
|
|
|
|
# One of the handlers finished, it means that connection has been droped, so
|
|
|
|
# we cancelling all other handlers.
|
|
|
|
let pending =
|
2023-09-22 14:06:27 +03:00
|
|
|
handlers.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
|
|
await noCancel allFutures(pending)
|
2021-09-22 15:17:15 +03:00
|
|
|
return
|