remove call to `waitFor` from clock.now and make it async (#691)

This commit is contained in:
Giuliano Mega 2024-02-05 08:36:28 -03:00 committed by GitHub
parent ca0660b07e
commit 591be9446a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 43 additions and 40 deletions

View File

@ -1,6 +1,5 @@
import pkg/chronos import pkg/chronos
import pkg/stew/endians2 import pkg/stew/endians2
import pkg/upraises
import pkg/stint import pkg/stint
type type
@ -8,7 +7,7 @@ type
SecondsSince1970* = int64 SecondsSince1970* = int64
Timeout* = object of CatchableError Timeout* = object of CatchableError
method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} = method now*(clock: Clock): Future[SecondsSince1970] {.base, async.} =
raiseAssert "not implemented" raiseAssert "not implemented"
method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} =

View File

@ -46,12 +46,12 @@ method stop*(clock: OnChainClock) {.async.} =
await clock.subscription.unsubscribe() await clock.subscription.unsubscribe()
clock.started = false clock.started = false
method now*(clock: OnChainClock): SecondsSince1970 = method now*(clock: OnChainClock): Future[SecondsSince1970] {.async.} =
when codex_use_hardhat: when codex_use_hardhat:
# hardhat's latest block.timestamp is usually 1s behind the block timestamp # hardhat's latest block.timestamp is usually 1s behind the block timestamp
# in the newHeads event. When testing, always return the latest block. # in the newHeads event. When testing, always return the latest block.
try: try:
if queriedBlock =? (waitFor clock.provider.getBlock(BlockTag.latest)): if queriedBlock =? (await clock.provider.getBlock(BlockTag.latest)):
trace "using last block timestamp for clock.now", trace "using last block timestamp for clock.now",
lastBlockTimestamp = queriedBlock.timestamp.truncate(int64), lastBlockTimestamp = queriedBlock.timestamp.truncate(int64),
cachedBlockTimestamp = clock.lastBlockTime.truncate(int64) cachedBlockTimestamp = clock.lastBlockTime.truncate(int64)
@ -65,6 +65,6 @@ method now*(clock: OnChainClock): SecondsSince1970 =
return toUnix(getTime() + clock.offset) return toUnix(getTime() + clock.offset)
method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} = method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} =
while (let difference = time - clock.now(); difference > 0): while (let difference = time - (await clock.now()); difference > 0):
clock.newBlock.clear() clock.newBlock.clear()
discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference)) discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference))

View File

