Merge branch 'master' into enable-libp2p-connectivity

This commit is contained in:
gmega 2024-05-28 10:35:02 -03:00
commit bd712a1c97
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
32 changed files with 260 additions and 279 deletions

View File

@ -11,6 +11,7 @@ import std/sequtils
import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
@ -25,6 +26,7 @@ import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest
logScope:
topics = "codex discoveryengine"
@ -68,29 +70,45 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
try:
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception in discovery loop", exc = exc.msg
warn "Exception in discovery loop", exc = exc.msg
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
trace "About to sleep discovery loop"
await sleepAsync(b.discoveryLoopSleep)
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseQueue.put(cid)
await sleepAsync(50.millis)
await b.advertiseBlock(cid)
await sleepAsync(100.millis)
trace "Iterating blocks finished."
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
await sleepAsync(b.advertiseLoopSleep)
trace "Exiting advertise task loop"
info "Exiting advertise task loop"
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
@ -102,7 +120,6 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
trace "Advertise request already in progress", cid
continue
try:
@ -111,17 +128,15 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len
await request
finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len
except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg
warn "Exception in advertise task runner", exc = exc.msg
trace "Exiting advertise task runner"
info "Exiting advertise task runner"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks
@ -166,9 +181,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CatchableError as exc:
trace "Exception in discovery task runner", exc = exc.msg
warn "Exception in discovery task runner", exc = exc.msg
trace "Exiting discovery task runner"
info "Exiting discovery task runner"
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
@ -183,10 +198,9 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.advertiseQueue:
try:
trace "Queueing provide block", cid, queue = b.discoveryQueue.len
b.advertiseQueue.putNoWait(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
warn "Exception queueing discovery request", exc = exc.msg
proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
@ -254,7 +268,7 @@ proc new*(
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Both
advertiseType = BlockType.Manifest
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
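With the default advertiseType now BlockType.Manifest, a discovery engine constructed without an explicit advertiseType only announces manifests (and, through advertiseBlock above, their tree CIDs). A minimal sketch of such a constructor call, mirroring the test setup later in this diff; localStore, peerStore, network, blockDiscovery and pendingBlocks are assumed to already be in scope:

let discoveryEngine = DiscoveryEngine.new(
  localStore,
  peerStore,
  network,
  blockDiscovery,
  pendingBlocks,
  discoveryLoopSleep = 100.millis) # advertiseType defaults to BlockType.Manifest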

View File

@ -262,8 +262,6 @@ proc blockPresenceHandler*(
not b.peers.anyIt( cid in it.peerHaveCids ))
proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Schedule a task for new blocks", items = blocksDelivery.len
let
cids = blocksDelivery.mapIt( it.blk.cid )
@ -277,7 +275,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id
else:
trace "Unable to schedule task for peer", peer = p.id
warn "Unable to schedule task for peer", peer = p.id
break # do next peer
@ -293,7 +291,7 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
.filterIt(it.failed)
if failed.len > 0:
trace "Failed to send block request cancellations to peers", peers = failed.len
warn "Failed to send block request cancellations to peers", peers = failed.len
proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
var cids = initHashSet[Cid]()
@ -309,8 +307,6 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
return cids.toSeq
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Resolving blocks", blocks = blocksDelivery.len
b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery)
let announceCids = getAnnouceCids(blocksDelivery)
@ -618,7 +614,7 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
trace "Got new task from queue", peerId = peerCtx.id
await b.taskHandler(peerCtx)
trace "Exiting blockexc task runner"
info "Exiting blockexc task runner"
proc new*(
T: type BlockExcEngine,

View File

@ -102,8 +102,6 @@ proc resolve*(
trace "Block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address
do:
warn "Attempting to resolve block that's not currently a pending block", address = bd.address
proc setInFlight*(
p: PendingBlocksManager,

View File

@ -147,9 +147,6 @@ proc sendWantCancellations*(
addresses: seq[BlockAddress]): Future[void] {.async.} =
## Informs a remote peer that we're no longer interested in a set of blocks
##
trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id
await b.sendWantList(id = id, addresses = addresses, cancel = true)
proc handleBlocksDelivery(

View File

@ -46,7 +46,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
convertEthersError:
let tokenAddress = await market.contract.token()
let token = Erc20Token.new(tokenAddress, market.signer)
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
discard await token.increaseAllowance(market.contract.address(), amount).confirm(0)
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
let config = await market.contract.config()
@ -92,7 +92,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} =
convertEthersError:
debug "Requesting storage"
await market.approveFunds(request.price())
await market.contract.requestStorage(request)
discard await market.contract.requestStorage(request).confirm(0)
method getRequest(market: OnChainMarket,
id: RequestId): Future[?StorageRequest] {.async.} =
@ -126,6 +126,11 @@ method getRequestEnd*(market: OnChainMarket,
convertEthersError:
return await market.contract.requestEnd(id)
method requestExpiresAt*(market: OnChainMarket,
id: RequestId): Future[SecondsSince1970] {.async.} =
convertEthersError:
return await market.contract.requestExpiry(id)
method getHost(market: OnChainMarket,
requestId: RequestId,
slotIndex: UInt256): Future[?Address] {.async.} =
@ -154,16 +159,16 @@ method fillSlot(market: OnChainMarket,
collateral: UInt256) {.async.} =
convertEthersError:
await market.approveFunds(collateral)
await market.contract.fillSlot(requestId, slotIndex, proof)
discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(0)
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
convertEthersError:
await market.contract.freeSlot(slotId)
discard await market.contract.freeSlot(slotId).confirm(0)
method withdrawFunds(market: OnChainMarket,
requestId: RequestId) {.async.} =
convertEthersError:
await market.contract.withdrawFunds(requestId)
discard await market.contract.withdrawFunds(requestId).confirm(0)
method isProofRequired*(market: OnChainMarket,
id: SlotId): Future[bool] {.async.} =
@ -196,13 +201,13 @@ method submitProof*(market: OnChainMarket,
id: SlotId,
proof: Groth16Proof) {.async.} =
convertEthersError:
await market.contract.submitProof(id, proof)
discard await market.contract.submitProof(id, proof).confirm(0)
method markProofAsMissing*(market: OnChainMarket,
id: SlotId,
period: Period) {.async.} =
convertEthersError:
await market.contract.markProofAsMissing(id, period)
discard await market.contract.markProofAsMissing(id, period).confirm(0)
method canProofBeMarkedAsMissing*(
market: OnChainMarket,
@ -213,7 +218,7 @@ method canProofBeMarkedAsMissing*(
let contractWithoutSigner = market.contract.connect(provider)
let overrides = CallOverrides(blockTag: some BlockTag.pending)
try:
await contractWithoutSigner.markProofAsMissing(id, period, overrides)
discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
return true
except EthersError as e:
trace "Proof cannot be marked as missing", msg = e.msg

View File

@ -42,10 +42,10 @@ proc slashMisses*(marketplace: Marketplace): UInt256 {.contract, view.}
proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.}
proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.}
proc requestStorage*(marketplace: Marketplace, request: StorageRequest) {.contract.}
proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof) {.contract.}
proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.}
proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.}
proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.}
proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof): ?TransactionResponse {.contract.}
proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId): ?TransactionResponse {.contract.}
proc freeSlot*(marketplace: Marketplace, id: SlotId): ?TransactionResponse {.contract.}
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.}
@ -55,6 +55,7 @@ proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.}
proc requestState*(marketplace: Marketplace, requestId: RequestId): RequestState {.contract, view.}
proc slotState*(marketplace: Marketplace, slotId: SlotId): SlotState {.contract, view.}
proc requestEnd*(marketplace: Marketplace, requestId: RequestId): SecondsSince1970 {.contract, view.}
proc requestExpiry*(marketplace: Marketplace, requestId: RequestId): SecondsSince1970 {.contract, view.}
proc proofTimeout*(marketplace: Marketplace): UInt256 {.contract, view.}
@ -65,5 +66,5 @@ proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract
proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.}
proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof) {.contract.}
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256) {.contract.}
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.}
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.}
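With these bindings now returning ?TransactionResponse, call sites discard the response and, where they need to wait for the transaction, chain confirm(), as the market.nim changes above do. A minimal sketch of the pattern (the helper name freeSlotAndWait is hypothetical); marketplace, slotId and proof are assumed to be in scope:

proc freeSlotAndWait(marketplace: Marketplace, slotId: SlotId, proof: Groth16Proof) {.async.} =
  # submit the proof; the returned ?TransactionResponse is not needed here
  discard await marketplace.submitProof(slotId, proof)
  # free the slot and wait for the transaction with zero extra confirmations
  discard await marketplace.freeSlot(slotId).confirm(0)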

View File

@ -84,16 +84,13 @@ method find*(
method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a block Cid
##
trace "Providing block", cid
let
nodes = await d.protocol.addProvider(
cid.toNodeId(), d.providerRecord.get)
if nodes.len <= 0:
trace "Couldn't provide to any nodes!"
warn "Couldn't provide to any nodes!"
trace "Provided to nodes", nodes = nodes.len
method find*(
d: Discovery,

View File

@ -66,7 +66,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] =
var header = initProtoBuffer()
header.write(1, manifest.treeCid.data.buffer)
header.write(2, manifest.blockSize.uint32)
header.write(3, manifest.datasetSize.uint32)
header.write(3, manifest.datasetSize.uint64)
header.write(4, manifest.codec.uint32)
header.write(5, manifest.hcodec.uint32)
header.write(6, manifest.version.uint32)
@ -75,7 +75,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] =
erasureInfo.write(1, manifest.ecK.uint32)
erasureInfo.write(2, manifest.ecM.uint32)
erasureInfo.write(3, manifest.originalTreeCid.data.buffer)
erasureInfo.write(4, manifest.originalDatasetSize.uint32)
erasureInfo.write(4, manifest.originalDatasetSize.uint64)
erasureInfo.write(5, manifest.protectedStrategy.uint32)
if manifest.verifiable:
@ -106,12 +106,12 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest =
pbVerificationInfo: ProtoBuffer
treeCidBuf: seq[byte]
originalTreeCid: seq[byte]
datasetSize: uint32
datasetSize: uint64
codec: uint32
hcodec: uint32
version: uint32
blockSize: uint32
originalDatasetSize: uint32
originalDatasetSize: uint64
ecK, ecM: uint32
protectedStrategy: uint32
verifyRoot: seq[byte]

View File

@ -84,6 +84,10 @@ method getRequestEnd*(market: Market,
id: RequestId): Future[SecondsSince1970] {.base, async.} =
raiseAssert("not implemented")
method requestExpiresAt*(market: Market,
id: RequestId): Future[SecondsSince1970] {.base, async.} =
raiseAssert("not implemented")
method getHost*(market: Market,
requestId: RequestId,
slotIndex: UInt256): Future[?Address] {.base, async.} =

View File

@ -294,7 +294,7 @@ proc store*(
## Save stream contents as a dataset with the given blockSize
## to the node's BlockStore, and return the Cid of its manifest
##
trace "Storing data"
info "Storing data"
let
hcodec = Sha256HashCodec
@ -308,8 +308,6 @@ proc store*(
let chunk = await chunker.getBytes();
chunk.len > 0):
trace "Got data from stream", len = chunk.len
without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
return failure(err)
@ -322,7 +320,7 @@ proc store*(
cids.add(cid)
if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg
error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}")
except CancelledError as exc:
raise exc
@ -353,7 +351,7 @@ proc store*(
codec = dataCodec)
without manifestBlk =? await self.storeManifest(manifest), err:
trace "Unable to store manifest"
error "Unable to store manifest"
return failure(err)
info "Stored data", manifestCid = manifestBlk.cid,
@ -361,7 +359,6 @@ proc store*(
blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize
# Announce manifest
await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid)

View File

@ -18,18 +18,15 @@ type
clock: Clock
purchases: Table[PurchaseId, Purchase]
proofProbability*: UInt256
requestExpiryInterval*: UInt256
PurchaseTimeout* = Timeout
const DefaultProofProbability = 100.u256
const DefaultRequestExpiryInterval = (10 * 60).u256
proc new*(_: type Purchasing, market: Market, clock: Clock): Purchasing =
Purchasing(
market: market,
clock: clock,
proofProbability: DefaultProofProbability,
requestExpiryInterval: DefaultRequestExpiryInterval,
)
proc load*(purchasing: Purchasing) {.async.} =
@ -52,8 +49,6 @@ proc populate*(purchasing: Purchasing,
result = request
if result.ask.proofProbability == 0.u256:
result.ask.proofProbability = purchasing.proofProbability
if result.expiry == 0.u256:
result.expiry = (purchasing.clock.now().u256 + purchasing.requestExpiryInterval)
if result.nonce == Nonce.default:
var id = result.nonce.toArray
doAssert randomBytes(id) == 32

View File

@ -34,7 +34,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.
await subscription.unsubscribe()
proc withTimeout(future: Future[void]) {.async.} =
let expiry = request.expiry.truncate(int64) + 1
let expiry = (await market.requestExpiresAt(request.id)) + 1
trace "waiting for request fulfillment or expiry", expiry
await future.withTimeout(clock, expiry)

View File

@ -399,7 +399,7 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
## duration - the duration of the request in seconds
## proofProbability - how often storage proofs are required
## reward - the maximum amount of tokens paid per second per slot to hosts the client is willing to pay
## expiry - timestamp, in seconds, when the request expires if the Request does not find requested amount of nodes to host the data
## expiry - threshold in seconds from now after which the request expires if it has not found the requested number of nodes to host the data
## nodes - number of nodes the content should be stored on
## tolerance - allowed number of nodes that can be lost before content is lost
## collateral - collateral requested from hosts when they fill a slot
@ -425,15 +425,8 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
without expiry =? params.expiry:
return RestApiResponse.error(Http400, "Expiry required")
if node.clock.isNil:
return RestApiResponse.error(Http500)
if expiry <= node.clock.now.u256:
return RestApiResponse.error(Http400, "Expiry needs to be in future. Now: " & $node.clock.now)
let expiryLimit = node.clock.now.u256 + params.duration
if expiry > expiryLimit:
return RestApiResponse.error(Http400, "Expiry has to be before the request's end (now + duration). Limit: " & $expiryLimit)
if expiry <= 0 or expiry >= params.duration:
return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration")
without purchaseId =? await node.requestStorage(
cid,
@ -494,7 +487,7 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
proc initNodeApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
## various node management api's
##
##
router.api(
MethodGet,
"/api/codex/v1/spr") do () -> RestApiResponse:

View File

@ -72,8 +72,11 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
without request =? data.request:
return
let market = agent.context.market
let expiry = await market.requestExpiresAt(data.requestId)
while true:
let deadline = max(clock.now, request.expiry.truncate(int64)) + 1
let deadline = max(clock.now, expiry) + 1
trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline
await clock.waitUntil(deadline)

View File

@ -69,9 +69,6 @@ method putBlock*(
ttl = Duration.none): Future[?!void] {.async.} =
## Store block locally and notify the network
##
trace "Putting block into network store", cid = blk.cid
let res = await self.localStore.putBlock(blk, ttl)
if res.isErr:
return res

View File

@ -119,7 +119,7 @@ method putCidAndProof*(
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
trace "Storing block cid and proof with key", key
trace "Storing block cid and proof", blockCid, key
let value = (blockCid, proof).encode()
@ -313,7 +313,7 @@ method putBlock*(
return success()
without key =? makePrefixKey(self.postFixLen, blk.cid), err:
trace "Error getting key from provider", err = err.msg
warn "Error getting key from provider", err = err.msg
return failure(err)
if await key in self.repoDs:
@ -325,8 +325,6 @@ method putBlock*(
return failure(
newException(QuotaUsedError, "Cannot store block, quota used!"))
trace "Storing block with key", key
var
batch: seq[BatchEntry]
@ -334,22 +332,21 @@ method putBlock*(
used = self.quotaUsedBytes + blk.data.len.uint
if err =? (await self.repoDs.put(key, blk.data)).errorOption:
trace "Error storing block", err = err.msg
error "Error storing block", err = err.msg
return failure(err)
trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
trace "Unable to create block expiration metadata key", err = err.msg
warn "Unable to create block expiration metadata key", err = err.msg
return failure(err)
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg
error "Error updating quota bytes", err = err.msg
if err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block after failed quota update", err = err.msg
error "Error deleting block after failed quota update", err = err.msg
return failure(err)
return failure(err)
@ -357,7 +354,7 @@ method putBlock*(
self.quotaUsedBytes = used
inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata"
warn "Unable to update block total metadata"
return failure("Unable to update block total metadata")
self.updateMetrics()

View File

@ -34,6 +34,12 @@ proc new*(
proc slots*(validation: Validation): seq[SlotId] =
validation.slots.toSeq
proc iterateSlots(validation: Validation, action: proc(s: SlotId): Future[void] {.async.}) {.async.} =
# Iterate over a copy of the HashSet, as it may be mutated while `action` is awaited.
let slots = validation.slots
for slotId in slots:
await action(slotId)
proc getCurrentPeriod(validation: Validation): UInt256 =
return validation.periodicity.periodOf(validation.clock.now().u256)
@ -55,11 +61,12 @@ proc subscribeSlotFilled(validation: Validation) {.async.} =
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
var ended: HashSet[SlotId]
for slotId in validation.slots:
proc onSlot(slotId: SlotId) {.async.} =
let state = await validation.market.slotState(slotId)
if state != SlotState.Filled:
trace "Removing slot", slotId
ended.incl(slotId)
await validation.iterateSlots(onSlot)
validation.slots.excl(ended)
proc markProofAsMissing(validation: Validation,
@ -81,9 +88,10 @@ proc markProofAsMissing(validation: Validation,
error "Marking proof as missing failed", msg = e.msg
proc markProofsAsMissing(validation: Validation) {.async.} =
for slotId in validation.slots:
proc onSlot(slotId: SlotId) {.async.} =
let previousPeriod = validation.getCurrentPeriod() - 1
await validation.markProofAsMissing(slotId, previousPeriod)
await validation.iterateSlots(onSlot)
proc run(validation: Validation) {.async.} =
trace "Validation started"

View File

@ -213,8 +213,7 @@ components:
description: Number as decimal string that represents how much collateral is asked from hosts that want to fill a slot
expiry:
type: string
description: Number as decimal string that represents expiry time of the request (in unix timestamp)
description: Number as decimal string that represents the expiry threshold in seconds from when the Request is submitted. When the threshold is reached and the Request has not found the requested number of nodes to host the data, the Request is voided. The number of seconds cannot be higher than the Request's duration itself.
StorageAsk:
type: object
required:

View File

@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery = MockDiscovery.new()
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it ))
localStore = CacheStore.new(blocks.mapIt(it))
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
@ -92,57 +92,40 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
await engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
await engine.resolveBlocks(blocks.filterIt(it.cid == cid))
await allFuturesThrowing(
allFinished(pendingBlocks))
await engine.stop()
test "Should advertise both manifests and blocks":
test "Should advertise both manifests and trees":
let
cids = @[manifest.cid.tryGet, manifest.treeCid]
advertised = initTable.collect:
for b in (blocks & manifestBlock): {b.cid: newFuture[void]()}
for cid in cids: {cid: newFuture[void]()}
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
discovery.advertiseType = BlockType.Both
await engine.start() # fire up advertise loop
await engine.start()
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
await engine.stop()
test "Should advertise local manifests":
test "Should not advertise local blocks":
let
advertised = newFuture[Cid]()
blockCids = blocks.mapIt(it.cid)
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
check manifestBlock.cid == cid
advertised.complete(cid)
check:
cid notin blockCids
discovery.advertiseType = BlockType.Manifest
await engine.start() # fire up advertise loop
check (await advertised.wait(10.millis)) == manifestBlock.cid
await engine.stop()
test "Should advertise local blocks":
let
advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()}
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
discovery.advertiseType = BlockType.Block
await engine.start() # fire up advertise loop
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
await engine.start()
await sleepAsync(3.seconds)
await engine.stop()
test "Should not launch discovery if remote peer has block":
@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery":
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] =
check false
await engine.start() # fire up discovery loop
await engine.start()
engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
await allFuturesThrowing(
@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery":
await engine.stop()
asyncchecksuite "E2E - Multiple Nodes Discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
proc asBlock(m: Manifest): bt.Block =
let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "E2E - Multiple Nodes Discovery":
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
manifests: seq[Manifest]
mBlocks: seq[bt.Block]
trees: seq[CodexTree]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for _ in 0..<4:
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var blocks = newSeq[bt.Block]()
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifests.add(manifest)
mBlocks.add(manifest.asBlock())
trees.add(tree)
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = MockDiscovery.new()
@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
teardown:
switch = @[]
blockexc = @[]
manifests = @[]
mBlocks = @[]
trees = @[]
test "E2E - Should advertise and discover blocks":
# Distribute the blocks amongst 1..3
# Distribute the manifests and trees amongst 1..3
# Ask 0 to download everything without connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -258,22 +254,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
result.add(advertised[cid])
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
for m in mBlocks[0..2]:
blockexc[0].engine.requestBlock(m.cid)
await allFuturesThrowing(
switch.mapIt( it.start() ) &
blockexc.mapIt( it.engine.start() )).wait(10.seconds)
switch.mapIt(it.start()) &
blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )).wait(10.seconds)
blockexc.mapIt(it.engine.stop()) &
switch.mapIt(it.stop())).wait(10.seconds)
test "E2E - Should advertise and discover blocks with peers already connected":
# Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting him beforehand
# Ask 0 to download everything *WITH* connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -305,14 +301,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
return @[advertised[cid]]
let
futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) )
futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid))
await allFuturesThrowing(
switch.mapIt( it.start() ) &
blockexc.mapIt( it.engine.start() )).wait(10.seconds)
switch.mapIt(it.start()) &
blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )).wait(10.seconds)
blockexc.mapIt(it.engine.stop()) &
switch.mapIt(it.stop())).wait(10.seconds)

View File

@ -10,17 +10,26 @@ import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/blocktype as bt
import pkg/codex/blockexchange/engine
import pkg/codex/manifest
import pkg/codex/merkletree
import ../../../asynctest
import ../../helpers
import ../../helpers/mockdiscovery
import ../../examples
proc asBlock(m: Manifest): bt.Block =
let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "Test Discovery Engine":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
blocks: seq[bt.Block]
manifest: Manifest
tree: CodexTree
manifestBlock: bt.Block
switch: Switch
peerStore: PeerCtxStore
blockDiscovery: MockDiscovery
@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine":
blocks.add(bt.Block.new(chunk).tryGet())
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifestBlock = manifest.asBlock()
blocks.add(manifestBlock)
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new()
@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine":
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) )
wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid))
blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address)))
pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address)))
await discoveryEngine.start()
await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine":
test "Should Advertise Haves":
var
localStore = CacheStore.new(blocks.mapIt( it ))
localStore = CacheStore.new(blocks.mapIt(it))
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine":
pendingBlocks,
discoveryLoopSleep = 100.millis)
haves = collect(initTable):
for b in blocks:
{ b.cid: newFuture[void]() }
for cid in @[manifestBlock.cid, manifest.treeCid]:
{ cid: newFuture[void]() }
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine":
await want.wait(1.seconds)
await discoveryEngine.stop()
test "Should queue advertise request":
var
localStore = CacheStore.new(@[blocks[0]])
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
have = newFuture[void]()
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if not have.finished:
have.complete()
await discoveryEngine.start()
await have.wait(1.seconds)
await discoveryEngine.stop()
test "Should not request more than minPeersPerBlock":
var
localStore = CacheStore.new()

View File

@ -20,6 +20,7 @@ type
activeSlots*: Table[Address, seq[SlotId]]
requested*: seq[StorageRequest]
requestEnds*: Table[RequestId, SecondsSince1970]
requestExpiry*: Table[RequestId, SecondsSince1970]
requestState*: Table[RequestId, RequestState]
slotState*: Table[SlotId, SlotState]
fulfilled*: seq[Fulfillment]
@ -165,6 +166,10 @@ method getRequestEnd*(market: MockMarket,
id: RequestId): Future[SecondsSince1970] {.async.} =
return market.requestEnds[id]
method requestExpiresAt*(market: MockMarket,
id: RequestId): Future[SecondsSince1970] {.async.} =
return market.requestExpiry[id]
method getHost*(market: MockMarket,
requestId: RequestId,
slotIndex: UInt256): Future[?Address] {.async.} =

View File

@ -473,6 +473,9 @@ asyncchecksuite "Sales":
check eventually (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
let expiry = getTime().toUnix() + 10
market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
@ -486,11 +489,14 @@ asyncchecksuite "Sales":
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
market.requestState[request.id]=RequestState.Cancelled
clock.set(request.expiry.truncate(int64)+1)
clock.set(expiry + 1)
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().freeSize == origSize
test "verifies that request is indeed expired from onchain before firing onCancelled":
let expiry = getTime().toUnix() + 10
market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
@ -504,7 +510,7 @@ asyncchecksuite "Sales":
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
clock.set(request.expiry.truncate(int64)+1)
clock.set(expiry + 1)
check getAvailability().freeSize == 0
market.requestState[request.id]=RequestState.Cancelled # Now "on-chain" is also expired

View File

@ -41,19 +41,7 @@ method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} =
raise newException(ValueError, "failure")
asyncchecksuite "Sales agent":
var request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
),
content: StorageContent(
cid: "some cid"
),
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
)
let request = StorageRequest.example
var agent: SalesAgent
var context: SalesContext
var slotIndex: UInt256
@ -62,6 +50,7 @@ asyncchecksuite "Sales agent":
setup:
market = MockMarket.new()
market.requestExpiry[request.id] = getTime().toUnix() + request.expiry.truncate(int64)
clock = MockClock.new()
context = SalesContext(market: market, clock: clock)
slotIndex = 0.u256
@ -109,7 +98,7 @@ asyncchecksuite "Sales agent":
agent.start(MockState.new())
await agent.subscribe()
market.requestState[request.id] = RequestState.Cancelled
clock.set(request.expiry.truncate(int64) + 1)
clock.set(market.requestExpiry[request.id] + 1)
check eventually onCancelCalled
for requestState in {RequestState.New, Started, Finished, Failed}:
@ -117,7 +106,7 @@ asyncchecksuite "Sales agent":
agent.start(MockState.new())
await agent.subscribe()
market.requestState[request.id] = requestState
clock.set(request.expiry.truncate(int64) + 1)
clock.set(market.requestExpiry[request.id] + 1)
await sleepAsync(100.millis)
check not onCancelCalled
@ -126,7 +115,7 @@ asyncchecksuite "Sales agent":
agent.start(MockState.new())
await agent.subscribe()
market.requestState[request.id] = requestState
clock.set(request.expiry.truncate(int64) + 1)
clock.set(market.requestExpiry[request.id] + 1)
check eventually agent.data.cancelled.finished
test "cancelled future is finished (cancelled) when onFulfilled called":

View File

@ -57,6 +57,16 @@ checksuite "Manifest":
check:
encodeDecode(manifest) == manifest
test "Should encode/decode large manifest":
let large = Manifest.new(
treeCid = Cid.example,
blockSize = (64 * 1024).NBytes,
datasetSize = (5 * 1024).MiBs
)
check:
encodeDecode(large) == large
test "Should encode/decode to/from protected manifest":
check:
encodeDecode(protectedManifest) == protectedManifest

View File

@ -19,7 +19,7 @@ asyncchecksuite "Purchasing":
var purchasing: Purchasing
var market: MockMarket
var clock: MockClock
var request: StorageRequest
var request, populatedRequest: StorageRequest
setup:
market = MockMarket.new()
@ -34,6 +34,12 @@ asyncchecksuite "Purchasing":
)
)
# We need a request with a stable ID during the whole Purchasing pipeline
# for some tests (related to expiry). Because of Purchasing.populate() we need
# to do the steps below.
populatedRequest = StorageRequest.example
populatedRequest.client = await market.getSigner()
test "submits a storage request when asked":
discard await purchasing.purchase(request)
check eventually market.requested.len > 0
@ -63,23 +69,6 @@ asyncchecksuite "Purchasing":
check eventually market.requested.len > 0
check market.requested[0].ask.proofProbability == 42.u256
test "has a default value for request expiration interval":
check purchasing.requestExpiryInterval != 0.u256
test "can change default value for request expiration interval":
purchasing.requestExpiryInterval = 42.u256
let start = getTime().toUnix()
discard await purchasing.purchase(request)
check eventually market.requested.len > 0
check market.requested[0].expiry == (start + 42).u256
test "can override expiry time per request":
let expiry = (getTime().toUnix() + 42).u256
request.expiry = expiry
discard await purchasing.purchase(request)
check eventually market.requested.len > 0
check market.requested[0].expiry == expiry
test "includes a random nonce in every storage request":
discard await purchasing.purchase(request)
discard await purchasing.purchase(request)
@ -92,29 +81,37 @@ asyncchecksuite "Purchasing":
check market.requested[0].client == await market.getSigner()
test "succeeds when request is finished":
let purchase = await purchasing.purchase(request)
market.requestExpiry[populatedRequest.id] = getTime().toUnix() + 10
let purchase = await purchasing.purchase(populatedRequest)
check eventually market.requested.len > 0
let request = market.requested[0]
let requestEnd = getTime().toUnix() + 42
market.requestEnds[request.id] = requestEnd
market.emitRequestFulfilled(request.id)
clock.set(requestEnd + 1)
await purchase.wait()
check purchase.error.isNone
test "fails when request times out":
let purchase = await purchasing.purchase(request)
let expiry = getTime().toUnix() + 10
market.requestExpiry[populatedRequest.id] = expiry
let purchase = await purchasing.purchase(populatedRequest)
check eventually market.requested.len > 0
let request = market.requested[0]
clock.set(request.expiry.truncate(int64) + 1)
clock.set(expiry + 1)
expect PurchaseTimeout:
await purchase.wait()
test "checks that funds were withdrawn when purchase times out":
let purchase = await purchasing.purchase(request)
let expiry = getTime().toUnix() + 10
market.requestExpiry[populatedRequest.id] = expiry
let purchase = await purchasing.purchase(populatedRequest)
check eventually market.requested.len > 0
let request = market.requested[0]
clock.set(request.expiry.truncate(int64) + 1)
clock.set(expiry + 1)
expect PurchaseTimeout:
await purchase.wait()
check market.withdrawn == @[request.id]

View File

@ -39,10 +39,10 @@ ethersuite "Marketplace contracts":
switchAccount(client)
discard await token.approve(marketplace.address, request.price)
await marketplace.requestStorage(request)
discard await marketplace.requestStorage(request)
switchAccount(host)
discard await token.approve(marketplace.address, request.ask.collateral)
await marketplace.fillSlot(request.id, 0.u256, proof)
discard await marketplace.fillSlot(request.id, 0.u256, proof)
slotId = request.slotId(0.u256)
proc waitUntilProofRequired(slotId: SlotId) {.async.} =
@ -57,12 +57,12 @@ ethersuite "Marketplace contracts":
proc startContract() {.async.} =
for slotIndex in 1..<request.ask.slots:
discard await token.approve(marketplace.address, request.ask.collateral)
await marketplace.fillSlot(request.id, slotIndex.u256, proof)
discard await marketplace.fillSlot(request.id, slotIndex.u256, proof)
test "accept marketplace proofs":
switchAccount(host)
await waitUntilProofRequired(slotId)
await marketplace.submitProof(slotId, proof)
discard await marketplace.submitProof(slotId, proof)
test "can mark missing proofs":
switchAccount(host)
@ -71,7 +71,7 @@ ethersuite "Marketplace contracts":
let endOfPeriod = periodicity.periodEnd(missingPeriod)
await ethProvider.advanceTimeTo(endOfPeriod + 1)
switchAccount(client)
await marketplace.markProofAsMissing(slotId, missingPeriod)
discard await marketplace.markProofAsMissing(slotId, missingPeriod)
test "can be paid out at the end":
switchAccount(host)
@ -80,12 +80,13 @@ ethersuite "Marketplace contracts":
let requestEnd = await marketplace.requestEnd(request.id)
await ethProvider.advanceTimeTo(requestEnd.u256 + 1)
let startBalance = await token.balanceOf(address)
await marketplace.freeSlot(slotId)
discard await marketplace.freeSlot(slotId)
let endBalance = await token.balanceOf(address)
check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral)
test "cannot mark proofs missing for cancelled request":
await ethProvider.advanceTimeTo(request.expiry + 1)
let expiry = await marketplace.requestExpiry(request.id)
await ethProvider.advanceTimeTo((expiry + 1).u256)
switchAccount(client)
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())
await ethProvider.advanceTime(periodicity.seconds)

View File

@ -32,6 +32,10 @@ ethersuite "On-Chain Market":
let currentPeriod = periodicity.periodOf(await ethProvider.currentTime())
await ethProvider.advanceTimeTo(periodicity.periodEnd(currentPeriod) + 1)
proc advanceToCancelledRequest(request: StorageRequest) {.async.} =
let expiry = (await market.requestExpiresAt(request.id)) + 1
await ethProvider.advanceTimeTo(expiry.u256)
proc waitUntilProofRequired(slotId: SlotId) {.async.} =
await advanceToNextPeriod()
while not (
@ -70,22 +74,19 @@ ethersuite "On-Chain Market":
test "supports withdrawing of funds":
await market.requestStorage(request)
await ethProvider.advanceTimeTo(request.expiry + 1)
await advanceToCancelledRequest(request)
await market.withdrawFunds(request.id)
test "supports request subscriptions":
var receivedIds: seq[RequestId]
var receivedAsks: seq[StorageAsk]
var receivedExpirys: seq[UInt256]
proc onRequest(id: RequestId, ask: StorageAsk, expiry: UInt256) =
receivedIds.add(id)
receivedAsks.add(ask)
receivedExpirys.add(expiry)
let subscription = await market.subscribeRequests(onRequest)
await market.requestStorage(request)
check receivedIds == @[request.id]
check receivedAsks == @[request.ask]
check receivedExpirys == @[request.expiry]
await subscription.unsubscribe()
test "supports filling of slots":
@ -216,7 +217,7 @@ ethersuite "On-Chain Market":
receivedIds.add(id)
let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled)
await ethProvider.advanceTimeTo(request.expiry + 1)
await advanceToCancelledRequest(request)
await market.withdrawFunds(request.id)
check receivedIds == @[request.id]
await subscription.unsubscribe()
@ -240,7 +241,7 @@ ethersuite "On-Chain Market":
await waitUntilProofRequired(slotId)
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())
await advanceToNextPeriod()
await marketplace.markProofAsMissing(slotId, missingPeriod)
discard await marketplace.markProofAsMissing(slotId, missingPeriod)
check receivedIds == @[request.id]
await subscription.unsubscribe()
@ -255,7 +256,7 @@ ethersuite "On-Chain Market":
receivedIds.add(requestId)
let subscription = await market.subscribeRequestCancelled(request.id, onRequestCancelled)
await ethProvider.advanceTimeTo(request.expiry + 1) # shares expiry with otherRequest
await advanceToCancelledRequest(otherRequest) # shares expiry with otherRequest
await market.withdrawFunds(otherRequest.id)
check receivedIds.len == 0
await market.withdrawFunds(request.id)
@ -338,13 +339,12 @@ ethersuite "On-Chain Market":
# 6 blocks, we only need to check 5 "blocks ago". We don't need to check the
# `approve` for the first `requestStorage` call, so that's 1 less again = 4
# "blocks ago".
check eventually (
(await market.queryPastStorageRequests(5)) ==
@[
PastStorageRequest(requestId: request.id, ask: request.ask, expiry: request.expiry),
PastStorageRequest(requestId: request1.id, ask: request1.ask, expiry: request1.expiry),
PastStorageRequest(requestId: request2.id, ask: request2.ask, expiry: request2.expiry)
])
proc getsPastRequest(): Future[bool] {.async.} =
let reqs = await market.queryPastStorageRequests(5)
reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id]
check eventually await getsPastRequest()
test "past event query can specify negative `blocksAgo` parameter":
await market.requestStorage(request)

View File

@ -59,7 +59,7 @@ proc example*(_: type StorageRequest): StorageRequest =
cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob",
merkleRoot: array[32, byte].example
),
expiry: (getTime() + 1.hours).toUnix.u256,
expiry: (60 * 60).u256, # 1 hour
nonce: Nonce.example
)

View File

@ -70,7 +70,7 @@ proc requestStorageRaw*(
reward: UInt256,
proofProbability: UInt256,
collateral: UInt256,
expiry: UInt256 = 0.u256,
expiry: uint = 0,
nodes: uint = 1,
tolerance: uint = 0
): Response =
@ -88,7 +88,7 @@ proc requestStorageRaw*(
}
if expiry != 0:
json["expiry"] = %expiry
json["expiry"] = %($expiry)
return client.http.post(url, $json)
@ -98,7 +98,7 @@ proc requestStorage*(
duration: UInt256,
reward: UInt256,
proofProbability: UInt256,
expiry: UInt256,
expiry: uint,
collateral: UInt256,
nodes: uint = 1,
tolerance: uint = 0

View File

@ -68,11 +68,9 @@ template marketplacesuite*(name: string, body: untyped) =
expiry: uint64 = 4.periods,
nodes = providers().len,
tolerance = 0): Future[PurchaseId] {.async.} =
let expiry = (await ethProvider.currentTime()) + expiry.u256
let id = client.requestStorage(
cid,
expiry=expiry,
expiry=expiry.uint,
duration=duration.u256,
proofProbability=proofProbability.u256,
collateral=collateral,

View File

@ -112,10 +112,9 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check availability in client1.getAvailabilities().get
test "node handles storage request":
let expiry = (await ethProvider.currentTime()) + 10
let cid = client1.upload("some file contents").get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=expiry, collateral=201.u256).get
let id1 = client1.requestStorage(cid, duration=100.u256, reward=2.u256, proofProbability=3.u256, expiry=10, collateral=200.u256).get
let id2 = client1.requestStorage(cid, duration=400.u256, reward=5.u256, proofProbability=6.u256, expiry=10, collateral=201.u256).get
check id1 != id2
test "node retrieves purchase status":
@ -124,13 +123,12 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let expiry = (await ethProvider.currentTime()) + 30
let id = client1.requestStorage(
cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=expiry,
expiry=30,
collateral=200.u256,
nodes=2,
tolerance=1).get
@ -139,16 +137,15 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == expiry
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 2'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let expiry = (await ethProvider.currentTime()) + 30
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2, tolerance=1).get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=30, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
@ -159,13 +156,12 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let expiry = (await ethProvider.currentTime()) + 30
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=expiry,
expiry=30,
collateral=200.u256).get
check eventually client1.purchaseStateIs(id, "submitted")
@ -177,7 +173,7 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check request.ask.duration == 100.u256
check request.ask.reward == 2.u256
check request.ask.proofProbability == 3.u256
check request.expiry == expiry
check request.expiry == 30
check request.ask.collateral == 200.u256
check request.ask.slots == 1'u64
check request.ask.maxSlotLoss == 0'u64
@ -189,14 +185,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
let availability = client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let expiry = (await ethProvider.currentTime()) + 5*60
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=expiry,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get
@ -228,14 +223,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
discard client2.postAvailability(totalSize=size, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage
let expiry = (await ethProvider.currentTime()) + 5*60
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration=duration,
reward=reward,
proofProbability=3.u256,
expiry=expiry,
expiry=5*60,
collateral=200.u256,
nodes = nodes,
tolerance = 2).get
@ -253,12 +247,11 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
test "request storage fails if nodes and tolerance aren't correct":
let cid = client1.upload("some file contents").get
let expiry = (await ethProvider.currentTime()) + 30
let responseBefore = client1.requestStorageRaw(cid,
duration=100.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=expiry,
expiry=30,
collateral=200.u256,
nodes=1,
tolerance=1)
@ -267,20 +260,15 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check responseBefore.body == "Tolerance cannot be greater or equal than nodes (nodes - tolerance)"
test "node requires expiry and its value to be in future":
let currentTime = await ethProvider.currentTime()
let cid = client1.upload("some file contents").get
let responseMissing = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
let responsePast = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime-10)
check responsePast.status == "400 Bad Request"
check "Expiry needs to be in future" in responsePast.body
let responseBefore = client1.requestStorageRaw(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=currentTime+10)
let responseBefore = client1.requestStorageRaw(cid, duration=10.u256, reward=2.u256, proofProbability=3.u256, collateral=200.u256, expiry=10)
check responseBefore.status == "400 Bad Request"
check "Expiry has to be before the request's end (now + duration)" in responseBefore.body
check "Expiry needs value bigger then zero and smaller then the request's duration" in responseBefore.body
test "updating non-existing availability":
let nonExistingResponse = client1.patchAvailabilityRaw(AvailabilityId.example, duration=100.u256.some, minPrice=200.u256.some, maxCollateral=200.u256.some)
@ -317,14 +305,13 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
let availability = client1.postAvailability(totalSize=originalSize, duration=20*60.u256, minPrice=300.u256, maxCollateral=300.u256).get
# Lets create storage request that will utilize some of the availability's space
let expiry = (await ethProvider.currentTime()) + 5*60
let cid = client2.upload(data).get
let id = client2.requestStorage(
cid,
duration=10*60.u256,
reward=400.u256,
proofProbability=3.u256,
expiry=expiry,
expiry=5*60,
collateral=200.u256,
nodes = 5,
tolerance = 2).get

@ -1 +1 @@
Subproject commit a58427e496088b904aed070e92f1c479c45fd852
Subproject commit 57e8cd5013325f05e16833a5320b575d32a403f3