Merge branch 'master' into feature/ceremony-files

# Conflicts:
#	codex/contracts/market.nim
#	codex/contracts/marketplace.nim
This commit is contained in:
Ben 2024-05-15 10:12:18 +02:00
commit 267266ac92
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
7 changed files with 121 additions and 114 deletions

View File

@ -11,6 +11,7 @@ import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/libp2p/cid import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics import pkg/metrics
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -25,6 +26,7 @@ import ../../utils
import ../../discovery import ../../discovery
import ../../stores/blockstore import ../../stores/blockstore
import ../../logutils import ../../logutils
import ../../manifest
logScope: logScope:
topics = "codex discoveryengine" topics = "codex discoveryengine"
@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(b.discoveryLoopSleep) await sleepAsync(b.discoveryLoopSleep)
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning: while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..." trace "Begin iterating blocks..."
for c in cids: for c in cids:
if cid =? await c: if cid =? await c:
await b.advertiseQueue.put(cid) b.advertiseBlock(cid)
await sleepAsync(50.millis) await sleepAsync(100.millis)
trace "Iterating blocks finished." trace "Iterating blocks finished."
await sleepAsync(b.advertiseLoopSleep) await sleepAsync(b.advertiseLoopSleep)
@ -248,7 +268,7 @@ proc new*(
discoveryLoopSleep = DefaultDiscoveryLoopSleep, discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep, advertiseLoopSleep = DefaultAdvertiseLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock, minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Both advertiseType = BlockType.Manifest
): DiscoveryEngine = ): DiscoveryEngine =
## Create a discovery engine instance for advertising services ## Create a discovery engine instance for advertising services
## ##

View File

@ -46,7 +46,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
convertEthersError: convertEthersError:
let tokenAddress = await market.contract.token() let tokenAddress = await market.contract.token()
let token = Erc20Token.new(tokenAddress, market.signer) let token = Erc20Token.new(tokenAddress, market.signer)
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) discard await token.increaseAllowance(market.contract.address(), amount).confirm(0)
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
let config = await market.contract.config() let config = await market.contract.config()
@ -92,7 +92,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} =
convertEthersError: convertEthersError:
debug "Requesting storage" debug "Requesting storage"
await market.approveFunds(request.price()) await market.approveFunds(request.price())
discard await market.contract.requestStorage(request).confirm(1) discard await market.contract.requestStorage(request).confirm(0)
method getRequest(market: OnChainMarket, method getRequest(market: OnChainMarket,
id: RequestId): Future[?StorageRequest] {.async.} = id: RequestId): Future[?StorageRequest] {.async.} =
@ -159,16 +159,16 @@ method fillSlot(market: OnChainMarket,
collateral: UInt256) {.async.} = collateral: UInt256) {.async.} =
convertEthersError: convertEthersError:
await market.approveFunds(collateral) await market.approveFunds(collateral)
await market.contract.fillSlot(requestId, slotIndex, proof) discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(0)
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
convertEthersError: convertEthersError:
await market.contract.freeSlot(slotId) discard await market.contract.freeSlot(slotId).confirm(0)
method withdrawFunds(market: OnChainMarket, method withdrawFunds(market: OnChainMarket,
requestId: RequestId) {.async.} = requestId: RequestId) {.async.} =
convertEthersError: convertEthersError:
await market.contract.withdrawFunds(requestId) discard await market.contract.withdrawFunds(requestId).confirm(0)
method isProofRequired*(market: OnChainMarket, method isProofRequired*(market: OnChainMarket,
id: SlotId): Future[bool] {.async.} = id: SlotId): Future[bool] {.async.} =
@ -201,13 +201,13 @@ method submitProof*(market: OnChainMarket,
id: SlotId, id: SlotId,
proof: Groth16Proof) {.async.} = proof: Groth16Proof) {.async.} =
convertEthersError: convertEthersError:
await market.contract.submitProof(id, proof) discard await market.contract.submitProof(id, proof).confirm(0)
method markProofAsMissing*(market: OnChainMarket, method markProofAsMissing*(market: OnChainMarket,
id: SlotId, id: SlotId,
period: Period) {.async.} = period: Period) {.async.} =
convertEthersError: convertEthersError:
await market.contract.markProofAsMissing(id, period) discard await market.contract.markProofAsMissing(id, period).confirm(0)
method canProofBeMarkedAsMissing*( method canProofBeMarkedAsMissing*(
market: OnChainMarket, market: OnChainMarket,
@ -218,7 +218,7 @@ method canProofBeMarkedAsMissing*(
let contractWithoutSigner = market.contract.connect(provider) let contractWithoutSigner = market.contract.connect(provider)
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
try: try:
await contractWithoutSigner.markProofAsMissing(id, period, overrides) discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
return true return true
except EthersError as e: except EthersError as e:
trace "Proof cannot be marked as missing", msg = e.msg trace "Proof cannot be marked as missing", msg = e.msg

View File

@ -43,9 +43,9 @@ proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.}
proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.} proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.}
proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.} proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.}
proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof) {.contract.} proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof): ?TransactionResponse {.contract.}
proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.} proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId): ?TransactionResponse {.contract.}
proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.} proc freeSlot*(marketplace: Marketplace, id: SlotId): ?TransactionResponse {.contract.}
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.} proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.} proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.}
@ -66,5 +66,5 @@ proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract
proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.} proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.}
proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.} proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof) {.contract.} proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.}
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256) {.contract.} proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.}

