cpp: implement protocol version 1

Doug Hoyte 2023-10-20 15:25:40 -04:00
parent ef8edf52c4
commit 304779371c
25 changed files with 2550 additions and 536 deletions

.gitmodules

@@ -1,3 +1,6 @@
[submodule "test/cpp/hoytech-cpp"]
path = test/cpp/hoytech-cpp
url = https://github.com/hoytech/hoytech-cpp.git
[submodule "test/cpp/lmdbxx"]
path = test/cpp/lmdbxx
url = https://github.com/hoytech/lmdbxx.git

cpp/Negentropy.h (deleted)

@@ -1,507 +0,0 @@
// (C) 2023 Doug Hoyte. MIT license
#pragma once
#include <string.h>
#include <string>
#include <string_view>
#include <vector>
#include <deque>
#include <unordered_set>
#include <limits>
#include <algorithm>
#include <stdexcept>
#include <optional>
#include <openssl/sha.h>
namespace negentropy {
const uint64_t PROTOCOL_VERSION_0 = 0x60;
const uint64_t MAX_U64 = std::numeric_limits<uint64_t>::max();
using err = std::runtime_error;
enum class Mode {
Skip = 0,
Fingerprint = 1,
IdList = 2,
Continuation = 3,
UnsupportedProtocolVersion = 4,
};
struct Item {
uint64_t timestamp;
uint64_t idSize;
char id[32];
Item(uint64_t timestamp = 0) : timestamp(timestamp), idSize(0) {
memset(id, '\0', sizeof(id));
}
Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) {
if (id_.size() > 32) throw negentropy::err("id too big");
memset(id, '\0', sizeof(id));
memcpy(id, id_.data(), id_.size());
}
std::string_view getId() const {
return std::string_view(id, idSize);
}
bool operator==(const Item &other) const {
return timestamp == other.timestamp && getId() == other.getId();
}
};
inline bool operator<(const Item &a, const Item &b) {
return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId();
};
struct Negentropy {
uint64_t idSize;
uint64_t frameSizeLimit;
struct OutputRange {
Item start;
Item end;
std::string payload;
};
std::vector<Item> addedItems;
std::vector<uint64_t> itemTimestamps;
std::string itemIds;
bool sealed = false;
bool isInitiator = false;
bool didHandshake = false;
bool continuationNeeded = false;
std::deque<OutputRange> pendingOutputs;
Negentropy(uint64_t idSize = 16, uint64_t frameSizeLimit = 0) : idSize(idSize), frameSizeLimit(frameSizeLimit) {
if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid");
if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small");
}
void addItem(uint64_t createdAt, std::string_view id) {
if (sealed) throw negentropy::err("already sealed");
if (id.size() < idSize) throw negentropy::err("bad id size");
addedItems.emplace_back(createdAt, id.substr(0, idSize));
}
void seal() {
if (sealed) throw negentropy::err("already sealed");
sealed = true;
std::sort(addedItems.begin(), addedItems.end());
if (addedItems.size() > 1) {
for (size_t i = 0; i < addedItems.size() - 1; i++) {
if (addedItems[i] == addedItems[i + 1]) throw negentropy::err("duplicate item inserted");
}
}
itemTimestamps.reserve(addedItems.size());
itemIds.reserve(addedItems.size() * idSize);
for (const auto &item : addedItems) {
itemTimestamps.push_back(item.timestamp);
itemIds += item.getId();
}
addedItems.clear();
addedItems.shrink_to_fit();
}
std::string initiate() {
if (!sealed) throw negentropy::err("not sealed");
if (didHandshake) throw negentropy::err("can't initiate after reconcile");
isInitiator = true;
splitRange(0, numItems(), Item(0), Item(MAX_U64), pendingOutputs);
auto output = std::move(buildOutput(true).value());
return output;
}
std::string reconcile(std::string_view query) {
if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs");
if (!didHandshake) {
auto protocolVersion = getByte(query);
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte");
if (protocolVersion != PROTOCOL_VERSION_0) {
std::string o;
uint64_t lastTimestampOut = 0;
o += encodeBound(Item(PROTOCOL_VERSION_0), lastTimestampOut);
o += encodeVarInt(uint64_t(Mode::UnsupportedProtocolVersion));
return o;
}
didHandshake = true;
}
std::vector<std::string> haveIds, needIds;
reconcileAux(query, haveIds, needIds);
auto output = std::move(buildOutput(false).value());
return output;
}
std::optional<std::string> reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs");
reconcileAux(query, haveIds, needIds);
return buildOutput(false);
}
private:
size_t numItems() {
return itemTimestamps.size();
}
std::string_view getItemId(size_t i) {
return std::string_view(itemIds.data() + (i * idSize), idSize);
}
Item getItem(size_t i) {
return Item(itemTimestamps[i], getItemId(i));
}
std::string computeFingerprint(size_t lower, size_t num) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256(reinterpret_cast<unsigned char*>(itemIds.data() + (lower * idSize)), num * idSize, hash);
return std::string(reinterpret_cast<char*>(hash), idSize);
}
void reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!sealed) throw negentropy::err("not sealed");
continuationNeeded = false;
Item prevBound;
size_t prevIndex = 0;
uint64_t lastTimestampIn = 0;
std::deque<OutputRange> outputs;
while (query.size()) {
auto currBound = decodeBound(query, lastTimestampIn);
auto mode = Mode(decodeVarInt(query));
auto lower = prevIndex;
auto upper = findUpperBound(prevIndex, numItems(), currBound);
if (mode == Mode::Skip) {
// Do nothing
} else if (mode == Mode::Fingerprint) {
auto theirFingerprint = getBytes(query, idSize);
auto ourFingerprint = computeFingerprint(lower, upper - lower);
if (theirFingerprint != ourFingerprint) {
splitRange(lower, upper, prevBound, currBound, outputs);
}
} else if (mode == Mode::IdList) {
auto numIds = decodeVarInt(query);
std::unordered_set<std::string> theirElems;
for (uint64_t i = 0; i < numIds; i++) {
auto e = getBytes(query, idSize);
theirElems.insert(e);
}
for (auto i = lower; i < upper; ++i) {
auto k = std::string(getItemId(i));
if (theirElems.find(k) == theirElems.end()) {
// ID exists on our side, but not their side
if (isInitiator) haveIds.emplace_back(k);
} else {
// ID exists on both sides
theirElems.erase(k);
}
}
if (isInitiator) {
for (const auto &k : theirElems) {
// ID exists on their side, but not our side
needIds.emplace_back(k);
}
} else {
std::vector<std::string> responseHaveIds;
auto it = lower;
bool didSplit = false;
Item splitBound;
auto flushIdListOutput = [&]{
std::string payload = encodeVarInt(uint64_t(Mode::IdList));
payload += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) payload += id;
auto nextSplitBound = it + 1 >= upper ? currBound : getMinimalBound(getItem(it), getItem(it + 1));
outputs.emplace_back(OutputRange({
didSplit ? splitBound : prevBound,
nextSplitBound,
std::move(payload)
}));
splitBound = nextSplitBound;
didSplit = true;
responseHaveIds.clear();
};
for (; it < upper; ++it) {
responseHaveIds.emplace_back(getItemId(it));
if (responseHaveIds.size() >= 100) flushIdListOutput(); // 100*32 is less than minimum frame size limit of 4k
}
flushIdListOutput();
}
} else if (mode == Mode::Continuation) {
continuationNeeded = true;
} else if (mode == Mode::UnsupportedProtocolVersion) {
throw negentropy::err("server does not support our negentropy protocol version");
} else {
throw negentropy::err("unexpected mode");
}
prevIndex = upper;
prevBound = currBound;
}
while (outputs.size()) {
pendingOutputs.emplace_front(std::move(outputs.back()));
outputs.pop_back();
}
}
void splitRange(size_t lower, size_t upper, const Item &lowerBound, const Item &upperBound, std::deque<OutputRange> &outputs) {
uint64_t numElems = upper - lower;
const uint64_t buckets = 16;
if (numElems < buckets * 2) {
std::string payload = encodeVarInt(uint64_t(Mode::IdList));
payload += encodeVarInt(numElems);
for (auto i = lower; i < upper; i++) payload += getItemId(i);
outputs.emplace_back(OutputRange({
lowerBound,
upperBound,
std::move(payload)
}));
} else {
uint64_t itemsPerBucket = numElems / buckets;
uint64_t bucketsWithExtra = numElems % buckets;
auto curr = lower;
Item prevBound = getItem(curr);
for (uint64_t i = 0; i < buckets; i++) {
auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
auto ourFingerprint = computeFingerprint(curr, bucketSize);
curr += bucketSize;
std::string payload = encodeVarInt(uint64_t(Mode::Fingerprint));
payload += ourFingerprint;
outputs.emplace_back(OutputRange({
i == 0 ? lowerBound : prevBound,
curr == upper ? upperBound : getMinimalBound(getItem(curr - 1), getItem(curr)),
std::move(payload)
}));
prevBound = outputs.back().end;
}
outputs.back().end = upperBound;
}
}
std::optional<std::string> buildOutput(bool initialMessage) {
std::string output;
Item currBound;
uint64_t lastTimestampOut = 0;
if (initialMessage) {
if (didHandshake) throw negentropy::err("already built initial message");
didHandshake = true;
output.push_back(PROTOCOL_VERSION_0);
}
std::sort(pendingOutputs.begin(), pendingOutputs.end(), [](const auto &a, const auto &b){ return a.start < b.start; });
while (pendingOutputs.size()) {
std::string o;
auto &p = pendingOutputs.front();
// If bounds are out of order or overlapping, finish and resume next time (shouldn't happen because of sort above)
if (p.start < currBound) break;
if (currBound != p.start) {
o += encodeBound(p.start, lastTimestampOut);
o += encodeVarInt(uint64_t(Mode::Skip));
}
o += encodeBound(p.end, lastTimestampOut);
o += p.payload;
if (frameSizeLimit && output.size() + o.size() > frameSizeLimit - 5) break; // 5 leaves room for Continuation
output += o;
currBound = p.end;
pendingOutputs.pop_front();
}
// Server indicates that it has more to send, OR ensure client sends a non-empty message
if (!isInitiator && pendingOutputs.size()) {
output += encodeBound(Item(MAX_U64), lastTimestampOut);
output += encodeVarInt(uint64_t(Mode::Continuation));
}
if (isInitiator && output.size() == 0 && !continuationNeeded) {
return std::nullopt;
}
return output;
}
size_t findUpperBound(size_t first, size_t last, const Item &value) {
size_t count = last - first;
while (count > 0) {
size_t it = first;
size_t step = count / 2;
it += step;
if (value.timestamp == itemTimestamps[it] ? value.getId() < getItemId(it) : value.timestamp < itemTimestamps[it]) {
count = step;
} else {
first = ++it;
count -= step + 1;
}
}
return first;
}
// Decoding
uint8_t getByte(std::string_view &encoded) {
if (encoded.size() < 1) throw negentropy::err("parse ends prematurely");
uint8_t output = encoded[0];
encoded = encoded.substr(1);
return output;
}
std::string getBytes(std::string_view &encoded, size_t n) {
if (encoded.size() < n) throw negentropy::err("parse ends prematurely");
auto res = encoded.substr(0, n);
encoded = encoded.substr(n);
return std::string(res);
};
uint64_t decodeVarInt(std::string_view &encoded) {
uint64_t res = 0;
while (1) {
if (encoded.size() == 0) throw negentropy::err("premature end of varint");
uint64_t byte = encoded[0];
encoded = encoded.substr(1);
res = (res << 7) | (byte & 0b0111'1111);
if ((byte & 0b1000'0000) == 0) break;
}
return res;
}
uint64_t decodeTimestampIn(std::string_view &encoded, uint64_t &lastTimestampIn) {
uint64_t timestamp = decodeVarInt(encoded);
timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1;
timestamp += lastTimestampIn;
if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate
lastTimestampIn = timestamp;
return timestamp;
}
Item decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) {
auto timestamp = decodeTimestampIn(encoded, lastTimestampIn);
auto len = decodeVarInt(encoded);
return Item(timestamp, getBytes(encoded, len));
}
// Encoding
std::string encodeVarInt(uint64_t n) {
if (n == 0) return std::string(1, '\0');
std::string o;
while (n) {
o.push_back(static_cast<unsigned char>(n & 0x7F));
n >>= 7;
}
std::reverse(o.begin(), o.end());
for (size_t i = 0; i < o.size() - 1; i++) {
o[i] |= 0x80;
}
return o;
}
std::string encodeTimestampOut(uint64_t timestamp, uint64_t &lastTimestampOut) {
if (timestamp == MAX_U64) {
lastTimestampOut = MAX_U64;
return encodeVarInt(0);
}
uint64_t temp = timestamp;
timestamp -= lastTimestampOut;
lastTimestampOut = temp;
return encodeVarInt(timestamp + 1);
};
std::string encodeBound(const Item &bound, uint64_t &lastTimestampOut) {
std::string output;
output += encodeTimestampOut(bound.timestamp, lastTimestampOut);
output += encodeVarInt(bound.idSize);
output += bound.getId();
return output;
};
Item getMinimalBound(const Item &prev, const Item &curr) {
if (curr.timestamp != prev.timestamp) {
return Item(curr.timestamp, "");
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId();
auto prevKey = prev.getId();
for (uint64_t i = 0; i < idSize; i++) {
if (currKey[i] != prevKey[i]) break;
sharedPrefixBytes++;
}
return Item(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1));
}
}
};
}
using Negentropy = negentropy::Negentropy;

cpp/README.md

@@ -0,0 +1,141 @@
# Negentropy C++ Implementation
The C++ implementation is header-only and the only required dependency is OpenSSL (for SHA-256). The main `Negentropy` class can be imported with the following:
#include "negentropy.h"
## Storage
First, you need to create a storage instance. Currently the following are available:
### negentropy::storage::Vector
All the elements are put into a contiguous vector in memory, and are then sorted. This can be useful for syncing the results of a dynamic query, since it can be constructed rapidly and consumes a minimal amount of memory. However, modifying it by adding or removing elements is expensive (linear in the size of the data-set).
#include "negentropy/storage/Vector.h"
To use `Vector`, add all your items with `insert` and then call `seal`:
negentropy::storage::Vector storage;
for (const auto &item : myItems) {
storage.insert(timestamp, id);
}
storage.seal();
After sealing, no more items can be added.
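If the underlying data set changes later, the storage can be unsealed, modified, and sealed again (a sketch based on `Vector::unseal`; `newTimestamp`/`newId` are placeholders):
storage.unseal();
storage.insert(newTimestamp, newId);
storage.seal(); // re-sorts and re-checks for duplicates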
### negentropy::storage::BTreeMem
Keeps the elements in an in-memory B+Tree. Computing fingerprints, adding, and removing elements are all logarithmic in data-set size. However, the elements will not be persisted to disk, and the data-structure is not thread-safe.
#include "negentropy/storage/BTreeMem.h"
To use `BTreeMem`, items are added in the same way as with `Vector`; however, sealing is not necessary (it is supported, but is a no-op):
negentropy::storage::BTreeMem storage;
for (const auto &item : myItems) {
storage.insert(timestamp, id);
}
More items can be added at any time, and items can be removed with `erase`:
storage.insert(timestamp, id);
storage.erase(timestamp, id);
### negentropy::storage::BTreeLMDB
Uses the same implementation as BTreeMem, except that it uses [LMDB](http://lmdb.tech/) to save the data-set to persistent storage. Because the database is memory mapped, its read-performance is identical to the "in-memory" version (it is also in-memory, the memory just happens to reside in the page cache). Additionally, the tree can be concurrently accessed by multiple threads/processes using ACID transactions.
#include "negentropy/storage/BTreeLMDB.h"
First create an LMDB environment. Next, allocate a DBI to contain your tree(s) by calling `setupDB` inside a write transaction (don't forget to commit it). The `"test-data"` argument is the LMDB DBI table name you want to use:
auto env = lmdb::env::create();
env.set_max_dbs(64);
env.open("testdb/", 0);
lmdb::dbi btreeDbi;
{
auto txn = lmdb::txn::begin(env);
btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data");
txn.commit();
}
To add/remove items, create a `BTreeLMDB` object inside a write transaction. This is the storage instance:
{
auto txn = lmdb::txn::begin(env);
negentropy::storage::BTreeLMDB storage(txn, btreeDbi, 300);
storage.insert(timestamp, id);
storage.flush();
txn.commit();
}
* The third parameter (`300` in the above example) is the `treeId`. This allows many different trees to co-exist in the same DBI.
* Storage must be flushed before committing the transaction. `BTreeLMDB` will try to flush in its destructor. If you commit before this happens, you may see "mdb_put: Invalid argument" errors.
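As an illustration (a sketch, not part of the repository docs), a later write transaction can modify the same tree by constructing another `BTreeLMDB` instance with the same DBI and `treeId`:
{
auto txn = lmdb::txn::begin(env);
negentropy::storage::BTreeLMDB storage(txn, btreeDbi, 300);
storage.erase(timestamp, id); // removes the item added above, if present
storage.flush();
txn.commit();
}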
### negentropy::storage::SubRange
This storage is a proxy to a sub-range of another storage. It is useful for performing partial syncs of the DB.
The constructor arguments are the large storage you want to proxy to (of type `Vector`, `BTreeLMDB`, etc), and the lower and upper bounds of the desired sub-range. As usual, lower bounds are inclusive and upper bounds are exclusive:
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(fromTimestamp), negentropy::Bound(toTimestamp));
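The resulting `subStorage` is itself a storage instance, so it can be handed directly to `Negentropy` as described in the next section (a sketch):
auto ne = Negentropy(subStorage, 50'000);
std::string msg = ne.initiate();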
## Reconciliation
Reconciliation works mostly the same for all storage types. First create a `Negentropy` object:
auto ne = Negentropy(storage, 50'000);
* The object is templated on the storage type, which can usually be deduced automatically (as above).
* The second parameter (`50'000` above) is the `frameSizeLimit`. This can be omitted (or `0`) to permit unlimited-sized frames.
On the client-side, create an initial message, transmit it to the server, receive the response, and `reconcile` until complete:
std::string msg = ne.initiate();
while (true) {
std::string response = queryServer(msg);
std::vector<std::string> have, need;
std::optional<std::string> newMsg = ne.reconcile(response, have, need);
// handle have/need
if (!newMsg) break; // done
else std::swap(msg, *newMsg);
}
In each loop iteration, `have` contains IDs that the client has that the server doesn't, and `need` contains IDs that the server has that the client doesn't.
The server-side is similar, except it doesn't create an initial message, there are no `have`/`need` arrays, and it doesn't return an optional (servers must always reply to a request):
while (true) {
std::string msg = receiveMsgFromClient();
std::string response = ne.reconcile(msg);
respondToClient(response);
}
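For testing, both sides can also be exercised in a single process by wiring two `Negentropy` objects back-to-back over two storage instances (a sketch with made-up items; `makeId` is a hypothetical helper returning a unique 32-byte ID):
negentropy::storage::Vector clientStorage, serverStorage;
for (uint64_t i = 0; i < 10; i++) clientStorage.insert(1000 + i, makeId(i));
for (uint64_t i = 5; i < 15; i++) serverStorage.insert(1000 + i, makeId(i));
clientStorage.seal();
serverStorage.seal();
auto clientNe = Negentropy(clientStorage);
auto serverNe = Negentropy(serverStorage);
std::string msg = clientNe.initiate();
while (true) {
std::string response = serverNe.reconcile(msg);
std::vector<std::string> have, need;
std::optional<std::string> newMsg = clientNe.reconcile(response, have, need);
// have: IDs only the client has; need: IDs only the server has
if (!newMsg) break;
msg = *newMsg;
}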
## BTree Implementation
The BTree implementation is technically a B+Tree since all records are stored in the leaves. Every node has `next` and `prev` pointers that point to the neighbour nodes on the same level, which allows efficient iteration.
Each node has an accumulator that contains the sum of the IDs of all items below it, allowing fingerprints to be computed in logarithmic time relative to the number of tree leaves.
Nodes will split and rebalance themselves as necessary to keep the tree balanced. This is a major advantage over rigid data-structures like merkle-search trees and prolly trees, which are only probabilistically balanced.
If records are always inserted to the "right" of the tree, nodes will be fully packed. Otherwise, the tree attempts to keep them 50% full. There are more details on the tree invariants in the `negentropy/storage/btree/core.h` implementation file.
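As a rough illustration of what the accumulators buy (a sketch, not from the repository; `idFor` is a hypothetical helper returning a unique 32-byte ID), range fingerprints can be requested directly from a storage instance:
negentropy::storage::BTreeMem storage;
for (uint64_t i = 0; i < 1000; i++) storage.insert(i, idFor(i));
// Fingerprint over the items at indices [100, 200): computed from the per-node accumulators,
// so the cost is logarithmic in the tree size rather than linear in the range.
negentropy::Fingerprint fp = storage.fingerprint(100, 200);
// Insertions and removals update the accumulators along the path to the root,
// so later fingerprints stay consistent with the current contents.
storage.erase(150, idFor(150));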

cpp/negentropy.h

@@ -0,0 +1,323 @@
// (C) 2023 Doug Hoyte. MIT license
#pragma once
#include <string.h>
#include <string>
#include <string_view>
#include <vector>
#include <deque>
#include <unordered_set>
#include <limits>
#include <algorithm>
#include <stdexcept>
#include <optional>
#include <bit>
#include "negentropy/encoding.h"
#include "negentropy/types.h"
#include "negentropy/storage/base.h"
namespace negentropy {
const uint64_t PROTOCOL_VERSION = 0x61; // Version 1
const uint64_t MAX_U64 = std::numeric_limits<uint64_t>::max();
using err = std::runtime_error;
template<typename StorageImpl>
struct Negentropy {
StorageImpl &storage;
uint64_t frameSizeLimit;
bool isInitiator = false;
uint64_t lastTimestampIn = 0;
uint64_t lastTimestampOut = 0;
Negentropy(StorageImpl &storage, uint64_t frameSizeLimit = 0) : storage(storage), frameSizeLimit(frameSizeLimit) {
if (frameSizeLimit != 0 && frameSizeLimit < 4096) throw negentropy::err("frameSizeLimit too small");
}
std::string initiate() {
if (isInitiator) throw negentropy::err("already initiated");
isInitiator = true;
std::string output;
output.push_back(PROTOCOL_VERSION);
output += splitRange(0, storage.size(), Bound(MAX_U64));
return output;
}
void setInitiator() {
isInitiator = true;
}
std::string reconcile(std::string_view query) {
if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs");
std::vector<std::string> haveIds, needIds;
return reconcileAux(query, haveIds, needIds);
}
std::optional<std::string> reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs");
auto output = reconcileAux(query, haveIds, needIds);
if (output.size() == 1) return std::nullopt;
return output;
}
private:
std::string reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
lastTimestampIn = lastTimestampOut = 0; // reset for each message
std::string fullOutput;
fullOutput.push_back(PROTOCOL_VERSION);
auto protocolVersion = getByte(query);
if (protocolVersion < 0x60 || protocolVersion > 0x6F) throw negentropy::err("invalid negentropy protocol version byte");
if (protocolVersion != PROTOCOL_VERSION) {
if (isInitiator) throw negentropy::err(std::string("unsupported negentropy protocol version requested: ") + std::to_string(protocolVersion - 0x60));
else return fullOutput;
}
uint64_t storageSize = storage.size();
Bound prevBound;
size_t prevIndex = 0;
bool skip = false;
while (query.size()) {
std::string o;
auto doSkip = [&]{
if (skip) {
skip = false;
o += encodeBound(prevBound);
o += encodeVarInt(uint64_t(Mode::Skip));
}
};
auto currBound = decodeBound(query);
auto mode = Mode(decodeVarInt(query));
auto lower = prevIndex;
auto upper = storage.findLowerBound(prevIndex, storageSize, currBound);
if (mode == Mode::Skip) {
skip = true;
} else if (mode == Mode::Fingerprint) {
auto theirFingerprint = getBytes(query, FINGERPRINT_SIZE);
auto ourFingerprint = storage.fingerprint(lower, upper);
if (theirFingerprint != ourFingerprint.sv()) {
doSkip();
o += splitRange(lower, upper, currBound);
} else {
skip = true;
}
} else if (mode == Mode::IdList) {
auto numIds = decodeVarInt(query);
std::unordered_set<std::string> theirElems;
for (uint64_t i = 0; i < numIds; i++) {
auto e = getBytes(query, ID_SIZE);
theirElems.insert(e);
}
storage.iterate(lower, upper, [&](const Item &item, size_t){
auto k = std::string(item.getId());
if (theirElems.find(k) == theirElems.end()) {
// ID exists on our side, but not their side
if (isInitiator) haveIds.emplace_back(k);
} else {
// ID exists on both sides
theirElems.erase(k);
}
return true;
});
if (isInitiator) {
skip = true;
for (const auto &k : theirElems) {
// ID exists on their side, but not our side
needIds.emplace_back(k);
}
} else {
doSkip();
std::string responseIds;
uint64_t numResponseIds = 0;
Bound endBound = currBound;
storage.iterate(lower, upper, [&](const Item &item, size_t index){
if (exceededFrameSizeLimit(fullOutput.size() + responseIds.size())) {
endBound = Bound(item);
upper = index; // shrink upper so that remaining range gets correct fingerprint
return false;
}
responseIds += item.getId();
numResponseIds++;
return true;
});
o += encodeBound(endBound);
o += encodeVarInt(uint64_t(Mode::IdList));
o += encodeVarInt(numResponseIds);
o += responseIds;
fullOutput += o;
o.clear();
}
} else {
throw negentropy::err("unexpected mode");
}
if (exceededFrameSizeLimit(fullOutput.size() + o.size())) {
// frameSizeLimit exceeded: Stop range processing and return a fingerprint for the remaining range
auto remainingFingerprint = storage.fingerprint(upper, storageSize);
fullOutput += encodeBound(Bound(MAX_U64));
fullOutput += encodeVarInt(uint64_t(Mode::Fingerprint));
fullOutput += remainingFingerprint.sv();
break;
} else {
fullOutput += o;
}
prevIndex = upper;
prevBound = currBound;
}
return fullOutput;
}
std::string splitRange(size_t lower, size_t upper, const Bound &upperBound) {
std::string o;
uint64_t numElems = upper - lower;
const uint64_t buckets = 16;
if (numElems < buckets * 2) {
o += encodeBound(upperBound);
o += encodeVarInt(uint64_t(Mode::IdList));
o += encodeVarInt(numElems);
storage.iterate(lower, upper, [&](const Item &item, size_t){
o += item.getId();
return true;
});
} else {
uint64_t itemsPerBucket = numElems / buckets;
uint64_t bucketsWithExtra = numElems % buckets;
auto curr = lower;
for (uint64_t i = 0; i < buckets; i++) {
auto bucketSize = itemsPerBucket + (i < bucketsWithExtra ? 1 : 0);
auto ourFingerprint = storage.fingerprint(curr, curr + bucketSize);
curr += bucketSize;
Bound nextBound;
if (curr == upper) {
nextBound = upperBound;
} else {
Item prevItem, currItem;
storage.iterate(curr - 1, curr + 1, [&](const Item &item, size_t index){
if (index == curr - 1) prevItem = item;
else currItem = item;
return true;
});
nextBound = getMinimalBound(prevItem, currItem);
}
o += encodeBound(nextBound);
o += encodeVarInt(uint64_t(Mode::Fingerprint));
o += ourFingerprint.sv();
}
}
return o;
}
bool exceededFrameSizeLimit(size_t n) {
return frameSizeLimit && n > frameSizeLimit - 200;
}
// Decoding
uint64_t decodeTimestampIn(std::string_view &encoded) {
uint64_t timestamp = decodeVarInt(encoded);
timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1;
timestamp += lastTimestampIn;
if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate
lastTimestampIn = timestamp;
return timestamp;
}
Bound decodeBound(std::string_view &encoded) {
auto timestamp = decodeTimestampIn(encoded);
auto len = decodeVarInt(encoded);
return Bound(timestamp, getBytes(encoded, len));
}
// Encoding
std::string encodeTimestampOut(uint64_t timestamp) {
if (timestamp == MAX_U64) {
lastTimestampOut = MAX_U64;
return encodeVarInt(0);
}
uint64_t temp = timestamp;
timestamp -= lastTimestampOut;
lastTimestampOut = temp;
return encodeVarInt(timestamp + 1);
};
std::string encodeBound(const Bound &bound) {
std::string output;
output += encodeTimestampOut(bound.item.timestamp);
output += encodeVarInt(bound.idLen);
output += bound.item.getId().substr(0, bound.idLen);
return output;
};
Bound getMinimalBound(const Item &prev, const Item &curr) {
if (curr.timestamp != prev.timestamp) {
return Bound(curr.timestamp);
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId();
auto prevKey = prev.getId();
for (uint64_t i = 0; i < ID_SIZE; i++) {
if (currKey[i] != prevKey[i]) break;
sharedPrefixBytes++;
}
return Bound(curr.timestamp, currKey.substr(0, sharedPrefixBytes + 1));
}
}
};
}
template<typename T>
using Negentropy = negentropy::Negentropy<T>;

cpp/negentropy/encoding.h

@@ -0,0 +1,60 @@
#pragma once
#include <string>
#include <string_view>
#include <algorithm>
#include <stdexcept>
#include <cstdint>
namespace negentropy {
using err = std::runtime_error;
inline uint8_t getByte(std::string_view &encoded) {
if (encoded.size() < 1) throw negentropy::err("parse ends prematurely");
uint8_t output = encoded[0];
encoded = encoded.substr(1);
return output;
}
inline std::string getBytes(std::string_view &encoded, size_t n) {
if (encoded.size() < n) throw negentropy::err("parse ends prematurely");
auto res = encoded.substr(0, n);
encoded = encoded.substr(n);
return std::string(res);
};
inline uint64_t decodeVarInt(std::string_view &encoded) {
uint64_t res = 0;
while (1) {
if (encoded.size() == 0) throw negentropy::err("premature end of varint");
uint64_t byte = encoded[0];
encoded = encoded.substr(1);
res = (res << 7) | (byte & 0b0111'1111);
if ((byte & 0b1000'0000) == 0) break;
}
return res;
}
inline std::string encodeVarInt(uint64_t n) {
if (n == 0) return std::string(1, '\0');
std::string o;
while (n) {
o.push_back(static_cast<unsigned char>(n & 0x7F));
n >>= 7;
}
std::reverse(o.begin(), o.end());
for (size_t i = 0; i < o.size() - 1; i++) {
o[i] |= 0x80;
}
return o;
}
}

cpp/negentropy/storage/BTreeLMDB.h

@@ -0,0 +1,146 @@
#pragma once
#include <map>
#include "lmdbxx/lmdb++.h"
#include "negentropy.h"
#include "negentropy/storage/btree/core.h"
namespace negentropy { namespace storage {
using err = std::runtime_error;
using Node = negentropy::storage::btree::Node;
using NodePtr = negentropy::storage::btree::NodePtr;
struct BTreeLMDB : btree::BTreeCore {
lmdb::txn &txn;
lmdb::dbi dbi;
uint64_t treeId;
struct MetaData {
uint64_t rootNodeId;
uint64_t nextNodeId;
bool operator==(const MetaData &other) const {
return rootNodeId == other.rootNodeId && nextNodeId == other.nextNodeId;
}
};
MetaData metaDataCache;
MetaData origMetaData;
std::map<uint64_t, Node> dirtyNodeCache;
static lmdb::dbi setupDB(lmdb::txn &txn, std::string_view tableName) {
return lmdb::dbi::open(txn, tableName, MDB_CREATE | MDB_REVERSEKEY);
}
BTreeLMDB(lmdb::txn &txn, lmdb::dbi dbi, uint64_t treeId) : txn(txn), dbi(dbi), treeId(treeId) {
static_assert(sizeof(MetaData) == 16);
std::string_view v;
bool found = dbi.get(txn, getKey(0), v);
metaDataCache = found ? lmdb::from_sv<MetaData>(v) : MetaData{ 0, 1, };
origMetaData = metaDataCache;
}
~BTreeLMDB() {
flush();
}
void flush() {
for (auto &[nodeId, node] : dirtyNodeCache) {
dbi.put(txn, getKey(nodeId), node.sv());
}
dirtyNodeCache.clear();
if (metaDataCache != origMetaData) {
dbi.put(txn, getKey(0), lmdb::to_sv<MetaData>(metaDataCache));
origMetaData = metaDataCache;
}
}
// Interface
const btree::NodePtr getNodeRead(uint64_t nodeId) {
if (nodeId == 0) return {nullptr, 0};
auto res = dirtyNodeCache.find(nodeId);
if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId};
std::string_view sv;
bool found = dbi.get(txn, getKey(nodeId), sv);
if (!found) throw err("couldn't find node");
return NodePtr{(Node*)sv.data(), nodeId};
}
btree::NodePtr getNodeWrite(uint64_t nodeId) {
if (nodeId == 0) return {nullptr, 0};
{
auto res = dirtyNodeCache.find(nodeId);
if (res != dirtyNodeCache.end()) return NodePtr{&res->second, nodeId};
}
std::string_view sv;
bool found = dbi.get(txn, getKey(nodeId), sv);
if (!found) throw err("couldn't find node");
auto res = dirtyNodeCache.try_emplace(nodeId);
Node *newNode = &res.first->second;
memcpy(newNode, sv.data(), sizeof(Node));
return NodePtr{newNode, nodeId};
}
btree::NodePtr makeNode() {
uint64_t nodeId = metaDataCache.nextNodeId++;
auto res = dirtyNodeCache.try_emplace(nodeId);
return NodePtr{&res.first->second, nodeId};
}
void deleteNode(uint64_t nodeId) {
if (nodeId == 0) throw err("can't delete metadata");
dirtyNodeCache.erase(nodeId);
dbi.del(txn, getKey(nodeId));
}
uint64_t getRootNodeId() {
return metaDataCache.rootNodeId;
}
void setRootNodeId(uint64_t newRootNodeId) {
metaDataCache.rootNodeId = newRootNodeId;
}
// Internal utils
private:
std::string getKey(uint64_t n) {
uint64_t treeIdCopy = treeId;
if constexpr (std::endian::native == std::endian::big) {
auto byteswap = [](uint64_t &n) {
uint8_t *first = reinterpret_cast<uint8_t*>(&n);
uint8_t *last = first + 8;
std::reverse(first, last);
};
byteswap(n);
byteswap(treeIdCopy);
} else {
static_assert(std::endian::native == std::endian::little);
}
std::string k;
k += lmdb::to_sv<uint64_t>(treeIdCopy);
k += lmdb::to_sv<uint64_t>(n);
return k;
}
};
}}

cpp/negentropy/storage/BTreeMem.h

@@ -0,0 +1,48 @@
#pragma once
#include "negentropy.h"
#include "negentropy/storage/btree/core.h"
namespace negentropy { namespace storage {
struct BTreeMem : btree::BTreeCore {
std::unordered_map<uint64_t, btree::Node> _nodeStorageMap;
uint64_t _rootNodeId = 0; // 0 means no root
uint64_t _nextNodeId = 1;
// Interface
const btree::NodePtr getNodeRead(uint64_t nodeId) {
if (nodeId == 0) return {nullptr, 0};
auto res = _nodeStorageMap.find(nodeId);
if (res == _nodeStorageMap.end()) return btree::NodePtr{nullptr, 0};
return btree::NodePtr{&res->second, nodeId};
}
btree::NodePtr getNodeWrite(uint64_t nodeId) {
return getNodeRead(nodeId);
}
btree::NodePtr makeNode() {
uint64_t nodeId = _nextNodeId++;
_nodeStorageMap.try_emplace(nodeId);
return getNodeRead(nodeId);
}
void deleteNode(uint64_t nodeId) {
_nodeStorageMap.erase(nodeId);
}
uint64_t getRootNodeId() {
return _rootNodeId;
}
void setRootNodeId(uint64_t newRootNodeId) {
_rootNodeId = newRootNodeId;
}
};
}}

cpp/negentropy/storage/SubRange.h

@@ -0,0 +1,63 @@
#pragma once
#include <algorithm>
#include "negentropy.h"
namespace negentropy { namespace storage {
struct SubRange : StorageBase {
StorageBase &base;
size_t baseSize;
size_t subBegin;
size_t subEnd;
size_t subSize;
SubRange(StorageBase &base, const Bound &lowerBound, const Bound &upperBound) : base(base) {
baseSize = base.size();
subBegin = lowerBound == Bound(0) ? 0 : base.findLowerBound(0, baseSize, lowerBound);
subEnd = upperBound == Bound(MAX_U64) ? baseSize : base.findLowerBound(subBegin, baseSize, upperBound);
if (subEnd != baseSize && Bound(base.getItem(subEnd)) == upperBound) subEnd++; // instead of upper_bound: OK because items are unique
subSize = subEnd - subBegin;
}
uint64_t size() {
return subSize;
}
const Item &getItem(size_t i) {
if (i >= subSize) throw negentropy::err("bad index");
return base.getItem(subBegin + i);
}
void iterate(size_t begin, size_t end, std::function<bool(const Item &, size_t)> cb) {
checkBounds(begin, end);
base.iterate(subBegin + begin, subBegin + end, [&](const Item &item, size_t index){
return cb(item, index - subBegin);
});
}
size_t findLowerBound(size_t begin, size_t end, const Bound &bound) {
checkBounds(begin, end);
return std::min(base.findLowerBound(subBegin + begin, subBegin + end, bound) - subBegin, subSize);
}
Fingerprint fingerprint(size_t begin, size_t end) {
checkBounds(begin, end);
return base.fingerprint(subBegin + begin, subBegin + end);
}
private:
void checkBounds(size_t begin, size_t end) {
if (begin > end || end > subSize) throw negentropy::err("bad range");
}
};
}}

cpp/negentropy/storage/Vector.h

@@ -0,0 +1,88 @@
#pragma once
#include "negentropy.h"
namespace negentropy { namespace storage {
struct Vector : StorageBase {
std::vector<Item> items;
bool sealed = false;
void insert(uint64_t createdAt, std::string_view id) {
if (sealed) throw negentropy::err("already sealed");
if (id.size() != ID_SIZE) throw negentropy::err("bad id size for added item");
items.emplace_back(createdAt, id);
}
void insertItem(const Item &item) {
insert(item.timestamp, item.getId());
}
void seal() {
if (sealed) throw negentropy::err("already sealed");
sealed = true;
std::sort(items.begin(), items.end());
for (size_t i = 1; i < items.size(); i++) {
if (items[i - 1] == items[i]) throw negentropy::err("duplicate item inserted");
}
}
void unseal() {
sealed = false;
}
uint64_t size() {
checkSealed();
return items.size();
}
const Item &getItem(size_t i) {
checkSealed();
return items.at(i);
}
void iterate(size_t begin, size_t end, std::function<bool(const Item &, size_t)> cb) {
checkSealed();
checkBounds(begin, end);
for (auto i = begin; i < end; ++i) {
if (!cb(items[i], i)) break;
}
}
size_t findLowerBound(size_t begin, size_t end, const Bound &bound) {
checkSealed();
checkBounds(begin, end);
return std::lower_bound(items.begin() + begin, items.begin() + end, bound.item) - items.begin();
}
Fingerprint fingerprint(size_t begin, size_t end) {
Accumulator out;
out.setToZero();
iterate(begin, end, [&](const Item &item, size_t){
out.add(item);
return true;
});
return out.getFingerprint(end - begin);
}
private:
void checkSealed() {
if (!sealed) throw negentropy::err("not sealed");
}
void checkBounds(size_t begin, size_t end) {
if (begin > end || end > items.size()) throw negentropy::err("bad range");
}
};
}}

cpp/negentropy/storage/base.h

@@ -0,0 +1,22 @@
#pragma once
#include <functional>
#include "negentropy/types.h"
namespace negentropy {
struct StorageBase {
virtual uint64_t size() = 0;
virtual const Item &getItem(size_t i) = 0;
virtual void iterate(size_t begin, size_t end, std::function<bool(const Item &, size_t)> cb) = 0;
virtual size_t findLowerBound(size_t begin, size_t end, const Bound &value) = 0;
virtual Fingerprint fingerprint(size_t begin, size_t end) = 0;
};
}

cpp/negentropy/storage/btree/core.h

@@ -0,0 +1,652 @@
#pragma once
#include <algorithm>
#include "negentropy.h"
namespace negentropy { namespace storage { namespace btree {
using err = std::runtime_error;
/*
Each node contains an array of keys. For leaf nodes, the keys' nodeIds are 0 and their items are the
stored records. For non-leaf nodes, the nodeIds point to the child nodes, and the items are copies of
the first items in the corresponding children.
Except for the right-most nodes at each level of the tree (which includes the root node), all nodes
contain at least MIN_ITEMS. No node contains more than MAX_ITEMS.
If a node falls below MIN_ITEMS, a neighbour node (which always has the same parent) is selected.
* If between the two nodes there are REBALANCE_THRESHOLD or fewer total items, all items are
moved into one node and the other is deleted.
* If there are more than REBALANCE_THRESHOLD total items, then the items are divided into two
approximately equal-sized halves.
If a node goes above MAX_ITEMS then a new neighbour node is created.
* If the node is the right-most in its level, pack the old node to MAX_ITEMS, and move the rest
into the new neighbour. This optimises space-usage in the case of append workloads.
* Otherwise, split the node into two approximately equal-sized halves.
*/
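// For example (using the production constants below): a full node holds MAX_ITEMS = 80 keys.
// If it is the right-most node on its level, an insert packs 80 keys into the old node and 1 into
// the new neighbour; otherwise the 81 keys are split 41/40. Conversely, if a node drops below
// MIN_ITEMS = 30 and it plus its neighbour hold at most REBALANCE_THRESHOLD = 60 keys, the two
// nodes are merged into one; with more than 60, the keys are redistributed roughly evenly.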
#ifdef NE_FUZZ_TEST
// Fuzz test mode: Causes a large amount of tree structure changes like splitting, moving, and rebalancing
const size_t MIN_ITEMS = 2;
const size_t REBALANCE_THRESHOLD = 4;
const size_t MAX_ITEMS = 6;
#else
// Production mode: Nodes fit into 4k pages, and oscillating insert/erase will not cause tree structure changes
const size_t MIN_ITEMS = 30;
const size_t REBALANCE_THRESHOLD = 60;
const size_t MAX_ITEMS = 80;
#endif
static_assert(MIN_ITEMS < REBALANCE_THRESHOLD);
static_assert(REBALANCE_THRESHOLD < MAX_ITEMS);
static_assert(MAX_ITEMS / 2 > MIN_ITEMS);
static_assert(MIN_ITEMS % 2 == 0 && REBALANCE_THRESHOLD % 2 == 0 && MAX_ITEMS % 2 == 0);
struct Key {
Item item;
uint64_t nodeId;
void setToZero() {
item = Item();
nodeId = 0;
}
};
inline bool operator<(const Key &a, const Key &b) {
return a.item < b.item;
};
struct Node {
uint64_t numItems; // Number of items in this Node
uint64_t accumCount; // Total number of items in or under this Node
uint64_t nextSibling; // Pointer to next node in this level
uint64_t prevSibling; // Pointer to previous node in this level
Accumulator accum;
Key items[MAX_ITEMS + 1];
Node() {
memset((void*)this, '\0', sizeof(*this));
}
std::string_view sv() {
return std::string_view(reinterpret_cast<char*>(this), sizeof(*this));
}
};
struct NodePtr {
Node *p;
uint64_t nodeId;
bool exists() {
return p != nullptr;
}
Node &get() const {
return *p;
}
};
struct Breadcrumb {
size_t index;
NodePtr nodePtr;
};
struct BTreeCore : StorageBase {
//// Node Storage
virtual const NodePtr getNodeRead(uint64_t nodeId) = 0;
virtual NodePtr getNodeWrite(uint64_t nodeId) = 0;
virtual NodePtr makeNode() = 0;
virtual void deleteNode(uint64_t nodeId) = 0;
virtual uint64_t getRootNodeId() = 0;
virtual void setRootNodeId(uint64_t newRootNodeId) = 0;
//// Search
std::vector<Breadcrumb> searchItem(uint64_t rootNodeId, const Item &newItem, bool &found) {
found = false;
std::vector<Breadcrumb> breadcrumbs;
auto foundNode = getNodeRead(rootNodeId);
while (foundNode.nodeId) {
const auto &node = foundNode.get();
size_t index = node.numItems - 1;
if (node.numItems > 1) {
for (size_t i = 1; i < node.numItems + 1; i++) {
if (i == node.numItems + 1 || newItem < node.items[i].item) {
index = i - 1;
break;
}
}
}
if (!found && (newItem == node.items[index].item)) found = true;
breadcrumbs.push_back({index, foundNode});
foundNode = getNodeRead(node.items[index].nodeId);
}
return breadcrumbs;
}
//// Insert
bool insert(uint64_t createdAt, std::string_view id) {
return insertItem(Item(createdAt, id));
}
bool insertItem(const Item &newItem) {
// Make root leaf in case it doesn't exist
auto rootNodeId = getRootNodeId();
if (!rootNodeId) {
auto newNodePtr = makeNode();
auto &newNode = newNodePtr.get();
newNode.items[0].item = newItem;
newNode.numItems++;
newNode.accum.add(newItem);
newNode.accumCount = 1;
setRootNodeId(newNodePtr.nodeId);
return true;
}
// Traverse interior nodes, leaving breadcrumbs along the way
bool found;
auto breadcrumbs = searchItem(rootNodeId, newItem, found);
if (found) return false; // already inserted
// Follow breadcrumbs back to root
Key newKey = { newItem, 0 };
bool needsMerge = true;
while (breadcrumbs.size()) {
auto crumb = breadcrumbs.back();
breadcrumbs.pop_back();
auto &node = getNodeWrite(crumb.nodePtr.nodeId).get();
if (!needsMerge) {
node.accum.add(newItem);
node.accumCount++;
} else if (crumb.nodePtr.get().numItems < MAX_ITEMS) {
// Happy path: Node has room for new item
node.items[node.numItems] = newKey;
std::inplace_merge(node.items, node.items + node.numItems, node.items + node.numItems + 1);
node.numItems++;
node.accum.add(newItem);
node.accumCount++;
needsMerge = false;
} else {
// Node is full: Split it into 2
auto &left = node;
auto rightPtr = makeNode();
auto &right = rightPtr.get();
left.items[MAX_ITEMS] = newKey;
std::inplace_merge(left.items, left.items + MAX_ITEMS, left.items + MAX_ITEMS + 1);
left.accum.setToZero();
left.accumCount = 0;
if (!left.nextSibling) {
// If right-most node, pack as tightly as possible to optimise for append workloads
left.numItems = MAX_ITEMS;
right.numItems = 1;
} else {
// Otherwise, split the node equally
left.numItems = (MAX_ITEMS / 2) + 1;
right.numItems = MAX_ITEMS / 2;
}
for (size_t i = 0; i < left.numItems; i++) {
addToAccum(left.items[i], left);
}
for (size_t i = 0; i < right.numItems; i++) {
right.items[i] = left.items[left.numItems + i];
addToAccum(right.items[i], right);
}
for (size_t i = left.numItems; i < MAX_ITEMS + 1; i++) left.items[i].setToZero();
right.nextSibling = left.nextSibling;
left.nextSibling = rightPtr.nodeId;
right.prevSibling = crumb.nodePtr.nodeId;
if (right.nextSibling) {
auto &rightRight = getNodeWrite(right.nextSibling).get();
rightRight.prevSibling = rightPtr.nodeId;
}
newKey = { right.items[0].item, rightPtr.nodeId };
}
// Update left-most key, in case item was inserted at the beginning
refreshIndex(node, 0);
}
// Out of breadcrumbs but still need to merge: New level required
if (needsMerge) {
auto &left = getNodeRead(rootNodeId).get();
auto &right = getNodeRead(newKey.nodeId).get();
auto newRootPtr = makeNode();
auto &newRoot = newRootPtr.get();
newRoot.numItems = 2;
newRoot.accum.add(left.accum);
newRoot.accum.add(right.accum);
newRoot.accumCount = left.accumCount + right.accumCount;
newRoot.items[0] = left.items[0];
newRoot.items[0].nodeId = rootNodeId;
newRoot.items[1] = right.items[0];
newRoot.items[1].nodeId = newKey.nodeId;
setRootNodeId(newRootPtr.nodeId);
}
return true;
}
/// Erase
bool erase(uint64_t createdAt, std::string_view id) {
return eraseItem(Item(createdAt, id));
}
bool eraseItem(const Item &oldItem) {
auto rootNodeId = getRootNodeId();
if (!rootNodeId) return false;
// Traverse interior nodes, leaving breadcrumbs along the way
bool found;
auto breadcrumbs = searchItem(rootNodeId, oldItem, found);
if (!found) return false;
// Remove from node
bool needsRemove = true;
bool neighbourRefreshNeeded = false;
while (breadcrumbs.size()) {
auto crumb = breadcrumbs.back();
breadcrumbs.pop_back();
auto &node = getNodeWrite(crumb.nodePtr.nodeId).get();
if (!needsRemove) {
node.accum.sub(oldItem);
node.accumCount--;
} else {
for (size_t i = crumb.index + 1; i < node.numItems; i++) node.items[i - 1] = node.items[i];
node.numItems--;
node.items[node.numItems].setToZero();
node.accum.sub(oldItem);
node.accumCount--;
needsRemove = false;
}
if (crumb.index < node.numItems) refreshIndex(node, crumb.index);
if (neighbourRefreshNeeded) {
refreshIndex(node, crumb.index + 1);
neighbourRefreshNeeded = false;
}
if (node.numItems < MIN_ITEMS && breadcrumbs.size() && breadcrumbs.back().nodePtr.get().numItems > 1) {
auto rebalance = [&](Node &leftNode, Node &rightNode) {
size_t totalItems = leftNode.numItems + rightNode.numItems;
size_t numLeft = (totalItems + 1) / 2;
size_t numRight = totalItems - numLeft;
Accumulator accum;
accum.setToZero();
uint64_t accumCount = 0;
if (rightNode.numItems >= numRight) {
// Move extra from right to left
size_t numMove = rightNode.numItems - numRight;
for (size_t i = 0; i < numMove; i++) {
auto &item = rightNode.items[i];
if (item.nodeId == 0) {
accum.add(item.item);
accumCount++;
} else {
auto &movingNode = getNodeRead(item.nodeId).get();
accum.add(movingNode.accum);
accumCount += movingNode.accumCount;
}
leftNode.items[leftNode.numItems + i] = item;
}
::memmove(rightNode.items, rightNode.items + numMove, (rightNode.numItems - numMove) * sizeof(rightNode.items[0]));
for (size_t i = numRight; i < rightNode.numItems; i++) rightNode.items[i].setToZero();
leftNode.accum.add(accum);
rightNode.accum.sub(accum);
leftNode.accumCount += accumCount;
rightNode.accumCount -= accumCount;
neighbourRefreshNeeded = true;
} else {
// Move extra from left to right
size_t numMove = leftNode.numItems - numLeft;
::memmove(rightNode.items + numMove, rightNode.items, rightNode.numItems * sizeof(rightNode.items[0]));
for (size_t i = 0; i < numMove; i++) {
auto &item = leftNode.items[numLeft + i];
if (item.nodeId == 0) {
accum.add(item.item);
accumCount++;
} else {
auto &movingNode = getNodeRead(item.nodeId).get();
accum.add(movingNode.accum);
accumCount += movingNode.accumCount;
}
rightNode.items[i] = item;
}
for (size_t i = numLeft; i < leftNode.numItems; i++) leftNode.items[i].setToZero();
leftNode.accum.sub(accum);
rightNode.accum.add(accum);
leftNode.accumCount -= accumCount;
rightNode.accumCount += accumCount;
}
leftNode.numItems = numLeft;
rightNode.numItems = numRight;
};
if (breadcrumbs.back().index == 0) {
// Use neighbour to the right
auto &leftNode = node;
auto &rightNode = getNodeWrite(node.nextSibling).get();
size_t totalItems = leftNode.numItems + rightNode.numItems;
if (totalItems <= REBALANCE_THRESHOLD) {
// Move all items into right
::memmove(rightNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems);
::memcpy(rightNode.items, leftNode.items, sizeof(leftNode.items[0]) * leftNode.numItems);
rightNode.numItems += leftNode.numItems;
rightNode.accumCount += leftNode.accumCount;
rightNode.accum.add(leftNode.accum);
if (leftNode.prevSibling) getNodeWrite(leftNode.prevSibling).get().nextSibling = leftNode.nextSibling;
rightNode.prevSibling = leftNode.prevSibling;
leftNode.numItems = 0;
} else {
// Rebalance from left to right
rebalance(leftNode, rightNode);
}
} else {
// Use neighbour to the left
auto &leftNode = getNodeWrite(node.prevSibling).get();
auto &rightNode = node;
size_t totalItems = leftNode.numItems + rightNode.numItems;
if (totalItems <= REBALANCE_THRESHOLD) {
// Move all items into left
::memcpy(leftNode.items + leftNode.numItems, rightNode.items, sizeof(rightNode.items[0]) * rightNode.numItems);
leftNode.numItems += rightNode.numItems;
leftNode.accumCount += rightNode.accumCount;
leftNode.accum.add(rightNode.accum);
if (rightNode.nextSibling) getNodeWrite(rightNode.nextSibling).get().prevSibling = rightNode.prevSibling;
leftNode.nextSibling = rightNode.nextSibling;
rightNode.numItems = 0;
} else {
// Rebalance from right to left
rebalance(leftNode, rightNode);
}
}
}
if (node.numItems == 0) {
if (node.prevSibling) getNodeWrite(node.prevSibling).get().nextSibling = node.nextSibling;
if (node.nextSibling) getNodeWrite(node.nextSibling).get().prevSibling = node.prevSibling;
needsRemove = true;
deleteNode(crumb.nodePtr.nodeId);
}
}
if (needsRemove) {
setRootNodeId(0);
} else {
auto &node = getNodeRead(rootNodeId).get();
if (node.numItems == 1 && node.items[0].nodeId) {
setRootNodeId(node.items[0].nodeId);
deleteNode(rootNodeId);
}
}
return true;
}
//// Compat with the vector interface
void seal() {
}
void unseal() {
}
//// Utils
void refreshIndex(Node &node, size_t index) {
auto childNodePtr = getNodeRead(node.items[index].nodeId);
if (childNodePtr.exists()) {
auto &childNode = childNodePtr.get();
node.items[index].item = childNode.items[0].item;
}
}
void addToAccum(const Key &k, Node &node) {
if (k.nodeId == 0) {
node.accum.add(k.item);
node.accumCount++;
} else {
auto nodePtr = getNodeRead(k.nodeId);
node.accum.add(nodePtr.get().accum);
node.accumCount += nodePtr.get().accumCount;
}
}
void traverseToOffset(size_t index, const std::function<void(Node &node, size_t index)> &cb, std::function<void(Node &)> customAccum = nullptr) {
auto rootNodePtr = getNodeRead(getRootNodeId());
if (!rootNodePtr.exists()) return;
auto &rootNode = rootNodePtr.get();
if (index > rootNode.accumCount) throw err("out of range");
return traverseToOffsetAux(index, rootNode, cb, customAccum);
}
void traverseToOffsetAux(size_t index, Node &node, const std::function<void(Node &node, size_t index)> &cb, std::function<void(Node &)> customAccum) {
if (node.numItems == node.accumCount) {
cb(node, index);
return;
}
for (size_t i = 0; i < node.numItems; i++) {
auto &child = getNodeRead(node.items[i].nodeId).get();
if (index < child.accumCount) return traverseToOffsetAux(index, child, cb, customAccum);
index -= child.accumCount;
if (customAccum) customAccum(child);
}
}
//// Interface
uint64_t size() {
auto rootNodePtr = getNodeRead(getRootNodeId());
if (!rootNodePtr.exists()) return 0;
auto &rootNode = rootNodePtr.get();
return rootNode.accumCount;
}
const Item &getItem(size_t index) {
if (index >= size()) throw err("out of range");
Item *out;
traverseToOffset(index, [&](Node &node, size_t index){
out = &node.items[index].item;
});
return *out;
}
void iterate(size_t begin, size_t end, std::function<bool(const Item &, size_t)> cb) {
checkBounds(begin, end);
size_t num = end - begin;
traverseToOffset(begin, [&](Node &node, size_t index){
Node *currNode = &node;
for (size_t i = 0; i < num; i++) {
if (!cb(currNode->items[index].item, begin + i)) return;
index++;
if (index >= currNode->numItems) {
currNode = getNodeRead(currNode->nextSibling).p;
index = 0;
}
}
});
}
size_t findLowerBound(size_t begin, size_t end, const Bound &value) {
checkBounds(begin, end);
auto rootNodePtr = getNodeRead(getRootNodeId());
if (!rootNodePtr.exists()) return end;
auto &rootNode = rootNodePtr.get();
if (value.item <= rootNode.items[0].item) return begin;
return std::min(findLowerBoundAux(value, rootNodePtr, 0), end);
}
size_t findLowerBoundAux(const Bound &value, NodePtr nodePtr, uint64_t numToLeft) {
if (!nodePtr.exists()) return numToLeft + 1;
Node &node = nodePtr.get();
for (size_t i = 1; i < node.numItems; i++) {
if (value.item <= node.items[i].item) {
return findLowerBoundAux(value, getNodeRead(node.items[i - 1].nodeId), numToLeft);
} else {
if (node.items[i - 1].nodeId) numToLeft += getNodeRead(node.items[i - 1].nodeId).get().accumCount;
else numToLeft++;
}
}
return findLowerBoundAux(value, getNodeRead(node.items[node.numItems - 1].nodeId), numToLeft);
}
Fingerprint fingerprint(size_t begin, size_t end) {
checkBounds(begin, end);
auto getAccumLeftOf = [&](size_t index) {
Accumulator accum;
accum.setToZero();
traverseToOffset(index, [&](Node &node, size_t index){
for (size_t i = 0; i < index; i++) accum.add(node.items[i].item);
}, [&](Node &node){
accum.add(node.accum);
});
return accum;
};
auto accum1 = getAccumLeftOf(begin);
auto accum2 = getAccumLeftOf(end);
accum1.negate();
accum2.add(accum1);
return accum2.getFingerprint(end - begin);
}
private:
void checkBounds(size_t begin, size_t end) {
if (begin > end || end > size()) throw negentropy::err("bad range");
}
};
}}}

cpp/negentropy/storage/btree/debug.h

@@ -0,0 +1,189 @@
#pragma once
#include <iostream>
#include <set>
#include <hoytech/hex.h>
#include "negentropy/storage/btree/core.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/BTreeLMDB.h"
namespace negentropy { namespace storage { namespace btree {
using err = std::runtime_error;
inline void dump(BTreeCore &btree, uint64_t nodeId, int depth) {
if (nodeId == 0) {
if (depth == 0) std::cout << "EMPTY TREE" << std::endl;
return;
}
auto nodePtr = btree.getNodeRead(nodeId);
auto &node = nodePtr.get();
std::string indent(depth * 4, ' ');
std::cout << indent << "NODE id=" << nodeId << " numItems=" << node.numItems << " accum=" << hoytech::to_hex(node.accum.sv()) << " accumCount=" << node.accumCount << std::endl;
for (size_t i = 0; i < node.numItems; i++) {
std::cout << indent << " item: " << node.items[i].item.timestamp << "," << hoytech::to_hex(node.items[i].item.getId()) << std::endl;
dump(btree, node.items[i].nodeId, depth + 1);
}
}
inline void dump(BTreeCore &btree) {
dump(btree, btree.getRootNodeId(), 0);
}
struct VerifyContext {
std::optional<uint64_t> leafDepth;
std::set<uint64_t> allNodeIds;
std::vector<uint64_t> leafNodeIds;
};
inline void verify(BTreeCore &btree, uint64_t nodeId, uint64_t depth, VerifyContext &ctx, Accumulator *accumOut = nullptr, uint64_t *accumCountOut = nullptr) {
if (nodeId == 0) return;
if (ctx.allNodeIds.contains(nodeId)) throw err("verify: saw node id again");
ctx.allNodeIds.insert(nodeId);
auto nodePtr = btree.getNodeRead(nodeId);
auto &node = nodePtr.get();
if (node.numItems == 0) throw err("verify: empty node");
if (node.nextSibling && node.numItems < MIN_ITEMS) throw err("verify: too few items in node");
if (node.numItems > MAX_ITEMS) throw err("verify: too many items");
if (node.items[0].nodeId == 0) {
if (ctx.leafDepth) {
if (*ctx.leafDepth != depth) throw err("verify: mismatch of leaf depth");
} else {
ctx.leafDepth = depth;
}
ctx.leafNodeIds.push_back(nodeId);
}
// FIXME: verify unused items are zeroed
Accumulator accum;
accum.setToZero();
uint64_t accumCount = 0;
for (size_t i = 0; i < node.numItems; i++) {
uint64_t childNodeId = node.items[i].nodeId;
if (childNodeId == 0) {
accum.add(node.items[i].item);
accumCount++;
} else {
{
auto firstChildPtr = btree.getNodeRead(childNodeId);
auto &firstChild = firstChildPtr.get();
if (firstChild.numItems == 0 || firstChild.items[0].item != node.items[i].item) throw err("verify: key does not match child's first key");
}
verify(btree, childNodeId, depth + 1, ctx, &accum, &accumCount);
}
if (i < node.numItems - 1) {
if (!(node.items[i].item < node.items[i + 1].item)) throw err("verify: items out of order");
}
}
for (size_t i = node.numItems; i < MAX_ITEMS + 1; i++) {
for (size_t j = 0; j < sizeof(Key); j++) if (((char*)&node.items[i])[j] != '\0') throw err("verify: memory not zeroed out");
}
if (accumCount != node.accumCount) throw err("verify: accumCount mismatch");
if (accum.sv() != node.accum.sv()) throw err("verify: accum mismatch");
if (accumOut) accumOut->add(accum);
if (accumCountOut) *accumCountOut += accumCount;
}
inline void verify(BTreeCore &btree, bool isLMDB) {
VerifyContext ctx;
Accumulator accum;
accum.setToZero();
uint64_t accumCount = 0;
verify(btree, btree.getRootNodeId(), 0, ctx, &accum, &accumCount);
if (ctx.leafNodeIds.size()) {
uint64_t i = 0, totalItems = 0;
auto nodePtr = btree.getNodeRead(ctx.leafNodeIds[0]);
std::optional<Item> prevItem;
uint64_t prevSibling = 0;
while (nodePtr.exists()) {
auto &node = nodePtr.get();
if (nodePtr.nodeId != ctx.leafNodeIds[i]) throw err("verify: leaf id mismatch");
if (prevSibling != node.prevSibling) throw err("verify: prevSibling mismatch");
prevSibling = nodePtr.nodeId;
nodePtr = btree.getNodeRead(node.nextSibling);
i++;
for (size_t j = 0; j < node.numItems; j++) {
if (prevItem && !(*prevItem < node.items[j].item)) throw err("verify: leaf item out of order");
prevItem = node.items[j].item;
totalItems++;
}
}
if (totalItems != accumCount) throw err("verify: leaf count mismatch");
}
// Check for leaks
if (isLMDB) {
static_assert(std::endian::native == std::endian::little); // FIXME
auto &btreeLMDB = dynamic_cast<BTreeLMDB&>(btree);
btreeLMDB.flush();
std::string_view key, val;
// Leaks
auto cursor = lmdb::cursor::open(btreeLMDB.txn, btreeLMDB.dbi);
if (cursor.get(key, val, MDB_FIRST)) {
do {
uint64_t nodeId = lmdb::from_sv<uint64_t>(key.substr(8));
if (nodeId != 0 && !ctx.allNodeIds.contains(nodeId)) throw err("verify: memory leak");
} while (cursor.get(key, val, MDB_NEXT));
}
// Dangling
for (const auto &k : ctx.allNodeIds) {
std::string tpKey;
tpKey += lmdb::to_sv(btreeLMDB.treeId);
tpKey += lmdb::to_sv(k);
if (!btreeLMDB.dbi.get(btreeLMDB.txn, tpKey, val)) throw err("verify: dangling node");
}
} else {
auto &btreeMem = dynamic_cast<BTreeMem&>(btree);
// Leaks
for (const auto &[k, v] : btreeMem._nodeStorageMap) {
if (!ctx.allNodeIds.contains(k)) throw err("verify: memory leak");
}
// Dangling
for (const auto &k : ctx.allNodeIds) {
if (!btreeMem._nodeStorageMap.contains(k)) throw err("verify: dangling node");
}
}
}
}}}

cpp/negentropy/types.h

@@ -0,0 +1,184 @@
// (C) 2023 Doug Hoyte. MIT license
#pragma once
#include <string.h>
#include <cstdint>
#include <string_view>
#include <algorithm>
#include <stdexcept>
#include <bit>
#include <openssl/sha.h>
#include "negentropy/encoding.h"
namespace negentropy {
using err = std::runtime_error;
const size_t ID_SIZE = 32;
const size_t FINGERPRINT_SIZE = 16;
enum class Mode {
Skip = 0,
Fingerprint = 1,
IdList = 2,
};
struct Item {
uint64_t timestamp;
uint8_t id[ID_SIZE];
explicit Item(uint64_t timestamp = 0) : timestamp(timestamp) {
memset(id, '\0', sizeof(id));
}
explicit Item(uint64_t timestamp, std::string_view id_) : timestamp(timestamp) {
if (id_.size() != sizeof(id)) throw negentropy::err("bad id size for Item");
memcpy(id, id_.data(), sizeof(id));
}
std::string_view getId() const {
return std::string_view(reinterpret_cast<const char*>(id), sizeof(id));
}
bool operator==(const Item &other) const {
return timestamp == other.timestamp && getId() == other.getId();
}
};
inline bool operator<(const Item &a, const Item &b) {
return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId();
};
inline bool operator<=(const Item &a, const Item &b) {
return a.timestamp != b.timestamp ? a.timestamp <= b.timestamp : a.getId() <= b.getId();
};
struct Bound {
Item item;
size_t idLen;
explicit Bound(uint64_t timestamp = 0, std::string_view id = "") : item(timestamp), idLen(id.size()) {
if (idLen > ID_SIZE) throw negentropy::err("bad id size for Bound");
memcpy(item.id, id.data(), idLen);
}
explicit Bound(const Item &item_) : item(item_), idLen(ID_SIZE) {}
bool operator==(const Bound &other) const {
return item == other.item;
}
};
inline bool operator<(const Bound &a, const Bound &b) {
return a.item < b.item;
};
struct Fingerprint {
uint8_t buf[FINGERPRINT_SIZE];
std::string_view sv() const {
return std::string_view(reinterpret_cast<const char*>(buf), sizeof(buf));
}
};
struct Accumulator {
uint8_t buf[ID_SIZE];
void setToZero() {
memset(buf, '\0', sizeof(buf));
}
void add(const Item &item) {
add(item.id);
}
void add(const Accumulator &acc) {
add(acc.buf);
}
void add(const uint8_t *otherBuf) {
uint64_t currCarry = 0, nextCarry = 0;
uint64_t *p = reinterpret_cast<uint64_t*>(buf);
const uint64_t *po = reinterpret_cast<const uint64_t*>(otherBuf);
auto byteswap = [](uint64_t &n) {
uint8_t *first = reinterpret_cast<uint8_t*>(&n);
uint8_t *last = first + 8;
std::reverse(first, last);
};
for (size_t i = 0; i < 4; i++) {
uint64_t orig = p[i];
uint64_t otherV = po[i];
if constexpr (std::endian::native == std::endian::big) {
byteswap(orig);
byteswap(otherV);
} else {
static_assert(std::endian::native == std::endian::little);
}
uint64_t next = orig;
next += currCarry;
if (next < orig) nextCarry = 1;
next += otherV;
if (next < otherV) nextCarry = 1;
if constexpr (std::endian::native == std::endian::big) {
byteswap(next);
}
p[i] = next;
currCarry = nextCarry;
nextCarry = 0;
}
}
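// negate() forms the two's complement (flip all bits, then add 1); sub() below adds that
// negation, which subtracts modulo 2^256.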
void negate() {
for (size_t i = 0; i < sizeof(buf); i++) {
buf[i] = ~buf[i];
}
Accumulator one;
one.setToZero();
one.buf[0] = 1;
add(one.buf);
}
void sub(const Item &item) {
sub(item.id);
}
void sub(const Accumulator &acc) {
sub(acc.buf);
}
void sub(const uint8_t *otherBuf) {
Accumulator neg;
memcpy(neg.buf, otherBuf, sizeof(buf));
neg.negate();
add(neg);
}
std::string_view sv() const {
return std::string_view(reinterpret_cast<const char*>(buf), sizeof(buf));
}
Fingerprint getFingerprint(uint64_t n) {
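// encodeVarInt() comes from the library's encoding helpers (not shown in this file).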
std::string input;
input += sv();
input += encodeVarInt(n);
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256(reinterpret_cast<unsigned char*>(input.data()), input.size(), hash);
Fingerprint out;
memcpy(out.buf, hash, FINGERPRINT_SIZE);
return out;
}
};
}
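The Accumulator above treats its 32-byte buffer as a 256-bit little-endian integer and folds items in limb by limb with carry propagation, so item IDs can be combined (and removed again via sub) in any order before hashing into a fingerprint. A minimal standalone sketch of that arithmetic, with illustrative names that are not part of the library, assuming a little-endian host (mirroring the byteswap fallback above):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Add two 256-bit little-endian integers modulo 2^256, using the same carry scheme as Accumulator::add().
static void add256(uint8_t out[32], const uint8_t a[32], const uint8_t b[32]) {
    uint64_t carry = 0;
    for (size_t i = 0; i < 32; i += 8) {
        uint64_t x, y;
        memcpy(&x, a + i, 8); // 64-bit limbs, least significant first
        memcpy(&y, b + i, 8);
        uint64_t sum = x + carry;
        uint64_t nextCarry = (sum < x) ? 1 : 0;
        sum += y;
        if (sum < y) nextCarry = 1;
        memcpy(out + i, &sum, 8);
        carry = nextCarry;
    }
    // Any final carry is discarded: the arithmetic wraps modulo 2^256.
}

int main() {
    uint8_t a[32], b[32] = {}, out[32];
    memset(a, 0xff, sizeof(a)); // a = 2^256 - 1
    b[0] = 1;                   // b = 1
    add256(out, a, b);          // wraps around to zero
    for (size_t i = 0; i < 32; i++) printf("%02x", out[i]);
    printf("\n");
    return 0;
}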

6
test/cpp/.gitignore vendored

@ -1 +1,7 @@
/harness
/btreeFuzz
/measureSpaceUsage
/lmdbTest
/subRange
/testdb/


@ -1,2 +1,30 @@
harness: harness.cpp ../../cpp/Negentropy.h
g++ -g -Wall -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -lcrypto -o harness
W = -Wall
OPT = -g -O2
STD = -std=c++20
CXXFLAGS = $(STD) $(OPT) $(W) -fPIC $(XCXXFLAGS)
INCS = -I../../cpp/ -I./hoytech-cpp/ -Ilmdbxx/include/
DEPS = ../../cpp/negentropy.h ../../cpp/negentropy/* ../../cpp/negentropy/storage/* ../../cpp/negentropy/storage/btree/*
harness: harness.cpp $(DEPS)
$(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -o $@
btreeFuzz: btreeFuzz.cpp $(DEPS)
$(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@
lmdbTest: lmdbTest.cpp $(DEPS)
$(CXX) $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@
measureSpaceUsage: measureSpaceUsage.cpp $(DEPS)
$(CXX) -DNE_FUZZ_TEST $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -llmdb -o $@
subRange: subRange.cpp $(DEPS)
$(CXX) -DNE_FUZZ_TEST $(W) $(OPT) $(STD) $(INCS) $< -lcrypto -o $@
.PHONY: all clean
all: harness btreeFuzz lmdbTest measureSpaceUsage subRange
clean:
rm -f harness btreeFuzz lmdbTest measureSpaceUsage subRange

149
test/cpp/btreeFuzz.cpp Normal file

@ -0,0 +1,149 @@
#include <iostream>
#include <cstdlib>
#include <sstream>
#include <memory>
#include <set>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "negentropy.h"
#include "negentropy/storage/BTreeLMDB.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/btree/debug.h"
struct Verifier {
bool isLMDB;
std::set<uint64_t> addedTimestamps;
Verifier(bool isLMDB) : isLMDB(isLMDB) {}
void insert(negentropy::storage::btree::BTreeCore &btree, uint64_t timestamp){
negentropy::Item item(timestamp, std::string(32, (unsigned char)(timestamp % 256)));
btree.insertItem(item);
addedTimestamps.insert(timestamp);
doVerify(btree);
}
void erase(negentropy::storage::btree::BTreeCore &btree, uint64_t timestamp){
negentropy::Item item(timestamp, std::string(32, (unsigned char)(timestamp % 256)));
btree.eraseItem(item);
addedTimestamps.erase(timestamp);
doVerify(btree);
}
void doVerify(negentropy::storage::btree::BTreeCore &btree) {
try {
negentropy::storage::btree::verify(btree, isLMDB);
} catch (...) {
std::cout << "TREE FAILED INVARIANTS:" << std::endl;
negentropy::storage::btree::dump(btree);
throw;
}
if (btree.size() != addedTimestamps.size()) throw negentropy::err("verify size mismatch");
auto iter = addedTimestamps.begin();
btree.iterate(0, btree.size(), [&](const auto &item, size_t i) {
if (item.timestamp != *iter) throw negentropy::err("verify element mismatch");
iter = std::next(iter);
return true;
});
}
};
void doFuzz(negentropy::storage::btree::BTreeCore &btree, Verifier &v) {
if (btree.size() != 0) throw negentropy::err("expected empty tree");
// Verify return values
if (!btree.insert(100, std::string(32, '\x01'))) throw negentropy::err("didn't insert element?");
if (btree.insert(100, std::string(32, '\x01'))) throw negentropy::err("double inserted element?");
if (!btree.erase(100, std::string(32, '\x01'))) throw negentropy::err("didn't erase element?");
if (btree.erase(100, std::string(32, '\x01'))) throw negentropy::err("erased non-existing element?");
// Fuzz test: Insertion phase
while (btree.size() < 5000) {
if (rand() % 3 <= 1) {
int timestamp;
do {
timestamp = rand();
} while (v.addedTimestamps.contains(timestamp));
std::cout << "INSERT " << timestamp << " size = " << btree.size() << std::endl;
v.insert(btree, timestamp);
} else if (v.addedTimestamps.size()) {
auto it = v.addedTimestamps.begin();
std::advance(it, rand() % v.addedTimestamps.size());
std::cout << "DEL " << (*it) << std::endl;
v.erase(btree, *it);
}
}
// Fuzz test: Removal phase
std::cout << "REMOVING ALL" << std::endl;
while (btree.size()) {
auto it = v.addedTimestamps.begin();
std::advance(it, rand() % v.addedTimestamps.size());
auto timestamp = *it;
std::cout << "DEL " << timestamp << " size = " << btree.size() << std::endl;
v.erase(btree, *it);
}
}
int main() {
std::cout << "SIZEOF NODE: " << sizeof(negentropy::storage::btree::Node) << std::endl;
srand(0);
if (::getenv("NE_FUZZ_LMDB")) {
system("mkdir -p testdb/");
system("rm -f testdb/*");
auto env = lmdb::env::create();
env.set_max_dbs(64);
env.set_mapsize(1'000'000'000ULL);
env.open("testdb/", 0);
auto txn = lmdb::txn::begin(env);
auto btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data");
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 0);
Verifier v(true);
doFuzz(btree, v);
btree.flush();
txn.commit();
} else {
Verifier v(false);
negentropy::storage::BTreeMem btree;
doFuzz(btree, v);
}
std::cout << "OK" << std::endl;
return 0;
}

9
test/cpp/check.sh Executable file

@ -0,0 +1,9 @@
#!/bin/sh
make clean
make -j all
./btreeFuzz
NE_FUZZ_LMDB=1 ./btreeFuzz
./lmdbTest
./subRange


@ -1,10 +1,13 @@
#include <iostream>
#include <sstream>
#include <memory>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "Negentropy.h"
#include "negentropy.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/Vector.h"
@ -23,12 +26,11 @@ std::vector<std::string> split(const std::string &s, char delim) {
int main() {
const uint64_t idSize = 16;
uint64_t frameSizeLimit = 0;
if (::getenv("FRAMESIZELIMIT")) frameSizeLimit = std::stoull(::getenv("FRAMESIZELIMIT"));
Negentropy ne(idSize, frameSizeLimit);
negentropy::storage::Vector storage;
std::unique_ptr<Negentropy<negentropy::storage::Vector>> ne;
std::string line;
while (std::cin) {
@ -41,20 +43,21 @@ int main() {
if (items.size() != 3) throw hoytech::error("wrong num of fields");
uint64_t created = std::stoull(items[1]);
auto id = hoytech::from_hex(items[2]);
ne.addItem(created, id);
storage.insert(created, id);
} else if (items[0] == "seal") {
ne.seal();
storage.seal();
ne = std::make_unique<Negentropy<negentropy::storage::Vector>>(storage, frameSizeLimit);
} else if (items[0] == "initiate") {
auto q = ne.initiate();
if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded");
auto q = ne->initiate();
if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("initiate frameSizeLimit exceeded: ", q.size(), " > ", frameSizeLimit);
std::cout << "msg," << hoytech::to_hex(q) << std::endl;
} else if (items[0] == "msg") {
std::string q;
if (items.size() >= 2) q = hoytech::from_hex(items[1]);
if (ne.isInitiator) {
if (ne->isInitiator) {
std::vector<std::string> have, need;
auto resp = ne.reconcile(q, have, need);
auto resp = ne->reconcile(q, have, need);
for (auto &id : have) std::cout << "have," << hoytech::to_hex(id) << "\n";
for (auto &id : need) std::cout << "need," << hoytech::to_hex(id) << "\n";
@ -66,10 +69,10 @@ int main() {
q = *resp;
} else {
q = ne.reconcile(q);
q = ne->reconcile(q);
}
if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded");
if (frameSizeLimit && q.size() > frameSizeLimit) throw hoytech::error("frameSizeLimit exceeded: ", q.size(), " > ", frameSizeLimit, ": from ", (ne->isInitiator ? "initiator" : "non-initiator"));
std::cout << "msg," << hoytech::to_hex(q) << std::endl;
} else {
throw hoytech::error("unknown cmd: ", items[0]);

157
test/cpp/lmdbTest.cpp Normal file

@ -0,0 +1,157 @@
#include <iostream>
#include <cstdlib>
#include <sstream>
#include <memory>
#include <set>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "negentropy.h"
#include "negentropy/storage/BTreeLMDB.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/btree/debug.h"
#include "negentropy/storage/Vector.h"
int main() {
system("mkdir -p testdb/");
system("rm -f testdb/*");
auto env = lmdb::env::create();
env.set_max_dbs(64);
env.open("testdb/", 0);
lmdb::dbi btreeDbi;
{
auto txn = lmdb::txn::begin(env);
btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data");
txn.commit();
}
negentropy::storage::Vector vec;
auto packId = [](uint64_t n){
auto o = std::string(32, '\0');
memcpy((char*)o.data(), (char*)&n, sizeof(n));
return o;
};
auto unpackId = [](std::string_view n){
if (n.size() != 32) throw hoytech::error("too short to unpack");
return *(uint64_t*)n.data();
};
{
auto txn = lmdb::txn::begin(env);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
auto add = [&](uint64_t timestamp){
negentropy::Item item(timestamp, packId(timestamp));
btree.insertItem(item);
vec.insertItem(item);
};
for (size_t i = 1000; i < 2000; i += 2) add(i);
btree.flush();
txn.commit();
}
vec.seal();
{
auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
//negentropy::storage::btree::dump(btree);
negentropy::storage::btree::verify(btree, true);
}
// Identical
{
auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
auto ne1 = Negentropy(vec);
auto ne2 = Negentropy(btree);
auto q = ne1.initiate();
std::string q2 = ne2.reconcile(q);
std::vector<std::string> have, need;
auto q3 = ne1.reconcile(q2, have, need);
if (q3 || have.size() || need.size()) throw hoytech::error("bad reconcile 1");
}
// Make some modifications
{
auto txn = lmdb::txn::begin(env);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
btree.erase(1044, packId(1044));
btree.erase(1838, packId(1838));
btree.insert(1555, packId(1555));
btree.insert(99999, packId(99999));
btree.flush();
txn.commit();
}
// Reconcile again
{
auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
auto ne1 = Negentropy(vec);
auto ne2 = Negentropy(btree);
std::vector<uint64_t> allHave, allNeed;
std::string msg = ne1.initiate();
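// Drive the sync to completion: ne2 (the btree side) answers each message, ne1 (the vector side)
// collects have/need IDs, and the loop ends when reconcile() returns no continuation message.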
while (true) {
std::string response = ne2.reconcile(msg);
std::vector<std::string> have, need;
auto newMsg = ne1.reconcile(response, have, need);
for (const auto &id : have) allHave.push_back(unpackId(id));
for (const auto &id : need) allNeed.push_back(unpackId(id));
if (!newMsg) break; // done
msg = *newMsg;
}
std::sort(allHave.begin(), allHave.end());
std::sort(allNeed.begin(), allNeed.end());
if (allHave != std::vector<uint64_t>({ 1044, 1838 })) throw hoytech::error("bad allHave");
if (allNeed != std::vector<uint64_t>({ 1555, 99999 })) throw hoytech::error("bad allNeed");
}
std::cout << "OK" << std::endl;
return 0;
}

1
test/cpp/lmdbxx Submodule

@ -0,0 +1 @@
Subproject commit d649a581d3cebfe7d8bd4d345bc2c1c4c2cc59a2


@ -0,0 +1,78 @@
#include <iostream>
#include <cstdlib>
#include <sstream>
#include <memory>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "negentropy.h"
#include "negentropy/storage/BTreeLMDB.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/btree/debug.h"
#include "negentropy/storage/Vector.h"
int main() {
system("mkdir -p testdb/");
system("rm -f testdb/*");
auto env = lmdb::env::create();
env.set_max_dbs(64);
env.set_mapsize(1'000'000'000ULL);
env.open("testdb/", 0);
lmdb::dbi btreeDbi;
{
auto txn = lmdb::txn::begin(env);
btreeDbi = negentropy::storage::BTreeLMDB::setupDB(txn, "test-data");
txn.commit();
}
{
auto txn = lmdb::txn::begin(env);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
auto add = [&](uint64_t timestamp){
negentropy::Item item(timestamp, std::string(32, '\x01'));
btree.insertItem(item);
};
for (size_t i = 1; i < 100'000; i++) add(i);
btree.flush();
txn.commit();
}
{
auto txn = lmdb::txn::begin(env, 0, MDB_RDONLY);
negentropy::storage::BTreeLMDB btree(txn, btreeDbi, 300);
auto cursor = lmdb::cursor::open(txn, btreeDbi);
std::string_view key, val;
size_t minStart = negentropy::MAX_U64;
size_t maxEnd = 0;
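// Scan every stored node and track the lowest/highest address of its value inside the LMDB memory
// map; the span (maxEnd - minStart) approximates the space the tree occupies for this MAX_ITEMS setting.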
if (cursor.get(key, val, MDB_FIRST)) {
do {
size_t ptrStart = (size_t)val.data();
size_t ptrEnd = ptrStart + sizeof(negentropy::storage::btree::Node);
if (ptrStart < minStart) minStart = ptrStart;
if (ptrEnd > maxEnd) maxEnd = ptrEnd;
} while (cursor.get(key, val, MDB_NEXT));
}
std::cout << "data," << negentropy::storage::btree::MAX_ITEMS << "," << sizeof(negentropy::storage::btree::Node) << "," << (maxEnd - minStart) << std::endl;
}
return 0;
}


@ -0,0 +1,9 @@
system(qq{ perl -pi -e 's/MIN_ITEMS = \\d+/MIN_ITEMS = 2/' ../../cpp/negentropy/storage/btree/core.h });
system(qq{ perl -pi -e 's/REBALANCE_THRESHOLD = \\d+/REBALANCE_THRESHOLD = 4/' ../../cpp/negentropy/storage/btree/core.h });
system(qq{ perl -pi -e 's/MAX_ITEMS = \\d+/MAX_ITEMS = 6/' ../../cpp/negentropy/storage/btree/core.h });
for (my $i = 6; $i < 128; $i += 2) {
print "DOING ITER $i\n";
system(qq{ perl -pi -e 's/MAX_ITEMS = \\d+/MAX_ITEMS = $i/' ../../cpp/negentropy/storage/btree/core.h });
system("rm -f measureSpaceUsage && make measureSpaceUsage && rm -f testdb/data.mdb && ./measureSpaceUsage >> measureSpaceUsage.log");
}

165
test/cpp/subRange.cpp Normal file

@ -0,0 +1,165 @@
#include <iostream>
#include <set>
#include <openssl/sha.h>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "negentropy.h"
#include "negentropy/storage/Vector.h"
#include "negentropy/storage/BTreeMem.h"
#include "negentropy/storage/SubRange.h"
std::string sha256(std::string_view input) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256(reinterpret_cast<const unsigned char*>(input.data()), input.size(), hash);
return std::string((const char*)&hash[0], SHA256_DIGEST_LENGTH);
}
std::string uintToId(uint64_t id) {
return sha256(std::string((char*)&id, 8));
}
template<typename T>
void testSubRange() {
T vecBig;
T vecSmall;
for (size_t i = 0; i < 1000; i++) {
vecBig.insert(100 + i, uintToId(i));
}
for (size_t i = 400; i < 600; i++) {
vecSmall.insert(100 + i, uintToId(i));
}
vecBig.seal();
vecSmall.seal();
negentropy::storage::SubRange subRange(vecBig, negentropy::Bound(100 + 400), negentropy::Bound(100 + 600));
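// The SubRange view exposes only vecBig's items with bounds in [500, 700), so it should behave
// identically to vecSmall, which was built from that same range.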
if (vecSmall.size() != subRange.size()) throw hoytech::error("size mismatch");
if (vecSmall.fingerprint(0, vecSmall.size()).sv() != subRange.fingerprint(0, subRange.size()).sv()) throw hoytech::error("fingerprint mismatch");
if (vecSmall.getItem(10) != subRange.getItem(10)) throw hoytech::error("getItem mismatch");
if (vecBig.getItem(400 + 10) != subRange.getItem(10)) throw hoytech::error("getItem mismatch");
{
auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(550));
auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(550));
if (lb != lb2) throw hoytech::error("findLowerBound mismatch");
}
{
auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(20));
auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(20));
if (lb != lb2) throw hoytech::error("findLowerBound mismatch");
}
{
auto lb = subRange.findLowerBound(0, subRange.size(), negentropy::Bound(5000));
auto lb2 = vecSmall.findLowerBound(0, vecSmall.size(), negentropy::Bound(5000));
if (lb != lb2) throw hoytech::error("findLowerBound mismatch");
}
}
template<typename T>
void testSync(bool emptySide1, bool emptySide2) {
T vecBig;
T vecSmall;
std::set<std::string> expectedHave;
std::set<std::string> expectedNeed;
size_t const lowerLimit = 20'000;
size_t const upperLimit = 90'000;
for (size_t i = lowerLimit; i < upperLimit; i++) {
auto id = uintToId(i);
if (emptySide1 || i % 15'000 == 0) {
if (i >= lowerLimit && i < upperLimit) expectedNeed.insert(id);
continue;
}
vecSmall.insert(100 + i, id);
}
for (size_t i = 0; i < 100'000; i++) {
auto id = uintToId(i);
if (emptySide2 || i % 22'000 == 0) {
if (i >= lowerLimit && i < upperLimit) expectedHave.insert(id);
continue;
}
vecBig.insert(100 + i, id);
}
// Get rid of common
std::set<std::string> commonItems;
for (const auto &item : expectedHave) {
if (expectedNeed.contains(item)) commonItems.insert(item);
}
for (const auto &item : commonItems) {
expectedHave.erase(item);
expectedNeed.erase(item);
}
vecBig.seal();
vecSmall.seal();
negentropy::storage::SubRange subRange(vecBig, negentropy::Bound(100 + lowerLimit), negentropy::Bound(100 + upperLimit));
auto ne1 = Negentropy(vecSmall, 20'000);
auto ne2 = Negentropy(subRange, 20'000);
std::string msg = ne1.initiate();
while (true) {
msg = ne2.reconcile(msg);
std::vector<std::string> have, need;
auto newMsg = ne1.reconcile(msg, have, need);
for (const auto &item : have) {
if (!expectedHave.contains(item)) throw hoytech::error("unexpected have: ", hoytech::to_hex(item));
expectedHave.erase(item);
}
for (const auto &item : need) {
if (!expectedNeed.contains(item)) throw hoytech::error("unexpected need: ", hoytech::to_hex(item));
expectedNeed.erase(item);
}
if (!newMsg) break;
else std::swap(msg, *newMsg);
}
if (expectedHave.size()) throw hoytech::error("missed have");
if (expectedNeed.size()) throw hoytech::error("missed need");
}
int main() {
testSubRange<negentropy::storage::Vector>();
testSubRange<negentropy::storage::BTreeMem>();
testSync<negentropy::storage::Vector>(false, false);
testSync<negentropy::storage::Vector>(true, false);
testSync<negentropy::storage::Vector>(false, true);
std::cout << "OK" << std::endl;
return 0;
}


@ -12,9 +12,9 @@ use Utils;
die "usage: $0 <lang1> <lang2>" if @ARGV < 2;
my $harnessCmd1 = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)";
my $harnessCmd2 = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)";
my $idSize = shift || 16;
my $idSize = 32;
srand($ENV{SEED} || 0);
my $stgen = Session::Token->new(seed => "\x00" x 1024, alphabet => '0123456789abcdef', length => $idSize * 2);


@ -11,20 +11,19 @@ use Utils;
die "usage: $0 <lang>" if @ARGV < 1;
my $harnessCmd = Utils::harnessTypeToCmd(shift) || die "please provide harness type (cpp, js, etc)";
my $idSize = shift || 16;
my $expectedResp;
## Get expected response using protocol version 0
## Get expected response using protocol version 1
{
my ($infile, $outfile);
my $pid = open2($outfile, $infile, $harnessCmd);
print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n";
print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n";
print $infile "seal\n";
print $infile "msg,6000000200\n"; ## full range bound, empty IdList
print $infile "msg,6100000200\n"; ## full range bound, empty IdList
my $resp = <$outfile>;
chomp $resp;
@ -32,31 +31,29 @@ my $expectedResp;
$expectedResp = $resp;
}
## Client tries to use some hypothetical newer version, but falls back to version 0
## Client tries to use some hypothetical newer version, but falls back to version 1
{
my ($infile, $outfile);
my $pid = open2($outfile, $infile, $harnessCmd);
print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n";
print $infile "item,12345,eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee\n";
print $infile "seal\n";
print $infile "msg,61aabbccddeeff\n"; ## some new protocol message
print $infile "msg,62aabbccddeeff\n"; ## some new protocol message
my $resp = <$outfile>;
chomp $resp;
## 61: The bound timestamp, as varint. The value is 0x60 (preferred protocol version), but 1 is added as per timestamp protocol
## 00: The following ID has length 0
## 04: UnsupportedProtocolVersion mode
die "bad upgrade response: $resp" unless $resp eq "msg,610004";
## 61: Preferred protocol version
die "bad upgrade response: $resp" unless $resp eq "msg,61";
## Try again with protocol version 0
print $infile "msg,6000000200\n"; ## full range bound, empty IdList
## Try again with protocol version 1
print $infile "msg,6100000200\n"; ## full range bound, empty IdList
$resp = <$outfile>;
chomp $resp;
die "didn't fall back to protocol version 0: $resp" unless $resp eq $expectedResp;
die "didn't fall back to protocol version 1: $resp" unless $resp eq $expectedResp;
}
print "OK\n";
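The upgrade test above exercises the version handshake: an initiator's message leads with the protocol version byte, and a responder that only speaks version 1 (0x61) answers an unknown version with a single byte carrying its own preferred version so the initiator can retry. A hypothetical responder-side sketch of that check (illustrative only, not the library's actual code):

#include <stdexcept>
#include <string>

// Mirrors the "msg,62..." -> "msg,61" exchange above: answer unknown versions with our preferred one.
std::string handleFirstMessage(const std::string &msg) {
    const unsigned char PREFERRED_VERSION = 0x61; // protocol version 1
    if (msg.empty()) throw std::runtime_error("empty message");
    if (static_cast<unsigned char>(msg[0]) != PREFERRED_VERSION) {
        return std::string(1, static_cast<char>(PREFERRED_VERSION)); // lone version byte
    }
    // Known version: continue with normal reconciliation (omitted here).
    return {};
}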