@ -53,7 +53,7 @@ proc populate*(purchasing: Purchasing,
if result.ask.proofProbability == 0.u256: if result.ask.proofProbability == 0.u256:
result.ask.proofProbability = purchasing.proofProbability result.ask.proofProbability = purchasing.proofProbability
if result.expiry == 0.u256: if result.expiry == 0.u256:
result.expiry = (purchasing.clock.now().u256 + purchasing.requestExpiryInterval) result.expiry = (await purchasing.clock.now()).u256 + purchasing.requestExpiryInterval
if result.nonce == Nonce.default: if result.nonce == Nonce.default:
var id = result.nonce.toArray var id = result.nonce.toArray
doAssert randomBytes(id) == 32 doAssert randomBytes(id) == 32

View File

@ -328,10 +328,12 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
if node.clock.isNil: if node.clock.isNil:
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
if expiry <= node.clock.now.u256: let now = (await node.clock.now).u256
if expiry <= now:
return RestApiResponse.error(Http400, "Expiry needs to be in future") return RestApiResponse.error(Http400, "Expiry needs to be in future")
if expiry > node.clock.now.u256 + params.duration: if expiry > now + params.duration:
return RestApiResponse.error(Http400, "Expiry has to be before the request's end (now + duration)") return RestApiResponse.error(Http400, "Expiry has to be before the request's end (now + duration)")
without purchaseId =? await node.requestStorage( without purchaseId =? await node.requestStorage(

View File

@ -73,8 +73,9 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
return return
while true: while true:
let deadline = max(clock.now, request.expiry.truncate(int64)) + 1 let now = await clock.now
trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline let deadline = max(now, request.expiry.truncate(int64)) + 1
trace "Waiting for request to be cancelled", now=now, expiry=deadline
await clock.waitUntil(deadline) await clock.waitUntil(deadline)
without state =? await agent.retrieveRequestState(): without state =? await agent.retrieveRequestState():
@ -85,7 +86,7 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
agent.schedule(cancelledEvent(request)) agent.schedule(cancelledEvent(request))
break break
debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=now
data.cancelled = onCancelled() data.cancelled = onCancelled()

View File

@ -57,7 +57,7 @@ proc proveLoop(
proc getCurrentPeriod(): Future[Period] {.async.} = proc getCurrentPeriod(): Future[Period] {.async.} =
let periodicity = await market.periodicity() let periodicity = await market.periodicity()
return periodicity.periodOf(clock.now().u256) return periodicity.periodOf((await clock.now()).u256)
proc waitUntilPeriod(period: Period) {.async.} = proc waitUntilPeriod(period: Period) {.async.} =
let periodicity = await market.periodicity() let periodicity = await market.periodicity()

View File

@ -59,7 +59,7 @@ proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.}
trace "Unable to delete block from repoStore" trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now: if be.expiration < (await self.clock.now):
await self.deleteExpiredBlock(be.cid) await self.deleteExpiredBlock(be.cid)
else: else:
inc self.offset inc self.offset

View File

@ -229,12 +229,12 @@ proc getBlockExpirationEntry(
proc getBlockExpirationEntry( proc getBlockExpirationEntry(
self: RepoStore, self: RepoStore,
cid: Cid, cid: Cid,
ttl: ?Duration): ?!BatchEntry = ttl: ?Duration): Future[?!BatchEntry] {.async.} =
## Get an expiration entry for a batch for duration since "now" ## Get an expiration entry for a batch for duration since "now"
## ##
let duration = ttl |? self.blockTtl let duration = ttl |? self.blockTtl
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds) self.getBlockExpirationEntry(cid, (await self.clock.now()) + duration.seconds)
method ensureExpiry*( method ensureExpiry*(
self: RepoStore, self: RepoStore,
@ -340,7 +340,7 @@ method putBlock*(
trace "Updating quota", used trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: without blockExpEntry =? (await self.getBlockExpirationEntry(blk.cid, ttl)), err:
trace "Unable to create block expiration metadata key", err = err.msg trace "Unable to create block expiration metadata key", err = err.msg
return failure(err) return failure(err)
batch.add(blockExpEntry) batch.add(blockExpEntry)

View File

@ -1,10 +1,10 @@
import std/times import std/times
import pkg/upraises import pkg/chronos
import ./clock import ./clock
type type
SystemClock* = ref object of Clock SystemClock* = ref object of Clock
method now*(clock: SystemClock): SecondsSince1970 {.upraises: [].} = method now*(clock: SystemClock): Future[SecondsSince1970] {.async.} =
let now = times.now().utc let now = times.now().utc
now.toTime().toUnix() now.toTime().toUnix()

View File

@ -34,11 +34,11 @@ proc new*(
proc slots*(validation: Validation): seq[SlotId] = proc slots*(validation: Validation): seq[SlotId] =
validation.slots.toSeq validation.slots.toSeq
proc getCurrentPeriod(validation: Validation): UInt256 = proc getCurrentPeriod(validation: Validation): Future[UInt256] {.async.} =
return validation.periodicity.periodOf(validation.clock.now().u256) return validation.periodicity.periodOf((await validation.clock.now()).u256)
proc waitUntilNextPeriod(validation: Validation) {.async.} = proc waitUntilNextPeriod(validation: Validation) {.async.} =
let period = validation.getCurrentPeriod() let period = await validation.getCurrentPeriod()
let periodEnd = validation.periodicity.periodEnd(period) let periodEnd = validation.periodicity.periodEnd(period)
trace "Waiting until next period", currentPeriod = period trace "Waiting until next period", currentPeriod = period
await validation.clock.waitUntil(periodEnd.truncate(int64) + 1) await validation.clock.waitUntil(periodEnd.truncate(int64) + 1)
@ -66,7 +66,7 @@ proc markProofAsMissing(validation: Validation,
slotId: SlotId, slotId: SlotId,
period: Period) {.async.} = period: Period) {.async.} =
logScope: logScope:
currentPeriod = validation.getCurrentPeriod() currentPeriod = (await validation.getCurrentPeriod())
try: try:
if await validation.market.canProofBeMarkedAsMissing(slotId, period): if await validation.market.canProofBeMarkedAsMissing(slotId, period):
@ -82,7 +82,7 @@ proc markProofAsMissing(validation: Validation,
proc markProofsAsMissing(validation: Validation) {.async.} = proc markProofsAsMissing(validation: Validation) {.async.} =
for slotId in validation.slots: for slotId in validation.slots:
let previousPeriod = validation.getCurrentPeriod() - 1 let previousPeriod = (await validation.getCurrentPeriod()) - 1
await validation.markProofAsMissing(slotId, previousPeriod) await validation.markProofAsMissing(slotId, previousPeriod)
proc run(validation: Validation) {.async.} = proc run(validation: Validation) {.async.} =

View File

@ -32,11 +32,11 @@ proc set*(clock: MockClock, time: SecondsSince1970) =
proc advance*(clock: MockClock, seconds: int64) = proc advance*(clock: MockClock, seconds: int64) =
clock.set(clock.time + seconds) clock.set(clock.time + seconds)
method now*(clock: MockClock): SecondsSince1970 = method now*(clock: MockClock): Future[SecondsSince1970] {.async.} =
clock.time clock.time
method waitUntil*(clock: MockClock, time: SecondsSince1970) {.async.} = method waitUntil*(clock: MockClock, time: SecondsSince1970) {.async.} =
if time > clock.now(): if time > (await clock.now()):
let future = newFuture[void]() let future = newFuture[void]()
clock.waiting.add(Waiting(until: time, future: future)) clock.waiting.add(Waiting(until: time, future: future))
await future await future

View File

@ -98,7 +98,7 @@ asyncchecksuite "Test Node - Host contracts":
test "onExpiryUpdate callback": test "onExpiryUpdate callback":
let let
# The blocks have set default TTL, so in order to update it we have to have larger TTL # The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 expectedExpiry: SecondsSince1970 = (await clock.now) + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()

View File

@ -67,7 +67,7 @@ asyncchecksuite "Sales - start":
sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!seq[byte]] {.async.} = sales.onProve = proc(slot: Slot, challenge: ProofChallenge): Future[?!seq[byte]] {.async.} =
return success(proof) return success(proof)
itemsProcessed = @[] itemsProcessed = @[]
request.expiry = (clock.now() + 42).u256 request.expiry = ((await clock.now()) + 42).u256
teardown: teardown:
await sales.stop() await sales.stop()

View File

@ -161,7 +161,7 @@ checksuite "Purchasing state machine":
market.requestState[request5.id] = RequestState.Failed market.requestState[request5.id] = RequestState.Failed
# ensure the started state doesn't error, giving a false positive test result # ensure the started state doesn't error, giving a false positive test result
market.requestEnds[request2.id] = clock.now() - 1 market.requestEnds[request2.id] = (await clock.now()) - 1
await purchasing.load() await purchasing.load()
check eventually purchasing.getPurchase(PurchaseId(request1.id)).?finished == false.some check eventually purchasing.getPurchase(PurchaseId(request1.id)).?finished == false.some
@ -182,7 +182,7 @@ checksuite "Purchasing state machine":
test "moves to PurchaseStarted when request state is Started": test "moves to PurchaseStarted when request state is Started":
let request = StorageRequest.example let request = StorageRequest.example
let purchase = Purchase.new(request, market, clock) let purchase = Purchase.new(request, market, clock)
market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) market.requestEnds[request.id] = (await clock.now()) + request.ask.duration.truncate(int64)
market.requested = @[request] market.requested = @[request]
market.requestState[request.id] = RequestState.Started market.requestState[request.id] = RequestState.Started
let next = await PurchaseUnknown().run(purchase) let next = await PurchaseUnknown().run(purchase)
@ -215,7 +215,7 @@ checksuite "Purchasing state machine":
test "moves to PurchaseFailed state once RequestFailed emitted": test "moves to PurchaseFailed state once RequestFailed emitted":
let request = StorageRequest.example let request = StorageRequest.example
let purchase = Purchase.new(request, market, clock) let purchase = Purchase.new(request, market, clock)
market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) market.requestEnds[request.id] = (await clock.now()) + request.ask.duration.truncate(int64)
let future = PurchaseStarted().run(purchase) let future = PurchaseStarted().run(purchase)
market.emitRequestFailed(request.id) market.emitRequestFailed(request.id)
@ -226,7 +226,7 @@ checksuite "Purchasing state machine":
test "moves to PurchaseFinished state once request finishes": test "moves to PurchaseFinished state once request finishes":
let request = StorageRequest.example let request = StorageRequest.example
let purchase = Purchase.new(request, market, clock) let purchase = Purchase.new(request, market, clock)
market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) market.requestEnds[request.id] = (await clock.now()) + request.ask.duration.truncate(int64)
let future = PurchaseStarted().run(purchase) let future = PurchaseStarted().run(purchase)
clock.advance(request.ask.duration.truncate(int64)) clock.advance(request.ask.duration.truncate(int64))

View File

@ -1,14 +1,15 @@
import std/times import std/times
import std/unittest
import codex/systemclock import codex/systemclock
import ../asynctest
import ./helpers import ./helpers
checksuite "SystemClock": asyncchecksuite "SystemClock":
test "Should get now": test "Should get now":
let clock = SystemClock.new() let clock = SystemClock.new()
let expectedNow = times.now().utc let expectedNow = times.now().utc
let now = clock.now() let now = (await clock.now())
check now == expectedNow.toTime().toUnix() check now == expectedNow.toTime().toUnix()

View File

@ -31,9 +31,9 @@ asyncchecksuite "validation":
teardown: teardown:
await validation.stop() await validation.stop()
proc advanceToNextPeriod = proc advanceToNextPeriod {.async.} =
let periodicity = Periodicity(seconds: period.u256) let periodicity = Periodicity(seconds: period.u256)
let period = periodicity.periodOf(clock.now().u256) let period = periodicity.periodOf((await clock.now()).u256)
let periodEnd = periodicity.periodEnd(period) let periodEnd = periodicity.periodEnd(period)
clock.set((periodEnd + 1).truncate(int)) clock.set((periodEnd + 1).truncate(int))

View File

@ -17,23 +17,23 @@ ethersuite "On-Chain Clock":
test "returns the current time of the EVM": test "returns the current time of the EVM":
let latestBlock = (!await ethProvider.getBlock(BlockTag.latest)) let latestBlock = (!await ethProvider.getBlock(BlockTag.latest))
let timestamp = latestBlock.timestamp.truncate(int64) let timestamp = latestBlock.timestamp.truncate(int64)
check clock.now() == timestamp check (await clock.now()) == timestamp
test "updates time with timestamp of new blocks": test "updates time with timestamp of new blocks":
let future = (getTime() + 42.years).toUnix let future = (getTime() + 42.years).toUnix
discard await ethProvider.send("evm_setNextBlockTimestamp", @[%future]) discard await ethProvider.send("evm_setNextBlockTimestamp", @[%future])
discard await ethProvider.send("evm_mine") discard await ethProvider.send("evm_mine")
check clock.now() == future check (await clock.now()) == future
test "can wait until a certain time is reached by the chain": test "can wait until a certain time is reached by the chain":
let future = clock.now() + 42 # seconds let future = (await clock.now()) + 42 # seconds
let waiting = clock.waitUntil(future) let waiting = clock.waitUntil(future)
discard await ethProvider.send("evm_setNextBlockTimestamp", @[%future]) discard await ethProvider.send("evm_setNextBlockTimestamp", @[%future])
discard await ethProvider.send("evm_mine") discard await ethProvider.send("evm_mine")
check await waiting.withTimeout(chronos.milliseconds(100)) check await waiting.withTimeout(chronos.milliseconds(100))
test "can wait until a certain time is reached by the wall-clock": test "can wait until a certain time is reached by the wall-clock":
let future = clock.now() + 1 # seconds let future = (await clock.now()) + 1 # seconds
let waiting = clock.waitUntil(future) let waiting = clock.waitUntil(future)
check await waiting.withTimeout(chronos.seconds(2)) check await waiting.withTimeout(chronos.seconds(2))