Merge branch 'master' into feat/chronos-v4
This commit is contained in:
commit
b9e2ae8bf6
|
@ -11,6 +11,7 @@ import std/sequtils
|
||||||
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/libp2p/cid
|
import pkg/libp2p/cid
|
||||||
|
import pkg/libp2p/multicodec
|
||||||
import pkg/metrics
|
import pkg/metrics
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
@ -25,6 +26,7 @@ import ../../utils
|
||||||
import ../../discovery
|
import ../../discovery
|
||||||
import ../../stores/blockstore
|
import ../../stores/blockstore
|
||||||
import ../../logutils
|
import ../../logutils
|
||||||
|
import ../../manifest
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex discoveryengine"
|
topics = "codex discoveryengine"
|
||||||
|
@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
|
||||||
|
|
||||||
await sleepAsync(b.discoveryLoopSleep)
|
await sleepAsync(b.discoveryLoopSleep)
|
||||||
|
|
||||||
|
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
|
||||||
|
without isM =? cid.isManifest, err:
|
||||||
|
warn "Unable to determine if cid is manifest"
|
||||||
|
return
|
||||||
|
|
||||||
|
if isM:
|
||||||
|
without blk =? await b.localStore.getBlock(cid), err:
|
||||||
|
error "Error retrieving manifest block", cid, err = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
|
without manifest =? Manifest.decode(blk), err:
|
||||||
|
error "Unable to decode as manifest", err = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
|
# announce manifest cid and tree cid
|
||||||
|
await b.advertiseQueue.put(cid)
|
||||||
|
await b.advertiseQueue.put(manifest.treeCid)
|
||||||
|
|
||||||
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
|
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
|
||||||
while b.discEngineRunning:
|
while b.discEngineRunning:
|
||||||
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
|
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
|
||||||
trace "Begin iterating blocks..."
|
trace "Begin iterating blocks..."
|
||||||
for c in cids:
|
for c in cids:
|
||||||
if cid =? await c:
|
if cid =? await c:
|
||||||
await b.advertiseQueue.put(cid)
|
b.advertiseBlock(cid)
|
||||||
await sleepAsync(50.millis)
|
await sleepAsync(100.millis)
|
||||||
trace "Iterating blocks finished."
|
trace "Iterating blocks finished."
|
||||||
|
|
||||||
await sleepAsync(b.advertiseLoopSleep)
|
await sleepAsync(b.advertiseLoopSleep)
|
||||||
|
@ -248,7 +268,7 @@ proc new*(
|
||||||
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
|
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
|
||||||
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
|
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
|
||||||
minPeersPerBlock = DefaultMinPeersPerBlock,
|
minPeersPerBlock = DefaultMinPeersPerBlock,
|
||||||
advertiseType = BlockType.Both
|
advertiseType = BlockType.Manifest
|
||||||
): DiscoveryEngine =
|
): DiscoveryEngine =
|
||||||
## Create a discovery engine instance for advertising services
|
## Create a discovery engine instance for advertising services
|
||||||
##
|
##
|
||||||
|
|
|
@ -46,7 +46,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
let tokenAddress = await market.contract.token()
|
let tokenAddress = await market.contract.token()
|
||||||
let token = Erc20Token.new(tokenAddress, market.signer)
|
let token = Erc20Token.new(tokenAddress, market.signer)
|
||||||
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
|
discard await token.increaseAllowance(market.contract.address(), amount).confirm(0)
|
||||||
|
|
||||||
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
|
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
|
||||||
let config = await market.contract.config()
|
let config = await market.contract.config()
|
||||||
|
@ -92,7 +92,7 @@ method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
debug "Requesting storage"
|
debug "Requesting storage"
|
||||||
await market.approveFunds(request.price())
|
await market.approveFunds(request.price())
|
||||||
await market.contract.requestStorage(request)
|
discard await market.contract.requestStorage(request).confirm(0)
|
||||||
|
|
||||||
method getRequest(market: OnChainMarket,
|
method getRequest(market: OnChainMarket,
|
||||||
id: RequestId): Future[?StorageRequest] {.async.} =
|
id: RequestId): Future[?StorageRequest] {.async.} =
|
||||||
|
@ -159,16 +159,16 @@ method fillSlot(market: OnChainMarket,
|
||||||
collateral: UInt256) {.async.} =
|
collateral: UInt256) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
await market.approveFunds(collateral)
|
await market.approveFunds(collateral)
|
||||||
await market.contract.fillSlot(requestId, slotIndex, proof)
|
discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(0)
|
||||||
|
|
||||||
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
|
method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
await market.contract.freeSlot(slotId)
|
discard await market.contract.freeSlot(slotId).confirm(0)
|
||||||
|
|
||||||
method withdrawFunds(market: OnChainMarket,
|
method withdrawFunds(market: OnChainMarket,
|
||||||
requestId: RequestId) {.async.} =
|
requestId: RequestId) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
await market.contract.withdrawFunds(requestId)
|
discard await market.contract.withdrawFunds(requestId).confirm(0)
|
||||||
|
|
||||||
method isProofRequired*(market: OnChainMarket,
|
method isProofRequired*(market: OnChainMarket,
|
||||||
id: SlotId): Future[bool] {.async.} =
|
id: SlotId): Future[bool] {.async.} =
|
||||||
|
@ -201,13 +201,13 @@ method submitProof*(market: OnChainMarket,
|
||||||
id: SlotId,
|
id: SlotId,
|
||||||
proof: Groth16Proof) {.async.} =
|
proof: Groth16Proof) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
await market.contract.submitProof(id, proof)
|
discard await market.contract.submitProof(id, proof).confirm(0)
|
||||||
|
|
||||||
method markProofAsMissing*(market: OnChainMarket,
|
method markProofAsMissing*(market: OnChainMarket,
|
||||||
id: SlotId,
|
id: SlotId,
|
||||||
period: Period) {.async.} =
|
period: Period) {.async.} =
|
||||||
convertEthersError:
|
convertEthersError:
|
||||||
await market.contract.markProofAsMissing(id, period)
|
discard await market.contract.markProofAsMissing(id, period).confirm(0)
|
||||||
|
|
||||||
method canProofBeMarkedAsMissing*(
|
method canProofBeMarkedAsMissing*(
|
||||||
market: OnChainMarket,
|
market: OnChainMarket,
|
||||||
|
@ -218,7 +218,7 @@ method canProofBeMarkedAsMissing*(
|
||||||
let contractWithoutSigner = market.contract.connect(provider)
|
let contractWithoutSigner = market.contract.connect(provider)
|
||||||
let overrides = CallOverrides(blockTag: some BlockTag.pending)
|
let overrides = CallOverrides(blockTag: some BlockTag.pending)
|
||||||
try:
|
try:
|
||||||
await contractWithoutSigner.markProofAsMissing(id, period, overrides)
|
discard await contractWithoutSigner.markProofAsMissing(id, period, overrides)
|
||||||
return true
|
return true
|
||||||
except EthersError as e:
|
except EthersError as e:
|
||||||
trace "Proof cannot be marked as missing", msg = e.msg
|
trace "Proof cannot be marked as missing", msg = e.msg
|
||||||
|
|
|
@ -42,10 +42,10 @@ proc slashMisses*(marketplace: Marketplace): UInt256 {.contract, view.}
|
||||||
proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.}
|
proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.}
|
||||||
proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.}
|
proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.}
|
||||||
|
|
||||||
proc requestStorage*(marketplace: Marketplace, request: StorageRequest) {.contract.}
|
proc requestStorage*(marketplace: Marketplace, request: StorageRequest): ?TransactionResponse {.contract.}
|
||||||
proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof) {.contract.}
|
proc fillSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256, proof: Groth16Proof): ?TransactionResponse {.contract.}
|
||||||
proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.}
|
proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId): ?TransactionResponse {.contract.}
|
||||||
proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.}
|
proc freeSlot*(marketplace: Marketplace, id: SlotId): ?TransactionResponse {.contract.}
|
||||||
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
|
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
|
||||||
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
|
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
|
||||||
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.}
|
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.}
|
||||||
|
@ -66,5 +66,5 @@ proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract
|
||||||
proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.}
|
proc getChallenge*(marketplace: Marketplace, id: SlotId): array[32, byte] {.contract, view.}
|
||||||
proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}
|
proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}
|
||||||
|
|
||||||
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof) {.contract.}
|
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.}
|
||||||
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256) {.contract.}
|
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.}
|
||||||
|
|
|
@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||||
blockDiscovery = MockDiscovery.new()
|
blockDiscovery = MockDiscovery.new()
|
||||||
wallet = WalletRef.example
|
wallet = WalletRef.example
|
||||||
network = BlockExcNetwork.new(switch)
|
network = BlockExcNetwork.new(switch)
|
||||||
localStore = CacheStore.new(blocks.mapIt( it ))
|
localStore = CacheStore.new(blocks.mapIt(it))
|
||||||
peerStore = PeerCtxStore.new()
|
peerStore = PeerCtxStore.new()
|
||||||
pendingBlocks = PendingBlocksManager.new()
|
pendingBlocks = PendingBlocksManager.new()
|
||||||
|
|
||||||
|
@ -92,60 +92,43 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||||
|
|
||||||
blockDiscovery.findBlockProvidersHandler =
|
blockDiscovery.findBlockProvidersHandler =
|
||||||
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
|
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
|
||||||
await engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
|
await engine.resolveBlocks(blocks.filterIt(it.cid == cid))
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
allFinished(pendingBlocks))
|
allFinished(pendingBlocks))
|
||||||
|
|
||||||
await engine.stop()
|
await engine.stop()
|
||||||
|
|
||||||
test "Should advertise both manifests and blocks":
|
test "Should advertise both manifests and trees":
|
||||||
let
|
let
|
||||||
|
cids = @[manifest.cid.tryGet, manifest.treeCid]
|
||||||
advertised = initTable.collect:
|
advertised = initTable.collect:
|
||||||
for b in (blocks & manifestBlock): {b.cid: newFuture[void]()}
|
for cid in cids: {cid: newFuture[void]()}
|
||||||
|
|
||||||
blockDiscovery
|
blockDiscovery
|
||||||
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
|
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
|
||||||
if cid in advertised and not advertised[cid].finished():
|
if cid in advertised and not advertised[cid].finished():
|
||||||
advertised[cid].complete()
|
advertised[cid].complete()
|
||||||
|
|
||||||
discovery.advertiseType = BlockType.Both
|
await engine.start()
|
||||||
await engine.start() # fire up advertise loop
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
allFinished(toSeq(advertised.values)))
|
allFinished(toSeq(advertised.values)))
|
||||||
await engine.stop()
|
await engine.stop()
|
||||||
|
|
||||||
test "Should advertise local manifests":
|
test "Should not advertise local blocks":
|
||||||
let
|
let
|
||||||
advertised = newFuture[Cid]()
|
blockCids = blocks.mapIt(it.cid)
|
||||||
|
|
||||||
blockDiscovery
|
blockDiscovery
|
||||||
.publishBlockProvideHandler = proc(
|
.publishBlockProvideHandler = proc(
|
||||||
d: MockDiscovery,
|
d: MockDiscovery,
|
||||||
cid: Cid
|
cid: Cid
|
||||||
) {.async: (handleException: true).} =
|
) {.async: (handleException: true).} =
|
||||||
check manifestBlock.cid == cid
|
check:
|
||||||
advertised.complete(cid)
|
cid notin blockCids
|
||||||
|
|
||||||
discovery.advertiseType = BlockType.Manifest
|
await engine.start()
|
||||||
await engine.start() # fire up advertise loop
|
await sleepAsync(3.seconds)
|
||||||
check (await advertised.wait(10.millis)) == manifestBlock.cid
|
|
||||||
await engine.stop()
|
|
||||||
|
|
||||||
test "Should advertise local blocks":
|
|
||||||
let
|
|
||||||
advertised = initTable.collect:
|
|
||||||
for b in blocks: {b.cid: newFuture[void]()}
|
|
||||||
|
|
||||||
blockDiscovery
|
|
||||||
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
|
|
||||||
if cid in advertised and not advertised[cid].finished():
|
|
||||||
advertised[cid].complete()
|
|
||||||
|
|
||||||
discovery.advertiseType = BlockType.Block
|
|
||||||
await engine.start() # fire up advertise loop
|
|
||||||
await allFuturesThrowing(
|
|
||||||
allFinished(toSeq(advertised.values)))
|
|
||||||
await engine.stop()
|
await engine.stop()
|
||||||
|
|
||||||
test "Should not launch discovery if remote peer has block":
|
test "Should not launch discovery if remote peer has block":
|
||||||
|
@ -171,7 +154,7 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||||
): Future[seq[SignedPeerRecord]] {.async: (handleException: true).} =
|
): Future[seq[SignedPeerRecord]] {.async: (handleException: true).} =
|
||||||
check false
|
check false
|
||||||
|
|
||||||
await engine.start() # fire up discovery loop
|
await engine.start()
|
||||||
engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
|
engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
|
@ -179,23 +162,33 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||||
|
|
||||||
await engine.stop()
|
await engine.stop()
|
||||||
|
|
||||||
asyncchecksuite "E2E - Multiple Nodes Discovery":
|
proc asBlock(m: Manifest): bt.Block =
|
||||||
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
let mdata = m.encode().tryGet()
|
||||||
|
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
|
||||||
|
|
||||||
|
asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
var
|
var
|
||||||
switch: seq[Switch]
|
switch: seq[Switch]
|
||||||
blockexc: seq[NetworkStore]
|
blockexc: seq[NetworkStore]
|
||||||
blocks: seq[bt.Block]
|
manifests: seq[Manifest]
|
||||||
|
mBlocks: seq[bt.Block]
|
||||||
|
trees: seq[CodexTree]
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
while true:
|
|
||||||
let chunk = await chunker.getBytes()
|
|
||||||
if chunk.len <= 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
blocks.add(bt.Block.new(chunk).tryGet())
|
|
||||||
|
|
||||||
for _ in 0..<4:
|
for _ in 0..<4:
|
||||||
|
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
||||||
|
var blocks = newSeq[bt.Block]()
|
||||||
|
while true:
|
||||||
|
let chunk = await chunker.getBytes()
|
||||||
|
if chunk.len <= 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
blocks.add(bt.Block.new(chunk).tryGet())
|
||||||
|
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||||
|
manifests.add(manifest)
|
||||||
|
mBlocks.add(manifest.asBlock())
|
||||||
|
trees.add(tree)
|
||||||
|
|
||||||
let
|
let
|
||||||
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
||||||
blockDiscovery = MockDiscovery.new()
|
blockDiscovery = MockDiscovery.new()
|
||||||
|
@ -229,9 +222,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
teardown:
|
teardown:
|
||||||
switch = @[]
|
switch = @[]
|
||||||
blockexc = @[]
|
blockexc = @[]
|
||||||
|
manifests = @[]
|
||||||
|
mBlocks = @[]
|
||||||
|
trees = @[]
|
||||||
|
|
||||||
test "E2E - Should advertise and discover blocks":
|
test "E2E - Should advertise and discover blocks":
|
||||||
# Distribute the blocks amongst 1..3
|
# Distribute the manifests and trees amongst 1..3
|
||||||
# Ask 0 to download everything without connecting him beforehand
|
# Ask 0 to download everything without connecting him beforehand
|
||||||
|
|
||||||
var advertised: Table[Cid, SignedPeerRecord]
|
var advertised: Table[Cid, SignedPeerRecord]
|
||||||
|
@ -248,14 +244,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
|
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
|
||||||
advertised[cid] = switch[3].peerInfo.signedPeerRecord
|
advertised[cid] = switch[3].peerInfo.signedPeerRecord
|
||||||
|
|
||||||
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
|
||||||
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
|
||||||
|
|
||||||
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
|
||||||
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
|
||||||
|
|
||||||
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
|
||||||
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
|
||||||
|
|
||||||
MockDiscovery(blockexc[0].engine.discovery.discovery)
|
MockDiscovery(blockexc[0].engine.discovery.discovery)
|
||||||
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
|
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
|
||||||
|
@ -264,22 +260,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
result.add(advertised[cid])
|
result.add(advertised[cid])
|
||||||
|
|
||||||
let futs = collect(newSeq):
|
let futs = collect(newSeq):
|
||||||
for b in blocks:
|
for m in mBlocks[0..2]:
|
||||||
blockexc[0].engine.requestBlock(b.cid)
|
blockexc[0].engine.requestBlock(m.cid)
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
switch.mapIt( it.start() ) &
|
switch.mapIt(it.start()) &
|
||||||
blockexc.mapIt( it.engine.start() )).wait(10.seconds)
|
blockexc.mapIt(it.engine.start())).wait(10.seconds)
|
||||||
|
|
||||||
await allFutures(futs).wait(10.seconds)
|
await allFutures(futs).wait(10.seconds)
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
blockexc.mapIt( it.engine.stop() ) &
|
blockexc.mapIt(it.engine.stop()) &
|
||||||
switch.mapIt( it.stop() )).wait(10.seconds)
|
switch.mapIt(it.stop())).wait(10.seconds)
|
||||||
|
|
||||||
test "E2E - Should advertise and discover blocks with peers already connected":
|
test "E2E - Should advertise and discover blocks with peers already connected":
|
||||||
# Distribute the blocks amongst 1..3
|
# Distribute the blocks amongst 1..3
|
||||||
# Ask 0 to download everything without connecting him beforehand
|
# Ask 0 to download everything *WITH* connecting him beforehand
|
||||||
|
|
||||||
var advertised: Table[Cid, SignedPeerRecord]
|
var advertised: Table[Cid, SignedPeerRecord]
|
||||||
|
|
||||||
|
@ -295,14 +291,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
|
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
|
||||||
advertised[cid] = switch[3].peerInfo.signedPeerRecord
|
advertised[cid] = switch[3].peerInfo.signedPeerRecord
|
||||||
|
|
||||||
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
|
||||||
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
|
||||||
|
|
||||||
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
|
||||||
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
|
||||||
|
|
||||||
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
|
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
|
||||||
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
|
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
|
||||||
|
|
||||||
MockDiscovery(blockexc[0].engine.discovery.discovery)
|
MockDiscovery(blockexc[0].engine.discovery.discovery)
|
||||||
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
|
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
|
||||||
|
@ -311,14 +307,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||||
return @[advertised[cid]]
|
return @[advertised[cid]]
|
||||||
|
|
||||||
let
|
let
|
||||||
futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) )
|
futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid))
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
switch.mapIt( it.start() ) &
|
switch.mapIt(it.start()) &
|
||||||
blockexc.mapIt( it.engine.start() )).wait(10.seconds)
|
blockexc.mapIt(it.engine.start())).wait(10.seconds)
|
||||||
|
|
||||||
await allFutures(futs).wait(10.seconds)
|
await allFutures(futs).wait(10.seconds)
|
||||||
|
|
||||||
await allFuturesThrowing(
|
await allFuturesThrowing(
|
||||||
blockexc.mapIt( it.engine.stop() ) &
|
blockexc.mapIt(it.engine.stop()) &
|
||||||
switch.mapIt( it.stop() )).wait(10.seconds)
|
switch.mapIt(it.stop())).wait(10.seconds)
|
||||||
|
|
|
@ -10,17 +10,26 @@ import pkg/codex/blockexchange
|
||||||
import pkg/codex/chunker
|
import pkg/codex/chunker
|
||||||
import pkg/codex/blocktype as bt
|
import pkg/codex/blocktype as bt
|
||||||
import pkg/codex/blockexchange/engine
|
import pkg/codex/blockexchange/engine
|
||||||
|
import pkg/codex/manifest
|
||||||
|
import pkg/codex/merkletree
|
||||||
|
|
||||||
import ../../../asynctest
|
import ../../../asynctest
|
||||||
import ../../helpers
|
import ../../helpers
|
||||||
import ../../helpers/mockdiscovery
|
import ../../helpers/mockdiscovery
|
||||||
import ../../examples
|
import ../../examples
|
||||||
|
|
||||||
|
proc asBlock(m: Manifest): bt.Block =
|
||||||
|
let mdata = m.encode().tryGet()
|
||||||
|
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
|
||||||
|
|
||||||
asyncchecksuite "Test Discovery Engine":
|
asyncchecksuite "Test Discovery Engine":
|
||||||
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
||||||
|
|
||||||
var
|
var
|
||||||
blocks: seq[bt.Block]
|
blocks: seq[bt.Block]
|
||||||
|
manifest: Manifest
|
||||||
|
tree: CodexTree
|
||||||
|
manifestBlock: bt.Block
|
||||||
switch: Switch
|
switch: Switch
|
||||||
peerStore: PeerCtxStore
|
peerStore: PeerCtxStore
|
||||||
blockDiscovery: MockDiscovery
|
blockDiscovery: MockDiscovery
|
||||||
|
@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine":
|
||||||
|
|
||||||
blocks.add(bt.Block.new(chunk).tryGet())
|
blocks.add(bt.Block.new(chunk).tryGet())
|
||||||
|
|
||||||
|
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||||
|
manifestBlock = manifest.asBlock()
|
||||||
|
blocks.add(manifestBlock)
|
||||||
|
|
||||||
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
||||||
network = BlockExcNetwork.new(switch)
|
network = BlockExcNetwork.new(switch)
|
||||||
peerStore = PeerCtxStore.new()
|
peerStore = PeerCtxStore.new()
|
||||||
|
@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine":
|
||||||
blockDiscovery,
|
blockDiscovery,
|
||||||
pendingBlocks,
|
pendingBlocks,
|
||||||
discoveryLoopSleep = 100.millis)
|
discoveryLoopSleep = 100.millis)
|
||||||
wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) )
|
wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid) )
|
||||||
|
|
||||||
blockDiscovery.findBlockProvidersHandler =
|
blockDiscovery.findBlockProvidersHandler =
|
||||||
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
|
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
|
||||||
pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address)))
|
pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address)))
|
||||||
|
|
||||||
await discoveryEngine.start()
|
await discoveryEngine.start()
|
||||||
await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
|
await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
|
||||||
|
@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine":
|
||||||
|
|
||||||
test "Should Advertise Haves":
|
test "Should Advertise Haves":
|
||||||
var
|
var
|
||||||
localStore = CacheStore.new(blocks.mapIt( it ))
|
localStore = CacheStore.new(blocks.mapIt(it))
|
||||||
discoveryEngine = DiscoveryEngine.new(
|
discoveryEngine = DiscoveryEngine.new(
|
||||||
localStore,
|
localStore,
|
||||||
peerStore,
|
peerStore,
|
||||||
|
@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine":
|
||||||
pendingBlocks,
|
pendingBlocks,
|
||||||
discoveryLoopSleep = 100.millis)
|
discoveryLoopSleep = 100.millis)
|
||||||
haves = collect(initTable):
|
haves = collect(initTable):
|
||||||
for b in blocks:
|
for cid in @[manifestBlock.cid, manifest.treeCid]:
|
||||||
{ b.cid: newFuture[void]() }
|
{ cid: newFuture[void]() }
|
||||||
|
|
||||||
blockDiscovery.publishBlockProvideHandler =
|
blockDiscovery.publishBlockProvideHandler =
|
||||||
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
|
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
|
||||||
|
@ -111,31 +124,6 @@ asyncchecksuite "Test Discovery Engine":
|
||||||
await want.wait(1.seconds)
|
await want.wait(1.seconds)
|
||||||
await discoveryEngine.stop()
|
await discoveryEngine.stop()
|
||||||
|
|
||||||
test "Should queue advertise request":
|
|
||||||
var
|
|
||||||
localStore = CacheStore.new(@[blocks[0]])
|
|
||||||
discoveryEngine = DiscoveryEngine.new(
|
|
||||||
localStore,
|
|
||||||
peerStore,
|
|
||||||
network,
|
|
||||||
blockDiscovery,
|
|
||||||
pendingBlocks,
|
|
||||||
discoveryLoopSleep = 100.millis)
|
|
||||||
have = newFuture[void]()
|
|
||||||
|
|
||||||
blockDiscovery.publishBlockProvideHandler =
|
|
||||||
proc(
|
|
||||||
d: MockDiscovery,
|
|
||||||
cid: Cid
|
|
||||||
) {.async: (handleException: true), gcsafe.} =
|
|
||||||
check cid == blocks[0].cid
|
|
||||||
if not have.finished:
|
|
||||||
have.complete()
|
|
||||||
|
|
||||||
await discoveryEngine.start()
|
|
||||||
await have.wait(1.seconds)
|
|
||||||
await discoveryEngine.stop()
|
|
||||||
|
|
||||||
test "Should not request more than minPeersPerBlock":
|
test "Should not request more than minPeersPerBlock":
|
||||||
var
|
var
|
||||||
localStore = CacheStore.new()
|
localStore = CacheStore.new()
|
||||||
|
|
|
@ -39,10 +39,10 @@ ethersuite "Marketplace contracts":
|
||||||
|
|
||||||
switchAccount(client)
|
switchAccount(client)
|
||||||
discard await token.approve(marketplace.address, request.price)
|
discard await token.approve(marketplace.address, request.price)
|
||||||
await marketplace.requestStorage(request)
|
discard await marketplace.requestStorage(request)
|
||||||
switchAccount(host)
|
switchAccount(host)
|
||||||
discard await token.approve(marketplace.address, request.ask.collateral)
|
discard await token.approve(marketplace.address, request.ask.collateral)
|
||||||
await marketplace.fillSlot(request.id, 0.u256, proof)
|
discard await marketplace.fillSlot(request.id, 0.u256, proof)
|
||||||
slotId = request.slotId(0.u256)
|
slotId = request.slotId(0.u256)
|
||||||
|
|
||||||
proc waitUntilProofRequired(slotId: SlotId) {.async.} =
|
proc waitUntilProofRequired(slotId: SlotId) {.async.} =
|
||||||
|
@ -57,12 +57,12 @@ ethersuite "Marketplace contracts":
|
||||||
proc startContract() {.async.} =
|
proc startContract() {.async.} =
|
||||||
for slotIndex in 1..<request.ask.slots:
|
for slotIndex in 1..<request.ask.slots:
|
||||||
discard await token.approve(marketplace.address, request.ask.collateral)
|
discard await token.approve(marketplace.address, request.ask.collateral)
|
||||||
await marketplace.fillSlot(request.id, slotIndex.u256, proof)
|
discard await marketplace.fillSlot(request.id, slotIndex.u256, proof)
|
||||||
|
|
||||||
test "accept marketplace proofs":
|
test "accept marketplace proofs":
|
||||||
switchAccount(host)
|
switchAccount(host)
|
||||||
await waitUntilProofRequired(slotId)
|
await waitUntilProofRequired(slotId)
|
||||||
await marketplace.submitProof(slotId, proof)
|
discard await marketplace.submitProof(slotId, proof)
|
||||||
|
|
||||||
test "can mark missing proofs":
|
test "can mark missing proofs":
|
||||||
switchAccount(host)
|
switchAccount(host)
|
||||||
|
@ -71,7 +71,7 @@ ethersuite "Marketplace contracts":
|
||||||
let endOfPeriod = periodicity.periodEnd(missingPeriod)
|
let endOfPeriod = periodicity.periodEnd(missingPeriod)
|
||||||
await ethProvider.advanceTimeTo(endOfPeriod + 1)
|
await ethProvider.advanceTimeTo(endOfPeriod + 1)
|
||||||
switchAccount(client)
|
switchAccount(client)
|
||||||
await marketplace.markProofAsMissing(slotId, missingPeriod)
|
discard await marketplace.markProofAsMissing(slotId, missingPeriod)
|
||||||
|
|
||||||
test "can be paid out at the end":
|
test "can be paid out at the end":
|
||||||
switchAccount(host)
|
switchAccount(host)
|
||||||
|
@ -80,7 +80,7 @@ ethersuite "Marketplace contracts":
|
||||||
let requestEnd = await marketplace.requestEnd(request.id)
|
let requestEnd = await marketplace.requestEnd(request.id)
|
||||||
await ethProvider.advanceTimeTo(requestEnd.u256 + 1)
|
await ethProvider.advanceTimeTo(requestEnd.u256 + 1)
|
||||||
let startBalance = await token.balanceOf(address)
|
let startBalance = await token.balanceOf(address)
|
||||||
await marketplace.freeSlot(slotId)
|
discard await marketplace.freeSlot(slotId)
|
||||||
let endBalance = await token.balanceOf(address)
|
let endBalance = await token.balanceOf(address)
|
||||||
check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral)
|
check endBalance == (startBalance + request.ask.duration * request.ask.reward + request.ask.collateral)
|
||||||
|
|
||||||
|
|
|
@ -241,7 +241,7 @@ ethersuite "On-Chain Market":
|
||||||
await waitUntilProofRequired(slotId)
|
await waitUntilProofRequired(slotId)
|
||||||
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())
|
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())
|
||||||
await advanceToNextPeriod()
|
await advanceToNextPeriod()
|
||||||
await marketplace.markProofAsMissing(slotId, missingPeriod)
|
discard await marketplace.markProofAsMissing(slotId, missingPeriod)
|
||||||
check receivedIds == @[request.id]
|
check receivedIds == @[request.id]
|
||||||
await subscription.unsubscribe()
|
await subscription.unsubscribe()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue