From 94c7830f50b5fb6b71e3dbff1efe5702a9939976 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Wed, 13 Sep 2023 23:36:09 -0400 Subject: [PATCH] major refactor to C++ impl: use SHA-256 as fingerprint function --- cpp/Negentropy.h | 210 +++++++++++++++++++++++++++++----------------- test/cpp/Makefile | 2 +- 2 files changed, 132 insertions(+), 80 deletions(-) diff --git a/cpp/Negentropy.h b/cpp/Negentropy.h index 1cc30e0..5abcc11 100644 --- a/cpp/Negentropy.h +++ b/cpp/Negentropy.h @@ -13,6 +13,7 @@ #include #include +#include namespace negentropy { @@ -22,47 +23,40 @@ const uint64_t MAX_U64 = std::numeric_limits::max(); using err = std::runtime_error; -struct alignas(16) XorElem { +enum class Mode { + Skip = 0, + Fingerprint = 1, + IdList = 2, + Deprecated = 3, + Continuation = 4, +}; + + +struct Item { uint64_t timestamp; uint64_t idSize; char id[32]; - XorElem() : timestamp(0), idSize(32) { + Item(uint64_t timestamp = 0) : timestamp(timestamp), idSize(0) { memset(id, '\0', sizeof(id)); } - XorElem(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) { - if (idSize > 32) throw negentropy::err("id too big"); - memset(id, '\0', sizeof(id)); - memcpy(id, id_.data(), idSize); - } - - XorElem(uint64_t timestamp, uint64_t idSize) : timestamp(timestamp), idSize(idSize) { - if (idSize > 32) throw negentropy::err("id too big"); + Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) { + if (id_.size() > 32) throw negentropy::err("id too big"); memset(id, '\0', sizeof(id)); + memcpy(id, id_.data(), id_.size()); } std::string_view getId() const { return std::string_view(id, idSize); } - std::string_view getId(uint64_t subSize) const { - return getId().substr(0, subSize); - } - - XorElem& operator^=(const XorElem &other) { - auto *p1 = static_cast(__builtin_assume_aligned(id, 16)); - auto *p2 = static_cast(__builtin_assume_aligned(other.id, 16)); - for (size_t i = 0; i < 32; i++) p1[i] ^= p2[i]; - return *this; - } - - bool operator==(const XorElem &other) const { + bool operator==(const Item &other) const { return timestamp == other.timestamp && getId() == other.getId(); } }; -inline bool operator<(const XorElem &a, const XorElem &b) { +inline bool operator<(const Item &a, const Item &b) { return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId(); }; @@ -71,17 +65,19 @@ struct Negentropy { uint64_t idSize; uint64_t frameSizeLimit; - struct BoundOutput { - XorElem start; - XorElem end; + struct OutputRange { + Item start; + Item end; std::string payload; }; - std::vector items; + std::vector addedItems; + std::vector itemTimestamps; + std::string itemIds; bool sealed = false; bool isInitiator = false; bool continuationNeeded = false; - std::deque pendingOutputs; + std::deque pendingOutputs; Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) { if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid"); @@ -90,22 +86,40 @@ struct Negentropy { void addItem(uint64_t createdAt, std::string_view id) { if (sealed) throw negentropy::err("already sealed"); + if (id.size() < idSize) throw negentropy::err("bad id size"); - items.emplace_back(createdAt, id); + addedItems.emplace_back(createdAt, id.substr(0, idSize)); } void seal() { if (sealed) throw negentropy::err("already sealed"); - - std::sort(items.begin(), items.end()); sealed = true; + + std::sort(addedItems.begin(), addedItems.end()); + + if (addedItems.size() > 1) { + for (size_t i = 0; i < addedItems.size() - 1; i++) { + if (addedItems[i] == addedItems[i + 1]) throw negentropy::err("duplicate item inserted"); + } + } + + itemTimestamps.reserve(addedItems.size()); + itemIds.reserve(addedItems.size() * idSize); + + for (const auto &item : addedItems) { + itemTimestamps.push_back(item.timestamp); + itemIds += item.getId(); + } + + addedItems.clear(); + addedItems.shrink_to_fit(); } std::string initiate() { if (!sealed) throw negentropy::err("not sealed"); isInitiator = true; - splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), pendingOutputs); + splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs); return buildOutput(); } @@ -124,34 +138,50 @@ struct Negentropy { } private: + size_t numItems() { + return itemTimestamps.size(); + } + + std::string_view getItemId(size_t i) { + return std::string_view(itemIds.data() + (i * idSize), idSize); + } + + Item getItem(size_t i) { + return Item(itemTimestamps[i], getItemId(i)); + } + + std::string computeFingerprint(size_t lower, size_t num) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256(reinterpret_cast(itemIds.data() + (lower * idSize)), num * idSize, hash); + return std::string(reinterpret_cast(hash), idSize); + } + void reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { if (!sealed) throw negentropy::err("not sealed"); continuationNeeded = false; - auto prevBound = XorElem(0, ""); - auto prevIndex = items.begin(); + Item prevBound; + size_t prevIndex = 0; uint64_t lastTimestampIn = 0; - std::deque outputs; + std::deque outputs; while (query.size()) { auto currBound = decodeBound(query, lastTimestampIn); - auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = deprecated, 4 = Continuation + auto mode = Mode(decodeVarInt(query)); auto lower = prevIndex; - auto upper = std::upper_bound(prevIndex, items.end(), currBound); + auto upper = findUpperBound(prevIndex, numItems(), currBound); - if (mode == 0) { // Skip + if (mode == Mode::Skip) { // Do nothing - } else if (mode == 1) { // Fingerprint - XorElem theirXorSet(0, getBytes(query, idSize)); + } else if (mode == Mode::Fingerprint) { + auto theirFingerprint = getBytes(query, idSize); + auto ourFingerprint = computeFingerprint(lower, upper - lower); - XorElem ourXorSet; - for (auto i = lower; i < upper; ++i) ourXorSet ^= *i; - - if (theirXorSet.getId() != ourXorSet.getId(idSize)) { + if (theirFingerprint != ourFingerprint) { splitRange(lower, upper, prevBound, currBound, outputs); } - } else if (mode == 2) { // IdList + } else if (mode == Mode::IdList) { auto numIds = decodeVarInt(query); std::unordered_set theirElems; @@ -160,8 +190,8 @@ struct Negentropy { theirElems.insert(e); } - for (auto it = lower; it < upper; ++it) { - auto k = std::string(it->getId()); + for (auto i = lower; i < upper; ++i) { + auto k = std::string(getItemId(i)); if (theirElems.find(k) == theirElems.end()) { // ID exists on our side, but not their side @@ -182,17 +212,17 @@ struct Negentropy { auto it = lower; bool didSplit = false; - XorElem splitBound; + Item splitBound; auto flushIdListOutput = [&]{ - std::string payload = encodeVarInt(2); // mode = IdList + std::string payload = encodeVarInt(uint64_t(Mode::IdList)); payload += encodeVarInt(responseHaveIds.size()); for (const auto &id : responseHaveIds) payload += id; - auto nextSplitBound = std::next(it) >= upper ? currBound : getMinimalBound(*it, *std::next(it)); + auto nextSplitBound = it + 1 >= upper ? currBound : getMinimalBound(getItem(it), getItem(it + 1)); - outputs.emplace_back(BoundOutput({ + outputs.emplace_back(OutputRange({ didSplit ? splitBound : prevBound, nextSplitBound, std::move(payload) @@ -205,15 +235,15 @@ struct Negentropy { }; for (; it < upper; ++it) { - responseHaveIds.emplace_back(it->getId()); + responseHaveIds.emplace_back(getItemId(it)); if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k } flushIdListOutput(); } - } else if (mode == 3) { // Deprecated + } else if (mode == Mode::Deprecated) { throw negentropy::err("other side is speaking old negentropy protocol"); - } else if (mode == 4) { // Continuation + } else if (mode == Mode::Continuation) { continuationNeeded = true; } else { throw negentropy::err("unexpected mode"); @@ -229,34 +259,37 @@ struct Negentropy { } } - void splitRange(std::vector::iterator lower, std::vector::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, std::deque &outputs) { + void splitRange(size_t lower, size_t upper, const Item &lowerBound, const Item &upperBound, std::deque &outputs) { uint64_t numElems = upper - lower; const uint64_t buckets = 16; if (numElems < buckets * 2) { - std::string payload = encodeVarInt(2); // mode = IdList + std::string payload = encodeVarInt(uint64_t(Mode::IdList)); payload += encodeVarInt(numElems); - for (auto it = lower; it < upper; ++it) payload += it->getId(idSize); + for (auto i = lower; i < upper; i++) payload += getItemId(i); - outputs.emplace_back(BoundOutput({ lowerBound, upperBound, std::move(payload) })); + outputs.emplace_back(OutputRange({ + lowerBound, + upperBound, + std::move(payload) + })); } else { uint64_t itemsPerBucket = numElems / buckets; uint64_t bucketsWithExtra = numElems % buckets; auto curr = lower; - XorElem prevBound = *curr; + Item prevBound = getItem(curr); for (uint64_t i = 0; i < buckets; i++) { - XorElem ourXorSet; - for (auto bucketEnd = curr + itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); curr != bucketEnd; curr++) { - ourXorSet ^= *curr; - } + auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); + auto ourFingerprint = computeFingerprint(curr, bucketSize); + curr += bucketSize; - std::string payload = encodeVarInt(1); // mode = Fingerprint - payload += ourXorSet.getId(idSize); + std::string payload = encodeVarInt(uint64_t(Mode::Fingerprint)); + payload += ourFingerprint; - outputs.emplace_back(BoundOutput({ + outputs.emplace_back(OutputRange({ i == 0 ? lowerBound : prevBound, - curr == items.end() ? upperBound : getMinimalBound(*std::prev(curr), *curr), + curr == upper ? upperBound : getMinimalBound(getItem(curr - 1), getItem(curr)), std::move(payload) })); @@ -269,7 +302,7 @@ struct Negentropy { std::string buildOutput() { std::string output; - auto currBound = XorElem(0, ""); + Item currBound; uint64_t lastTimestampOut = 0; std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; }); @@ -279,12 +312,12 @@ struct Negentropy { auto &p = pendingOutputs.front(); - // When bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above) + // If bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above) if (p.start < currBound) break; if (currBound != p.start) { o += encodeBound(p.start, lastTimestampOut); - o += encodeVarInt(0); // mode = Skip + o += encodeVarInt(uint64_t(Mode::Skip)); } o += encodeBound(p.end, lastTimestampOut); @@ -300,13 +333,32 @@ struct Negentropy { // Server indicates that it has more to send, OR ensure client sends a non-empty message if ((!isInitiator && pendingOutputs.size()) || (isInitiator && output.size() == 0 && continuationNeeded)) { - output += encodeBound(XorElem(MAX_U64, ""), lastTimestampOut); - output += encodeVarInt(4); // mode = Continue + output += encodeBound(Item(MAX_U64), lastTimestampOut); + output += encodeVarInt(uint64_t(Mode::Continuation)); } return output; } + size_t findUpperBound(size_t first, size_t last, const Item &value) { + size_t count = last - first; + + while (count > 0) { + size_t it = first; + size_t step = count / 2; + it += step; + + if (value.timestamp == itemTimestamps[it] ? value.getId() < getItemId(it) : value.timestamp < itemTimestamps[it]) { + count = step; + } else { + first = ++it; + count -= step + 1; + } + } + + return first; + } + // Decoding @@ -340,10 +392,10 @@ struct Negentropy { return timestamp; } - XorElem decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) { + Item decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) { auto timestamp = decodeTimestampIn(encoded, lastTimestampIn); auto len = decodeVarInt(encoded); - return XorElem(timestamp, getBytes(encoded, len)); + return Item(timestamp, getBytes(encoded, len)); } @@ -380,19 +432,19 @@ struct Negentropy { return encodeVarInt(timestamp + 1); }; - std::string encodeBound(const XorElem &bound, uint64_t &lastTimestampOut) { + std::string encodeBound(const Item &bound, uint64_t &lastTimestampOut) { std::string output; output += encodeTimestampOut(bound.timestamp, lastTimestampOut); output += encodeVarInt(bound.idSize); - output += bound.getId(idSize); + output += bound.getId(); return output; }; - XorElem getMinimalBound(const XorElem &prev, const XorElem &curr) { + Item getMinimalBound(const Item &prev, const Item &curr) { if (curr.timestamp != prev.timestamp) { - return XorElem(curr.timestamp, ""); + return Item(curr.timestamp, ""); } else { uint64_t sharedPrefixBytes = 0; auto currKey = curr.getId(); @@ -403,7 +455,7 @@ struct Negentropy { sharedPrefixBytes++; } - return XorElem(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); + return Item(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); } } }; diff --git a/test/cpp/Makefile b/test/cpp/Makefile index 70d68df..c18caff 100644 --- a/test/cpp/Makefile +++ b/test/cpp/Makefile @@ -1,2 +1,2 @@ harness: harness.cpp ../../cpp/Negentropy.h - g++ -g -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -o harness + g++ -g -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -lcrypto -o harness