feat: add nim-bloom, many more updates on reliability API

split to common and utils, logging, error handling, thread safety
This commit is contained in:
shash256 2024-10-21 16:55:07 +05:30
parent 297516995c
commit 62821077bc
17 changed files with 1288 additions and 319 deletions

BIN
nim-bloom/.DS_Store vendored Normal file

Binary file not shown.

58
nim-bloom/.github/workflows/main.yml vendored Normal file
View File

@ -0,0 +1,58 @@
name: website
on: [push] # debugging only
#on:
#  push:
#    tags:
#      - 'v*.*.*'

jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v1
      - name: Set output
        id: vars
        # `::set-output` workflow commands are deprecated and disabled on
        # current runners; write step outputs to $GITHUB_OUTPUT instead.
        run: echo "tag=${GITHUB_REF:10}" >> "$GITHUB_OUTPUT"
      - name: Cache choosenim
        id: cache-choosenim
        uses: actions/cache@v1
        with:
          path: ~/.choosenim
          key: ${{ runner.os }}-choosenim-stable
      - name: Cache nimble
        id: cache-nimble
        uses: actions/cache@v1
        with:
          path: ~/.nimble
          key: ${{ runner.os }}-nimble-stable
      - uses: jiro4989/setup-nim-action@v1.0.2
        with:
          nim-version: 'stable'
      - name: Build and test
        env:
          RELEASE_VERSION: ${{ steps.vars.outputs.tag }}
        run: |
          nimble test -Y
      - name: Build doc
        env:
          RELEASE_VERSION: ${{ steps.vars.outputs.tag }}
        run: |
          # Due to bug https://github.com/nim-lang/Nim/issues/14281, compile the documentation separately.
          nimble doc --git.url:https://github.com/$GITHUB_REPOSITORY --git.commit:$RELEASE_VERSION bloom.nim
          nimble doc --git.url:https://github.com/$GITHUB_REPOSITORY --git.commit:$RELEASE_VERSION private/probabilities.nim
          find .
          mkdir -p ./public
          mv bloom.html probabilities.html nimdoc.out.css ./public/
          cd ./public/
          ln -s ./bloom.html index.html
          cd ../
      - name: Deploy
        if: success()
        uses: crazy-max/ghaction-github-pages@v1.3.0
        with:
          target_branch: gh-pages
          build_dir: ./public
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

