# NOTE(review): two spans of this file are missing, each starting right after a
# `0..` range operator (see the notes in "Block Batching with expiry" and
# "onExpiryUpdate callback"). The surrounding layout was re-derived from the
# syntax; the damaged spans are kept exactly as found. Restore them from
# version control before relying on this file.
import std/os
import std/options
import std/math

import pkg/asynctest
import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/stint

import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5

import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/contracts
import pkg/codex/systemclock
import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/node
import pkg/codex/manifest
import pkg/codex/discovery
import pkg/codex/blocktype as bt

import ../examples
import ./helpers
import ./helpers/mockmarket
import ./helpers/mockclock

# Tests for CodexNodeRef running on a NetworkStore whose local RepoStore uses
# two SQLite datastores opened with `Memory`, driven by a SystemClock.
asyncchecksuite "Test Node":
  let
    (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name

  # Suite-level fixtures; every one of them is (re)assigned in `setup`.
  var
    file: File
    chunker: Chunker
    switch: Switch
    wallet: WalletRef
    network: BlockExcNetwork
    clock: Clock
    localStore: RepoStore
    localStoreRepoDs: DataStore
    localStoreMetaDs: DataStore
    engine: BlockExcEngine
    store: NetworkStore
    node: CodexNodeRef
    blockDiscovery: Discovery
    peerStore: PeerCtxStore
    pendingBlocks: PendingBlocksManager
    discovery: DiscoveryEngine

  proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
    # Collect blocks from Chunker into Manifest.
    # Forwards to `storeDataGetManifest(localStore, chunker)`. `T` is not used
    # in the body; it only enables the `Manifest.fetch(chunker)` call form.
    await storeDataGetManifest(localStore, chunker)

  proc retrieve(cid: Cid): Future[seq[byte]] {.async.} =
    # Retrieve an entire file contents by file Cid.
    # Reads the stream returned by `node.retrieve(cid)` until `atEof`, asking
    # for at most `oddChunkSize` bytes per `readOnce`, and returns all bytes
    # read, concatenated in order.
    let
      # Read size is DefaultBlockSize / 1.359 (truncated), i.e. deliberately
      # not the block size.
      oddChunkSize = math.trunc(DefaultBlockSize.float/1.359).int  # Let's check that node.retrieve can correctly rechunk data
      stream = (await node.retrieve(cid)).tryGet()
    var
      data: seq[byte]

    # Close the stream when this proc's scope is left.
    defer: await stream.close()

    while not stream.atEof:
      var
        buf = newSeq[byte](oddChunkSize)
        res = await stream.readOnce(addr buf[0], oddChunkSize)
      check res <= oddChunkSize
      # Keep only the `res` bytes reported by `readOnce`.
      buf.setLen(res)
      data &= buf

    return data

  setup:
    # The fixture is `fixtures/test.jpg` under the parent of this file's
    # directory (`/../` goes up one level before appending).
    file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
    chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
    switch = newStandardSwitch()
    wallet = WalletRef.new(EthPrivateKey.random())
    network = BlockExcNetwork.new(switch)

    clock = SystemClock.new()
    localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
    localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
    localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock=clock)
    await localStore.start()

    blockDiscovery = Discovery.new(
      switch.peerInfo.privateKey,
      announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
        .expect("Should return multiaddress")])
    peerStore = PeerCtxStore.new()
    pendingBlocks = PendingBlocksManager.new()
    discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
    engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
    store = NetworkStore.new(engine, localStore)
    # The fourth argument is passed as `nil`; per the TODO it is the slot for
    # `Erasure`.
    node = CodexNodeRef.new(switch, store, engine, nil, blockDiscovery) # TODO: pass `Erasure`

    await node.start()

  teardown:
    # Closes the fixture file, then stops the node. `localStore` is started in
    # `setup` but not stopped here explicitly.
    close(file)
    await node.stop()

  # Stores the encoded manifest as a block (DagPBCodec) in the local store and
  # checks that `node.fetchManifest` on that block's cid yields an equal
  # manifest.
  test "Fetch Manifest":
    let
      manifest = await Manifest.fetch(chunker)

      manifestBlock = bt.Block.new(
        manifest.encode().tryGet(),
        codec = DagPBCodec
      ).tryGet()

    (await localStore.putBlock(manifestBlock)).tryGet()

    let
      fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()

    check:
      fetched == manifest

  # Calls `node.fetchBatched` for batch sizes 1 through 12; the callback checks
  # that each batch it receives is non-empty and no larger than `batchSize`.
  test "Block Batching":
    let manifest = await Manifest.fetch(chunker)

    for batchSize in 1..12:
      (await node.fetchBatched(
        manifest,
        batchSize = batchSize,
        proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
          check blocks.len > 0 and blocks.len <= batchSize
      )).tryGet()

  # Calls `node.fetchBatched` with an explicit expiry of
  # `clock.now + DefaultBlockTtl.seconds + 123`.
  test "Block Batching with expiry":
    let
      manifest = await Manifest.fetch(chunker)
      # The blocks already have the default TTL set, so in order to update it
      # we have to use a larger TTL.
      expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 123

    (await node.fetchBatched(manifest, expiry=some expectedExpiry)).tryGet()

    # NOTE(review): the text between the `0..` below and the `0):` that follows
    # is missing. The lost span presumably held the rest of this loop and the
    # opening of another test: the surviving statements use `stream`,
    # `storeFut`, `original` and `chunk`, none of which are declared in what
    # remains, and the `finally:` has no visible `try:`. Indentation of the
    # orphaned fragment is a placeholder only - TODO restore from history.
    for index in 0..
    0):
      original &= chunk
      await stream.pushData(chunk)
    finally:
      await stream.pushEof()
      await stream.close()

    let
      manifestCid = (await storeFut).tryGet()

    check:
      (await localStore.hasBlock(manifestCid)).tryGet()

    let
      manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
      localManifest = Manifest.decode(manifestBlock).tryGet()

    # Read the data back through the suite-level `retrieve` helper.
    let data = await retrieve(manifestCid)

    # The retrieved bytes must match both the manifest's dataset size and the
    # bytes accumulated in `original` (length and sha256 digest).
    check:
      data.len == localManifest.datasetSize.int
      data.len == original.len
      sha256.digest(data) == sha256.digest(original)

  # Puts a single block built from "Block 1" into the local store and checks
  # that reading exactly that many bytes from `node.retrieve(blk.cid)` gives
  # the same string back.
  test "Retrieve One Block":
    let
      testString = "Block 1"
      blk = bt.Block.new(testString.toBytes).tryGet()

    (await localStore.putBlock(blk)).tryGet()
    let stream = (await node.retrieve(blk.cid)).tryGet()
    # Close the stream when the test body's scope is left.
    defer: await stream.close()

    var data = newSeq[byte](testString.len)
    await stream.readExactly(addr data[0], data.len)
    check string.fromBytes(data) == testString


