diff --git a/js/Negentropy.js b/js/Negentropy.js index 83466ac..d230026 100644 --- a/js/Negentropy.js +++ b/js/Negentropy.js @@ -1,5 +1,15 @@ // (C) 2023 Doug Hoyte. MIT license +const PROTOCOL_VERSION_0 = 0x60; + +const Mode = { + Skip: 0, + Fingerprint: 1, + IdList: 2, + Continuation: 3, + UnsupportedProtocolVersion: 4, +}; + class WrappedBuffer { constructor(buffer) { this._raw = new Uint8Array(buffer || 256); @@ -146,10 +156,11 @@ class Negentropy { async initiate() { if (!this.sealed) throw Error("not sealed"); this.isInitiator = true; + if (this.didHandshake) throw Error("can't initiate after reconcile"); await this.splitRange(0, this.numItems(), this._zeroBound(), this._maxBound(), this.pendingOutputs); - return this.buildOutput(); + return this.buildOutput(true); } async reconcile(query) { @@ -164,23 +175,37 @@ class Negentropy { let state = this._newState(); let outputs = []; + if (!this.isInitiator && !this.didHandshake) { + let protocolVersion = this.getBytes(query, 1)[0]; + if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw Error("invalid negentropy protocol version byte"); + if (protocolVersion !== PROTOCOL_VERSION_0) { + let o = new WrappedBuffer(); + o.extend(this.encodeBound({ timestamp: PROTOCOL_VERSION_0, id: new Uint8Array([]), }, state)); + o.extend(this.encodeVarInt(Mode.UnsupportedProtocolVersion)); + let ret = o.unwrap(); + if (!this.wantUint8ArrayOutput) ret = uint8ArrayToHex(ret); + return [ret, haveIds, needIds]; + } + this.didHandshake = true; + } + while (query.length !== 0) { let currBound = this.decodeBound(query, state); - let mode = this.decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation + let mode = this.decodeVarInt(query); let lower = prevIndex; let upper = this.findUpperBound(lower, this.numItems(), currBound); - if (mode === 0) { // Skip + if (mode === Mode.Skip) { // Do nothing - } else if (mode === 1) { // Fingerprint + } else if (mode === Mode.Fingerprint) { let theirFingerprint = this.getBytes(query, this.idSize); let ourFingerprint = await this.computeFingerprint(lower, upper - lower); if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) { await this.splitRange(lower, upper, prevBound, currBound, outputs); } - } else if (mode === 2) { // IdList + } else if (mode === Mode.IdList) { let numIds = this.decodeVarInt(query); let theirElems = {}; // stringified Uint8Array -> original Uint8Array @@ -213,7 +238,7 @@ class Negentropy { let splitBound = this._zeroBound(); let flushIdListOutput = () => { - let payload = this.encodeVarInt(2); // mode = IdList + let payload = this.encodeVarInt(Mode.IdList); payload.extend(this.encodeVarInt(responseHaveIds.length)); for (let id of responseHaveIds) payload.extend(id); @@ -239,10 +264,10 @@ class Negentropy { flushIdListOutput(); } - } else if (mode === 3) { // Deprecated - throw Error("other side is speaking old negentropy protocol"); - } else if (mode === 4) { // Continuation + } else if (mode === Mode.Continuation) { this.continuationNeeded = true; + } else if (mode === Mode.UnsupportedProtocolVersion) { + throw Error("server does not support our negentropy protocol version"); } else { throw Error("unexpected mode"); } @@ -255,7 +280,7 @@ class Negentropy { this.pendingOutputs.unshift(outputs.pop()); } - return [this.buildOutput(), haveIds, needIds]; + return [this.buildOutput(false), haveIds, needIds]; } async splitRange(lower, upper, lowerBound, upperBound, outputs) { @@ -263,7 +288,7 @@ class Negentropy { let buckets = 16; if (numElems < buckets * 2) { - let payload = this.encodeVarInt(2); // mode = IdList + let payload = this.encodeVarInt(Mode.IdList); payload.extend(this.encodeVarInt(numElems)); for (let it = lower; it < upper; ++it) payload.extend(this.getItemId(it)); @@ -283,7 +308,7 @@ class Negentropy { let ourFingerprint = await this.computeFingerprint(curr, bucketSize); curr += bucketSize; - let payload = this.encodeVarInt(1); // mode = Fingerprint + let payload = this.encodeVarInt(Mode.Fingerprint); payload.extend(ourFingerprint); outputs.push({ @@ -299,11 +324,17 @@ class Negentropy { } } - buildOutput() { + buildOutput(initialMessage) { let output = new WrappedBuffer(); let currBound = this._zeroBound(); let state = this._newState(); + if (initialMessage) { + if (this.didHandshake) throw Error("already built initial message"); + output.extend([ PROTOCOL_VERSION_0 ]); + this.didHandshake = true; + } + this.pendingOutputs.sort((a,b) => itemCompare(a.start, b.start)); while (this.pendingOutputs.length) { @@ -317,7 +348,7 @@ class Negentropy { if (cmp !== 0) { o.extend(this.encodeBound(p.start, state)); - o.extend(this.encodeVarInt(0)); // mode = Skip + o.extend(this.encodeVarInt(Mode.Skip)); } o.extend(this.encodeBound(p.end, state)); @@ -333,9 +364,13 @@ class Negentropy { // Server indicates that it has more to send, OR ensure client sends a non-empty message - if ((!this.isInitiator && this.pendingOutputs.length) || (this.isInitiator && output.length == 0 && this.continuationNeeded)) { + if (!this.isInitiator && this.pendingOutputs.length) { output.extend(this.encodeBound(this._maxBound(), state)); - output.extend(this.encodeVarInt(4)); // mode = Continue + output.extend(this.encodeVarInt(Mode.Continuation)); + } + + if (this.isInitiator && output.length === 0 && !this.continuationNeeded) { + return null; } let ret = output.unwrap(); diff --git a/test/js/harness.js b/test/js/harness.js index c37c909..b65250f 100644 --- a/test/js/harness.js +++ b/test/js/harness.js @@ -32,12 +32,13 @@ rl.on('line', async (line) => { let q = items[1]; let [newQ, haveIds, needIds] = await ne.reconcile(q); q = newQ; - if (frameSizeLimit && q.length/2 > frameSizeLimit) throw Error("frameSizeLimit exceeded"); for (let id of haveIds) console.log(`have,${id}`); for (let id of needIds) console.log(`need,${id}`); - if (ne.isInitiator && q.length === 0) { + if (frameSizeLimit && q !== null && q.length/2 > frameSizeLimit) throw Error("frameSizeLimit exceeded"); + + if (q === null) { console.log(`done`); } else { console.log(`msg,${q}`);