6
nim-bloom/.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
nimcache
nimcache/*
tests/test
bloom
*.html
*.css

20
nim-bloom/LICENSE Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013 Nick Greenfield
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

41
nim-bloom/README.md Normal file
View File

@ -0,0 +1,41 @@
nim-bloom
============
Bloom filter implementation in Nim. Uses a C implementation of MurmurHash3 for optimal speed and numeric distribution.
On a 10 year old Macbook Pro Retina the test case for 10M insertions executes in ~4.0 seconds and 10M lookups in ~3.5 seconds for a Bloom filter with a 1 in 1000 error rate (0.001). This is ~2.5M insertions/sec and ~2.9M lookups/sec on a single thread (by passing the `-d:release` flag to the Nim compiler and thus activating the C compiler's optimizations). If k is lowered to 5 or 6 vs. a larger "optimal" number, performance further increases to ~4M ops/sec. Note that this test is for a Bloom filter ~20-25MB in size and thus accurately reflects the cost of main memory accesses (vs. a smaller filter that might fit solely in L3 cache, for example, and can achieve several million additional ops/sec).
Currently supports inserting and looking up string elements. Forthcoming features include:
* Support for other types beyond strings
* Support for iterables in the insert method
* Persistence
quickstart
====
Quick functionality demo:
```
import bloom
var bf = initializeBloomFilter(capacity = 10000, errorRate = 0.001)
echo bf # Get characteristics of the Bloom filter
echo bf.lookup("An element not in the Bloom filter") # Prints 'false'
bf.insert("Here we go...")
assert(bf.lookup("Here we go..."))
```
By default, the Bloom filter will use a mathematically optimal number of k hash functions, which minimizes the amount of error per bit of storage required. In many cases, however, it may be advantageous to specify a smaller value of k in order to save time hashing. This is supported by passing an explicit `k` parameter, which will then create a Bloom filter that still meets the specified error rate using that number of hash functions.[1]
[1] If `k` <= 12 and the number of required bytes per element is <= 4. If either of these conditions doesn't hold, a fully manual Bloom filter can be constructed by passing both `k` and `force_n_bits_per_elem`.
Example:
```
var bf2 = initializeBloomFilter(capacity = 10000, errorRate = 0.001, k = 5)
assert bf2.kHashes == 5
assert bf2.nBitsPerElem == 18
var bf3 = initializeBloomFilter(capacity = 10000, errorRate = 0.001, k = 5, forceNBitsPerElem = 12)
assert bf3.kHashes == 5
assert bf3.nBitsPerElem == 12 # But note, however, that bf.errorRate will *not* be correct
```

9
nim-bloom/bloom.nimble Normal file
View File

@ -0,0 +1,9 @@
# Package
version = "0.1.0"
author = "Boyd Greenfield"
description = "Efficient Bloom filter implementation for Nim using MurmurHash3."
license = "MIT"
srcDir = "src"
# Dependencies
requires "nim >= 1.0.0"

244
nim-bloom/src/bloom.nim Normal file
View File

@ -0,0 +1,244 @@
from math import ceil, ln, pow, round
import hashes
import strutils
import private/probabilities
# Import MurmurHash3 code and compile at the same time as Nim code
{.compile: "murmur3.c".}
type
  BloomFilterError = object of CatchableError  ## Raised for unachievable k / error-rate combinations.
  MurmurHashes = array[0..1, int]  ## The two 64-bit halves produced by MurmurHash3_x64_128.
  BloomFilter* = object
    capacity*: int        ## Number of elements the filter was sized for.
    errorRate*: float     ## Target false-positive rate at `capacity` elements.
    kHashes*: int         ## Number of hash functions applied per element.
    mBits*: int           ## Total number of bits in the filter.
    intArray: seq[int]    ## Backing bit storage, one machine word per slot.
    nBitsPerElem*: int    ## Bits allocated per stored element (m/n).
    useMurmurHash*: bool  ## If true use MurmurHash3; otherwise Nim's builtin `hash`.
# FFI binding to the C implementation compiled in via the {.compile.} pragma
# above; writes the 128-bit hash into `outHashes` as two 64-bit words.
proc rawMurmurHash(key: cstring, len: int, seed: uint32,
                   outHashes: var MurmurHashes): void {.
                   importc: "MurmurHash3_x64_128".}

proc murmurHash(key: string, seed = 0'u32): MurmurHashes =
  ## Convenience wrapper: hashes `key` with MurmurHash3_x64_128 and
  ## returns the two 64-bit halves (the implicit `result` is the out-param).
  rawMurmurHash(key, key.len, seed, outHashes = result)
proc hashA(item: string, maxValue: int): int =
  ## First base hash: Nim's builtin string hash reduced modulo `maxValue`.
  result = hash(item) mod maxValue

proc hashB(item: string, maxValue: int): int =
  ## Second base hash: salts the key with " b" before hashing so it is
  ## independent of `hashA` for double hashing.
  result = hash(item & " b") mod maxValue

proc hashN(item: string, n: int, maxValue: int): int =
  ## Get the nth hash of a string using the formula hashA + n * hashB
  ## which uses 2 hash functions vs. k and has comparable properties.
  ## See Kirsch and Mitzenmacher, 2008:
  ## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf
  let
    a = hashA(item, maxValue)
    b = hashB(item, maxValue)
  abs(a + n * b) mod maxValue
proc getMOverNBitsForK(k: int, targetError: float,
                       probabilityTable = kErrors): int =
  ## Returns the smallest m/n (bits per stored element) that achieves
  ## ``targetError`` for ``k`` hash functions, by scanning
  ## ``probabilityTable`` (false-positive rates indexed as [k][m/n]).
  ## Raises ``BloomFilterError`` when ``k`` is outside the table's
  ## range (0..12) or no table entry reaches ``targetError``
  ## (i.e. more than 4 bytes per element would be required).
  if k notin 0..12:
    raise newException(BloomFilterError,
      "K must be <= 12 if forceNBitsPerElem is not also specified.")
  for mOverN in 2..probabilityTable[k].high:
    if probabilityTable[k][mOverN] < targetError:
      return mOverN
  # Fix: the original message had broken grammar
  # ("error rate for which is not achievable").
  raise newException(BloomFilterError,
    "Specified combination of k and error rate is not achievable using less than 4 bytes / element.")
proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
                            forceNBitsPerElem = 0,
                            useMurmurHash = true): BloomFilter =
  ## Initializes a Bloom filter, using a specified ``capacity``,
  ## ``errorRate``, and optionally a specific number of k hash functions.
  ## If ``k`` is < 1 (default argument is 0), both k and the bits per
  ## element are optimally calculated from ``errorRate`` on the fly.
  ## If ``k`` is given and ``forceNBitsPerElem`` is not, the bits per
  ## element are looked up in a precomputed table (valid for k in 0..12;
  ## raises ``BloomFilterError`` when the requested error rate cannot be
  ## met with less than 4 bytes per element). Passing ``forceNBitsPerElem``
  ## overrides the table, but ``errorRate`` is then no longer guaranteed.
  ## See http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html for
  ## useful tables on k and m/n (n bits per element) combinations.
  ##
  ## The Bloom filter uses the MurmurHash3 implementation by default,
  ## though it can fall back to using the built-in nim ``hash`` function
  ## if ``useMurmurHash = false``. The C source is compiled alongside the
  ## Nim code using the ``{.compile.}`` pragma.
  var
    kHashes: int
    bitsPerElem: float
    nBitsPerElem: int
  if k < 1:  # Calculate optimal k and use that
    # m/n = -ln(p) / ln(2)^2 ; k = ln(2) * m/n (standard Bloom filter math)
    bitsPerElem = ceil(-1.0 * (ln(errorRate) / (pow(ln(2.float), 2))))
    kHashes = round(ln(2.float) * bitsPerElem).int
    nBitsPerElem = round(bitsPerElem).int
  else:  # Use specified k if possible
    if forceNBitsPerElem < 1:  # Use lookup table
      nBitsPerElem = getMOverNBitsForK(k = k, targetError = errorRate)
    else:
      nBitsPerElem = forceNBitsPerElem
    kHashes = k
  let
    mBits = capacity * nBitsPerElem
    # One extra word so that any bit index 0..mBits-1 fits.
    mInts = 1 + mBits div (sizeof(int) * 8)
  BloomFilter(capacity: capacity, errorRate: errorRate, kHashes: kHashes,
              mBits: mBits, intArray: newSeq[int](mInts),
              nBitsPerElem: nBitsPerElem, useMurmurHash: useMurmurHash)
proc `$`*(bf: BloomFilter): string =
  ## Human-readable summary: capacity, target error rate, number of
  ## k hash functions, and bits required per stored element.
  "Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits per stored element." %
    [$bf.capacity,
     formatFloat(bf.errorRate, format = ffScientific, precision = 1),
     $bf.kHashes, $bf.nBitsPerElem]
# Overflow checks off: wraparound in `h0 + i*h1` is expected and harmless
# for double hashing.
{.push overflowChecks: off.}
proc hashMurmur(bf: BloomFilter, key: string): seq[int] =
  ## Derives bf.kHashes bit positions for `key` from one 128-bit
  ## MurmurHash3 call using the Kirsch–Mitzenmacher h0 + i*h1 scheme.
  result.newSeq(bf.kHashes)
  let murmurHashes = murmurHash(key, seed = 0'u32)
  for i in 0..<bf.kHashes:
    result[i] = abs(murmurHashes[0] + i * murmurHashes[1]) mod bf.mBits
{.pop.}
proc hashNim(bf: BloomFilter, key: string): seq[int] =
  ## Derives bf.kHashes bit positions for `key` using Nim's builtin
  ## `hash` via the double-hashing helper `hashN`.
  result.newSeq(bf.kHashes)
  for i in 0..<bf.kHashes:
    result[i] = hashN(key, i, bf.mBits)
proc hash(bf: BloomFilter, key: string): seq[int] =
  ## Dispatches to the MurmurHash3 or Nim-hash position generator
  ## depending on `bf.useMurmurHash`.
  if bf.useMurmurHash:
    bf.hashMurmur(key)
  else:
    bf.hashNim(key)
proc insert*(bf: var BloomFilter, item: string) =
  ## Insert an item (string) into the Bloom filter: sets one bit per
  ## hash position.
  var hashSet = bf.hash(item)
  for h in hashSet:
    let
      intAddress = h div (sizeof(int) * 8)  # which machine word
      bitOffset = h mod (sizeof(int) * 8)   # which bit within that word
    bf.intArray[intAddress] = bf.intArray[intAddress] or (1 shl bitOffset)
proc lookup*(bf: BloomFilter, item: string): bool =
  ## Look up an item (string) in the Bloom filter.
  ## If the item is present, ``lookup`` is guaranteed to return ``true``.
  ## If the item is not present, ``lookup`` returns ``false`` except with
  ## (approximately) probability ``bf.errorRate`` — a false positive.
  var hashSet = bf.hash(item)
  for h in hashSet:
    let
      intAddress = h div (sizeof(int) * 8)
      bitOffset = h mod (sizeof(int) * 8)
      currentInt = bf.intArray[intAddress]
    if currentInt != (currentInt or (1 shl bitOffset)):
      # At least one expected bit is unset -> definitely not present.
      return false
  return true
when isMainModule:
  from random import rand, randomize
  import times

  # Smoke-test the MurmurHash3 FFI binding against known reference values.
  echo("Testing MurmurHash3 code...")
  var hashOutputs: MurmurHashes
  hashOutputs = [0, 0]
  rawMurmurHash("hello", 5, 0, hashOutputs)
  assert int(hashOutputs[0]) == -3758069500696749310 # Correct murmur outputs (cast to int64)
  assert int(hashOutputs[1]) == 6565844092913065241
  let hashOutputs2 = murmurHash("hello", 0)
  assert hashOutputs2[0] == hashOutputs[0]
  assert hashOutputs2[1] == hashOutputs[1]
  # A different seed must change the output.
  let hashOutputs3 = murmurHash("hello", 10)
  assert hashOutputs3[0] != hashOutputs[0]
  assert hashOutputs3[1] != hashOutputs[1]

  # Some quick and dirty tests (not complete)
  var nElementsToTest = 100000
  var bf = initializeBloomFilter(nElementsToTest, 0.001)
  # NOTE(review): `of` against the object's own (non-inherited) type is
  # trivially true; this assert only checks the call compiled.
  assert(bf of BloomFilter)
  echo(bf)
  var bf2 = initializeBloomFilter(10000, 0.001, k = 4,
                                  forceNBitsPerElem = 20)
  assert(bf2 of BloomFilter)
  echo(bf2)
  echo("Testing insertions and lookups...")
  echo("Test element in BF2?: ", bf2.lookup("testing"))
  echo("Inserting element.")
  bf2.insert("testing")
  echo("Test element in BF2?: ", bf2.lookup("testing"))
  assert(bf2.lookup("testing"))

  # Now test for speed with bf
  randomize(2882) # Seed the RNG
  var
    sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    kTestElements, sampleLetters: seq[string]
  kTestElements = newSeq[string](nElementsToTest)
  sampleLetters = newSeq[string](62)
  for i in 0..(nElementsToTest - 1):
    var newString = ""
    for j in 0..7:
      # NOTE(review): rand(51) draws only from the 52 letters, never
      # from the digits at the end of sampleChars — confirm intended.
      newString.add(sampleChars[rand(51)])
    kTestElements[i] = newString

  var startTime, endTime: float
  startTime = cpuTime()
  for i in 0..(nElementsToTest - 1):
    bf.insert(kTestElements[i])
  endTime = cpuTime()
  echo("Took ", formatFloat(endTime - startTime, format = ffDecimal,
       precision = 4), " seconds to insert ", nElementsToTest, " items.")

  # Estimate the empirical false-positive rate with strings that cannot
  # have been inserted (9 chars instead of 8).
  var falsePositives = 0
  for i in 0..(nElementsToTest - 1):
    var falsePositiveString = ""
    for j in 0..8: # By definition not in bf as 9 chars not 8
      falsePositiveString.add(sampleChars[rand(51)])
    if bf.lookup(falsePositiveString):
      falsePositives += 1
  echo("N false positives (of ", nElementsToTest, " lookups): ", falsePositives)
  echo("False positive rate ", formatFloat(falsePositives / nElementsToTest,
       format = ffDecimal, precision = 4))

  # Every inserted element must be found (no false negatives).
  var lookupErrors = 0
  startTime = cpuTime()
  for i in 0..(nElementsToTest - 1):
    if not bf.lookup(kTestElements[i]):
      lookupErrors += 1
  endTime = cpuTime()
  echo("Took ", formatFloat(endTime - startTime, format = ffDecimal,
       precision = 4), " seconds to lookup ", nElementsToTest, " items.")
  echo("N lookup errors (should be 0): ", lookupErrors)

  # Finally test correct k / mOverN specification,
  # first case raises an error, second works
  try:
    discard getMOverNBitsForK(k = 2, targetError = 0.00001)
    assert false
  except BloomFilterError:
    assert true
  assert getMOverNBitsForK(k = 2, targetError = 0.1) == 6
  assert getMOverNBitsForK(k = 7, targetError = 0.01) == 10
  assert getMOverNBitsForK(k = 7, targetError = 0.001) == 16
  var bf3 = initializeBloomFilter(1000, 0.01, k = 4)
  assert bf3.nBitsPerElem == 11

314
nim-bloom/src/murmur3.c Normal file
View File

@ -0,0 +1,314 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.
#include "murmur3.h"
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
#ifdef __GNUC__
#define FORCE_INLINE __attribute__((always_inline)) inline
#else
#define FORCE_INLINE
#endif

/* Rotate left; call sites in this file pass r in 13..33, never 0. */
static inline FORCE_INLINE uint32_t rotl32 ( uint32_t x, int8_t r )
{
  return (x << r) | (x >> (32 - r));
}

