chore: rearrange files

This commit is contained in:
shash256 2025-01-13 17:32:33 +04:00
parent ecde0a9ea8
commit 24d6026750
8 changed files with 172 additions and 202 deletions

View File

@ -1,74 +0,0 @@
import std/[times, locks]
import ./bloom
type
MessageID* = string
Message* = object
messageId*: MessageID
lamportTimestamp*: int64
causalHistory*: seq[MessageID]
channelId*: string
content*: seq[byte]
bloomFilter*: seq[byte]
UnacknowledgedMessage* = object
message*: Message
sendTime*: Time
resendAttempts*: int
TimestampedMessageID* = object
id*: MessageID
timestamp*: Time
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
RollingBloomFilter* = object
filter*: BloomFilter
window*: times.Duration
messages*: seq[TimestampedMessageID]
ReliabilityConfig* = object
bloomFilterCapacity*: int
bloomFilterErrorRate*: float
bloomFilterWindow*: times.Duration
maxMessageHistory*: int
maxCausalHistory*: int
resendInterval*: times.Duration
maxResendAttempts*: int
syncMessageInterval*: times.Duration
bufferSweepInterval*: times.Duration
ReliabilityManager* = ref object
lamportTimestamp*: int64
messageHistory*: seq[MessageID]
bloomFilter*: RollingBloomFilter
outgoingBuffer*: seq[UnacknowledgedMessage]
incomingBuffer*: seq[Message]
channelId*: string
config*: ReliabilityConfig
lock*: Lock
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback
ReliabilityError* = enum
reInvalidArgument
reOutOfMemory
reInternalError
reSerializationError
reDeserializationError
reMessageTooLarge
const
DefaultBloomFilterCapacity* = 10000
DefaultBloomFilterErrorRate* = 0.001
DefaultBloomFilterWindow* = initDuration(hours = 1)
DefaultMaxMessageHistory* = 1000
DefaultMaxCausalHistory* = 10
DefaultResendInterval* = initDuration(seconds = 60)
DefaultMaxResendAttempts* = 5
DefaultSyncMessageInterval* = initDuration(seconds = 30)
DefaultBufferSweepInterval* = initDuration(seconds = 60)
MaxMessageSize* = 1024 * 1024 # 1 MB

30
src/message.nim Normal file
View File

@ -0,0 +1,30 @@
import std/times
type
MessageID* = string
Message* = object
messageId*: MessageID
lamportTimestamp*: int64
causalHistory*: seq[MessageID]
channelId*: string
content*: seq[byte]
bloomFilter*: seq[byte]
UnacknowledgedMessage* = object
message*: Message
sendTime*: Time
resendAttempts*: int
TimestampedMessageID* = object
id*: MessageID
timestamp*: Time
const
DefaultMaxMessageHistory* = 1000
DefaultMaxCausalHistory* = 10
DefaultResendInterval* = initDuration(seconds = 60)
DefaultMaxResendAttempts* = 5
DefaultSyncMessageInterval* = initDuration(seconds = 30)
DefaultBufferSweepInterval* = initDuration(seconds = 60)
MaxMessageSize* = 1024 * 1024 # 1 MB

View File

@ -1,8 +1,6 @@
import ./protobufutil
import ./common
import ./bloom
import libp2p/protobuf/minprotobuf
import std/options
import ../src/[message, protobufutil, bloom, reliability_utils]
proc toBytes(s: string): seq[byte] =
result = newSeq[byte](s.len)

View File

@ -11,13 +11,12 @@ type
ProtobufErrorKind* {.pure.} = enum
DecodeFailure
MissingRequiredField
InvalidLengthField
ProtobufError* = object
case kind*: ProtobufErrorKind
of DecodeFailure:
error*: minprotobuf.ProtoError
of MissingRequiredField, InvalidLengthField:
of MissingRequiredField:
field*: string
ProtobufResult*[T] = Result[T, ProtobufError]
@ -30,7 +29,4 @@ converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError =
ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err)
proc missingRequiredField*(T: type ProtobufError, field: string): T =
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)
proc invalidLengthField*(T: type ProtobufError, field: string): T =
ProtobufError(kind: ProtobufErrorKind.InvalidLengthField, field: field)
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)

