mirror of
https://github.com/logos-messaging/negentropy.git
synced 2026-01-02 14:03:07 +00:00
js: implement protocol version 1
This commit is contained in:
parent
304779371c
commit
ed5f8e7e48
753
js/Negentropy.js
753
js/Negentropy.js
@ -1,18 +1,18 @@
|
||||
// (C) 2023 Doug Hoyte. MIT license
|
||||
|
||||
const PROTOCOL_VERSION_0 = 0x60;
|
||||
const PROTOCOL_VERSION = 0x61; // Version 1
|
||||
const ID_SIZE = 32;
|
||||
const FINGERPRINT_SIZE = 16;
|
||||
|
||||
const Mode = {
|
||||
Skip: 0,
|
||||
Fingerprint: 1,
|
||||
IdList: 2,
|
||||
Continuation: 3,
|
||||
UnsupportedProtocolVersion: 4,
|
||||
};
|
||||
|
||||
class WrappedBuffer {
|
||||
constructor(buffer) {
|
||||
this._raw = new Uint8Array(buffer || 256);
|
||||
this._raw = new Uint8Array(buffer || 512);
|
||||
this.length = buffer ? buffer.length : 0;
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ class WrappedBuffer {
|
||||
|
||||
extend(buf) {
|
||||
if (buf._raw) buf = buf.unwrap();
|
||||
if (typeof(buf.length) !== 'number') throw Error("bad length");
|
||||
const targetSize = buf.length + this.length;
|
||||
if (this.capacity < targetSize) {
|
||||
const oldRaw = this._raw;
|
||||
@ -53,17 +54,49 @@ class WrappedBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
function decodeVarInt(buf) {
|
||||
let res = 0;
|
||||
|
||||
class Negentropy {
|
||||
constructor(idSize = 16, frameSizeLimit = 0) {
|
||||
if (idSize < 8 || idSize > 32) throw Error("idSize invalid");
|
||||
if (frameSizeLimit !== 0 && frameSizeLimit < 4096) throw Error("frameSizeLimit too small");
|
||||
while (1) {
|
||||
if (buf.length === 0) throw Error("parse ends prematurely");
|
||||
let byte = buf.shift();
|
||||
res = (res << 7) | (byte & 127);
|
||||
if ((byte & 128) === 0) break;
|
||||
}
|
||||
|
||||
this.idSize = idSize;
|
||||
this.frameSizeLimit = frameSizeLimit;
|
||||
return res;
|
||||
}
|
||||
|
||||
this.addedItems = [];
|
||||
this.pendingOutputs = [];
|
||||
function encodeVarInt(n) {
|
||||
if (n === 0) return new WrappedBuffer([0]);
|
||||
|
||||
let o = [];
|
||||
|
||||
while (n !== 0) {
|
||||
o.push(n & 127);
|
||||
n >>>= 7;
|
||||
}
|
||||
|
||||
o.reverse();
|
||||
|
||||
for (let i = 0; i < o.length - 1; i++) o[i] |= 128;
|
||||
|
||||
return new WrappedBuffer(o);
|
||||
}
|
||||
|
||||
function getByte(buf) {
|
||||
return getBytes(buf, 1)[0];
|
||||
}
|
||||
|
||||
function getBytes(buf, n) {
|
||||
if (buf.length < n) throw Error("parse ends prematurely");
|
||||
return buf.shiftN(n);
|
||||
}
|
||||
|
||||
|
||||
class Accumulator {
|
||||
constructor() {
|
||||
this.setToZero();
|
||||
|
||||
if (typeof window === 'undefined') { // node.js
|
||||
const crypto = require('crypto');
|
||||
@ -73,312 +106,133 @@ class Negentropy {
|
||||
}
|
||||
}
|
||||
|
||||
addItem(timestamp, id) {
|
||||
setToZero() {
|
||||
this.buf = new Uint8Array(ID_SIZE);
|
||||
}
|
||||
|
||||
add(otherBuf) {
|
||||
let currCarry = 0, nextCarry = 0;
|
||||
let p = new DataView(this.buf.buffer);
|
||||
let po = new DataView(otherBuf.buffer);
|
||||
|
||||
for (let i = 0; i < 8; i++) {
|
||||
let offset = i * 4;
|
||||
let orig = p.getUint32(offset, true);
|
||||
let otherV = po.getUint32(offset, true);
|
||||
|
||||
let next = orig;
|
||||
|
||||
next += currCarry;
|
||||
next += otherV;
|
||||
if (next > 0xFFFFFFFF) nextCarry = 1;
|
||||
|
||||
p.setUint32(offset, next & 0xFFFFFFFF, true);
|
||||
currCarry = nextCarry;
|
||||
nextCarry = 0;
|
||||
}
|
||||
}
|
||||
|
||||
negate() {
|
||||
let p = new DataView(this.buf.buffer);
|
||||
|
||||
for (let i = 0; i < 8; i++) {
|
||||
let offset = i * 4;
|
||||
p.setUint32(offset, ~p.getUint32(offset, true));
|
||||
}
|
||||
|
||||
let one = new Uint8Array(ID_SIZE);
|
||||
one[0] = 1;
|
||||
this.add(one);
|
||||
}
|
||||
|
||||
async getFingerprint(n) {
|
||||
let input = new WrappedBuffer();
|
||||
input.extend(this.buf);
|
||||
input.extend(encodeVarInt(n));
|
||||
|
||||
let hash = await this.sha256(input.unwrap());
|
||||
|
||||
return hash.subarray(0, FINGERPRINT_SIZE);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class NegentropyStorageVector {
|
||||
constructor() {
|
||||
this.items = [];
|
||||
this.sealed = false;
|
||||
}
|
||||
|
||||
insert(timestamp, id) {
|
||||
if (this.sealed) throw Error("already sealed");
|
||||
id = this._loadInputBuffer(id);
|
||||
|
||||
if (id.byteLength > 64 || id.byteLength < this.idSize) throw Error("bad length for id");
|
||||
if (id.byteLength > this.idSize) id = id.subarray(0, this.idSize);
|
||||
|
||||
this.addedItems.push([ timestamp, id ]);
|
||||
id = loadInputBuffer(id);
|
||||
if (id.byteLength !== ID_SIZE) throw Error("bad id size for added item");
|
||||
this.items.push({ timestamp, id });
|
||||
}
|
||||
|
||||
seal() {
|
||||
if (this.sealed) throw Error("already sealed");
|
||||
this.sealed = true;
|
||||
|
||||
this.addedItems.sort((a, b) => a[0] !== b[0] ? a[0] - b[0] : compareUint8Array(a[1], b[1]));
|
||||
this.items.sort(itemCompare);
|
||||
|
||||
if (this.addedItems.length > 1) {
|
||||
for (let i = 0; i < this.addedItems.length - 1; i++) {
|
||||
if (this.addedItems[i][0] == this.addedItems[i + 1][0] &&
|
||||
compareUint8Array(this.addedItems[i][1], this.addedItems[i + 1][1]) === 0) throw Error("duplicate item inserted");
|
||||
}
|
||||
for (let i = 1; i < this.items.length; i++) {
|
||||
if (itemCompare(this.items[i - 1], this.items[i]) === 0) throw Error("duplicate item inserted");
|
||||
}
|
||||
|
||||
this.itemTimestamps = new BigUint64Array(this.addedItems.length);
|
||||
this.itemIds = new Uint8Array(this.addedItems.length * this.idSize);
|
||||
|
||||
for (let i = 0; i < this.addedItems.length; i++) {
|
||||
let item = this.addedItems[i];
|
||||
this.itemTimestamps[i] = BigInt(item[0]);
|
||||
this.itemIds.set(item[1], i * this.idSize);
|
||||
}
|
||||
|
||||
delete this.addedItems;
|
||||
}
|
||||
|
||||
_newState() {
|
||||
return {
|
||||
lastTimestampIn: 0,
|
||||
lastTimestampOut: 0,
|
||||
};
|
||||
unseal() {
|
||||
this.sealed = false;
|
||||
}
|
||||
|
||||
_zeroBound() {
|
||||
return { timestamp: 0, id: new Uint8Array(this.idSize) };
|
||||
}
|
||||
|
||||
_maxBound() {
|
||||
return { timestamp: Number.MAX_VALUE, id: new Uint8Array(0) };
|
||||
}
|
||||
|
||||
_loadInputBuffer(inp) {
|
||||
if (typeof(inp) === 'string') inp = hexToUint8Array(inp);
|
||||
else if (__proto__ !== Uint8Array.prototype) inp = new Uint8Array(inp); // node Buffer?
|
||||
return inp;
|
||||
}
|
||||
|
||||
numItems() {
|
||||
return this.itemTimestamps.length;
|
||||
}
|
||||
|
||||
getItemTimestamp(i) {
|
||||
return Number(this.itemTimestamps[i]);
|
||||
}
|
||||
|
||||
getItemId(i) {
|
||||
let offset = i * this.idSize;
|
||||
return this.itemIds.subarray(offset, offset + this.idSize);
|
||||
size() {
|
||||
this._checkSealed();
|
||||
return this.items.length;
|
||||
}
|
||||
|
||||
getItem(i) {
|
||||
return { timestamp: this.getItemTimestamp(i), id: this.getItemId(i), };
|
||||
this._checkSealed();
|
||||
if (i >= this.items.length) throw Error("out of range");
|
||||
return this.items[i];
|
||||
}
|
||||
|
||||
async computeFingerprint(lower, num) {
|
||||
let offset = lower * this.idSize;
|
||||
let slice = this.itemIds.subarray(offset, offset + (num * this.idSize));
|
||||
let output = await this.sha256(slice);
|
||||
return output.subarray(0, this.idSize);
|
||||
iterate(begin, end, cb) {
|
||||
this._checkSealed();
|
||||
this._checkBounds(begin, end);
|
||||
|
||||
for (let i = begin; i < end; ++i) {
|
||||
if (!cb(this.items[i], i)) break;
|
||||
}
|
||||
}
|
||||
|
||||
async initiate() {
|
||||
findLowerBound(begin, end, bound) {
|
||||
this._checkSealed();
|
||||
this._checkBounds(begin, end);
|
||||
|
||||
return this._binarySearch(this.items, begin, end, (a) => itemCompare(a, bound) < 0);
|
||||
}
|
||||
|
||||
async fingerprint(begin, end) {
|
||||
let out = new Accumulator();
|
||||
out.setToZero();
|
||||
|
||||
this.iterate(begin, end, (item, i) => {
|
||||
out.add(item.id);
|
||||
return true;
|
||||
});
|
||||
|
||||
return await out.getFingerprint(end - begin);
|
||||
}
|
||||
|
||||
_checkSealed() {
|
||||
if (!this.sealed) throw Error("not sealed");
|
||||
this.isInitiator = true;
|
||||
if (this.didHandshake) throw Error("can't initiate after reconcile");
|
||||
|
||||
await this.splitRange(0, this.numItems(), this._zeroBound(), this._maxBound(), this.pendingOutputs);
|
||||
|
||||
return this.buildOutput(true);
|
||||
}
|
||||
|
||||
async reconcile(query) {
|
||||
query = new WrappedBuffer(this._loadInputBuffer(query));
|
||||
let haveIds = [], needIds = [];
|
||||
|
||||
if (!this.sealed) throw Error("not sealed");
|
||||
this.continuationNeeded = false;
|
||||
|
||||
let prevBound = this._zeroBound();
|
||||
let prevIndex = 0;
|
||||
let state = this._newState();
|
||||
let outputs = [];
|
||||
|
||||
if (!this.isInitiator && !this.didHandshake) {
|
||||
let protocolVersion = this.getBytes(query, 1)[0];
|
||||
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw Error("invalid negentropy protocol version byte");
|
||||
if (protocolVersion !== PROTOCOL_VERSION_0) {
|
||||
let o = new WrappedBuffer();
|
||||
o.extend(this.encodeBound({ timestamp: PROTOCOL_VERSION_0, id: new Uint8Array([]), }, state));
|
||||
o.extend(this.encodeVarInt(Mode.UnsupportedProtocolVersion));
|
||||
let ret = o.unwrap();
|
||||
if (!this.wantUint8ArrayOutput) ret = uint8ArrayToHex(ret);
|
||||
return [ret, haveIds, needIds];
|
||||
}
|
||||
this.didHandshake = true;
|
||||
}
|
||||
|
||||
while (query.length !== 0) {
|
||||
let currBound = this.decodeBound(query, state);
|
||||
let mode = this.decodeVarInt(query);
|
||||
|
||||
let lower = prevIndex;
|
||||
let upper = this.findUpperBound(lower, this.numItems(), currBound);
|
||||
|
||||
if (mode === Mode.Skip) {
|
||||
// Do nothing
|
||||
} else if (mode === Mode.Fingerprint) {
|
||||
let theirFingerprint = this.getBytes(query, this.idSize);
|
||||
let ourFingerprint = await this.computeFingerprint(lower, upper - lower);
|
||||
|
||||
if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) {
|
||||
await this.splitRange(lower, upper, prevBound, currBound, outputs);
|
||||
}
|
||||
} else if (mode === Mode.IdList) {
|
||||
let numIds = this.decodeVarInt(query);
|
||||
|
||||
let theirElems = {}; // stringified Uint8Array -> original Uint8Array
|
||||
for (let i = 0; i < numIds; i++) {
|
||||
let e = this.getBytes(query, this.idSize);
|
||||
theirElems[e] = e;
|
||||
}
|
||||
|
||||
for (let i = lower; i < upper; i++) {
|
||||
let k = this.getItemId(i);
|
||||
|
||||
if (!theirElems[k]) {
|
||||
// ID exists on our side, but not their side
|
||||
if (this.isInitiator) haveIds.push(this.wantUint8ArrayOutput ? k : uint8ArrayToHex(k));
|
||||
} else {
|
||||
// ID exists on both sides
|
||||
delete theirElems[k];
|
||||
}
|
||||
}
|
||||
|
||||
if (this.isInitiator) {
|
||||
for (let v of Object.values(theirElems)) {
|
||||
needIds.push(this.wantUint8ArrayOutput ? v : uint8ArrayToHex(v));
|
||||
}
|
||||
} else {
|
||||
let responseHaveIds = [];
|
||||
|
||||
let it = lower;
|
||||
let didSplit = false;
|
||||
let splitBound = this._zeroBound();
|
||||
|
||||
let flushIdListOutput = () => {
|
||||
let payload = this.encodeVarInt(Mode.IdList);
|
||||
|
||||
payload.extend(this.encodeVarInt(responseHaveIds.length));
|
||||
for (let id of responseHaveIds) payload.extend(id);
|
||||
|
||||
let nextSplitBound = (it+1) >= upper ? currBound : this.getMinimalBound(this.getItem(it), this.getItem(it+1));
|
||||
|
||||
outputs.push({
|
||||
start: didSplit ? splitBound : prevBound,
|
||||
end: nextSplitBound,
|
||||
payload: payload,
|
||||
});
|
||||
|
||||
splitBound = nextSplitBound;
|
||||
didSplit = true;
|
||||
|
||||
responseHaveIds = [];
|
||||
};
|
||||
|
||||
for (; it < upper; ++it) {
|
||||
responseHaveIds.push(this.getItemId(it));
|
||||
if (responseHaveIds.length >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k
|
||||
}
|
||||
|
||||
flushIdListOutput();
|
||||
}
|
||||
} else if (mode === Mode.Continuation) {
|
||||
this.continuationNeeded = true;
|
||||
} else if (mode === Mode.UnsupportedProtocolVersion) {
|
||||
throw Error("server does not support our negentropy protocol version");
|
||||
} else {
|
||||
throw Error("unexpected mode");
|
||||
}
|
||||
|
||||
prevIndex = upper;
|
||||
prevBound = currBound;
|
||||
}
|
||||
|
||||
while (outputs.length) {
|
||||
this.pendingOutputs.unshift(outputs.pop());
|
||||
}
|
||||
|
||||
return [this.buildOutput(false), haveIds, needIds];
|
||||
_checkBounds(begin, end) {
|
||||
if (begin > end || end > this.items.length) throw Error("bad range");
|
||||
}
|
||||
|
||||
async splitRange(lower, upper, lowerBound, upperBound, outputs) {
|
||||
let numElems = upper - lower;
|
||||
let buckets = 16;
|
||||
|
||||
if (numElems < buckets * 2) {
|
||||
let payload = this.encodeVarInt(Mode.IdList);
|
||||
payload.extend(this.encodeVarInt(numElems));
|
||||
for (let it = lower; it < upper; ++it) payload.extend(this.getItemId(it));
|
||||
|
||||
outputs.push({
|
||||
start: lowerBound,
|
||||
end: upperBound,
|
||||
payload: payload,
|
||||
});
|
||||
} else {
|
||||
let itemsPerBucket = Math.floor(numElems / buckets);
|
||||
let bucketsWithExtra = numElems % buckets;
|
||||
let curr = lower;
|
||||
let prevBound = this.getItem(curr);
|
||||
|
||||
for (let i = 0; i < buckets; i++) {
|
||||
let bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
|
||||
let ourFingerprint = await this.computeFingerprint(curr, bucketSize);
|
||||
curr += bucketSize;
|
||||
|
||||
let payload = this.encodeVarInt(Mode.Fingerprint);
|
||||
payload.extend(ourFingerprint);
|
||||
|
||||
outputs.push({
|
||||
start: i === 0 ? lowerBound : prevBound,
|
||||
end: curr === upper ? upperBound : this.getMinimalBound(this.getItem(curr - 1), this.getItem(curr)),
|
||||
payload: payload,
|
||||
});
|
||||
|
||||
prevBound = outputs[outputs.length - 1].end;
|
||||
}
|
||||
|
||||
outputs[outputs.length - 1].end = upperBound;
|
||||
}
|
||||
}
|
||||
|
||||
buildOutput(initialMessage) {
|
||||
let output = new WrappedBuffer();
|
||||
let currBound = this._zeroBound();
|
||||
let state = this._newState();
|
||||
|
||||
if (initialMessage) {
|
||||
if (this.didHandshake) throw Error("already built initial message");
|
||||
output.extend([ PROTOCOL_VERSION_0 ]);
|
||||
this.didHandshake = true;
|
||||
}
|
||||
|
||||
this.pendingOutputs.sort((a,b) => itemCompare(a.start, b.start));
|
||||
|
||||
while (this.pendingOutputs.length) {
|
||||
let o = new WrappedBuffer();
|
||||
|
||||
let p = this.pendingOutputs[0];
|
||||
|
||||
let cmp = itemCompare(p.start, currBound);
|
||||
// If bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above)
|
||||
if (cmp < 0) break;
|
||||
|
||||
if (cmp !== 0) {
|
||||
o.extend(this.encodeBound(p.start, state));
|
||||
o.extend(this.encodeVarInt(Mode.Skip));
|
||||
}
|
||||
|
||||
o.extend(this.encodeBound(p.end, state));
|
||||
o.extend(p.payload);
|
||||
|
||||
if (this.frameSizeLimit && output.length + o.length > this.frameSizeLimit - 5) break; // 5 leaves room for Continuation
|
||||
output.extend(o);
|
||||
|
||||
currBound = p.end;
|
||||
this.pendingOutputs.shift();
|
||||
|
||||
}
|
||||
|
||||
// Server indicates that it has more to send, OR ensure client sends a non-empty message
|
||||
|
||||
if (!this.isInitiator && this.pendingOutputs.length) {
|
||||
output.extend(this.encodeBound(this._maxBound(), state));
|
||||
output.extend(this.encodeVarInt(Mode.Continuation));
|
||||
}
|
||||
|
||||
if (this.isInitiator && output.length === 0 && !this.continuationNeeded) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let ret = output.unwrap();
|
||||
if (!this.wantUint8ArrayOutput) ret = uint8ArrayToHex(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
findUpperBound(first, last, value) {
|
||||
_binarySearch(arr, first, last, cmp) {
|
||||
let count = last - first;
|
||||
|
||||
while (count > 0) {
|
||||
@ -386,7 +240,7 @@ class Negentropy {
|
||||
let step = Math.floor(count / 2);
|
||||
it += step;
|
||||
|
||||
if (!(value.timestamp === this.getItemTimestamp(it) ? compareUint8Array(value.id, this.getItemId(it)) < 0 : value.timestamp < this.getItemTimestamp(it))) {
|
||||
if (cmp(arr[it])) {
|
||||
first = ++it;
|
||||
count -= step + 1;
|
||||
} else {
|
||||
@ -396,83 +250,267 @@ class Negentropy {
|
||||
|
||||
return first;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Negentropy {
|
||||
constructor(storage, frameSizeLimit = 0) {
|
||||
if (frameSizeLimit !== 0 && frameSizeLimit < 4096) throw Error("frameSizeLimit too small");
|
||||
|
||||
this.storage = storage;
|
||||
this.frameSizeLimit = frameSizeLimit;
|
||||
|
||||
this.lastTimestampIn = 0;
|
||||
this.lastTimestampOut = 0;
|
||||
}
|
||||
|
||||
_bound(timestamp, id) {
|
||||
return { timestamp, id: id ? id : new Uint8Array(0) };
|
||||
}
|
||||
|
||||
async initiate() {
|
||||
if (this.isInitiator) throw Error("already initiated");
|
||||
this.isInitiator = true;
|
||||
|
||||
let output = new WrappedBuffer();
|
||||
output.extend([ PROTOCOL_VERSION ]);
|
||||
|
||||
await this.splitRange(0, this.storage.size(), this._bound(Number.MAX_VALUE), output);
|
||||
|
||||
return this._renderOutput(output);
|
||||
}
|
||||
|
||||
setInitiator() {
|
||||
this.isInitiator = true;
|
||||
}
|
||||
|
||||
async reconcile(query) {
|
||||
let haveIds = [], needIds = [];
|
||||
query = new WrappedBuffer(loadInputBuffer(query));
|
||||
|
||||
this.lastTimestampIn = this.lastTimestampOut = 0; // reset for each message
|
||||
|
||||
let fullOutput = new WrappedBuffer();
|
||||
fullOutput.extend([ PROTOCOL_VERSION ]);
|
||||
|
||||
let protocolVersion = getByte(query);
|
||||
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw Error("invalid negentropy protocol version byte");
|
||||
if (protocolVersion !== PROTOCOL_VERSION) {
|
||||
if (this.isInitiator) throw Error("unsupported negentropy protocol version requested: " + (protocolVersion - 0x60));
|
||||
else return [this._renderOutput(fullOutput), haveIds, needIds];
|
||||
}
|
||||
|
||||
let storageSize = this.storage.size();
|
||||
let prevBound = this._bound(0);
|
||||
let prevIndex = 0;
|
||||
let skip = false;
|
||||
|
||||
while (query.length !== 0) {
|
||||
let o = new WrappedBuffer();
|
||||
|
||||
let doSkip = () => {
|
||||
if (skip) {
|
||||
skip = false;
|
||||
o.extend(this.encodeBound(prevBound));
|
||||
o.extend(encodeVarInt(Mode.Skip));
|
||||
}
|
||||
};
|
||||
|
||||
let currBound = this.decodeBound(query);
|
||||
let mode = decodeVarInt(query);
|
||||
|
||||
let lower = prevIndex;
|
||||
let upper = this.storage.findLowerBound(prevIndex, storageSize, currBound);
|
||||
|
||||
if (mode === Mode.Skip) {
|
||||
skip = true;
|
||||
} else if (mode === Mode.Fingerprint) {
|
||||
let theirFingerprint = getBytes(query, FINGERPRINT_SIZE);
|
||||
let ourFingerprint = await this.storage.fingerprint(lower, upper);
|
||||
|
||||
if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) {
|
||||
doSkip();
|
||||
await this.splitRange(lower, upper, currBound, o);
|
||||
} else {
|
||||
skip = true;
|
||||
}
|
||||
} else if (mode === Mode.IdList) {
|
||||
let numIds = decodeVarInt(query);
|
||||
|
||||
let theirElems = {}; // stringified Uint8Array -> original Uint8Array (or hex)
|
||||
for (let i = 0; i < numIds; i++) {
|
||||
let e = getBytes(query, ID_SIZE);
|
||||
theirElems[e] = e;
|
||||
}
|
||||
|
||||
this.storage.iterate(lower, upper, (item) => {
|
||||
let k = item.id;
|
||||
|
||||
if (!theirElems[k]) {
|
||||
// ID exists on our side, but not their side
|
||||
if (this.isInitiator) haveIds.push(this.wantUint8ArrayOutput ? k : uint8ArrayToHex(k));
|
||||
} else {
|
||||
// ID exists on both sides
|
||||
delete theirElems[k];
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (this.isInitiator) {
|
||||
skip = true;
|
||||
|
||||
for (let v of Object.values(theirElems)) {
|
||||
// ID exists on their side, but not our side
|
||||
needIds.push(this.wantUint8ArrayOutput ? v : uint8ArrayToHex(v));
|
||||
}
|
||||
} else {
|
||||
doSkip();
|
||||
|
||||
let responseIds = new WrappedBuffer();
|
||||
let numResponseIds = 0;
|
||||
let endBound = currBound;
|
||||
|
||||
this.storage.iterate(lower, upper, (item, index) => {
|
||||
if (this.exceededFrameSizeLimit(fullOutput.length + responseIds.length)) {
|
||||
endBound = item;
|
||||
upper = index; // shrink upper so that remaining range gets correct fingerprint
|
||||
return false;
|
||||
}
|
||||
|
||||
responseIds.extend(item.id);
|
||||
numResponseIds++;
|
||||
return true;
|
||||
});
|
||||
|
||||
o.extend(this.encodeBound(endBound));
|
||||
o.extend(encodeVarInt(Mode.IdList));
|
||||
o.extend(encodeVarInt(numResponseIds));
|
||||
o.extend(responseIds);
|
||||
|
||||
fullOutput.extend(o);
|
||||
o = new WrappedBuffer();
|
||||
}
|
||||
} else {
|
||||
throw Error("unexpected mode");
|
||||
}
|
||||
|
||||
if (this.exceededFrameSizeLimit(fullOutput.length + o.length)) {
|
||||
// frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range
|
||||
let remainingFingerprint = await this.storage.fingerprint(upper, storageSize);
|
||||
|
||||
fullOutput.extend(this.encodeBound(this._bound(Number.MAX_VALUE)));
|
||||
fullOutput.extend(encodeVarInt(Mode.Fingerprint));
|
||||
fullOutput.extend(remainingFingerprint);
|
||||
break;
|
||||
} else {
|
||||
fullOutput.extend(o);
|
||||
}
|
||||
|
||||
prevIndex = upper;
|
||||
prevBound = currBound;
|
||||
}
|
||||
|
||||
return [fullOutput.length === 1 && this.isInitiator ? null : this._renderOutput(fullOutput), haveIds, needIds];
|
||||
}
|
||||
|
||||
async splitRange(lower, upper, upperBound, o) {
|
||||
let numElems = upper - lower;
|
||||
let buckets = 16;
|
||||
|
||||
if (numElems < buckets * 2) {
|
||||
o.extend(this.encodeBound(upperBound));
|
||||
o.extend(encodeVarInt(Mode.IdList));
|
||||
|
||||
o.extend(encodeVarInt(numElems));
|
||||
this.storage.iterate(lower, upper, (item) => {
|
||||
o.extend(item.id);
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
let itemsPerBucket = Math.floor(numElems / buckets);
|
||||
let bucketsWithExtra = numElems % buckets;
|
||||
let curr = lower;
|
||||
|
||||
for (let i = 0; i < buckets; i++) {
|
||||
let bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
|
||||
let ourFingerprint = await this.storage.fingerprint(curr, curr + bucketSize);
|
||||
curr += bucketSize;
|
||||
|
||||
let nextBound;
|
||||
|
||||
if (curr === upper) {
|
||||
nextBound = upperBound;
|
||||
} else {
|
||||
let prevItem, currItem;
|
||||
|
||||
this.storage.iterate(curr - 1, curr + 1, (item, index) => {
|
||||
if (index === curr - 1) prevItem = item;
|
||||
else currItem = item;
|
||||
return true;
|
||||
});
|
||||
|
||||
nextBound = this.getMinimalBound(prevItem, currItem);
|
||||
}
|
||||
|
||||
o.extend(this.encodeBound(nextBound));
|
||||
o.extend(encodeVarInt(Mode.Fingerprint));
|
||||
o.extend(ourFingerprint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_renderOutput(o) {
|
||||
o = o.unwrap();
|
||||
if (!this.wantUint8ArrayOutput) o = uint8ArrayToHex(o);
|
||||
return o;
|
||||
}
|
||||
|
||||
exceededFrameSizeLimit(n) {
|
||||
return this.frameSizeLimit && n > this.frameSizeLimit - 200;
|
||||
}
|
||||
|
||||
// Decoding
|
||||
|
||||
getBytes(buf, n) {
|
||||
if (buf.length < n) throw Error("parse ends prematurely");
|
||||
return buf.shiftN(n);
|
||||
}
|
||||
|
||||
decodeVarInt(buf) {
|
||||
let res = 0;
|
||||
|
||||
while (1) {
|
||||
if (buf.length === 0) throw Error("parse ends prematurely");
|
||||
let byte = buf.shift();
|
||||
res = (res << 7) | (byte & 127);
|
||||
if ((byte & 128) === 0) break;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
decodeTimestampIn(encoded, state) {
|
||||
let timestamp = this.decodeVarInt(encoded);
|
||||
decodeTimestampIn(encoded) {
|
||||
let timestamp = decodeVarInt(encoded);
|
||||
timestamp = timestamp === 0 ? Number.MAX_VALUE : timestamp - 1;
|
||||
if (state.lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) {
|
||||
state.lastTimestampIn = Number.MAX_VALUE;
|
||||
if (this.lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) {
|
||||
this.lastTimestampIn = Number.MAX_VALUE;
|
||||
return Number.MAX_VALUE;
|
||||
}
|
||||
timestamp += state.lastTimestampIn;
|
||||
state.lastTimestampIn = timestamp;
|
||||
timestamp += this.lastTimestampIn;
|
||||
this.lastTimestampIn = timestamp;
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
decodeBound(encoded, state) {
|
||||
let timestamp = this.decodeTimestampIn(encoded, state);
|
||||
let len = this.decodeVarInt(encoded);
|
||||
if (len > this.idSize) throw Error("bound key too long");
|
||||
let id = this.getBytes(encoded, len);
|
||||
decodeBound(encoded) {
|
||||
let timestamp = this.decodeTimestampIn(encoded);
|
||||
let len = decodeVarInt(encoded);
|
||||
if (len > ID_SIZE) throw Error("bound key too long");
|
||||
let id = getBytes(encoded, len);
|
||||
return { timestamp, id };
|
||||
}
|
||||
|
||||
// Encoding
|
||||
|
||||
encodeVarInt(n) {
|
||||
if (n === 0) return new WrappedBuffer([0]);
|
||||
|
||||
let o = [];
|
||||
|
||||
while (n !== 0) {
|
||||
o.push(n & 127);
|
||||
n >>>= 7;
|
||||
}
|
||||
|
||||
o.reverse();
|
||||
|
||||
for (let i = 0; i < o.length - 1; i++) o[i] |= 128;
|
||||
|
||||
return new WrappedBuffer(o);
|
||||
}
|
||||
|
||||
encodeTimestampOut(timestamp, state) {
|
||||
encodeTimestampOut(timestamp) {
|
||||
if (timestamp === Number.MAX_VALUE) {
|
||||
state.lastTimestampOut = Number.MAX_VALUE;
|
||||
return this.encodeVarInt(0);
|
||||
this.lastTimestampOut = Number.MAX_VALUE;
|
||||
return encodeVarInt(0);
|
||||
}
|
||||
|
||||
let temp = timestamp;
|
||||
timestamp -= state.lastTimestampOut;
|
||||
state.lastTimestampOut = temp;
|
||||
return this.encodeVarInt(timestamp + 1);
|
||||
timestamp -= this.lastTimestampOut;
|
||||
this.lastTimestampOut = temp;
|
||||
return encodeVarInt(timestamp + 1);
|
||||
}
|
||||
|
||||
encodeBound(key, state) {
|
||||
encodeBound(key) {
|
||||
let output = new WrappedBuffer();
|
||||
|
||||
output.extend(this.encodeTimestampOut(key.timestamp, state));
|
||||
output.extend(this.encodeVarInt(key.id.length));
|
||||
output.extend(this.encodeTimestampOut(key.timestamp));
|
||||
output.extend(encodeVarInt(key.id.length));
|
||||
output.extend(key.id);
|
||||
|
||||
return output;
|
||||
@ -480,20 +518,27 @@ class Negentropy {
|
||||
|
||||
getMinimalBound(prev, curr) {
|
||||
if (curr.timestamp !== prev.timestamp) {
|
||||
return { timestamp: curr.timestamp, id: new Uint8Array(0), };
|
||||
return this._bound(curr.timestamp);
|
||||
} else {
|
||||
let sharedPrefixBytes = 0;
|
||||
let currKey = curr.id;
|
||||
let prevKey = prev.id;
|
||||
|
||||
for (let i = 0; i < this.idSize; i++) {
|
||||
if (curr.id[i] !== prev.id[i]) break;
|
||||
for (let i = 0; i < ID_SIZE; i++) {
|
||||
if (currKey[i] !== prevKey[i]) break;
|
||||
sharedPrefixBytes++;
|
||||
}
|
||||
|
||||
return { timestamp: curr.timestamp, id: curr.id.subarray(0, sharedPrefixBytes + 1), };
|
||||
return this._bound(curr.timestamp, curr.id.subarray(0, sharedPrefixBytes + 1));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function loadInputBuffer(inp) {
|
||||
if (typeof(inp) === 'string') inp = hexToUint8Array(inp);
|
||||
else if (__proto__ !== Uint8Array.prototype) inp = new Uint8Array(inp); // node Buffer?
|
||||
return inp;
|
||||
}
|
||||
|
||||
function hexToUint8Array(h) {
|
||||
if (h.startsWith('0x')) h = h.substr(2);
|
||||
@ -541,4 +586,4 @@ function itemCompare(a, b) {
|
||||
}
|
||||
|
||||
|
||||
module.exports = Negentropy;
|
||||
module.exports = { Negentropy, NegentropyStorageVector, };
|
||||
|
||||
54
js/README.md
Normal file
54
js/README.md
Normal file
@ -0,0 +1,54 @@
|
||||
# Negentropy Javascript Implementation
|
||||
|
||||
The library is contained in a single javascript file. It shouldn't need any dependencies, in either a browser or node.js:
|
||||
|
||||
const Negentropy = require('Negentropy.js');
|
||||
|
||||
## Storage
|
||||
|
||||
First, you need to create a storage instance. Currently only `Vector` is implemented:
|
||||
|
||||
let storage = new NegentropyStorageVector();
|
||||
|
||||
Next, add all the items in your collection, and `seal()`:
|
||||
|
||||
for (let item of myItems) {
|
||||
storage.insert(timestamp, id);
|
||||
}
|
||||
|
||||
ne.seal();
|
||||
|
||||
* `timestamp` should be a JS `Number`
|
||||
* `id` should be a hex string, `Uint8Array`, or node.js `Buffer`
|
||||
|
||||
## Reconciliation
|
||||
|
||||
Create a Negentropy object:
|
||||
|
||||
let ne = new Negentropy(storage, 50_000);
|
||||
|
||||
* The second parameter (`50'000` above) is the `frameSizeLimit`. This can be omitted (or `0`) to permit unlimited-sized frames.
|
||||
|
||||
On the client-side, create an initial message, and then transmit it to the server, receive the response, and `reconcile` until complete (signified by returning `null` for `newMsg`):
|
||||
|
||||
let msg = await ne.initiate();
|
||||
|
||||
while (msg.length !== null) {
|
||||
let response = queryServer(msg);
|
||||
let [newMsg, have, need] = await ne.reconcile(msg);
|
||||
msg = newMsg;
|
||||
// handle have/need
|
||||
}
|
||||
|
||||
* The output `msg`s and the IDs in `have`/`need` are hex strings, but you can set `ne.wantUint8ArrayOutput = true` if you want `Uint8Array`s instead.
|
||||
|
||||
The server-side is similar, except it doesn't create an initial message, there are no `have`/`need` arrays, and `newMsg` will never be `null`:
|
||||
|
||||
while (1) {
|
||||
let msg = receiveMsgFromClient();
|
||||
let [newMsg] = await ne.reconcile(msg);
|
||||
respondToClient(newMsg);
|
||||
}
|
||||
|
||||
* The `initiate()` and `reconcile()` methods are async because the `crypto.subtle.digest()` browser API is async.
|
||||
* Timestamp values greater than `Number.MAX_VALUE` will currently cause failures.
|
||||
@ -1,7 +1,5 @@
|
||||
const readline = require('readline');
|
||||
const Negentropy = require('../../js/Negentropy.js');
|
||||
|
||||
const idSize = 16;
|
||||
const { Negentropy, NegentropyStorageVector } = require('../../js/Negentropy.js');
|
||||
|
||||
let frameSizeLimit = 0;
|
||||
if (process.env.FRAMESIZELIMIT) frameSizeLimit = parseInt(process.env.FRAMESIZELIMIT);
|
||||
@ -12,7 +10,8 @@ const rl = readline.createInterface({
|
||||
terminal: false
|
||||
});
|
||||
|
||||
let ne = new Negentropy(idSize, frameSizeLimit);
|
||||
let ne;
|
||||
let storage = new NegentropyStorageVector();
|
||||
|
||||
rl.on('line', async (line) => {
|
||||
let items = line.split(',');
|
||||
@ -21,9 +20,10 @@ rl.on('line', async (line) => {
|
||||
if (items.length !== 3) throw Error("too few items");
|
||||
let created = parseInt(items[1]);
|
||||
let id = items[2].trim();
|
||||
ne.addItem(created, id);
|
||||
storage.insert(created, id);
|
||||
} else if (items[0] == "seal") {
|
||||
ne.seal();
|
||||
storage.seal();
|
||||
ne = new Negentropy(storage, frameSizeLimit);
|
||||
} else if (items[0] == "initiate") {
|
||||
let q = await ne.initiate();
|
||||
if (frameSizeLimit && q.length/2 > frameSizeLimit) throw Error("frameSizeLimit exceeded");
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user