static inline FORCE_INLINE uint64_t rotl64 ( uint64_t x, int8_t r )
{
  return (x << r) | (x >> (64 - r));
}

#define ROTL32(x,y) rotl32(x,y)
#define ROTL64(x,y) rotl64(x,y)

#define BIG_CONSTANT(x) (x##LLU)

//-----------------------------------------------------------------------------
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here

#define getblock(p, i) (p[i])

//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche

/* 32-bit finalizer: xor-shift / multiply avalanche (standard murmur3). */
static inline FORCE_INLINE uint32_t fmix32 ( uint32_t h )
{
  h ^= h >> 16;
  h *= 0x85ebca6b;
  h ^= h >> 13;
  h *= 0xc2b2ae35;
  h ^= h >> 16;

  return h;
}

//----------

/* 64-bit finalizer. */
static inline FORCE_INLINE uint64_t fmix64 ( uint64_t k )
{
  k ^= k >> 33;
  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
  k ^= k >> 33;
  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
  k ^= k >> 33;

  return k;
}
//-----------------------------------------------------------------------------
/* 32-bit MurmurHash3: writes a single uint32_t to `out`. */
void MurmurHash3_x86_32 ( const void * key, int len,
                          uint32_t seed, void * out )
{
  const uint8_t * data = (const uint8_t*)key;
  const int nblocks = len / 4;
  int i;

  uint32_t h1 = seed;

  uint32_t c1 = 0xcc9e2d51;
  uint32_t c2 = 0x1b873593;

  //----------
  // body

  /* Blocks are read back-to-front via negative indices from the tail. */
  const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);

  for(i = -nblocks; i; i++)
  {
    uint32_t k1 = getblock(blocks,i);

    k1 *= c1;
    k1 = ROTL32(k1,15);
    k1 *= c2;

    h1 ^= k1;
    h1 = ROTL32(h1,13);
    h1 = h1*5+0xe6546b64;
  }

  //----------
  // tail

  const uint8_t * tail = (const uint8_t*)(data + nblocks*4);

  uint32_t k1 = 0;

  /* Deliberate case fall-through: mixes the 1-3 leftover bytes. */
  switch(len & 3)
  {
  case 3: k1 ^= tail[2] << 16; /* fall through */
  case 2: k1 ^= tail[1] << 8;  /* fall through */
  case 1: k1 ^= tail[0];
          k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
  };

  //----------
  // finalization

  h1 ^= len;

  h1 = fmix32(h1);

  *(uint32_t*)out = h1;
}
//-----------------------------------------------------------------------------
/* 128-bit MurmurHash3 optimized for x86: writes four uint32_t to `out`. */
void MurmurHash3_x86_128 ( const void * key, const int len,
                           uint32_t seed, void * out )
{
  const uint8_t * data = (const uint8_t*)key;
  const int nblocks = len / 16;
  int i;

  uint32_t h1 = seed;
  uint32_t h2 = seed;
  uint32_t h3 = seed;
  uint32_t h4 = seed;

  uint32_t c1 = 0x239b961b;
  uint32_t c2 = 0xab0e9789;
  uint32_t c3 = 0x38b34ae5;
  uint32_t c4 = 0xa1e38b93;

  //----------
  // body

  /* Blocks are read back-to-front via negative indices from the tail. */
  const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);

  for(i = -nblocks; i; i++)
  {
    uint32_t k1 = getblock(blocks,i*4+0);
    uint32_t k2 = getblock(blocks,i*4+1);
    uint32_t k3 = getblock(blocks,i*4+2);
    uint32_t k4 = getblock(blocks,i*4+3);

    k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;

    h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;

    k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;

    h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;

    k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;

    h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;

    k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;

    h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
  }

  //----------
  // tail

  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

  uint32_t k1 = 0;
  uint32_t k2 = 0;
  uint32_t k3 = 0;
  uint32_t k4 = 0;

  /* Deliberate case fall-through: mixes the 1-15 leftover bytes. */
  switch(len & 15)
  {
  case 15: k4 ^= tail[14] << 16;
  case 14: k4 ^= tail[13] << 8;
  case 13: k4 ^= tail[12] << 0;
           k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;

  case 12: k3 ^= tail[11] << 24;
  case 11: k3 ^= tail[10] << 16;
  case 10: k3 ^= tail[ 9] << 8;
  case  9: k3 ^= tail[ 8] << 0;
           k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;

  case  8: k2 ^= tail[ 7] << 24;
  case  7: k2 ^= tail[ 6] << 16;
  case  6: k2 ^= tail[ 5] << 8;
  case  5: k2 ^= tail[ 4] << 0;
           k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;

  case  4: k1 ^= tail[ 3] << 24;
  case  3: k1 ^= tail[ 2] << 16;
  case  2: k1 ^= tail[ 1] << 8;
  case  1: k1 ^= tail[ 0] << 0;
           k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
  };

  //----------
  // finalization

  h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;

  h1 += h2; h1 += h3; h1 += h4;
  h2 += h1; h3 += h1; h4 += h1;

  h1 = fmix32(h1);
  h2 = fmix32(h2);
  h3 = fmix32(h3);
  h4 = fmix32(h4);

  h1 += h2; h1 += h3; h1 += h4;
  h2 += h1; h3 += h1; h4 += h1;

  ((uint32_t*)out)[0] = h1;
  ((uint32_t*)out)[1] = h2;
  ((uint32_t*)out)[2] = h3;
  ((uint32_t*)out)[3] = h4;
}
//-----------------------------------------------------------------------------
/* 128-bit MurmurHash3 optimized for x64: writes two uint64_t to `out`.
   This is the variant bound from the Nim code via importc. */
void MurmurHash3_x64_128 ( const void * key, const int len,
                           const uint32_t seed, void * out )
{
  const uint8_t * data = (const uint8_t*)key;
  const int nblocks = len / 16;
  int i;

  uint64_t h1 = seed;
  uint64_t h2 = seed;

  uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
  uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);

  //----------
  // body

  const uint64_t * blocks = (const uint64_t *)(data);

  for(i = 0; i < nblocks; i++)
  {
    uint64_t k1 = getblock(blocks,i*2+0);
    uint64_t k2 = getblock(blocks,i*2+1);

    k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;

    h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;

    k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;

    h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
  }

  //----------
  // tail

  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);

  uint64_t k1 = 0;
  uint64_t k2 = 0;

  /* Deliberate case fall-through: mixes the 1-15 leftover bytes. */
  switch(len & 15)
  {
  case 15: k2 ^= (uint64_t)(tail[14]) << 48;
  case 14: k2 ^= (uint64_t)(tail[13]) << 40;
  case 13: k2 ^= (uint64_t)(tail[12]) << 32;
  case 12: k2 ^= (uint64_t)(tail[11]) << 24;
  case 11: k2 ^= (uint64_t)(tail[10]) << 16;
  case 10: k2 ^= (uint64_t)(tail[ 9]) << 8;
  case  9: k2 ^= (uint64_t)(tail[ 8]) << 0;
           k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;

  case  8: k1 ^= (uint64_t)(tail[ 7]) << 56;
  case  7: k1 ^= (uint64_t)(tail[ 6]) << 48;
  case  6: k1 ^= (uint64_t)(tail[ 5]) << 40;
  case  5: k1 ^= (uint64_t)(tail[ 4]) << 32;
  case  4: k1 ^= (uint64_t)(tail[ 3]) << 24;
  case  3: k1 ^= (uint64_t)(tail[ 2]) << 16;
  case  2: k1 ^= (uint64_t)(tail[ 1]) << 8;
  case  1: k1 ^= (uint64_t)(tail[ 0]) << 0;
           k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
  };

  //----------
  // finalization

  h1 ^= len; h2 ^= len;

  h1 += h2;
  h2 += h1;

  h1 = fmix64(h1);
  h2 = fmix64(h2);

  h1 += h2;
  h2 += h1;

  ((uint64_t*)out)[0] = h1;
  ((uint64_t*)out)[1] = h2;
}
//-----------------------------------------------------------------------------

