From 2e90762e3fabf668775b8f8e8b5a037def2a9646 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 15 Sep 2023 16:13:56 -0400 Subject: [PATCH] C++: protocol version 0 --- cpp/Negentropy.h | 60 ++++++++++++++++++++++++++++++++++++-------- test/cpp/harness.cpp | 6 +++-- 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/cpp/Negentropy.h b/cpp/Negentropy.h index 5abcc11..b0c5ea8 100644 --- a/cpp/Negentropy.h +++ b/cpp/Negentropy.h @@ -12,12 +12,14 @@ #include #include #include +#include #include namespace negentropy { +const uint64_t PROTOCOL_VERSION_0 = 0x60; const uint64_t MAX_U64 = std::numeric_limits::max(); using err = std::runtime_error; @@ -27,8 +29,8 @@ enum class Mode { Skip = 0, Fingerprint = 1, IdList = 2, - Deprecated = 3, - Continuation = 4, + Continuation = 3, + UnsupportedProtocolVersion = 4, }; @@ -76,6 +78,7 @@ struct Negentropy { std::string itemIds; bool sealed = false; bool isInitiator = false; + bool didHandshake = false; bool continuationNeeded = false; std::deque pendingOutputs; @@ -117,24 +120,44 @@ struct Negentropy { std::string initiate() { if (!sealed) throw negentropy::err("not sealed"); + if (didHandshake) throw negentropy::err("can't initiate after reconcile"); isInitiator = true; splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs); - return buildOutput(); + auto output = std::move(buildOutput(true).value()); + return output; } std::string reconcile(std::string_view query) { if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs"); + + if (!didHandshake) { + auto protocolVersion = getByte(query); + if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte"); + if (protocolVersion != PROTOCOL_VERSION_0) { + std::string o; + uint64_t lastTimestampOut = 0; + o += encodeBound(Item(PROTOCOL_VERSION_0), lastTimestampOut); + o += encodeVarInt(uint64_t(Mode::UnsupportedProtocolVersion)); + return o; + } + didHandshake = true; + } + std::vector haveIds, needIds; reconcileAux(query, haveIds, needIds); - return buildOutput(); + + auto output = std::move(buildOutput(false).value()); + return output; } - std::string reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { + std::optional reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs"); + reconcileAux(query, haveIds, needIds); - return buildOutput(); + + return buildOutput(false); } private: @@ -241,10 +264,10 @@ struct Negentropy { flushIdListOutput(); } - } else if (mode == Mode::Deprecated) { - throw negentropy::err("other side is speaking old negentropy protocol"); } else if (mode == Mode::Continuation) { continuationNeeded = true; + } else if (mode == Mode::UnsupportedProtocolVersion) { + throw negentropy::err("server does not support our negentropy protocol version"); } else { throw negentropy::err("unexpected mode"); } @@ -300,11 +323,17 @@ struct Negentropy { } } - std::string buildOutput() { + std::optional buildOutput(bool initialMessage) { std::string output; Item currBound; uint64_t lastTimestampOut = 0; + if (initialMessage) { + if (didHandshake) throw negentropy::err("already built initial message"); + didHandshake = true; + output.push_back(PROTOCOL_VERSION_0); + } + std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; }); while (pendingOutputs.size()) { @@ -332,11 +361,15 @@ struct Negentropy { // Server indicates that it has more to send, OR ensure client sends a non-empty message - if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) { + if (!isInitiator && pendingOutputs.size()) { output += encodeBound(Item(MAX_U64), lastTimestampOut); output += encodeVarInt(uint64_t(Mode::Continuation)); } + if (isInitiator && output.size() == 0 && !continuationNeeded) { + return std::nullopt; + } + return output; } @@ -362,6 +395,13 @@ struct Negentropy { // Decoding + uint8_t getByte(std::string_view &encoded) { + if (encoded.size() < 1) throw negentropy::err("parse ends prematurely"); + uint8_t output = encoded[0]; + encoded = encoded.substr(1); + return output; + } + std::string getBytes(std::string_view &encoded, size_t n) { if (encoded.size() < n) throw negentropy::err("parse ends prematurely"); auto res = encoded.substr(0, n); diff --git a/test/cpp/harness.cpp b/test/cpp/harness.cpp index e70c880..1912c00 100644 --- a/test/cpp/harness.cpp +++ b/test/cpp/harness.cpp @@ -54,15 +54,17 @@ int main() { if (ne.isInitiator) { std::vector have, need; - q = ne.reconcile(q, have, need); + auto resp = ne.reconcile(q, have, need); for (auto &id : have) std::cout << "have," << hoytech::to_hex(id) << "\n"; for (auto &id : need) std::cout << "need," << hoytech::to_hex(id) << "\n"; - if (q.size() == 0) { + if (!resp) { std::cout << "done" << std::endl; continue; } + + q = *resp; } else { q = ne.reconcile(q); }