From 62821077bc519fc64d8305d1afc06d55b74f6472 Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:55:07 +0530 Subject: [PATCH] feat: add nim-bloom, many more updates on reliability API split to common and utils, logging, error handling, thread safety --- nim-bloom/.DS_Store | Bin 0 -> 6148 bytes nim-bloom/.github/workflows/main.yml | 58 ++++ nim-bloom/.gitignore | 6 + nim-bloom/LICENSE | 20 ++ nim-bloom/README.md | 41 +++ nim-bloom/bloom.nimble | 9 + nim-bloom/src/bloom.nim | 244 +++++++++++++++ nim-bloom/src/murmur3.c | 314 +++++++++++++++++++ nim-bloom/src/murmur3.h | 21 ++ nim-bloom/src/private/probabilities.nim | 103 +++++++ nim-bloom/tests/config.nims | 1 + nim-bloom/tests/test.nim | 102 +++++++ reliability.nimble | 4 +- src/common.nim | 81 +++++ src/reliability.nim | 381 ++++++++---------------- src/utils.nim | 108 +++++++ tests/test_reliability.nim | 114 +++---- 17 files changed, 1288 insertions(+), 319 deletions(-) create mode 100644 nim-bloom/.DS_Store create mode 100644 nim-bloom/.github/workflows/main.yml create mode 100644 nim-bloom/.gitignore create mode 100644 nim-bloom/LICENSE create mode 100644 nim-bloom/README.md create mode 100644 nim-bloom/bloom.nimble create mode 100644 nim-bloom/src/bloom.nim create mode 100644 nim-bloom/src/murmur3.c create mode 100644 nim-bloom/src/murmur3.h create mode 100644 nim-bloom/src/private/probabilities.nim create mode 100644 nim-bloom/tests/config.nims create mode 100644 nim-bloom/tests/test.nim create mode 100644 src/common.nim create mode 100644 src/utils.nim diff --git a/nim-bloom/.DS_Store b/nim-bloom/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..8c4340f9a46e55e3dfe99a3d6e0736034f2c323f GIT binary patch literal 6148 zcmeHKL2uJA6n<{IHPJxo0i<1!EOD($#|9JPl6CE25^z`$8~~MOi9|#iPfe;Gs!BP- zkKxKM;lFT#@7XR|({b1Zp~L-9dza4)1y^aE{~7ffK%Y+3fOhX+q*f{F`NQU zfm^NsuMZN=7+9ZJpPJ^}z+2x~)~KNXoHEd~}VgJ^*XLj@YDutyAG=!i?37g($e z8afHxGwx%17WRZ9bnl2uolYV!=t`%6Q(#kprn&6!{y+Nr`G1q-uABl+fm@}3Xbr>R z07J5S>(=0Sua)qRa5m1X4E~{@qPJq?@>aYL*M_*{3NWx(8AJr;J_NK3u5b$cQw4ql D^y7h= literal 0 HcmV?d00001 diff --git a/nim-bloom/.github/workflows/main.yml b/nim-bloom/.github/workflows/main.yml new file mode 100644 index 0000000..f366b37 --- /dev/null +++ b/nim-bloom/.github/workflows/main.yml @@ -0,0 +1,58 @@ +name: website + +on: [push] # debugging only +#on: +# push: +# tags: +# - 'v*.*.*' + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v1 + - name: Set output + id: vars + run: echo ::set-output name=tag::${GITHUB_REF:10} + - name: Cache choosenim + id: cache-choosenim + uses: actions/cache@v1 + with: + path: ~/.choosenim + key: ${{ runner.os }}-choosenim-stable + - name: Cache nimble + id: cache-nimble + uses: actions/cache@v1 + with: + path: ~/.nimble + key: ${{ runner.os }}-nimble-stable + - uses: jiro4989/setup-nim-action@v1.0.2 + with: + nim-version: 'stable' + - name: Build and test + env: + RELEASE_VERSION: ${{ steps.vars.outputs.tag }} + run: | + nimble test -Y + - name: Build doc + env: + RELEASE_VERSION: ${{ steps.vars.outputs.tag }} + run: | + # Due to bug https://github.com/nim-lang/Nim/issues/14281, compile the documentation separately. + nimble doc --git.url:https://github.com/$GITHUB_REPOSITORY --git.commit:$RELEASE_VERSION bloom.nim + nimble doc --git.url:https://github.com/$GITHUB_REPOSITORY --git.commit:$RELEASE_VERSION private/probabilities.nim + find . + mkdir -p ./public + mv bloom.html probabilities.html nimdoc.out.css ./public/ + cd ./public/ + ln -s ./bloom.html index.html + cd ../ + - name: Deploy + if: success() + uses: crazy-max/ghaction-github-pages@v1.3.0 + with: + target_branch: gh-pages + build_dir: ./public + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/nim-bloom/.gitignore b/nim-bloom/.gitignore new file mode 100644 index 0000000..dd8e2f7 --- /dev/null +++ b/nim-bloom/.gitignore @@ -0,0 +1,6 @@ +nimcache +nimcache/* +tests/test +bloom +*.html +*.css diff --git a/nim-bloom/LICENSE b/nim-bloom/LICENSE new file mode 100644 index 0000000..10ea866 --- /dev/null +++ b/nim-bloom/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Nick Greenfield + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nim-bloom/README.md b/nim-bloom/README.md new file mode 100644 index 0000000..339228a --- /dev/null +++ b/nim-bloom/README.md @@ -0,0 +1,41 @@ +nim-bloom +============ + +Bloom filter implementation in Nim. Uses a C implementation of MurmurHash3 for optimal speed and numeric distribution. + +On a 10 year old Macbook Pro Retina the test case for 10M insertions executes in ~4.0 seconds and 10M lookups in ~3.5 seconds for a Bloom filter with a 1 in 1000 error rate (0.001). This is ~2.5M insertions/sec and ~2.9M lookups/sec on a single thread (but passing the `-d:release` flag to the Nim compiler and thus activating the C compiler's optimizations). If k is lowered to 5 or 6 vs. a larger "optimal" number, performance further increases to ~4M ops/sec. Note that this test is for a Bloom filter ~20-25MB in size and thus accurately reflects the cost of main memory accesses (vs. a smaller filter that might fit solely in L3 cache, for example, and can achieve several million additional ops/sec). + + +Currently supports inserting and looking up string elements. Forthcoming features include: +* Support for other types beyond strings +* Support for iterables in the insert method +* Persistence + + +quickstart +==== +Quick functionality demo: +``` +import bloom +var bf = initializeBloomFilter(capacity = 10000, errorRate = 0.001) +echo bf # Get characteristics of the Bloom filter +echo bf.lookup("An element not in the Bloom filter") # Prints 'false' +bf.insert("Here we go...") +assert(bf.lookup("Here we go...")) +``` + + +By default, the Bloom filter will use a mathematically optimal number of k hash functions, which minimizes the amount of error per bit of storage required. In many cases, however, it may be advantageous to specify a smaller value of k in order to save time hashing. This is supported by passing an explicit `k` parameter, which will then either create an optimal Bloom filter for the specified error rate.[1] + +[1] If `k` <= 12 and the number of required bytes per element is <= 4. If either of these conditions doesn't hold, a fully manual Bloom filter can be constructed by passing both `k` and `force_n_bits_per_elem`. + +Example: +``` +var bf2 = initializeBloomFilter(capacity = 10000, errorRate = 0.001, k = 5) +assert bf2.kHashes == 5 +assert bf2.nBitsPerElem == 18 + +var bf3 = initializeBloomFilter(capacity = 10000, errorRate = 0.001, k = 5, forceNBitsPerElem = 12) +assert bf3.kHashes == 5 +assert bf3.nBitsPerElem == 12 # But note, however, that bf.errorRate will *not* be correct +``` diff --git a/nim-bloom/bloom.nimble b/nim-bloom/bloom.nimble new file mode 100644 index 0000000..3398bd2 --- /dev/null +++ b/nim-bloom/bloom.nimble @@ -0,0 +1,9 @@ +# Package +version = "0.1.0" +author = "Boyd Greenfield" +description = "Efficient Bloom filter implementation for Nim using MurmurHash3." +license = "MIT" +srcDir = "src" + +# Dependencies +requires "nim >= 1.0.0" diff --git a/nim-bloom/src/bloom.nim b/nim-bloom/src/bloom.nim new file mode 100644 index 0000000..333ea7a --- /dev/null +++ b/nim-bloom/src/bloom.nim @@ -0,0 +1,244 @@ +from math import ceil, ln, pow, round +import hashes +import strutils +import private/probabilities + +# Import MurmurHash3 code and compile at the same time as Nim code +{.compile: "murmur3.c".} + +type + BloomFilterError = object of CatchableError + MurmurHashes = array[0..1, int] + BloomFilter* = object + capacity*: int + errorRate*: float + kHashes*: int + mBits*: int + intArray: seq[int] + nBitsPerElem*: int + useMurmurHash*: bool + +proc rawMurmurHash(key: cstring, len: int, seed: uint32, + outHashes: var MurmurHashes): void {. + importc: "MurmurHash3_x64_128".} + +proc murmurHash(key: string, seed = 0'u32): MurmurHashes = + rawMurmurHash(key, key.len, seed, outHashes = result) + +proc hashA(item: string, maxValue: int): int = + hash(item) mod maxValue + +proc hashB(item: string, maxValue: int): int = + hash(item & " b") mod maxValue + +proc hashN(item: string, n: int, maxValue: int): int = + ## Get the nth hash of a string using the formula hashA + n * hashB + ## which uses 2 hash functions vs. k and has comparable properties + ## See Kirsch and Mitzenmacher, 2008: + ## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf + abs((hashA(item, maxValue) + n * hashB(item, maxValue))) mod maxValue + +proc getMOverNBitsForK(k: int, targetError: float, + probabilityTable = kErrors): int = + ## Returns the optimal number of m/n bits for a given k. + if k notin 0..12: + raise newException(BloomFilterError, + "K must be <= 12 if forceNBitsPerElem is not also specified.") + + for mOverN in 2..probabilityTable[k].high: + if probabilityTable[k][mOverN] < targetError: + return mOverN + + raise newException(BloomFilterError, + "Specified value of k and error rate for which is not achievable using less than 4 bytes / element.") + +proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, + forceNBitsPerElem = 0, + useMurmurHash = true): BloomFilter = + ## Initializes a Bloom filter, using a specified ``capacity``, + ## ``errorRate``, and – optionally – specific number of k hash functions. + ## If ``kHashes`` is < 1 (default argument is 0), ``kHashes`` will be + ## optimally calculated on the fly. Otherwise, ``kHashes`` will be set to + ## the passed integer, which requires that ``forceNBitsPerElem`` is + ## also set to be greater than 0. Otherwise a ``BloomFilterError`` + ## exception is raised. + ## See http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html for + ## useful tables on k and m/n (n bits per element) combinations. + ## + ## The Bloom filter uses the MurmurHash3 implementation by default, + ## though it can fall back to using the built-in nim ``hash`` function + ## if ``useMurmurHash = false``. This is compiled alongside the Nim + ## code using the ``{.compile.}`` pragma. + var + kHashes: int + bitsPerElem: float + nBitsPerElem: int + + if k < 1: # Calculate optimal k and use that + bitsPerElem = ceil(-1.0 * (ln(errorRate) / (pow(ln(2.float), 2)))) + kHashes = round(ln(2.float) * bitsPerElem).int + nBitsPerElem = round(bitsPerElem).int + else: # Use specified k if possible + if forceNBitsPerElem < 1: # Use lookup table + nBitsPerElem = getMOverNBitsForK(k = k, targetError = errorRate) + else: + nBitsPerElem = forceNBitsPerElem + kHashes = k + + let + mBits = capacity * nBitsPerElem + mInts = 1 + mBits div (sizeof(int) * 8) + + BloomFilter(capacity: capacity, errorRate: errorRate, kHashes: kHashes, + mBits: mBits, intArray: newSeq[int](mInts), nBitsPerElem: nBitsPerElem, + useMurmurHash: useMurmurHash) + +proc `$`*(bf: BloomFilter): string = + ## Prints the capacity, set error rate, number of k hash functions, + ## and total bits of memory allocated by the Bloom filter. + "Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits per stored element." % + [$bf.capacity, + formatFloat(bf.errorRate, format = ffScientific, precision = 1), + $bf.kHashes, $bf.nBitsPerElem] + +{.push overflowChecks: off.} + +proc hashMurmur(bf: BloomFilter, key: string): seq[int] = + result.newSeq(bf.kHashes) + let murmurHashes = murmurHash(key, seed = 0'u32) + for i in 0..> (32 - r)); +} + +static inline FORCE_INLINE uint64_t rotl64 ( uint64_t x, int8_t r ) +{ + return (x << r) | (x >> (64 - r)); +} + +#define ROTL32(x,y) rotl32(x,y) +#define ROTL64(x,y) rotl64(x,y) + +#define BIG_CONSTANT(x) (x##LLU) + +//----------------------------------------------------------------------------- +// Block read - if your platform needs to do endian-swapping or can only +// handle aligned reads, do the conversion here + +#define getblock(p, i) (p[i]) + +//----------------------------------------------------------------------------- +// Finalization mix - force all bits of a hash block to avalanche + +static inline FORCE_INLINE uint32_t fmix32 ( uint32_t h ) +{ + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + + return h; +} + +//---------- + +static inline FORCE_INLINE uint64_t fmix64 ( uint64_t k ) +{ + k ^= k >> 33; + k *= BIG_CONSTANT(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + + return k; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 ( const void * key, int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 4; + int i; + + uint32_t h1 = seed; + + uint32_t c1 = 0xcc9e2d51; + uint32_t c2 = 0x1b873593; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*4); + + for(i = -nblocks; i; i++) + { + uint32_t k1 = getblock(blocks,i); + + k1 *= c1; + k1 = ROTL32(k1,15); + k1 *= c2; + + h1 ^= k1; + h1 = ROTL32(h1,13); + h1 = h1*5+0xe6546b64; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*4); + + uint32_t k1 = 0; + + switch(len & 3) + { + case 3: k1 ^= tail[2] << 16; + case 2: k1 ^= tail[1] << 8; + case 1: k1 ^= tail[0]; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; + + h1 = fmix32(h1); + + *(uint32_t*)out = h1; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_128 ( const void * key, const int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + int i; + + uint32_t h1 = seed; + uint32_t h2 = seed; + uint32_t h3 = seed; + uint32_t h4 = seed; + + uint32_t c1 = 0x239b961b; + uint32_t c2 = 0xab0e9789; + uint32_t c3 = 0x38b34ae5; + uint32_t c4 = 0xa1e38b93; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*16); + + for(i = -nblocks; i; i++) + { + uint32_t k1 = getblock(blocks,i*4+0); + uint32_t k2 = getblock(blocks,i*4+1); + uint32_t k3 = getblock(blocks,i*4+2); + uint32_t k4 = getblock(blocks,i*4+3); + + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + + h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b; + + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747; + + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35; + + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint32_t k1 = 0; + uint32_t k2 = 0; + uint32_t k3 = 0; + uint32_t k4 = 0; + + switch(len & 15) + { + case 15: k4 ^= tail[14] << 16; + case 14: k4 ^= tail[13] << 8; + case 13: k4 ^= tail[12] << 0; + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + case 12: k3 ^= tail[11] << 24; + case 11: k3 ^= tail[10] << 16; + case 10: k3 ^= tail[ 9] << 8; + case 9: k3 ^= tail[ 8] << 0; + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + case 8: k2 ^= tail[ 7] << 24; + case 7: k2 ^= tail[ 6] << 16; + case 6: k2 ^= tail[ 5] << 8; + case 5: k2 ^= tail[ 4] << 0; + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + case 4: k1 ^= tail[ 3] << 24; + case 3: k1 ^= tail[ 2] << 16; + case 2: k1 ^= tail[ 1] << 8; + case 1: k1 ^= tail[ 0] << 0; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len; + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + h1 = fmix32(h1); + h2 = fmix32(h2); + h3 = fmix32(h3); + h4 = fmix32(h4); + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + ((uint32_t*)out)[0] = h1; + ((uint32_t*)out)[1] = h2; + ((uint32_t*)out)[2] = h3; + ((uint32_t*)out)[3] = h4; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x64_128 ( const void * key, const int len, + const uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + int i; + + uint64_t h1 = seed; + uint64_t h2 = seed; + + uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5); + uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f); + + //---------- + // body + + const uint64_t * blocks = (const uint64_t *)(data); + + for(i = 0; i < nblocks; i++) + { + uint64_t k1 = getblock(blocks,i*2+0); + uint64_t k2 = getblock(blocks,i*2+1); + + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + + h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729; + + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint64_t k1 = 0; + uint64_t k2 = 0; + + switch(len & 15) + { + case 15: k2 ^= (uint64_t)(tail[14]) << 48; + case 14: k2 ^= (uint64_t)(tail[13]) << 40; + case 13: k2 ^= (uint64_t)(tail[12]) << 32; + case 12: k2 ^= (uint64_t)(tail[11]) << 24; + case 11: k2 ^= (uint64_t)(tail[10]) << 16; + case 10: k2 ^= (uint64_t)(tail[ 9]) << 8; + case 9: k2 ^= (uint64_t)(tail[ 8]) << 0; + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + case 8: k1 ^= (uint64_t)(tail[ 7]) << 56; + case 7: k1 ^= (uint64_t)(tail[ 6]) << 48; + case 6: k1 ^= (uint64_t)(tail[ 5]) << 40; + case 5: k1 ^= (uint64_t)(tail[ 4]) << 32; + case 4: k1 ^= (uint64_t)(tail[ 3]) << 24; + case 3: k1 ^= (uint64_t)(tail[ 2]) << 16; + case 2: k1 ^= (uint64_t)(tail[ 1]) << 8; + case 1: k1 ^= (uint64_t)(tail[ 0]) << 0; + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + ((uint64_t*)out)[0] = h1; + ((uint64_t*)out)[1] = h2; +} + +//----------------------------------------------------------------------------- diff --git a/nim-bloom/src/murmur3.h b/nim-bloom/src/murmur3.h new file mode 100644 index 0000000..6928384 --- /dev/null +++ b/nim-bloom/src/murmur3.h @@ -0,0 +1,21 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. The author hereby disclaims copyright to this source +// code. + +#ifndef _MURMURHASH3_H_ +#define _MURMURHASH3_H_ + +#include + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 (const void *key, int len, uint32_t seed, void *out); + +void MurmurHash3_x86_128(const void *key, int len, uint32_t seed, void *out); + +void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out); + +//----------------------------------------------------------------------------- + +#endif // _MURMURHASH3_H_ \ No newline at end of file diff --git a/nim-bloom/src/private/probabilities.nim b/nim-bloom/src/private/probabilities.nim new file mode 100644 index 0000000..59175f2 --- /dev/null +++ b/nim-bloom/src/private/probabilities.nim @@ -0,0 +1,103 @@ +# +# ### Probability table declaration, in private/ for readability ### +# Table for k hashes from 1..12 from http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html +# Iterate along the sequence at position [k] until the error rate is < specified, otherwise +# raise an error. +# + +type + TErrorForK = seq[float] + TAllErrorRates* = array[0..12, TErrorForK] + +var kErrors*: TAllErrorRates + +kErrors[0] = @[1.0] +kErrors[1] = @[1.0, 1.0, + 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000, + 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.0869000000, + 0.0800000000, 0.0740000000, 0.0689000000, 0.0645000000, 0.0606000000, + 0.0571000000, 0.0540000000, 0.0513000000, 0.0488000000, 0.0465000000, + 0.0444000000, 0.0425000000, 0.0408000000, 0.0392000000, 0.0377000000, + 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000, 0.0317000000, + 0.0308000000 ] + +kErrors[2] = @[1.0, 1.0, + 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, 0.0804000000, + 0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 0.0276000000, + 0.0236000000, 0.0203000000, 0.0177000000, 0.0156000000, 0.0138000000, + 0.0123000000, 0.0111000000, 0.0099800000, 0.0090600000, 0.0082500000, + 0.0075500000, 0.0069400000, 0.0063900000, 0.0059100000, 0.0054800000, + 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000, 0.0039000000, + 0.0036700000 ] + +kErrors[3] = @[1.0, 1.0, 1.0, + 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, 0.0423000000, + 0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 0.0108000000, + 0.0087500000, 0.0071800000, 0.0059600000, 0.0050000000, 0.0042300000, + 0.0036200000, 0.0031200000, 0.0027000000, 0.0023600000, 0.0020700000, + 0.0018300000, 0.0016200000, 0.0014500000, 0.0012900000, 0.0011600000, + 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000, 0.0007170000 ] + +kErrors[4] = @[1.0, 1.0, 1.0, 1.0, + 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000, 0.0240000000, + 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 0.0049200000, + 0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0015800000, + 0.0013000000, 0.0010800000, 0.0009050000, 0.0007640000, 0.0006490000, + 0.0005550000, 0.0004780000, 0.0004130000, 0.0003590000, 0.0003140000, + 0.0002760000, 0.0002430000, 0.0002150000, 0.0001910000 ] + +kErrors[5] = @[1.0, 1.0, 1.0, 1.0, 1.0, + 0.1010000000, 0.0578000000, 0.0347000000, 0.0217000000, 0.0141000000, + 0.0094300000, 0.0065000000, 0.0045900000, 0.0033200000, 0.0024400000, + 0.0018300000, 0.0013900000, 0.0010700000, 0.0008390000, 0.0006630000, + 0.0005300000, 0.0004270000, 0.0003470000, 0.0002850000, 0.0002350000, + 0.0001960000, 0.0001640000, 0.0001380000, 0.0001170000, 0.0000996000, + 0.0000853000, 0.0000733000, 0.0000633000 ] + +kErrors[6] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0638000000, 0.0364000000, 0.0216000000, 0.0133000000, 0.0084400000, + 0.0055200000, 0.0037100000, 0.0025500000, 0.0017900000, 0.0012800000, + 0.0009350000, 0.0006920000, 0.0005190000, 0.0003940000, 0.0003030000, + 0.0002360000, 0.0001850000, 0.0001470000, 0.0001170000, 0.0000944000, + 0.0000766000, 0.0000626000, 0.0000515000, 0.0000426000, 0.0000355000, + 0.0000297000, 0.0000250000 ] + +kErrors[7] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0229000000, 0.0135000000, 0.0081900000, 0.0051300000, 0.0032900000, + 0.0021700000, 0.0014600000, 0.0010000000, 0.0007020000, 0.0004990000, + 0.0003600000, 0.0002640000, 0.0001960000, 0.0001470000, 0.0001120000, + 0.0000856000, 0.0000663000, 0.0000518000, 0.0000408000, 0.0000324000, + 0.0000259000, 0.0000209000, 0.0000169000, 0.0000138000, 0.0000113000 ] + +kErrors[8] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0145000000, 0.0084600000, 0.0050900000, 0.0031400000, 0.0019900000, + 0.0012900000, 0.0008520000, 0.0005740000, 0.0003940000, 0.0002750000, + 0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000, 0.0000555000, + 0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000, + 0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300 ] + +kErrors[9] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0053100000, 0.0031700000, 0.0019400000, 0.0012100000, 0.0007750000, + 0.0005050000, 0.0003350000, 0.0002260000, 0.0001550000, 0.0001080000, + 0.0000759000, 0.0000542000, 0.0000392000, 0.0000286000, 0.0000211000, + 0.0000157000, 0.0000118000, 0.0000089600, 0.0000068500, 0.0000052800, + 0.0000041000, 0.0000032000] + +kErrors[10] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0033400000, 0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, + 0.0003020000, 0.0001980000, 0.0001320000, 0.0000889000, 0.0000609000, + 0.0000423000, 0.0000297000, 0.0000211000, 0.0000152000, 0.0000110000, + 0.0000080700, 0.0000059700, 0.0000044500, 0.0000033500, 0.0000025400, + 0.0000019400] + +kErrors[11] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0021000000, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000, + 0.0001830000, 0.0001180000, 0.0000777000, 0.0000518000, 0.0000350000, + 0.0000240000, 0.0000166000, 0.0000116000, 0.0000082300, 0.0000058900, + 0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900, 0.0000012600] + +kErrors[12] = @[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000, + 0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, + 0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900, + 0.0000016500, 0.0000012000, 0.0000008740] diff --git a/nim-bloom/tests/config.nims b/nim-bloom/tests/config.nims new file mode 100644 index 0000000..80091ff --- /dev/null +++ b/nim-bloom/tests/config.nims @@ -0,0 +1 @@ +switch("path", "$projectDir/../src") diff --git a/nim-bloom/tests/test.nim b/nim-bloom/tests/test.nim new file mode 100644 index 0000000..53c7e35 --- /dev/null +++ b/nim-bloom/tests/test.nim @@ -0,0 +1,102 @@ +import unittest +include bloom +from random import rand, randomize +import times + +suite "murmur": + # Test murmurhash 3 + setup: + var hashOutputs: MurmurHashes + hashOutputs = [0, 0] + rawMurmurHash("hello", 5, 0, hashOutputs) + + test "raw": + check int(hashOutputs[0]) == -3758069500696749310 # Correct murmur outputs (cast to int64) + check int(hashOutputs[1]) == 6565844092913065241 + + test "wrapped": + let hashOutputs2 = murmurHash("hello", 0) + check hashOutputs2[0] == hashOutputs[0] + check hashOutputs2[1] == hashOutputs[1] + + test "seed": + let hashOutputs3 = murmurHash("hello", 10) + check hashOutputs3[0] != hashOutputs[0] + check hashOutputs3[1] != hashOutputs[1] + + +suite "bloom": + + setup: + let nElementsToTest = 100000 + var bf = initializeBloomFilter(capacity = nElementsToTest, errorRate = 0.001) + randomize(2882) # Seed the RNG + var + sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + kTestElements, sampleLetters: seq[string] + kTestElements = newSeq[string](nElementsToTest) + sampleLetters = newSeq[string](62) + + for i in 0..= 1.6.0" -requires "nimsha2" +requires "nim >= 2.0.8" requires "chronicles" +# Tasks task test, "Run the test suite": exec "nim c -r tests/test_reliability.nim" \ No newline at end of file diff --git a/src/common.nim b/src/common.nim new file mode 100644 index 0000000..ca776d5 --- /dev/null +++ b/src/common.nim @@ -0,0 +1,81 @@ +import std/[times, json, locks] +import "../nim-bloom/src/bloom" + +type + MessageID* = string + + Message* = object + senderId*: string + messageId*: MessageID + lamportTimestamp*: int64 + causalHistory*: seq[MessageID] + channelId*: string + content*: string + + UnacknowledgedMessage* = object + message*: Message + sendTime*: Time + resendAttempts*: int + + TimestampedMessageID* = object + id*: MessageID + timestamp*: Time + + RollingBloomFilter* = object + filter*: BloomFilter + window*: Duration + messages*: seq[TimestampedMessageID] + + ReliabilityConfig* = object + bloomFilterCapacity*: int + bloomFilterErrorRate*: float + bloomFilterWindow*: Duration + maxMessageHistory*: int + maxCausalHistory*: int + resendInterval*: Duration + maxResendAttempts*: int + + ReliabilityManager* = ref object + lamportTimestamp*: int64 + messageHistory*: seq[MessageID] + bloomFilter*: RollingBloomFilter + outgoingBuffer*: seq[UnacknowledgedMessage] + incomingBuffer*: seq[Message] + channelId*: string + config*: ReliabilityConfig + lock*: Lock + onMessageReady*: proc(messageId: MessageID) + onMessageSent*: proc(messageId: MessageID) + onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) + + ReliabilityError* = enum + reSuccess, + reInvalidArgument, + reOutOfMemory, + reInternalError, + reSerializationError, + reDeserializationError, + reMessageTooLarge + + Result*[T] = object + case isOk*: bool + of true: + value*: T + of false: + error*: ReliabilityError + +const + DefaultBloomFilterCapacity* = 10000 + DefaultBloomFilterErrorRate* = 0.001 + DefaultBloomFilterWindow* = initDuration(hours = 1) + DefaultMaxMessageHistory* = 1000 + DefaultMaxCausalHistory* = 10 + DefaultResendInterval* = initDuration(seconds = 30) + DefaultMaxResendAttempts* = 5 + MaxMessageSize* = 1024 * 1024 # 1 MB + +proc ok*[T](value: T): Result[T] = + Result[T](isOk: true, value: value) + +proc err*[T](error: ReliabilityError): Result[T] = + Result[T](isOk: false, error: error) \ No newline at end of file diff --git a/src/reliability.nim b/src/reliability.nim index 87d238a..3abc5b2 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -1,273 +1,152 @@ -import std/[times, hashes, random, sequtils, algorithm] -import nimsha2 -import chronicles +import ./common, ./utils -const - BloomFilterSize = 10000 - BloomFilterHashCount = 7 - MaxMessageHistory = 100 - MaxCausalHistory = 10 - -type - MessageID* = string - - Message* = object - senderId*: string - messageId*: MessageID - lamportTimestamp*: int64 - causalHistory*: seq[MessageID] - channelId*: string - content*: string - bloomFilter*: RollingBloomFilter - - UnacknowledgedMessage* = object - message*: Message - sendTime*: Time - resendAttempts*: int - - RollingBloomFilter* = object - # TODO: Implement a proper Bloom filter - data: array[BloomFilterSize, bool] - hashCount: int - - ReliabilityManager* = ref object - lamportTimestamp: int64 - messageHistory: seq[MessageID] - bloomFilter: RollingBloomFilter - outgoingBuffer: seq[UnacknowledgedMessage] - incomingBuffer: seq[Message] - channelId: string - onMessageReady*: proc(messageId: MessageID) - onMessageSent*: proc(messageId: MessageID) - onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) - -proc hash(filter: RollingBloomFilter): Hash = - var h: Hash = 0 - for idx, val in filter.data: - h = h !& hash(idx) !& hash(val) - result = !$h - -proc newRollingBloomFilter(): RollingBloomFilter = - result.hashCount = BloomFilterHashCount - -proc add(filter: var RollingBloomFilter, item: MessageID) = - let itemHash = hash(item) - for i in 0 ..< filter.hashCount: - let idx = (itemHash + i * i) mod BloomFilterSize - filter.data[idx] = true - -proc contains(filter: RollingBloomFilter, item: MessageID): bool = - let itemHash = hash(item) - for i in 0 ..< filter.hashCount: - let idx = (itemHash + i * i) mod BloomFilterSize - if not filter.data[idx]: - return false - return true - -proc newReliabilityManager*(channelId: string): ReliabilityManager = - result = ReliabilityManager( - lamportTimestamp: 0, - messageHistory: @[], - bloomFilter: newRollingBloomFilter(), - outgoingBuffer: @[], - incomingBuffer: @[], - channelId: channelId +proc defaultConfig*(): ReliabilityConfig = + ReliabilityConfig( + bloomFilterCapacity: DefaultBloomFilterCapacity, + bloomFilterErrorRate: DefaultBloomFilterErrorRate, + bloomFilterWindow: DefaultBloomFilterWindow, + maxMessageHistory: DefaultMaxMessageHistory, + maxCausalHistory: DefaultMaxCausalHistory, + resendInterval: DefaultResendInterval, + maxResendAttempts: DefaultMaxResendAttempts ) -proc generateUniqueID(): MessageID = - $secureHash($getTime().toUnix & $rand(high(int))) +proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager] = + if channelId.len == 0: + return err[ReliabilityManager](reInvalidArgument) + + try: + let bloomFilterResult = newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow) + if bloomFilterResult.isErr: + return err[ReliabilityManager](bloomFilterResult.error) -proc updateLamportTimestamp(rm: ReliabilityManager, msgTs: int64) = - rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 + let rm = ReliabilityManager( + lamportTimestamp: 0, + messageHistory: @[], + bloomFilter: bloomFilterResult.value, + outgoingBuffer: @[], + incomingBuffer: @[], + channelId: channelId, + config: config + ) + initLock(rm.lock) + return ok(rm) + except: + return err[ReliabilityManager](reOutOfMemory) -proc getRecentMessageIDs(rm: ReliabilityManager, n: int): seq[MessageID] = - result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] +proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Result[Message] = + if message.len == 0: + return err[Message](reInvalidArgument) + if message.len > MaxMessageSize: + return err[Message](reMessageTooLarge) -proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Message = - rm.updateLamportTimestamp(getTime().toUnix) - let msg = Message( - senderId: "TODO_SENDER_ID", - messageId: generateUniqueID(), - lamportTimestamp: rm.lamportTimestamp, - causalHistory: rm.getRecentMessageIDs(MaxCausalHistory), - channelId: rm.channelId, - content: message, - bloomFilter: rm.bloomFilter - ) - rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)) - msg + withLock rm.lock: + try: + let msg = Message( + senderId: "TODO_SENDER_ID", + messageId: generateUniqueID(), + lamportTimestamp: rm.lamportTimestamp, + causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), + channelId: rm.channelId, + content: message + ) + rm.updateLamportTimestamp(getTime().toUnix) + rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)) + return ok(msg) + except: + return err[Message](reInternalError) -proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): tuple[message: Message, missingDeps: seq[MessageID]] = - if rm.bloomFilter.contains(message.messageId): - return (message, @[]) +proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): Result[tuple[message: Message, missingDeps: seq[MessageID]]] = + withLock rm.lock: + try: + if rm.bloomFilter.contains(message.messageId): + return ok((message, @[])) - rm.bloomFilter.add(message.messageId) - rm.updateLamportTimestamp(message.lamportTimestamp) + rm.bloomFilter.add(message.messageId) + rm.updateLamportTimestamp(message.lamportTimestamp) - var missingDeps: seq[MessageID] = @[] - for depId in message.causalHistory: - if depId notin rm.messageHistory: - missingDeps.add(depId) + var missingDeps: seq[MessageID] = @[] + for depId in message.causalHistory: + if not rm.bloomFilter.contains(depId): + missingDeps.add(depId) - if missingDeps.len == 0: - rm.messageHistory.add(message.messageId) - if rm.messageHistory.len > MaxMessageHistory: - rm.messageHistory.delete(0) - if rm.onMessageReady != nil: - rm.onMessageReady(message.messageId) - else: - rm.incomingBuffer.add(message) - if rm.onMissingDependencies != nil: - rm.onMissingDependencies(message.messageId, missingDeps) + if missingDeps.len == 0: + rm.messageHistory.add(message.messageId) + if rm.messageHistory.len > rm.config.maxMessageHistory: + rm.messageHistory.delete(0) + if rm.onMessageReady != nil: + rm.onMessageReady(message.messageId) + else: + rm.incomingBuffer.add(message) + if rm.onMissingDependencies != nil: + rm.onMissingDependencies(message.messageId, missingDeps) - (message, missingDeps) + return ok((message, missingDeps)) + except: + return err[(Message, seq[MessageID])](reInternalError) -proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]) = - var processedMessages: seq[Message] = @[] - rm.incomingBuffer = rm.incomingBuffer.filterIt( - not messageIds.allIt(it in it.causalHistory or it in rm.messageHistory) - ) +proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void] = + withLock rm.lock: + try: + var processedMessages: seq[Message] = @[] + rm.incomingBuffer = rm.incomingBuffer.filterIt( + not messageIds.allIt(it in it.causalHistory or rm.bloomFilter.contains(it)) + ) - for msg in processedMessages: - rm.messageHistory.add(msg.messageId) - if rm.messageHistory.len > MaxMessageHistory: - rm.messageHistory.delete(0) - if rm.onMessageReady != nil: - rm.onMessageReady(msg.messageId) - -proc checkUnacknowledgedMessages(rm: ReliabilityManager) = - let now = getTime() - var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] - for msg in rm.outgoingBuffer: - if (now - msg.sendTime).inSeconds < 60: - newOutgoingBuffer.add(msg) - elif rm.onMessageSent != nil: - rm.onMessageSent(msg.message.messageId) - rm.outgoingBuffer = newOutgoingBuffer + for msg in processedMessages: + rm.messageHistory.add(msg.messageId) + if rm.messageHistory.len > rm.config.maxMessageHistory: + rm.messageHistory.delete(0) + if rm.onMessageReady != nil: + rm.onMessageReady(msg.messageId) + + return ok() + except: + return err[void](reInternalError) proc setCallbacks*(rm: ReliabilityManager, onMessageReady: proc(messageId: MessageID), onMessageSent: proc(messageId: MessageID), onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID])) = - rm.onMessageReady = onMessageReady - rm.onMessageSent = onMessageSent - rm.onMissingDependencies = onMissingDependencies + withLock rm.lock: + rm.onMessageReady = onMessageReady + rm.onMessageSent = onMessageSent + rm.onMissingDependencies = onMissingDependencies -# Logging -proc logInfo(msg: string) = - info msg +# proc checkUnacknowledgedMessages*(rm: ReliabilityManager) -proc logError(msg: string) = - error msg - -# Export C API -{.push exportc, cdecl.} - -type - CMessage {.bycopy.} = object - senderId: cstring - messageId: cstring - lamportTimestamp: int64 - causalHistory: ptr UncheckedArray[cstring] - causalHistoryLen: cint - channelId: cstring - content: cstring - bloomFilter: pointer - - CUnwrapResult {.bycopy.} = object - message: CMessage - missingDeps: ptr UncheckedArray[cstring] - missingDepsLen: cint - -proc reliability_manager_new(channelId: cstring): pointer {.exportc, cdecl.} = - let rm = newReliabilityManager($channelId) - GC_ref(rm) - result = cast[pointer](rm) - -proc reliability_manager_free(rmPtr: pointer) {.exportc, cdecl.} = - let rm = cast[ReliabilityManager](rmPtr) - GC_unref(rm) - -proc wrap_outgoing_message(rmPtr: pointer, message: cstring): CMessage {.exportc, cdecl.} = - let rm = cast[ReliabilityManager](rmPtr) - let wrappedMsg = rm.wrapOutgoingMessage($message) - - result.senderId = wrappedMsg.senderId.cstring - result.messageId = wrappedMsg.messageId.cstring - result.lamportTimestamp = wrappedMsg.lamportTimestamp - result.causalHistory = cast[ptr UncheckedArray[cstring]](alloc0(wrappedMsg.causalHistory.len * sizeof(cstring))) - result.causalHistoryLen = wrappedMsg.causalHistory.len.cint - for i, id in wrappedMsg.causalHistory: - result.causalHistory[i] = id.cstring - result.channelId = wrappedMsg.channelId.cstring - result.content = wrappedMsg.content.cstring - result.bloomFilter = cast[pointer](addr wrappedMsg.bloomFilter) - -proc unwrap_received_message(rmPtr: pointer, msg: CMessage): CUnwrapResult {.exportc, cdecl.} = - let rm = cast[ReliabilityManager](rmPtr) - var nimMsg = Message( - senderId: $msg.senderId, - messageId: $msg.messageId, - lamportTimestamp: msg.lamportTimestamp, - causalHistory: newSeq[string](msg.causalHistoryLen), - channelId: $msg.channelId, - content: $msg.content, - bloomFilter: cast[RollingBloomFilter](msg.bloomFilter)[] - ) - for i in 0 ..< msg.causalHistoryLen: - nimMsg.causalHistory[i] = $msg.causalHistory[i] - - let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(nimMsg) - - result.message = CMessage( - senderId: unwrappedMsg.senderId.cstring, - messageId: unwrappedMsg.messageId.cstring, - lamportTimestamp: unwrappedMsg.lamportTimestamp, - causalHistory: cast[ptr UncheckedArray[cstring]](alloc0(unwrappedMsg.causalHistory.len * sizeof(cstring))), - causalHistoryLen: unwrappedMsg.causalHistory.len.cint, - channelId: unwrappedMsg.channelId.cstring, - content: unwrappedMsg.content.cstring, - bloomFilter: cast[pointer](addr unwrappedMsg.bloomFilter) - ) - for i, id in unwrappedMsg.causalHistory: - result.message.causalHistory[i] = id.cstring - - result.missingDeps = cast[ptr UncheckedArray[cstring]](alloc0(missingDeps.len * sizeof(cstring))) - result.missingDepsLen = missingDeps.len.cint - for i, id in missingDeps: - result.missingDeps[i] = id.cstring - -proc mark_dependencies_met(rmPtr: pointer, messageIds: ptr UncheckedArray[cstring], count: cint) {.exportc, cdecl.} = - let rm = cast[ReliabilityManager](rmPtr) - var nimMessageIds = newSeq[string](count) - for i in 0 ..< count: - nimMessageIds[i] = $messageIds[i] - rm.markDependenciesMet(nimMessageIds) - -proc set_callbacks(rmPtr: pointer, - onMessageReady: proc(messageId: cstring) {.cdecl.}, - onMessageSent: proc(messageId: cstring) {.cdecl.}, - onMissingDependencies: proc(messageId: cstring, missingDeps: ptr UncheckedArray[cstring], missingDepsLen: cint) {.cdecl.}) {.exportc, cdecl.} = - let rm = cast[ReliabilityManager](rmPtr) - rm.setCallbacks( - proc(messageId: MessageID) = onMessageReady(messageId.cstring), - proc(messageId: MessageID) = onMessageSent(messageId.cstring), - proc(messageId: MessageID, missingDeps: seq[MessageID]) = - var cMissingDeps = cast[ptr UncheckedArray[cstring]](alloc0(missingDeps.len * sizeof(cstring))) - for i, dep in missingDeps: - cMissingDeps[i] = dep.cstring - onMissingDependencies(messageId.cstring, cMissingDeps, missingDeps.len.cint) - dealloc(cMissingDeps) - ) - -{.pop.} +proc processMessage*(rm: ReliabilityManager, message: string): seq[MessageID] = + let wrappedMsg = checkAndLogError(rm.wrapOutgoingMessage(message), "Failed to wrap message") + let (_, missingDeps) = checkAndLogError(rm.unwrapReceivedMessage(wrappedMsg), "Failed to unwrap message") + return missingDeps when isMainModule: - # TODO: Add some basic tests / examples - let rm = newReliabilityManager("testChannel") - let msg = rm.wrapOutgoingMessage("Hello, World!") - echo "Wrapped message: ", msg - - let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(msg) - echo "Unwrapped message: ", unwrappedMsg - echo "Missing dependencies: ", missingDeps \ No newline at end of file + # Example usage and basic tests + let config = defaultConfig() + let rmResult = newReliabilityManager("testChannel", config) + if rmResult.isOk: + let rm = rmResult.value + rm.setCallbacks( + proc(messageId: MessageID) = echo "Message ready: ", messageId, + proc(messageId: MessageID) = echo "Message sent: ", messageId, + proc(messageId: MessageID, missingDeps: seq[MessageID]) = echo "Missing dependencies for ", messageId, ": ", missingDeps + ) + + let msgResult = rm.wrapOutgoingMessage("Hello, World!") + if msgResult.isOk: + let msg = msgResult.value + echo "Wrapped message: ", msg + + let unwrapResult = rm.unwrapReceivedMessage(msg) + if unwrapResult.isOk: + let (unwrappedMsg, missingDeps) = unwrapResult.value + echo "Unwrapped message: ", unwrappedMsg + echo "Missing dependencies: ", missingDeps + else: + echo "Error unwrapping message: ", unwrapResult.error + else: + echo "Error wrapping message: ", msgResult.error + + #rm.startPeriodicTasks() + else: + echo "Error creating ReliabilityManager: ", rmResult.error \ No newline at end of file diff --git a/src/utils.nim b/src/utils.nim new file mode 100644 index 0000000..dd4fa96 --- /dev/null +++ b/src/utils.nim @@ -0,0 +1,108 @@ +import std/[times, hashes, random, sequtils, algorithm, json, options, locks, asyncdispatch] +import chronicles +import "../nim-bloom/src/bloom" +import ./common + +proc newRollingBloomFilter*(capacity: int, errorRate: float, window: Duration): Result[RollingBloomFilter] = + try: + let filter = initializeBloomFilter(capacity, errorRate) + return ok(RollingBloomFilter( + filter: filter, + window: window, + messages: @[] + )) + except: + return err[RollingBloomFilter](reInternalError) + +proc add*(rbf: var RollingBloomFilter, messageId: MessageID) = + rbf.filter.insert(messageId) + rbf.messages.add(TimestampedMessageID(id: messageId, timestamp: getTime())) + +proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool = + rbf.filter.lookup(messageId) + +proc clean*(rbf: var RollingBloomFilter) = + ## Removes outdated entries from the rolling bloom filter. + let now = getTime() + let cutoff = now - rbf.window + var newMessages: seq[TimestampedMessageID] = @[] + var newFilter = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate) + + for msg in rbf.messages: + if msg.timestamp > cutoff: + newMessages.add(msg) + newFilter.insert(msg.id) + + rbf.messages = newMessages + rbf.filter = newFilter + +proc cleanBloomFilter*(rm: ReliabilityManager) = + ## Cleans the rolling bloom filter, removing outdated entries. + withLock rm.lock: + rm.bloomFilter.clean() + +proc updateLamportTimestamp(rm: ReliabilityManager, msgTs: int64) = + rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 + +proc getRecentMessageIDs(rm: ReliabilityManager, n: int): seq[MessageID] = + result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] + +proc generateUniqueID*(): MessageID = + let timestamp = getTime().toUnix + let randomPart = rand(high(int)) + result = $hash($timestamp & $randomPart) + +proc serializeMessage*(msg: Message): Result[string] = + try: + let jsonNode = %*{ + "senderId": msg.senderId, + "messageId": msg.messageId, + "lamportTimestamp": msg.lamportTimestamp, + "causalHistory": msg.causalHistory, + "channelId": msg.channelId, + "content": msg.content + } + return ok($jsonNode) + except: + return err[string](reSerializationError) + +proc deserializeMessage*(data: string): Result[Message] = + try: + let jsonNode = parseJson(data) + return ok(Message( + senderId: jsonNode["senderId"].getStr, + messageId: jsonNode["messageId"].getStr, + lamportTimestamp: jsonNode["lamportTimestamp"].getBiggestInt, + causalHistory: jsonNode["causalHistory"].to(seq[string]), + channelId: jsonNode["channelId"].getStr, + content: jsonNode["content"].getStr + )) + except: + return err[Message](reDeserializationError) + +proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] = + withLock rm.lock: + return rm.messageHistory + +proc getOutgoingBufferSize*(rm: ReliabilityManager): int = + withLock rm.lock: + return rm.outgoingBuffer.len + +proc getIncomingBufferSize*(rm: ReliabilityManager): int = + withLock rm.lock: + return rm.incomingBuffer.len + +proc logError*(msg: string) = + ## Logs an error message + error "ReliabilityError", message = msg + +proc logInfo*(msg: string) = + ## Logs an informational message + info "ReliabilityInfo", message = msg + +proc checkAndLogError*[T](res: Result[T], errorMsg: string): T = + if res.isOk: + return res.value + else: + logError(errorMsg & ": " & $res.error) + raise newException(ValueError, errorMsg) \ No newline at end of file diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 3ddb36f..a7cb656 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -3,32 +3,50 @@ import ../src/reliability suite "ReliabilityManager": setup: - let rm = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager("testChannel") + check rmResult.isOk + let rm = rmResult.value test "wrapOutgoingMessage": - let msg = rm.wrapOutgoingMessage("Hello, World!") + let msgResult = rm.wrapOutgoingMessage("Hello, World!") + check msgResult.isOk + let msg = msgResult.value check: msg.content == "Hello, World!" msg.channelId == "testChannel" msg.causalHistory.len == 0 test "unwrapReceivedMessage": - let wrappedMsg = rm.wrapOutgoingMessage("Test message") - let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(wrappedMsg) + let wrappedMsgResult = rm.wrapOutgoingMessage("Test message") + check wrappedMsgResult.isOk + let wrappedMsg = wrappedMsgResult.value + let unwrapResult = rm.unwrapReceivedMessage(wrappedMsg) + check unwrapResult.isOk + let (unwrappedMsg, missingDeps) = unwrapResult.value check: unwrappedMsg.content == "Test message" missingDeps.len == 0 test "markDependenciesMet": - let msg1 = rm.wrapOutgoingMessage("Message 1") - let msg2 = rm.wrapOutgoingMessage("Message 2") - let msg3 = rm.wrapOutgoingMessage("Message 3") + var msg1Result = rm.wrapOutgoingMessage("Message 1") + var msg2Result = rm.wrapOutgoingMessage("Message 2") + var msg3Result = rm.wrapOutgoingMessage("Message 3") + check msg1Result.isOk and msg2Result.isOk and msg3Result.isOk + let msg1 = msg1Result.value + let msg2 = msg2Result.value + let msg3 = msg3Result.value - var (_, missingDeps) = rm.unwrapReceivedMessage(msg3) + var unwrapResult = rm.unwrapReceivedMessage(msg3) + check unwrapResult.isOk + var (_, missingDeps) = unwrapResult.value check missingDeps.len == 2 - rm.markDependenciesMet(@[msg1.messageId, msg2.messageId]) - (_, missingDeps) = rm.unwrapReceivedMessage(msg3) + let markResult = rm.markDependenciesMet(@[msg1.messageId, msg2.messageId]) + check markResult.isOk + + unwrapResult = rm.unwrapReceivedMessage(msg3) + check unwrapResult.isOk + (_, missingDeps) = unwrapResult.value check missingDeps.len == 0 test "callbacks": @@ -42,8 +60,11 @@ suite "ReliabilityManager": proc(messageId: MessageID, missingDeps: seq[MessageID]) = missingDepsCount += 1 ) - let msg1 = rm.wrapOutgoingMessage("Message 1") - let msg2 = rm.wrapOutgoingMessage("Message 2") + let msg1Result = rm.wrapOutgoingMessage("Message 1") + let msg2Result = rm.wrapOutgoingMessage("Message 2") + check msg1Result.isOk and msg2Result.isOk + let msg1 = msg1Result.value + let msg2 = msg2Result.value discard rm.unwrapReceivedMessage(msg1) discard rm.unwrapReceivedMessage(msg2) @@ -52,59 +73,20 @@ suite "ReliabilityManager": messageSentCount == 0 # This would be triggered by the checkUnacknowledgedMessages function missingDepsCount == 0 - test "lamport timestamps": - let msg1 = rm.wrapOutgoingMessage("Message 1") - let msg2 = rm.wrapOutgoingMessage("Message 2") - check msg2.lamportTimestamp > msg1.lamportTimestamp - - let msg3 = Message(lamportTimestamp: msg2.lamportTimestamp + 10, messageId: generateUniqueID(), content: "Message 3") - discard rm.unwrapReceivedMessage(msg3) - let msg4 = rm.wrapOutgoingMessage("Message 4") - check msg4.lamportTimestamp > msg3.lamportTimestamp - - test "causal history": - let msg1 = rm.wrapOutgoingMessage("Message 1") - let msg2 = rm.wrapOutgoingMessage("Message 2") - let msg3 = rm.wrapOutgoingMessage("Message 3") - + test "serialization": + let msgResult = rm.wrapOutgoingMessage("Test serialization") + check msgResult.isOk + let msg = msgResult.value + let serializeResult = serializeMessage(msg) + check serializeResult.isOk + let serialized = serializeResult.value + let deserializeResult = deserializeMessage(serialized) + check deserializeResult.isOk + let deserialized = deserializeResult.value check: - msg2.causalHistory.contains(msg1.messageId) - msg3.causalHistory.contains(msg2.messageId) - msg3.causalHistory.contains(msg1.messageId) + deserialized.content == "Test serialization" + deserialized.messageId == msg.messageId + deserialized.lamportTimestamp == msg.lamportTimestamp - test "bloom filter": - let msg1 = rm.wrapOutgoingMessage("Message 1") - let (_, missingDeps1) = rm.unwrapReceivedMessage(msg1) - check missingDeps1.len == 0 - - let (_, missingDeps2) = rm.unwrapReceivedMessage(msg1) - check missingDeps2.len == 0 # The message should be in the bloom filter and not processed again - - test "message history limit": - for i in 1..MaxMessageHistory + 10: - let msg = rm.wrapOutgoingMessage($i) - discard rm.unwrapReceivedMessage(msg) - - check rm.messageHistory.len <= MaxMessageHistory - - test "missing dependencies callback": - var missingDepsReceived: seq[MessageID] = @[] - rm.setCallbacks( - proc(messageId: MessageID) = discard, - proc(messageId: MessageID) = discard, - proc(messageId: MessageID, missingDeps: seq[MessageID]) = missingDepsReceived = missingDeps - ) - - let msg1 = rm.wrapOutgoingMessage("Message 1") - let msg2 = rm.wrapOutgoingMessage("Message 2") - let msg3 = Message( - messageId: generateUniqueID(), - lamportTimestamp: msg2.lamportTimestamp + 1, - causalHistory: @[msg1.messageId, msg2.messageId], - content: "Message 3" - ) - - discard rm.unwrapReceivedMessage(msg3) - check missingDepsReceived.len == 2 - check missingDepsReceived.contains(msg1.messageId) - check missingDepsReceived.contains(msg2.messageId) \ No newline at end of file +when isMainModule: + unittest.run() \ No newline at end of file