WIP: add additional tests

This commit is contained in:
E M 2026-03-23 21:09:13 +11:00
parent 554ea683a3
commit 81922158c2
No known key found for this signature in database
3 changed files with 212 additions and 15 deletions

View File

@ -254,8 +254,6 @@ proc cancel(
storage: ptr StorageServer, cCid: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Cancel the download session identified by cid.
## This operation is not supported when using the stream mode,
## because the worker will be busy downloading the file.
let cid = Cid.init($cCid)
if cid.isErr:

View File

@ -112,6 +112,10 @@ const _lib = {
storage_exists: lib.func(
'int storage_exists(void *ctx, str cid, StorageCallback *callback, void *userData)'
),
// peerAddresses is const char ** — pass via koffi.as(addrs, 'char **') or null
storage_connect: lib.func(
'int storage_connect(void *ctx, str peerId, void *peerAddresses, size_t peerAddressesSize, StorageCallback *callback, void *userData)'
),
};
/**
@ -132,15 +136,11 @@ const _lib = {
*/
function callAsync(thunk, { onProgress } = {}) {
return new Promise((resolve, reject) => {
const keepAlive = setInterval(() => {}, 200);
let cb;
cb = koffi.register((ret, msg, _len, _userData) => {
let cb = koffi.register((ret, msg, len, _userData) => {
if (ret === RET_PROGRESS) {
if (onProgress) onProgress(msg);
if (onProgress) onProgress(msg, len, _userData);
return; // callback will fire again; keepAlive remains active
}
clearInterval(keepAlive);
koffi.unregister(cb);
if (ret === RET_OK) resolve(msg ?? '');
else reject(new Error(msg ?? `libstorage error (ret=${ret})`));
@ -148,7 +148,6 @@ function callAsync(thunk, { onProgress } = {}) {
const syncRet = thunk(cb);
if (typeof syncRet === 'number' && syncRet !== RET_OK) {
clearInterval(keepAlive);
koffi.unregister(cb);
reject(new Error(`libstorage sync dispatch failed (ret=${syncRet})`));
}
@ -246,14 +245,16 @@ export class StorageNode {
);
const buf = Buffer.from(content, 'utf8');
await callAsync(
cb => _lib.storage_upload_chunk(this.#ctx, sessionId, buf, buf.byteLength, cb, null)
);
for (let offset = 0; offset < buf.length; offset += chunkSize) {
const chunk = buf.subarray(offset, Math.min(offset + chunkSize, buf.length));
await callAsync(
cb => _lib.storage_upload_chunk(this.#ctx, sessionId, chunk, chunk.byteLength, cb, null)
);
}
const cid = await callAsync(
return callAsync(
cb => _lib.storage_upload_finalize(this.#ctx, sessionId, cb, null)
);
return cid;
}
/**
@ -281,12 +282,81 @@ export class StorageNode {
const chunks = [];
await callAsync(
cb => _lib.storage_download_stream(this.#ctx, cid, chunkSize, local, null, cb, null),
{ onProgress: (msg) => { if (msg) chunks.push(msg); } }
{
onProgress: (chunk, bytes, userData) => {
if (chunk) {
chunks.push(chunk);
console.log("download progress: received chunk of size ", bytes, " userData: ", userData);
}
}
}
);
return chunks.join('');
}
/**
* Download content by CID. Collects all RET_PROGRESS chunk messages and
* joins them as the full content string.
*
* @param {string} cid
* @param {boolean} local - true = local store only, false = fetch from network
* @param {number} chunkSize
* @returns {Promise<string>} the downloaded content as a UTF-8 string
*/
async downloadContentByChunks(cid, local = false, chunkSize = 64 * 1024) {
await callAsync(
cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null)
);
const chunks = [];
let finished = false;
while(!finished) {
await callAsync(
cb => _lib.storage_download_chunk(this.#ctx, cid, cb, null),
{
onProgress: (chunk, bytes, userData) => {
if (chunk) {
chunks.push(chunk);
console.log("download chunk: received chunk of size ", bytes, " userData: ", userData);
console.debug("chunk length", chunk.length, "bytes", bytes, "chunkSize", chunkSize, " chunk content: ", chunk);
if (bytes === chunkSize) {
console.debug("still more data to come...");
return; // expect more chunks to come; keep waiting
}
}
finished = true;
}
}
);
}
return chunks.join('');
}
/**
* Stream content by CID. Each RET_PROGRESS chunk message calls the callback with the chunk data.
*
* @param {string} cid
* @param {boolean} local - true = local store only, false = fetch from network
* @param {number} chunkSize
* @param {Function} onProgress - callback(chunk, bytes, userData) called for each chunk; chunk is null on final callback
* @returns {Promise<string>} the downloaded content as a UTF-8 string
*/
async streamContent(cid, local = false, chunkSize = 64 * 1024, onProgress = null) {
await callAsync(
cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null)
);
const chunks = [];
await callAsync(
cb => _lib.storage_download_stream(this.#ctx, cid, chunkSize, local, null, cb, null),
{ onProgress }
);
}
/**
* Cancel an in-progress download for a CID.
* @param {string} cid
@ -348,6 +418,28 @@ export class StorageNode {
return callAsync(cb => _lib.storage_fetch(this.#ctx, cid, cb, null));
}
/**
* Initialise a download session without streaming allows callers to cancel
* before any data is transferred.
* @param {string} cid
* @param {boolean} local
* @param {number} chunkSize
*/
downloadInit(cid, local = false, chunkSize = 64 * 1024) {
return callAsync(cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null));
}
/**
* Connect to a peer by peer ID and optional multiaddresses.
* If no addresses are given the library falls back to DHT lookup.
* @param {string} peerId - base58 libp2p peer ID
* @param {string[]} addresses - multiaddresses, e.g. ['/ip4/127.0.0.1/tcp/8792']
*/
connect(peerId, addresses = []) {
const addrs = addresses.length > 0 ? koffi.as(addresses, 'char **') : null;
return callAsync(cb => _lib.storage_connect(this.#ctx, peerId, addrs, addresses.length, cb, null));
}
/** Graceful shutdown: stop → close → destroy */
async shutdown() {
await this.stop();

View File

@ -228,6 +228,19 @@ describe('two-node tests', () => {
assert.equal(await nodeB.downloadContent(cid, true), content);
});
it('node B can download all chunks of a file from node A, which exists locally', async () => {
  // 200 KB uploaded on A with the default chunk size; B pulls it
  // chunk-by-chunk over the network, which also populates B's local store
  // as a side effect of the download session.
  const content = 'A'.repeat(200 * 1024);
  const cid = await nodeA.uploadContent(content, 'fetch-test.txt');
  assert.equal(await nodeB.exists(cid), false);
  const downloaded = await nodeB.downloadContentByChunks(cid, false);
  assert.equal(await nodeB.exists(cid), true);
  assert.equal(downloaded, content);
});
it('fetched content is equivalent to streamed content', async () => {
const content = 'Content for storage_fetch test';
const cid = await nodeA.uploadContent(content, 'fetch-test.txt');
@ -256,4 +269,98 @@ describe('two-node tests', () => {
assert.ok(info.table?.nodes.length == 1);
assert.equal(info.table?.nodes[0].peerId, await nodeB.peerId());
});
it('large multi-chunk file transfers intact across nodes', async () => {
  // 200 KB with a 16 KB chunk size → 13 chunks; exercises the upload loop
  // and multi-progress download on the receiving side.
  const chunkSize = 16 * 1024;
  const payload = 'A'.repeat(200 * 1024);
  const cid = await nodeA.uploadContent(payload, 'large.txt', chunkSize);

  // Local round-trip on node A first.
  const fromLocal = await nodeA.downloadContent(cid, true, chunkSize);
  assert.equal(fromLocal.length, payload.length, 'local: length mismatch');
  assert.equal(fromLocal, payload, 'local: content mismatch');

  // Then the same content fetched across the p2p network by node B.
  const fromRemote = await nodeB.downloadContent(cid, false, chunkSize);
  assert.equal(fromRemote.length, payload.length, 'remote: length mismatch');
  assert.equal(fromRemote, payload, 'remote: content mismatch');
});
it('node B remains functional after cancelling a download init', async () => {
  // Two independent pieces of content on A: one whose download gets
  // cancelled, one fetched afterwards to prove B is still healthy.
  const cancelledCid = await nodeA.uploadContent('content to be cancelled');
  const freshCid = await nodeA.uploadContent('content downloaded after cancel');

  // Open a download session for the first CID, then cancel it right away.
  await nodeB.downloadInit(cancelledCid, false);
  await nodeB.downloadCancel(cancelledCid);

  // The init already transferred blocks into B's local store as a
  // side-effect, so the cancelled CID is expected to exist locally on B.
  assert.equal(await nodeB.exists(cancelledCid), true, 'cancelled CID should be in local store after init');

  // Crucially, B can still download a completely fresh CID from the network.
  const downloaded = await nodeB.downloadContent(freshCid, false);
  assert.equal(downloaded, 'content downloaded after cancel');
});
});
// ---------------------------------------------------------------------------
// Explicit-connect tests — nodes start with no knowledge of each other;
// connection is established by calling storage_connect with peer ID + addrs.
// ---------------------------------------------------------------------------
describe('explicit connect tests', () => {
  let nodeA, nodeB;

  beforeEach(async () => {
    const ports = await Promise.all([
      findFreePort(PORT_BASE + 20),
      findFreePort(PORT_BASE + 21),
      findFreePort(PORT_BASE + 22),
      findFreePort(PORT_BASE + 23),
    ]);

    // Create and start a node with no bootstrap, so the pair begin with no
    // knowledge of each other until connect() is called explicitly.
    const makeNode = async (name, listenPort, discPort) => {
      const node = new StorageNode();
      await node.create(nodeConfig(name, {
        'listen-port': listenPort,
        'disc-port': discPort,
        'nat': 'extip:127.0.0.1',
      }));
      await node.start();
      return node;
    };

    nodeA = await makeNode('a-exp', ports[0], ports[1]);
    nodeB = await makeNode('b-exp', ports[2], ports[3]);
  });

  afterEach(async () => {
    await Promise.allSettled([nodeA.shutdown(), nodeB.shutdown()]);
  });

  it('node B connects to node A via peer ID and multiaddresses', async () => {
    // A's peer identity and listen addresses come from its debug info.
    const debugA = JSON.parse(await nodeA.debug());
    const peerIdA = debugA.id;

    // Keep only loopback addresses so the connect stays local.
    const addrsA = (debugA.addrs ?? []).filter(a => a.includes('127.0.0.1'));
    assert.ok(addrsA.length > 0, 'node A must have at least one loopback address');

    // B explicitly connects to A by peer ID + addresses (no DHT, no bootstrap).
    await nodeB.connect(peerIdA, addrsA);

    // Verify the connection works end-to-end with an upload/download round trip.
    const content = 'Connected via explicit peer ID + addresses';
    const cid = await nodeA.uploadContent(content);
    const downloaded = await nodeB.downloadContent(cid, false);
    assert.equal(downloaded, content);
  });
});