initial commit

Doug Hoyte 2023-03-10 12:19:11 -05:00
commit 4a9bd3b9be
12 changed files with 1379 additions and 0 deletions

.gitmodules vendored Normal file

@@ -0,0 +1,3 @@
[submodule "test/cpp/hoytech-cpp"]
path = test/cpp/hoytech-cpp
url = https://github.com/hoytech/hoytech-cpp.git

README.md Normal file

@@ -0,0 +1,292 @@
# negentropy
This repo contains the protocol specification, reference implementations, and tests for the negentropy set-reconciliation protocol.
<!-- TOC FOLLOWS -->
<!-- START OF TOC -->
* [Introduction](#introduction)
* [Protocol](#protocol)
* [Data Requirements](#data-requirements)
* [Setup](#setup)
* [Alternating Messages](#alternating-messages)
* [Algorithm](#algorithm)
* [Definitions](#definitions)
* [Varint](#varint)
* [Bound](#bound)
* [Range](#range)
* [Message](#message)
* [Analysis](#analysis)
* [APIs](#apis)
* [C++](#c)
* [Javascript](#javascript)
* [Implementation Enhancements](#implementation-enhancements)
* [Deferred Range Processing](#deferred-range-processing)
* [Pre-computing](#pre-computing)
* [Copyright](#copyright)
<!-- END OF TOC -->
## Introduction
Set reconciliation supports the replication or syncing of data-sets that differ, whether because they were created independently, or because they have drifted out of sync due to downtime, network partitions, misconfigurations, etc. In the latter case, detecting and fixing these inconsistencies is sometimes called [anti-entropy repair](https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/operations/opsRepairNodesManualRepair.html).
Suppose two participants on a network each have a set of records that they have collected independently. Set reconciliation efficiently determines which records one side has that the other side doesn't, and vice versa. Once the missing records have been determined, this information can be used to transfer them. The actual transfer is external to the negentropy protocol.
Although there are many ways to do set reconciliation, negentropy is based on [Aljoscha Meyer's method](https://github.com/AljoschaMeyer/set-reconciliation), which has the advantage of being simple to explain and implement.
## Protocol
### Data Requirements
In order to use negentropy, you need to define some mappings from your data records:
* `record -> ID`
* Typically a cryptographic hash of the entire record
* The ID should be 32 bytes in length (although smaller IDs are supported too)
* Two different records should not have the same ID (satisfied by using a cryptographic hash)
* Two identical records should not have different IDs (records should be canonicalised prior to hashing, if necessary)
* `record -> timestamp`
  * Although timestamp is the most obvious, any ordering criterion can be used. The protocol will be most efficient if records with similar timestamps are often downloaded/stored/generated together
* Timestamps do *not* need to be unique (different records can have the same timestamp)
  * Units can be anything (seconds, microseconds, etc) as long as they fit in a 64-bit unsigned integer
* The largest 64-bit unsigned integer should be reserved as a special "infinity" value
Negentropy does not support the concept of updating or changing a record while preserving its ID. This should instead be modelled as deleting the old record and inserting a new one.
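For illustration only, here is a minimal sketch of these mappings in C++, assuming a hypothetical `sha256()` helper and a record type with a canonical serialisation (neither is part of this library):

    struct MyRecord {
        uint64_t createdAt;   // any ordering criterion that fits in a u64
        std::string payload;  // canonical serialisation of the record
    };
    // ID = cryptographic hash of the canonicalised record (32 bytes)
    std::string recordId(const MyRecord &rec) {
        return sha256(rec.payload);
    }
    uint64_t recordTimestamp(const MyRecord &rec) {
        return rec.createdAt;
    }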
### Setup
The two parties engaged in the protocol are called the client and the server. The client is also called the *initiator*, because it creates and sends the first message in the protocol.
Each party should begin by sorting their records in ascending order by timestamp. If the timestamps are equivalent, records should be sorted lexically by their IDs. This sorted array and contiguous slices of it are called *ranges*. The *fingerprint* of a range is equal to the bitwise eXclusive OR (XOR) of the IDs of all contained records.
Because each side potentially has a different set of records, ranges cannot be referred to by their indices in one side's sorted array. Instead, they are specified by lower and upper *bounds*. A bound is a timestamp and a variable-length ID prefix. In order to reduce the sizes of reconciliation messages, ID prefixes are as short as possible while still being able to separate records from their predecessors in the sorted array. If two adjacent records have different timestamps, then the prefix for a bound between them is empty.
Lower bounds are *inclusive* and upper bounds are *exclusive*, as is [typical in computer science](https://www.cs.utexas.edu/users/EWD/transcriptions/EWD08xx/EWD831.html). This means that given two adjacent ranges, the upper bound of the first is equal to the lower bound of the second. In order for a range to have full coverage over the universe of possible timestamps/IDs, the lower bound would have a 0 timestamp and all-0s ID, and the upper-bound would be the specially reserved "infinity" timestamp (max u64), and the ID doesn't matter.
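As a sketch, the sort order and range fingerprints described above can be expressed as follows (simplified from the reference implementations below):

    struct Item { uint64_t timestamp; std::string id; };
    // Ascending by timestamp; ties broken lexically by ID
    bool itemLess(const Item &a, const Item &b) {
        if (a.timestamp != b.timestamp) return a.timestamp < b.timestamp;
        return a.id < b.id;
    }
    // Fingerprint of a range = XOR of the IDs of all contained records
    std::string fingerprint(std::vector<Item>::const_iterator begin,
                            std::vector<Item>::const_iterator end, size_t idSize) {
        std::string out(idSize, '\0');
        for (auto it = begin; it != end; ++it)
            for (size_t i = 0; i < idSize; i++) out[i] ^= it->id[i];
        return out;
    }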
When negotiating a reconciliation, the client and server should decide on a special `idSize` value. This must be `<= 32`. Using values less than the full 32 bytes will save bandwidth, at the expense of making collisions more likely.
### Alternating Messages
After both sides have set up their sorted arrays, the client creates an initial message and sends it to the server. The server will then reply with another message, and the two parties continue exchanging messages until the protocol terminates (see below). After the protocol terminates, the client will have determined which IDs it has that the server needs, and which IDs it needs that the server has.
Each message consists of an ordered sequence of ranges. Each range contains an upper bound, a mode, and a payload. The range's lower bound is the same as the previous range's upper bound (or 0, if none). The mode indicates what type of processing is needed for this range, and therefore how the payload should be parsed.
The modes supported are:
* `Skip`: No further processing is needed for this range. Payload is empty.
* `Fingerprint`: Payload contains the fingerprint for this range.
* `IdList`: Payload contains a complete list of IDs for this range.
* `IdListResponse`: Only allowed for server->client messages. Contains a list of IDs the server has and the client needs, as well as a bit-field that represents which IDs the server needs and the client has.
If a message does not end in a range with an "infinity" upper bound, an implicit range with upper bound of "infinity" and mode `Skip` is appended. This means that an empty message indicates that all ranges have been processed and the sender believes the protocol can now terminate.
### Algorithm
Upon receiving a message, the recipient should loop over the message's ranges in order, while concurrently constructing a new message. `Skip` ranges are answered with `Skip` ranges, and adjacent `Skip` ranges should be coalesced into a single `Skip` range.
`IdList` ranges represent a complete list of IDs held by the sender. Because the receiver obviously knows the items it has, this information is enough to fully reconcile the range. Therefore, when the client receives an `IdList` range, it should reply with a `Skip` range. However, since the goal of the protocol is to ensure the *client* has this information, when a server receives an `IdList` range it should reply with an `IdListResponse` range.
The `IdListResponse` range contains a list of the IDs the server has (and the client needs), but uses a packed bit-field representation to refer to the IDs the client has that the server needs. This avoids having to either a) transfer the complete set of its own IDs, or b) redundantly re-transfer IDs that were sent by the client.
`Fingerprint` ranges contain enough information to determine whether or not the data items within a range are equivalent on each side; however, determining the actual difference in IDs requires further recursive processing.
* Because `IdList` and `IdListResponse` ranges terminate the processing of a given range, they are called *base case* ranges.
* When the fingerprints on each side differ, the receiver should *split* its own range and send the results back in the next message. When splitting, the number of records within each sub-range should be considered. When small, an `IdList` range should be sent. When large, the sub-range should itself be sent as a `Fingerprint` (this is the recursion).
* When a range is split, the sub-ranges should completely cover the original range's lower and upper bounds.
* How to split the range is implementation-defined. The simplest way is to divide the records that fall within the range into N equal-sized buckets, and emit a `Fingerprint` sub-range for each bucket. However, an implementation could choose different grouping criteria. For example, events with similar timestamps could be grouped into a single bucket. If the implementation believes the other side is less likely to have recent events, it could make the most recent bucket an `IdList`.
* Note that if alternate grouping strategies are used, an implementation should never reply to a range with a single `Fingerprint` range, otherwise the protocol may never terminate (if the other side does the same).
The initial message should cover the full universe, and therefore must have at least one range. The last range's upper bound should have the infinity timestamp (the `id` doesn't matter, so it should be empty). How many ranges are used in the initial message depends on the implementation. The most obvious implementation is to use the same logic as described above, either using the base case or splitting, depending on set size. However, an implementation may choose to use fewer or more buckets in its initial message, and/or may use different grouping strategies.
Once the client has looped over all ranges in a server's message and its constructed response message is a full-universe `Skip` range (i.e., the empty string `""`), then it needs no more information from the server and should terminate the protocol.
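The following simplified sketch shows the shape of this processing loop (`Mode`, `emitSkip`, `ourFingerprint`, `splitRange`, `processIdList`, and `recordHaveAndNeedIds` are illustrative names, not part of the reference implementations):

    // Simplified: encoding, bound tracking, and Skip coalescing omitted
    for (const auto &range : incomingMessage) {
        switch (range.mode) {
        case Mode::Skip:
            emitSkip(range.upperBound);
            break;
        case Mode::Fingerprint:
            if (range.fingerprint == ourFingerprint(range)) emitSkip(range.upperBound);
            else splitRange(range); // recurse: sub-ranges sent as Fingerprint or IdList
            break;
        case Mode::IdList:
            processIdList(range); // base case: client replies Skip, server replies IdListResponse
            break;
        case Mode::IdListResponse:
            recordHaveAndNeedIds(range); // client only
            emitSkip(range.upperBound);
            break;
        }
    }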
## Definitions
### Varint
Varints (variable-sized integers) are a format for storing unsigned integers in a small number of bytes, commonly called BER (Binary Encoded Representation). They are stored as base 128 digits, most significant digit first, with as few digits as possible. Bit eight (the high bit) is set on each byte except the last.
    Varint := <Digit+128>* <Digit>
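For example, 1000 splits into the base-128 digits 7 and 104, so it encodes as the two bytes `0x87 0x68` (the high bit is set on every byte except the last). A compact encoder sketch, equivalent to the one in the reference implementation below:

    std::string encodeVarInt(uint64_t n) {
        std::string o(1, char(n & 0x7F));                         // final digit: high bit clear
        while (n >>= 7) o.insert(0, 1, char(0x80 | (n & 0x7F)));  // prepend remaining digits, high bit set
        return o;
    }
    // encodeVarInt(1000) == "\x87\x68"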
### Bound
The protocol relies on bounds to group ranges of data items. Each range is specified by an *inclusive* lower bound, followed by an *exclusive* upper bound. As noted above, only upper bounds are transmitted (the lower bound of a range is the upper bound of the previous range, or 0 for the first range).
Each bound consists of an encoded timestamp and a variable-length disambiguating prefix of an event ID (in case multiple items have the same timestamp):
    Bound := <encodedTimestamp (Varint)> <length (Varint)> <idPrefix (Byte)>*
* The timestamp is encoded specially. The "infinity timestamp" (such that all valid items precede it) is encoded as `0`. All other values are encoded as `1 + offset`, where offset is the difference between this timestamp and the previously encoded timestamp. The initial offset starts at `0` and resets at the beginning of each message.
  Offsets are always non-negative since the upper bound's timestamp is always `>=` the lower bound's timestamp, ranges in a message are always encoded in ascending order, and ranges never overlap.
* The `idPrefix` parameter's size is encoded in `length`, and can be between `0` and `idSize` bytes. Efficient implementations will use the shortest possible prefix needed to separate the first record of this range from the last record of the previous range. If these records' timestamps differ, then the length should be 0, otherwise it should be the byte-length of their common prefix plus 1.
If the `idPrefix` length is less than `idSize` then the omitted trailing bytes are filled with 0 bytes.
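For example, a message containing bounds with timestamps 1000, 1000, 1500, and finally infinity would encode those timestamps as the varints:

    1001    (1 + (1000 - 0))
    1       (1 + (1000 - 1000))
    501     (1 + (1500 - 1000))
    0       (infinity)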
### Range
IDs are represented as byte-strings truncated to length `idSize`:
    Id := Byte{idSize}
A range consists of an upper bound, a mode, and a payload (determined by mode):
    Range := <upperBound (Bound)> <mode (Varint)> <Skip | Fingerprint | IdList | IdListResponse>
* If `mode = 0`, then payload is `Skip`, which is simply empty:
    Skip :=
* If `mode = 1`, then payload is `Fingerprint`, the bitwise eXclusive OR of all the IDs in this range, truncated to `idSize`:
    Fingerprint := <Id>
* If `mode = 2`, the payload is `IdList`, a variable-length list of all IDs within this range:
    IdList := <length (Varint)> <ids (Id)>*
* If `mode = 3`, the payload is `IdListResponse`. This is only sent by the server in response to an `IdList` range. It contains an `IdList` of the IDs that only the server side has, and a bit-field where each bit (starting from the least-significant bit of the first byte) indicates whether the Nth client-side ID is needed by the server:
    IdListResponse := <haveIds (IdList)> <bitFieldLength (Varint)> <bitField (Byte)>*
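For example, if the client's `IdList` contained four IDs and the server needs the second and fourth of them, the bit-field would be the single byte `0x0A`:

    client IdList:  [idA, idB, idC, idD]
    server needs:   idB and idD (indices 1 and 3)
    bitField:       0b00001010 = 0x0A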
### Message
A reconciliation message is just an ordered list of ranges:
    Message := <Range>*
An empty message is an implicit `Skip` over the full universe of IDs, and represents that the protocol can terminate.
## Analysis
If you are searching for a single record in an ordered array, binary search allows you to find the record with a logarithmic number of operations. This is because each operation cuts the search space in half. So, searching a list of 1 million items will take about 20 operations:
    log(1e6)/log(2) = 19.9316
Negentropy uses a similar principle. Each communication divides items into their own buckets and compares the fingerprints of the buckets. If we always split into 2 buckets, and there was exactly 1 difference, we would cut the search-space in half on each operation.
For effective performance, negentropy requires minimising the number of "round-trips" between the two sides. A sync that takes 20 back-and-forth communications to determine a single difference would take unacceptably long. Fortunately, we can expend a small amount of extra bandwidth by splitting each range into more than 2 sub-ranges. This has the effect of increasing the base of the logarithm. For example, if we split into 16 pieces instead:
    log(1e6)/log(16) = 4.98289
Additionally, each direction of the protocol's communication can result in a split, so since we are measuring round-trips, we divide this by two:
    log(1e6)/log(16)/2 = 2.49145
This means that in general, three round-trip communications will be required to synchronise two sets of 1 million records that differ by 1 record. With an `idSize` of 16, each communication will consume `16*16 + overhead` -- roughly 300 bytes. So total bandwidth in one direction would be about 900 bytes and the other direction about 600 bytes.
What if they differ by multiple records? Because communication is batched, the splitting of multiple differing ranges can happen in parallel. So, the number of round-trips will not be affected (assuming that every message can be delivered in exactly one packet transmission, independent of size, which is of course not entirely true on real networks).
The amount of bandwidth consumed will grow linearly with the number of differences, but this would of course be true even assuming a perfect synchronisation method that had no overhead other than transmitting the differing records.
## APIs
### C++
The library is contained in a single header file with no non-standard dependencies:
    #include "Negentropy.h"
First, create a `Negentropy` object. The `16` argument is `idSize`:
    Negentropy ne(16);
Next, add all the items in your collection, and `seal()`:
    for (const auto &item : myItems) {
        ne.addItem(item.timestamp(), item.id());
    }
    ne.seal();
On the client-side, create an initial message, and then transmit it to the server, receive the response, and `reconcile` until complete:
    std::string msg = ne.initiate();
    while (msg.size() != 0) {
        std::string response = queryServer(msg);
        std::vector<std::string> have, need;
        msg = ne.reconcile(response, have, need);
        // handle have/need
    }
In each loop iteration, `have` contains IDs that the client has that the server doesn't, and `need` contains IDs that the server has that the client doesn't.
The server-side is similar, except it doesn't create an initial message, and there are no `have`/`need` arrays:
    while (1) {
        std::string msg = receiveMsgFromClient();
        std::string response = ne.reconcile(msg);
        respondToClient(response);
    }
### Javascript
The library is contained in a single JavaScript file. It shouldn't need any dependencies, in either a browser or node.js:
    const Negentropy = require('./Negentropy.js');
First, create a `Negentropy` object. The `16` argument is `idSize`:
    let ne = new Negentropy(16);
Next, add all the items in your collection, and `seal()`:
    for (let item of myItems) {
        ne.addItem(item.timestamp(), item.id());
    }
    ne.seal();
On the client-side, create an initial message, and then transmit it to the server, receive the response, and `reconcile` until complete:
    let msg = ne.initiate();
    while (msg.length != 0) {
        let response = queryServer(msg);
        let [newMsg, have, need] = ne.reconcile(response);
        msg = newMsg;
        // handle have/need
    }
The server-side is similar, except it doesn't create an initial message, and there are no `have`/`need` arrays:
    while (1) {
        let msg = receiveMsgFromClient();
        let [response] = ne.reconcile(msg);
        respondToClient(response);
    }
## Implementation Enhancements
### Deferred Range Processing
If there are too many differences and/or they are too randomly distributed throughout the range, then message sizes may become unmanageably large. This may be undesirable because of the memory required for buffering, and also because large batch sizes prevent work pipelining, where the synchronised records are processed while additional syncing is occurring.
Because of this, a client implementation may choose to defer the processing of ranges. Rather than transmit all the ranges it has found that need syncing, it can transmit a smaller number and keep the remainder for subsequent message rounds. This will decrease the message size at the expense of increasing the number of messaging round-trips.
A client could target fixed size messages, or could dynamically tune the message sizes based on its throughput metrics.
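One possible shape for this is sketched below (`maxMessageSize`, `processRange`, `encodeFingerprintRange`, and `infinityBound` are hypothetical): once an output budget is exceeded, answer everything not yet processed with a single `Fingerprint` range, so the other side revisits it in the next round.

    std::string output;
    for (const auto &range : incomingRanges) {
        if (output.size() > maxMessageSize) {
            // Defer the rest: one Fingerprint range covering [range.lowerBound, infinity)
            output += encodeFingerprintRange(range.lowerBound, infinityBound);
            break;
        }
        output += processRange(range);
    }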
### Pre-computing
Instead of aggregating the data items for each query, servers and/or clients may choose to pre-compute fingerprints for their entire set of data items, or particular subsets. Most likely, fingerprints will be aggregated in a tree data-structure so it is efficient to add or remove items.
How or if this is implemented is independent of the protocol as described in this document.
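For instance, a binary tree in which each node stores the XOR of its children lets any contiguous range's fingerprint be computed in O(log n), with O(log n) node updates when an item is added or removed. A minimal sketch (not part of this library):

    // Iterative XOR segment tree over n leaves; ids are padded to idSize bytes
    struct XorTree {
        size_t n;
        std::vector<std::string> node; // 2*n entries; node[i] = node[2i] ^ node[2i+1]
        XorTree(size_t n, size_t idSize) : n(n), node(2 * n, std::string(idSize, '\0')) {}
        static void xorInto(std::string &a, const std::string &b) {
            for (size_t i = 0; i < a.size(); i++) a[i] ^= b[i];
        }
        // XOR id into leaf i: since fingerprints are XORs, insert and remove are the same operation
        void toggle(size_t i, const std::string &id) {
            for (i += n; i >= 1; i /= 2) xorInto(node[i], id);
        }
        // Fingerprint (XOR) of leaves in [l, r)
        std::string query(size_t l, size_t r) const {
            std::string acc(node[1].size(), '\0');
            for (l += n, r += n; l < r; l /= 2, r /= 2) {
                if (l & 1) xorInto(acc, node[l++]);
                if (r & 1) xorInto(acc, node[--r]);
            }
            return acc;
        }
    };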
## Copyright
(C) 2023 Doug Hoyte
Protocol specification, reference implementations, and tests are MIT licensed.

TODO Normal file

@@ -0,0 +1,3 @@
naming of methods: sealing, initial etc
make mode come first
speed up JS version (maybe use ArrayBuffer or something?)

cpp/Negentropy.h Normal file

@@ -0,0 +1,381 @@
// (C) 2023 Doug Hoyte. MIT license
#pragma once
#include <string.h>
#include <string>
#include <string_view>
#include <vector>
#include <unordered_map>
#include <limits>
#include <algorithm>
#include <stdexcept>
namespace negentropy {
const uint64_t MAX_U64 = std::numeric_limits<uint64_t>::max();
using err = std::runtime_error;
struct alignas(16) XorElem {
uint64_t timestamp;
uint64_t idSize;
char id[32];
XorElem() : timestamp(0), idSize(32) {
memset(id, '\0', sizeof(id));
}
XorElem(uint64_t timestamp, std::string_view id_) : timestamp(timestamp), idSize(id_.size()) {
if (idSize > 32) throw negentropy::err("id too big");
memset(id, '\0', sizeof(id));
memcpy(id, id_.data(), idSize);
}
std::string_view getId() const {
return std::string_view(id, idSize);
}
std::string_view getId(uint64_t subSize) const {
return getId().substr(0, subSize);
}
XorElem& operator^=(const XorElem &other) {
auto *p1 = static_cast<unsigned char *>(__builtin_assume_aligned(id, 16));
auto *p2 = static_cast<unsigned char *>(__builtin_assume_aligned(other.id, 16));
for (size_t i = 0; i < 32; i++) p1[i] ^= p2[i];
return *this;
}
bool operator==(const XorElem &other) const {
return getId() == other.getId(); // ignore timestamp
}
};
inline bool operator<(const XorElem &a, const XorElem &b) {
return a.timestamp != b.timestamp ? a.timestamp < b.timestamp : a.getId() < b.getId();
};
struct Negentropy {
uint64_t idSize;
std::vector<XorElem> items;
bool sealed = false;
bool isInitiator = false;
Negentropy(uint64_t idSize) : idSize(idSize) {
if (idSize < 8 || idSize > 32) throw negentropy::err("idSize invalid");
}
void addItem(uint64_t createdAt, std::string_view id) {
if (sealed) throw negentropy::err("already sealed");
items.emplace_back(createdAt, id);
}
void seal() {
if (sealed) throw negentropy::err("already sealed");
std::reverse(items.begin(), items.end()); // typically pushed in approximately descending order so this may speed up the sort
std::sort(items.begin(), items.end());
sealed = true;
}
std::string initiate() {
if (!sealed) throw negentropy::err("not sealed");
isInitiator = true;
std::string output;
uint64_t lastTimestampOut = 0;
splitRange(items.begin(), items.end(), XorElem(0, ""), XorElem(MAX_U64, ""), lastTimestampOut, output);
return output;
}
std::string reconcile(std::string_view query) {
if (isInitiator) throw negentropy::err("initiator not asking for have/need IDs");
std::vector<std::string> haveIds, needIds;
return reconcileAux(query, haveIds, needIds);
}
std::string reconcile(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!isInitiator) throw negentropy::err("non-initiator asking for have/need IDs");
return reconcileAux(query, haveIds, needIds);
}
private:
std::string reconcileAux(std::string_view query, std::vector<std::string> &haveIds, std::vector<std::string> &needIds) {
if (!sealed) throw negentropy::err("not sealed");
std::string output;
auto prevBound = XorElem();
auto prevIndex = items.begin();
uint64_t lastTimestampIn = 0;
uint64_t lastTimestampOut = 0;
bool skip = false;
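// Skip ranges are not emitted immediately: "skip" marks one as pending, and
// doSkip() flushes it (using the previous bound) only when a non-Skip range
// must be emitted, coalescing adjacent Skips into one.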
auto doSkip = [&]{
if (!skip) return;
skip = false;
output += encodeBound(prevBound, lastTimestampOut);
output += encodeVarInt(0); // mode = Skip
};
while (query.size()) {
auto currBound = decodeBound(query, lastTimestampIn);
auto mode = decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = IdListResponse
auto lower = prevIndex;
auto upper = std::upper_bound(prevIndex, items.end(), currBound);
if (mode == 0) { // Skip
skip = true;
} else if (mode == 1) { // Fingerprint
XorElem theirXorSet(0, getBytes(query, idSize));
XorElem ourXorSet;
for (auto i = lower; i < upper; ++i) ourXorSet ^= *i;
if (theirXorSet.getId() != ourXorSet.getId(idSize)) {
doSkip();
splitRange(lower, upper, prevBound, currBound, lastTimestampOut, output);
} else {
skip = true;
}
} else if (mode == 2) { // IdList
auto numIds = decodeVarInt(query);
struct TheirElem {
uint64_t offset;
bool onBothSides;
};
std::unordered_map<std::string, TheirElem> theirElems;
for (uint64_t i = 0; i < numIds; i++) {
auto e = getBytes(query, idSize);
theirElems.emplace(e, TheirElem{i, false});
}
std::vector<std::string> responseHaveIds;
std::vector<uint64_t> responseNeedIndices;
for (auto it = lower; it < upper; ++it) {
auto e = theirElems.find(std::string(it->getId()));
if (e == theirElems.end()) {
// ID exists on our side, but not their side
if (isInitiator) haveIds.emplace_back(it->getId());
else responseHaveIds.emplace_back(it->getId());
} else {
// ID exists on both sides
e->second.onBothSides = true;
}
}
for (const auto &[k, v] : theirElems) {
if (!v.onBothSides) {
// ID exists on their side, but not our side
if (isInitiator) needIds.emplace_back(k);
else responseNeedIndices.emplace_back(v.offset);
}
}
if (!isInitiator) {
doSkip();
output += encodeBound(currBound, lastTimestampOut);
output += encodeVarInt(3); // mode = IdListResponse
output += encodeVarInt(responseHaveIds.size());
for (const auto &id : responseHaveIds) output += id;
auto bitField = encodeBitField(responseNeedIndices);
output += encodeVarInt(bitField.size());
output += bitField;
} else {
skip = true;
}
} else if (mode == 3) { // IdListResponse
if (!isInitiator) throw negentropy::err("unexpected IdListResponse");
skip = true;
auto numIds = decodeVarInt(query);
for (uint64_t i = 0; i < numIds; i++) {
needIds.emplace_back(getBytes(query, idSize));
}
auto bitFieldSize = decodeVarInt(query);
auto bitField = getBytes(query, bitFieldSize);
for (auto it = lower; it < upper; ++it) {
if (bitFieldLookup(bitField, it - lower)) haveIds.emplace_back(it->getId());
}
} else {
throw negentropy::err("unexpected mode");
}
prevIndex = upper;
prevBound = currBound;
}
return output;
}
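// Covers [lowerBound, upperBound): emits a base-case IdList when the range is
// small, otherwise 16 Fingerprint sub-ranges (one per bucket).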
void splitRange(std::vector<XorElem>::iterator lower, std::vector<XorElem>::iterator upper, const XorElem &lowerBound, const XorElem &upperBound, uint64_t &lastTimestampOut, std::string &output) {
uint64_t numElems = upper - lower;
const uint64_t buckets = 16;
if (numElems < buckets * 2) {
output += encodeBound(upperBound, lastTimestampOut);
output += encodeVarInt(2); // mode = IdList
output += encodeVarInt(numElems);
for (auto it = lower; it < upper; ++it) output += it->getId(idSize);
} else {
uint64_t itemsPerBucket = numElems / buckets;
uint64_t bucketsWithExtra = numElems % buckets;
auto curr = lower;
for (uint64_t i = 0; i < buckets; i++) {
XorElem ourXorSet;
for (auto bucketEnd = curr + itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); curr != bucketEnd; curr++) {
ourXorSet ^= *curr;
}
if (i == buckets - 1) output += encodeBound(upperBound, lastTimestampOut);
else output += encodeMinimalBound(*curr, *std::prev(curr), lastTimestampOut);
output += encodeVarInt(1); // mode = Fingerprint
output += ourXorSet.getId(idSize);
}
}
}
// Decoding
std::string getBytes(std::string_view &encoded, size_t n) {
if (encoded.size() < n) throw negentropy::err("parse ends prematurely");
auto res = encoded.substr(0, n);
encoded = encoded.substr(n);
return std::string(res);
};
uint64_t decodeVarInt(std::string_view &encoded) {
uint64_t res = 0;
while (1) {
if (encoded.size() == 0) throw negentropy::err("premature end of varint");
uint64_t byte = encoded[0];
encoded = encoded.substr(1);
res = (res << 7) | (byte & 0b0111'1111);
if ((byte & 0b1000'0000) == 0) break;
}
return res;
}
uint64_t decodeTimestampIn(std::string_view &encoded, uint64_t &lastTimestampIn) {
uint64_t timestamp = decodeVarInt(encoded);
timestamp = timestamp == 0 ? MAX_U64 : timestamp - 1;
timestamp += lastTimestampIn;
if (timestamp < lastTimestampIn) timestamp = MAX_U64; // saturate
lastTimestampIn = timestamp;
return timestamp;
}
XorElem decodeBound(std::string_view &encoded, uint64_t &lastTimestampIn) {
auto timestamp = decodeTimestampIn(encoded, lastTimestampIn);
auto len = decodeVarInt(encoded);
return XorElem(timestamp, getBytes(encoded, len));
}
// Encoding
std::string encodeVarInt(uint64_t n) {
if (n == 0) return std::string(1, '\0');
std::string o;
while (n) {
o.push_back(static_cast<unsigned char>(n & 0x7F));
n >>= 7;
}
std::reverse(o.begin(), o.end());
for (size_t i = 0; i < o.size() - 1; i++) {
o[i] |= 0x80;
}
return o;
}
std::string encodeTimestampOut(uint64_t timestamp, uint64_t &lastTimestampOut) {
if (timestamp == MAX_U64) {
lastTimestampOut = MAX_U64;
return encodeVarInt(0);
}
uint64_t temp = timestamp;
timestamp -= lastTimestampOut;
lastTimestampOut = temp;
return encodeVarInt(timestamp + 1);
};
std::string encodeBound(const XorElem &bound, uint64_t &lastTimestampOut) {
std::string output;
output += encodeTimestampOut(bound.timestamp, lastTimestampOut);
output += encodeVarInt(bound.idSize);
output += bound.getId(idSize);
return output;
};
std::string encodeMinimalBound(const XorElem &curr, const XorElem &prev, uint64_t &lastTimestampOut) {
std::string output = encodeTimestampOut(curr.timestamp, lastTimestampOut);
if (curr.timestamp != prev.timestamp) {
output += encodeVarInt(0);
} else {
uint64_t sharedPrefixBytes = 0;
auto currKey = curr.getId();
auto prevKey = prev.getId();
for (uint64_t i = 0; i < idSize; i++) {
if (currKey[i] != prevKey[i]) break;
sharedPrefixBytes++;
}
output += encodeVarInt(sharedPrefixBytes + 1);
output += currKey.substr(0, sharedPrefixBytes + 1);
}
return output;
};
std::string encodeBitField(const std::vector<uint64_t> &inds) {
if (inds.size() == 0) return "";
uint64_t max = *std::max_element(inds.begin(), inds.end());
std::string bitField = std::string((max + 8) / 8, '\0');
for (auto ind : inds) bitField[ind / 8] |= 1 << ind % 8;
return bitField;
}
bool bitFieldLookup(const std::string &bitField, uint64_t ind) {
if ((ind + 8) / 8 > bitField.size()) return false;
return !!(bitField[ind / 8] & 1 << (ind % 8));
}
};
}
using Negentropy = negentropy::Negentropy;

docs/generate-toc.pl Normal file

@@ -0,0 +1,75 @@
#!/usr/bin/env perl
use common::sense;
my $max_levels = 3;
my $lines = '';
my $headers = [];
my $seenHeaders = {};
{
open(my $fh, '<', 'README.md') || die "unable to open README.md: $!";
while (<$fh>) {
next if /^<!-- START OF TOC -->/ .. /^<!-- END OF TOC -->/;
$lines .= $_;
if (/^[#]+ (.*)/) {
my $whole = $&;
my $title = $1;
my $link = title2link($1);
die "duplicate header: $link" if $seenHeaders->{$link};
$seenHeaders->{$link}++;
push @$headers, $whole;
}
}
}
my $toc = '';
for my $header (@$headers) {
$header =~ /^(#+) (.*)/;
my $prefix = $1;
my $title = $2;
next if length($prefix) > $max_levels || length($prefix) == 1;
$prefix =~ s/^##//;
$prefix =~ s/^\s+//;
$prefix =~ s/#/ /g;
$prefix = "$prefix*";
my $link = title2link($title);
$toc .= "$prefix [$title](#$link)\n";
}
{
open(my $ofh, '>', 'README.md.tmp') || die "unable to open README.md.tmp: $!";
$lines =~ s{<!-- TOC FOLLOWS -->}{<!-- TOC FOLLOWS -->\n<!-- START OF TOC -->\n$toc<!-- END OF TOC -->};
print $ofh $lines;
}
while ($lines =~ m{\[.*?\][(]#(.*?)[)]}g) {
my $link = $1;
if (!$seenHeaders->{$link}) {
print STDERR "WARNING: Unresolved link: $link\n";
}
}
system("mv -f README.md.tmp README.md");
sub title2link {
my $title = shift;
my $link = lc $title;
$link =~ s/\s+/-/g;
$link =~ s/[+]//g;
return $link;
}

js/Negentropy.js Normal file

@@ -0,0 +1,377 @@
// (C) 2023 Doug Hoyte. MIT license
class Negentropy {
constructor(idSize) {
if (idSize < 8 || idSize > 32) throw Error("idSize invalid");
this.idSize = idSize;
this.items = [];
}
addItem(timestamp, id) {
if (this.sealed) throw Error("already sealed");
if (id.length > 64 || id.length % 2 !== 0) throw Error("bad length for id");
id = id.substr(0, this.idSize * 2);
this.items.push({ timestamp, id: fromHexString(id), idHex: id, });
}
seal() {
if (this.sealed) throw Error("already sealed");
this.items.sort(itemCompare);
this.sealed = true;
}
_newState() {
return {
lastTimestampIn: 0,
lastTimestampOut: 0,
};
}
_zeroBound() {
let id = new Array(this.idSize).fill(0);
return { timestamp: 0, id, idHex: toHexString(id), };
}
initiate() {
if (!this.sealed) throw Error("not sealed");
this.isInitiator = true;
let output = [];
let state = this._newState();
this.splitRange(0, this.items.length, this._zeroBound(), { timestamp: Number.MAX_VALUE, id: [], }, state, output);
return toHexString(output);
}
reconcile(query) {
if (!this.sealed) throw Error("not sealed");
query = fromHexString(query);
let haveIds = [], needIds = [];
let output = [];
let prevBound = this._zeroBound();
let prevIndex = 0;
let state = this._newState();
let skip = false;
let doSkip = () => {
if (!skip) return;
skip = false;
output.push(...this.encodeBound(prevBound, state));
output.push(...this.encodeVarInt(0)); // mode = Skip
};
while (query.length !== 0) {
let currBound = this.decodeBound(query, state);
let mode = this.decodeVarInt(query); // 0 = Skip, 1 = Fingerprint, 2 = IdList, 3 = IdListResponse
let lower = prevIndex;
let upper = findUpperBound(this.items, lower, this.items.length, currBound, itemCompare);
if (mode === 0) { // Skip
skip = true;
} else if (mode === 1) { // Fingerprint
let theirXorSet = this.getBytes(query, this.idSize);
let ourXorSet = new Array(this.idSize).fill(0);
for (let i = lower; i < upper; ++i) {
let item = this.items[i];
for (let j = 0; j < this.idSize; j++) ourXorSet[j] ^= item.id[j];
}
let matches = true;
for (let i = 0; i < this.idSize; i++) {
if (theirXorSet[i] !== ourXorSet[i]) {
matches = false;
break;
}
}
if (!matches) {
doSkip();
this.splitRange(lower, upper, prevBound, currBound, state, output);
} else {
skip = true;
}
} else if (mode === 2) { // IdList
let numElems = this.decodeVarInt(query);
let theirElems = {};
for (let i = 0; i < numElems; i++) {
let id = toHexString(this.getBytes(query, this.idSize));
theirElems[id] = { offset: i, onBothSides: false, };
}
let responseHaveIds = [];
let responseNeedIndices = [];
for (let i = lower; i < upper; i++) {
let id = this.items[i].idHex;
let e = theirElems[id];
if (e === undefined) {
// ID exists on our side, but not their side
if (this.isInitiator) haveIds.push(id);
else responseHaveIds.push(id);
} else {
// ID exists on both sides
theirElems[id].onBothSides = true;
}
}
for (let k of Object.keys(theirElems)) {
if (!theirElems[k].onBothSides) {
// ID exists on their side, but not our side
if (this.isInitiator) needIds.push(k);
else responseNeedIndices.push(theirElems[k].offset);
}
}
if (!this.isInitiator) {
doSkip();
output.push(...this.encodeBound(currBound, state));
output.push(...this.encodeVarInt(3)); // mode = IdListResponse
output.push(...this.encodeVarInt(responseHaveIds.length));
for (let id of responseHaveIds) output.push(...fromHexString(id));
let bitField = this.encodeBitField(responseNeedIndices);
output.push(...this.encodeVarInt(bitField.length));
output.push(...bitField);
} else {
skip = true;
}
} else if (mode === 3) { // IdListResponse
if (!this.isInitiator) throw Error("unexpected IdListResponse");
skip = true;
let numIds = this.decodeVarInt(query);
for (let i = 0; i < numIds; i++) {
needIds.push(toHexString(this.getBytes(query, this.idSize)));
}
let bitFieldSize = this.decodeVarInt(query);
let bitField = this.getBytes(query, bitFieldSize);
for (let i = lower; i < upper; i++) {
if (this.bitFieldLookup(bitField, i - lower)) haveIds.push(this.items[i].idHex);
}
} else {
throw Error("unexpected mode");
}
prevIndex = upper;
prevBound = currBound;
}
return [toHexString(output), haveIds, needIds];
}
splitRange(lower, upper, lowerBound, upperBound, state, output) {
let numElems = upper - lower;
let buckets = 16;
if (numElems < buckets * 2) {
output.push(...this.encodeBound(upperBound, state));
output.push(...this.encodeVarInt(2)); // mode = IdList
output.push(...this.encodeVarInt(numElems));
for (let it = lower; it < upper; ++it) output.push(...this.items[it].id);
} else {
let itemsPerBucket = Math.floor(numElems / buckets);
let bucketsWithExtra = numElems % buckets;
let curr = lower;
for (let i = 0; i < buckets; i++) {
let ourXorSet = new Array(this.idSize).fill(0);
for (let bucketEnd = curr + itemsPerBucket + (i < bucketsWithExtra ? 1 : 0); curr != bucketEnd; curr++) {
for (let j = 0; j < this.idSize; j++) ourXorSet[j] ^= this.items[curr].id[j];
}
if (i === buckets - 1) output.push(...this.encodeBound(upperBound, state));
else output.push(...this.encodeMinimalBound(this.items[curr], this.items[curr - 1], state));
output.push(...this.encodeVarInt(1)); // mode = Fingerprint
output.push(...ourXorSet);
}
}
}
// Decoding
getByte(buf) {
if (buf.length === 0) throw Error("parse ends prematurely");
return buf.shift();
}
getBytes(buf, n) {
if (buf.length < n) throw Error("parse ends prematurely");
return buf.splice(0, n);
}
decodeVarInt(buf) {
let res = 0;
while (1) {
let byte = this.getByte(buf);
res = res * 128 + (byte & 127); // avoid 32-bit truncation from << on large values
if ((byte & 128) === 0) break;
}
return res;
}
decodeTimestampIn(encoded, state) {
let timestamp = this.decodeVarInt(encoded);
timestamp = timestamp === 0 ? Number.MAX_VALUE : timestamp - 1;
if (state.lastTimestampIn === Number.MAX_VALUE || timestamp === Number.MAX_VALUE) {
state.lastTimestampIn = Number.MAX_VALUE;
return Number.MAX_VALUE;
}
timestamp += state.lastTimestampIn;
state.lastTimestampIn = timestamp;
return timestamp;
}
decodeBound(encoded, state) {
let timestamp = this.decodeTimestampIn(encoded, state);
let len = this.decodeVarInt(encoded);
if (len > this.idSize) throw Error("bound key too long");
let id = this.getBytes(encoded, len);
return { timestamp, id, idHex: toHexString(id), };
}
// Encoding
encodeVarInt(n) {
if (n === 0) return [0];
let o = [];
while (n !== 0) {
o.push(n % 128);
n = Math.floor(n / 128); // avoid 32-bit truncation from >>> on large values
}
o.reverse();
for (let i = 0; i < o.length - 1; i++) o[i] |= 0x80;
return o;
}
encodeTimestampOut(timestamp, state) {
if (timestamp === Number.MAX_VALUE) {
state.lastTimestampOut = Number.MAX_VALUE;
return this.encodeVarInt(0);
}
let temp = timestamp;
timestamp -= state.lastTimestampOut;
state.lastTimestampOut = temp;
return this.encodeVarInt(timestamp + 1);
}
encodeBound(key, state) {
let output = [];
output.push(...this.encodeTimestampOut(key.timestamp, state));
output.push(...this.encodeVarInt(key.id.length));
output.push(...key.id);
return output;
}
encodeMinimalBound(curr, prev, state) {
let output = [];
output.push(...this.encodeTimestampOut(curr.timestamp, state));
if (curr.timestamp !== prev.timestamp) {
output.push(...this.encodeVarInt(0));
} else {
let sharedPrefixBytes = 0;
for (let i = 0; i < this.idSize; i++) {
if (curr.id[i] !== prev.id[i]) break;
sharedPrefixBytes++;
}
output.push(...this.encodeVarInt(sharedPrefixBytes + 1));
output.push(...curr.id.slice(0, sharedPrefixBytes + 1));
}
return output;
};
encodeBitField(inds) {
if (inds.length === 0) return [];
let max = Math.max(...inds);
let bitField = new Array(Math.floor((max + 8) / 8)).fill(0);
for (let ind of inds) bitField[Math.floor(ind / 8)] |= 1 << (ind % 8);
return bitField;
}
bitFieldLookup(bitField, ind) {
if (Math.floor((ind + 8) / 8) > bitField.length) return false;
return !!(bitField[Math.floor(ind / 8)] & 1 << (ind % 8));
}
}
function fromHexString(hexString) {
if ((hexString.length % 2) !== 0) throw Error("uneven length of hex string");
if (hexString.length === 0) return [];
return hexString.match(/../g).map((byte) => parseInt(byte, 16));
}
function toHexString(buf) {
return buf.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
}
function itemCompare(a, b) {
if (a.timestamp === b.timestamp) {
if (a.idHex < b.idHex) return -1;
else if (a.idHex > b.idHex) return 1;
return 0;
}
return a.timestamp - b.timestamp;
}
function binarySearch(arr, first, last, cmp) {
let count = last - first;
while (count > 0) {
let it = first;
let step = Math.floor(count / 2);
it += step;
if (cmp(arr[it])) {
first = ++it;
count -= step + 1;
} else {
count = step;
}
}
return first;
}
function findLowerBound(arr, first, last, value, cmp) {
return binarySearch(arr, first, last, (a) => cmp(a, value) < 0);
}
function findUpperBound(arr, first, last, value, cmp) {
return binarySearch(arr, first, last, (a) => cmp(value, a) >= 0);
}
module.exports = Negentropy;

test/cpp/.gitignore vendored Normal file

@@ -0,0 +1 @@
/harness

test/cpp/Makefile Normal file

@@ -0,0 +1,2 @@
harness: harness.cpp ../../cpp/Negentropy.h
g++ -g -std=c++20 -I../../cpp/ -I ./hoytech-cpp/ harness.cpp -o harness

test/cpp/harness.cpp Normal file

@@ -0,0 +1,91 @@
#include <iostream>
#include <sstream>
#include <hoytech/error.h>
#include <hoytech/hex.h>
#include "Negentropy.h"
std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> result;
std::stringstream ss (s);
std::string item;
while (getline (ss, item, delim)) {
result.push_back (item);
}
return result;
}
int main() {
const uint64_t idSize = 16;
// x1 is client, x2 is server
Negentropy x1(idSize);
Negentropy x2(idSize);
std::string line;
while (std::cin) {
std::getline(std::cin, line);
if (!line.size()) continue;
auto items = split(line, ',');
if (items.size() != 3) throw hoytech::error("expected 3 items");
int mode = std::stoi(items[0]);
uint64_t created = std::stoull(items[1]);
auto id = hoytech::from_hex(items[2]);
if (id.size() != idSize) throw hoytech::error("unexpected id size");
if (mode == 1) {
x1.addItem(created, id);
} else if (mode == 2) {
x2.addItem(created, id);
} else if (mode == 3) {
x1.addItem(created, id);
x2.addItem(created, id);
} else {
throw hoytech::error("unexpected mode");
}
}
x1.seal();
x2.seal();
std::string q;
uint64_t round = 0;
while (1) {
// CLIENT -> SERVER
if (round == 0) {
q = x1.initiate();
} else {
std::vector<std::string> have, need;
q = x1.reconcile(q, have, need);
for (auto &id : have) std::cout << "xor,HAVE," << hoytech::to_hex(id) << "\n";
for (auto &id : need) std::cout << "xor,NEED," << hoytech::to_hex(id) << "\n";
}
if (q.size() == 0) break;
std::cerr << "[" << round << "] CLIENT -> SERVER: " << q.size() << " bytes" << std::endl;
// SERVER -> CLIENT
q = x2.reconcile(q);
std::cerr << "[" << round << "] SERVER -> CLIENT: " << q.size() << " bytes" << std::endl;
round++;
}
return 0;
}

test/cpp/hoytech-cpp Submodule

@@ -0,0 +1 @@
Subproject commit 4440cad898898b85de499dd5c49486b2b6d26d40

test/js/harness.js Normal file

@@ -0,0 +1,68 @@
const readline = require('readline');
const Negentropy = require('../../js/Negentropy.js');
const idSize = 16;
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false
});
let n = 0;
let x1 = new Negentropy(idSize);
let x2 = new Negentropy(idSize);
rl.on('line', (line) => {
let items = line.split(',');
if (items.length !== 3) throw Error("expected 3 items");
let mode = parseInt(items[0]);
let created = parseInt(items[1]);
let id = items[2];
if (id.length !== idSize*2) throw Error("unexpected id size");
if (mode === 1) {
x1.addItem(created, id);
} else if (mode === 2) {
x2.addItem(created, id);
} else if (mode === 3) {
x1.addItem(created, id);
x2.addItem(created, id);
} else {
throw Error("unexpected mode");
}
n++;
});
rl.once('close', () => {
x1.seal();
x2.seal();
let q;
let round = 0;
while (true) {
if (round === 0) {
q = x1.initiate();
} else {
let [newQ, haveIds, needIds] = x1.reconcile(q);
q = newQ;
for (let id of haveIds) console.log(`xor,HAVE,${id}`);
for (let id of needIds) console.log(`xor,NEED,${id}`);
}
if (q.length === 0) break;
console.error(`[${round}] CLIENT -> SERVER: ${q.length / 2} bytes`);
let [newQ, haveIds, needIds] = x2.reconcile(q);
q = newQ;
console.error(`[${round}] SERVER -> CLIENT: ${q.length / 2} bytes`);
round++;
}
});

test/test.pl Executable file

@@ -0,0 +1,85 @@
#!/usr/bin/env perl
use IPC::Open2;
use Session::Token;
my $harnessType = shift || die "please provide harness type (cpp, js, etc)";
my $idSize = shift || 16;
my $harnessCmd;
if ($harnessType eq 'cpp') {
$harnessCmd = './cpp/harness';
} elsif ($harnessType eq 'js') {
$harnessCmd = 'node js/harness.js';
} else {
die "unknown harness type: $harnessType";
}
srand($ENV{SEED} || 0);
my $stgen = Session::Token->new(seed => "\x00" x 1024, alphabet => '0123456789abcdef', length => $idSize * 2);
while(1) {
my $ids1 = {};
my $ids2 = {};
my $pid = open2(my $outfile, my $infile, $harnessCmd);
my $num = rnd(10000) + 1;
for (1..$num) {
my $mode;
if (rand() < .01) {
$mode = rnd(2) + 1;
} else {
$mode = 3;
}
my $created = 1677970534 + rnd($num);
my $id = $stgen->get;
$ids1->{$id} = 1 if $mode == 1 || $mode == 3;
$ids2->{$id} = 1 if $mode == 2 || $mode == 3;
print $infile "$mode,$created,$id\n";
}
close($infile);
while (<$outfile>) {
if (/^xor,(HAVE|NEED),(\w+)/) {
my ($action, $id) = ($1, $2);
if ($action eq 'NEED') {
die "duplicate insert of $action,$id" if $ids1->{$id};
$ids1->{$id} = 1;
} elsif ($action eq 'HAVE') {
die "duplicate insert of $action,$id" if $ids2->{$id};
$ids2->{$id} = 1;
}
}
}
waitpid($pid, 0);
my $child_exit_status = $?;
die "failure running test harness" if $child_exit_status;
for my $id (keys %$ids1) {
die "$id not in ids2" if !$ids2->{$id};
}
for my $id (keys %$ids2) {
die "$id not in ids1" if !$ids1->{$id};
}
print "\n-----------OK-----------\n";
}
sub rnd {
my $n = shift;
return int(rand() * $n);
}