add WrappedBuffer helper to handle Uint8Array IDs

Add a custom Trie based HashMap to handle Uint8Array keys
This commit is contained in:
nazeh 2023-05-07 16:03:07 +03:00
parent e789796ea8
commit 373a3fe474
2 changed files with 202 additions and 89 deletions

View File

@ -1,5 +1,105 @@
// (C) 2023 Doug Hoyte. MIT license
class WrappedBuffer {
constructor(buffer) {
this._raw = new Uint8Array(buffer) || new Uint8Array(1024);
this.length = buffer ? buffer.length : 0;
this._expansionRate = 2;
}
unwrap() {
return this._raw.subarray(0, this.length)
}
get capacity() {
return this._raw.byteLength
}
extend(buf) {
const targetSize = buf.length + this.length;
if (this.capacity < targetSize) {
const oldRaw = this._raw;
const newCapacity = Math.max(
Math.floor(this.capacity * this._expansionRate),
targetSize
);
this._raw = new Uint8Array(newCapacity);
this._raw.set(oldRaw);
}
this._raw.set(buf, this.length);
this.length += buf.length;
return this;
}
shift() {
const first = this._raw[0]
this._raw = this._raw.subarray(1)
this.length -= 1
return first
}
splice(n = 1) {
const firstSubarray = this._raw.subarray(0, n)
this._raw = this._raw.subarray(n)
this.length -= n
return firstSubarray
}
}
class TrieNode {
constructor() {
this.children = new Array(256).fill(null);
this.value = null;
this.isTail = false;
}
}
class HashMap {
constructor() {
this._root = new TrieNode();
}
set(id, value) {
let node = this._root;
for (let byte of id) {
if (!node.children[byte]) {
node.children[byte] = new TrieNode();
}
node = node.children[byte];
};
node.value = value;
node.isTail = true;
}
get(keyUint8Array) {
let node = this._root;
for (let element of keyUint8Array) {
if (!node.children[element]) {
return null; // return null if the key is not in the trie
}
node = node.children[element];
}
return node.value;
}
*[Symbol.iterator]() {
const key = [];
function* walk(node) {
for (let i = 0; i < 256; i++) {
if (!node.children[i]) continue;
key.push(i);
if (node.children[i].isTail) {
yield [new Uint8Array(key), node.children[i].value];
}
yield* walk(node.children[i]);
key.pop();
}
}
yield* walk(this._root);
}
}
class Negentropy {
constructor(idSize) {
if (idSize < 8 || idSize > 32) throw Error("idSize invalid");
@ -9,10 +109,10 @@ class Negentropy {
addItem(timestamp, id) {
if (this.sealed) throw Error("already sealed");
if (id.length > 64 || id.length % 2 !== 0) throw Error("bad length for id");
id = id.substr(0, this.idSize * 2);
if (id.byteLength > 64 || id.byteLength % 2 !== 0) throw Error("bad length for id");
id = id.subarray(0, this.idSize * 2);
this.items.push({ timestamp, id: fromHexString(id), idHex: id, });
this.items.push({ timestamp, id });
}
seal() {
@ -30,27 +130,30 @@ class Negentropy {
}
_zeroBound() {
let id = new Array(this.idSize).fill(0);
return { timestamp: 0, id, idHex: toHexString(id), };
return { timestamp: 0, id: new Uint8Array(this.idSize) };
}
_maxBound() {
return { timestamp: Number.MAX_VALUE, id: new Uint8Array(0) };
}
initiate() {
if (!this.sealed) throw Error("not sealed");
this.isInitiator = true;
let output = [];
let output = new WrappedBuffer();
let state = this._newState();
this.splitRange(0, this.items.length, this._zeroBound(), { timestamp: Number.MAX_VALUE, id: [], }, state, output);
return toHexString(output);
this.splitRange(0, this.items.length, this._zeroBound(), this._maxBound(), state, output);
return output.unwrap();
}
reconcile(query) {
if (!this.sealed) throw Error("not sealed");
query = fromHexString(query);
query = new WrappedBuffer(query);
let haveIds = [], needIds = [];
let output = [];
let output = new WrappedBuffer();
let prevBound = this._zeroBound();
let prevIndex = 0;
let state = this._newState();
@ -59,8 +162,8 @@ class Negentropy {
let doSkip = () => {
if (!skip) return;
skip = false;
output.push(...this.encodeBound(prevBound, state));
output.push(...this.encodeVarInt(0)); // mode = Skip
this.encodeBound(prevBound, state, output);
output.extend(this.encodeVarInt(0)); // mode = Skip
};
while (query.length !== 0) {
@ -75,7 +178,7 @@ class Negentropy {
} else if (mode === 1) { // Fingerprint
let theirXorSet = this.getBytes(query, this.idSize);
let ourXorSet = new Array(this.idSize).fill(0);
let ourXorSet = new Uint8Array(this.idSize);
for (let i = lower; i < upper; ++i) {
let item = this.items[i];
for (let j = 0; j < this.idSize; j++) ourXorSet[j] ^= item.id[j];
@ -98,48 +201,48 @@ class Negentropy {
} else if (mode === 2) { // IdList
let numElems = this.decodeVarInt(query);
let theirElems = {};
let theirElems = new HashMap();
for (let i = 0; i < numElems; i++) {
let id = toHexString(this.getBytes(query, this.idSize));
theirElems[id] = { offset: i, onBothSides: false, };
let id = this.getBytes(query, this.idSize);
theirElems.set(id, { offset: i, onBothSides: false });
}
let responseHaveIds = [];
let responseNeedIndices = [];
for (let i = lower; i < upper; i++) {
let id = this.items[i].idHex;
let e = theirElems[id];
let id = this.items[i].id;
let e = theirElems.get(id);
if (e === undefined) {
if (!e) {
// ID exists on our side, but not their side
if (this.isInitiator) haveIds.push(id);
else responseHaveIds.push(id);
} else {
// ID exists on both sides
theirElems[id].onBothSides = true;
e.onBothSides = true;
}
}
for (let k of Object.keys(theirElems)) {
if (!theirElems[k].onBothSides) {
for (let [k, v] of theirElems) {
if (!v.onBothSides) {
// ID exists on their side, but not our side
if (this.isInitiator) needIds.push(k);
else responseNeedIndices.push(theirElems[k].offset);
else responseNeedIndices.push(v.offset);
}
}
if (!this.isInitiator) {
doSkip();
output.push(...this.encodeBound(currBound, state));
output.push(...this.encodeVarInt(3)); // mode = IdListResponse
this.encodeBound(currBound, state, output);
output.extend(this.encodeVarInt(3)); // mode = IdListResponse
output.push(...this.encodeVarInt(responseHaveIds.length));
for (let id of responseHaveIds) output.push(...fromHexString(id));
output.extend(this.encodeVarInt(responseHaveIds.length));
for (let id of responseHaveIds) output.extend(id);
let bitField = this.encodeBitField(responseNeedIndices);
output.push(...this.encodeVarInt(bitField.length));
output.push(...bitField);
output.extend(this.encodeVarInt(bitField.length));
output.extend(bitField);
} else {
skip = true;
}
@ -149,14 +252,14 @@ class Negentropy {
let numIds = this.decodeVarInt(query);
for (let i = 0; i < numIds; i++) {
needIds.push(toHexString(this.getBytes(query, this.idSize)));
needIds.push(this.getBytes(query, this.idSize));
}
let bitFieldSize = this.decodeVarInt(query);
let bitField = this.getBytes(query, bitFieldSize);
for (let i = lower; i < upper; i++) {
if (this.bitFieldLookup(bitField, i - lower)) haveIds.push(this.items[i].idHex);
if (this.bitFieldLookup(bitField, i - lower)) haveIds.push(this.items[i].id);
}
} else {
throw Error("unexpected mode");
@ -166,35 +269,35 @@ class Negentropy {
prevBound = currBound;
}
return [toHexString(output), haveIds, needIds];
return [output.unwrap(), haveIds, needIds];
}
splitRange(lower, upper, lowerBound, upperBound, state, output) {
splitRange(lower, upper, _lowerBound, upperBound, state, output) {
let numElems = upper - lower;
let buckets = 16;
if (numElems < buckets * 2) {
output.push(...this.encodeBound(upperBound, state));
output.push(...this.encodeVarInt(2)); // mode = IdList
this.encodeBound(upperBound, state, output);
output.extend(this.encodeVarInt(2)); // mode = IdList
output.push(...this.encodeVarInt(numElems));
for (let it = lower; it < upper; ++it) output.push(...this.items[it].id);
output.extend(this.encodeVarInt(numElems));
for (let it = lower; it < upper; ++it) output.extend(this.items[it].id);
} else {
let itemsPerBucket = Math.floor(numElems / buckets);
let bucketsWithExtra = numElems % buckets;
let curr = lower;
for (let i = 0; i < buckets; i++) {
let ourXorSet = new Array(this.idSize).fill(0);
let ourXorSet = new Uint8Array(this.idSize)
for (let bucketEnd = curr + itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); curr != bucketEnd; curr++) {
for (let j = 0; j < this.idSize; j++) ourXorSet[j] ^= this.items[curr].id[j];
}
if (i === buckets - 1) output.push(...this.encodeBound(upperBound, state));
else output.push(...this.encodeMinimalBound(this.items[curr], this.items[curr - 1], state));
if (i === buckets - 1) this.encodeBound(upperBound, state, output);
else this.encodeMinimalBound(this.items[curr], this.items[curr - 1], state, output);
output.push(...this.encodeVarInt(1)); // mode = Fingerprint
output.push(...ourXorSet);
output.extend(this.encodeVarInt(1)); // mode = Fingerprint
output.extend(ourXorSet);
}
}
}
@ -208,7 +311,7 @@ class Negentropy {
getBytes(buf, n) {
if (buf.length < n) throw Error("parse ends prematurely");
return buf.splice(0, n);
return buf.splice(n);
}
decodeVarInt(buf) {
@ -238,28 +341,28 @@ class Negentropy {
decodeBound(encoded, state) {
let timestamp = this.decodeTimestampIn(encoded, state);
let len = this.decodeVarInt(encoded);
if (len > this.idSize) throw herr("bound key too long");
if (len > this.idSize) throw Error("bound key too long");
let id = this.getBytes(encoded, len);
return { timestamp, id, idHex: toHexString(id), };
return { timestamp, id };
}
// Encoding
encodeVarInt(n) {
if (n === 0) return [0];
if (n === 0) return new Uint8Array([0]);
let o = [];
while (n !== 0) {
o.push(n & 0x7F);
o.push(n & 127);
n >>>= 7;
}
o.reverse();
for (let i = 0; i < o.length - 1; i++) o[i] |= 0x80;
for (let i = 0; i < o.length - 1; i++) o[i] |= 128;
return o;
return new Uint8Array(o);
}
encodeTimestampOut(timestamp, state) {
@ -274,23 +377,17 @@ class Negentropy {
return this.encodeVarInt(timestamp + 1);
}
encodeBound(key, state) {
let output = [];
output.push(...this.encodeTimestampOut(key.timestamp, state));
output.push(...this.encodeVarInt(key.id.length));
output.push(...key.id);
return output;
encodeBound(key, state, output) {
output.extend(this.encodeTimestampOut(key.timestamp, state));
output.extend(this.encodeVarInt(key.id.length));
output.extend(key.id);
}
encodeMinimalBound(curr, prev, state) {
let output = [];
output.push(...this.encodeTimestampOut(curr.timestamp, state));
encodeMinimalBound(curr, prev, state, output) {
output.extend(this.encodeTimestampOut(curr.timestamp, state));
if (curr.timestamp !== prev.timestamp) {
output.push(...this.encodeVarInt(0));
output.extend(this.encodeVarInt(0));
} else {
let sharedPrefixBytes = 0;
@ -299,18 +396,16 @@ class Negentropy {
sharedPrefixBytes++;
}
output.push(...this.encodeVarInt(sharedPrefixBytes + 1));
output.push(...curr.id.slice(0, sharedPrefixBytes + 1));
output.extend(this.encodeVarInt(sharedPrefixBytes + 1));
output.extend(curr.id.subarray(0, sharedPrefixBytes + 1));
}
return output;
};
encodeBitField(inds) {
if (inds.length === 0) return [];
let max = Math.max(...inds);
let bitField = new Array(Math.floor((max + 8) / 8)).fill(0);
let bitField = new Uint8Array(Math.floor((max + 8) / 8));
for (let ind of inds) bitField[Math.floor(ind / 8)] |= 1 << (ind % 8);
return bitField;
@ -325,21 +420,15 @@ class Negentropy {
function fromHexString(hexString) {
if ((hexString.length % 2) !== 0) throw Error("uneven length of hex string");
if (hexString.length === 0) return [];
return hexString.match(/../g).map((byte) => parseInt(byte, 16));
}
function toHexString(buf) {
return buf.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
}
/**
* @param {Item} a
* @param {Item} b
*
* @returns {number}
*/
function itemCompare(a, b) {
if (a.timestamp === b.timestamp) {
if (a.idHex < b.idHex) return -1;
else if (a.idHex > b.idHex) return 1;
return 0;
return compareUint8Array(a.id, b.id)
}
return a.timestamp - b.timestamp;
@ -372,6 +461,30 @@ function findUpperBound(arr, first, last, value, cmp) {
return binarySearch(arr, first, last, (a) => cmp(value, a) >= 0);
}
/**
* @param {Uint8Array} a
* @param {Uint8Array} b
*/
function compareUint8Array(a, b) {
for (let i = 0; i < a.byteLength; i++) {
if (a[i] < b[i]) {
return -1
}
if (a[i] > b[i]) {
return 1
}
}
if (a.byteLength > b.byteLength) {
return 1
}
if (a.byteLength < b.byteLength) {
return -1
}
return 0
}
module.exports = Negentropy;

View File

@ -4,9 +4,9 @@ const Negentropy = require('../../js/Negentropy.js');
const idSize = 16;
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
input: process.stdin,
output: process.stdout,
terminal: false
});
let n = 0;
@ -19,8 +19,8 @@ rl.on('line', (line) => {
let mode = parseInt(items[0]);
let created = parseInt(items[1]);
let id = items[2];
if (id.length !== idSize*2) throw Error("unexpected id size");
let id = Buffer.from(items[2].trim(), 'hex');
if (id.byteLength !== idSize) throw Error(`id should be ${idSize} bytes`);
if (mode === 1) {
x1.addItem(created, id);
@ -50,18 +50,18 @@ rl.once('close', () => {
let [newQ, haveIds, needIds] = x1.reconcile(q);
q = newQ;
for (let id of haveIds) console.log(`xor,HAVE,${id}`);
for (let id of needIds) console.log(`xor,NEED,${id}`);
for (let id of haveIds) console.log(`xor,HAVE,${Buffer.from(id).toString('hex')}`);
for (let id of needIds) console.log(`xor,NEED,${Buffer.from(id).toString('hex')}`);
}
if (q.length === 0) break;
console.error(`[${round}] CLIENT -> SERVER: ${q.length / 2} bytes`);
console.error(`[${round}] CLIENT -> SERVER: ${q.byteLength} bytes`);
let [newQ, haveIds, needIds] = x2.reconcile(q);
q = newQ;
console.error(`[${round}] SERVER -> CLIENT: ${q.length / 2} bytes`);
console.error(`[${round}] SERVER -> CLIENT: ${q.byteLength} bytes`);
round++;
}