JS: protocol version 0

This commit is contained in:
Doug Hoyte 2023-09-15 17:44:43 -04:00
parent 2e90762e3f
commit c3466fc040
2 changed files with 54 additions and 18 deletions

View File

@ -1,5 +1,15 @@
// (C) 2023 Doug Hoyte. MIT license
const PROTOCOL_VERSION_0 = 0x60;
const Mode = {
Skip: 0,
Fingerprint: 1,
IdList: 2,
Continuation: 3,
UnsupportedProtocolVersion: 4,
};
class WrappedBuffer {
constructor(buffer) {
this._raw = new Uint8Array(buffer || 256);
@ -146,10 +156,11 @@ class Negentropy {
async initiate() {
if (!this.sealed) throw Error("not sealed");
this.isInitiator = true;
if (this.didHandshake) throw Error("can't initiate after reconcile");
await this.splitRange(0, this.numItems(), this._zeroBound(), this._maxBound(), this.pendingOutputs);
return this.buildOutput();
return this.buildOutput(true);
}
async reconcile(query) {
@ -164,23 +175,37 @@ class Negentropy {
let state = this._newState();
let outputs = [];
if (!this.isInitiator && !this.didHandshake) {
let protocolVersion = this.getBytes(query, 1)[0];
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw Error("invalid negentropy protocol version byte");
if (protocolVersion !== PROTOCOL_VERSION_0) {
let o = new WrappedBuffer();
o.extend(this.encodeBound({ timestamp: PROTOCOL_VERSION_0, id: new Uint8Array([]), }, state));
o.extend(this.encodeVarInt(Mode.UnsupportedProtocolVersion));
let ret = o.unwrap();
if (!this.wantUint8ArrayOutput) ret = uint8ArrayToHex(ret);
return [ret, haveIds, needIds];
}
this.didHandshake = true;
}
while (query.length !== 0) {
let currBound = this.decodeBound(query, state);
let mode = this.decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation
let mode = this.decodeVarInt(query);
let lower = prevIndex;
let upper = this.findUpperBound(lower, this.numItems(), currBound);
if (mode === 0) { // Skip
if (mode === Mode.Skip) {
// Do nothing
} else if (mode === 1) { // Fingerprint
} else if (mode === Mode.Fingerprint) {
let theirFingerprint = this.getBytes(query, this.idSize);
let ourFingerprint = await this.computeFingerprint(lower, upper - lower);
if (compareUint8Array(theirFingerprint, ourFingerprint) !== 0) {
await this.splitRange(lower, upper, prevBound, currBound, outputs);
}
} else if (mode === 2) { // IdList
} else if (mode === Mode.IdList) {
let numIds = this.decodeVarInt(query);
let theirElems = {}; // stringified Uint8Array -> original Uint8Array
@ -213,7 +238,7 @@ class Negentropy {
let splitBound = this._zeroBound();
let flushIdListOutput = () => {
let payload = this.encodeVarInt(2); // mode = IdList
let payload = this.encodeVarInt(Mode.IdList);
payload.extend(this.encodeVarInt(responseHaveIds.length));
for (let id of responseHaveIds) payload.extend(id);
@ -239,10 +264,10 @@ class Negentropy {
flushIdListOutput();
}
} else if (mode === 3) { // Deprecated
throw Error("other side is speaking old negentropy protocol");
} else if (mode === 4) { // Continuation
} else if (mode === Mode.Continuation) {
this.continuationNeeded = true;
} else if (mode === Mode.UnsupportedProtocolVersion) {
throw Error("server does not support our negentropy protocol version");
} else {
throw Error("unexpected mode");
}
@ -255,7 +280,7 @@ class Negentropy {
this.pendingOutputs.unshift(outputs.pop());
}
return [this.buildOutput(), haveIds, needIds];
return [this.buildOutput(false), haveIds, needIds];
}
async splitRange(lower, upper, lowerBound, upperBound, outputs) {
@ -263,7 +288,7 @@ class Negentropy {
let buckets = 16;
if (numElems < buckets * 2) {
let payload = this.encodeVarInt(2); // mode = IdList
let payload = this.encodeVarInt(Mode.IdList);
payload.extend(this.encodeVarInt(numElems));
for (let it = lower; it < upper; ++it) payload.extend(this.getItemId(it));
@ -283,7 +308,7 @@ class Negentropy {
let ourFingerprint = await this.computeFingerprint(curr, bucketSize);
curr += bucketSize;
let payload = this.encodeVarInt(1); // mode = Fingerprint
let payload = this.encodeVarInt(Mode.Fingerprint);
payload.extend(ourFingerprint);
outputs.push({
@ -299,11 +324,17 @@ class Negentropy {
}
}
buildOutput() {
buildOutput(initialMessage) {
let output = new WrappedBuffer();
let currBound = this._zeroBound();
let state = this._newState();
if (initialMessage) {
if (this.didHandshake) throw Error("already built initial message");
output.extend([ PROTOCOL_VERSION_0 ]);
this.didHandshake = true;
}
this.pendingOutputs.sort((a,b) => itemCompare(a.start, b.start));
while (this.pendingOutputs.length) {
@ -317,7 +348,7 @@ class Negentropy {
if (cmp !== 0) {
o.extend(this.encodeBound(p.start, state));
o.extend(this.encodeVarInt(0)); // mode = Skip
o.extend(this.encodeVarInt(Mode.Skip));
}
o.extend(this.encodeBound(p.end, state));
@ -333,9 +364,13 @@ class Negentropy {
// Server indicates that it has more to send, OR ensure client sends a non-empty message
if ((!this.isInitiator && this.pendingOutputs.length) || (this.isInitiator && output.length == 0 && this.continuationNeeded)) {
if (!this.isInitiator && this.pendingOutputs.length) {
output.extend(this.encodeBound(this._maxBound(), state));
output.extend(this.encodeVarInt(4)); // mode = Continue
output.extend(this.encodeVarInt(Mode.Continuation));
}
if (this.isInitiator && output.length === 0 && !this.continuationNeeded) {
return null;
}
let ret = output.unwrap();

View File

@ -32,12 +32,13 @@ rl.on('line', async (line) => {
let q = items[1];
let [newQ, haveIds, needIds] = await ne.reconcile(q);
q = newQ;
if (frameSizeLimit && q.length/2 > frameSizeLimit) throw Error("frameSizeLimit exceeded");
for (let id of haveIds) console.log(`have,${id}`);
for (let id of needIds) console.log(`need,${id}`);
if (ne.isInitiator && q.length === 0) {
if (frameSizeLimit && q !== null && q.length/2 > frameSizeLimit) throw Error("frameSizeLimit exceeded");
if (q === null) {
console.log(`done`);
} else {
console.log(`msg,${q}`);