View File

@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery = MockDiscovery.new() blockDiscovery = MockDiscovery.new()
wallet = WalletRef.example wallet = WalletRef.example
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt(it))
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
@ -92,57 +92,40 @@ asyncchecksuite "Block Advertising and Discovery":
blockDiscovery.findBlockProvidersHandler = blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) await engine.resolveBlocks(blocks.filterIt(it.cid == cid))
await allFuturesThrowing( await allFuturesThrowing(
allFinished(pendingBlocks)) allFinished(pendingBlocks))
await engine.stop() await engine.stop()
test "Should advertise both manifests and blocks": test "Should advertise both manifests and trees":
let let
cids = @[manifest.cid.tryGet, manifest.treeCid]
advertised = initTable.collect: advertised = initTable.collect:
for b in (blocks & manifestBlock): {b.cid: newFuture[void]()} for cid in cids: {cid: newFuture[void]()}
blockDiscovery blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished(): if cid in advertised and not advertised[cid].finished():
advertised[cid].complete() advertised[cid].complete()
discovery.advertiseType = BlockType.Both await engine.start()
await engine.start() # fire up advertise loop
await allFuturesThrowing( await allFuturesThrowing(
allFinished(toSeq(advertised.values))) allFinished(toSeq(advertised.values)))
await engine.stop() await engine.stop()
test "Should advertise local manifests": test "Should not advertise local blocks":
let let
advertised = newFuture[Cid]() blockCids = blocks.mapIt(it.cid)
blockDiscovery blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
check manifestBlock.cid == cid check:
advertised.complete(cid) cid notin blockCids
discovery.advertiseType = BlockType.Manifest await engine.start()
await engine.start() # fire up advertise loop await sleepAsync(3.seconds)
check (await advertised.wait(10.millis)) == manifestBlock.cid
await engine.stop()
test "Should advertise local blocks":
let
advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()}
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
discovery.advertiseType = BlockType.Block
await engine.start() # fire up advertise loop
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
await engine.stop() await engine.stop()
test "Should not launch discovery if remote peer has block": test "Should not launch discovery if remote peer has block":
@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery":
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] =
check false check false
await engine.start() # fire up discovery loop await engine.start()
engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address))) engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
await allFuturesThrowing( await allFuturesThrowing(
@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery":
await engine.stop() await engine.stop()
asyncchecksuite "E2E - Multiple Nodes Discovery": proc asBlock(m: Manifest): bt.Block =
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "E2E - Multiple Nodes Discovery":
var var
switch: seq[Switch] switch: seq[Switch]
blockexc: seq[NetworkStore] blockexc: seq[NetworkStore]
blocks: seq[bt.Block] manifests: seq[Manifest]
mBlocks: seq[bt.Block]
trees: seq[CodexTree]
setup: setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for _ in 0..<4: for _ in 0..<4:
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var blocks = newSeq[bt.Block]()
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifests.add(manifest)
mBlocks.add(manifest.asBlock())
trees.add(tree)
let let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = MockDiscovery.new() blockDiscovery = MockDiscovery.new()
@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
teardown: teardown:
switch = @[] switch = @[]
blockexc = @[] blockexc = @[]
manifests = @[]
mBlocks = @[]
trees = @[]
test "E2E - Should advertise and discover blocks": test "E2E - Should advertise and discover blocks":
# Distribute the blocks amongst 1..3 # Distribute the manifests and trees amongst 1..3
# Ask 0 to download everything without connecting him beforehand # Ask 0 to download everything without connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord] var advertised: Table[Cid, SignedPeerRecord]
@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery) MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -258,22 +254,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
result.add(advertised[cid]) result.add(advertised[cid])
let futs = collect(newSeq): let futs = collect(newSeq):
for b in blocks: for m in mBlocks[0..2]:
blockexc[0].engine.requestBlock(b.cid) blockexc[0].engine.requestBlock(m.cid)
await allFuturesThrowing( await allFuturesThrowing(
switch.mapIt( it.start() ) & switch.mapIt(it.start()) &
blockexc.mapIt( it.engine.start() )).wait(10.seconds) blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds) await allFutures(futs).wait(10.seconds)
await allFuturesThrowing( await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) & blockexc.mapIt(it.engine.stop()) &
switch.mapIt( it.stop() )).wait(10.seconds) switch.mapIt(it.stop())).wait(10.seconds)
test "E2E - Should advertise and discover blocks with peers already connected": test "E2E - Should advertise and discover blocks with peers already connected":
# Distribute the blocks amongst 1..3 # Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting him beforehand # Ask 0 to download everything *WITH* connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord] var advertised: Table[Cid, SignedPeerRecord]
@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery) MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -305,14 +301,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
return @[advertised[cid]] return @[advertised[cid]]
let let
futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) ) futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid))
await allFuturesThrowing( await allFuturesThrowing(
switch.mapIt( it.start() ) & switch.mapIt(it.start()) &
blockexc.mapIt( it.engine.start() )).wait(10.seconds) blockexc.mapIt(it.engine.start())).wait(10.seconds)
await allFutures(futs).wait(10.seconds) await allFutures(futs).wait(10.seconds)
await allFuturesThrowing( await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) & blockexc.mapIt(it.engine.stop()) &
switch.mapIt( it.stop() )).wait(10.seconds) switch.mapIt(it.stop())).wait(10.seconds)

