From 409d28004be21a253a3f103a7a73791e843df0f7 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 11 Aug 2023 22:16:38 -0400 Subject: [PATCH] support setting frameSizeLimit server-side --- cpp/Negentropy.h | 92 ++++++++++++++++++++++++++++---------------- test/cpp/harness.cpp | 20 +++++++--- 2 files changed, 74 insertions(+), 38 deletions(-) diff --git a/cpp/Negentropy.h b/cpp/Negentropy.h index e314827..1228c44 100644 --- a/cpp/Negentropy.h +++ b/cpp/Negentropy.h @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -69,6 +69,7 @@ inline bool operator<(const XorElem &a, const XorElem &b) { struct Negentropy { uint64_t idSize; + uint64_t frameSizeLimit; struct BoundOutput { XorElem start; @@ -79,11 +80,12 @@ struct Negentropy { std::vector items; bool sealed = false; bool isInitiator = false; - uint64_t frameSizeLimit = 0; + bool continuationNeeded = false; std::deque pendingOutputs; - Negentropy(uint64_t idSize) : idSize(idSize) { + Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) { if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid"); + if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small"); } void addItem(uint64_t createdAt, std::string_view id) { @@ -100,13 +102,10 @@ struct Negentropy { sealed = true; } - std::string initiate(uint64_t frameSizeLimit_ = 0) { + std::string initiate() { if (!sealed) throw negentropy::err("not sealed"); isInitiator = true; - if (frameSizeLimit_ != 0 && frameSizeLimit_ < 1024) throw negentropy::err("frameSizeLimit too small"); - frameSizeLimit = frameSizeLimit_; - splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs); return buildOutput(); @@ -128,6 +127,7 @@ struct Negentropy { private: void reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { if (!sealed) throw negentropy::err("not sealed"); + continuationNeeded = false; auto prevBound = XorElem(0, ""); auto prevIndex = items.begin(); @@ -136,7 +136,7 @@ struct Negentropy { while (query.size()) { auto currBound = decodeBound(query, lastTimestampIn); - auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList + auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation auto lower = prevIndex; auto upper = std::upper_bound(prevIndex, items.end(), currBound); @@ -155,50 +155,67 @@ struct Negentropy { } else if (mode == 2) { // IdList auto numIds = decodeVarInt(query); - struct TheirElem { - uint64_t offset; - bool onBothSides; - }; - - std::unordered_map theirElems; + std::unordered_set theirElems; for (uint64_t i = 0; i < numIds; i++) { auto e = getBytes(query, idSize); - theirElems.emplace(e, TheirElem{i, false}); + theirElems.insert(e); } for (auto it = lower; it < upper; ++it) { - auto e = theirElems.find(std::string(it->getId())); + auto k = std::string(it->getId()); - if (e == theirElems.end()) { + if (theirElems.find(k) == theirElems.end()) { // ID exists on our side, but not their side - if (isInitiator) haveIds.emplace_back(it->getId()); + if (isInitiator) haveIds.emplace_back(k); } else { // ID exists on both sides - e->second.onBothSides = true; + theirElems.erase(k); } } - for (const auto &[k, v] : theirElems) { - if (!v.onBothSides) { + if (isInitiator) { + for (const auto &k : theirElems) { // ID exists on their side, but not our side - if (isInitiator) needIds.emplace_back(k); + needIds.emplace_back(k); } - } - - if (!isInitiator) { + } else { std::vector responseHaveIds; - for (auto it = lower; it < upper; ++it) { + auto it = lower; + bool didSplit = false; + XorElem splitBound; + + auto flushIdListOutput = [&]{ + std::string payload = encodeVarInt(2); // mode = IdList + + payload += encodeVarInt(responseHaveIds.size()); + for (const auto &id : responseHaveIds) payload += id; + + auto nextSplitBound = it >= upper ? currBound : getMinimalBound(*it, *std::next(it)); + + outputs.emplace_back(BoundOutput({ + didSplit ? splitBound : prevBound, + nextSplitBound, + std::move(payload) + })); + + splitBound = nextSplitBound; + didSplit = true; + + responseHaveIds.clear(); + }; + + for (; it < upper; ++it) { responseHaveIds.emplace_back(it->getId()); + if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k } - std::string payload = encodeVarInt(2); // mode = IdList - - payload += encodeVarInt(responseHaveIds.size()); - for (const auto &id : responseHaveIds) payload += id; - - outputs.emplace_back(BoundOutput({ prevBound, currBound, std::move(payload) })); + flushIdListOutput(); } + } else if (mode == 3) { // Deprecated + throw negentropy::err("other side is speaking old negentropy protocol"); + } else if (mode == 4) { // Continuation + continuationNeeded = true; } else { throw negentropy::err("unexpected mode"); } @@ -260,6 +277,8 @@ struct Negentropy { std::string o; auto &p = pendingOutputs.front(); + + // When bounds are out of order, finish this message and we'll resume next time if (p.start < currBound) break; if (currBound != p.start) { @@ -270,7 +289,7 @@ struct Negentropy { o += encodeBound(p.end, lastTimestampOut); o += p.payload; - if (frameSizeLimit && output.size() + o.size() > frameSizeLimit) break; + if (frameSizeLimit && output.size() + o.size() > frameSizeLimit - 5) break; // 5 leaves room for Continuation output += o; pendingOutputs.pop_front(); @@ -278,6 +297,13 @@ struct Negentropy { currBound = p.end; } + // Server indicates that it has more to send, OR ensure client sends a non-empty message + + if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) { + output += encodeBound(XorElem(MAX_U64, ""), lastTimestampOut); + output += encodeVarInt(4); // mode = Continue + } + return output; } diff --git a/test/cpp/harness.cpp b/test/cpp/harness.cpp index b5c853c..05e845d 100644 --- a/test/cpp/harness.cpp +++ b/test/cpp/harness.cpp @@ -25,9 +25,13 @@ std::vector split(const std::string &s, char delim) { int main() { const uint64_t idSize = 16; + uint64_t frameSizeLimit1 = 0, frameSizeLimit2 = 0; + if (::getenv("FRAMESIZELIMIT1")) frameSizeLimit1 = std::stoull(::getenv("FRAMESIZELIMIT1")); + if (::getenv("FRAMESIZELIMIT2")) frameSizeLimit2 = std::stoull(::getenv("FRAMESIZELIMIT2")); + // x1 is client, x2 is server - Negentropy x1(idSize); - Negentropy x2(idSize); + Negentropy x1(idSize, frameSizeLimit1); + Negentropy x2(idSize, frameSizeLimit2); std::string line; while (std::cin) { @@ -59,14 +63,14 @@ int main() { std::string q; uint64_t round = 0; + uint64_t totalUp = 0; + uint64_t totalDown = 0; while (1) { // CLIENT -> SERVER if (round == 0) { - uint64_t frameSizeLimit = 0; - if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT")); - q = x1.initiate(frameSizeLimit); + q = x1.initiate(); } else { std::vector have, need; q = x1.reconcile(q, have, need); @@ -78,16 +82,22 @@ int main() { if (q.size() == 0) break; std::cerr << "[" << round << "] CLIENT -> SERVER: " << q.size() << " bytes" << std::endl; + totalUp += q.size(); + if (frameSizeLimit1 && q.size() > frameSizeLimit1) throw hoytech::error("frameSizeLimit1 exceeded"); // SERVER -> CLIENT q = x2.reconcile(q); std::cerr << "[" << round << "] SERVER -> CLIENT: " << q.size() << " bytes" << std::endl; + totalDown += q.size(); + if (frameSizeLimit2 && q.size() > frameSizeLimit2) throw hoytech::error("frameSizeLimit2 exceeded"); round++; } + std::cerr << "UP: " << totalUp << " bytes, DOWN: " << totalDown << " bytes" << std::endl; + return 0; }