mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-12 14:29:39 +00:00
Add more tests
This commit is contained in:
parent
81922158c2
commit
03ba5a3886
@ -16,6 +16,7 @@ import path from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
|
||||
export const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
||||
export const DEBUG_ENABLED = Boolean(process.env.npm_config_debug);
|
||||
|
||||
const libName =
|
||||
process.platform === 'darwin' ? 'libstorage.dylib' :
|
||||
@ -32,7 +33,7 @@ export const RET_PROGRESS = 3;
|
||||
|
||||
// typedef void (*StorageCallback)(int callerRet, const char *msg, size_t len, void *userData)
|
||||
const StorageCallback = koffi.proto(
|
||||
'StorageCallback', 'void', ['int', 'str', 'size_t', 'void *']
|
||||
'StorageCallback', 'void', ['int', 'const char *', 'size_t', 'void *']
|
||||
);
|
||||
|
||||
// All libstorage function bindings
|
||||
@ -118,6 +119,12 @@ const _lib = {
|
||||
),
|
||||
};
|
||||
|
||||
export function debugLog(msg) {
|
||||
if (DEBUG_ENABLED) {
|
||||
console.debug(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a single async libstorage call into a Promise.
|
||||
*
|
||||
@ -285,8 +292,8 @@ export class StorageNode {
|
||||
{
|
||||
onProgress: (chunk, bytes, userData) => {
|
||||
if (chunk) {
|
||||
chunks.push(chunk);
|
||||
console.log("download progress: received chunk of size ", bytes, " userData: ", userData);
|
||||
chunks.push(chunk.substring(0, chunkSize));
|
||||
debugLog("download progress: received chunk of size ", bytes, " userData: ", userData);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -295,16 +302,36 @@ export class StorageNode {
|
||||
return chunks.join('');
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {*} cid - manifest CID. The actual content CID is in the manifest's treeCid field, but the library needs the manifest CID to track the download session.
|
||||
* @param {*} chunkSize - must match the chunkSize used in downloadInit; used to determine whether more chunks are expected
|
||||
* @returns {Promise<{chunk: string, bytes: number, more: boolean}>} chunk is the downloaded chunk (up to chunkSize bytes); bytes is the number of bytes in the downloaded chunk; more indicates whether more chunks are expected after this one
|
||||
*/
|
||||
async downloadChunk(cid, chunkSize) {
|
||||
let result = null;
|
||||
await callAsync(cb => _lib.storage_download_chunk(this.#ctx, cid, cb, null), {
|
||||
onProgress: (chunk, bytes) => {
|
||||
result = {
|
||||
chunk: chunk?.substring(0, bytes),
|
||||
bytes,
|
||||
more: bytes === chunkSize,
|
||||
};
|
||||
},
|
||||
});
|
||||
return result ?? { chunk: '', bytes: 0, more: false };
|
||||
}
|
||||
|
||||
/**
|
||||
* Download content by CID. Collects all RET_PROGRESS chunk messages and
|
||||
* joins them as the full content string.
|
||||
* joins them as the full content string. From the caller's perspective this behaves the same as downloadContent, but it allows coverage of downloading by stream and by chunks.
|
||||
*
|
||||
* @param {string} cid
|
||||
* @param {boolean} local - true = local store only, false = fetch from network
|
||||
* @param {number} chunkSize
|
||||
* @returns {Promise<string>} the downloaded content as a UTF-8 string
|
||||
*/
|
||||
async downloadContentByChunks(cid, local = false, chunkSize = 64 * 1024) {
|
||||
async downloadAllChunks(cid, local = false, chunkSize = 64 * 1024) {
|
||||
await callAsync(
|
||||
cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null)
|
||||
);
|
||||
@ -318,21 +345,20 @@ export class StorageNode {
|
||||
{
|
||||
onProgress: (chunk, bytes, userData) => {
|
||||
if (chunk) {
|
||||
chunks.push(chunk);
|
||||
console.log("download chunk: received chunk of size ", bytes, " userData: ", userData);
|
||||
console.debug("chunk length", chunk.length, "bytes", bytes, "chunkSize", chunkSize, " chunk content: ", chunk);
|
||||
if (bytes === chunkSize) {
|
||||
console.debug("still more data to come...");
|
||||
const c = chunk.substring(0, bytes);
|
||||
debugLog("Received chunk,", "length", c.length, "numBytes", bytes, "chunkSize", chunkSize, "more", c.length === chunkSize);
|
||||
chunks.push(c);
|
||||
if (c.length === chunkSize) {
|
||||
return; // expect more chunks to come; keep waiting
|
||||
}
|
||||
}
|
||||
debugLog("complete data:", chunks.join(''));
|
||||
finished = true;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
|
||||
return chunks.join('');
|
||||
}
|
||||
|
||||
@ -350,7 +376,6 @@ export class StorageNode {
|
||||
cb => _lib.storage_download_init(this.#ctx, cid, chunkSize, local, cb, null)
|
||||
);
|
||||
|
||||
const chunks = [];
|
||||
await callAsync(
|
||||
cb => _lib.storage_download_stream(this.#ctx, cid, chunkSize, local, null, cb, null),
|
||||
{ onProgress }
|
||||
|
||||
@ -18,8 +18,8 @@
|
||||
}
|
||||
},
|
||||
"scripts": {
|
||||
"test": "mocha --timeout 30000 two-node-test.js",
|
||||
"testOnly": "sh -c 'mocha --timeout 30000 --grep \"$1\" two-node-test.js' --"
|
||||
"test": "NODE_OPTIONS=\"--force-node-api-uncaught-exceptions-policy=true\" mocha --timeout 30000 two-node-test.js",
|
||||
"testOnly": "sh -c 'NODE_OPTIONS=\"--force-node-api-uncaught-exceptions-policy=true\" mocha --timeout 30000 --grep \"$1\" two-node-test.js' --"
|
||||
},
|
||||
"dependencies": {
|
||||
"koffi": "^2.9.0"
|
||||
|
||||
@ -20,10 +20,11 @@
|
||||
import { describe, it, before, after, beforeEach, afterEach } from 'mocha';
|
||||
import assert from 'node:assert/strict';
|
||||
import net from 'node:net';
|
||||
import dgram from 'node:dgram';
|
||||
import { rmSync, mkdirSync } from 'fs';
|
||||
import { tmpdir } from 'os';
|
||||
import path from 'path';
|
||||
import { StorageNode } from './harness.js';
|
||||
import { StorageNode, DEBUG_ENABLED, debugLog } from './harness.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config helpers
|
||||
@ -33,7 +34,6 @@ const TMP = tmpdir();
|
||||
const PORT_BASE = parseInt(process.env.PORT_BASE ?? '8700', 10);
|
||||
|
||||
function nodeConfig(label, config) {
|
||||
const debug = Boolean(process.env.npm_config_debug);
|
||||
|
||||
if (!config["data-dir"]) {
|
||||
config["data-dir"] = path.join(TMP, `libstorage-test-${label}`);
|
||||
@ -44,25 +44,34 @@ function nodeConfig(label, config) {
|
||||
mkdirSync(config["data-dir"], { recursive: true });
|
||||
|
||||
if (!config["log-level"]) {
|
||||
config["log-level"] = debug ? 'TRACE' :'ERROR';
|
||||
config["log-level"] = DEBUG_ENABLED ? 'TRACE' : 'ERROR';
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
console.debug(`Node '${label}' config:`, config);
|
||||
}
|
||||
debugLog(`Node '${label}' config:`, config);
|
||||
|
||||
return JSON.stringify(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the first free TCP port at or above `port`.
|
||||
* Tries to bind a server; if the port is in use, recurses with port + 1.
|
||||
* Finds the first port at or above `port` that is free on both TCP and UDP.
|
||||
* TCP is checked for the listen port; UDP is checked because discv5 discovery
|
||||
* binds a UDP socket on the same port numbers, and a TCP-only check will miss
|
||||
* stale processes that are still holding a UDP port.
|
||||
*/
|
||||
function findFreePort(port) {
|
||||
return new Promise((resolve) => {
|
||||
const srv = net.createServer();
|
||||
srv.listen(port, '127.0.0.1', () => srv.close(() => resolve(port)));
|
||||
srv.on('error', () => resolve(findFreePort(port + 1)));
|
||||
const tcp = net.createServer();
|
||||
tcp.listen(port, '127.0.0.1', () => {
|
||||
tcp.close(() => {
|
||||
// TCP is free — also verify UDP is free (discv5 binds 0.0.0.0:port)
|
||||
const udp = dgram.createSocket('udp4');
|
||||
udp.bind(port, () => {
|
||||
udp.close(() => resolve(port));
|
||||
});
|
||||
udp.on('error', () => resolve(findFreePort(port + 1)));
|
||||
});
|
||||
});
|
||||
tcp.on('error', () => resolve(findFreePort(port + 1)));
|
||||
});
|
||||
}
|
||||
|
||||
@ -212,7 +221,7 @@ describe('two-node tests', () => {
|
||||
await Promise.allSettled([nodeA.shutdown(), nodeB.shutdown()]);
|
||||
});
|
||||
|
||||
it('node B can download content from node A over p2p', async () => {
|
||||
it('node B can download content from node A', async () => {
|
||||
const content = 'Hello from node A — p2p test content!';
|
||||
const cid = await nodeA.uploadContent(content);
|
||||
assert.equal(await nodeB.downloadContent(cid, false), content);
|
||||
@ -229,14 +238,14 @@ describe('two-node tests', () => {
|
||||
});
|
||||
|
||||
it('node B can download all chunks of a file from node A, which exists locally', async () => {
|
||||
// 200 KB with a 16 KB chunk size → 13 chunks; exercises the upload loop
|
||||
// 50 KB with a 16 KB chunk size → 4 chunks; exercises the upload loop
|
||||
// and multi-progress download on the receiving side.
|
||||
// const chunkSize = 16 * 1024;
|
||||
const content = 'A'.repeat(200 * 1024);
|
||||
const cid = await nodeA.uploadContent(content, 'fetch-test.txt');//, chunkSize);
|
||||
const chunkSize = 16 * 1024;
|
||||
const content = 'A'.repeat(50 * 1024);
|
||||
const cid = await nodeA.uploadContent(content, 'fetch-test.txt', chunkSize);
|
||||
|
||||
assert.equal(await nodeB.exists(cid), false);
|
||||
let downloaded = await nodeB.downloadContentByChunks(cid, false);//, chunkSize);
|
||||
let downloaded = await nodeB.downloadAllChunks(cid, false, chunkSize);
|
||||
assert.equal(await nodeB.exists(cid), true);
|
||||
assert.equal(downloaded, content);
|
||||
});
|
||||
@ -246,11 +255,20 @@ describe('two-node tests', () => {
|
||||
const cid = await nodeA.uploadContent(content, 'fetch-test.txt');
|
||||
|
||||
await nodeB.fetch(cid);
|
||||
const fetched = await nodeB.downloadContent(cid, true); // gets fetched content from local store
|
||||
const fetched = await nodeB.downloadContent(cid, true); // gets previously fetched content from local store
|
||||
const streamed = await nodeB.downloadContent(cid, false); // streams content from network
|
||||
assert.equal(fetched, streamed);
|
||||
});
|
||||
|
||||
it('streamed content is equivalent to chunked content', async () => {
|
||||
const content = 'Content for storage_fetch test';
|
||||
const cid = await nodeA.uploadContent(content, 'fetch-test.txt');
|
||||
|
||||
const streamed = await nodeB.downloadContent(cid); // streams content from network
|
||||
const chunked = await nodeB.downloadAllChunks(cid); // streams content by chunks from network
|
||||
assert.equal(streamed, chunked);
|
||||
});
|
||||
|
||||
it('downloaded manifest is equivalent to fetched manifest', async () => {
|
||||
const content = 'Content for storage_fetch test';
|
||||
const cid = await nodeA.uploadContent(content, 'fetch-test.txt');
|
||||
@ -288,13 +306,43 @@ describe('two-node tests', () => {
|
||||
assert.equal(remote, content, 'remote: content mismatch');
|
||||
});
|
||||
|
||||
it('node B remains functional after cancelling a download init', async () => {
|
||||
// Upload two independent pieces of content on A.
|
||||
const cid1 = await nodeA.uploadContent('content to be cancelled');
|
||||
const cid2 = await nodeA.uploadContent('content downloaded after cancel');
|
||||
it('node B can download a chunk from node A', async () => {
|
||||
// upload 2 chunks worth of data
|
||||
const chunkSize = 16 * 1024;
|
||||
const content1 = '1'.repeat(31 * 1024);
|
||||
const cid1 = await nodeA.uploadContent(content1, 'test.txt', chunkSize);
|
||||
|
||||
// Init a download for cid1, then immediately cancel it.
|
||||
await nodeB.downloadInit(cid1, false);
|
||||
await nodeB.downloadInit(cid1, false, chunkSize);
|
||||
let chunk = await nodeB.downloadChunk(cid1, chunkSize);
|
||||
assert.deepStrictEqual(chunk,
|
||||
{
|
||||
chunk: content1.slice(0, chunkSize),
|
||||
bytes: chunkSize,
|
||||
more: true
|
||||
}, 'first chunk should match');
|
||||
|
||||
chunk = await nodeB.downloadChunk(cid1, chunkSize);
|
||||
assert.deepStrictEqual(chunk,
|
||||
{
|
||||
chunk: content1.slice(chunkSize),
|
||||
bytes: (31 - 16) * 1024,
|
||||
more: false
|
||||
}, 'second chunk should match');
|
||||
});
|
||||
|
||||
it('node B remains functional after cancelling a download by chunks', async () => {
|
||||
// Upload two independent pieces of content on A.
|
||||
const chunkSize = 16 * 1024;
|
||||
const content1 = '1'.repeat(50 * 1024);
|
||||
const content2 = '2'.repeat(50 * 1024);
|
||||
const cid1 = await nodeA.uploadContent(content1);
|
||||
const cid2 = await nodeA.uploadContent(content2);
|
||||
|
||||
// Download a chunk of cid1, then immediately cancel it.
|
||||
await nodeB.downloadInit(cid1, false, chunkSize);
|
||||
let chunk = await nodeB.downloadChunk(cid1, chunkSize);
|
||||
assert.equal(chunk.chunk, content1.slice(0, chunkSize), 'first chunk should match');
|
||||
await nodeB.downloadCancel(cid1);
|
||||
|
||||
// After cancelling, the library transfers blocks into B's local store as
|
||||
@ -304,7 +352,32 @@ describe('two-node tests', () => {
|
||||
// More importantly, node B must still be functional: it can download a
|
||||
// completely fresh CID from the network without any issues.
|
||||
const downloaded = await nodeB.downloadContent(cid2, false);
|
||||
assert.equal(downloaded, 'content downloaded after cancel');
|
||||
assert.equal(downloaded, content2, 'should be able to download other content after cancelling a download by chunks');
|
||||
});
|
||||
|
||||
it('node B remains functional after cancelling a download stream', async () => {
|
||||
// Upload two independent pieces of content on A.
|
||||
const chunkSize = 16 * 1024;
|
||||
const content1 = '1'.repeat(50 * 1024);
|
||||
const content2 = '2'.repeat(50 * 1024);
|
||||
const cid1 = await nodeA.uploadContent(content1);
|
||||
const cid2 = await nodeA.uploadContent(content2);
|
||||
|
||||
// Download a chunk of cid1, then immediately cancel it upon receipt of first chunk.
|
||||
await nodeB.downloadInit(cid1, false, chunkSize);
|
||||
await nodeB.streamContent(cid1, false, chunkSize, async (chunk, bytes, _userData) => {
|
||||
assert.equal(chunk.chunk, content1.slice(0, chunkSize), 'first chunk should match');
|
||||
await nodeB.downloadCancel(cid1);
|
||||
});
|
||||
|
||||
// After cancelling, the library transfers blocks into B's local store as
|
||||
// a side-effect of the init, so cid1 should exist locally on B.
|
||||
assert.equal(await nodeB.exists(cid1), true, 'cancelled CID should be in local store after init');
|
||||
|
||||
// More importantly, node B must still be functional: it can download a
|
||||
// completely fresh CID from the network without any issues.
|
||||
const downloaded = await nodeB.downloadContent(cid2, false);
|
||||
assert.equal(downloaded, content2, 'should be able to download other content after cancelling a download stream');
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user