mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 21:43:11 +00:00
* cleanup imports and logs * add BlockHandle type * revert deps * refactor: async error handling and future tracking improvements - Update async procedures to use explicit raises annotation - Modify TrackedFutures to handle futures with no raised exceptions - Replace `asyncSpawn` with explicit future tracking - Update test suites to use `unittest2` - Standardize error handling across network and async components - Remove deprecated error handling patterns This commit introduces a more robust approach to async error handling and future management, improving type safety and reducing potential runtime errors. * bump nim-serde * remove asyncSpawn * rework background downloads and prefetch * improve logging * refactor: enhance async procedures with error handling and raise annotations * misc cleanup * misc * refactor: implement allFinishedFailed to aggregate future results with success and failure tracking * refactor: update error handling in reader procedures to raise ChunkerError and CancelledError * refactor: improve error handling in wantListHandler and accountHandler procedures * refactor: simplify LPStreamReadError creation by consolidating parameters * refactor: enhance error handling in AsyncStreamWrapper to catch unexpected errors * refactor: enhance error handling in advertiser and discovery loops to improve resilience * misc * refactor: improve code structure and readability * remove cancellation from addSlotToQueue * refactor: add assertion for unexpected errors in local store checks * refactor: prevent tracking of finished futures and improve test assertions * refactor: improve error handling in local store checks * remove usage of msgDetail * feat: add initial implementation of discovery engine and related components * refactor: improve task scheduling logic by removing unnecessary break statement * break after scheduling a task * make taskHandler cancelable * refactor: update async handlers to raise CancelledError * refactor(advertiser): streamline error 
handling and improve task flow in advertise loops * fix: correct spelling of "divisible" in error messages and comments * refactor(discovery): simplify discovery task loop and improve error handling * refactor(engine): filter peers before processing in cancelBlocks procedure
79 lines
2.3 KiB
Nim
79 lines
2.3 KiB
Nim
import std/times
|
|
import pkg/ethers
|
|
import pkg/questionable
|
|
import pkg/chronos
|
|
import pkg/stint
|
|
import ../clock
|
|
import ../conf
|
|
import ../utils/trackedfutures
|
|
|
|
export clock
|
|
|
|
logScope:
|
|
topics = "contracts clock"
|
|
|
|
type
  OnChainClock* = ref object of Clock
    ## `Clock` implementation that derives the current time from the
    ## timestamps of blocks obtained through an ethers `Provider`, by
    ## keeping an offset between block time and the local system time.
    provider: Provider
      ## used to fetch the latest block and to subscribe to new blocks
    subscription: Subscription
      ## block subscription assigned in `start` and released in `stop`
    offset: times.Duration
      ## block timestamp minus local time, taken when the last block was applied
    blockNumber: UInt256
      ## number of the last block that was applied to `offset`
    started: bool
      ## set at the end of `start`, cleared at the end of `stop`
    newBlock: AsyncEvent
      ## fired every time a block has been applied to the clock
    trackedFutures: TrackedFutures
      ## background `update` calls launched from the block callback
|
|
|
|
proc new*(_: type OnChainClock, provider: Provider): OnChainClock =
  ## Builds a clock bound to `provider`, with a fresh block event and an
  ## empty future tracker. All remaining fields keep their zero values, so
  ## the returned clock is not started.
  let
    blockEvent = newAsyncEvent()
    futures = TrackedFutures()
  OnChainClock(provider: provider, newBlock: blockEvent, trackedFutures: futures)
|
|
|
|
proc update(clock: OnChainClock, blck: Block) =
  ## Applies `blck` to the clock: recomputes the offset between the block
  ## timestamp and the local time, records the block number and fires the
  ## `newBlock` event.
  ##
  ## Nothing happens when the block carries no number, or when its number
  ## is not greater than the one already recorded.
  without number =? blck.number:
    return
  if not (number > clock.blockNumber):
    return

  let
    chainTime = initTime(blck.timestamp.truncate(int64), 0)
    localTime = getTime()
  clock.offset = chainTime - localTime
  clock.blockNumber = number
  trace "updated clock",
    blockTime = blck.timestamp, blockNumber = number, offset = clock.offset
  # wake up anything waiting for the clock to advance
  clock.newBlock.fire()
|
|
|
|
proc update(clock: OnChainClock) {.async: (raises: []).} =
  ## Asks the provider for the latest block and, when one is returned,
  ## applies it to the clock.
  ##
  ## Never raises: any `CatchableError` is logged at debug level and
  ## otherwise ignored.
  try:
    let fetched = await clock.provider.getBlock(BlockTag.latest)
    if head =? fetched:
      clock.update(head)
  except CatchableError as err:
    debug "error updating clock: ", error = err.msg
|
|
|
|
method start*(clock: OnChainClock) {.async.} =
  ## Starts the clock: synchronises once with the latest block, then
  ## subscribes to new blocks so that every notification triggers another
  ## synchronisation. Returns immediately when the clock is already started.
  if clock.started:
    return

  proc onBlock(blckResult: ?!Block) =
    ## Subscription callback; logs failed notifications and schedules a
    ## tracked clock update for successful ones.
    if subscriptionError =? blckResult.errorOption:
      error "There was an error in block subscription",
        msg = subscriptionError.msg
    else:
      # ignore block parameter; hardhat may call this with pending blocks
      clock.trackedFutures.track(clock.update())

  # initial synchronisation before any notification arrives
  await clock.update()

  let blockSubscription = await clock.provider.subscribe(onBlock)
  clock.subscription = blockSubscription
  clock.started = true
|
|
|
|
method stop*(clock: OnChainClock) {.async.} =
  ## Stops a started clock: releases the block subscription, cancels the
  ## tracked update futures and clears the started flag. Does nothing when
  ## the clock is not started.
  if clock.started:
    await clock.subscription.unsubscribe()
    await clock.trackedFutures.cancelTracked()
    clock.started = false
|
|
|
|
method now*(clock: OnChainClock): SecondsSince1970 =
  ## Returns the local time shifted by the stored block offset, as unix
  ## seconds. Asserts (also in release builds) that the clock was started.
  doAssert clock.started, "clock should be started before calling now()"
  let adjusted = getTime() + clock.offset
  adjusted.toUnix
|
|
|
|
method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} =
  ## Completes once `clock.now()` has reached `time`.
  ##
  ## Each round waits for the next `newBlock` event, but no longer than the
  ## number of seconds still missing, and then re-evaluates the clock.
  while true:
    let remaining = time - clock.now()
    if remaining <= 0:
      break
    # reset the event so only a block applied from here on ends the wait
    clock.newBlock.clear()
    let arrival = clock.newBlock.wait()
    discard await arrival.withTimeout(chronos.seconds(remaining))
|