diff --git a/library/storage_thread_requests/requests/node_download_request.nim b/library/storage_thread_requests/requests/node_download_request.nim index 44b38482..31e12e33 100644 --- a/library/storage_thread_requests/requests/node_download_request.nim +++ b/library/storage_thread_requests/requests/node_download_request.nim @@ -254,8 +254,6 @@ proc cancel( storage: ptr StorageServer, cCid: cstring ): Future[Result[string, string]] {.raises: [], async: (raises: []).} = ## Cancel the download session identified by cid. - ## This operation is not supported when using the stream mode, - ## because the worker will be busy downloading the file. let cid = Cid.init($cCid) if cid.isErr: diff --git a/tests/js/harness.js b/tests/js/harness.js index fda82dc2..179f76f3 100644 --- a/tests/js/harness.js +++ b/tests/js/harness.js @@ -112,6 +112,10 @@ const _lib = { storage_exists: lib.func( 'int storage_exists(void *ctx, str cid, StorageCallback *callback, void *userData)' ), + // peerAddresses is const char ** — pass via koffi.as(addrs, 'char **') or null + storage_connect: lib.func( + 'int storage_connect(void *ctx, str peerId, void *peerAddresses, size_t peerAddressesSize, StorageCallback *callback, void *userData)' + ), }; /** @@ -132,15 +136,11 @@ const _lib = { */ function callAsync(thunk, { onProgress } = {}) { return new Promise((resolve, reject) => { - const keepAlive = setInterval(() => {}, 200); - - let cb; - cb = koffi.register((ret, msg, _len, _userData) => { + let cb = koffi.register((ret, msg, len, _userData) => { if (ret === RET_PROGRESS) { - if (onProgress) onProgress(msg); + if (onProgress) onProgress(msg, len, _userData); return; // callback will fire again; keepAlive remains active } - clearInterval(keepAlive); koffi.unregister(cb); if (ret === RET_OK) resolve(msg ?? ''); else reject(new Error(msg ?? `libstorage error (ret=${ret})`)); @@ -148,7 +148,6 @@ function callAsync(thunk, { onProgress } = {}) { const syncRet = thunk(cb); if (typeof syncRet === 'number' && syncRet !== RET_OK) { - clearInterval(keepAlive); koffi.unregister(cb); reject(new Error(`libstorage sync dispatch failed (ret=${syncRet})`)); } @@ -246,14 +245,16 @@ export class StorageNode { ); const buf = Buffer.from(content, 'utf8'); - await callAsync( - cb => _lib.storage_upload_chunk(this.#ctx, sessionId, buf, buf.byteLength, cb, null) - ); + for (let offset = 0; offset < buf.length; offset += chunkSize) { + const chunk = buf.subarray(offset, Math.min(offset + chunkSize, buf.length)); + await callAsync( + cb => _lib.storage_upload_chunk(this.#ctx, sessionId, chunk, chunk.byteLength, cb, null) + ); + } - const cid = await callAsync( + return callAsync( cb => _lib.storage_upload_finalize(this.#ctx, sessionId, cb, null) ); - return cid; } /** @@ -281,12 +282,81 @@ export class StorageNode { const chunks = []; await callAsync( cb => _lib.storage_download_stream(this.#ctx, cid, chunkSize, local, null, cb, null), - { onProgress: (msg) => { if (msg) chunks.push(msg); } } + { + onProgress: (chunk, bytes, userData) => { + if (chunk) { + chunks.push(chunk); + console.log("download progress: received chunk of size ", bytes, " userData: ", userData); + } + } + } ); return chunks.join(''); } + /** + * Download content by CID. Collects all RET_PROGRESS chunk messages and + * joins them as the full content string. + * + * @param {string} cid + * @param {boolean} local - true = local store only, false = fetch from network + * @param {number} chunkSize + * @returns {Promise} the downloaded content as a UTF-8 string + */ + async downloadContentByChunks(cid, local = false, chunkSize = 64 * 1024) { + await callAsync( + cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null) + ); + + const chunks = []; + let finished = false; + + while(!finished) { + await callAsync( + cb => _lib.storage_download_chunk(this.#ctx, cid, cb, null), + { + onProgress: (chunk, bytes, userData) => { + if (chunk) { + chunks.push(chunk); + console.log("download chunk: received chunk of size ", bytes, " userData: ", userData); + console.debug("chunk length", chunk.length, "bytes", bytes, "chunkSize", chunkSize, " chunk content: ", chunk); + if (bytes === chunkSize) { + console.debug("still more data to come..."); + return; // expect more chunks to come; keep waiting + } + } + finished = true; + } + } + ); + } + + + return chunks.join(''); + } + + /** + * Stream content by CID. Each RET_PROGRESS chunk message calls the callback with the chunk data. + * + * @param {string} cid + * @param {boolean} local - true = local store only, false = fetch from network + * @param {number} chunkSize + * @param {Function} onProgress - callback(chunk, bytes, userData) called for each chunk; chunk is null on final callback + * @returns {Promise} the downloaded content as a UTF-8 string + */ + async streamContent(cid, local = false, chunkSize = 64 * 1024, onProgress = null) { + await callAsync( + cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null) + ); + + const chunks = []; + await callAsync( + cb => _lib.storage_download_stream(this.#ctx, cid, chunkSize, local, null, cb, null), + { onProgress } + ); + } + /** * Cancel an in-progress download for a CID. * @param {string} cid @@ -348,6 +418,28 @@ export class StorageNode { return callAsync(cb => _lib.storage_fetch(this.#ctx, cid, cb, null)); } + /** + * Initialise a download session without streaming — allows callers to cancel + * before any data is transferred. + * @param {string} cid + * @param {boolean} local + * @param {number} chunkSize + */ + downloadInit(cid, local = false, chunkSize = 64 * 1024) { + return callAsync(cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null)); + } + + /** + * Connect to a peer by peer ID and optional multiaddresses. + * If no addresses are given the library falls back to DHT lookup. + * @param {string} peerId - base58 libp2p peer ID + * @param {string[]} addresses - multiaddresses, e.g. ['/ip4/127.0.0.1/tcp/8792'] + */ + connect(peerId, addresses = []) { + const addrs = addresses.length > 0 ? koffi.as(addresses, 'char **') : null; + return callAsync(cb => _lib.storage_connect(this.#ctx, peerId, addrs, addresses.length, cb, null)); + } + /** Graceful shutdown: stop → close → destroy */ async shutdown() { await this.stop(); diff --git a/tests/js/two-node-test.js b/tests/js/two-node-test.js index 0799b9a4..10501b14 100644 --- a/tests/js/two-node-test.js +++ b/tests/js/two-node-test.js @@ -228,6 +228,19 @@ describe('two-node tests', () => { assert.equal(await nodeB.downloadContent(cid, true), content); }); + it('node B can download all chunks of a file from node A, which exists locally', async () => { + // 200 KB with a 16 KB chunk size → 13 chunks; exercises the upload loop + // and multi-progress download on the receiving side. + // const chunkSize = 16 * 1024; + const content = 'A'.repeat(200 * 1024); + const cid = await nodeA.uploadContent(content, 'fetch-test.txt');//, chunkSize); + + assert.equal(await nodeB.exists(cid), false); + let downloaded = await nodeB.downloadContentByChunks(cid, false);//, chunkSize); + assert.equal(await nodeB.exists(cid), true); + assert.equal(downloaded, content); + }); + it('fetched content is equivalent to streamed content', async () => { const content = 'Content for storage_fetch test'; const cid = await nodeA.uploadContent(content, 'fetch-test.txt'); @@ -256,4 +269,98 @@ describe('two-node tests', () => { assert.ok(info.table?.nodes.length == 1); assert.equal(info.table?.nodes[0].peerId, await nodeB.peerId()); }); + + it('large multi-chunk file transfers intact across nodes', async () => { + // 200 KB with a 16 KB chunk size → 13 chunks; exercises the upload loop + // and multi-progress download on the receiving side. + const chunkSize = 16 * 1024; + const content = 'A'.repeat(200 * 1024); + const cid = await nodeA.uploadContent(content, 'large.txt', chunkSize); + + // Verify locally on A first + const local = await nodeA.downloadContent(cid, true, chunkSize); + assert.equal(local.length, content.length, 'local: length mismatch'); + assert.equal(local, content, 'local: content mismatch'); + + // Then verify across the p2p network + const remote = await nodeB.downloadContent(cid, false, chunkSize); + assert.equal(remote.length, content.length, 'remote: length mismatch'); + assert.equal(remote, content, 'remote: content mismatch'); + }); + + it('node B remains functional after cancelling a download init', async () => { + // Upload two independent pieces of content on A. + const cid1 = await nodeA.uploadContent('content to be cancelled'); + const cid2 = await nodeA.uploadContent('content downloaded after cancel'); + + // Init a download for cid1, then immediately cancel it. + await nodeB.downloadInit(cid1, false); + await nodeB.downloadCancel(cid1); + + // After cancelling, the library transfers blocks into B's local store as + // a side-effect of the init, so cid1 should exist locally on B. + assert.equal(await nodeB.exists(cid1), true, 'cancelled CID should be in local store after init'); + + // More importantly, node B must still be functional: it can download a + // completely fresh CID from the network without any issues. + const downloaded = await nodeB.downloadContent(cid2, false); + assert.equal(downloaded, 'content downloaded after cancel'); + }); +}); + +// --------------------------------------------------------------------------- +// Explicit-connect tests — nodes start with no knowledge of each other; +// connection is established by calling storage_connect with peer ID + addrs. +// --------------------------------------------------------------------------- + +describe('explicit connect tests', () => { + let nodeA, nodeB; + + beforeEach(async () => { + const [listenA, discA, listenB, discB] = await Promise.all([ + findFreePort(PORT_BASE + 20), + findFreePort(PORT_BASE + 21), + findFreePort(PORT_BASE + 22), + findFreePort(PORT_BASE + 23), + ]); + + // Start A with no bootstrap; B also starts with no knowledge of A + nodeA = new StorageNode(); + await nodeA.create(nodeConfig('a-exp', { + 'listen-port': listenA, + 'disc-port': discA, + 'nat': 'extip:127.0.0.1', + })); + await nodeA.start(); + + nodeB = new StorageNode(); + await nodeB.create(nodeConfig('b-exp', { + 'listen-port': listenB, + 'disc-port': discB, + 'nat': 'extip:127.0.0.1', + })); + await nodeB.start(); + }); + + afterEach(async () => { + await Promise.allSettled([nodeA.shutdown(), nodeB.shutdown()]); + }); + + it('node B connects to node A via peer ID and multiaddresses', async () => { + // Get A's peer identity and listen addresses from its debug info + const debugA = JSON.parse(await nodeA.debug()); + const peerIdA = debugA.id; + // Filter to loopback addresses only so the connect is local + const addrsA = (debugA.addrs ?? []).filter(a => a.includes('127.0.0.1')); + assert.ok(addrsA.length > 0, 'node A must have at least one loopback address'); + + // B explicitly connects to A by peer ID + addresses (no DHT, no bootstrap) + await nodeB.connect(peerIdA, addrsA); + + // Verify the connection works end-to-end + const content = 'Connected via explicit peer ID + addresses'; + const cid = await nodeA.uploadContent(content); + const downloaded = await nodeB.downloadContent(cid, false); + assert.equal(downloaded, content); + }); });