View File

@ -10,17 +10,26 @@ import pkg/codex/blockexchange
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/codex/blockexchange/engine import pkg/codex/blockexchange/engine
import pkg/codex/manifest
import pkg/codex/merkletree
import ../../../asynctest import ../../../asynctest
import ../../helpers import ../../helpers
import ../../helpers/mockdiscovery import ../../helpers/mockdiscovery
import ../../examples import ../../examples
proc asBlock(m: Manifest): bt.Block =
let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "Test Discovery Engine": asyncchecksuite "Test Discovery Engine":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var var
blocks: seq[bt.Block] blocks: seq[bt.Block]
manifest: Manifest
tree: CodexTree
manifestBlock: bt.Block
switch: Switch switch: Switch
peerStore: PeerCtxStore peerStore: PeerCtxStore
blockDiscovery: MockDiscovery blockDiscovery: MockDiscovery
@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine":
blocks.add(bt.Block.new(chunk).tryGet()) blocks.add(bt.Block.new(chunk).tryGet())
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifestBlock = manifest.asBlock()
blocks.add(manifestBlock)
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine":
blockDiscovery, blockDiscovery,
pendingBlocks, pendingBlocks,
discoveryLoopSleep = 100.millis) discoveryLoopSleep = 100.millis)
wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) ) wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid) )
blockDiscovery.findBlockProvidersHandler = blockDiscovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address)))
await discoveryEngine.start() await discoveryEngine.start()
await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine":
test "Should Advertise Haves": test "Should Advertise Haves":
var var
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt(it))
discoveryEngine = DiscoveryEngine.new( discoveryEngine = DiscoveryEngine.new(
localStore, localStore,
peerStore, peerStore,
@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine":
pendingBlocks, pendingBlocks,
discoveryLoopSleep = 100.millis) discoveryLoopSleep = 100.millis)
haves = collect(initTable): haves = collect(initTable):
for b in blocks: for cid in @[manifestBlock.cid, manifest.treeCid]:
{ b.cid: newFuture[void]() } { cid: newFuture[void]() }
blockDiscovery.publishBlockProvideHandler = blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine":
await want.wait(1.seconds) await want.wait(1.seconds)
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should queue advertise request":
var
localStore = CacheStore.new(@[blocks[0]])
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
have = newFuture[void]()
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if not have.finished:
have.complete()
await discoveryEngine.start()
await have.wait(1.seconds)
await discoveryEngine.stop()
test "Should not request more than minPeersPerBlock": test "Should not request more than minPeersPerBlock":
var var
localStore = CacheStore.new() localStore = CacheStore.new()