View File

@ -1,26 +1,6 @@
import std/[times, locks]
import std/[times, locks, tables, sets]
import chronos, results
import ./common
import ./utils
import ./protobuf
import std/[tables, sets]
proc defaultConfig*(): ReliabilityConfig =
## Creates a default configuration for the ReliabilityManager.
##
## Returns:
## A ReliabilityConfig object with default values.
ReliabilityConfig(
bloomFilterCapacity: DefaultBloomFilterCapacity,
bloomFilterErrorRate: DefaultBloomFilterErrorRate,
bloomFilterWindow: DefaultBloomFilterWindow,
maxMessageHistory: DefaultMaxMessageHistory,
maxCausalHistory: DefaultMaxCausalHistory,
resendInterval: DefaultResendInterval,
maxResendAttempts: DefaultMaxResendAttempts,
syncMessageInterval: DefaultSyncMessageInterval,
bufferSweepInterval: DefaultBufferSweepInterval
)
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] =
## Creates a new ReliabilityManager with the specified channel ID and configuration.
@ -189,43 +169,6 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
newIncomingBuffer.add(msg)
rm.incomingBuffer = newIncomingBuffer
# withLock rm.lock:
# var processedAny = true
# while processedAny:
# processedAny = false
# var newIncomingBuffer: seq[Message] = @[]
# for msg in rm.incomingBuffer:
# var allDependenciesMet = true
# for depId in msg.causalHistory:
# if not rm.bloomFilter.contains(depId):
# allDependenciesMet = false
# break
# # Check if dependency is still in incoming buffer
# for bufferedMsg in rm.incomingBuffer:
# if bufferedMsg.messageId == depId:
# allDependenciesMet = false
# break
# if not allDependenciesMet:
# break
# if allDependenciesMet:
# # Process message
# rm.addToHistory(msg.messageId)
# if rm.onMessageReady != nil:
# rm.onMessageReady(msg.messageId)
# processedAny = true
# else:
# # Keep in buffer
# newIncomingBuffer.add(msg)
# rm.incomingBuffer = newIncomingBuffer
# # Exit if no messages were processed in this pass
# if not processedAny:
# break
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] =
## Unwraps a received message and processes its reliability metadata.
@ -396,14 +339,4 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
)
return ok()
except:
return err(reInternalError)
proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
if not rm.isNil:
{.gcsafe.}:
try:
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.messageHistory.setLen(0)
except Exception as e:
logError("Error during cleanup: " & e.msg)
return err(reInternalError)

95
src/reliability_utils.nim Normal file
View File

@ -0,0 +1,95 @@
import std/[times, locks]
import ./[rolling_bloom_filter, message]
type
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
ReliabilityConfig* = object
bloomFilterCapacity*: int
bloomFilterErrorRate*: float
bloomFilterWindow*: times.Duration
maxMessageHistory*: int
maxCausalHistory*: int
resendInterval*: times.Duration
maxResendAttempts*: int
syncMessageInterval*: times.Duration
bufferSweepInterval*: times.Duration
ReliabilityManager* = ref object
lamportTimestamp*: int64
messageHistory*: seq[MessageID]
bloomFilter*: RollingBloomFilter
outgoingBuffer*: seq[UnacknowledgedMessage]
incomingBuffer*: seq[Message]
channelId*: string
config*: ReliabilityConfig
lock*: Lock
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback
ReliabilityError* = enum
reInvalidArgument
reOutOfMemory
reInternalError
reSerializationError
reDeserializationError
reMessageTooLarge
proc defaultConfig*(): ReliabilityConfig =
## Creates a default configuration for the ReliabilityManager.
##
## Returns:
## A ReliabilityConfig object with default values.
ReliabilityConfig(
bloomFilterCapacity: DefaultBloomFilterCapacity,
bloomFilterErrorRate: DefaultBloomFilterErrorRate,
bloomFilterWindow: DefaultBloomFilterWindow,
maxMessageHistory: DefaultMaxMessageHistory,
maxCausalHistory: DefaultMaxCausalHistory,
resendInterval: DefaultResendInterval,
maxResendAttempts: DefaultMaxResendAttempts,
syncMessageInterval: DefaultSyncMessageInterval,
bufferSweepInterval: DefaultBufferSweepInterval
)
proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
if not rm.isNil:
{.gcsafe.}:
try:
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.messageHistory.setLen(0)
except Exception as e:
logError("Error during cleanup: " & e.msg)
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
withLock rm.lock:
try:
rm.bloomFilter.clean()
except Exception as e:
logError("Failed to clean ReliabilityManager bloom filter: " & e.msg)
proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} =
rm.messageHistory.add(msgId)
if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0)
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} =
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =
result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]
proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] =
withLock rm.lock:
result = rm.messageHistory
proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] =
withLock rm.lock:
result = rm.outgoingBuffer
proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] =
withLock rm.lock:
result = rm.incomingBuffer

