diff --git a/codex/clock.nim b/codex/clock.nim index 88df94da..f680ddec 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -1,6 +1,7 @@ import pkg/chronos import pkg/stew/endians2 import pkg/upraises +import pkg/stint type Clock* = ref object of RootObj @@ -13,6 +14,12 @@ method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} = method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = raiseAssert "not implemented" +method start*(clock: Clock) {.base, async.} = + discard + +method stop*(clock: Clock) {.base, async.} = + discard + proc withTimeout*(future: Future[void], clock: Clock, expiry: SecondsSince1970) {.async.} = @@ -32,3 +39,6 @@ proc toBytes*(i: SecondsSince1970): seq[byte] = proc toSecondsSince1970*(bytes: seq[byte]): SecondsSince1970 = let asUint = uint64.fromBytes(bytes) cast[int64](asUint) + +proc toSecondsSince1970*(bigint: UInt256): SecondsSince1970 = + bigint.truncate(int64) diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index 7b5187c8..23b678e9 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -17,7 +17,7 @@ type proc new*(_: type OnChainClock, provider: Provider): OnChainClock = OnChainClock(provider: provider, newBlock: newAsyncEvent()) -proc start*(clock: OnChainClock) {.async.} = +method start*(clock: OnChainClock) {.async.} = if clock.started: return clock.started = true @@ -33,7 +33,7 @@ proc start*(clock: OnChainClock) {.async.} = clock.subscription = await clock.provider.subscribe(onBlock) -proc stop*(clock: OnChainClock) {.async.} = +method stop*(clock: OnChainClock) {.async.} = if not clock.started: return clock.started = false diff --git a/codex/contracts/interactions/hostinteractions.nim b/codex/contracts/interactions/hostinteractions.nim index d563eb9c..bfb9ac9c 100644 --- a/codex/contracts/interactions/hostinteractions.nim +++ b/codex/contracts/interactions/hostinteractions.nim @@ -13,7 +13,7 @@ type proc new*( _: type HostInteractions, - clock: OnChainClock, + clock: Clock, sales: Sales ): HostInteractions = ## Create a new HostInteractions instance diff --git a/codex/contracts/interactions/interactions.nim b/codex/contracts/interactions/interactions.nim index 3ad67991..287cf297 100644 --- a/codex/contracts/interactions/interactions.nim +++ b/codex/contracts/interactions/interactions.nim @@ -7,7 +7,7 @@ export clock type ContractInteractions* = ref object of RootObj - clock*: OnChainClock + clock*: Clock method start*(self: ContractInteractions) {.async, base.} = await self.clock.start() diff --git a/codex/node.nim b/codex/node.nim index cd77c972..bcb8e58e 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -104,11 +104,28 @@ proc fetchManifest*( return manifest.success +proc updateExpiry*(node: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970): Future[?!void] {.async.} = + without manifest =? await node.fetchManifest(manifestCid), error: + trace "Unable to fetch manifest for cid", manifestCid + return failure(error) + + try: + let ensuringFutures = Iter.fromSlice(0.. 0: diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index c58e655f..bb91258a 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -88,6 +88,18 @@ method ensureExpiry*( raiseAssert("Not implemented!") +method ensureExpiry*( + self: BlockStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.base.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + raiseAssert("Not implemented!") + method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} = ## Delete a block from the blockstore ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index a7acf4ec..13308c7a 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -230,6 +230,17 @@ method ensureExpiry*( discard # CacheStore does not have notion of TTL +method ensureExpiry*( + self: CacheStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Updates block's associated TTL in store - not applicable for CacheStore + ## + + discard # CacheStore does not have notion of TTL + method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = ## Delete a block from the blockstore ## diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 16e72b21..eec84611 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -101,13 +101,36 @@ method ensureExpiry*( ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## - if (await self.localStore.hasBlock(cid)).tryGet: + without blockCheck =? await self.localStore.hasBlock(cid), err: + return failure(err) + + if blockCheck: return await self.localStore.ensureExpiry(cid, expiry) else: trace "Updating expiry - block not in local store", cid return success() +method ensureExpiry*( + self: NetworkStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + without blockCheck =? await self.localStore.hasBlock(treeCid, index), err: + return failure(err) + + if blockCheck: + return await self.localStore.ensureExpiry(treeCid, index, expiry) + else: + trace "Updating expiry - block not in local store", treeCid, index + + return success() + method listBlocks*( self: NetworkStore, blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] = diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 37c54972..db53ae6f 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -256,7 +256,7 @@ method ensureExpiry*( return failure(err) if expiry <= currentExpiry.toSecondsSince1970: - trace "Current expiry is larger then the specified one, no action needed" + trace "Current expiry is larger than or equal to the specified one, no action needed", current = currentExpiry.toSecondsSince1970, ensuring = expiry return success() if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption: @@ -265,6 +265,20 @@ method ensureExpiry*( return success() +method ensureExpiry*( + self: RepoStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's associated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + without cidAndProof =? await self.getCidAndProof(treeCid, index), err: + return failure(err) + + await self.ensureExpiry(cidAndProof[0], expiry) + proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} = if err =? (await self.metaDs.put( CodexTotalBlocksKey, diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index 68a1318a..92a004a3 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -1,4 +1,7 @@ import pkg/asynctest +import pkg/questionable/results + +import pkg/codex/clock import pkg/codex/contracts/requests import pkg/codex/sales import pkg/codex/sales/salesagent @@ -7,6 +10,7 @@ import pkg/codex/sales/states/filled import pkg/codex/sales/states/errored import pkg/codex/sales/states/proving import pkg/codex/sales/states/finished + import ../../helpers/mockmarket import ../../examples import ../../helpers @@ -20,6 +24,7 @@ checksuite "sales state 'filled'": var slot: MockSlot var agent: SalesAgent var state: SaleFilled + var onExpiryUpdatePassedExpiry: SecondsSince1970 setup: market = MockMarket.new() @@ -27,11 +32,18 @@ checksuite "sales state 'filled'": host: Address.example, slotIndex: slotIndex, proof: @[]) - let context = SalesContext(market: market) + + market.requestEnds[request.id] = 321 + onExpiryUpdatePassedExpiry = -1 + let onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = + onExpiryUpdatePassedExpiry = expiry + return success() + let context = SalesContext(market: market, onExpiryUpdate: some onExpiryUpdate) + agent = newSalesAgent(context, request.id, slotIndex, - StorageRequest.none) + some request) state = SaleFilled.new() test "switches to proving state when slot is filled by me": @@ -40,6 +52,16 @@ checksuite "sales state 'filled'": let next = await state.run(agent) check !next of SaleProving + test "calls onExpiryUpdate with request end": + slot.host = await market.getSigner() + market.filled = @[slot] + + let expectedExpiry = 123 + market.requestEnds[request.id] = expectedExpiry + let next = await state.run(agent) + check !next of SaleProving + check onExpiryUpdatePassedExpiry == expectedExpiry + test "switches to error state when slot is filled by another host": slot.host = Address.example market.filled = @[slot] diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 9827a357..c49dc613 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -59,6 +59,10 @@ asyncchecksuite "Sales - start": slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = return success() + + sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = + return success() + queue = sales.context.slotQueue sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = return proof @@ -157,6 +161,10 @@ asyncchecksuite "Sales": slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = return success() + + sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} = + return success() + queue = sales.context.slotQueue sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = return proof diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 57e52558..19a25e20 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -6,11 +6,17 @@ import pkg/asynctest import pkg/chronos import pkg/chronicles import pkg/stew/byteutils +import pkg/datastore +import pkg/questionable +import pkg/stint import pkg/nitro import pkg/codexdht/discv5/protocol as discv5 import pkg/codex/stores +import pkg/codex/clock +import pkg/codex/contracts +import pkg/codex/systemclock import pkg/codex/blockexchange import pkg/codex/chunker import pkg/codex/node @@ -18,7 +24,10 @@ import pkg/codex/manifest import pkg/codex/discovery import pkg/codex/blocktype as bt +import ../examples import ./helpers +import ./helpers/mockmarket +import ./helpers/mockclock asyncchecksuite "Test Node": let @@ -30,7 +39,10 @@ asyncchecksuite "Test Node": switch: Switch wallet: WalletRef network: BlockExcNetwork - localStore: CacheStore + clock: Clock + localStore: RepoStore + localStoreRepoDs: DataStore + localStoreMetaDs: DataStore engine: BlockExcEngine store: NetworkStore node: CodexNodeRef @@ -69,7 +81,13 @@ asyncchecksuite "Test Node": switch = newStandardSwitch() wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) - localStore = CacheStore.new() + + clock = SystemClock.new() + localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet() + localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet() + localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock=clock) + await localStore.start() + blockDiscovery = Discovery.new( switch.peerInfo.privateKey, announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0") @@ -105,8 +123,7 @@ asyncchecksuite "Test Node": fetched == manifest test "Block Batching": - let - manifest = await Manifest.fetch(chunker) + let manifest = await Manifest.fetch(chunker) for batchSize in 1..12: (await node.fetchBatched( @@ -116,6 +133,21 @@ asyncchecksuite "Test Node": check blocks.len > 0 and blocks.len <= batchSize )).tryGet() + test "Block Batching with expiry": + let + manifest = await Manifest.fetch(chunker) + # The blocks have set default TTL, so in order to update it we have to have larger TTL + expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 123 + + (await node.fetchBatched(manifest, expiry=some expectedExpiry)).tryGet() + + for index in 0..