From 304779371c3b95c17b7f0795160aabf71c6a346c Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Fri, 20 Oct 2023 15:25:40 -0400 Subject: [PATCH] cpp: implement protocol version 1 --- .gitmodules | 3 + cpp/Negentropy.h | 507 --------------------- cpp/README.md | 141 ++++++ cpp/negentropy.h | 323 +++++++++++++ cpp/negentropy/encoding.h | 60 +++ cpp/negentropy/storage/BTreeLMDB.h | 146 ++++++ cpp/negentropy/storage/BTreeMem.h | 48 ++ cpp/negentropy/storage/SubRange.h | 63 +++ cpp/negentropy/storage/Vector.h | 88 ++++ cpp/negentropy/storage/base.h | 22 + cpp/negentropy/storage/btree/core.h | 652 +++++++++++++++++++++++++++ cpp/negentropy/storage/btree/debug.h | 189 ++++++++ cpp/negentropy/types.h | 184 ++++++++ test/cpp/.gitignore | 6 + test/cpp/Makefile | 32 +- test/cpp/btreeFuzz.cpp | 149 ++++++ test/cpp/check.sh | 9 + test/cpp/harness.cpp | 27 +- test/cpp/lmdbTest.cpp | 157 +++++++ test/cpp/lmdbxx | 1 + test/cpp/measureSpaceUsage.cpp | 78 ++++ test/cpp/measureSpaceUsage.pl | 9 + test/cpp/subRange.cpp | 165 +++++++ test/fuzz.pl | 2 +- test/protoversion.pl | 25 +- 25 files changed, 2550 insertions(+), 536 deletions(-) delete mode 100644 cpp/Negentropy.h create mode 100644 cpp/README.md create mode 100644 cpp/negentropy.h create mode 100644 cpp/negentropy/encoding.h create mode 100644 cpp/negentropy/storage/BTreeLMDB.h create mode 100644 cpp/negentropy/storage/BTreeMem.h create mode 100644 cpp/negentropy/storage/SubRange.h create mode 100644 cpp/negentropy/storage/Vector.h create mode 100644 cpp/negentropy/storage/base.h create mode 100644 cpp/negentropy/storage/btree/core.h create mode 100644 cpp/negentropy/storage/btree/debug.h create mode 100644 cpp/negentropy/types.h create mode 100644 test/cpp/btreeFuzz.cpp create mode 100755 test/cpp/check.sh create mode 100644 test/cpp/lmdbTest.cpp create mode 160000 test/cpp/lmdbxx create mode 100644 test/cpp/measureSpaceUsage.cpp create mode 100644 test/cpp/measureSpaceUsage.pl create mode 100644 test/cpp/subRange.cpp diff --git a/.gitmodules b/.gitmodules index c29e9ec..2834d9b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "test/cpp/hoytech-cpp"] path = test/cpp/hoytech-cpp url = https://github.com/hoytech/hoytech-cpp.git +[submodule "test/cpp/lmdbxx"] + path = test/cpp/lmdbxx + url = https://github.com/hoytech/lmdbxx.git diff --git a/cpp/Negentropy.h b/cpp/Negentropy.h deleted file mode 100644 index b0c5ea8..0000000 --- a/cpp/Negentropy.h +++ /dev/null @@ -1,507 +0,0 @@ -// (C) 2023 Doug Hoyte. MIT license - -#pragma once - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - - -namespace negentropy { - -const uint64_t PROTOCOL_VERSION_0 = 0x60; - -const uint64_t MAX_U64 = std::numeric_limits::max(); -using err = std::runtime_error; - - -enum class Mode { - Skip = 0, - Fingerprint = 1, - IdList = 2, - Continuation = 3, - UnsupportedProtocolVersion = 4, -}; - - -struct Item { - uint64_t timestamp; - uint64_t idSize; - char id[32]; - - Item(uint64_t timestamp = 0) : timestamp(timestamp), idSize(0) { - memset(id, '\0', sizeof(id)); - } - - Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) { - if (id_.size() > 32) throw negentropy::err("id too big"); - memset(id, '\0', sizeof(id)); - memcpy(id, id_.data(), id_.size()); - } - - std::string_view getId() const { - return std::string_view(id, idSize); - } - - bool operator==(const Item &other) const { - return timestamp == other.timestamp && getId() == other.getId(); - } -}; - -inline bool operator<(const Item &a, const Item &b) { - return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId(); -}; - - -struct Negentropy { - uint64_t idSize; - uint64_t frameSizeLimit; - - struct OutputRange { - Item start; - Item end; - std::string payload; - }; - - std::vector addedItems; - std::vector itemTimestamps; - std::string itemIds; - bool sealed = false; - bool isInitiator = false; - bool didHandshake = false; - bool continuationNeeded = false; - std::deque pendingOutputs; - - Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) { - if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid"); - if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small"); - } - - void addItem(uint64_t createdAt, std::string_view id) { - if (sealed) throw negentropy::err("already sealed"); - if (id.size() < idSize) throw negentropy::err("bad id size"); - - addedItems.emplace_back(createdAt, id.substr(0, idSize)); - } - - void seal() { - if (sealed) throw negentropy::err("already sealed"); - sealed = true; - - std::sort(addedItems.begin(), addedItems.end()); - - if (addedItems.size() > 1) { - for (size_t i = 0; i < addedItems.size() - 1; i++) { - if (addedItems[i] == addedItems[i + 1]) throw negentropy::err("duplicate item inserted"); - } - } - - itemTimestamps.reserve(addedItems.size()); - itemIds.reserve(addedItems.size() * idSize); - - for (const auto &item : addedItems) { - itemTimestamps.push_back(item.timestamp); - itemIds += item.getId(); - } - - addedItems.clear(); - addedItems.shrink_to_fit(); - } - - std::string initiate() { - if (!sealed) throw negentropy::err("not sealed"); - if (didHandshake) throw negentropy::err("can't initiate after reconcile"); - isInitiator = true; - - splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs); - - auto output = std::move(buildOutput(true).value()); - return output; - } - - std::string reconcile(std::string_view query) { - if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs"); - - if (!didHandshake) { - auto protocolVersion = getByte(query); - if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte"); - if (protocolVersion != PROTOCOL_VERSION_0) { - std::string o; - uint64_t lastTimestampOut = 0; - o += encodeBound(Item(PROTOCOL_VERSION_0), lastTimestampOut); - o += encodeVarInt(uint64_t(Mode::UnsupportedProtocolVersion)); - return o; - } - didHandshake = true; - } - - std::vector haveIds, needIds; - reconcileAux(query, haveIds, needIds); - - auto output = std::move(buildOutput(false).value()); - return output; - } - - std::optional reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { - if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs"); - - reconcileAux(query, haveIds, needIds); - - return buildOutput(false); - } - - private: - size_t numItems() { - return itemTimestamps.size(); - } - - std::string_view getItemId(size_t i) { - return std::string_view(itemIds.data() + (i * idSize), idSize); - } - - Item getItem(size_t i) { - return Item(itemTimestamps[i], getItemId(i)); - } - - std::string computeFingerprint(size_t lower, size_t num) { - unsigned char hash[SHA256_DIGEST_LENGTH]; - SHA256(reinterpret_cast(itemIds.data() + (lower * idSize)), num * idSize, hash); - return std::string(reinterpret_cast(hash), idSize); - } - - void reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { - if (!sealed) throw negentropy::err("not sealed"); - continuationNeeded = false; - - Item prevBound; - size_t prevIndex = 0; - uint64_t lastTimestampIn = 0; - std::deque outputs; - - while (query.size()) { - auto currBound = decodeBound(query, lastTimestampIn); - auto mode = Mode(decodeVarInt(query)); - - auto lower = prevIndex; - auto upper = findUpperBound(prevIndex, numItems(), currBound); - - if (mode == Mode::Skip) { - // Do nothing - } else if (mode == Mode::Fingerprint) { - auto theirFingerprint = getBytes(query, idSize); - auto ourFingerprint = computeFingerprint(lower, upper - lower); - - if (theirFingerprint != ourFingerprint) { - splitRange(lower, upper, prevBound, currBound, outputs); - } - } else if (mode == Mode::IdList) { - auto numIds = decodeVarInt(query); - - std::unordered_set theirElems; - for (uint64_t i = 0; i < numIds; i++) { - auto e = getBytes(query, idSize); - theirElems.insert(e); - } - - for (auto i = lower; i < upper; ++i) { - auto k = std::string(getItemId(i)); - - if (theirElems.find(k) == theirElems.end()) { - // ID exists on our side, but not their side - if (isInitiator) haveIds.emplace_back(k); - } else { - // ID exists on both sides - theirElems.erase(k); - } - } - - if (isInitiator) { - for (const auto &k : theirElems) { - // ID exists on their side, but not our side - needIds.emplace_back(k); - } - } else { - std::vector responseHaveIds; - - auto it = lower; - bool didSplit = false; - Item splitBound; - - auto flushIdListOutput = [&]{ - std::string payload = encodeVarInt(uint64_t(Mode::IdList)); - - payload += encodeVarInt(responseHaveIds.size()); - for (const auto &id : responseHaveIds) payload += id; - - auto nextSplitBound = it + 1 >= upper ? currBound : getMinimalBound(getItem(it), getItem(it + 1)); - - outputs.emplace_back(OutputRange({ - didSplit ? splitBound : prevBound, - nextSplitBound, - std::move(payload) - })); - - splitBound = nextSplitBound; - didSplit = true; - - responseHaveIds.clear(); - }; - - for (; it < upper; ++it) { - responseHaveIds.emplace_back(getItemId(it)); - if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k - } - - flushIdListOutput(); - } - } else if (mode == Mode::Continuation) { - continuationNeeded = true; - } else if (mode == Mode::UnsupportedProtocolVersion) { - throw negentropy::err("server does not support our negentropy protocol version"); - } else { - throw negentropy::err("unexpected mode"); - } - - prevIndex = upper; - prevBound = currBound; - } - - while (outputs.size()) { - pendingOutputs.emplace_front(std::move(outputs.back())); - outputs.pop_back(); - } - } - - void splitRange(size_t lower, size_t upper, const Item &lowerBound, const Item &upperBound, std::deque &outputs) { - uint64_t numElems = upper - lower; - const uint64_t buckets = 16; - - if (numElems < buckets * 2) { - std::string payload = encodeVarInt(uint64_t(Mode::IdList)); - payload += encodeVarInt(numElems); - for (auto i = lower; i < upper; i++) payload += getItemId(i); - - outputs.emplace_back(OutputRange({ - lowerBound, - upperBound, - std::move(payload) - })); - } else { - uint64_t itemsPerBucket = numElems / buckets; - uint64_t bucketsWithExtra = numElems % buckets; - auto curr = lower; - Item prevBound = getItem(curr); - - for (uint64_t i = 0; i < buckets; i++) { - auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); - auto ourFingerprint = computeFingerprint(curr, bucketSize); - curr += bucketSize; - - std::string payload = encodeVarInt(uint64_t(Mode::Fingerprint)); - payload += ourFingerprint; - - outputs.emplace_back(OutputRange({ - i == 0 ? lowerBound : prevBound, - curr == upper ? upperBound : getMinimalBound(getItem(curr - 1), getItem(curr)), - std::move(payload) - })); - - prevBound = outputs.back().end; - } - - outputs.back().end = upperBound; - } - } - - std::optional buildOutput(bool initialMessage) { - std::string output; - Item currBound; - uint64_t lastTimestampOut = 0; - - if (initialMessage) { - if (didHandshake) throw negentropy::err("already built initial message"); - didHandshake = true; - output.push_back(PROTOCOL_VERSION_0); - } - - std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; }); - - while (pendingOutputs.size()) { - std::string o; - - auto &p = pendingOutputs.front(); - - // If bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above) - if (p.start < currBound) break; - - if (currBound != p.start) { - o += encodeBound(p.start, lastTimestampOut); - o += encodeVarInt(uint64_t(Mode::Skip)); - } - - o += encodeBound(p.end, lastTimestampOut); - o += p.payload; - - if (frameSizeLimit && output.size() + o.size() > frameSizeLimit - 5) break; // 5 leaves room for Continuation - output += o; - - currBound = p.end; - pendingOutputs.pop_front(); - } - - // Server indicates that it has more to send, OR ensure client sends a non-empty message - - if (!isInitiator && pendingOutputs.size()) { - output += encodeBound(Item(MAX_U64), lastTimestampOut); - output += encodeVarInt(uint64_t(Mode::Continuation)); - } - - if (isInitiator && output.size() == 0 && !continuationNeeded) { - return std::nullopt; - } - - return output; - } - - size_t findUpperBound(size_t first, size_t last, const Item &value) { - size_t count = last - first; - - while (count > 0) { - size_t it = first; - size_t step = count / 2; - it += step; - - if (value.timestamp == itemTimestamps[it] ? value.getId() < getItemId(it) : value.timestamp < itemTimestamps[it]) { - count = step; - } else { - first = ++it; - count -= step + 1; - } - } - - return first; - } - - - // Decoding - - uint8_t getByte(std::string_view &encoded) { - if (encoded.size() < 1) throw negentropy::err("parse ends prematurely"); - uint8_t output = encoded[0]; - encoded = encoded.substr(1); - return output; - } - - std::string getBytes(std::string_view &encoded, size_t n) { - if (encoded.size() < n) throw negentropy::err("parse ends prematurely"); - auto res = encoded.substr(0, n); - encoded = encoded.substr(n); - return std::string(res); - }; - - uint64_t decodeVarInt(std::string_view &encoded) { - uint64_t res = 0; - - while (1) { - if (encoded.size() == 0) throw negentropy::err("premature end of varint"); - uint64_t byte = encoded[0]; - encoded = encoded.substr(1); - res = (res << 7) | (byte & 0b0111'1111); - if ((byte & 0b1000'0000) == 0) break; - } - - return res; - } - - uint64_t decodeTimestampIn(std::string_view &encoded, uint64_t &lastTimestampIn) { - uint64_t timestamp = decodeVarInt(encoded); - timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1; - timestamp += lastTimestampIn; - if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate - lastTimestampIn = timestamp; - return timestamp; - } - - Item decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) { - auto timestamp = decodeTimestampIn(encoded, lastTimestampIn); - auto len = decodeVarInt(encoded); - return Item(timestamp, getBytes(encoded, len)); - } - - - // Encoding - - std::string encodeVarInt(uint64_t n) { - if (n == 0) return std::string(1, '\0'); - - std::string o; - - while (n) { - o.push_back(static_cast(n & 0x7F)); - n >>= 7; - } - - std::reverse(o.begin(), o.end()); - - for (size_t i = 0; i < o.size() - 1; i++) { - o[i] |= 0x80; - } - - return o; - } - - std::string encodeTimestampOut(uint64_t timestamp, uint64_t &lastTimestampOut) { - if (timestamp == MAX_U64) { - lastTimestampOut = MAX_U64; - return encodeVarInt(0); - } - - uint64_t temp = timestamp; - timestamp -= lastTimestampOut; - lastTimestampOut = temp; - return encodeVarInt(timestamp + 1); - }; - - std::string encodeBound(const Item &bound, uint64_t &lastTimestampOut) { - std::string output; - - output += encodeTimestampOut(bound.timestamp, lastTimestampOut); - output += encodeVarInt(bound.idSize); - output += bound.getId(); - - return output; - }; - - Item getMinimalBound(const Item &prev, const Item &curr) { - if (curr.timestamp != prev.timestamp) { - return Item(curr.timestamp, ""); - } else { - uint64_t sharedPrefixBytes = 0; - auto currKey = curr.getId(); - auto prevKey = prev.getId(); - - for (uint64_t i = 0; i < idSize; i++) { - if (currKey[i] != prevKey[i]) break; - sharedPrefixBytes++; - } - - return Item(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); - } - } -}; - - -} - - -using Negentropy = negentropy::Negentropy; diff --git a/cpp/README.md b/cpp/README.md new file mode 100644 index 0000000..b47b5ad --- /dev/null +++ b/cpp/README.md @@ -0,0 +1,141 @@ +# Negentropy C++ Implementation + +The C++ implementation is header-only and the only required dependency is OpenSSL (for SHA-256). The main `Negentropy` class can be imported with the following: + + #include "negentropy.h" + +## Storage + +First, you need to create a storage instance. Currently the following are available: + +### negentropy::storage::Vector + +All the elements are put into a contiguous vector in memory, and are then sorted. This can be useful for syncing the results of a dynamic query, since it can be constructed rapidly and consumes a minimal amount of memory. However, modifying it by adding or removing elements is expensive (linear in the size of the data-set). + + #include "negentropy/storage/Vector.h" + +To use `Vector`, add all your items with `insert` and then call `seal`: + + negentropy::storage::Vector storage; + + for (const auto &item : myItems) { + storage.insert(timestamp, id); + } + + storage.seal(); + +After sealing, no more items can be added. + +### negentropy::storage::BTreeMem + +Keeps the elements in an in-memory B+Tree. Computing fingerprints, adding, and removing elements are all logarithmic in data-set size. However, the elements will not be persisted to disk, and the data-structure is not thread-safe. + + #include "negentropy/storage/BTreeMem.h" + +To use `BTreeMem`, items can be added in the same way as with `Vector`, however sealing is not necessary (although is supported -- it is a no-op): + + negentropy::storage::BTreeMem storage; + + for (const auto &item : myItems) { + storage.insert(timestamp, id); + } + +More items can be added at any time, and items can be removed with `eraseItem`: + + storage.insert(timestamp, id); + storage.erase(timestamp, id); + + +### negentropy::storage::BTreeLMDB + +Uses the same implementation as BTreeMem, except that it uses [LMDB](http://lmdb.tech/) to save the data-set to persistent storage. Because the database is memory mapped, its read-performance is identical to the "in-memory" version (it is also in-memory, the memory just happens to reside in the page cache). Additionally, the tree can be concurrently accessed by multiple threads/processes using ACID transactions. + + #include "negentropy/storage/BTreeLMDB.h" + +First create an LMDB environment. Next, allocate a DBI to contain your tree(s) by calling `setupDB` inside a write transaction (don't forget to commit it). The `"test-data"` argument is the LMDB DBI table name you want to use: + + negentropy::storage::BTreeLMDB storage; + + auto env = lmdb::env::create(); + env.set_max_dbs(64); + env.open("testdb/", 0); + + lmdb::dbi btreeDbi; + + { + auto txn = lmdb::txn::begin(env); + btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data"); + txn.commit(); + } + +To add/remove items, create a `BTreeLMDB` object inside a write transaction. This is the storage instance: + + { + auto txn = lmdb::txn::begin(env); + negentropy::storage::BTreeLMDB storage(txn, btreeDbi, 300); + + storage.insert(timestamp, id); + + storage.flush(); + txn.commit(); + } + +* The third parameter (`300` in the above example) is the `treeId`. This allows many different trees to co-exist in the same DBI. +* Storage must be flushed before commiting the transaction. `BTreeLMDB` will try to flush in its destructor. If you commit before this happens, you may see "mdb_put: Invalid argument" errors. + + +### negentropy::storage::SubRange + +This storage is a proxy to a sub-range of another storage. It is useful for performing partial syncs of the DB. + +The constructor arguments are the large storage you want to proxy to (of type `Vector`, `BTreeLMDB`, etc), and the lower and upper bounds of the desired sub-range. As usual, lower bounds are inclusive and upper bounds are exclusive: + + negentropy::storage::SubRange subStorage(storage, negentropy::Bound(fromTimestamp), negentropy::Bound(toTimestamp)); + + +## Reconciliation + +Reconciliation works mostly the same for all storage types. First create a `Negentropy` object: + + auto ne = Negentropy(storage, 50'000); + +* The object is templated on the storage type, but can often be auto-deduced (as above). +* The second parameter (`50'000` above) is the `frameSizeLimit`. This can be omitted (or `0`) to permit unlimited-sized frames. + +On the client-side, create an initial message, and then transmit it to the server, receive the response, and `reconcile` until complete: + + std::string msg = ne.initiate(); + + while (true) { + std::string response = queryServer(msg); + + std::vector have, need; + std::optional newMsg = ne.reconcile(response, have, need); + + // handle have/need + + if (!newMsg) break; // done + else std::swap(msg, *newMsg); + } + +In each loop iteration, `have` contains IDs that the client has that the server doesn't, and `need` contains IDs that the server has that the client doesn't. + +The server-side is similar, except it doesn't create an initial message, there are no `have`/`need` arrays, and it doesn't return an optional (servers must always reply to a request): + + while (true) { + std::string msg = receiveMsgFromClient(); + std::string response = ne.reconcile(msg); + respondToClient(response); + } + + + +## BTree Implementation + +The BTree implementation is technically a B+Tree since all records are stored in the leaves. Every node has `next` and `prev` pointers that point to the neighbour nodes on the same level, which allows efficient iteration. + +Each node has an accumulator that contains the sum of the IDs of all nodes below it, allowing fingerprints to be computed in logarithmic time relative to the number of tree leaves. + +Nodes will split and rebalance themselves as necessary to keep the tree balanced. This is a major advantage over rigid data-structures like merkle-search trees and prolly trees, which are only probabilisticly balanced. + +If records are always inserted to the "right" of the tree, nodes will be fully packed. Otherwise, the tree attempts to keep them 50% full. There are more details on the tree invariants in the `negentropy/storage/btree/core.h` implementation file. diff --git a/cpp/negentropy.h b/cpp/negentropy.h new file mode 100644 index 0000000..d3de53e --- /dev/null +++ b/cpp/negentropy.h @@ -0,0 +1,323 @@ +// (C) 2023 Doug Hoyte. MIT license + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "negentropy/encoding.h" +#include "negentropy/types.h" +#include "negentropy/storage/base.h" + + +namespace negentropy { + +const uint64_t PROTOCOL_VERSION = 0x61; // Version 1 + +const uint64_t MAX_U64 = std::numeric_limits::max(); +using err = std::runtime_error; + + + +template +struct Negentropy { + StorageImpl &storage; + uint64_t frameSizeLimit; + + bool isInitiator = false; + + uint64_t lastTimestampIn = 0; + uint64_t lastTimestampOut = 0; + + Negentropy(StorageImpl &storage, uint64_t frameSizeLimit = 0) : storage(storage), frameSizeLimit(frameSizeLimit) { + if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small"); + } + + std::string initiate() { + if (isInitiator) throw negentropy::err("already initiated"); + isInitiator = true; + + std::string output; + output.push_back(PROTOCOL_VERSION); + + output += splitRange(0, storage.size(), Bound(MAX_U64)); + + return output; + } + + void setInitiator() { + isInitiator = true; + } + + std::string reconcile(std::string_view query) { + if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs"); + + std::vector haveIds, needIds; + return reconcileAux(query, haveIds, needIds); + } + + std::optional reconcile(std::string_view query, std::vector &haveIds, std::vector &needIds) { + if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs"); + + auto output = reconcileAux(query, haveIds, needIds); + if (output.size() == 1) return std::nullopt; + return output; + } + + private: + std::string reconcileAux(std::string_view query, std::vector &haveIds, std::vector &needIds) { + lastTimestampIn = lastTimestampOut = 0; // reset for each message + + std::string fullOutput; + fullOutput.push_back(PROTOCOL_VERSION); + + auto protocolVersion = getByte(query); + if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte"); + if (protocolVersion != PROTOCOL_VERSION) { + if (isInitiator) throw negentropy::err(std::string("unsupported negentropy protocol version requested") + std::to_string(protocolVersion - 0x60)); + else return fullOutput; + } + + uint64_t storageSize = storage.size(); + Bound prevBound; + size_t prevIndex = 0; + bool skip = false; + + while (query.size()) { + std::string o; + + auto doSkip = [&]{ + if (skip) { + skip = false; + o += encodeBound(prevBound); + o += encodeVarInt(uint64_t(Mode::Skip)); + } + }; + + auto currBound = decodeBound(query); + auto mode = Mode(decodeVarInt(query)); + + auto lower = prevIndex; + auto upper = storage.findLowerBound(prevIndex, storageSize, currBound); + + if (mode == Mode::Skip) { + skip = true; + } else if (mode == Mode::Fingerprint) { + auto theirFingerprint = getBytes(query, FINGERPRINT_SIZE); + auto ourFingerprint = storage.fingerprint(lower, upper); + + if (theirFingerprint != ourFingerprint.sv()) { + doSkip(); + o += splitRange(lower, upper, currBound); + } else { + skip = true; + } + } else if (mode == Mode::IdList) { + auto numIds = decodeVarInt(query); + + std::unordered_set theirElems; + for (uint64_t i = 0; i < numIds; i++) { + auto e = getBytes(query, ID_SIZE); + theirElems.insert(e); + } + + storage.iterate(lower, upper, [&](const Item &item, size_t){ + auto k = std::string(item.getId()); + + if (theirElems.find(k) == theirElems.end()) { + // ID exists on our side, but not their side + if (isInitiator) haveIds.emplace_back(k); + } else { + // ID exists on both sides + theirElems.erase(k); + } + + return true; + }); + + if (isInitiator) { + skip = true; + + for (const auto &k : theirElems) { + // ID exists on their side, but not our side + needIds.emplace_back(k); + } + } else { + doSkip(); + + std::string responseIds; + uint64_t numResponseIds = 0; + Bound endBound = currBound; + + storage.iterate(lower, upper, [&](const Item &item, size_t index){ + if (exceededFrameSizeLimit(fullOutput.size() + responseIds.size())) { + endBound = Bound(item); + upper = index; // shrink upper so that remaining range gets correct fingerprint + return false; + } + + responseIds += item.getId(); + numResponseIds++; + return true; + }); + + o += encodeBound(endBound); + o += encodeVarInt(uint64_t(Mode::IdList)); + o += encodeVarInt(numResponseIds); + o += responseIds; + + fullOutput += o; + o.clear(); + } + } else { + throw negentropy::err("unexpected mode"); + } + + if (exceededFrameSizeLimit(fullOutput.size() + o.size())) { + // frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range + auto remainingFingerprint = storage.fingerprint(upper, storageSize); + + fullOutput += encodeBound(Bound(MAX_U64)); + fullOutput += encodeVarInt(uint64_t(Mode::Fingerprint)); + fullOutput += remainingFingerprint.sv(); + break; + } else { + fullOutput += o; + } + + prevIndex = upper; + prevBound = currBound; + } + + return fullOutput; + } + + std::string splitRange(size_t lower, size_t upper, const Bound &upperBound) { + std::string o; + + uint64_t numElems = upper - lower; + const uint64_t buckets = 16; + + if (numElems < buckets * 2) { + o += encodeBound(upperBound); + o += encodeVarInt(uint64_t(Mode::IdList)); + + o += encodeVarInt(numElems); + storage.iterate(lower, upper, [&](const Item &item, size_t){ + o += item.getId(); + return true; + }); + } else { + uint64_t itemsPerBucket = numElems / buckets; + uint64_t bucketsWithExtra = numElems % buckets; + auto curr = lower; + + for (uint64_t i = 0; i < buckets; i++) { + auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); + auto ourFingerprint = storage.fingerprint(curr, curr + bucketSize); + curr += bucketSize; + + Bound nextBound; + + if (curr == upper) { + nextBound = upperBound; + } else { + Item prevItem, currItem; + + storage.iterate(curr - 1, curr + 1, [&](const Item &item, size_t index){ + if (index == curr - 1) prevItem = item; + else currItem = item; + return true; + }); + + nextBound = getMinimalBound(prevItem, currItem); + } + + o += encodeBound(nextBound); + o += encodeVarInt(uint64_t(Mode::Fingerprint)); + o += ourFingerprint.sv(); + } + } + + return o; + } + + bool exceededFrameSizeLimit(size_t n) { + return frameSizeLimit && n > frameSizeLimit - 200; + } + + // Decoding + + uint64_t decodeTimestampIn(std::string_view &encoded) { + uint64_t timestamp = decodeVarInt(encoded); + timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1; + timestamp += lastTimestampIn; + if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate + lastTimestampIn = timestamp; + return timestamp; + } + + Bound decodeBound(std::string_view &encoded) { + auto timestamp = decodeTimestampIn(encoded); + auto len = decodeVarInt(encoded); + return Bound(timestamp, getBytes(encoded, len)); + } + + // Encoding + + std::string encodeTimestampOut(uint64_t timestamp) { + if (timestamp == MAX_U64) { + lastTimestampOut = MAX_U64; + return encodeVarInt(0); + } + + uint64_t temp = timestamp; + timestamp -= lastTimestampOut; + lastTimestampOut = temp; + return encodeVarInt(timestamp + 1); + }; + + std::string encodeBound(const Bound &bound) { + std::string output; + + output += encodeTimestampOut(bound.item.timestamp); + output += encodeVarInt(bound.idLen); + output += bound.item.getId().substr(0, bound.idLen); + + return output; + }; + + Bound getMinimalBound(const Item &prev, const Item &curr) { + if (curr.timestamp != prev.timestamp) { + return Bound(curr.timestamp); + } else { + uint64_t sharedPrefixBytes = 0; + auto currKey = curr.getId(); + auto prevKey = prev.getId(); + + for (uint64_t i = 0; i < ID_SIZE; i++) { + if (currKey[i] != prevKey[i]) break; + sharedPrefixBytes++; + } + + return Bound(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1)); + } + } +}; + + +} + + +template +using Negentropy = negentropy::Negentropy; diff --git a/cpp/negentropy/encoding.h b/cpp/negentropy/encoding.h new file mode 100644 index 0000000..c6d5bd6 --- /dev/null +++ b/cpp/negentropy/encoding.h @@ -0,0 +1,60 @@ +#pragma once + +#include + + +namespace negentropy { + +using err = std::runtime_error; + + + +inline uint8_t getByte(std::string_view &encoded) { + if (encoded.size() < 1) throw negentropy::err("parse ends prematurely"); + uint8_t output = encoded[0]; + encoded = encoded.substr(1); + return output; +} + +inline std::string getBytes(std::string_view &encoded, size_t n) { + if (encoded.size() < n) throw negentropy::err("parse ends prematurely"); + auto res = encoded.substr(0, n); + encoded = encoded.substr(n); + return std::string(res); +}; + +inline uint64_t decodeVarInt(std::string_view &encoded) { + uint64_t res = 0; + + while (1) { + if (encoded.size() == 0) throw negentropy::err("premature end of varint"); + uint64_t byte = encoded[0]; + encoded = encoded.substr(1); + res = (res << 7) | (byte & 0b0111'1111); + if ((byte & 0b1000'0000) == 0) break; + } + + return res; +} + +inline std::string encodeVarInt(uint64_t n) { + if (n == 0) return std::string(1, '\0'); + + std::string o; + + while (n) { + o.push_back(static_cast(n & 0x7F)); + n >>= 7; + } + + std::reverse(o.begin(), o.end()); + + for (size_t i = 0; i < o.size() - 1; i++) { + o[i] |= 0x80; + } + + return o; +} + + +} diff --git a/cpp/negentropy/storage/BTreeLMDB.h b/cpp/negentropy/storage/BTreeLMDB.h new file mode 100644 index 0000000..f36036f --- /dev/null +++ b/cpp/negentropy/storage/BTreeLMDB.h @@ -0,0 +1,146 @@ +#pragma once + +#include + +#include "lmdbxx/lmdb++.h" + +#include "negentropy.h" +#include "negentropy/storage/btree/core.h" + + +namespace negentropy { namespace storage { + +using err = std::runtime_error; +using Node = negentropy::storage::btree::Node; +using NodePtr = negentropy::storage::btree::NodePtr; + + +struct BTreeLMDB : btree::BTreeCore { + lmdb::txn &txn; + lmdb::dbi dbi; + uint64_t treeId; + + struct MetaData { + uint64_t rootNodeId; + uint64_t nextNodeId; + + bool operator==(const MetaData &other) const { + return rootNodeId == other.rootNodeId && nextNodeId == other.nextNodeId; + } + }; + + MetaData metaDataCache; + MetaData origMetaData; + std::map dirtyNodeCache; + + + static lmdb::dbi setupDB(lmdb::txn &txn, std::string_view tableName) { + return lmdb::dbi::open(txn, tableName, MDB_CREATE | MDB_REVERSEKEY); + } + + BTreeLMDB(lmdb::txn &txn, lmdb::dbi dbi, uint64_t treeId) : txn(txn), dbi(dbi), treeId(treeId) { + static_assert(sizeof(MetaData) == 16); + std::string_view v; + bool found = dbi.get(txn, getKey(0), v); + metaDataCache = found ? lmdb::from_sv(v) : MetaData{ 0, 1, }; + origMetaData = metaDataCache; + } + + ~BTreeLMDB() { + flush(); + } + + void flush() { + for (auto &[nodeId, node] : dirtyNodeCache) { + dbi.put(txn, getKey(nodeId), node.sv()); + } + dirtyNodeCache.clear(); + + if (metaDataCache != origMetaData) { + dbi.put(txn, getKey(0), lmdb::to_sv(metaDataCache)); + origMetaData = metaDataCache; + } + } + + + // Interface + + const btree::NodePtr getNodeRead(uint64_t nodeId) { + if (nodeId == 0) return {nullptr, 0}; + + auto res = dirtyNodeCache.find(nodeId); + if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId}; + + std::string_view sv; + bool found = dbi.get(txn, getKey(nodeId), sv); + if (!found) throw err("couldn't find node"); + return NodePtr{(Node*)sv.data(), nodeId}; + } + + btree::NodePtr getNodeWrite(uint64_t nodeId) { + if (nodeId == 0) return {nullptr, 0}; + + { + auto res = dirtyNodeCache.find(nodeId); + if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId}; + } + + std::string_view sv; + bool found = dbi.get(txn, getKey(nodeId), sv); + if (!found) throw err("couldn't find node"); + + auto res = dirtyNodeCache.try_emplace(nodeId); + Node *newNode = &res.first->second; + memcpy(newNode, sv.data(), sizeof(Node)); + + return NodePtr{newNode, nodeId}; + } + + btree::NodePtr makeNode() { + uint64_t nodeId = metaDataCache.nextNodeId++; + auto res = dirtyNodeCache.try_emplace(nodeId); + return NodePtr{&res.first->second, nodeId}; + } + + void deleteNode(uint64_t nodeId) { + if (nodeId == 0) throw err("can't delete metadata"); + dirtyNodeCache.erase(nodeId); + dbi.del(txn, getKey(nodeId)); + } + + uint64_t getRootNodeId() { + return metaDataCache.rootNodeId; + } + + void setRootNodeId(uint64_t newRootNodeId) { + metaDataCache.rootNodeId = newRootNodeId; + } + + // Internal utils + + private: + std::string getKey(uint64_t n) { + uint64_t treeIdCopy = treeId; + + if constexpr (std::endian::native == std::endian::big) { + auto byteswap = [](uint64_t &n) { + uint8_t *first = reinterpret_cast(&n); + uint8_t *last = first + 8; + std::reverse(first, last); + }; + + byteswap(n); + byteswap(treeIdCopy); + } else { + static_assert(std::endian::native == std::endian::little); + } + + std::string k; + k += lmdb::to_sv(treeIdCopy); + k += lmdb::to_sv(n); + return k; + } +}; + + +}} diff --git a/cpp/negentropy/storage/BTreeMem.h b/cpp/negentropy/storage/BTreeMem.h new file mode 100644 index 0000000..4f79d20 --- /dev/null +++ b/cpp/negentropy/storage/BTreeMem.h @@ -0,0 +1,48 @@ +#pragma once + +#include "negentropy.h" +#include "negentropy/storage/btree/core.h" + + +namespace negentropy { namespace storage { + + +struct BTreeMem : btree::BTreeCore { + std::unordered_map _nodeStorageMap; + uint64_t _rootNodeId = 0; // 0 means no root + uint64_t _nextNodeId = 1; + + // Interface + + const btree::NodePtr getNodeRead(uint64_t nodeId) { + if (nodeId == 0) return {nullptr, 0}; + auto res = _nodeStorageMap.find(nodeId); + if (res == _nodeStorageMap.end()) return btree::NodePtr{nullptr, 0}; + return btree::NodePtr{&res->second, nodeId}; + } + + btree::NodePtr getNodeWrite(uint64_t nodeId) { + return getNodeRead(nodeId); + } + + btree::NodePtr makeNode() { + uint64_t nodeId = _nextNodeId++; + _nodeStorageMap.try_emplace(nodeId); + return getNodeRead(nodeId); + } + + void deleteNode(uint64_t nodeId) { + _nodeStorageMap.erase(nodeId); + } + + uint64_t getRootNodeId() { + return _rootNodeId; + } + + void setRootNodeId(uint64_t newRootNodeId) { + _rootNodeId = newRootNodeId; + } +}; + + +}} diff --git a/cpp/negentropy/storage/SubRange.h b/cpp/negentropy/storage/SubRange.h new file mode 100644 index 0000000..20d5676 --- /dev/null +++ b/cpp/negentropy/storage/SubRange.h @@ -0,0 +1,63 @@ +#pragma once + +#include + +#include "negentropy.h" + + + +namespace negentropy { namespace storage { + + +struct SubRange : StorageBase { + StorageBase &base; + size_t baseSize; + size_t subBegin; + size_t subEnd; + size_t subSize; + + SubRange(StorageBase &base, const Bound &lowerBound, const Bound &upperBound) : base(base) { + baseSize = base.size(); + subBegin = lowerBound == Bound(0) ? 0 : base.findLowerBound(0, baseSize, lowerBound); + subEnd = upperBound == Bound(MAX_U64) ? baseSize : base.findLowerBound(subBegin, baseSize, upperBound); + if (subEnd != baseSize && Bound(base.getItem(subEnd)) == upperBound) subEnd++; // instead of upper_bound: OK because items are unique + subSize = subEnd - subBegin; + } + + uint64_t size() { + return subSize; + } + + const Item &getItem(size_t i) { + if (i >= subSize) throw negentropy::err("bad index"); + return base.getItem(subBegin + i); + } + + void iterate(size_t begin, size_t end, std::function cb) { + checkBounds(begin, end); + + base.iterate(subBegin + begin, subBegin + end, [&](const Item &item, size_t index){ + return cb(item, index - subBegin); + }); + } + + size_t findLowerBound(size_t begin, size_t end, const Bound &bound) { + checkBounds(begin, end); + + return std::min(base.findLowerBound(subBegin + begin, subBegin + end, bound) - subBegin, subSize); + } + + Fingerprint fingerprint(size_t begin, size_t end) { + checkBounds(begin, end); + + return base.fingerprint(subBegin + begin, subBegin + end); + } + + private: + void checkBounds(size_t begin, size_t end) { + if (begin > end || end > subSize) throw negentropy::err("bad range"); + } +}; + + +}} diff --git a/cpp/negentropy/storage/Vector.h b/cpp/negentropy/storage/Vector.h new file mode 100644 index 0000000..76f22c7 --- /dev/null +++ b/cpp/negentropy/storage/Vector.h @@ -0,0 +1,88 @@ +#pragma once + +#include "negentropy.h" + + + +namespace negentropy { namespace storage { + + +struct Vector : StorageBase { + std::vector items; + bool sealed = false; + + void insert(uint64_t createdAt, std::string_view id) { + if (sealed) throw negentropy::err("already sealed"); + if (id.size() != ID_SIZE) throw negentropy::err("bad id size for added item"); + items.emplace_back(createdAt, id); + } + + void insertItem(const Item &item) { + insert(item.timestamp, item.getId()); + } + + void seal() { + if (sealed) throw negentropy::err("already sealed"); + sealed = true; + + std::sort(items.begin(), items.end()); + + for (size_t i = 1; i < items.size(); i++) { + if (items[i - 1] == items[i]) throw negentropy::err("duplicate item inserted"); + } + } + + void unseal() { + sealed = false; + } + + uint64_t size() { + checkSealed(); + return items.size(); + } + + const Item &getItem(size_t i) { + checkSealed(); + return items.at(i); + } + + void iterate(size_t begin, size_t end, std::function cb) { + checkSealed(); + checkBounds(begin, end); + + for (auto i = begin; i < end; ++i) { + if (!cb(items[i], i)) break; + } + } + + size_t findLowerBound(size_t begin, size_t end, const Bound &bound) { + checkSealed(); + checkBounds(begin, end); + + return std::lower_bound(items.begin() + begin, items.begin() + end, bound.item) - items.begin(); + } + + Fingerprint fingerprint(size_t begin, size_t end) { + Accumulator out; + out.setToZero(); + + iterate(begin, end, [&](const Item &item, size_t){ + out.add(item); + return true; + }); + + return out.getFingerprint(end - begin); + } + + private: + void checkSealed() { + if (!sealed) throw negentropy::err("not sealed"); + } + + void checkBounds(size_t begin, size_t end) { + if (begin > end || end > items.size()) throw negentropy::err("bad range"); + } +}; + + +}} diff --git a/cpp/negentropy/storage/base.h b/cpp/negentropy/storage/base.h new file mode 100644 index 0000000..44c8db8 --- /dev/null +++ b/cpp/negentropy/storage/base.h @@ -0,0 +1,22 @@ +#pragma once + +#include + +#include "negentropy/types.h" + + +namespace negentropy { + +struct StorageBase { + virtual uint64_t size() = 0; + + virtual const Item &getItem(size_t i) = 0; + + virtual void iterate(size_t begin, size_t end, std::function cb) = 0; + + virtual size_t findLowerBound(size_t begin, size_t end, const Bound &value) = 0; + + virtual Fingerprint fingerprint(size_t begin, size_t end) = 0; +}; + +} diff --git a/cpp/negentropy/storage/btree/core.h b/cpp/negentropy/storage/btree/core.h new file mode 100644 index 0000000..ef25483 --- /dev/null +++ b/cpp/negentropy/storage/btree/core.h @@ -0,0 +1,652 @@ +#pragma once + +#include + +#include "negentropy.h" + + + +namespace negentropy { namespace storage { namespace btree { + +using err = std::runtime_error; + +/* + +Each node contains an array of keys. For leaf nodes, the keys are 0. For non-leaf nodes, these will +be the nodeIds of the children leaves. The items in the keys of non-leaf nodes are the first items +in the corresponding child nodes. + +Except for the right-most nodes in the tree at each level (which includes the root node), all nodes +contain at least MIN_ITEMS and at most MAX_ITEMS. + +If a node falls below MIN_ITEMS, a neighbour node (which always has the same parent) is selected. + * If between the two nodes there are REBALANCE_THRESHOLD or fewer total items, all items are + moved into one node and the other is deleted. + * If there are more than REBALANCE_THRESHOLD total items, then the items are divided into two + approximately equal-sized halves. + +If a node goes above MAX_ITEMS then a new neighbour node is created. + * If the node is the right-most in its level, pack the old node to MAX_ITEMS, and move the rest + into the new neighbour. This optimises space-usage in the case of append workloads. + * Otherwise, split the node into two approximately equal-sized halves. + +*/ + + +#ifdef NE_FUZZ_TEST + +// Fuzz test mode: Causes a large amount of tree structure changes like splitting, moving, and rebalancing + +const size_t MIN_ITEMS = 2; +const size_t REBALANCE_THRESHOLD = 4; +const size_t MAX_ITEMS = 6; + +#else + +// Production mode: Nodes fit into 4k pages, and oscillating insert/erase will not cause tree structure changes + +const size_t MIN_ITEMS = 30; +const size_t REBALANCE_THRESHOLD = 60; +const size_t MAX_ITEMS = 80; + +#endif + +static_assert(MIN_ITEMS < REBALANCE_THRESHOLD); +static_assert(REBALANCE_THRESHOLD < MAX_ITEMS); +static_assert(MAX_ITEMS / 2 > MIN_ITEMS); +static_assert(MIN_ITEMS % 2 == 0 && REBALANCE_THRESHOLD % 2 == 0 && MAX_ITEMS % 2 == 0); + + +struct Key { + Item item; + uint64_t nodeId; + + void setToZero() { + item = Item(); + nodeId = 0; + } +}; + +inline bool operator<(const Key &a, const Key &b) { + return a.item < b.item; +}; + +struct Node { + uint64_t numItems; // Number of items in this Node + uint64_t accumCount; // Total number of items in or under this Node + uint64_t nextSibling; // Pointer to next node in this level + uint64_t prevSibling; // Pointer to previous node in this level + + Accumulator accum; + + Key items[MAX_ITEMS + 1]; + + + Node() { + memset((void*)this, '\0', sizeof(*this)); + } + + std::string_view sv() { + return std::string_view(reinterpret_cast(this), sizeof(*this)); + } +}; + +struct NodePtr { + Node *p; + uint64_t nodeId; + + + bool exists() { + return p != nullptr; + } + + Node &get() const { + return *p; + } +}; + +struct Breadcrumb { + size_t index; + NodePtr nodePtr; +}; + + +struct BTreeCore : StorageBase { + //// Node Storage + + virtual const NodePtr getNodeRead(uint64_t nodeId) = 0; + + virtual NodePtr getNodeWrite(uint64_t nodeId) = 0; + + virtual NodePtr makeNode() = 0; + + virtual void deleteNode(uint64_t nodeId) = 0; + + virtual uint64_t getRootNodeId() = 0; + + virtual void setRootNodeId(uint64_t newRootNodeId) = 0; + + + //// Search + + std::vector searchItem(uint64_t rootNodeId, const Item &newItem, bool &found) { + found = false; + std::vector breadcrumbs; + + auto foundNode = getNodeRead(rootNodeId); + + while (foundNode.nodeId) { + const auto &node = foundNode.get(); + size_t index = node.numItems - 1; + + if (node.numItems > 1) { + for (size_t i = 1; i < node.numItems + 1; i++) { + if (i == node.numItems + 1 || newItem < node.items[i].item) { + index = i - 1; + break; + } + } + } + + if (!found && (newItem == node.items[index].item)) found = true; + + breadcrumbs.push_back({index, foundNode}); + foundNode = getNodeRead(node.items[index].nodeId); + } + + return breadcrumbs; + } + + + //// Insert + + bool insert(uint64_t createdAt, std::string_view id) { + return insertItem(Item(createdAt, id)); + } + + bool insertItem(const Item &newItem) { + // Make root leaf in case it doesn't exist + + auto rootNodeId = getRootNodeId(); + + if (!rootNodeId) { + auto newNodePtr = makeNode(); + auto &newNode = newNodePtr.get(); + + newNode.items[0].item = newItem; + newNode.numItems++; + newNode.accum.add(newItem); + newNode.accumCount = 1; + + setRootNodeId(newNodePtr.nodeId); + return true; + } + + + // Traverse interior nodes, leaving breadcrumbs along the way + + + bool found; + auto breadcrumbs = searchItem(rootNodeId, newItem, found); + + if (found) return false; // already inserted + + + // Follow breadcrumbs back to root + + Key newKey = { newItem, 0 }; + bool needsMerge = true; + + while (breadcrumbs.size()) { + auto crumb = breadcrumbs.back(); + breadcrumbs.pop_back(); + + auto &node = getNodeWrite(crumb.nodePtr.nodeId).get(); + + if (!needsMerge) { + node.accum.add(newItem); + node.accumCount++; + } else if (crumb.nodePtr.get().numItems < MAX_ITEMS) { + // Happy path: Node has room for new item + + node.items[node.numItems] = newKey; + std::inplace_merge(node.items, node.items + node.numItems, node.items + node.numItems + 1); + node.numItems++; + + node.accum.add(newItem); + node.accumCount++; + + needsMerge = false; + } else { + // Node is full: Split it into 2 + + auto &left = node; + auto rightPtr = makeNode(); + auto &right = rightPtr.get(); + + left.items[MAX_ITEMS] = newKey; + std::inplace_merge(left.items, left.items + MAX_ITEMS, left.items + MAX_ITEMS + 1); + + left.accum.setToZero(); + left.accumCount = 0; + + if (!left.nextSibling) { + // If right-most node, pack as tightly as possible to optimise for append workloads + left.numItems = MAX_ITEMS; + right.numItems = 1; + } else { + // Otherwise, split the node equally + left.numItems = (MAX_ITEMS / 2) + 1; + right.numItems = MAX_ITEMS / 2; + } + + for (size_t i = 0; i < left.numItems; i++) { + addToAccum(left.items[i], left); + } + + for (size_t i = 0; i < right.numItems; i++) { + right.items[i] = left.items[left.numItems + i]; + addToAccum(right.items[i], right); + } + + for (size_t i = left.numItems; i < MAX_ITEMS + 1; i++) left.items[i].setToZero(); + + right.nextSibling = left.nextSibling; + left.nextSibling = rightPtr.nodeId; + right.prevSibling = crumb.nodePtr.nodeId; + + if (right.nextSibling) { + auto &rightRight = getNodeWrite(right.nextSibling).get(); + rightRight.prevSibling = rightPtr.nodeId; + } + + newKey = { right.items[0].item, rightPtr.nodeId }; + } + + // Update left-most key, in case item was inserted at the beginning + + refreshIndex(node, 0); + } + + // Out of breadcrumbs but still need to merge: New level required + + if (needsMerge) { + auto &left = getNodeRead(rootNodeId).get(); + auto &right = getNodeRead(newKey.nodeId).get(); + + auto newRootPtr = makeNode(); + auto &newRoot = newRootPtr.get(); + newRoot.numItems = 2; + + newRoot.accum.add(left.accum); + newRoot.accum.add(right.accum); + newRoot.accumCount = left.accumCount + right.accumCount; + + newRoot.items[0] = left.items[0]; + newRoot.items[0].nodeId = rootNodeId; + newRoot.items[1] = right.items[0]; + newRoot.items[1].nodeId = newKey.nodeId; + + setRootNodeId(newRootPtr.nodeId); + } + + return true; + } + + + + /// Erase + + bool erase(uint64_t createdAt, std::string_view id) { + return eraseItem(Item(createdAt, id)); + } + + bool eraseItem(const Item &oldItem) { + auto rootNodeId = getRootNodeId(); + if (!rootNodeId) return false; + + + // Traverse interior nodes, leaving breadcrumbs along the way + + bool found; + auto breadcrumbs = searchItem(rootNodeId, oldItem, found); + if (!found) return false; + + + // Remove from node + + bool needsRemove = true; + bool neighbourRefreshNeeded = false; + + while (breadcrumbs.size()) { + auto crumb = breadcrumbs.back(); + breadcrumbs.pop_back(); + + auto &node = getNodeWrite(crumb.nodePtr.nodeId).get(); + + if (!needsRemove) { + node.accum.sub(oldItem); + node.accumCount--; + } else { + for (size_t i = crumb.index + 1; i < node.numItems; i++) node.items[i - 1] = node.items[i]; + node.numItems--; + node.items[node.numItems].setToZero(); + + node.accum.sub(oldItem); + node.accumCount--; + + needsRemove = false; + } + + + if (crumb.index < node.numItems) refreshIndex(node, crumb.index); + + if (neighbourRefreshNeeded) { + refreshIndex(node, crumb.index + 1); + neighbourRefreshNeeded = false; + } + + + if (node.numItems < MIN_ITEMS && breadcrumbs.size() && breadcrumbs.back().nodePtr.get().numItems > 1) { + auto rebalance = [&](Node &leftNode, Node &rightNode) { + size_t totalItems = leftNode.numItems + rightNode.numItems; + size_t numLeft = (totalItems + 1) / 2; + size_t numRight = totalItems - numLeft; + + Accumulator accum; + accum.setToZero(); + uint64_t accumCount = 0; + + if (rightNode.numItems >= numRight) { + // Move extra from right to left + + size_t numMove = rightNode.numItems - numRight; + + for (size_t i = 0; i < numMove; i++) { + auto &item = rightNode.items[i]; + if (item.nodeId == 0) { + accum.add(item.item); + accumCount++; + } else { + auto &movingNode = getNodeRead(item.nodeId).get(); + accum.add(movingNode.accum); + accumCount += movingNode.accumCount; + } + leftNode.items[leftNode.numItems + i] = item; + } + + ::memmove(rightNode.items, rightNode.items + numMove, (rightNode.numItems - numMove) * sizeof(rightNode.items[0])); + + for (size_t i = numRight; i < rightNode.numItems; i++) rightNode.items[i].setToZero(); + + leftNode.accum.add(accum); + rightNode.accum.sub(accum); + + leftNode.accumCount += accumCount; + rightNode.accumCount -= accumCount; + + neighbourRefreshNeeded = true; + } else { + // Move extra from left to right + + size_t numMove = leftNode.numItems - numLeft; + + ::memmove(rightNode.items + numMove, rightNode.items, rightNode.numItems * sizeof(rightNode.items[0])); + + for (size_t i = 0; i < numMove; i++) { + auto &item = leftNode.items[numLeft + i]; + if (item.nodeId == 0) { + accum.add(item.item); + accumCount++; + } else { + auto &movingNode = getNodeRead(item.nodeId).get(); + accum.add(movingNode.accum); + accumCount += movingNode.accumCount; + } + rightNode.items[i] = item; + } + + for (size_t i = numLeft; i < leftNode.numItems; i++) leftNode.items[i].setToZero(); + + leftNode.accum.sub(accum); + rightNode.accum.add(accum); + + leftNode.accumCount -= accumCount; + rightNode.accumCount += accumCount; + } + + leftNode.numItems = numLeft; + rightNode.numItems = numRight; + }; + + if (breadcrumbs.back().index == 0) { + // Use neighbour to the right + + auto &leftNode = node; + auto &rightNode = getNodeWrite(node.nextSibling).get(); + size_t totalItems = leftNode.numItems + rightNode.numItems; + + if (totalItems <= REBALANCE_THRESHOLD) { + // Move all items into right + + ::memmove(rightNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems); + ::memcpy(rightNode.items, leftNode.items, sizeof(leftNode.items[0]) * leftNode.numItems); + + rightNode.numItems += leftNode.numItems; + rightNode.accumCount += leftNode.accumCount; + rightNode.accum.add(leftNode.accum); + + if (leftNode.prevSibling) getNodeWrite(leftNode.prevSibling).get().nextSibling = leftNode.nextSibling; + rightNode.prevSibling = leftNode.prevSibling; + + leftNode.numItems = 0; + } else { + // Rebalance from left to right + + rebalance(leftNode, rightNode); + } + } else { + // Use neighbour to the left + + auto &leftNode = getNodeWrite(node.prevSibling).get(); + auto &rightNode = node; + size_t totalItems = leftNode.numItems + rightNode.numItems; + + if (totalItems <= REBALANCE_THRESHOLD) { + // Move all items into left + + ::memcpy(leftNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems); + + leftNode.numItems += rightNode.numItems; + leftNode.accumCount += rightNode.accumCount; + leftNode.accum.add(rightNode.accum); + + if (rightNode.nextSibling) getNodeWrite(rightNode.nextSibling).get().prevSibling = rightNode.prevSibling; + leftNode.nextSibling = rightNode.nextSibling; + + rightNode.numItems = 0; + } else { + // Rebalance from right to left + + rebalance(leftNode, rightNode); + } + } + } + + if (node.numItems == 0) { + if (node.prevSibling) getNodeWrite(node.prevSibling).get().nextSibling = node.nextSibling; + if (node.nextSibling) getNodeWrite(node.nextSibling).get().prevSibling = node.prevSibling; + + needsRemove = true; + + deleteNode(crumb.nodePtr.nodeId); + } + } + + if (needsRemove) { + setRootNodeId(0); + } else { + auto &node = getNodeRead(rootNodeId).get(); + + if (node.numItems == 1 && node.items[0].nodeId) { + setRootNodeId(node.items[0].nodeId); + deleteNode(rootNodeId); + } + } + + return true; + } + + + //// Compat with the vector interface + + void seal() { + } + + void unseal() { + } + + + //// Utils + + void refreshIndex(Node &node, size_t index) { + auto childNodePtr = getNodeRead(node.items[index].nodeId); + if (childNodePtr.exists()) { + auto &childNode = childNodePtr.get(); + node.items[index].item = childNode.items[0].item; + } + } + + void addToAccum(const Key &k, Node &node) { + if (k.nodeId == 0) { + node.accum.add(k.item); + node.accumCount++; + } else { + auto nodePtr = getNodeRead(k.nodeId); + node.accum.add(nodePtr.get().accum); + node.accumCount += nodePtr.get().accumCount; + } + } + + void traverseToOffset(size_t index, const std::function &cb, std::function customAccum = nullptr) { + auto rootNodePtr = getNodeRead(getRootNodeId()); + if (!rootNodePtr.exists()) return; + auto &rootNode = rootNodePtr.get(); + + if (index > rootNode.accumCount) throw err("out of range"); + return traverseToOffsetAux(index, rootNode, cb, customAccum); + } + + void traverseToOffsetAux(size_t index, Node &node, const std::function &cb, std::function customAccum) { + if (node.numItems == node.accumCount) { + cb(node, index); + return; + } + + for (size_t i = 0; i < node.numItems; i++) { + auto &child = getNodeRead(node.items[i].nodeId).get(); + if (index < child.accumCount) return traverseToOffsetAux(index, child, cb, customAccum); + index -= child.accumCount; + if (customAccum) customAccum(child); + } + } + + + + //// Interface + + uint64_t size() { + auto rootNodePtr = getNodeRead(getRootNodeId()); + if (!rootNodePtr.exists()) return 0; + auto &rootNode = rootNodePtr.get(); + return rootNode.accumCount; + } + + const Item &getItem(size_t index) { + if (index >= size()) throw err("out of range"); + + Item *out; + traverseToOffset(index, [&](Node &node, size_t index){ + out = &node.items[index].item; + }); + return *out; + } + + void iterate(size_t begin, size_t end, std::function cb) { + checkBounds(begin, end); + + size_t num = end - begin; + + traverseToOffset(begin, [&](Node &node, size_t index){ + Node *currNode = &node; + for (size_t i = 0; i < num; i++) { + if (!cb(currNode->items[index].item, begin + i)) return; + index++; + if (index >= currNode->numItems) { + currNode = getNodeRead(currNode->nextSibling).p; + index = 0; + } + } + }); + } + + size_t findLowerBound(size_t begin, size_t end, const Bound &value) { + checkBounds(begin, end); + + auto rootNodePtr = getNodeRead(getRootNodeId()); + if (!rootNodePtr.exists()) return end; + auto &rootNode = rootNodePtr.get(); + if (value.item <= rootNode.items[0].item) return begin; + return std::min(findLowerBoundAux(value, rootNodePtr, 0), end); + } + + size_t findLowerBoundAux(const Bound &value, NodePtr nodePtr, uint64_t numToLeft) { + if (!nodePtr.exists()) return numToLeft + 1; + + Node &node = nodePtr.get(); + + for (size_t i = 1; i < node.numItems; i++) { + if (value.item <= node.items[i].item) { + return findLowerBoundAux(value, getNodeRead(node.items[i - 1].nodeId), numToLeft); + } else { + if (node.items[i - 1].nodeId) numToLeft += getNodeRead(node.items[i - 1].nodeId).get().accumCount; + else numToLeft++; + } + } + + return findLowerBoundAux(value, getNodeRead(node.items[node.numItems - 1].nodeId), numToLeft); + } + + Fingerprint fingerprint(size_t begin, size_t end) { + checkBounds(begin, end); + + auto getAccumLeftOf = [&](size_t index) { + Accumulator accum; + accum.setToZero(); + + traverseToOffset(index, [&](Node &node, size_t index){ + for (size_t i = 0; i < index; i++) accum.add(node.items[i].item); + }, [&](Node &node){ + accum.add(node.accum); + }); + + return accum; + }; + + auto accum1 = getAccumLeftOf(begin); + auto accum2 = getAccumLeftOf(end); + + accum1.negate(); + accum2.add(accum1); + + return accum2.getFingerprint(end - begin); + } + + private: + void checkBounds(size_t begin, size_t end) { + if (begin > end || end > size()) throw negentropy::err("bad range"); + } +}; + + +}}} diff --git a/cpp/negentropy/storage/btree/debug.h b/cpp/negentropy/storage/btree/debug.h new file mode 100644 index 0000000..2415242 --- /dev/null +++ b/cpp/negentropy/storage/btree/debug.h @@ -0,0 +1,189 @@ +#pragma once + +#include +#include + +#include + +#include "negentropy/storage/btree/core.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/BTreeLMDB.h" + + +namespace negentropy { namespace storage { namespace btree { + + +using err = std::runtime_error; + + +inline void dump(BTreeCore &btree, uint64_t nodeId, int depth) { + if (nodeId == 0) { + if (depth == 0) std::cout << "EMPTY TREE" << std::endl; + return; + } + + auto nodePtr = btree.getNodeRead(nodeId); + auto &node = nodePtr.get(); + std::string indent(depth * 4, ' '); + + std::cout << indent << "NODE id=" << nodeId << " numItems=" << node.numItems << " accum=" << hoytech::to_hex(node.accum.sv()) << " accumCount=" << node.accumCount << std::endl; + + for (size_t i = 0; i < node.numItems; i++) { + std::cout << indent << " item: " << node.items[i].item.timestamp << "," << hoytech::to_hex(node.items[i].item.getId()) << std::endl; + dump(btree, node.items[i].nodeId, depth + 1); + } +} + +inline void dump(BTreeCore &btree) { + dump(btree, btree.getRootNodeId(), 0); +} + + +struct VerifyContext { + std::optional leafDepth; + std::set allNodeIds; + std::vector leafNodeIds; +}; + +inline void verify(BTreeCore &btree, uint64_t nodeId, uint64_t depth, VerifyContext &ctx, Accumulator *accumOut = nullptr, uint64_t *accumCountOut = nullptr) { + if (nodeId == 0) return; + + if (ctx.allNodeIds.contains(nodeId)) throw err("verify: saw node id again"); + ctx.allNodeIds.insert(nodeId); + + auto nodePtr = btree.getNodeRead(nodeId); + auto &node = nodePtr.get(); + + if (node.numItems == 0) throw err("verify: empty node"); + if (node.nextSibling && node.numItems < MIN_ITEMS) throw err("verify: too few items in node"); + if (node.numItems > MAX_ITEMS) throw err("verify: too many items"); + + if (node.items[0].nodeId == 0) { + if (ctx.leafDepth) { + if (*ctx.leafDepth != depth) throw err("verify: mismatch of leaf depth"); + } else { + ctx.leafDepth = depth; + } + + ctx.leafNodeIds.push_back(nodeId); + } + + // FIXME: verify unused items are zeroed + + Accumulator accum; + accum.setToZero(); + uint64_t accumCount = 0; + + for (size_t i = 0; i < node.numItems; i++) { + uint64_t childNodeId = node.items[i].nodeId; + if (childNodeId == 0) { + accum.add(node.items[i].item); + accumCount++; + } else { + { + auto firstChildPtr = btree.getNodeRead(childNodeId); + auto &firstChild = firstChildPtr.get(); + if (firstChild.numItems == 0 || firstChild.items[0].item != node.items[i].item) throw err("verify: key does not match child's first key"); + } + verify(btree, childNodeId, depth + 1, ctx, &accum, &accumCount); + } + + if (i < node.numItems - 1) { + if (!(node.items[i].item < node.items[i + 1].item)) throw err("verify: items out of order"); + } + } + + for (size_t i = node.numItems; i < MAX_ITEMS + 1; i++) { + for (size_t j = 0; j < sizeof(Key); j++) if (((char*)&node.items[i])[j] != '\0') throw err("verify: memory not zeroed out"); + } + + if (accumCount != node.accumCount) throw err("verify: accumCount mismatch"); + if (accum.sv() != node.accum.sv()) throw err("verify: accum mismatch"); + + if (accumOut) accumOut->add(accum); + if (accumCountOut) *accumCountOut += accumCount; +} + +inline void verify(BTreeCore &btree, bool isLMDB) { + VerifyContext ctx; + Accumulator accum; + accum.setToZero(); + uint64_t accumCount = 0; + + verify(btree, btree.getRootNodeId(), 0, ctx, &accum, &accumCount); + + if (ctx.leafNodeIds.size()) { + uint64_t i = 0, totalItems = 0; + auto nodePtr = btree.getNodeRead(ctx.leafNodeIds[0]); + std::optional prevItem; + uint64_t prevSibling = 0; + + while (nodePtr.exists()) { + auto &node = nodePtr.get(); + if (nodePtr.nodeId != ctx.leafNodeIds[i]) throw err("verify: leaf id mismatch"); + + if (prevSibling != node.prevSibling) throw err("verify: prevSibling mismatch"); + prevSibling = nodePtr.nodeId; + + nodePtr = btree.getNodeRead(node.nextSibling); + i++; + + for (size_t j = 0; j < node.numItems; j++) { + if (prevItem && !(*prevItem < node.items[j].item)) throw err("verify: leaf item out of order"); + prevItem = node.items[j].item; + totalItems++; + } + } + + if (totalItems != accumCount) throw err("verify: leaf count mismatch"); + } + + // Check for leaks + + if (isLMDB) { + static_assert(std::endian::native == std::endian::little); // FIXME + + auto &btreeLMDB = dynamic_cast(btree); + btreeLMDB.flush(); + + std::string_view key, val; + + // Leaks + + auto cursor = lmdb::cursor::open(btreeLMDB.txn, btreeLMDB.dbi); + + if (cursor.get(key, val, MDB_FIRST)) { + do { + uint64_t nodeId = lmdb::from_sv(key.substr(8)); + if (nodeId != 0 && !ctx.allNodeIds.contains(nodeId)) throw err("verify: memory leak"); + } while (cursor.get(key, val, MDB_NEXT)); + } + + // Dangling + + for (const auto &k : ctx.allNodeIds) { + std::string tpKey; + tpKey += lmdb::to_sv(btreeLMDB.treeId); + tpKey += lmdb::to_sv(k); + if (!btreeLMDB.dbi.get(btreeLMDB.txn, tpKey, val)) throw err("verify: dangling node"); + } + } else { + auto &btreeMem = dynamic_cast(btree); + + // Leaks + + for (const auto &[k, v] : btreeMem._nodeStorageMap) { + if (!ctx.allNodeIds.contains(k)) throw err("verify: memory leak"); + } + + // Dangling + + for (const auto &k : ctx.allNodeIds) { + if (!btreeMem._nodeStorageMap.contains(k)) throw err("verify: dangling node"); + } + } +} + + + +}}} diff --git a/cpp/negentropy/types.h b/cpp/negentropy/types.h new file mode 100644 index 0000000..ce9d9dd --- /dev/null +++ b/cpp/negentropy/types.h @@ -0,0 +1,184 @@ +// (C) 2023 Doug Hoyte. MIT license + +#pragma once + +#include + + +namespace negentropy { + +using err = std::runtime_error; + +const size_t ID_SIZE = 32; +const size_t FINGERPRINT_SIZE = 16; + + +enum class Mode { + Skip = 0, + Fingerprint = 1, + IdList = 2, +}; + + +struct Item { + uint64_t timestamp; + uint8_t id[ID_SIZE]; + + explicit Item(uint64_t timestamp = 0) : timestamp(timestamp) { + memset(id, '\0', sizeof(id)); + } + + explicit Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp) { + if (id_.size() != sizeof(id)) throw negentropy::err("bad id size for Item"); + memcpy(id, id_.data(), sizeof(id)); + } + + std::string_view getId() const { + return std::string_view(reinterpret_cast(id), sizeof(id)); + } + + bool operator==(const Item &other) const { + return timestamp == other.timestamp && getId() == other.getId(); + } +}; + +inline bool operator<(const Item &a, const Item &b) { + return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId(); +}; + +inline bool operator<=(const Item &a, const Item &b) { + return a.timestamp != b.timestamp ? a.timestamp <= b.timestamp : a.getId() <= b.getId(); +}; + + +struct Bound { + Item item; + size_t idLen; + + explicit Bound(uint64_t timestamp = 0, std::string_view id = "") : item(timestamp), idLen(id.size()) { + if (idLen > ID_SIZE) throw negentropy::err("bad id size for Bound"); + memcpy(item.id, id.data(), idLen); + } + + explicit Bound(const Item &item_) : item(item_), idLen(ID_SIZE) {} + + bool operator==(const Bound &other) const { + return item == other.item; + } +}; + +inline bool operator<(const Bound &a, const Bound &b) { + return a.item < b.item; +}; + + +struct Fingerprint { + uint8_t buf[FINGERPRINT_SIZE]; + + std::string_view sv() const { + return std::string_view(reinterpret_cast(buf), sizeof(buf)); + } +}; + +struct Accumulator { + uint8_t buf[ID_SIZE]; + + void setToZero() { + memset(buf, '\0', sizeof(buf)); + } + + void add(const Item &item) { + add(item.id); + } + + void add(const Accumulator &acc) { + add(acc.buf); + } + + void add(const uint8_t *otherBuf) { + uint64_t currCarry = 0, nextCarry = 0; + uint64_t *p = reinterpret_cast(buf); + const uint64_t *po = reinterpret_cast(otherBuf); + + auto byteswap = [](uint64_t &n) { + uint8_t *first = reinterpret_cast(&n); + uint8_t *last = first + 8; + std::reverse(first, last); + }; + + for (size_t i = 0; i < 4; i++) { + uint64_t orig = p[i]; + uint64_t otherV = po[i]; + + if constexpr (std::endian::native == std::endian::big) { + byteswap(orig); + byteswap(otherV); + } else { + static_assert(std::endian::native == std::endian::little); + } + + uint64_t next = orig; + + next += currCarry; + if (next < orig) nextCarry = 1; + + next += otherV; + if (next < otherV) nextCarry = 1; + + if constexpr (std::endian::native == std::endian::big) { + byteswap(next); + } + + p[i] = next; + currCarry = nextCarry; + nextCarry = 0; + } + } + + void negate() { + for (size_t i = 0; i < sizeof(buf); i++) { + buf[i] = ~buf[i]; + } + + Accumulator one; + one.setToZero(); + one.buf[0] = 1; + add(one.buf); + } + + void sub(const Item &item) { + sub(item.id); + } + + void sub(const Accumulator &acc) { + sub(acc.buf); + } + + void sub(const uint8_t *otherBuf) { + Accumulator neg; + memcpy(neg.buf, otherBuf, sizeof(buf)); + neg.negate(); + add(neg); + } + + std::string_view sv() const { + return std::string_view(reinterpret_cast(buf), sizeof(buf)); + } + + Fingerprint getFingerprint(uint64_t n) { + std::string input; + input += sv(); + input += encodeVarInt(n); + + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256(reinterpret_cast(input.data()), input.size(), hash); + + Fingerprint out; + memcpy(out.buf, hash, FINGERPRINT_SIZE); + + return out; + } +}; + + +} diff --git a/test/cpp/.gitignore b/test/cpp/.gitignore index 29bbe76..55f70e1 100644 --- a/test/cpp/.gitignore +++ b/test/cpp/.gitignore @@ -1 +1,7 @@ /harness +/btreeFuzz +/measureSpaceUsage +/lmdbTest +/subRange + +/testdb/ diff --git a/test/cpp/Makefile b/test/cpp/Makefile index 686d0f6..db2070e 100644 --- a/test/cpp/Makefile +++ b/test/cpp/Makefile @@ -1,2 +1,30 @@ -harness: harness.cpp ../../cpp/Negentropy.h - g++ -g -Wall -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -lcrypto -o harness +W = -Wall +OPT = -g -O2 +STD = -std=c++20 +CXXFLAGS = $(STD) $(OPT) $(W) -fPIC $(XCXXFLAGS) +INCS = -I../../cpp/ -I./hoytech-cpp/ -Ilmdbxx/include/ + +DEPS = ../../cpp/negentropy.h ../../cpp/negentropy/* ../../cpp/negentropy/storage/* ../../cpp/negentropy/storage/btree/* + +harness: harness.cpp + $(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -o $@ + +btreeFuzz: btreeFuzz.cpp + $(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@ + +lmdbTest: lmdbTest.cpp + $(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@ + +measureSpaceUsage: measureSpaceUsage.cpp + $(CXX) -DNE_FUZZ_TEST $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@ + +subRange: subRange.cpp + $(CXX) -DNE_FUZZ_TEST $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -o $@ + + +.PHONY: all clean + +all: harness btreeFuzz lmdbTest measureSpaceUsage subRange + +clean: + rm -f harness btreeFuzz lmdbTest measureSpaceUsage diff --git a/test/cpp/btreeFuzz.cpp b/test/cpp/btreeFuzz.cpp new file mode 100644 index 0000000..f2ed53e --- /dev/null +++ b/test/cpp/btreeFuzz.cpp @@ -0,0 +1,149 @@ +#include +#include + +#include +#include +#include + +#include +#include + +#include "negentropy.h" +#include "negentropy/storage/BTreeLMDB.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/btree/debug.h" + + + + +struct Verifier { + bool isLMDB; + + std::set addedTimestamps; + + Verifier(bool isLMDB) : isLMDB(isLMDB) {} + + void insert(negentropy::storage::btree::BTreeCore &btree, uint64_t timestamp){ + negentropy::Item item(timestamp, std::string(32, (unsigned char)(timestamp % 256))); + btree.insertItem(item); + addedTimestamps.insert(timestamp); + doVerify(btree); + } + + void erase(negentropy::storage::btree::BTreeCore &btree, uint64_t timestamp){ + negentropy::Item item(timestamp, std::string(32, (unsigned char)(timestamp % 256))); + btree.eraseItem(item); + addedTimestamps.erase(timestamp); + doVerify(btree); + } + + void doVerify(negentropy::storage::btree::BTreeCore &btree) { + try { + negentropy::storage::btree::verify(btree, isLMDB); + } catch (...) { + std::cout << "TREE FAILED INVARIANTS:" << std::endl; + negentropy::storage::btree::dump(btree); + throw; + } + + if (btree.size() != addedTimestamps.size()) throw negentropy::err("verify size mismatch"); + auto iter = addedTimestamps.begin(); + + btree.iterate(0, btree.size(), [&](const auto &item, size_t i) { + if (item.timestamp != *iter) throw negentropy::err("verify element mismatch"); + iter = std::next(iter); + return true; + }); + } +}; + + + + + +void doFuzz(negentropy::storage::btree::BTreeCore &btree, Verifier &v) { + if (btree.size() != 0) throw negentropy::err("expected empty tree"); + + + // Verify return values + + if (!btree.insert(100, std::string(32, '\x01'))) throw negentropy::err("didn't insert element?"); + if (btree.insert(100, std::string(32, '\x01'))) throw negentropy::err("double inserted element?"); + if (!btree.erase(100, std::string(32, '\x01'))) throw negentropy::err("didn't erase element?"); + if (btree.erase(100, std::string(32, '\x01'))) throw negentropy::err("erased non-existing element?"); + + + // Fuzz test: Insertion phase + + while (btree.size() < 5000) { + if (rand() % 3 <= 1) { + int timestamp; + + do { + timestamp = rand(); + } while (v.addedTimestamps.contains(timestamp)); + + std::cout << "INSERT " << timestamp << " size = " << btree.size() << std::endl; + v.insert(btree, timestamp); + } else if (v.addedTimestamps.size()) { + auto it = v.addedTimestamps.begin(); + std::advance(it, rand() % v.addedTimestamps.size()); + + std::cout << "DEL " << (*it) << std::endl; + v.erase(btree, *it); + } + } + + // Fuzz test: Removal phase + + std::cout << "REMOVING ALL" << std::endl; + + while (btree.size()) { + auto it = v.addedTimestamps.begin(); + std::advance(it, rand() % v.addedTimestamps.size()); + auto timestamp = *it; + + std::cout << "DEL " << timestamp << " size = " << btree.size() << std::endl; + v.erase(btree, *it); + } +} + + + +int main() { + std::cout << "SIZEOF NODE: " << sizeof(negentropy::storage::Node) << std::endl; + + + srand(0); + + + if (::getenv("NE_FUZZ_LMDB")) { + system("mkdir -p testdb/"); + system("rm -f testdb/*"); + + auto env = lmdb::env::create(); + env.set_max_dbs(64); + env.set_mapsize(1'000'000'000ULL); + env.open("testdb/", 0); + + auto txn = lmdb::txn::begin(env); + auto btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data"); + + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 0); + + Verifier v(true); + doFuzz(btree, v); + + btree.flush(); + txn.commit(); + } else { + Verifier v(false); + negentropy::storage::BTreeMem btree; + doFuzz(btree, v); + } + + + std::cout << "OK" << std::endl; + + return 0; +} diff --git a/test/cpp/check.sh b/test/cpp/check.sh new file mode 100755 index 0000000..729eeb3 --- /dev/null +++ b/test/cpp/check.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +make clean +make -j all + +./btreeFuzz +NE_FUZZ_LMDB=1 ./btreeFuzz +./lmdbTest +./subRange diff --git a/test/cpp/harness.cpp b/test/cpp/harness.cpp index 1912c00..5380d36 100644 --- a/test/cpp/harness.cpp +++ b/test/cpp/harness.cpp @@ -1,10 +1,13 @@ #include #include +#include #include #include -#include "Negentropy.h" +#include "negentropy.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/Vector.h" @@ -23,12 +26,11 @@ std::vector split(const std::string &s, char delim) { int main() { - const uint64_t idSize = 16; - uint64_t frameSizeLimit = 0; if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT")); - Negentropy ne(idSize, frameSizeLimit); + negentropy::storage::Vector storage; + std::unique_ptr> ne; std::string line; while (std::cin) { @@ -41,20 +43,21 @@ int main() { if (items.size() != 3) throw hoytech::error("wrong num of fields"); uint64_t created = std::stoull(items[1]); auto id = hoytech::from_hex(items[2]); - ne.addItem(created, id); + storage.insert(created, id); } else if (items[0] == "seal") { - ne.seal(); + storage.seal(); + ne = std::make_unique>(storage, frameSizeLimit); } else if (items[0] == "initiate") { - auto q = ne.initiate(); - if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded"); + auto q = ne->initiate(); + if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("initiate frameSizeLimit exceeded: ", q.size(), " > ", frameSizeLimit); std::cout << "msg," << hoytech::to_hex(q) << std::endl; } else if (items[0] == "msg") { std::string q; if (items.size() >= 2) q = hoytech::from_hex(items[1]); - if (ne.isInitiator) { + if (ne->isInitiator) { std::vector have, need; - auto resp = ne.reconcile(q, have, need); + auto resp = ne->reconcile(q, have, need); for (auto &id : have) std::cout << "have," << hoytech::to_hex(id) << "\n"; for (auto &id : need) std::cout << "need," << hoytech::to_hex(id) << "\n"; @@ -66,10 +69,10 @@ int main() { q = *resp; } else { - q = ne.reconcile(q); + q = ne->reconcile(q); } - if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded"); + if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded: ", q.size(), " > ", frameSizeLimit, ": from ", (ne->isInitiator ? "initiator" : "non-initiator")); std::cout << "msg," << hoytech::to_hex(q) << std::endl; } else { throw hoytech::error("unknown cmd: ", items[0]); diff --git a/test/cpp/lmdbTest.cpp b/test/cpp/lmdbTest.cpp new file mode 100644 index 0000000..f4026a8 --- /dev/null +++ b/test/cpp/lmdbTest.cpp @@ -0,0 +1,157 @@ +#include +#include + +#include +#include +#include + +#include +#include + +#include "negentropy.h" +#include "negentropy/storage/BTreeLMDB.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/btree/debug.h" +#include "negentropy/storage/Vector.h" + + + + + +int main() { + system("mkdir -p testdb/"); + system("rm -f testdb/*"); + + auto env = lmdb::env::create(); + env.set_max_dbs(64); + env.open("testdb/", 0); + + + lmdb::dbi btreeDbi; + + { + auto txn = lmdb::txn::begin(env); + btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data"); + txn.commit(); + } + + negentropy::storage::Vector vec; + + + auto packId = [](uint64_t n){ + auto o = std::string(32, '\0'); + memcpy((char*)o.data(), (char*)&n, sizeof(n)); + return o; + }; + + auto unpackId = [](std::string_view n){ + if (n.size() != 32) throw hoytech::error("too short to unpack"); + return *(uint64_t*)n.data(); + }; + + + { + auto txn = lmdb::txn::begin(env); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + auto add = [&](uint64_t timestamp){ + negentropy::Item item(timestamp, packId(timestamp)); + btree.insertItem(item); + vec.insertItem(item); + }; + + for (size_t i = 1000; i < 2000; i += 2) add(i); + + btree.flush(); + + txn.commit(); + } + + vec.seal(); + + + + + { + auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + //negentropy::storage::btree::dump(btree); + negentropy::storage::btree::verify(btree, true); + } + + + + // Identical + + { + auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + auto ne1 = Negentropy(vec); + auto ne2 = Negentropy(btree); + + auto q = ne1.initiate(); + + std::string q2 = ne2.reconcile(q); + + std::vector have, need; + auto q3 = ne1.reconcile(q2, have, need); + if (q3 || have.size() || need.size()) throw hoytech::error("bad reconcile 1"); + } + + + // Make some modifications + + { + auto txn = lmdb::txn::begin(env); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + btree.erase(1044, packId(1044)); + btree.erase(1838, packId(1838)); + + btree.insert(1555, packId(1555)); + btree.insert(99999, packId(99999)); + + btree.flush(); + txn.commit(); + } + + + // Reconcile again + + { + auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + auto ne1 = Negentropy(vec); + auto ne2 = Negentropy(btree); + + std::vector allHave, allNeed; + + std::string msg = ne1.initiate(); + + while (true) { + std::string response = ne2.reconcile(msg); + + std::vector have, need; + auto newMsg = ne1.reconcile(response, have, need); + + for (const auto &id : have) allHave.push_back(unpackId(id)); + for (const auto &id : need) allNeed.push_back(unpackId(id)); + + if (!newMsg) break; // done + msg = *newMsg; + } + + std::sort(allHave.begin(), allHave.end()); + std::sort(allNeed.begin(), allNeed.end()); + + if (allHave != std::vector({ 1044, 1838 })) throw hoytech::error("bad allHave"); + if (allNeed != std::vector({ 1555, 99999 })) throw hoytech::error("bad allNeed"); + } + + + std::cout << "OK" << std::endl; + + return 0; +} diff --git a/test/cpp/lmdbxx b/test/cpp/lmdbxx new file mode 160000 index 0000000..d649a58 --- /dev/null +++ b/test/cpp/lmdbxx @@ -0,0 +1 @@ +Subproject commit d649a581d3cebfe7d8bd4d345bc2c1c4c2cc59a2 diff --git a/test/cpp/measureSpaceUsage.cpp b/test/cpp/measureSpaceUsage.cpp new file mode 100644 index 0000000..aaacab4 --- /dev/null +++ b/test/cpp/measureSpaceUsage.cpp @@ -0,0 +1,78 @@ +#include +#include + +#include +#include + +#include +#include + +#include "negentropy.h" +#include "negentropy/storage/BTreeLMDB.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/btree/debug.h" +#include "negentropy/storage/Vector.h" + + + + + + +int main() { + system("mkdir -p testdb/"); + system("rm -f testdb/*"); + + auto env = lmdb::env::create(); + env.set_max_dbs(64); + env.set_mapsize(1'000'000'000ULL); + env.open("testdb/", 0); + + + lmdb::dbi btreeDbi; + + { + auto txn = lmdb::txn::begin(env); + btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data"); + txn.commit(); + } + + + { + auto txn = lmdb::txn::begin(env); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + auto add = [&](uint64_t timestamp){ + negentropy::Item item(timestamp, std::string(32, '\x01')); + btree.insertItem(item); + }; + + for (size_t i = 1; i < 100'000; i++) add(i); + + btree.flush(); + txn.commit(); + } + + { + auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY); + negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300); + + auto cursor = lmdb::cursor::open(txn, btreeDbi); + + std::string_view key, val; + size_t minStart = negentropy::MAX_U64; + size_t maxEnd = 0; + + if (cursor.get(key, val, MDB_FIRST)) { + do { + size_t ptrStart = (size_t)val.data(); + size_t ptrEnd = ptrStart + sizeof(negentropy::storage::btree::Node); + if (ptrStart < minStart) minStart = ptrStart; + if (ptrEnd > maxEnd) maxEnd = ptrEnd; + } while (cursor.get(key, val, MDB_NEXT)); + } + + std::cout << "data," << negentropy::storage::btree::MAX_ITEMS << "," << sizeof(negentropy::storage::btree::Node) << "," << (maxEnd - minStart) << std::endl; + } + + return 0; +} diff --git a/test/cpp/measureSpaceUsage.pl b/test/cpp/measureSpaceUsage.pl new file mode 100644 index 0000000..6c8b891 --- /dev/null +++ b/test/cpp/measureSpaceUsage.pl @@ -0,0 +1,9 @@ +system(qq{ perl -pi -e 's/MIN_ITEMS = \\d+/MIN_ITEMS = 2/' ../../cpp/negentropy/storage/btree/core.h }); +system(qq{ perl -pi -e 's/REBALANCE_THRESHOLD = \\d+/REBALANCE_THRESHOLD = 4/' ../../cpp/negentropy/storage/btree/core.h }); +system(qq{ perl -pi -e 's/MAX_ITEMS = \\d+/MAX_ITEMS = 6/' ../../cpp/negentropy/storage/btree/core.h }); + +for (my $i = 6; $i < 128; $i += 2) { + print "DOING ITER $i\n"; + system(qq{ perl -pi -e 's/MAX_ITEMS = \\d+/MAX_ITEMS = $i/' ../../cpp/negentropy/storage/btree/core.h }); + system("rm -f measureSpaceUsage && make measureSpaceUsage && rm -f testdb/data.mdb && ./measureSpaceUsage >> measureSpaceUsage.log"); +} diff --git a/test/cpp/subRange.cpp b/test/cpp/subRange.cpp new file mode 100644 index 0000000..426f284 --- /dev/null +++ b/test/cpp/subRange.cpp @@ -0,0 +1,165 @@ +#include +#include + +#include + +#include +#include + +#include "negentropy.h" +#include "negentropy/storage/Vector.h" +#include "negentropy/storage/BTreeMem.h" +#include "negentropy/storage/SubRange.h" + + + +std::string sha256(std::string_view input) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256(reinterpret_cast(input.data()), input.size(), hash); + return std::string((const char*)&hash[0], SHA256_DIGEST_LENGTH); +} + +std::string uintToId(uint64_t id) { + return sha256(std::string((char*)&id, 8)); +} + + +template +void testSubRange() { + T vecBig; + T vecSmall; + + for (size_t i = 0; i < 1000; i++) { + vecBig.insert(100 + i, uintToId(i)); + } + + for (size_t i = 400; i < 600; i++) { + vecSmall.insert(100 + i, uintToId(i)); + } + + vecBig.seal(); + vecSmall.seal(); + + negentropy::storage::SubRange subRange(vecBig, negentropy::Bound(100 + 400), negentropy::Bound(100 + 600)); + + if (vecSmall.size() != subRange.size()) throw hoytech::error("size mismatch"); + + if (vecSmall.fingerprint(0, vecSmall.size()).sv() != subRange.fingerprint(0, subRange.size()).sv()) throw hoytech::error("fingerprint mismatch"); + + if (vecSmall.getItem(10) != subRange.getItem(10)) throw hoytech::error("getItem mismatch"); + if (vecBig.getItem(400 + 10) != subRange.getItem(10)) throw hoytech::error("getItem mismatch"); + + { + auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(550)); + auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(550)); + if (lb != lb2) throw hoytech::error("findLowerBound mismatch"); + } + + { + auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(20)); + auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(20)); + if (lb != lb2) throw hoytech::error("findLowerBound mismatch"); + } + + { + auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(5000)); + auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(5000)); + if (lb != lb2) throw hoytech::error("findLowerBound mismatch"); + } +} + + + +template +void testSync(bool emptySide1, bool emptySide2) { + T vecBig; + T vecSmall; + + std::set expectedHave; + std::set expectedNeed; + + size_t const lowerLimit = 20'000; + size_t const upperLimit = 90'000; + + for (size_t i = lowerLimit; i < upperLimit; i++) { + auto id = uintToId(i); + if (emptySide1 || i % 15'000 == 0) { + if (i >= lowerLimit && i < upperLimit) expectedNeed.insert(id); + continue; + } + vecSmall.insert(100 + i, id); + } + + for (size_t i = 0; i < 100'000; i++) { + auto id = uintToId(i); + if (emptySide2 || i % 22'000 == 0) { + if (i >= lowerLimit && i < upperLimit) expectedHave.insert(id); + continue; + } + vecBig.insert(100 + i, id); + } + + // Get rid of common + + std::set commonItems; + + for (const auto &item : expectedHave) { + if (expectedNeed.contains(item)) commonItems.insert(item); + } + + for (const auto &item : commonItems) { + expectedHave.erase(item); + expectedNeed.erase(item); + } + + + vecBig.seal(); + vecSmall.seal(); + + negentropy::storage::SubRange subRange(vecBig, negentropy::Bound(100 + lowerLimit), negentropy::Bound(100 + upperLimit)); + + + auto ne1 = Negentropy(vecSmall, 20'000); + auto ne2 = Negentropy(subRange, 20'000); + + std::string msg = ne1.initiate(); + + while (true) { + msg = ne2.reconcile(msg); + + std::vector have, need; + auto newMsg = ne1.reconcile(msg, have, need); + + for (const auto &item : have) { + if (!expectedHave.contains(item)) throw hoytech::error("unexpected have: ", hoytech::to_hex(item)); + expectedHave.erase(item); + } + + for (const auto &item : need) { + if (!expectedNeed.contains(item)) throw hoytech::error("unexpected need: ", hoytech::to_hex(item)); + expectedNeed.erase(item); + } + + if (!newMsg) break; + else std::swap(msg, *newMsg); + } + + if (expectedHave.size()) throw hoytech::error("missed have"); + if (expectedNeed.size()) throw hoytech::error("missed need"); +} + + + + +int main() { + testSubRange(); + testSubRange(); + + testSync(false, false); + testSync(true, false); + testSync(false, true); + + std::cout << "OK" << std::endl; + + return 0; +} diff --git a/test/fuzz.pl b/test/fuzz.pl index f737632..8530d96 100755 --- a/test/fuzz.pl +++ b/test/fuzz.pl @@ -12,9 +12,9 @@ use Utils; die "usage: $0 " if @ARGV < 2; my $harnessCmd1 = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)"; my $harnessCmd2 = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)"; -my $idSize = shift || 16; +my $idSize = 32; srand($ENV{SEED} || 0); my $stgen = Session::Token->new(seed => "\x00" x 1024, alphabet => '0123456789abcdef', length => $idSize * 2); diff --git a/test/protoversion.pl b/test/protoversion.pl index fecba34..8feb172 100755 --- a/test/protoversion.pl +++ b/test/protoversion.pl @@ -11,20 +11,19 @@ use Utils; die "usage: $0 " if @ARGV < 1; my $harnessCmd = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)"; -my $idSize = shift || 16; my $expectedResp; -## Get expected response using protocol version 0 +## Get expected response using protocol version 1 { my ($infile, $outfile); my $pid = open2($outfile, $infile, $harnessCmd); - print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n"; + print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n"; print $infile "seal\n"; - print $infile "msg,6000000200\n"; ## full range bound, empty IdList + print $infile "msg,6100000200\n"; ## full range bound, empty IdList my $resp = <$outfile>; chomp $resp; @@ -32,31 +31,29 @@ my $expectedResp; $expectedResp = $resp; } -## Client tries to use some hypothetical newer version, but falls back to version 0 +## Client tries to use some hypothetical newer version, but falls back to version 1 { my ($infile, $outfile); my $pid = open2($outfile, $infile, $harnessCmd); - print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n"; + print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n"; print $infile "seal\n"; - print $infile "msg,61aabbccddeeff\n"; ## some new protocol message + print $infile "msg,62aabbccddeeff\n"; ## some new protocol message my $resp = <$outfile>; chomp $resp; - ## 61: The bound timestamp, as varint. The value is 0x60 (preferred protocol version), but 1 is added as per timestamp protocol - ## 00: The following ID has length 0 - ## 04: UnsupportedProtocolVersion mode - die "bad upgrade response: $resp" unless $resp eq "msg,610004"; + ## 61: Preferred protocol version + die "bad upgrade response: $resp" unless $resp eq "msg,61"; - ## Try again with protocol version 0 - print $infile "msg,6000000200\n"; ## full range bound, empty IdList + ## Try again with protocol version 1 + print $infile "msg,6100000200\n"; ## full range bound, empty IdList $resp = <$outfile>; chomp $resp; - die "didn't fall back to protocol version 0: $resp" unless $resp eq $expectedResp; + die "didn't fall back to protocol version 1: $resp" unless $resp eq $expectedResp; } print "OK\n";