mirror of
https://github.com/logos-messaging/negentropy.git
synced 2026-01-03 22:43:07 +00:00
major refactor to C++ impl: use SHA-256 as fingerprint function
This commit is contained in:
parent
3876742626
commit
94c7830f50
210
cpp/Negentropy.h
210
cpp/Negentropy.h
@ -13,6 +13,7 @@
|
||||
#include <algorithm>
|
||||
#include <stdexcept>
|
||||
|
||||
#include <openssl/sha.h>
|
||||
|
||||
|
||||
namespace negentropy {
|
||||
@ -22,47 +23,40 @@ const uint64_t MAX_U64 = std::numeric_limits<uint64_t>::max();
|
||||
using err = std::runtime_error;
|
||||
|
||||
|
||||
struct alignas(16) XorElem {
|
||||
// Tag that follows each encoded bound in a message; it is written with
// encodeVarInt(uint64_t(Mode::...)) and read back with Mode(decodeVarInt(...)).
enum class Mode {
|
||||
// No payload for the range; the receiver does nothing for it.
Skip = 0,
|
||||
// Followed by idSize bytes of fingerprint for the range.
Fingerprint = 1,
|
||||
// Followed by a varint count and then that many ids.
IdList = 2,
|
||||
// Receiving this mode raises "other side is speaking old negentropy protocol".
Deprecated = 3,
|
||||
// Receiving this mode sets continuationNeeded; it is emitted when output is still pending.
Continuation = 4,
|
||||
};
|
||||
|
||||
|
||||
struct Item {
|
||||
uint64_t timestamp;
|
||||
uint64_t idSize;
|
||||
char id[32];
|
||||
|
||||
XorElem() : timestamp(0), idSize(32) {
|
||||
Item(uint64_t timestamp = 0) : timestamp(timestamp), idSize(0) {
|
||||
memset(id, '\0', sizeof(id));
|
||||
}
|
||||
|
||||
XorElem(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) {
|
||||
if (idSize > 32) throw negentropy::err("id too big");
|
||||
memset(id, '\0', sizeof(id));
|
||||
memcpy(id, id_.data(), idSize);
|
||||
}
|
||||
|
||||
XorElem(uint64_t timestamp, uint64_t idSize) : timestamp(timestamp), idSize(idSize) {
|
||||
if (idSize > 32) throw negentropy::err("id too big");
|
||||
Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) {
|
||||
if (id_.size() > 32) throw negentropy::err("id too big");
|
||||
memset(id, '\0', sizeof(id));
|
||||
memcpy(id, id_.data(), id_.size());
|
||||
}
|
||||
|
||||
std::string_view getId() const {
|
||||
return std::string_view(id, idSize);
|
||||
}
|
||||
|
||||
std::string_view getId(uint64_t subSize) const {
|
||||
return getId().substr(0, subSize);
|
||||
}
|
||||
|
||||
XorElem& operator^=(const XorElem &other) {
|
||||
auto *p1 = static_cast<unsigned char *>(__builtin_assume_aligned(id, 16));
|
||||
auto *p2 = static_cast<unsigned char *>(__builtin_assume_aligned(other.id, 16));
|
||||
for (size_t i = 0; i < 32; i++) p1[i] ^= p2[i];
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool operator==(const XorElem &other) const {
|
||||
bool operator==(const Item &other) const {
|
||||
return timestamp == other.timestamp && getId() == other.getId();
|
||||
}
|
||||
};
|
||||
|
||||
inline bool operator<(const XorElem &a, const XorElem &b) {
|
||||
inline bool operator<(const Item &a, const Item &b) {
|
||||
return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId();
|
||||
};
|
||||
|
||||
@ -71,17 +65,19 @@ struct Negentropy {
|
||||
uint64_t idSize;
|
||||
uint64_t frameSizeLimit;
|
||||
|
||||
struct BoundOutput {
|
||||
XorElem start;
|
||||
XorElem end;
|
||||
struct OutputRange {
|
||||
Item start;
|
||||
Item end;
|
||||
std::string payload;
|
||||
};
|
||||
|
||||
std::vector<XorElem> items;
|
||||
std::vector<Item> addedItems;
|
||||
std::vector<uint64_t> itemTimestamps;
|
||||
std::string itemIds;
|
||||
bool sealed = false;
|
||||
bool isInitiator = false;
|
||||
bool continuationNeeded = false;
|
||||
std::deque<BoundOutput> pendingOutputs;
|
||||
std::deque<OutputRange> pendingOutputs;
|
||||
|
||||
Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) {
|
||||
if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid");
|
||||
@ -90,22 +86,40 @@ struct Negentropy {
|
||||
|
||||
void addItem(uint64_t createdAt, std::string_view id) {
|
||||
if (sealed) throw negentropy::err("already sealed");
|
||||
if (id.size() < idSize) throw negentropy::err("bad id size");
|
||||
|
||||
items.emplace_back(createdAt, id);
|
||||
addedItems.emplace_back(createdAt, id.substr(0, idSize));
|
||||
}
|
||||
|
||||
void seal() {
|
||||
if (sealed) throw negentropy::err("already sealed");
|
||||
|
||||
std::sort(items.begin(), items.end());
|
||||
sealed = true;
|
||||
|
||||
std::sort(addedItems.begin(), addedItems.end());
|
||||
|
||||
if (addedItems.size() > 1) {
|
||||
for (size_t i = 0; i < addedItems.size() - 1; i++) {
|
||||
if (addedItems[i] == addedItems[i + 1]) throw negentropy::err("duplicate item inserted");
|
||||
}
|
||||
}
|
||||
|
||||
itemTimestamps.reserve(addedItems.size());
|
||||
itemIds.reserve(addedItems.size() * idSize);
|
||||
|
||||
for (const auto &item : addedItems) {
|
||||
itemTimestamps.push_back(item.timestamp);
|
||||
itemIds += item.getId();
|
||||
}
|
||||
|
||||
addedItems.clear();
|
||||
addedItems.shrink_to_fit();
|
||||
}
|
||||
|
||||
std::string initiate() {
|
||||
if (!sealed) throw negentropy::err("not sealed");
|
||||
isInitiator = true;
|
||||
|
||||
splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs);
|
||||
splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs);
|
||||
|
||||
return buildOutput();
|
||||
}
|
||||
@ -124,34 +138,50 @@ struct Negentropy {
|
||||
}
|
||||
|
||||
private:
|
||||
// Number of stored items, taken from the timestamp array (itemTimestamps holds
// one entry per item pushed in seal()).
size_t numItems() {
|
||||
return itemTimestamps.size();
|
||||
}
|
||||
|
||||
// Returns a view of the id of item `i`: the idSize bytes starting at offset
// i * idSize inside the packed itemIds buffer.
// The index is not range-checked here, and the returned view points into
// itemIds rather than owning a copy.
std::string_view getItemId(size_t i) {
|
||||
return std::string_view(itemIds.data() + (i * idSize), idSize);
|
||||
}
|
||||
|
||||
// Materialises item `i` as an Item value built from itemTimestamps[i] and
// getItemId(i). The index is not range-checked here.
Item getItem(size_t i) {
|
||||
return Item(itemTimestamps[i], getItemId(i));
|
||||
}
|
||||
|
||||
// Fingerprint of `num` consecutive items starting at index `lower`.
// Hashes the num * idSize bytes of itemIds beginning at lower * idSize with
// SHA-256 and returns the first idSize bytes of the digest.
// Only the packed ids are hashed; itemTimestamps is not read here.
// The range is not bounds-checked against the number of stored items.
std::string computeFingerprint(size_t lower, size_t num) {
|
||||
unsigned char hash[SHA256_DIGEST_LENGTH];
|
||||
SHA256(reinterpret_cast<unsigned char*>(itemIds.data() + (lower * idSize)), num * idSize, hash);
|
||||
// Truncate the digest to idSize bytes (the constructor rejects idSize > 32).
return std::string(reinterpret_cast<char*>(hash), idSize);
|
||||
}
|
||||
|
||||
void reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
|
||||
if (!sealed) throw negentropy::err("not sealed");
|
||||
continuationNeeded = false;
|
||||
|
||||
auto prevBound = XorElem(0, "");
|
||||
auto prevIndex = items.begin();
|
||||
Item prevBound;
|
||||
size_t prevIndex = 0;
|
||||
uint64_t lastTimestampIn = 0;
|
||||
std::deque<BoundOutput> outputs;
|
||||
std::deque<OutputRange> outputs;
|
||||
|
||||
while (query.size()) {
|
||||
auto currBound = decodeBound(query, lastTimestampIn);
|
||||
auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation
|
||||
auto mode = Mode(decodeVarInt(query));
|
||||
|
||||
auto lower = prevIndex;
|
||||
auto upper = std::upper_bound(prevIndex, items.end(), currBound);
|
||||
auto upper = findUpperBound(prevIndex, numItems(), currBound);
|
||||
|
||||
if (mode == 0) { // Skip
|
||||
if (mode == Mode::Skip) {
|
||||
// Do nothing
|
||||
} else if (mode == 1) { // Fingerprint
|
||||
XorElem theirXorSet(0, getBytes(query, idSize));
|
||||
} else if (mode == Mode::Fingerprint) {
|
||||
auto theirFingerprint = getBytes(query, idSize);
|
||||
auto ourFingerprint = computeFingerprint(lower, upper - lower);
|
||||
|
||||
XorElem ourXorSet;
|
||||
for (auto i = lower; i < upper; ++i) ourXorSet ^= *i;
|
||||
|
||||
if (theirXorSet.getId() != ourXorSet.getId(idSize)) {
|
||||
if (theirFingerprint != ourFingerprint) {
|
||||
splitRange(lower, upper, prevBound, currBound, outputs);
|
||||
}
|
||||
} else if (mode == 2) { // IdList
|
||||
} else if (mode == Mode::IdList) {
|
||||
auto numIds = decodeVarInt(query);
|
||||
|
||||
std::unordered_set<std::string> theirElems;
|
||||
@ -160,8 +190,8 @@ struct Negentropy {
|
||||
theirElems.insert(e);
|
||||
}
|
||||
|
||||
for (auto it = lower; it < upper; ++it) {
|
||||
auto k = std::string(it->getId());
|
||||
for (auto i = lower; i < upper; ++i) {
|
||||
auto k = std::string(getItemId(i));
|
||||
|
||||
if (theirElems.find(k) == theirElems.end()) {
|
||||
// ID exists on our side, but not their side
|
||||
@ -182,17 +212,17 @@ struct Negentropy {
|
||||
|
||||
auto it = lower;
|
||||
bool didSplit = false;
|
||||
XorElem splitBound;
|
||||
Item splitBound;
|
||||
|
||||
auto flushIdListOutput = [&]{
|
||||
std::string payload = encodeVarInt(2); // mode = IdList
|
||||
std::string payload = encodeVarInt(uint64_t(Mode::IdList));
|
||||
|
||||
payload += encodeVarInt(responseHaveIds.size());
|
||||
for (const auto &id : responseHaveIds) payload += id;
|
||||
|
||||
auto nextSplitBound = std::next(it) >= upper ? currBound : getMinimalBound(*it, *std::next(it));
|
||||
auto nextSplitBound = it + 1 >= upper ? currBound : getMinimalBound(getItem(it), getItem(it + 1));
|
||||
|
||||
outputs.emplace_back(BoundOutput({
|
||||
outputs.emplace_back(OutputRange({
|
||||
didSplit ? splitBound : prevBound,
|
||||
nextSplitBound,
|
||||
std::move(payload)
|
||||
@ -205,15 +235,15 @@ struct Negentropy {
|
||||
};
|
||||
|
||||
for (; it < upper; ++it) {
|
||||
responseHaveIds.emplace_back(it->getId());
|
||||
responseHaveIds.emplace_back(getItemId(it));
|
||||
if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k
|
||||
}
|
||||
|
||||
flushIdListOutput();
|
||||
}
|
||||
} else if (mode == 3) { // Deprecated
|
||||
} else if (mode == Mode::Deprecated) {
|
||||
throw negentropy::err("other side is speaking old negentropy protocol");
|
||||
} else if (mode == 4) { // Continuation
|
||||
} else if (mode == Mode::Continuation) {
|
||||
continuationNeeded = true;
|
||||
} else {
|
||||
throw negentropy::err("unexpected mode");
|
||||
@ -229,34 +259,37 @@ struct Negentropy {
|
||||
}
|
||||
}
|
||||
|
||||
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, std::deque<BoundOutput> &outputs) {
|
||||
void splitRange(size_t lower, size_t upper, const Item &lowerBound, const Item &upperBound, std::deque<OutputRange> &outputs) {
|
||||
uint64_t numElems = upper - lower;
|
||||
const uint64_t buckets = 16;
|
||||
|
||||
if (numElems < buckets * 2) {
|
||||
std::string payload = encodeVarInt(2); // mode = IdList
|
||||
std::string payload = encodeVarInt(uint64_t(Mode::IdList));
|
||||
payload += encodeVarInt(numElems);
|
||||
for (auto it = lower; it < upper; ++it) payload += it->getId(idSize);
|
||||
for (auto i = lower; i < upper; i++) payload += getItemId(i);
|
||||
|
||||
outputs.emplace_back(BoundOutput({ lowerBound, upperBound, std::move(payload) }));
|
||||
outputs.emplace_back(OutputRange({
|
||||
lowerBound,
|
||||
upperBound,
|
||||
std::move(payload)
|
||||
}));
|
||||
} else {
|
||||
uint64_t itemsPerBucket = numElems / buckets;
|
||||
uint64_t bucketsWithExtra = numElems % buckets;
|
||||
auto curr = lower;
|
||||
XorElem prevBound = *curr;
|
||||
Item prevBound = getItem(curr);
|
||||
|
||||
for (uint64_t i = 0; i < buckets; i++) {
|
||||
XorElem ourXorSet;
|
||||
for (auto bucketEnd = curr + itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); curr != bucketEnd; curr++) {
|
||||
ourXorSet ^= *curr;
|
||||
}
|
||||
auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
|
||||
auto ourFingerprint = computeFingerprint(curr, bucketSize);
|
||||
curr += bucketSize;
|
||||
|
||||
std::string payload = encodeVarInt(1); // mode = Fingerprint
|
||||
payload += ourXorSet.getId(idSize);
|
||||
std::string payload = encodeVarInt(uint64_t(Mode::Fingerprint));
|
||||
payload += ourFingerprint;
|
||||
|
||||
outputs.emplace_back(BoundOutput({
|
||||
outputs.emplace_back(OutputRange({
|
||||
i == 0 ? lowerBound : prevBound,
|
||||
curr == items.end() ? upperBound : getMinimalBound(*std::prev(curr), *curr),
|
||||
curr == upper ? upperBound : getMinimalBound(getItem(curr - 1), getItem(curr)),
|
||||
std::move(payload)
|
||||
}));
|
||||
|
||||
@ -269,7 +302,7 @@ struct Negentropy {
|
||||
|
||||
std::string buildOutput() {
|
||||
std::string output;
|
||||
auto currBound = XorElem(0, "");
|
||||
Item currBound;
|
||||
uint64_t lastTimestampOut = 0;
|
||||
|
||||
std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; });
|
||||
@ -279,12 +312,12 @@ struct Negentropy {
|
||||
|
||||
auto &p = pendingOutputs.front();
|
||||
|
||||
// When bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above)
|
||||
// If bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above)
|
||||
if (p.start < currBound) break;
|
||||
|
||||
if (currBound != p.start) {
|
||||
o += encodeBound(p.start, lastTimestampOut);
|
||||
o += encodeVarInt(0); // mode = Skip
|
||||
o += encodeVarInt(uint64_t(Mode::Skip));
|
||||
}
|
||||
|
||||
o += encodeBound(p.end, lastTimestampOut);
|
||||
@ -300,13 +333,32 @@ struct Negentropy {
|
||||
// Server indicates that it has more to send, OR ensure client sends a non-empty message
|
||||
|
||||
if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) {
|
||||
output += encodeBound(XorElem(MAX_U64, ""), lastTimestampOut);
|
||||
output += encodeVarInt(4); // mode = Continue
|
||||
output += encodeBound(Item(MAX_U64), lastTimestampOut);
|
||||
output += encodeVarInt(uint64_t(Mode::Continuation));
|
||||
}
|
||||
|
||||
return output;
|
||||
}
|
||||
|
||||
// Binary search over item indices [first, last).
// Returns the first index whose item compares greater than `value`, or `last`
// if there is none. Ordering is by timestamp first, then by id when the
// timestamps are equal. Follows the same halving scheme as std::upper_bound,
// but reads from itemTimestamps / getItemId instead of dereferencing iterators.
// Presumably relies on the items being sorted in that same order — confirm
// against the code that fills itemTimestamps / itemIds.
size_t findUpperBound(size_t first, size_t last, const Item &value) {
|
||||
size_t count = last - first;
|
||||
|
||||
while (count > 0) {
|
||||
size_t it = first;
|
||||
size_t step = count / 2;
|
||||
it += step;
|
||||
|
||||
// True when `value` sorts strictly before the item at index `it`.
if (value.timestamp == itemTimestamps[it] ? value.getId() < getItemId(it) : value.timestamp < itemTimestamps[it]) {
|
||||
// Answer is at or before `it`: keep the left half.
count = step;
|
||||
} else {
|
||||
// Item at `it` is not greater than `value`: continue right of it.
first = ++it;
|
||||
count -= step + 1;
|
||||
}
|
||||
}
|
||||
|
||||
return first;
|
||||
}
|
||||
|
||||
|
||||
// Decoding
|
||||
|
||||
@ -340,10 +392,10 @@ struct Negentropy {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
XorElem decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) {
|
||||
Item decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) {
|
||||
auto timestamp = decodeTimestampIn(encoded, lastTimestampIn);
|
||||
auto len = decodeVarInt(encoded);
|
||||
return XorElem(timestamp, getBytes(encoded, len));
|
||||
return Item(timestamp, getBytes(encoded, len));
|
||||
}
|
||||
|
||||
|
||||
@ -380,19 +432,19 @@ struct Negentropy {
|
||||
return encodeVarInt(timestamp + 1);
|
||||
};
|
||||
|
||||
std::string encodeBound(const XorElem &bound, uint64_t &lastTimestampOut) {
|
||||
std::string encodeBound(const Item &bound, uint64_t &lastTimestampOut) {
|
||||
std::string output;
|
||||
|
||||
output += encodeTimestampOut(bound.timestamp, lastTimestampOut);
|
||||
output += encodeVarInt(bound.idSize);
|
||||
output += bound.getId(idSize);
|
||||
output += bound.getId();
|
||||
|
||||
return output;
|
||||
};
|
||||
|
||||
XorElem getMinimalBound(const XorElem &prev, const XorElem &curr) {
|
||||
Item getMinimalBound(const Item &prev, const Item &curr) {
|
||||
if (curr.timestamp != prev.timestamp) {
|
||||
return XorElem(curr.timestamp, "");
|
||||
return Item(curr.timestamp, "");
|
||||
} else {
|
||||
uint64_t sharedPrefixBytes = 0;
|
||||
auto currKey = curr.getId();
|
||||
@ -403,7 +455,7 @@ struct Negentropy {
|
||||
sharedPrefixBytes++;
|
||||
}
|
||||
|
||||
return XorElem(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1));
|
||||
return Item(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -1,2 +1,2 @@
|
||||
harness: harness.cpp ../../cpp/Negentropy.h
|
||||
g++ -g -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -o harness
|
||||
g++ -g -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -lcrypto -o harness
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user