21
nim-bloom/src/murmur3.h Normal file
View File

@ -0,0 +1,21 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the
// public domain. The author hereby disclaims copyright to this source
// code.
#ifndef _MURMURHASH3_H_
#define _MURMURHASH3_H_

#include <stdint.h>

//-----------------------------------------------------------------------------
/* `out` must point to at least 4 bytes for the x86_32 variant and
   16 bytes for the two _128 variants (see murmur3.c). */
void MurmurHash3_x86_32 (const void *key, int len, uint32_t seed, void *out);
void MurmurHash3_x86_128(const void *key, int len, uint32_t seed, void *out);
void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out);

//-----------------------------------------------------------------------------

#endif // _MURMURHASH3_H_

View File

@ -0,0 +1,103 @@
#
# ### Probability table declaration, in private/ for readability ###
# Table for k hashes from 1..12 from http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
# Iterate along the sequence at position [k] until the error rate is < specified, otherwise
# raise an error.
#
#
# ### Probability table declaration, in private/ for readability ###
# False-positive rates for k hashes 1..12, from
# http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
# kErrors[k][mOverN] is the error rate for k hash functions and mOverN
# bits per stored element; 1.0 marks unusable/undefined entries.
# Declared as a `const` (was a runtime-populated `var`) so the table is
# built at compile time and cannot be mutated by importers.
#
type
  TErrorForK = seq[float]
  TAllErrorRates* = array[0..12, TErrorForK]

const kErrors*: TAllErrorRates = [
  @[1.0],
  @[1.0, 1.0,
    0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000,
    0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.0869000000,
    0.0800000000, 0.0740000000, 0.0689000000, 0.0645000000, 0.0606000000,
    0.0571000000, 0.0540000000, 0.0513000000, 0.0488000000, 0.0465000000,
    0.0444000000, 0.0425000000, 0.0408000000, 0.0392000000, 0.0377000000,
    0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000, 0.0317000000,
    0.0308000000],
  @[1.0, 1.0,
    0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, 0.0804000000,
    0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 0.0276000000,
    0.0236000000, 0.0203000000, 0.0177000000, 0.0156000000, 0.0138000000,
    0.0123000000, 0.0111000000, 0.0099800000, 0.0090600000, 0.0082500000,
    0.0075500000, 0.0069400000, 0.0063900000, 0.0059100000, 0.0054800000,
    0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000, 0.0039000000,
    0.0036700000],
  @[1.0, 1.0, 1.0,
    0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, 0.0423000000,
    0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 0.0108000000,
    0.0087500000, 0.0071800000, 0.0059600000, 0.0050000000, 0.0042300000,
    0.0036200000, 0.0031200000, 0.0027000000, 0.0023600000, 0.0020700000,
    0.0018300000, 0.0016200000, 0.0014500000, 0.0012900000, 0.0011600000,
    0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000, 0.0007170000],
  @[1.0, 1.0, 1.0, 1.0,
    0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000, 0.0240000000,
    0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 0.0049200000,
    0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0015800000,
    0.0013000000, 0.0010800000, 0.0009050000, 0.0007640000, 0.0006490000,
    0.0005550000, 0.0004780000, 0.0004130000, 0.0003590000, 0.0003140000,
    0.0002760000, 0.0002430000, 0.0002150000, 0.0001910000],
  @[1.0, 1.0, 1.0, 1.0, 1.0,
    0.1010000000, 0.0578000000, 0.0347000000, 0.0217000000, 0.0141000000,
    0.0094300000, 0.0065000000, 0.0045900000, 0.0033200000, 0.0024400000,
    0.0018300000, 0.0013900000, 0.0010700000, 0.0008390000, 0.0006630000,
    0.0005300000, 0.0004270000, 0.0003470000, 0.0002850000, 0.0002350000,
    0.0001960000, 0.0001640000, 0.0001380000, 0.0001170000, 0.0000996000,
    0.0000853000, 0.0000733000, 0.0000633000],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0638000000, 0.0364000000, 0.0216000000, 0.0133000000, 0.0084400000,
    0.0055200000, 0.0037100000, 0.0025500000, 0.0017900000, 0.0012800000,
    0.0009350000, 0.0006920000, 0.0005190000, 0.0003940000, 0.0003030000,
    0.0002360000, 0.0001850000, 0.0001470000, 0.0001170000, 0.0000944000,
    0.0000766000, 0.0000626000, 0.0000515000, 0.0000426000, 0.0000355000,
    0.0000297000, 0.0000250000],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0229000000, 0.0135000000, 0.0081900000, 0.0051300000, 0.0032900000,
    0.0021700000, 0.0014600000, 0.0010000000, 0.0007020000, 0.0004990000,
    0.0003600000, 0.0002640000, 0.0001960000, 0.0001470000, 0.0001120000,
    0.0000856000, 0.0000663000, 0.0000518000, 0.0000408000, 0.0000324000,
    0.0000259000, 0.0000209000, 0.0000169000, 0.0000138000, 0.0000113000],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0145000000, 0.0084600000, 0.0050900000, 0.0031400000, 0.0019900000,
    0.0012900000, 0.0008520000, 0.0005740000, 0.0003940000, 0.0002750000,
    0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000, 0.0000555000,
    0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
    0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0053100000, 0.0031700000, 0.0019400000, 0.0012100000, 0.0007750000,
    0.0005050000, 0.0003350000, 0.0002260000, 0.0001550000, 0.0001080000,
    0.0000759000, 0.0000542000, 0.0000392000, 0.0000286000, 0.0000211000,
    0.0000157000, 0.0000118000, 0.0000089600, 0.0000068500, 0.0000052800,
    0.0000041000, 0.0000032000],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0033400000, 0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000,
    0.0003020000, 0.0001980000, 0.0001320000, 0.0000889000, 0.0000609000,
    0.0000423000, 0.0000297000, 0.0000211000, 0.0000152000, 0.0000110000,
    0.0000080700, 0.0000059700, 0.0000044500, 0.0000033500, 0.0000025400,
    0.0000019400],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0021000000, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000,
    0.0001830000, 0.0001180000, 0.0000777000, 0.0000518000, 0.0000350000,
    0.0000240000, 0.0000166000, 0.0000116000, 0.0000082300, 0.0000058900,
    0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900, 0.0000012600],
  @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000,
    0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000,
    0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900,
    0.0000016500, 0.0000012000, 0.0000008740]
]

View File

@ -0,0 +1 @@
switch("path", "$projectDir/../src")

102
nim-bloom/tests/test.nim Normal file
View File

