support setting frameSizeLimit server-side

This commit is contained in:
Doug Hoyte 2023-08-11 22:16:38 -04:00
parent fd33c9a8b2
commit 409d28004b
2 changed files with 74 additions and 38 deletions

View File

@ -8,7 +8,7 @@
#include <string_view>
#include <vector>
#include <deque>
#include <unordered_map>
#include <unordered_set>
#include <limits>
#include <algorithm>
#include <stdexcept>
@ -69,6 +69,7 @@ inline bool operator<(const XorElem &a, const XorElem &b) {
struct Negentropy {
uint64_t idSize;
uint64_t frameSizeLimit;
struct BoundOutput {
XorElem start;
@ -79,11 +80,12 @@ struct Negentropy {
std::vector<XorElem> items;
bool sealed = false;
bool isInitiator = false;
uint64_t frameSizeLimit = 0;
bool continuationNeeded = false;
std::deque<BoundOutput> pendingOutputs;
Negentropy(uint64_t idSize) : idSize(idSize) {
Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) {
if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid");
if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small");
}
void addItem(uint64_t createdAt, std::string_view id) {
@ -100,13 +102,10 @@ struct Negentropy {
sealed = true;
}
std::string initiate(uint64_t frameSizeLimit_ = 0) {
std::string initiate() {
if (!sealed) throw negentropy::err("not sealed");
isInitiator = true;
if (frameSizeLimit_ != 0 && frameSizeLimit_ < 1024) throw negentropy::err("frameSizeLimit too small");
frameSizeLimit = frameSizeLimit_;
splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs);
return buildOutput();
@ -128,6 +127,7 @@ struct Negentropy {
private:
void reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!sealed) throw negentropy::err("not sealed");
continuationNeeded = false;
auto prevBound = XorElem(0, "");
auto prevIndex = items.begin();
@ -136,7 +136,7 @@ struct Negentropy {
while (query.size()) {
auto currBound = decodeBound(query, lastTimestampIn);
auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList
auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation
auto lower = prevIndex;
auto upper = std::upper_bound(prevIndex, items.end(), currBound);
@ -155,50 +155,67 @@ struct Negentropy {
} else if (mode == 2) { // IdList
auto numIds = decodeVarInt(query);
struct TheirElem {
uint64_t offset;
bool onBothSides;
};
std::unordered_map<std::string, TheirElem> theirElems;
std::unordered_set<std::string> theirElems;
for (uint64_t i = 0; i < numIds; i++) {
auto e = getBytes(query, idSize);
theirElems.emplace(e, TheirElem{i, false});
theirElems.insert(e);
}
for (auto it = lower; it < upper; ++it) {
auto e = theirElems.find(std::string(it->getId()));
auto k = std::string(it->getId());
if (e == theirElems.end()) {
if (theirElems.find(k) == theirElems.end()) {
// ID exists on our side, but not their side
if (isInitiator) haveIds.emplace_back(it->getId());
if (isInitiator) haveIds.emplace_back(k);
} else {
// ID exists on both sides
e->second.onBothSides = true;
theirElems.erase(k);
}
}
for (const auto &[k, v] : theirElems) {
if (!v.onBothSides) {
if (isInitiator) {
for (const auto &k : theirElems) {
// ID exists on their side, but not our side
if (isInitiator) needIds.emplace_back(k);
needIds.emplace_back(k);
}
}
if (!isInitiator) {
} else {
std::vector<std::string> responseHaveIds;
for (auto it = lower; it < upper; ++it) {
auto it = lower;
bool didSplit = false;
XorElem splitBound;
auto flushIdListOutput = [&]{
std::string payload = encodeVarInt(2); // mode = IdList
payload += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) payload += id;
auto nextSplitBound = it >= upper ? currBound : getMinimalBound(*it, *std::next(it));
outputs.emplace_back(BoundOutput({
didSplit ? splitBound : prevBound,
nextSplitBound,
std::move(payload)
}));
splitBound = nextSplitBound;
didSplit = true;
responseHaveIds.clear();
};
for (; it < upper; ++it) {
responseHaveIds.emplace_back(it->getId());
if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k
}
std::string payload = encodeVarInt(2); // mode = IdList
payload += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) payload += id;
outputs.emplace_back(BoundOutput({ prevBound, currBound, std::move(payload) }));
flushIdListOutput();
}
} else if (mode == 3) { // Deprecated
throw negentropy::err("other side is speaking old negentropy protocol");
} else if (mode == 4) { // Continuation
continuationNeeded = true;
} else {
throw negentropy::err("unexpected mode");
}
@ -260,6 +277,8 @@ struct Negentropy {
std::string o;
auto &p = pendingOutputs.front();
// When bounds are out of order, finish this message and we'll resume next time
if (p.start < currBound) break;
if (currBound != p.start) {
@ -270,7 +289,7 @@ struct Negentropy {
o += encodeBound(p.end, lastTimestampOut);
o += p.payload;
if (frameSizeLimit && output.size() + o.size() > frameSizeLimit) break;
if (frameSizeLimit && output.size() + o.size() > frameSizeLimit - 5) break; // 5 leaves room for Continuation
output += o;
pendingOutputs.pop_front();
@ -278,6 +297,13 @@ struct Negentropy {
currBound = p.end;
}
// Server indicates that it has more to send, OR ensure client sends a non-empty message
if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) {
output += encodeBound(XorElem(MAX_U64, ""), lastTimestampOut);
output += encodeVarInt(4); // mode = Continue
}
return output;
}

View File

@ -25,9 +25,13 @@ std::vector<std::string> split(const std::string &s, char delim) {
int main() {
const uint64_t idSize = 16;
uint64_t frameSizeLimit1 = 0, frameSizeLimit2 = 0;
if (::getenv("FRAMESIZELIMIT1")) frameSizeLimit1 = std::stoull(::getenv("FRAMESIZELIMIT1"));
if (::getenv("FRAMESIZELIMIT2")) frameSizeLimit2 = std::stoull(::getenv("FRAMESIZELIMIT2"));
// x1 is client, x2 is server
Negentropy x1(idSize);
Negentropy x2(idSize);
Negentropy x1(idSize, frameSizeLimit1);
Negentropy x2(idSize, frameSizeLimit2);
std::string line;
while (std::cin) {
@ -59,14 +63,14 @@ int main() {
std::string q;
uint64_t round = 0;
uint64_t totalUp = 0;
uint64_t totalDown = 0;
while (1) {
// CLIENT -> SERVER
if (round == 0) {
uint64_t frameSizeLimit = 0;
if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT"));
q = x1.initiate(frameSizeLimit);
q = x1.initiate();
} else {
std::vector<std::string> have, need;
q = x1.reconcile(q, have, need);
@ -78,16 +82,22 @@ int main() {
if (q.size() == 0) break;
std::cerr << "[" << round << "] CLIENT -> SERVER: " << q.size() << " bytes" << std::endl;
totalUp += q.size();
if (frameSizeLimit1 && q.size() > frameSizeLimit1) throw hoytech::error("frameSizeLimit1 exceeded");
// SERVER -> CLIENT
q = x2.reconcile(q);
std::cerr << "[" << round << "] SERVER -> CLIENT: " << q.size() << " bytes" << std::endl;
totalDown += q.size();
if (frameSizeLimit2 && q.size() > frameSizeLimit2) throw hoytech::error("frameSizeLimit2 exceeded");
round++;
}
std::cerr << "UP: " << totalUp << " bytes, DOWN: " << totalDown << " bytes" << std::endl;
return 0;
}