chore: address review comments

This commit is contained in:
shash256 2025-01-30 22:44:55 +05:30
parent 6b0b9c34fa
commit bb9e89b4a5
6 changed files with 85 additions and 71 deletions

View File

@ -10,7 +10,7 @@ type
errorRate*: float errorRate*: float
kHashes*: int kHashes*: int
mBits*: int mBits*: int
intArray: seq[int] intArray*: seq[int]
{.push overflowChecks: off.} # Turn off overflow checks for hashing operations {.push overflowChecks: off.} # Turn off overflow checks for hashing operations

View File

@ -16,10 +16,6 @@ type
sendTime*: Time sendTime*: Time
resendAttempts*: int resendAttempts*: int
TimestampedMessageID* = object
id*: MessageID
timestamp*: Time
const const
DefaultMaxMessageHistory* = 1000 DefaultMaxMessageHistory* = 1000
DefaultMaxCausalHistory* = 10 DefaultMaxCausalHistory* = 10

View File

@ -9,7 +9,9 @@ type
TErrorForK = seq[float] TErrorForK = seq[float]
TAllErrorRates* = array[0..12, TErrorForK] TAllErrorRates* = array[0..12, TErrorForK]
let kErrors*: TAllErrorRates = [ var kErrors* {.threadvar.}: TAllErrorRates
kErrors = [
@[1.0], @[1.0],
@[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, @[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000,
0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000,
@ -95,4 +97,4 @@ let kErrors*: TAllErrorRates = [
0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, 0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000,
0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900, 0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900,
0.0000016500, 0.0000012000, 0.0000008740] 0.0000016500, 0.0000012000, 0.0000008740]
] ]

View File

@ -1,5 +1,6 @@
import libp2p/protobuf/minprotobuf import libp2p/protobuf/minprotobuf
import std/options import std/options
import endians
import ../src/[message, protobufutil, bloom, reliability_utils] import ../src/[message, protobufutil, bloom, reliability_utils]
proc toBytes(s: string): seq[byte] = proc toBytes(s: string): seq[byte] =
@ -56,17 +57,13 @@ proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] =
let pb = encode(msg) let pb = encode(msg)
ok(pb.buffer) ok(pb.buffer)
except: except:
err(reSerializationError) return err(reSerializationError)
proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] = proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] =
try: let msg = Message.decode(data).valueOr:
let msgResult = Message.decode(data) return err(ReliabilityError.reDeserializationError)
if msgResult.isOk:
ok(msgResult.get) ok(msg)
else:
err(reSerializationError)
except:
err(reDeserializationError)
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] = proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
try: try:
@ -75,8 +72,10 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr
# Convert intArray to bytes # Convert intArray to bytes
var bytes = newSeq[byte](filter.intArray.len * sizeof(int)) var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
for i, val in filter.intArray: for i, val in filter.intArray:
var leVal: int
littleEndian64(addr leVal, unsafeAddr val)
let start = i * sizeof(int) let start = i * sizeof(int)
copyMem(addr bytes[start], unsafeAddr val, sizeof(int)) copyMem(addr bytes[start], addr leVal, sizeof(int))
pb.write(1, bytes) pb.write(1, bytes)
pb.write(2, uint64(filter.capacity)) pb.write(2, uint64(filter.capacity))
@ -87,11 +86,11 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr
pb.finish() pb.finish()
ok(pb.buffer) ok(pb.buffer)
except: except:
err(reSerializationError) return err(ReliabilityError.reSerializationError)
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] = proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] =
if data.len == 0: if data.len == 0:
return err(reDeserializationError) return err(ReliabilityError.reDeserializationError)
try: try:
let pb = initProtoBuffer(data) let pb = initProtoBuffer(data)
@ -108,8 +107,10 @@ proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityEr
# Convert bytes back to intArray # Convert bytes back to intArray
var intArray = newSeq[int](bytes.len div sizeof(int)) var intArray = newSeq[int](bytes.len div sizeof(int))
for i in 0 ..< intArray.len: for i in 0 ..< intArray.len:
var leVal: int
let start = i * sizeof(int) let start = i * sizeof(int)
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int)) copyMem(addr leVal, unsafeAddr bytes[start], sizeof(int))
littleEndian64(addr intArray[i], addr leVal)
ok(BloomFilter( ok(BloomFilter(
intArray: intArray, intArray: intArray,
@ -119,4 +120,4 @@ proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityEr
mBits: int(mBits) mBits: int(mBits)
)) ))
except: except:
err(reDeserializationError) return err(ReliabilityError.reDeserializationError)