View File

@ -1,7 +1,18 @@
import std/[times, locks]
import chronos, chronicles
import ./bloom
import ./common
import std/times
import chronos
import chronicles
import ./[bloom, message]
type
RollingBloomFilter* = object
filter*: BloomFilter
window*: times.Duration
messages*: seq[TimestampedMessageID]
const
DefaultBloomFilterCapacity* = 10000
DefaultBloomFilterErrorRate* = 0.001
DefaultBloomFilterWindow* = initDuration(hours = 1)
proc logError*(msg: string) =
error "ReliabilityError", message = msg
@ -81,34 +92,4 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
rbf.messages = newMessages
rbf.filter = newFilter
except Exception as e:
logError("Failed to clean bloom filter: " & e.msg)
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
withLock rm.lock:
try:
rm.bloomFilter.clean()
except Exception as e:
logError("Failed to clean ReliabilityManager bloom filter: " & e.msg)
proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) =
rm.messageHistory.add(msgId)
if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0)
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) =
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =
result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]
proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] =
withLock rm.lock:
result = rm.messageHistory
proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] =
withLock rm.lock:
result = rm.outgoingBuffer
proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] =
withLock rm.lock:
result = rm.incomingBuffer
logError("Failed to clean bloom filter: " & e.msg)

View File

@ -1,8 +1,5 @@
import unittest, results, chronos, std/times
import ../src/reliability
import ../src/common
import ../src/protobuf
import ../src/utils
import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]
# Core functionality tests
suite "Core Operations":
@ -296,12 +293,14 @@ suite "Periodic Tasks & Buffer Management":
finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3 # Should have triggered sent callback for acknowledged messages
test "periodic buffer sweep":
test "periodic buffer sweep and bloom clean":
var messageSentCount = 0
var config = defaultConfig()
config.resendInterval = initDuration(milliseconds = 100) # Very short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50)
config.resendInterval = initDuration(milliseconds = 100) # Short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window
config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager("testChannel", config)
check rmResultP.isOk()
@ -313,27 +312,39 @@ suite "Periodic Tasks & Buffer Management":
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard
)
# Add message to buffer
# First message - should be cleaned from bloom filter later
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
check wrap1.isOk()
let initialBuffer = rm.getOutgoingBuffer()
check initialBuffer[0].resendAttempts == 0
check:
initialBuffer[0].resendAttempts == 0
rm.bloomFilter.contains(id1)
rm.startPeriodicTasks()
# Wait long enough for several sweep intervals
waitFor sleepAsync(chronos.milliseconds(300))
# Wait long enough for bloom filter window to pass and first message to exceed max retries
waitFor sleepAsync(chronos.milliseconds(500))
# Add new message
let msg2 = @[byte(2)]
let id2 = "msg2"
let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
check wrap2.isOk()
let finalBuffer = rm.getOutgoingBuffer()
check:
finalBuffer.len == 1
finalBuffer[0].resendAttempts > 0
finalBuffer.len == 1 # Only msg2 should be in buffer, msg1 should be removed after max retries
finalBuffer[0].message.messageId == id2 # Verify it's the second message
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
rm.bloomFilter.contains(id2) # New message still in filter
rm.cleanup()
test "periodic sync":
test "periodic sync callback":
var syncCallCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard,