chore: address new comments

This commit is contained in:
shash256 2025-02-06 15:23:12 +05:30
parent bb9e89b4a5
commit ac2e9c33fb
6 changed files with 74 additions and 80 deletions

2
.gitignore vendored
View File

@ -1,6 +1,6 @@
nimcache
nimcache/*
tests/bloom
tests/test_bloom
nim-bloom/bloom
.DS_Store
src/.DS_Store

View File

@ -1,13 +1,14 @@
import std/times
type
MessageID* = string
MessageID* = seq[byte]
ChannelID* = seq[byte]
Message* = object
messageId*: MessageID
lamportTimestamp*: int64
causalHistory*: seq[MessageID]
channelId*: string
channelId*: ChannelID
content*: seq[byte]
bloomFilter*: seq[byte]

View File

@ -9,9 +9,7 @@ type
TErrorForK = seq[float]
TAllErrorRates* = array[0..12, TErrorForK]
var kErrors* {.threadvar.}: TAllErrorRates
kErrors = [
const kErrors*: TAllErrorRates = [
@[1.0],
@[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000,
0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000,

View File

@ -3,18 +3,14 @@ import std/options
import endians
import ../src/[message, protobufutil, bloom, reliability_utils]
proc toBytes(s: string): seq[byte] =
result = newSeq[byte](s.len)
copyMem(result[0].addr, s[0].unsafeAddr, s.len)
proc encode*(msg: Message): ProtoBuffer =
var pb = initProtoBuffer()
pb.write(1, msg.messageId)
pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory:
pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling
pb.write(3, hist)
pb.write(4, msg.channelId)
pb.write(5, msg.content)
@ -35,8 +31,7 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
return err(ProtobufError.missingRequiredField("lamportTimestamp"))
msg.lamportTimestamp = int64(timestamp)
# Decode causal history
var causalHistory: seq[string]
var causalHistory: seq[seq[byte]]
let histResult = pb.getRepeatedField(3, causalHistory)
if histResult.isOk:
msg.causalHistory = causalHistory
@ -53,23 +48,19 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
ok(msg)
proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] =
try:
let pb = encode(msg)
ok(pb.buffer)
except:
return err(reSerializationError)
let pb = encode(msg)
ok(pb.buffer)
proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] =
let msg = Message.decode(data).valueOr:
return err(ReliabilityError.reDeserializationError)
ok(msg)
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
try:
var pb = initProtoBuffer()
var pb = initProtoBuffer()
# Convert intArray to bytes
# Convert intArray to bytes
try:
var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
for i, val in filter.intArray:
var leVal: int
@ -82,27 +73,27 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr
pb.write(3, uint64(filter.errorRate * 1_000_000))
pb.write(4, uint64(filter.kHashes))
pb.write(5, uint64(filter.mBits))
pb.finish()
ok(pb.buffer)
except:
return err(ReliabilityError.reSerializationError)
return err(ReliabilityError.reSerializationError)
pb.finish()
ok(pb.buffer)
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] =
if data.len == 0:
return err(ReliabilityError.reDeserializationError)
try:
let pb = initProtoBuffer(data)
var bytes: seq[byte]
var cap, errRate, kHashes, mBits: uint64
let pb = initProtoBuffer(data)
var bytes: seq[byte]
var cap, errRate, kHashes, mBits: uint64
try:
if not pb.getField(1, bytes).get() or
not pb.getField(2, cap).get() or
not pb.getField(3, errRate).get() or
not pb.getField(4, kHashes).get() or
not pb.getField(5, mBits).get():
return err(reDeserializationError)
return err(ReliabilityError.reDeserializationError)
# Convert bytes back to intArray
var intArray = newSeq[int](bytes.len div sizeof(int))

View File

@ -1,4 +1,5 @@
import std/[times, locks]
import chronicles
import ./[rolling_bloom_filter, message]
type
@ -9,10 +10,10 @@ type
bloomFilterErrorRate*: float
maxMessageHistory*: int
maxCausalHistory*: int
resendInterval*: times.Duration
resendInterval*: Duration
maxResendAttempts*: int
syncMessageInterval*: times.Duration
bufferSweepInterval*: times.Duration
syncMessageInterval*: Duration
bufferSweepInterval*: Duration
ReliabilityManager* = ref object
lamportTimestamp*: int64
@ -20,7 +21,7 @@ type
bloomFilter*: RollingBloomFilter
outgoingBuffer*: seq[UnacknowledgedMessage]
incomingBuffer*: seq[Message]
channelId*: string
channelId*: ChannelID
config*: ReliabilityConfig
lock*: Lock
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
@ -60,17 +61,15 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.messageHistory.setLen(0)
except ValueError as e:
logError("Error during cleanup: " & e.msg)
except Exception:
logError("Error during cleanup: " & getCurrentExceptionMsg())
error "Error during cleanup", msg = getCurrentExceptionMsg()
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
withLock rm.lock:
try:
rm.bloomFilter.clean()
except Exception:
logError("Failed to clean ReliabilityManager bloom filter: " & getCurrentExceptionMsg())
error "Failed to clean bloom filter", msg = getCurrentExceptionMsg()
proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} =
rm.messageHistory.add(msgId)

View File

@ -15,48 +15,53 @@ const
DefaultBloomFilterErrorRate* = 0.001
CapacityFlexPercent* = 20
proc logError*(msg: string) =
error "ReliabilityError", message = msg
proc newRollingBloomFilter*(capacity: int = DefaultBloomFilterCapacity,
errorRate: float = DefaultBloomFilterErrorRate): RollingBloomFilter {.gcsafe.} =
let targetCapacity = if capacity <= 0: DefaultBloomFilterCapacity else: capacity
let targetError = if errorRate <= 0.0 or errorRate >= 1.0: DefaultBloomFilterErrorRate else: errorRate
let filterResult = initializeBloomFilter(targetCapacity, targetError)
if filterResult.isErr:
error "Failed to initialize bloom filter", error = filterResult.error
# Try with default values if custom values failed
if capacity != DefaultBloomFilterCapacity or errorRate != DefaultBloomFilterErrorRate:
let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isErr:
error "Failed to initialize bloom filter with default parameters", error = defaultResult.error
let minCapacity = (DefaultBloomFilterCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int
let maxCapacity = (DefaultBloomFilterCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int
info "Successfully initialized bloom filter with default parameters",
capacity = DefaultBloomFilterCapacity,
minCapacity = minCapacity,
maxCapacity = maxCapacity
proc logInfo*(msg: string) =
info "ReliabilityInfo", message = msg
proc newRollingBloomFilter*(capacity: int, errorRate: float): RollingBloomFilter {.gcsafe.} =
try:
var filterResult: Result[BloomFilter, string]
{.gcsafe.}:
filterResult = initializeBloomFilter(capacity, errorRate)
if filterResult.isOk:
logInfo("Successfully initialized bloom filter")
let targetCapacity = capacity
let minCapacity = (capacity.float * 0.8).int
let maxCapacity = (capacity.float * 1.2).int
return RollingBloomFilter(
filter: filterResult.get(),
capacity: targetCapacity,
filter: defaultResult.get(),
capacity: DefaultBloomFilterCapacity,
minCapacity: minCapacity,
maxCapacity: maxCapacity,
messages: @[]
)
else:
logError("Failed to initialize bloom filter: " & filterResult.error)
except Exception:
logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg())
error "Could not create bloom filter", error = filterResult.error
let minCapacity = (targetCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int
let maxCapacity = (targetCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int
# Default fallback case
let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isOk:
return RollingBloomFilter(
filter: defaultResult.get(),
capacity: DefaultBloomFilterCapacity,
minCapacity: (DefaultBloomFilterCapacity.float * 0.8).int,
maxCapacity: (DefaultBloomFilterCapacity.float * 1.2).int,
messages: @[]
)
else:
logError("Failed to initialize bloom filter with default parameters: " & defaultResult.error)
info "Successfully initialized bloom filter",
capacity = targetCapacity,
minCapacity = minCapacity,
maxCapacity = maxCapacity
return RollingBloomFilter(
filter: filterResult.get(),
capacity: targetCapacity,
minCapacity: minCapacity,
maxCapacity: maxCapacity,
messages: @[]
)
proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
try:
@ -66,7 +71,7 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
# Initialize new filter
let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate)
if newFilterResult.isErr:
logError("Failed to create new bloom filter: " & newFilterResult.error)
error "Failed to create new bloom filter", error = newFilterResult.error
return
var newFilter = newFilterResult.get()
@ -78,20 +83,20 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
for i in startIdx ..< rbf.messages.len:
newMessages.add(rbf.messages[i])
newFilter.insert(rbf.messages[i])
newFilter.insert(cast[string](rbf.messages[i]))
rbf.messages = newMessages
rbf.filter = newFilter
except Exception:
logError("Failed to clean bloom filter: " & getCurrentExceptionMsg())
error "Failed to clean bloom filter", error = getCurrentExceptionMsg()
proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} =
## Adds a message ID to the rolling bloom filter.
##
## Parameters:
## - messageId: The ID of the message to add.
rbf.filter.insert(messageId)
rbf.filter.insert(cast[string](messageId))
rbf.messages.add(messageId)
# Clean if we exceed max capacity
@ -106,4 +111,4 @@ proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} =
##
## Returns:
## True if the message ID is probably in the filter, false otherwise.
rbf.filter.lookup(messageId)
rbf.filter.lookup(cast[string](messageId))