View File

@ -7,7 +7,6 @@ type
ReliabilityConfig* = object ReliabilityConfig* = object
bloomFilterCapacity*: int bloomFilterCapacity*: int
bloomFilterErrorRate*: float bloomFilterErrorRate*: float
bloomFilterWindow*: times.Duration
maxMessageHistory*: int maxMessageHistory*: int
maxCausalHistory*: int maxCausalHistory*: int
resendInterval*: times.Duration resendInterval*: times.Duration
@ -29,7 +28,7 @@ type
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback onPeriodicSync*: PeriodicSyncCallback
ReliabilityError* = enum ReliabilityError* {.pure.} = enum
reInvalidArgument reInvalidArgument
reOutOfMemory reOutOfMemory
reInternalError reInternalError
@ -45,7 +44,6 @@ proc defaultConfig*(): ReliabilityConfig =
ReliabilityConfig( ReliabilityConfig(
bloomFilterCapacity: DefaultBloomFilterCapacity, bloomFilterCapacity: DefaultBloomFilterCapacity,
bloomFilterErrorRate: DefaultBloomFilterErrorRate, bloomFilterErrorRate: DefaultBloomFilterErrorRate,
bloomFilterWindow: DefaultBloomFilterWindow,
maxMessageHistory: DefaultMaxMessageHistory, maxMessageHistory: DefaultMaxMessageHistory,
maxCausalHistory: DefaultMaxCausalHistory, maxCausalHistory: DefaultMaxCausalHistory,
resendInterval: DefaultResendInterval, resendInterval: DefaultResendInterval,
@ -55,21 +53,24 @@ proc defaultConfig*(): ReliabilityConfig =
) )
proc cleanup*(rm: ReliabilityManager) {.raises: [].} = proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
if not rm.isNil: if not rm.isNil():
{.gcsafe.}: {.gcsafe.}:
try: try:
rm.outgoingBuffer.setLen(0) withLock rm.lock:
rm.incomingBuffer.setLen(0) rm.outgoingBuffer.setLen(0)
rm.messageHistory.setLen(0) rm.incomingBuffer.setLen(0)
except Exception as e: rm.messageHistory.setLen(0)
except ValueError as e:
logError("Error during cleanup: " & e.msg) logError("Error during cleanup: " & e.msg)
except Exception:
logError("Error during cleanup: " & getCurrentExceptionMsg())
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
withLock rm.lock: withLock rm.lock:
try: try:
rm.bloomFilter.clean() rm.bloomFilter.clean()
except Exception as e: except Exception:
logError("Failed to clean ReliabilityManager bloom filter: " & e.msg) logError("Failed to clean ReliabilityManager bloom filter: " & getCurrentExceptionMsg())
proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} = proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} =
rm.messageHistory.add(msgId) rm.messageHistory.add(msgId)

View File

