diff --git a/.gitignore b/.gitignore index cfc9510..6f39798 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ nimcache nimcache/* -tests/bloom +tests/test_bloom nim-bloom/bloom .DS_Store src/.DS_Store \ No newline at end of file diff --git a/src/message.nim b/src/message.nim index d792113..436bec9 100644 --- a/src/message.nim +++ b/src/message.nim @@ -1,13 +1,14 @@ import std/times type - MessageID* = string + MessageID* = seq[byte] + ChannelID* = seq[byte] Message* = object messageId*: MessageID lamportTimestamp*: int64 causalHistory*: seq[MessageID] - channelId*: string + channelId*: ChannelID content*: seq[byte] bloomFilter*: seq[byte] diff --git a/src/private/probabilities.nim b/src/private/probabilities.nim index 6eeb28e..209a515 100644 --- a/src/private/probabilities.nim +++ b/src/private/probabilities.nim @@ -9,9 +9,7 @@ type TErrorForK = seq[float] TAllErrorRates* = array[0..12, TErrorForK] -var kErrors* {.threadvar.}: TAllErrorRates - -kErrors = [ +const kErrors*: TAllErrorRates = [ @[1.0], @[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, diff --git a/src/protobuf.nim b/src/protobuf.nim index b929fb2..0d09d55 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -3,18 +3,14 @@ import std/options import endians import ../src/[message, protobufutil, bloom, reliability_utils] -proc toBytes(s: string): seq[byte] = - result = newSeq[byte](s.len) - copyMem(result[0].addr, s[0].unsafeAddr, s.len) - proc encode*(msg: Message): ProtoBuffer = var pb = initProtoBuffer() - pb.write(1, msg.messageId) + pb.write(1, msg.messageId) pb.write(2, uint64(msg.lamportTimestamp)) for hist in msg.causalHistory: - pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling + pb.write(3, hist) pb.write(4, msg.channelId) pb.write(5, msg.content) @@ -35,8 +31,7 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] = return err(ProtobufError.missingRequiredField("lamportTimestamp")) msg.lamportTimestamp = int64(timestamp) - # Decode causal history - var causalHistory: seq[string] + var causalHistory: seq[seq[byte]] let histResult = pb.getRepeatedField(3, causalHistory) if histResult.isOk: msg.causalHistory = causalHistory @@ -53,23 +48,19 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] = ok(msg) proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] = - try: - let pb = encode(msg) - ok(pb.buffer) - except: - return err(reSerializationError) + let pb = encode(msg) + ok(pb.buffer) proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] = let msg = Message.decode(data).valueOr: return err(ReliabilityError.reDeserializationError) - ok(msg) proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] = - try: - var pb = initProtoBuffer() + var pb = initProtoBuffer() - # Convert intArray to bytes + # Convert intArray to bytes + try: var bytes = newSeq[byte](filter.intArray.len * sizeof(int)) for i, val in filter.intArray: var leVal: int @@ -82,27 +73,27 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr pb.write(3, uint64(filter.errorRate * 1_000_000)) pb.write(4, uint64(filter.kHashes)) pb.write(5, uint64(filter.mBits)) - - pb.finish() - ok(pb.buffer) except: - return err(ReliabilityError.reSerializationError) + return err(ReliabilityError.reSerializationError) + + pb.finish() + ok(pb.buffer) proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] = if data.len == 0: return err(ReliabilityError.reDeserializationError) - try: - let pb = initProtoBuffer(data) - var bytes: seq[byte] - var cap, errRate, kHashes, mBits: uint64 - + let pb = initProtoBuffer(data) + var bytes: seq[byte] + var cap, errRate, kHashes, mBits: uint64 + + try: if not pb.getField(1, bytes).get() or not pb.getField(2, cap).get() or not pb.getField(3, errRate).get() or not pb.getField(4, kHashes).get() or not pb.getField(5, mBits).get(): - return err(reDeserializationError) + return err(ReliabilityError.reDeserializationError) # Convert bytes back to intArray var intArray = newSeq[int](bytes.len div sizeof(int)) diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 34b02ff..ef20785 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -1,4 +1,5 @@ import std/[times, locks] +import chronicles import ./[rolling_bloom_filter, message] type @@ -9,10 +10,10 @@ type bloomFilterErrorRate*: float maxMessageHistory*: int maxCausalHistory*: int - resendInterval*: times.Duration + resendInterval*: Duration maxResendAttempts*: int - syncMessageInterval*: times.Duration - bufferSweepInterval*: times.Duration + syncMessageInterval*: Duration + bufferSweepInterval*: Duration ReliabilityManager* = ref object lamportTimestamp*: int64 @@ -20,7 +21,7 @@ type bloomFilter*: RollingBloomFilter outgoingBuffer*: seq[UnacknowledgedMessage] incomingBuffer*: seq[Message] - channelId*: string + channelId*: ChannelID config*: ReliabilityConfig lock*: Lock onMessageReady*: proc(messageId: MessageID) {.gcsafe.} @@ -60,17 +61,15 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} = rm.outgoingBuffer.setLen(0) rm.incomingBuffer.setLen(0) rm.messageHistory.setLen(0) - except ValueError as e: - logError("Error during cleanup: " & e.msg) except Exception: - logError("Error during cleanup: " & getCurrentExceptionMsg()) + error "Error during cleanup", msg = getCurrentExceptionMsg() proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = withLock rm.lock: try: rm.bloomFilter.clean() except Exception: - logError("Failed to clean ReliabilityManager bloom filter: " & getCurrentExceptionMsg()) + error "Failed to clean bloom filter", msg = getCurrentExceptionMsg() proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} = rm.messageHistory.add(msgId) diff --git a/src/rolling_bloom_filter.nim b/src/rolling_bloom_filter.nim index 75f5b44..2660c3a 100644 --- a/src/rolling_bloom_filter.nim +++ b/src/rolling_bloom_filter.nim @@ -15,48 +15,53 @@ const DefaultBloomFilterErrorRate* = 0.001 CapacityFlexPercent* = 20 -proc logError*(msg: string) = - error "ReliabilityError", message = msg +proc newRollingBloomFilter*(capacity: int = DefaultBloomFilterCapacity, + errorRate: float = DefaultBloomFilterErrorRate): RollingBloomFilter {.gcsafe.} = + let targetCapacity = if capacity <= 0: DefaultBloomFilterCapacity else: capacity + let targetError = if errorRate <= 0.0 or errorRate >= 1.0: DefaultBloomFilterErrorRate else: errorRate + + let filterResult = initializeBloomFilter(targetCapacity, targetError) + if filterResult.isErr: + error "Failed to initialize bloom filter", error = filterResult.error + # Try with default values if custom values failed + if capacity != DefaultBloomFilterCapacity or errorRate != DefaultBloomFilterErrorRate: + let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) + if defaultResult.isErr: + error "Failed to initialize bloom filter with default parameters", error = defaultResult.error + + let minCapacity = (DefaultBloomFilterCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int + let maxCapacity = (DefaultBloomFilterCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int + + info "Successfully initialized bloom filter with default parameters", + capacity = DefaultBloomFilterCapacity, + minCapacity = minCapacity, + maxCapacity = maxCapacity -proc logInfo*(msg: string) = - info "ReliabilityInfo", message = msg - -proc newRollingBloomFilter*(capacity: int, errorRate: float): RollingBloomFilter {.gcsafe.} = - try: - var filterResult: Result[BloomFilter, string] - {.gcsafe.}: - filterResult = initializeBloomFilter(capacity, errorRate) - - if filterResult.isOk: - logInfo("Successfully initialized bloom filter") - let targetCapacity = capacity - let minCapacity = (capacity.float * 0.8).int - let maxCapacity = (capacity.float * 1.2).int return RollingBloomFilter( - filter: filterResult.get(), - capacity: targetCapacity, + filter: defaultResult.get(), + capacity: DefaultBloomFilterCapacity, minCapacity: minCapacity, maxCapacity: maxCapacity, messages: @[] ) else: - logError("Failed to initialize bloom filter: " & filterResult.error) - - except Exception: - logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg()) + error "Could not create bloom filter", error = filterResult.error + + let minCapacity = (targetCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int + let maxCapacity = (targetCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int - # Default fallback case - let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) - if defaultResult.isOk: - return RollingBloomFilter( - filter: defaultResult.get(), - capacity: DefaultBloomFilterCapacity, - minCapacity: (DefaultBloomFilterCapacity.float * 0.8).int, - maxCapacity: (DefaultBloomFilterCapacity.float * 1.2).int, - messages: @[] - ) - else: - logError("Failed to initialize bloom filter with default parameters: " & defaultResult.error) + info "Successfully initialized bloom filter", + capacity = targetCapacity, + minCapacity = minCapacity, + maxCapacity = maxCapacity + + return RollingBloomFilter( + filter: filterResult.get(), + capacity: targetCapacity, + minCapacity: minCapacity, + maxCapacity: maxCapacity, + messages: @[] + ) proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = try: @@ -66,7 +71,7 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = # Initialize new filter let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate) if newFilterResult.isErr: - logError("Failed to create new bloom filter: " & newFilterResult.error) + error "Failed to create new bloom filter", error = newFilterResult.error return var newFilter = newFilterResult.get() @@ -78,20 +83,20 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = for i in startIdx ..< rbf.messages.len: newMessages.add(rbf.messages[i]) - newFilter.insert(rbf.messages[i]) + newFilter.insert(cast[string](rbf.messages[i])) rbf.messages = newMessages rbf.filter = newFilter except Exception: - logError("Failed to clean bloom filter: " & getCurrentExceptionMsg()) + error "Failed to clean bloom filter", error = getCurrentExceptionMsg() proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} = ## Adds a message ID to the rolling bloom filter. ## ## Parameters: ## - messageId: The ID of the message to add. - rbf.filter.insert(messageId) + rbf.filter.insert(cast[string](messageId)) rbf.messages.add(messageId) # Clean if we exceed max capacity @@ -106,4 +111,4 @@ proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} = ## ## Returns: ## True if the message ID is probably in the filter, false otherwise. - rbf.filter.lookup(messageId) \ No newline at end of file + rbf.filter.lookup(cast[string](messageId)) \ No newline at end of file