@ -0,0 +1,102 @@
import unittest
include bloom
from random import rand, randomize
import times
suite "murmur":
# Test murmurhash 3
setup:
var hashOutputs: MurmurHashes
hashOutputs = [0, 0]
rawMurmurHash("hello", 5, 0, hashOutputs)
test "raw":
check int(hashOutputs[0]) == -3758069500696749310 # Correct murmur outputs (cast to int64)
check int(hashOutputs[1]) == 6565844092913065241
test "wrapped":
let hashOutputs2 = murmurHash("hello", 0)
check hashOutputs2[0] == hashOutputs[0]
check hashOutputs2[1] == hashOutputs[1]
test "seed":
let hashOutputs3 = murmurHash("hello", 10)
check hashOutputs3[0] != hashOutputs[0]
check hashOutputs3[1] != hashOutputs[1]
suite "bloom":
setup:
let nElementsToTest = 100000
var bf = initializeBloomFilter(capacity = nElementsToTest, errorRate = 0.001)
randomize(2882) # Seed the RNG
var
sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
kTestElements, sampleLetters: seq[string]
kTestElements = newSeq[string](nElementsToTest)
sampleLetters = newSeq[string](62)
for i in 0..<nElementsToTest:
var newString = ""
for j in 0..7:
newString.add(sampleChars[rand(51)])
kTestElements[i] = newString
for i in 0..<nElementsToTest:
bf.insert(kTestElements[i])
test "params":
check(bf.capacity == nElementsToTest)
check(bf.errorRate == 0.001)
check(bf.kHashes == 10)
check(bf.nBitsPerElem == 15)
check(bf.mBits == 15 * nElementsToTest)
check(bf.useMurmurHash == true)
test "not hit":
check(bf.lookup("nothing") == false)
test "hit":
bf.insert("hit")
check(bf.lookup("hit") == true)
test "force params":
var bf2 = initializeBloomFilter(10000, 0.001, k = 4, forceNBitsPerElem = 20)
check(bf2.capacity == 10000)
check(bf2.errorRate == 0.001)
check(bf2.kHashes == 4)
check(bf2.nBitsPerElem == 20)
check(bf2.mBits == 200000)
check(bf2.useMurmurHash == true)
test "error rate":
var falsePositives = 0
for i in 0..<nElementsToTest:
var falsePositiveString = ""
for j in 0..8: # By definition not in bf as 9 chars not 8
falsePositiveString.add(sampleChars[rand(51)])
if bf.lookup(falsePositiveString):
falsePositives += 1
check falsePositives / nElementsToTest < bf.errorRate
test "lookup errors":
var lookupErrors = 0
for i in 0..<nElementsToTest:
if not bf.lookup(kTestElements[i]):
lookupErrors += 1
check lookupErrors == 0
# Finally test correct k / mOverN specification,
test "k/(m/n) spec":
expect(BloomFilterError):
discard getMOverNBitsForK(k = 2, targetError = 0.00001)
check getMOverNBitsForK(k = 2, targetError = 0.1) == 6
check getMOverNBitsForK(k = 7, targetError = 0.01) == 10
check getMOverNBitsForK(k = 7, targetError = 0.001) == 16
var bf3 = initializeBloomFilter(1000, 0.01, k = 4)
check bf3.nBitsPerElem == 11

View File

@ -6,9 +6,9 @@ license = "MIT"
srcDir = "src"
# Dependencies
requires "nim >= 1.6.0"
requires "nimsha2"
requires "nim >= 2.0.8"
requires "chronicles"
# Tasks
task test, "Run the test suite":
exec "nim c -r tests/test_reliability.nim"

81
src/common.nim Normal file
View File

@ -0,0 +1,81 @@
import std/[times, json, locks]
import "../nim-bloom/src/bloom"
type
  MessageID* = string  # opaque message identifier

  Message* = object
    senderId*: string
    messageId*: MessageID
    lamportTimestamp*: int64        # Lamport clock value attached to the message
    causalHistory*: seq[MessageID]  # ids of messages this one causally depends on
    channelId*: string
    content*: string

  UnacknowledgedMessage* = object
    message*: Message
    sendTime*: Time       # when the message was (last) sent
    resendAttempts*: int  # resends performed so far

  TimestampedMessageID* = object
    id*: MessageID
    timestamp*: Time

  RollingBloomFilter* = object
    filter*: BloomFilter                  # underlying bloom filter (nim-bloom)
    window*: Duration                     # how long entries are retained
    messages*: seq[TimestampedMessageID]  # insertion log used to roll the window

  ReliabilityConfig* = object
    bloomFilterCapacity*: int
    bloomFilterErrorRate*: float
    bloomFilterWindow*: Duration
    maxMessageHistory*: int
    maxCausalHistory*: int
    resendInterval*: Duration
    maxResendAttempts*: int

  ReliabilityManager* = ref object
    lamportTimestamp*: int64
    messageHistory*: seq[MessageID]
    bloomFilter*: RollingBloomFilter
    outgoingBuffer*: seq[UnacknowledgedMessage]  # sent but unacknowledged
    incomingBuffer*: seq[Message]                # received, dependencies unmet
    channelId*: string
    config*: ReliabilityConfig
    lock*: Lock  # guards concurrent access to the manager's mutable state
    onMessageReady*: proc(messageId: MessageID)
    onMessageSent*: proc(messageId: MessageID)
    onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID])

  ReliabilityError* = enum
    reSuccess,
    reInvalidArgument,
    reOutOfMemory,
    reInternalError,
    reSerializationError,
    reDeserializationError,
    reMessageTooLarge

  # Minimal hand-rolled result type; the error branch carries a
  # ReliabilityError. Construct via `ok`/`err` below.
  Result*[T] = object
    case isOk*: bool
    of true:
      value*: T
    of false:
      error*: ReliabilityError
# Defaults used by the reliability layer when no explicit config is given.
const
  DefaultBloomFilterCapacity* = 10000
  DefaultBloomFilterErrorRate* = 0.001
  DefaultBloomFilterWindow* = initDuration(hours = 1)
  DefaultMaxMessageHistory* = 1000
  DefaultMaxCausalHistory* = 10
  DefaultResendInterval* = initDuration(seconds = 30)
  DefaultMaxResendAttempts* = 5
  MaxMessageSize* = 1024 * 1024 # 1 MB
proc ok*[T](value: T): Result[T] =
  ## Wraps `value` in a successful Result.
  Result[T](isOk: true, value: value)

proc err*[T](error: ReliabilityError): Result[T] =
  ## Wraps `error` in a failed Result; `T` must be supplied explicitly
  ## at the call site (e.g. `err[Message](reInternalError)`).
  Result[T](isOk: false, error: error)

View File