@ -1,4 +1,3 @@
import std/times
import chronos import chronos
import chronicles import chronicles
import ./[bloom, message] import ./[bloom, message]
@ -6,13 +5,15 @@ import ./[bloom, message]
type type
RollingBloomFilter* = object RollingBloomFilter* = object
filter*: BloomFilter filter*: BloomFilter
window*: times.Duration capacity*: int
messages*: seq[TimestampedMessageID] minCapacity*: int
maxCapacity*: int
messages*: seq[MessageID]
const const
DefaultBloomFilterCapacity* = 10000 DefaultBloomFilterCapacity* = 10000
DefaultBloomFilterErrorRate* = 0.001 DefaultBloomFilterErrorRate* = 0.001
DefaultBloomFilterWindow* = initDuration(hours = 1) CapacityFlexPercent* = 20
proc logError*(msg: string) = proc logError*(msg: string) =
error "ReliabilityError", message = msg error "ReliabilityError", message = msg
@ -20,7 +21,7 @@ proc logError*(msg: string) =
proc logInfo*(msg: string) = proc logInfo*(msg: string) =
info "ReliabilityInfo", message = msg info "ReliabilityInfo", message = msg
proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Duration): RollingBloomFilter {.gcsafe.} = proc newRollingBloomFilter*(capacity: int, errorRate: float): RollingBloomFilter {.gcsafe.} =
try: try:
var filterResult: Result[BloomFilter, string] var filterResult: Result[BloomFilter, string]
{.gcsafe.}: {.gcsafe.}:
@ -28,29 +29,62 @@ proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Durat
if filterResult.isOk: if filterResult.isOk:
logInfo("Successfully initialized bloom filter") logInfo("Successfully initialized bloom filter")
let targetCapacity = capacity
let minCapacity = (capacity.float * 0.8).int
let maxCapacity = (capacity.float * 1.2).int
return RollingBloomFilter( return RollingBloomFilter(
filter: filterResult.get(), # Extract the BloomFilter from Result filter: filterResult.get(),
window: window, capacity: targetCapacity,
minCapacity: minCapacity,
maxCapacity: maxCapacity,
messages: @[] messages: @[]
) )
else: else:
logError("Failed to initialize bloom filter: " & filterResult.error) logError("Failed to initialize bloom filter: " & filterResult.error)
# Fall through to default case below
except:
logError("Failed to initialize bloom filter")
except Exception:
logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg())
# Default fallback case # Default fallback case
let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isOk: if defaultResult.isOk:
return RollingBloomFilter( return RollingBloomFilter(
filter: defaultResult.get(), filter: defaultResult.get(),
window: window, capacity: DefaultBloomFilterCapacity,
minCapacity: (DefaultBloomFilterCapacity.float * 0.8).int,
maxCapacity: (DefaultBloomFilterCapacity.float * 1.2).int,
messages: @[] messages: @[]
) )
else: else:
# If even default initialization fails, raise an exception logError("Failed to initialize bloom filter with default parameters: " & defaultResult.error)
logError("Failed to initialize bloom filter with default parameters")
proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
try:
if rbf.messages.len <= rbf.maxCapacity:
return # Don't clean unless we exceed max capacity
# Initialize new filter
let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate)
if newFilterResult.isErr:
logError("Failed to create new bloom filter: " & newFilterResult.error)
return
var newFilter = newFilterResult.get()
# Keep most recent messages up to minCapacity
let keepCount = rbf.minCapacity
let startIdx = max(0, rbf.messages.len - keepCount)
var newMessages: seq[MessageID] = @[]
for i in startIdx ..< rbf.messages.len:
newMessages.add(rbf.messages[i])
newFilter.insert(rbf.messages[i])
rbf.messages = newMessages
rbf.filter = newFilter
except Exception:
logError("Failed to clean bloom filter: " & getCurrentExceptionMsg())
proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} = proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} =
## Adds a message ID to the rolling bloom filter. ## Adds a message ID to the rolling bloom filter.
@ -58,7 +92,11 @@ proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} =
## Parameters: ## Parameters:
## - messageId: The ID of the message to add. ## - messageId: The ID of the message to add.
rbf.filter.insert(messageId) rbf.filter.insert(messageId)
rbf.messages.add(TimestampedMessageID(id: messageId, timestamp: getTime())) rbf.messages.add(messageId)
# Clean if we exceed max capacity
if rbf.messages.len > rbf.maxCapacity:
rbf.clean()
proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} = proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} =
## Checks if a message ID is in the rolling bloom filter. ## Checks if a message ID is in the rolling bloom filter.
@ -68,28 +106,4 @@ proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} =
## ##
## Returns: ## Returns:
## True if the message ID is probably in the filter, false otherwise. ## True if the message ID is probably in the filter, false otherwise.
rbf.filter.lookup(messageId) rbf.filter.lookup(messageId)
proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
try:
let now = getTime()
let cutoff = now - rbf.window
var newMessages: seq[TimestampedMessageID] = @[]
# Initialize new filter
let newFilterResult = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate)
if newFilterResult.isErr:
logError("Failed to create new bloom filter: " & newFilterResult.error)
return
var newFilter = newFilterResult.get()
for msg in rbf.messages:
if msg.timestamp > cutoff:
newMessages.add(msg)
newFilter.insert(msg.id)
rbf.messages = newMessages
rbf.filter = newFilter
except Exception as e:
logError("Failed to clean bloom filter: " & e.msg)