# Same node wiring as the "Test Node" suite, but with a MockClock and with host
# contracts (Sales on a MockMarket) attached to the node before it is started.
asyncchecksuite "Test Node - host contracts":
  let
    (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name

  # Suite-level fixtures; every one of them is (re)assigned in `setup`.
  var
    file: File
    chunker: Chunker
    switch: Switch
    wallet: WalletRef
    network: BlockExcNetwork
    clock: MockClock
    localStore: RepoStore
    localStoreRepoDs: DataStore
    localStoreMetaDs: DataStore
    engine: BlockExcEngine
    store: NetworkStore
    sales: Sales
    node: CodexNodeRef
    blockDiscovery: Discovery
    peerStore: PeerCtxStore
    pendingBlocks: PendingBlocksManager
    discovery: DiscoveryEngine
    manifest: Manifest
    manifestCid: string

  proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
    # Collect blocks from Chunker into Manifest.
    # Forwards to `storeDataGetManifest(localStore, chunker)`; `T` is unused in
    # the body. No call to this helper is visible in the surviving part of this
    # suite (`setup` calls `storeDataGetManifest` directly).
    await storeDataGetManifest(localStore, chunker)

  setup:
    # The fixture is `fixtures/test.jpg` under the parent of this file's
    # directory (`/../` goes up one level before appending).
    file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
    chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
    switch = newStandardSwitch()
    wallet = WalletRef.new(EthPrivateKey.random())
    network = BlockExcNetwork.new(switch)

    clock = MockClock.new()
    localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
    localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
    localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock=clock)
    await localStore.start()

    blockDiscovery = Discovery.new(
      switch.peerInfo.privateKey,
      announceAddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0")
        .expect("Should return multiaddress")])
    peerStore = PeerCtxStore.new()
    pendingBlocks = PendingBlocksManager.new()
    discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
    engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
    store = NetworkStore.new(engine, localStore)
    # The fourth argument is passed as `nil`; per the TODO it is the slot for
    # `Erasure`.
    node = CodexNodeRef.new(switch, store, engine, nil, blockDiscovery) # TODO: pass `Erasure`

    # Setup Host Contracts and dependencies
    let market = MockMarket.new()
    # NOTE(review): the meaning of the trailing `0` argument is not visible
    # from this file - confirm against `Sales.new`.
    sales = Sales.new(market, clock, localStore, 0)
    let hostContracts = some HostInteractions.new(clock, sales)
    # Only the host slot is populated; client and validator are `none`.
    node.contracts = (ClientInteractions.none, hostContracts, ValidatorInteractions.none)

    await node.start()

    # Populate manifest in local store
    manifest = await storeDataGetManifest(localStore, chunker)
    let manifestBlock = bt.Block.new(
      manifest.encode().tryGet(),
      codec = DagPBCodec
    ).tryGet()
    # The cid is kept in its string form; it is what the expiry callback below
    # is invoked with.
    manifestCid = $(manifestBlock.cid)

    (await localStore.putBlock(manifestBlock)).tryGet()

  teardown:
    # Closes the fixture file, then stops the node.
    close(file)
    await node.stop()

  # `setup` only assigns `node.contracts` and starts the node; this test
  # asserts that `sales.onExpiryUpdate` holds a value afterwards.
  test "onExpiryUpdate callback is set":
    check sales.onExpiryUpdate.isSome

  # Invokes the `onExpiryUpdate` callback with the manifest cid and an expiry
  # of `clock.now + DefaultBlockTtl.seconds + 11123`.
  test "onExpiryUpdate callback":
    let
      # The blocks already have the default TTL set, so in order to update it
      # we have to use a larger TTL.
      expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
      # presumably `!` unwraps the option checked in the previous test - confirm
      expiryUpdateCallback = !sales.onExpiryUpdate

    (await expiryUpdateCallback(manifestCid, expectedExpiry)).tryGet()

    # NOTE(review): the file ends in the middle of this loop header; the upper
    # bound and the loop body are missing - TODO restore from history.
    for index in 0..