From a1b6eed1a4e26f00137a137479712a2531429e9f Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Sun, 11 Jun 2023 14:37:51 -0400 Subject: [PATCH] frameSizeLimit --- cpp/Negentropy.h | 147 +++++++++++++++++++++++++++---------------- test/cpp/harness.cpp | 4 +- test/test.pl | 4 +- 3 files changed, 99 insertions(+), 56 deletions(-) diff --git a/cpp/Negentropy.h b/cpp/Negentropy.h index 05b3219..abcd375 100644 --- a/cpp/Negentropy.h +++ b/cpp/Negentropy.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,11 @@ struct alignas(16) XorElem { memcpy(id, id_.data(), idSize); } + XorElem(uint64_t timestamp, uint64_t idSize) : timestamp(timestamp), idSize(idSize) { + if (idSize > 32) throw negentropy::err("id too big"); + memset(id, '\0', sizeof(id)); + } + std::string_view getId() const { return std::string_view(id, idSize); } @@ -52,7 +58,7 @@ struct alignas(16) XorElem { } bool operator==(const XorElem &other) const { - return getId() == other.getId(); // ignore timestamp + return timestamp == other.timestamp && getId() == other.getId(); } }; @@ -64,9 +70,17 @@ inline bool operator<(const XorElem &a, const XorElem &b) { struct Negentropy { uint64_t idSize; + struct BoundOutput { + XorElem start; + XorElem end; + std::string payload; + }; + std::vector items; bool sealed = false; bool isInitiator = false; + uint64_t frameSizeLimit = 0; + std::deque pendingOutputs; Negentropy(uint64_t idSize) : idSize(idSize) { if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid"); @@ -86,44 +100,39 @@ struct Negentropy { sealed = true; } - std::string initiate() { + std::string initiate(uint64_t frameSizeLimit_ = 0) { if (!sealed) throw negentropy::err("not sealed"); isInitiator = true; - std::string output; - uint64_t lastTimestampOut = 0; - splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), lastTimestampOut, output); - return output; + if (frameSizeLimit_ != 0 && frameSizeLimit_ < 1024) throw negentropy::err("frameSizeLimit too small"); + frameSizeLimit = frameSizeLimit_; + + splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs); + + return buildOutput(); } std::string reconcile(std::string_view query) { if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs"); std::vector haveIds, needIds; - return reconcileAux(query, haveIds, needIds); + reconcileAux(query, haveIds, needIds); + return buildOutput(); } std::string reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs"); - return reconcileAux(query, haveIds, needIds); + reconcileAux(query, haveIds, needIds); + return buildOutput(); } private: - std::string reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { + void reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { if (!sealed) throw negentropy::err("not sealed"); - std::string output; - auto prevBound = XorElem(); + auto prevBound = XorElem(0, ""); auto prevIndex = items.begin(); uint64_t lastTimestampIn = 0; - uint64_t lastTimestampOut = 0; - bool skip = false; - - auto doSkip = [&]{ - if (!skip) return; - skip = false; - output += encodeBound(prevBound, lastTimestampOut); - output += encodeVarInt(0); // mode = Skip - }; + std::deque outputs; while (query.size()) { auto currBound = decodeBound(query, lastTimestampIn); @@ -133,7 +142,7 @@ struct Negentropy { auto upper = std::upper_bound(prevIndex, items.end(), currBound); if (mode == 0) { // Skip - skip = true; + // Do nothing } else if (mode == 1) { // Fingerprint XorElem theirXorSet(0, getBytes(query, idSize)); @@ -141,10 +150,7 @@ struct Negentropy { for (auto i = lower; i < upper; ++i) ourXorSet ^= *i; if (theirXorSet.getId() != ourXorSet.getId(idSize)) { - doSkip(); - splitRange(lower, upper, prevBound, currBound, lastTimestampOut, output); - } else { - skip = true; + splitRange(lower, upper, prevBound, currBound, outputs); } } else if (mode == 2) { // IdList auto numIds = decodeVarInt(query); @@ -185,22 +191,19 @@ struct Negentropy { } if (!isInitiator) { - doSkip(); - output += encodeBound(currBound, lastTimestampOut); - output += encodeVarInt(3); // mode = IdListResponse + std::string payload = encodeVarInt(3); // mode = IdListResponse - output += encodeVarInt(responseHaveIds.size()); - for (const auto &id : responseHaveIds) output += id; + payload += encodeVarInt(responseHaveIds.size()); + for (const auto &id : responseHaveIds) payload += id; auto bitField = encodeBitField(responseNeedIndices); - output += encodeVarInt(bitField.size()); - output += bitField; - } else { - skip = true; + payload += encodeVarInt(bitField.size()); + payload += bitField; + + outputs.emplace_back(BoundOutput({ prevBound, currBound, std::move(payload) })); } } else if (mode == 3) { // IdListResponse if (!isInitiator) throw negentropy::err("unexpected IdListResponse"); - skip = true; auto numIds = decodeVarInt(query); for (uint64_t i = 0; i < numIds; i++) { @@ -221,23 +224,27 @@ struct Negentropy { prevBound = currBound; } - return output; + while (outputs.size()) { + pendingOutputs.emplace_front(std::move(outputs.back())); + outputs.pop_back(); + } } - void splitRange(std::vector::iterator lower, std::vector::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, uint64_t &lastTimestampOut, std::string &output) { + void splitRange(std::vector::iterator lower, std::vector::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, std::deque &outputs) { uint64_t numElems = upper - lower; const uint64_t buckets = 16; if (numElems < buckets * 2) { - output += encodeBound(upperBound, lastTimestampOut); - output += encodeVarInt(2); // mode = IdList + std::string payload = encodeVarInt(2); // mode = IdList + payload += encodeVarInt(numElems); + for (auto it = lower; it < upper; ++it) payload += it->getId(idSize); - output += encodeVarInt(numElems); - for (auto it = lower; it < upper; ++it) output += it->getId(idSize); + outputs.emplace_back(BoundOutput({ lowerBound, upperBound, std::move(payload) })); } else { uint64_t itemsPerBucket = numElems / buckets; uint64_t bucketsWithExtra = numElems % buckets; auto curr = lower; + XorElem prevBound = *curr; for (uint64_t i = 0; i < buckets; i++) { XorElem ourXorSet; @@ -245,15 +252,52 @@ struct Negentropy { ourXorSet ^= *curr; } - if (i == buckets - 1) output += encodeBound(upperBound, lastTimestampOut); - else output += encodeMinimalBound(*curr, *std::prev(curr), lastTimestampOut); + std::string payload = encodeVarInt(1); // mode = Fingerprint + payload += ourXorSet.getId(idSize); - output += encodeVarInt(1); // mode = Fingerprint - output += ourXorSet.getId(idSize); + outputs.emplace_back(BoundOutput({ + i == 0 ? lowerBound : prevBound, + getMinimalBound(*std::prev(curr), *curr), + std::move(payload) + })); + + prevBound = outputs.back().end; } + + outputs.back().end = upperBound; } } + std::string buildOutput() { + std::string output; + auto currBound = XorElem(0, ""); + uint64_t lastTimestampOut = 0; + + while (pendingOutputs.size()) { + std::string o; + + auto &p = pendingOutputs.front(); + if (p.start < currBound) break; + + if (currBound != p.start) { + o += encodeBound(p.start, lastTimestampOut); + o += encodeVarInt(0); // mode = Skip + } + + o += encodeBound(p.end, lastTimestampOut); + o += p.payload; + + if (frameSizeLimit && output.size() + o.size() > frameSizeLimit) break; + output += o; + + pendingOutputs.pop_front(); + + currBound = p.end; + } + + return output; + } + // Decoding std::string getBytes(std::string_view &encoded, size_t n) { @@ -336,11 +380,9 @@ struct Negentropy { return output; }; - std::string encodeMinimalBound(const XorElem &curr, const XorElem &prev, uint64_t &lastTimestampOut) { - std::string output = encodeTimestampOut(curr.timestamp, lastTimestampOut); - + XorElem getMinimalBound(const XorElem &prev, const XorElem &curr) { if (curr.timestamp != prev.timestamp) { - output += encodeVarInt(0); + return XorElem(curr.timestamp, ""); } else { uint64_t sharedPrefixBytes = 0; auto currKey = curr.getId(); @@ -351,12 +393,9 @@ struct Negentropy { sharedPrefixBytes++; } - output += encodeVarInt(sharedPrefixBytes + 1); - output += currKey.substr(0, sharedPrefixBytes + 1); + return XorElem(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); } - - return output; - }; + } std::string encodeBitField(const std::vector inds) { if (inds.size() == 0) return ""; diff --git a/test/cpp/harness.cpp b/test/cpp/harness.cpp index df805bd..b5c853c 100644 --- a/test/cpp/harness.cpp +++ b/test/cpp/harness.cpp @@ -64,7 +64,9 @@ int main() { // CLIENT -> SERVER if (round == 0) { - q = x1.initiate(); + uint64_t frameSizeLimit = 0; + if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT")); + q = x1.initiate(frameSizeLimit); } else { std::vector have, need; q = x1.reconcile(q, have, need); diff --git a/test/test.pl b/test/test.pl index fdf6683..92761f1 100755 --- a/test/test.pl +++ b/test/test.pl @@ -22,7 +22,9 @@ srand($ENV{SEED} || 0); my $stgen = Session::Token->new(seed => "\x00" x 1024, alphabet => '0123456789abcdef', length => $idSize * 2); -while(1) { +my $iters = $ENV{ITERS} || 100; + +for (my $i = 0; $i < $iters; $i++) { my $ids1 = {}; my $ids2 = {};