@ -1,273 +1,152 @@
import std/[times, hashes, random, sequtils, algorithm]
import nimsha2
import chronicles
import ./common, ./utils
const
BloomFilterSize = 10000
BloomFilterHashCount = 7
MaxMessageHistory = 100
MaxCausalHistory = 10
type
MessageID* = string
Message* = object
senderId*: string
messageId*: MessageID
lamportTimestamp*: int64
causalHistory*: seq[MessageID]
channelId*: string
content*: string
bloomFilter*: RollingBloomFilter
UnacknowledgedMessage* = object
message*: Message
sendTime*: Time
resendAttempts*: int
RollingBloomFilter* = object
# TODO: Implement a proper Bloom filter
data: array[BloomFilterSize, bool]
hashCount: int
ReliabilityManager* = ref object
lamportTimestamp: int64
messageHistory: seq[MessageID]
bloomFilter: RollingBloomFilter
outgoingBuffer: seq[UnacknowledgedMessage]
incomingBuffer: seq[Message]
channelId: string
onMessageReady*: proc(messageId: MessageID)
onMessageSent*: proc(messageId: MessageID)
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID])
proc hash(filter: RollingBloomFilter): Hash =
var h: Hash = 0
for idx, val in filter.data:
h = h !& hash(idx) !& hash(val)
result = !$h
proc newRollingBloomFilter(): RollingBloomFilter =
result.hashCount = BloomFilterHashCount
proc add(filter: var RollingBloomFilter, item: MessageID) =
let itemHash = hash(item)
for i in 0 ..< filter.hashCount:
let idx = (itemHash + i * i) mod BloomFilterSize
filter.data[idx] = true
proc contains(filter: RollingBloomFilter, item: MessageID): bool =
let itemHash = hash(item)
for i in 0 ..< filter.hashCount:
let idx = (itemHash + i * i) mod BloomFilterSize
if not filter.data[idx]:
return false
return true
proc newReliabilityManager*(channelId: string): ReliabilityManager =
result = ReliabilityManager(
lamportTimestamp: 0,
messageHistory: @[],
bloomFilter: newRollingBloomFilter(),
outgoingBuffer: @[],
incomingBuffer: @[],
channelId: channelId
proc defaultConfig*(): ReliabilityConfig =
  ## Returns a ReliabilityConfig populated with the library defaults
  ## declared in common.nim.
  ReliabilityConfig(
    bloomFilterCapacity: DefaultBloomFilterCapacity,
    bloomFilterErrorRate: DefaultBloomFilterErrorRate,
    bloomFilterWindow: DefaultBloomFilterWindow,
    maxMessageHistory: DefaultMaxMessageHistory,
    maxCausalHistory: DefaultMaxCausalHistory,
    resendInterval: DefaultResendInterval,
    maxResendAttempts: DefaultMaxResendAttempts
  )
proc generateUniqueID(): MessageID =
$secureHash($getTime().toUnix & $rand(high(int)))
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager] =
if channelId.len == 0:
return err[ReliabilityManager](reInvalidArgument)
try:
let bloomFilterResult = newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow)
if bloomFilterResult.isErr:
return err[ReliabilityManager](bloomFilterResult.error)
proc updateLamportTimestamp(rm: ReliabilityManager, msgTs: int64) =
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
let rm = ReliabilityManager(
lamportTimestamp: 0,
messageHistory: @[],
bloomFilter: bloomFilterResult.value,
outgoingBuffer: @[],
incomingBuffer: @[],
channelId: channelId,
config: config
)
initLock(rm.lock)
return ok(rm)
except:
return err[ReliabilityManager](reOutOfMemory)
proc getRecentMessageIDs(rm: ReliabilityManager, n: int): seq[MessageID] =
result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Result[Message] =
if message.len == 0:
return err[Message](reInvalidArgument)
if message.len > MaxMessageSize:
return err[Message](reMessageTooLarge)
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Message =
rm.updateLamportTimestamp(getTime().toUnix)
let msg = Message(
senderId: "TODO_SENDER_ID",
messageId: generateUniqueID(),
lamportTimestamp: rm.lamportTimestamp,
causalHistory: rm.getRecentMessageIDs(MaxCausalHistory),
channelId: rm.channelId,
content: message,
bloomFilter: rm.bloomFilter
)
rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0))
msg
withLock rm.lock:
try:
let msg = Message(
senderId: "TODO_SENDER_ID",
messageId: generateUniqueID(),
lamportTimestamp: rm.lamportTimestamp,
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
channelId: rm.channelId,
content: message
)
rm.updateLamportTimestamp(getTime().toUnix)
rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0))
return ok(msg)
except:
return err[Message](reInternalError)
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): tuple[message: Message, missingDeps: seq[MessageID]] =
if rm.bloomFilter.contains(message.messageId):
return (message, @[])
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): Result[tuple[message: Message, missingDeps: seq[MessageID]]] =
withLock rm.lock:
try:
if rm.bloomFilter.contains(message.messageId):
return ok((message, @[]))
rm.bloomFilter.add(message.messageId)
rm.updateLamportTimestamp(message.lamportTimestamp)
rm.bloomFilter.add(message.messageId)
rm.updateLamportTimestamp(message.lamportTimestamp)
var missingDeps: seq[MessageID] = @[]
for depId in message.causalHistory:
if depId notin rm.messageHistory:
missingDeps.add(depId)
var missingDeps: seq[MessageID] = @[]
for depId in message.causalHistory:
if not rm.bloomFilter.contains(depId):
missingDeps.add(depId)
if missingDeps.len == 0:
rm.messageHistory.add(message.messageId)
if rm.messageHistory.len > MaxMessageHistory:
rm.messageHistory.delete(0)
if rm.onMessageReady != nil:
rm.onMessageReady(message.messageId)
else:
rm.incomingBuffer.add(message)
if rm.onMissingDependencies != nil:
rm.onMissingDependencies(message.messageId, missingDeps)
if missingDeps.len == 0:
rm.messageHistory.add(message.messageId)
if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0)
if rm.onMessageReady != nil:
rm.onMessageReady(message.messageId)
else:
rm.incomingBuffer.add(message)
if rm.onMissingDependencies != nil:
rm.onMissingDependencies(message.messageId, missingDeps)
(message, missingDeps)
return ok((message, missingDeps))
except:
return err[(Message, seq[MessageID])](reInternalError)
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]) =
var processedMessages: seq[Message] = @[]
rm.incomingBuffer = rm.incomingBuffer.filterIt(
not messageIds.allIt(it in it.causalHistory or it in rm.messageHistory)
)
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void] =
withLock rm.lock:
try:
var processedMessages: seq[Message] = @[]
rm.incomingBuffer = rm.incomingBuffer.filterIt(
not messageIds.allIt(it in it.causalHistory or rm.bloomFilter.contains(it))
)
for msg in processedMessages:
rm.messageHistory.add(msg.messageId)
if rm.messageHistory.len > MaxMessageHistory:
rm.messageHistory.delete(0)
if rm.onMessageReady != nil:
rm.onMessageReady(msg.messageId)
proc checkUnacknowledgedMessages(rm: ReliabilityManager) =
let now = getTime()
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
for msg in rm.outgoingBuffer:
if (now - msg.sendTime).inSeconds < 60:
newOutgoingBuffer.add(msg)
elif rm.onMessageSent != nil:
rm.onMessageSent(msg.message.messageId)
rm.outgoingBuffer = newOutgoingBuffer
for msg in processedMessages:
rm.messageHistory.add(msg.messageId)
if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0)
if rm.onMessageReady != nil:
rm.onMessageReady(msg.messageId)
return ok()
except:
return err[void](reInternalError)
proc setCallbacks*(rm: ReliabilityManager,
onMessageReady: proc(messageId: MessageID),
onMessageSent: proc(messageId: MessageID),
onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID])) =
rm.onMessageReady = onMessageReady
rm.onMessageSent = onMessageSent
rm.onMissingDependencies = onMissingDependencies
withLock rm.lock:
rm.onMessageReady = onMessageReady
rm.onMessageSent = onMessageSent
rm.onMissingDependencies = onMissingDependencies
# Logging
proc logInfo(msg: string) =
info msg
# proc checkUnacknowledgedMessages*(rm: ReliabilityManager)
proc logError(msg: string) =
error msg
# Export C API
{.push exportc, cdecl.}
type
CMessage {.bycopy.} = object
senderId: cstring
messageId: cstring
lamportTimestamp: int64
causalHistory: ptr UncheckedArray[cstring]
causalHistoryLen: cint
channelId: cstring
content: cstring
bloomFilter: pointer
CUnwrapResult {.bycopy.} = object
message: CMessage
missingDeps: ptr UncheckedArray[cstring]
missingDepsLen: cint
proc reliability_manager_new(channelId: cstring): pointer {.exportc, cdecl.} =
let rm = newReliabilityManager($channelId)
GC_ref(rm)
result = cast[pointer](rm)
proc reliability_manager_free(rmPtr: pointer) {.exportc, cdecl.} =
let rm = cast[ReliabilityManager](rmPtr)
GC_unref(rm)
proc wrap_outgoing_message(rmPtr: pointer, message: cstring): CMessage {.exportc, cdecl.} =
let rm = cast[ReliabilityManager](rmPtr)
let wrappedMsg = rm.wrapOutgoingMessage($message)
result.senderId = wrappedMsg.senderId.cstring
result.messageId = wrappedMsg.messageId.cstring
result.lamportTimestamp = wrappedMsg.lamportTimestamp
result.causalHistory = cast[ptr UncheckedArray[cstring]](alloc0(wrappedMsg.causalHistory.len * sizeof(cstring)))
result.causalHistoryLen = wrappedMsg.causalHistory.len.cint
for i, id in wrappedMsg.causalHistory:
result.causalHistory[i] = id.cstring
result.channelId = wrappedMsg.channelId.cstring
result.content = wrappedMsg.content.cstring
result.bloomFilter = cast[pointer](addr wrappedMsg.bloomFilter)
proc unwrap_received_message(rmPtr: pointer, msg: CMessage): CUnwrapResult {.exportc, cdecl.} =
let rm = cast[ReliabilityManager](rmPtr)
var nimMsg = Message(
senderId: $msg.senderId,
messageId: $msg.messageId,
lamportTimestamp: msg.lamportTimestamp,
causalHistory: newSeq[string](msg.causalHistoryLen),
channelId: $msg.channelId,
content: $msg.content,
bloomFilter: cast[RollingBloomFilter](msg.bloomFilter)[]
)
for i in 0 ..< msg.causalHistoryLen:
nimMsg.causalHistory[i] = $msg.causalHistory[i]
let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(nimMsg)
result.message = CMessage(
senderId: unwrappedMsg.senderId.cstring,
messageId: unwrappedMsg.messageId.cstring,
lamportTimestamp: unwrappedMsg.lamportTimestamp,
causalHistory: cast[ptr UncheckedArray[cstring]](alloc0(unwrappedMsg.causalHistory.len * sizeof(cstring))),
causalHistoryLen: unwrappedMsg.causalHistory.len.cint,
channelId: unwrappedMsg.channelId.cstring,
content: unwrappedMsg.content.cstring,
bloomFilter: cast[pointer](addr unwrappedMsg.bloomFilter)
)
for i, id in unwrappedMsg.causalHistory:
result.message.causalHistory[i] = id.cstring
result.missingDeps = cast[ptr UncheckedArray[cstring]](alloc0(missingDeps.len * sizeof(cstring)))
result.missingDepsLen = missingDeps.len.cint
for i, id in missingDeps:
result.missingDeps[i] = id.cstring
proc mark_dependencies_met(rmPtr: pointer, messageIds: ptr UncheckedArray[cstring], count: cint) {.exportc, cdecl.} =
let rm = cast[ReliabilityManager](rmPtr)
var nimMessageIds = newSeq[string](count)
for i in 0 ..< count:
nimMessageIds[i] = $messageIds[i]
rm.markDependenciesMet(nimMessageIds)
proc set_callbacks(rmPtr: pointer,
onMessageReady: proc(messageId: cstring) {.cdecl.},
onMessageSent: proc(messageId: cstring) {.cdecl.},
onMissingDependencies: proc(messageId: cstring, missingDeps: ptr UncheckedArray[cstring], missingDepsLen: cint) {.cdecl.}) {.exportc, cdecl.} =
let rm = cast[ReliabilityManager](rmPtr)
rm.setCallbacks(
proc(messageId: MessageID) = onMessageReady(messageId.cstring),
proc(messageId: MessageID) = onMessageSent(messageId.cstring),
proc(messageId: MessageID, missingDeps: seq[MessageID]) =
var cMissingDeps = cast[ptr UncheckedArray[cstring]](alloc0(missingDeps.len * sizeof(cstring)))
for i, dep in missingDeps:
cMissingDeps[i] = dep.cstring
onMissingDependencies(messageId.cstring, cMissingDeps, missingDeps.len.cint)
dealloc(cMissingDeps)
)
{.pop.}
proc processMessage*(rm: ReliabilityManager, message: string): seq[MessageID] =
let wrappedMsg = checkAndLogError(rm.wrapOutgoingMessage(message), "Failed to wrap message")
let (_, missingDeps) = checkAndLogError(rm.unwrapReceivedMessage(wrappedMsg), "Failed to unwrap message")
return missingDeps
when isMainModule:
# TODO: Add some basic tests / examples
let rm = newReliabilityManager("testChannel")
let msg = rm.wrapOutgoingMessage("Hello, World!")
echo "Wrapped message: ", msg
let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(msg)
echo "Unwrapped message: ", unwrappedMsg
echo "Missing dependencies: ", missingDeps
# Example usage and basic tests
let config = defaultConfig()
let rmResult = newReliabilityManager("testChannel", config)
if rmResult.isOk:
let rm = rmResult.value
rm.setCallbacks(
proc(messageId: MessageID) = echo "Message ready: ", messageId,
proc(messageId: MessageID) = echo "Message sent: ", messageId,
proc(messageId: MessageID, missingDeps: seq[MessageID]) = echo "Missing dependencies for ", messageId, ": ", missingDeps
)
let msgResult = rm.wrapOutgoingMessage("Hello, World!")
if msgResult.isOk:
let msg = msgResult.value
echo "Wrapped message: ", msg
let unwrapResult = rm.unwrapReceivedMessage(msg)
if unwrapResult.isOk:
let (unwrappedMsg, missingDeps) = unwrapResult.value
echo "Unwrapped message: ", unwrappedMsg
echo "Missing dependencies: ", missingDeps
else:
echo "Error unwrapping message: ", unwrapResult.error
else:
echo "Error wrapping message: ", msgResult.error
#rm.startPeriodicTasks()
else:
echo "Error creating ReliabilityManager: ", rmResult.error

