frameSizeLimit

This commit is contained in:
Doug Hoyte 2023-06-11 14:37:51 -04:00
parent 740f5619e2
commit a1b6eed1a4
3 changed files with 99 additions and 56 deletions

View File

@ -7,6 +7,7 @@
#include <string>
#include <string_view>
#include <vector>
#include <deque>
#include <unordered_map>
#include <limits>
#include <algorithm>
@ -36,6 +37,11 @@ struct alignas(16) XorElem {
memcpy(id, id_.data(), idSize);
}
XorElem(uint64_t timestamp, uint64_t idSize) : timestamp(timestamp), idSize(idSize) {
if (idSize > 32) throw negentropy::err("id too big");
memset(id, '\0', sizeof(id));
}
std::string_view getId() const {
return std::string_view(id, idSize);
}
@ -52,7 +58,7 @@ struct alignas(16) XorElem {
}
bool operator==(const XorElem &other) const {
return getId() == other.getId(); // ignore timestamp
return timestamp == other.timestamp && getId() == other.getId();
}
};
@ -64,9 +70,17 @@ inline bool operator<(const XorElem &a, const XorElem &b) {
struct Negentropy {
uint64_t idSize;
struct BoundOutput {
XorElem start;
XorElem end;
std::string payload;
};
std::vector<XorElem> items;
bool sealed = false;
bool isInitiator = false;
uint64_t frameSizeLimit = 0;
std::deque<BoundOutput> pendingOutputs;
Negentropy(uint64_t idSize) : idSize(idSize) {
if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid");
@ -86,44 +100,39 @@ struct Negentropy {
sealed = true;
}
std::string initiate() {
std::string initiate(uint64_t frameSizeLimit_ = 0) {
if (!sealed) throw negentropy::err("not sealed");
isInitiator = true;
std::string output;
uint64_t lastTimestampOut = 0;
splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), lastTimestampOut, output);
return output;
if (frameSizeLimit_ != 0 && frameSizeLimit_ < 1024) throw negentropy::err("frameSizeLimit too small");
frameSizeLimit = frameSizeLimit_;
splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs);
return buildOutput();
}
std::string reconcile(std::string_view query) {
if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs");
std::vector<std::string> haveIds, needIds;
return reconcileAux(query, haveIds, needIds);
reconcileAux(query, haveIds, needIds);
return buildOutput();
}
std::string reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs");
return reconcileAux(query, haveIds, needIds);
reconcileAux(query, haveIds, needIds);
return buildOutput();
}
private:
std::string reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
void reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!sealed) throw negentropy::err("not sealed");
std::string output;
auto prevBound = XorElem();
auto prevBound = XorElem(0, "");
auto prevIndex = items.begin();
uint64_t lastTimestampIn = 0;
uint64_t lastTimestampOut = 0;
bool skip = false;
auto doSkip = [&]{
if (!skip) return;
skip = false;
output += encodeBound(prevBound, lastTimestampOut);
output += encodeVarInt(0); // mode = Skip
};
std::deque<BoundOutput> outputs;
while (query.size()) {
auto currBound = decodeBound(query, lastTimestampIn);
@ -133,7 +142,7 @@ struct Negentropy {
auto upper = std::upper_bound(prevIndex, items.end(), currBound);
if (mode == 0) { // Skip
skip = true;
// Do nothing
} else if (mode == 1) { // Fingerprint
XorElem theirXorSet(0, getBytes(query, idSize));
@ -141,10 +150,7 @@ struct Negentropy {
for (auto i = lower; i < upper; ++i) ourXorSet ^= *i;
if (theirXorSet.getId() != ourXorSet.getId(idSize)) {
doSkip();
splitRange(lower, upper, prevBound, currBound, lastTimestampOut, output);
} else {
skip = true;
splitRange(lower, upper, prevBound, currBound, outputs);
}
} else if (mode == 2) { // IdList
auto numIds = decodeVarInt(query);
@ -185,22 +191,19 @@ struct Negentropy {
}
if (!isInitiator) {
doSkip();
output += encodeBound(currBound, lastTimestampOut);
output += encodeVarInt(3); // mode = IdListResponse
std::string payload = encodeVarInt(3); // mode = IdListResponse
output += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) output += id;
payload += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) payload += id;
auto bitField = encodeBitField(responseNeedIndices);
output += encodeVarInt(bitField.size());
output += bitField;
} else {
skip = true;
payload += encodeVarInt(bitField.size());
payload += bitField;
outputs.emplace_back(BoundOutput({ prevBound, currBound, std::move(payload) }));
}
} else if (mode == 3) { // IdListResponse
if (!isInitiator) throw negentropy::err("unexpected IdListResponse");
skip = true;
auto numIds = decodeVarInt(query);
for (uint64_t i = 0; i < numIds; i++) {
@ -221,23 +224,27 @@ struct Negentropy {
prevBound = currBound;
}
return output;
while (outputs.size()) {
pendingOutputs.emplace_front(std::move(outputs.back()));
outputs.pop_back();
}
}
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, uint64_t &lastTimestampOut, std::string &output) {
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, std::deque<BoundOutput> &outputs) {
uint64_t numElems = upper - lower;
const uint64_t buckets = 16;
if (numElems < buckets * 2) {
output += encodeBound(upperBound, lastTimestampOut);
output += encodeVarInt(2); // mode = IdList
std::string payload = encodeVarInt(2); // mode = IdList
payload += encodeVarInt(numElems);
for (auto it = lower; it < upper; ++it) payload += it->getId(idSize);
output += encodeVarInt(numElems);
for (auto it = lower; it < upper; ++it) output += it->getId(idSize);
outputs.emplace_back(BoundOutput({ lowerBound, upperBound, std::move(payload) }));
} else {
uint64_t itemsPerBucket = numElems / buckets;
uint64_t bucketsWithExtra = numElems % buckets;
auto curr = lower;
XorElem prevBound = *curr;
for (uint64_t i = 0; i < buckets; i++) {
XorElem ourXorSet;
@ -245,15 +252,52 @@ struct Negentropy {
ourXorSet ^= *curr;
}
if (i == buckets - 1) output += encodeBound(upperBound, lastTimestampOut);
else output += encodeMinimalBound(*curr, *std::prev(curr), lastTimestampOut);
std::string payload = encodeVarInt(1); // mode = Fingerprint
payload += ourXorSet.getId(idSize);
output += encodeVarInt(1); // mode = Fingerprint
output += ourXorSet.getId(idSize);
outputs.emplace_back(BoundOutput({
i == 0 ? lowerBound : prevBound,
getMinimalBound(*std::prev(curr), *curr),
std::move(payload)
}));
prevBound = outputs.back().end;
}
outputs.back().end = upperBound;
}
}
std::string buildOutput() {
std::string output;
auto currBound = XorElem(0, "");
uint64_t lastTimestampOut = 0;
while (pendingOutputs.size()) {
std::string o;
auto &p = pendingOutputs.front();
if (p.start < currBound) break;
if (currBound != p.start) {
o += encodeBound(p.start, lastTimestampOut);
o += encodeVarInt(0); // mode = Skip
}
o += encodeBound(p.end, lastTimestampOut);
o += p.payload;
if (frameSizeLimit && output.size() + o.size() > frameSizeLimit) break;
output += o;
pendingOutputs.pop_front();
currBound = p.end;
}
return output;
}
// Decoding
std::string getBytes(std::string_view &encoded, size_t n) {
@ -336,11 +380,9 @@ struct Negentropy {
return output;
};
std::string encodeMinimalBound(const XorElem &curr, const XorElem &prev, uint64_t &lastTimestampOut) {
std::string output = encodeTimestampOut(curr.timestamp, lastTimestampOut);
XorElem getMinimalBound(const XorElem &prev, const XorElem &curr) {
if (curr.timestamp != prev.timestamp) {
output += encodeVarInt(0);
return XorElem(curr.timestamp, "");
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId();
@ -351,12 +393,9 @@ struct Negentropy {
sharedPrefixBytes++;
}
output += encodeVarInt(sharedPrefixBytes + 1);
output += currKey.substr(0, sharedPrefixBytes + 1);
return XorElem(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1));
}
return output;
};
}
std::string encodeBitField(const std::vector<uint64_t> inds) {
if (inds.size() == 0) return "";

View File

@ -64,7 +64,9 @@ int main() {
// CLIENT -> SERVER
if (round == 0) {
q = x1.initiate();
uint64_t frameSizeLimit = 0;
if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT"));
q = x1.initiate(frameSizeLimit);
} else {
std::vector<std::string> have, need;
q = x1.reconcile(q, have, need);

View File

@ -22,7 +22,9 @@ srand($ENV{SEED} || 0);
my $stgen = Session::Token->new(seed => "\x00" x 1024, alphabet => '0123456789abcdef', length => $idSize * 2);
while(1) {
my $iters = $ENV{ITERS} || 100;
for (my $i = 0; $i < $iters; $i++) {
my $ids1 = {};
my $ids2 = {};