mirror of
https://github.com/logos-messaging/negentropy.git
synced 2026-01-04 15:03:12 +00:00
C++: protocol version 0
This commit is contained in:
parent
ee97beb79b
commit
2e90762e3f
@ -12,12 +12,14 @@
|
||||
#include <limits>
|
||||
#include <algorithm>
|
||||
#include <stdexcept>
|
||||
#include <optional>
|
||||
|
||||
#include <openssl/sha.h>
|
||||
|
||||
|
||||
namespace negentropy {
|
||||
|
||||
const uint64_t PROTOCOL_VERSION_0 = 0x60;
|
||||
|
||||
const uint64_t MAX_U64 = std::numeric_limits<uint64_t>::max();
|
||||
using err = std::runtime_error;
|
||||
@ -27,8 +29,8 @@ enum class Mode {
|
||||
Skip = 0,
|
||||
Fingerprint = 1,
|
||||
IdList = 2,
|
||||
Deprecated = 3,
|
||||
Continuation = 4,
|
||||
Continuation = 3,
|
||||
UnsupportedProtocolVersion = 4,
|
||||
};
|
||||
|
||||
|
||||
@ -76,6 +78,7 @@ struct Negentropy {
|
||||
std::string itemIds;
|
||||
bool sealed = false;
|
||||
bool isInitiator = false;
|
||||
bool didHandshake = false;
|
||||
bool continuationNeeded = false;
|
||||
std::deque<OutputRange> pendingOutputs;
|
||||
|
||||
@ -117,24 +120,44 @@ struct Negentropy {
|
||||
|
||||
std::string initiate() {
|
||||
if (!sealed) throw negentropy::err("not sealed");
|
||||
if (didHandshake) throw negentropy::err("can't initiate after reconcile");
|
||||
isInitiator = true;
|
||||
|
||||
splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs);
|
||||
|
||||
return buildOutput();
|
||||
auto output = std::move(buildOutput(true).value());
|
||||
return output;
|
||||
}
|
||||
|
||||
std::string reconcile(std::string_view query) {
|
||||
if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs");
|
||||
|
||||
if (!didHandshake) {
|
||||
auto protocolVersion = getByte(query);
|
||||
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte");
|
||||
if (protocolVersion != PROTOCOL_VERSION_0) {
|
||||
std::string o;
|
||||
uint64_t lastTimestampOut = 0;
|
||||
o += encodeBound(Item(PROTOCOL_VERSION_0), lastTimestampOut);
|
||||
o += encodeVarInt(uint64_t(Mode::UnsupportedProtocolVersion));
|
||||
return o;
|
||||
}
|
||||
didHandshake = true;
|
||||
}
|
||||
|
||||
std::vector<std::string> haveIds, needIds;
|
||||
reconcileAux(query, haveIds, needIds);
|
||||
return buildOutput();
|
||||
|
||||
auto output = std::move(buildOutput(false).value());
|
||||
return output;
|
||||
}
|
||||
|
||||
std::string reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
|
||||
std::optional<std::string> reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
|
||||
if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs");
|
||||
|
||||
reconcileAux(query, haveIds, needIds);
|
||||
return buildOutput();
|
||||
|
||||
return buildOutput(false);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -241,10 +264,10 @@ struct Negentropy {
|
||||
|
||||
flushIdListOutput();
|
||||
}
|
||||
} else if (mode == Mode::Deprecated) {
|
||||
throw negentropy::err("other side is speaking old negentropy protocol");
|
||||
} else if (mode == Mode::Continuation) {
|
||||
continuationNeeded = true;
|
||||
} else if (mode == Mode::UnsupportedProtocolVersion) {
|
||||
throw negentropy::err("server does not support our negentropy protocol version");
|
||||
} else {
|
||||
throw negentropy::err("unexpected mode");
|
||||
}
|
||||
@ -300,11 +323,17 @@ struct Negentropy {
|
||||
}
|
||||
}
|
||||
|
||||
std::string buildOutput() {
|
||||
std::optional<std::string> buildOutput(bool initialMessage) {
|
||||
std::string output;
|
||||
Item currBound;
|
||||
uint64_t lastTimestampOut = 0;
|
||||
|
||||
if (initialMessage) {
|
||||
if (didHandshake) throw negentropy::err("already built initial message");
|
||||
didHandshake = true;
|
||||
output.push_back(PROTOCOL_VERSION_0);
|
||||
}
|
||||
|
||||
std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; });
|
||||
|
||||
while (pendingOutputs.size()) {
|
||||
@ -332,11 +361,15 @@ struct Negentropy {
|
||||
|
||||
// Server indicates that it has more to send, OR ensure client sends a non-empty message
|
||||
|
||||
if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) {
|
||||
if (!isInitiator && pendingOutputs.size()) {
|
||||
output += encodeBound(Item(MAX_U64), lastTimestampOut);
|
||||
output += encodeVarInt(uint64_t(Mode::Continuation));
|
||||
}
|
||||
|
||||
if (isInitiator && output.size() == 0 && !continuationNeeded) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
|
||||
@ -362,6 +395,13 @@ struct Negentropy {
|
||||
|
||||
// Decoding
|
||||
|
||||
uint8_t getByte(std::string_view &encoded) {
|
||||
if (encoded.size() < 1) throw negentropy::err("parse ends prematurely");
|
||||
uint8_t output = encoded[0];
|
||||
encoded = encoded.substr(1);
|
||||
return output;
|
||||
}
|
||||
|
||||
std::string getBytes(std::string_view &encoded, size_t n) {
|
||||
if (encoded.size() < n) throw negentropy::err("parse ends prematurely");
|
||||
auto res = encoded.substr(0, n);
|
||||
|
||||
@ -54,15 +54,17 @@ int main() {
|
||||
|
||||
if (ne.isInitiator) {
|
||||
std::vector<std::string> have, need;
|
||||
q = ne.reconcile(q, have, need);
|
||||
auto resp = ne.reconcile(q, have, need);
|
||||
|
||||
for (auto &id : have) std::cout << "have," << hoytech::to_hex(id) << "\n";
|
||||
for (auto &id : need) std::cout << "need," << hoytech::to_hex(id) << "\n";
|
||||
|
||||
if (q.size() == 0) {
|
||||
if (!resp) {
|
||||
std::cout << "done" << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
q = *resp;
|
||||
} else {
|
||||
q = ne.reconcile(q);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user