108
src/utils.nim Normal file
View File

@ -0,0 +1,108 @@
import std/[times, hashes, random, sequtils, algorithm, json, options, locks, asyncdispatch]
import chronicles
import "../nim-bloom/src/bloom"
import ./common
proc newRollingBloomFilter*(capacity: int, errorRate: float, window: Duration): Result[RollingBloomFilter] =
  ## Creates a RollingBloomFilter that retains entries for `window`.
  ##
  ## Returns reInvalidArgument for a non-positive capacity or an error
  ## rate outside (0, 1); reInternalError if the underlying filter
  ## cannot be initialized.
  if capacity <= 0 or errorRate <= 0.0 or errorRate >= 1.0:
    return err[RollingBloomFilter](reInvalidArgument)
  try:
    let filter = initializeBloomFilter(capacity, errorRate)
    return ok(RollingBloomFilter(
      filter: filter,
      window: window,
      messages: @[]
    ))
  except CatchableError:
    # A bare `except:` would also swallow Defects (assertion/overflow bugs);
    # only recover from catchable runtime errors.
    return err[RollingBloomFilter](reInternalError)
proc add*(rbf: var RollingBloomFilter, messageId: MessageID) =
  ## Records `messageId` in the filter and remembers when it was added,
  ## so `clean` can later expire it.
  let insertedAt = getTime()
  rbf.messages.add(TimestampedMessageID(id: messageId, timestamp: insertedAt))
  rbf.filter.insert(messageId)
proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool =
  ## Probabilistic membership test: false positives are possible,
  ## false negatives are not.
  return rbf.filter.lookup(messageId)
proc clean*(rbf: var RollingBloomFilter) =
  ## Removes outdated entries from the rolling bloom filter.
  ##
  ## Bloom filters do not support deletion, so expiry is done by
  ## rebuilding: a fresh filter is populated from the timestamped
  ## entries still inside the retention window.
  let cutoff = getTime() - rbf.window
  var rebuilt = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate)
  var kept: seq[TimestampedMessageID] = @[]
  for entry in rbf.messages:
    if entry.timestamp > cutoff:
      kept.add(entry)
      rebuilt.insert(entry.id)
  rbf.filter = rebuilt
  rbf.messages = kept
proc cleanBloomFilter*(rm: ReliabilityManager) =
  ## Cleans the rolling bloom filter, removing outdated entries.
  ## Thread-safe: takes rm.lock for the duration of the rebuild.
  withLock rm.lock:
    rm.bloomFilter.clean()
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) =
  ## Advances the manager's Lamport clock to one past the larger of the
  ## local clock and the observed message timestamp `msgTs`.
  ##
  ## Exported (`*` added): reliability.nim invokes this as
  ## `rm.updateLamportTimestamp(...)` from another module, which fails to
  ## compile while the proc is module-private.
  ## NOTE(review): does not acquire rm.lock — callers are expected to
  ## hold it; confirm all call sites do.
  rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =
  ## Returns up to the last `n` message IDs from history, oldest first.
  ## Returns an empty seq for `n <= 0` (previously a negative `n` made the
  ## slice lower bound exceed the seq length and raised an IndexDefect).
  ##
  ## Exported (`*` added): reliability.nim calls this when building the
  ## causal history of outgoing messages.
  ## NOTE(review): does not acquire rm.lock — callers are expected to hold it.
  if n <= 0:
    return @[]
  result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]
proc generateUniqueID*(): MessageID =
  ## Generates a pseudo-unique message ID from wall-clock time and the
  ## global RNG. Sub-second precision is mixed in so two IDs produced
  ## within the same second (the common case) still hash differently.
  ##
  ## NOTE(review): the global `rand` sequence is deterministic unless
  ## `randomize()` was called at startup — confirm initialization order,
  ## otherwise IDs can repeat across process restarts.
  let now = getTime()
  let randomPart = rand(high(int))
  result = $hash($now.toUnix & "." & $now.nanosecond & ":" & $randomPart)
proc serializeMessage*(msg: Message): Result[string] =
  ## Serializes `msg` to a JSON string containing all protocol fields.
  ## Returns reSerializationError if JSON construction fails.
  try:
    let jsonNode = %*{
      "senderId": msg.senderId,
      "messageId": msg.messageId,
      "lamportTimestamp": msg.lamportTimestamp,
      "causalHistory": msg.causalHistory,
      "channelId": msg.channelId,
      "content": msg.content
    }
    return ok($jsonNode)
  except CatchableError:
    # A bare `except:` would also swallow Defects; only recover from
    # catchable runtime errors.
    return err[string](reSerializationError)
