feat: update expiry when data downloaded and slot filled (#619)

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Co-authored-by: markspanbroek <mark@spanbroek.net>
This commit is contained in:
Adam Uhlíř 2023-11-22 11:09:12 +01:00 committed by GitHub
parent 4d546f9ace
commit 8681a40ee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 306 additions and 17 deletions

View File

@ -1,6 +1,7 @@
import pkg/chronos
import pkg/stew/endians2
import pkg/upraises
import pkg/stint
type
Clock* = ref object of RootObj
@ -13,6 +14,12 @@ method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} =
method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} =
raiseAssert "not implemented"
method start*(clock: Clock) {.base, async.} =
discard
method stop*(clock: Clock) {.base, async.} =
discard
proc withTimeout*(future: Future[void],
clock: Clock,
expiry: SecondsSince1970) {.async.} =
@ -32,3 +39,6 @@ proc toBytes*(i: SecondsSince1970): seq[byte] =
proc toSecondsSince1970*(bytes: seq[byte]): SecondsSince1970 =
let asUint = uint64.fromBytes(bytes)
cast[int64](asUint)
proc toSecondsSince1970*(bigint: UInt256): SecondsSince1970 =
bigint.truncate(int64)

View File

@ -17,7 +17,7 @@ type
proc new*(_: type OnChainClock, provider: Provider): OnChainClock =
OnChainClock(provider: provider, newBlock: newAsyncEvent())
proc start*(clock: OnChainClock) {.async.} =
method start*(clock: OnChainClock) {.async.} =
if clock.started:
return
clock.started = true
@ -33,7 +33,7 @@ proc start*(clock: OnChainClock) {.async.} =
clock.subscription = await clock.provider.subscribe(onBlock)
proc stop*(clock: OnChainClock) {.async.} =
method stop*(clock: OnChainClock) {.async.} =
if not clock.started:
return
clock.started = false

View File

@ -13,7 +13,7 @@ type
proc new*(
_: type HostInteractions,
clock: OnChainClock,
clock: Clock,
sales: Sales
): HostInteractions =
## Create a new HostInteractions instance

View File

@ -7,7 +7,7 @@ export clock
type
ContractInteractions* = ref object of RootObj
clock*: OnChainClock
clock*: Clock
method start*(self: ContractInteractions) {.async, base.} =
await self.clock.start()

View File

@ -104,11 +104,28 @@ proc fetchManifest*(
return manifest.success
proc updateExpiry*(node: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970): Future[?!void] {.async.} =
without manifest =? await node.fetchManifest(manifestCid), error:
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)
try:
let ensuringFutures = Iter.fromSlice(0..<manifest.blocksCount)
.mapIt(node.blockStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
return success()
proc fetchBatched*(
node: CodexNodeRef,
manifest: Manifest,
batchSize = FetchBatch,
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
onBatch: BatchProc = nil,
expiry = SecondsSince1970.none): Future[?!void] {.async, gcsafe.} =
## Fetch manifest in batches of `batchSize`
##
@ -127,6 +144,10 @@ proc fetchBatched*(
try:
await allFuturesThrowing(allFinished(blocks))
if expiryValue =? expiry:
await allFuturesThrowing(blocks.mapIt(node.blockStore.ensureExpiry(it.read.get.cid, expiryValue)))
if not onBatch.isNil:
await onBatch(blocks.mapIt( it.read.get ))
except CancelledError as exc:
@ -419,13 +440,21 @@ proc start*(node: CodexNodeRef) {.async.} =
# since fetching of blocks will have to be selective according
# to a combination of parameters, such as node slot position
# and dataset geometry
if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch)).errorOption:
if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch, expiry = some request.expiry.toSecondsSince1970)).errorOption:
let error = newException(CodexError, "Unable to retrieve blocks")
error.parent = fetchErr
return failure(error)
return success()
hostContracts.sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
without cid =? Cid.init(rootCid):
trace "Unable to parse Cid", cid
let error = newException(CodexError, "Unable to parse Cid")
return failure(error)
return await node.updateExpiry(cid, expiry)
hostContracts.sales.onClear = proc(request: StorageRequest,
slotIndex: UInt256) =
# TODO: remove data from local storage

View File

