mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-19 17:58:23 +00:00
Fix polling block monitor from deadlocks when connection has been lost. (#5109)
Fix events block monitor from losing connection forever. Add node to block received log statement.
This commit is contained in:
parent
c2c5d80a4f
commit
c534a285b9
@ -537,7 +537,7 @@ proc pollForEvents(service: BlockServiceRef, node: BeaconNodeServerRef,
|
|||||||
let blck = EventBeaconBlockObject.decodeString(event.data).valueOr:
|
let blck = EventBeaconBlockObject.decodeString(event.data).valueOr:
|
||||||
debug "Got invalid block event format", reason = error
|
debug "Got invalid block event format", reason = error
|
||||||
return
|
return
|
||||||
vc.registerBlock(blck)
|
vc.registerBlock(blck, node)
|
||||||
of "event":
|
of "event":
|
||||||
if event.data != "block":
|
if event.data != "block":
|
||||||
debug "Got unexpected event name field", event_name = event.name,
|
debug "Got unexpected event name field", event_name = event.name,
|
||||||
@ -560,7 +560,7 @@ proc runBlockEventMonitor(service: BlockServiceRef,
|
|||||||
|
|
||||||
while true:
|
while true:
|
||||||
while node.status notin statuses:
|
while node.status notin statuses:
|
||||||
await vc.waitNodes(nil, statuses, roles, false)
|
await vc.waitNodes(nil, statuses, roles, true)
|
||||||
|
|
||||||
let response =
|
let response =
|
||||||
block:
|
block:
|
||||||
@ -568,7 +568,7 @@ proc runBlockEventMonitor(service: BlockServiceRef,
|
|||||||
try:
|
try:
|
||||||
resp = await node.client.subscribeEventStream({EventTopic.Block})
|
resp = await node.client.subscribeEventStream({EventTopic.Block})
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
resp
|
Opt.some(resp)
|
||||||
else:
|
else:
|
||||||
let body = await resp.getBodyBytes()
|
let body = await resp.getBodyBytes()
|
||||||
await resp.closeWait()
|
await resp.closeWait()
|
||||||
@ -578,11 +578,11 @@ proc runBlockEventMonitor(service: BlockServiceRef,
|
|||||||
reason = plain.getErrorMessage()
|
reason = plain.getErrorMessage()
|
||||||
debug "Unable to to obtain events stream", code = resp.status,
|
debug "Unable to to obtain events stream", code = resp.status,
|
||||||
reason = reason
|
reason = reason
|
||||||
return
|
Opt.none(HttpClientResponseRef)
|
||||||
except RestError as exc:
|
except RestError as exc:
|
||||||
if not(isNil(resp)): await resp.closeWait()
|
if not(isNil(resp)): await resp.closeWait()
|
||||||
debug "Unable to obtain events stream", reason = $exc.msg
|
debug "Unable to obtain events stream", reason = $exc.msg
|
||||||
return
|
Opt.none(HttpClientResponseRef)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
if not(isNil(resp)): await resp.closeWait()
|
if not(isNil(resp)): await resp.closeWait()
|
||||||
debug "Block monitoring loop has been interrupted"
|
debug "Block monitoring loop has been interrupted"
|
||||||
@ -591,17 +591,20 @@ proc runBlockEventMonitor(service: BlockServiceRef,
|
|||||||
if not(isNil(resp)): await resp.closeWait()
|
if not(isNil(resp)): await resp.closeWait()
|
||||||
warn "Got an unexpected error while trying to establish event stream",
|
warn "Got an unexpected error while trying to establish event stream",
|
||||||
reason = $exc.msg
|
reason = $exc.msg
|
||||||
return
|
Opt.none(HttpClientResponseRef)
|
||||||
|
|
||||||
try:
|
if response.isSome():
|
||||||
await service.pollForEvents(node, response)
|
debug "Block monitoring connection has been established"
|
||||||
except CancelledError as exc:
|
try:
|
||||||
raise exc
|
await service.pollForEvents(node, response.get())
|
||||||
except CatchableError as exc:
|
except CancelledError as exc:
|
||||||
warn "Got an unexpected error while receiving block events",
|
raise exc
|
||||||
reason = $exc.msg
|
except CatchableError as exc:
|
||||||
finally:
|
warn "Got an unexpected error while receiving block events",
|
||||||
await response.closeWait()
|
reason = $exc.msg
|
||||||
|
finally:
|
||||||
|
debug "Block monitoring connection has been lost"
|
||||||
|
await response.get().closeWait()
|
||||||
|
|
||||||
proc pollForBlockHeaders(service: BlockServiceRef, node: BeaconNodeServerRef,
|
proc pollForBlockHeaders(service: BlockServiceRef, node: BeaconNodeServerRef,
|
||||||
slot: Slot, waitTime: Duration,
|
slot: Slot, waitTime: Duration,
|
||||||
@ -646,7 +649,7 @@ proc pollForBlockHeaders(service: BlockServiceRef, node: BeaconNodeServerRef,
|
|||||||
block_root: blockHeader.data.root,
|
block_root: blockHeader.data.root,
|
||||||
optimistic: blockHeader.execution_optimistic
|
optimistic: blockHeader.execution_optimistic
|
||||||
)
|
)
|
||||||
vc.registerBlock(eventBlock)
|
vc.registerBlock(eventBlock, node)
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc runBlockPollMonitor(service: BlockServiceRef,
|
proc runBlockPollMonitor(service: BlockServiceRef,
|
||||||
@ -667,7 +670,7 @@ proc runBlockPollMonitor(service: BlockServiceRef,
|
|||||||
res.geT()
|
res.geT()
|
||||||
|
|
||||||
while node.status notin statuses:
|
while node.status notin statuses:
|
||||||
await vc.waitNodes(nil, statuses, roles, false)
|
await vc.waitNodes(nil, statuses, roles, true)
|
||||||
|
|
||||||
let
|
let
|
||||||
currentTime = vc.beaconClock.now()
|
currentTime = vc.beaconClock.now()
|
||||||
|
@ -1174,14 +1174,15 @@ proc expectBlock*(vc: ValidatorClientRef, slot: Slot,
|
|||||||
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
|
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
|
||||||
retFuture
|
retFuture
|
||||||
|
|
||||||
proc registerBlock*(vc: ValidatorClientRef, data: EventBeaconBlockObject) =
|
proc registerBlock*(vc: ValidatorClientRef, data: EventBeaconBlockObject,
|
||||||
|
node: BeaconNodeServerRef) =
|
||||||
let
|
let
|
||||||
wallTime = vc.beaconClock.now()
|
wallTime = vc.beaconClock.now()
|
||||||
delay = wallTime - data.slot.start_beacon_time()
|
delay = wallTime - data.slot.start_beacon_time()
|
||||||
|
|
||||||
debug "Block received", slot = data.slot,
|
debug "Block received", slot = data.slot,
|
||||||
block_root = shortLog(data.block_root), optimistic = data.optimistic,
|
block_root = shortLog(data.block_root), optimistic = data.optimistic,
|
||||||
delay = delay
|
node = node, delay = delay
|
||||||
|
|
||||||
proc scheduleCallbacks(data: var BlockDataItem,
|
proc scheduleCallbacks(data: var BlockDataItem,
|
||||||
blck: EventBeaconBlockObject) =
|
blck: EventBeaconBlockObject) =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user