proc deserializeMessage*(data: string): Result[Message] =
  ## Parses a JSON string produced by `serializeMessage` back into a Message.
  ## Returns reDeserializationError on malformed JSON or missing/ill-typed
  ## fields (parse and key errors are all CatchableError subtypes).
  try:
    let jsonNode = parseJson(data)
    return ok(Message(
      senderId: jsonNode["senderId"].getStr,
      messageId: jsonNode["messageId"].getStr,
      lamportTimestamp: jsonNode["lamportTimestamp"].getBiggestInt,
      causalHistory: jsonNode["causalHistory"].to(seq[string]),
      channelId: jsonNode["channelId"].getStr,
      content: jsonNode["content"].getStr
    ))
  except CatchableError:
    # A bare `except:` would also swallow Defects; only recover from
    # catchable runtime errors.
    return err[Message](reDeserializationError)
proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] =
  ## Returns a snapshot of the delivered-message history.
  ## Thread-safe: the copy is taken while holding rm.lock.
  withLock rm.lock:
    result = rm.messageHistory
proc getOutgoingBufferSize*(rm: ReliabilityManager): int =
  ## Number of sent-but-unacknowledged messages, read under rm.lock.
  withLock rm.lock:
    result = rm.outgoingBuffer.len
proc getIncomingBufferSize*(rm: ReliabilityManager): int =
  ## Number of received messages waiting on missing dependencies,
  ## read under rm.lock.
  withLock rm.lock:
    result = rm.incomingBuffer.len
proc logError*(msg: string) =
  ## Logs an error message via chronicles under the "ReliabilityError" topic.
  error "ReliabilityError", message = msg
proc logInfo*(msg: string) =
  ## Logs an informational message via chronicles under the
  ## "ReliabilityInfo" topic.
  info "ReliabilityInfo", message = msg
proc checkAndLogError*[T](res: Result[T], errorMsg: string): T =
  ## Unwraps `res`, or logs the failure and raises.
  ##
  ## Raises ValueError whose message now includes the underlying
  ## ReliabilityError code (previously only the bare `errorMsg` was
  ## raised, losing the cause that was logged).
  if not res.isOk:
    let detail = errorMsg & ": " & $res.error
    logError(detail)
    raise newException(ValueError, detail)
  res.value

View File

@ -3,32 +3,50 @@ import ../src/reliability
suite "ReliabilityManager":
setup:
let rm = newReliabilityManager("testChannel")
let rmResult = newReliabilityManager("testChannel")
check rmResult.isOk
let rm = rmResult.value
test "wrapOutgoingMessage":
let msg = rm.wrapOutgoingMessage("Hello, World!")
let msgResult = rm.wrapOutgoingMessage("Hello, World!")
check msgResult.isOk
let msg = msgResult.value
check:
msg.content == "Hello, World!"
msg.channelId == "testChannel"
msg.causalHistory.len == 0
test "unwrapReceivedMessage":
let wrappedMsg = rm.wrapOutgoingMessage("Test message")
let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(wrappedMsg)
let wrappedMsgResult = rm.wrapOutgoingMessage("Test message")
check wrappedMsgResult.isOk
let wrappedMsg = wrappedMsgResult.value
let unwrapResult = rm.unwrapReceivedMessage(wrappedMsg)
check unwrapResult.isOk
let (unwrappedMsg, missingDeps) = unwrapResult.value
check:
unwrappedMsg.content == "Test message"
missingDeps.len == 0
test "markDependenciesMet":
let msg1 = rm.wrapOutgoingMessage("Message 1")
let msg2 = rm.wrapOutgoingMessage("Message 2")
let msg3 = rm.wrapOutgoingMessage("Message 3")
var msg1Result = rm.wrapOutgoingMessage("Message 1")
var msg2Result = rm.wrapOutgoingMessage("Message 2")
var msg3Result = rm.wrapOutgoingMessage("Message 3")
check msg1Result.isOk and msg2Result.isOk and msg3Result.isOk
let msg1 = msg1Result.value
let msg2 = msg2Result.value
let msg3 = msg3Result.value
var (_, missingDeps) = rm.unwrapReceivedMessage(msg3)
var unwrapResult = rm.unwrapReceivedMessage(msg3)
check unwrapResult.isOk
var (_, missingDeps) = unwrapResult.value
check missingDeps.len == 2
rm.markDependenciesMet(@[msg1.messageId, msg2.messageId])
(_, missingDeps) = rm.unwrapReceivedMessage(msg3)
let markResult = rm.markDependenciesMet(@[msg1.messageId, msg2.messageId])
check markResult.isOk
unwrapResult = rm.unwrapReceivedMessage(msg3)
check unwrapResult.isOk
(_, missingDeps) = unwrapResult.value
check missingDeps.len == 0
test "callbacks":
@ -42,8 +60,11 @@ suite "ReliabilityManager":
proc(messageId: MessageID, missingDeps: seq[MessageID]) = missingDepsCount += 1
)
let msg1 = rm.wrapOutgoingMessage("Message 1")
let msg2 = rm.wrapOutgoingMessage("Message 2")
let msg1Result = rm.wrapOutgoingMessage("Message 1")
let msg2Result = rm.wrapOutgoingMessage("Message 2")
check msg1Result.isOk and msg2Result.isOk
let msg1 = msg1Result.value
let msg2 = msg2Result.value
discard rm.unwrapReceivedMessage(msg1)
discard rm.unwrapReceivedMessage(msg2)
@ -52,59 +73,20 @@ suite "ReliabilityManager":
messageSentCount == 0 # This would be triggered by the checkUnacknowledgedMessages function
missingDepsCount == 0
test "lamport timestamps":
let msg1 = rm.wrapOutgoingMessage("Message 1")
let msg2 = rm.wrapOutgoingMessage("Message 2")
check msg2.lamportTimestamp > msg1.lamportTimestamp
let msg3 = Message(lamportTimestamp: msg2.lamportTimestamp + 10, messageId: generateUniqueID(), content: "Message 3")
discard rm.unwrapReceivedMessage(msg3)
let msg4 = rm.wrapOutgoingMessage("Message 4")
check msg4.lamportTimestamp > msg3.lamportTimestamp
test "causal history":
let msg1 = rm.wrapOutgoingMessage("Message 1")
let msg2 = rm.wrapOutgoingMessage("Message 2")
let msg3 = rm.wrapOutgoingMessage("Message 3")
test "serialization":
let msgResult = rm.wrapOutgoingMessage("Test serialization")
check msgResult.isOk
let msg = msgResult.value
let serializeResult = serializeMessage(msg)
check serializeResult.isOk
let serialized = serializeResult.value
let deserializeResult = deserializeMessage(serialized)
check deserializeResult.isOk
let deserialized = deserializeResult.value
check:
msg2.causalHistory.contains(msg1.messageId)
msg3.causalHistory.contains(msg2.messageId)
msg3.causalHistory.contains(msg1.messageId)
deserialized.content == "Test serialization"
deserialized.messageId == msg.messageId
deserialized.lamportTimestamp == msg.lamportTimestamp
test "bloom filter":
let msg1 = rm.wrapOutgoingMessage("Message 1")
let (_, missingDeps1) = rm.unwrapReceivedMessage(msg1)
check missingDeps1.len == 0
let (_, missingDeps2) = rm.unwrapReceivedMessage(msg1)
check missingDeps2.len == 0 # The message should be in the bloom filter and not processed again
test "message history limit":
for i in 1..MaxMessageHistory + 10:
let msg = rm.wrapOutgoingMessage($i)
discard rm.unwrapReceivedMessage(msg)
check rm.messageHistory.len <= MaxMessageHistory
test "missing dependencies callback":
var missingDepsReceived: seq[MessageID] = @[]
rm.setCallbacks(
proc(messageId: MessageID) = discard,
proc(messageId: MessageID) = discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) = missingDepsReceived = missingDeps
)
let msg1 = rm.wrapOutgoingMessage("Message 1")
let msg2 = rm.wrapOutgoingMessage("Message 2")
let msg3 = Message(
messageId: generateUniqueID(),
lamportTimestamp: msg2.lamportTimestamp + 1,
causalHistory: @[msg1.messageId, msg2.messageId],
content: "Message 3"
)
discard rm.unwrapReceivedMessage(msg3)
check missingDepsReceived.len == 2
check missingDepsReceived.contains(msg1.messageId)
check missingDepsReceived.contains(msg2.messageId)
when isMainModule:
unittest.run()