@ -63,6 +63,9 @@ proc `onSale=`*(sales: Sales, callback: OnSale) =
proc `onProve=`*(sales: Sales, callback: OnProve) =
sales.context.onProve = some callback
proc `onExpiryUpdate=`*(sales: Sales, callback: OnExpiryUpdate) =
sales.context.onExpiryUpdate = some callback
proc onStore*(sales: Sales): ?OnStore = sales.context.onStore
proc onClear*(sales: Sales): ?OnClear = sales.context.onClear
@ -71,6 +74,8 @@ proc onSale*(sales: Sales): ?OnSale = sales.context.onSale
proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
func new*(_: type Sales,
market: Market,
clock: Clock,

View File

@ -17,6 +17,7 @@ type
onClear*: ?OnClear
onSale*: ?OnSale
onProve*: ?OnProve
onExpiryUpdate*: ?OnExpiryUpdate
reservations*: Reservations
slotQueue*: SlotQueue
simulateProofFailures*: int
@ -25,6 +26,7 @@ type
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.gcsafe, upraises: [].}
OnProve* = proc(slot: Slot): Future[seq[byte]] {.gcsafe, upraises: [].}
OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.gcsafe, upraises: [].}
OnClear* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(request: StorageRequest,

View File

@ -1,4 +1,5 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import ../../conf
import ../statemachine
@ -37,9 +38,18 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
if host == me.some:
info "Slot succesfully filled", requestId = $data.requestId, slotIndex = $data.slotIndex
if request =? data.request:
if onFilled =? agent.onFilled:
onFilled(request, data.slotIndex)
without request =? data.request:
raiseAssert "no sale request"
if onFilled =? agent.onFilled:
onFilled(request, data.slotIndex)
without onExpiryUpdate =? context.onExpiryUpdate:
raiseAssert "onExpiryUpdate callback not set"
let requestEnd = await market.getRequestEnd(data.requestId)
if err =? (await onExpiryUpdate(request.content.cid, requestEnd)).errorOption:
return some State(SaleErrored(error: err))
when codex_enable_proof_failures:
if context.simulateProofFailures > 0:

View File

@ -88,6 +88,18 @@ method ensureExpiry*(
raiseAssert("Not implemented!")
method ensureExpiry*(
self: BlockStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##

View File

@ -230,6 +230,17 @@ method ensureExpiry*(
discard # CacheStore does not have notion of TTL
method ensureExpiry*(
self: CacheStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##

View File

@ -101,13 +101,36 @@ method ensureExpiry*(
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if (await self.localStore.hasBlock(cid)).tryGet:
without blockCheck =? await self.localStore.hasBlock(cid), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(cid, expiry)
else:
trace "Updating expiry - block not in local store", cid
return success()
method ensureExpiry*(
self: NetworkStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without blockCheck =? await self.localStore.hasBlock(treeCid, index), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(treeCid, index, expiry)
else:
trace "Updating expiry - block not in local store", treeCid, index
return success()
method listBlocks*(
self: NetworkStore,
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] =

View File

@ -256,7 +256,7 @@ method ensureExpiry*(
return failure(err)
if expiry <= currentExpiry.toSecondsSince1970:
trace "Current expiry is larger then the specified one, no action needed"
trace "Current expiry is larger than or equal to the specified one, no action needed", current = currentExpiry.toSecondsSince1970, ensuring = expiry
return success()
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
@ -265,6 +265,20 @@ method ensureExpiry*(
return success()
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
await self.ensureExpiry(cidAndProof[0], expiry)
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
if err =? (await self.metaDs.put(
CodexTotalBlocksKey,

View File

@ -1,4 +1,7 @@
import pkg/asynctest
import pkg/questionable/results
import pkg/codex/clock
import pkg/codex/contracts/requests
import pkg/codex/sales
import pkg/codex/sales/salesagent
@ -7,6 +10,7 @@ import pkg/codex/sales/states/filled
import pkg/codex/sales/states/errored
import pkg/codex/sales/states/proving
import pkg/codex/sales/states/finished
import ../../helpers/mockmarket
import ../../examples
import ../../helpers
@ -20,6 +24,7 @@ checksuite "sales state 'filled'":
var slot: MockSlot
var agent: SalesAgent
var state: SaleFilled
var onExpiryUpdatePassedExpiry: SecondsSince1970
setup:
market = MockMarket.new()
@ -27,11 +32,18 @@ checksuite "sales state 'filled'":
host: Address.example,
slotIndex: slotIndex,
proof: @[])
let context = SalesContext(market: market)
market.requestEnds[request.id] = 321
onExpiryUpdatePassedExpiry = -1
let onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
onExpiryUpdatePassedExpiry = expiry
return success()
let context = SalesContext(market: market, onExpiryUpdate: some onExpiryUpdate)
agent = newSalesAgent(context,
request.id,
slotIndex,
StorageRequest.none)
some request)
state = SaleFilled.new()
test "switches to proving state when slot is filled by me":
@ -40,6 +52,16 @@ checksuite "sales state 'filled'":
let next = await state.run(agent)
check !next of SaleProving
test "calls onExpiryUpdate with request end":
slot.host = await market.getSigner()
market.filled = @[slot]
let expectedExpiry = 123
market.requestEnds[request.id] = expectedExpiry
let next = await state.run(agent)
check !next of SaleProving
check onExpiryUpdatePassedExpiry == expectedExpiry
test "switches to error state when slot is filled by another host":
slot.host = Address.example
market.filled = @[slot]

View File

@ -59,6 +59,10 @@ asyncchecksuite "Sales - start":
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
return proof
@ -157,6 +161,10 @@ asyncchecksuite "Sales":
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
sales.onExpiryUpdate = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.async.} =
return success()
queue = sales.context.slotQueue
sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
return proof

View File

@ -6,11 +6,17 @@ import pkg/asynctest
import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/stint
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/contracts
import pkg/codex/systemclock
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/node
@ -18,7 +24,10 @@ import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/blocktype as bt
import ../examples
import ./helpers
import ./helpers/mockmarket
import ./helpers/mockclock
asyncchecksuite "Test Node":
let
@ -30,7 +39,10 @@ asyncchecksuite "Test Node":
switch: Switch
wallet: WalletRef
network: BlockExcNetwork
localStore: CacheStore
clock: Clock
localStore: RepoStore
localStoreRepoDs: DataStore
localStoreMetaDs: DataStore
engine: BlockExcEngine
store: NetworkStore
node: CodexNodeRef
@ -69,7 +81,13 @@ asyncchecksuite "Test Node":
switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new()
clock = SystemClock.new()
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock=clock)
await localStore.start()
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
@ -105,8 +123,7 @@ asyncchecksuite "Test Node":
fetched == manifest
test "Block Batching":
let
manifest = await Manifest.fetch(chunker)
let manifest = await Manifest.fetch(chunker)
for batchSize in 1..12:
(await node.fetchBatched(
@ -116,6 +133,21 @@ asyncchecksuite "Test Node":
check blocks.len > 0 and blocks.len <= batchSize
)).tryGet()
test "Block Batching with expiry":
let
manifest = await Manifest.fetch(chunker)
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 123
(await node.fetchBatched(manifest, expiry=some expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
let expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
let expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
test "Store and retrieve Data Stream":
let
stream = BufferStream.new()
@ -163,3 +195,114 @@ asyncchecksuite "Test Node":
var data = newSeq[byte](testString.len)
await stream.readExactly(addr data[0], data.len)
check string.fromBytes(data) == testString
asyncchecksuite "Test Node - host contracts":
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
var
file: File
chunker: Chunker
switch: Switch
wallet: WalletRef
network: BlockExcNetwork
clock: MockClock
localStore: RepoStore
localStoreRepoDs: DataStore
localStoreMetaDs: DataStore
engine: BlockExcEngine
store: NetworkStore
sales: Sales
node: CodexNodeRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
manifest: Manifest
manifestCid: string
proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
# Collect blocks from Chunker into Manifest
await storeDataGetManifest(localStore, chunker)
setup:
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
clock = MockClock.new()
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock=clock)
await localStore.start()
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
.expect("Should return multiaddress")])
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
node = CodexNodeRef.new(switch, store, engine, nil, blockDiscovery) # TODO: pass `Erasure`
# Setup Host Contracts and dependencies
let market = MockMarket.new()
sales = Sales.new(market, clock, localStore, 0)
let hostContracts = some HostInteractions.new(clock, sales)
node.contracts = (ClientInteractions.none, hostContracts, ValidatorInteractions.none)
await node.start()
# Populate manifest in local store
manifest = await storeDataGetManifest(localStore, chunker)
let manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec
).tryGet()
manifestCid = $(manifestBlock.cid)
(await localStore.putBlock(manifestBlock)).tryGet()
teardown:
close(file)
await node.stop()
test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome
test "onExpiryUpdate callback":
let
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCid, expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
let expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
let expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
test "onStore callback is set":
check sales.onStore.isSome
test "onStore callback":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = manifestCid
var fetchedBytes: uint = 0
let onBatch = proc(blocks: seq[bt.Block]) {.async.} =
for blk in blocks:
fetchedBytes += blk.data.len.uint
(await onStore(request, 0.u256, onBatch)).tryGet()
check fetchedBytes == 2291520