View File

@ -42,7 +42,7 @@ ethersuite "Marketplace contracts":
discard await marketplace.requestStorage(request) discard await marketplace.requestStorage(request)
switchAccount(host) switchAccount(host)
discard await token.approve(marketplace.address, request.ask.collateral) discard await token.approve(marketplace.address, request.ask.collateral)
await marketplace.fillSlot(request.id, 0.u256, proof) discard await marketplace.fillSlot(request.id, 0.u256, proof)
slotId = request.slotId(0.u256) slotId = request.slotId(0.u256)
proc waitUntilProofRequired(slotId: SlotId) {.async.} = proc waitUntilProofRequired(slotId: SlotId) {.async.} =
@ -57,12 +57,12 @@ ethersuite "Marketplace contracts":
proc startContract() {.async.} = proc startContract() {.async.} =
for slotIndex in 1..<request.ask.slots: for slotIndex in 1..<request.ask.slots:
discard await token.approve(marketplace.address, request.ask.collateral) discard await token.approve(marketplace.address, request.ask.collateral)
await marketplace.fillSlot(request.id, slotIndex.u256, proof) discard await marketplace.fillSlot(request.id, slotIndex.u256, proof)
test "accept marketplace proofs": test "accept marketplace proofs":
switchAccount(host) switchAccount(host)
await waitUntilProofRequired(slotId) await waitUntilProofRequired(slotId)
await marketplace.submitProof(slotId, proof) discard await marketplace.submitProof(slotId, proof)
test "can mark missing proofs": test "can mark missing proofs":
switchAccount(host) switchAccount(host)
@ -71,7 +71,7 @@ ethersuite "Marketplace contracts":
let endOfPeriod = periodicity.periodEnd(missingPeriod) let endOfPeriod = periodicity.periodEnd(missingPeriod)
await ethProvider.advanceTimeTo(endOfPeriod + 1) await ethProvider.advanceTimeTo(endOfPeriod + 1)
switchAccount(client) switchAccount(client)
await marketplace.markProofAsMissing(slotId, missingPeriod) discard await marketplace.markProofAsMissing(slotId, missingPeriod)
test "can be paid out at the end": test "can be paid out at the end":
switchAccount(host) switchAccount(host)
@ -80,7 +80,7 @@ ethersuite "Marketplace contracts":
let requestEnd = await marketplace.requestEnd(request.id) let requestEnd = await marketplace.requestEnd(request.id)
await ethProvider.advanceTimeTo(requestEnd.u256 + 1) await ethProvider.advanceTimeTo(requestEnd.u256 + 1)
let startBalance = await token.balanceOf(address) let startBalance = await token.balanceOf(address)
await marketplace.freeSlot(slotId) discard await marketplace.freeSlot(slotId)
let endBalance = await token.balanceOf(address) let endBalance = await token.balanceOf(address)
check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral) check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral)

View File

@ -241,7 +241,7 @@ ethersuite "On-Chain Market":
await waitUntilProofRequired(slotId) await waitUntilProofRequired(slotId)
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime()) let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())
await advanceToNextPeriod() await advanceToNextPeriod()
await marketplace.markProofAsMissing(slotId, missingPeriod) discard await marketplace.markProofAsMissing(slotId, missingPeriod)
check receivedIds == @[request.id] check receivedIds == @[request.id]
await subscription.unsubscribe() await subscription.unsubscribe()