feat: periodic tasks and docs

This commit is contained in:
shash256 2024-11-28 07:01:23 +04:00
parent 62821077bc
commit cb1f40c9c2
3 changed files with 160 additions and 8 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@ -1,6 +1,10 @@
import ./common, ./utils
proc defaultConfig*(): ReliabilityConfig =
## Creates a default configuration for the ReliabilityManager.
##
## Returns:
## A ReliabilityConfig object with default values.
ReliabilityConfig(
bloomFilterCapacity: DefaultBloomFilterCapacity,
bloomFilterErrorRate: DefaultBloomFilterErrorRate,
@ -12,6 +16,14 @@ proc defaultConfig*(): ReliabilityConfig =
)
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager] =
## Creates a new ReliabilityManager with the specified channel ID and configuration.
##
## Parameters:
## - channelId: A unique identifier for the communication channel.
## - config: Configuration options for the ReliabilityManager. If not provided, default configuration is used.
##
## Returns:
## A Result containing either a new ReliabilityManager instance or an error.
if channelId.len == 0:
return err[ReliabilityManager](reInvalidArgument)
@ -34,7 +46,14 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
except:
return err[ReliabilityManager](reOutOfMemory)
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Result[Message] =
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte]): Result[seq[byte]] =
## Wraps an outgoing message with reliability metadata.
##
## Parameters:
## - message: The content of the message to be sent.
##
## Returns:
## A Result containing either a Message object with reliability metadata or an error.
if message.len == 0:
return err[Message](reInvalidArgument)
if message.len > MaxMessageSize:
@ -56,7 +75,14 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Result[Messa
except:
return err[Message](reInternalError)
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): Result[tuple[message: Message, missingDeps: seq[MessageID]]] =
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]]] =
## Unwraps a received message and processes its reliability metadata.
##
## Parameters:
## - message: The received Message object.
##
## Returns:
## A Result containing either a tuple with the processed message and missing dependencies, or an error.
withLock rm.lock:
try:
if rm.bloomFilter.contains(message.messageId):
@ -86,6 +112,13 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): Result[tu
return err[(Message, seq[MessageID])](reInternalError)
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void] =
## Marks the specified message dependencies as met.
##
## Parameters:
## - messageIds: A sequence of message IDs to mark as met.
##
## Returns:
## A Result indicating success or an error.
withLock rm.lock:
try:
var processedMessages: seq[Message] = @[]
@ -108,17 +141,87 @@ proc setCallbacks*(rm: ReliabilityManager,
onMessageReady: proc(messageId: MessageID),
onMessageSent: proc(messageId: MessageID),
onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID])) =
## Sets the callback functions for various events in the ReliabilityManager.
##
## Parameters:
## - onMessageReady: Callback function called when a message is ready to be processed.
## - onMessageSent: Callback function called when a message is confirmed as sent.
## - onMissingDependencies: Callback function called when a message has missing dependencies.
withLock rm.lock:
rm.onMessageReady = onMessageReady
rm.onMessageSent = onMessageSent
rm.onMissingDependencies = onMissingDependencies
# proc checkUnacknowledgedMessages*(rm: ReliabilityManager)
proc checkUnacknowledgedMessages*(rm: ReliabilityManager) =
## Checks and processes unacknowledged messages in the outgoing buffer.
withLock rm.lock:
let now = getTime()
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
for msg in rm.outgoingBuffer:
if (now - msg.sendTime) < rm.config.resendInterval:
newOutgoingBuffer.add(msg)
elif msg.resendAttempts < rm.config.maxResendAttempts:
# Resend the message
msg.resendAttempts += 1
msg.sendTime = now
newOutgoingBuffer.add(msg)
# Here you would actually resend the message
elif rm.onMessageSent != nil:
rm.onMessageSent(msg.message.messageId)
rm.outgoingBuffer = newOutgoingBuffer
proc processMessage*(rm: ReliabilityManager, message: string): seq[MessageID] =
let wrappedMsg = checkAndLogError(rm.wrapOutgoingMessage(message), "Failed to wrap message")
let (_, missingDeps) = checkAndLogError(rm.unwrapReceivedMessage(wrappedMsg), "Failed to unwrap message")
return missingDeps
proc periodicBufferSweep(rm: ReliabilityManager) {.async.} =
## Periodically sweeps the buffer to clean up and resend messages.
##
## This is an internal function and should not be called directly.
while true:
rm.checkUnacknowledgedMessages()
rm.cleanBloomFilter()
await sleepAsync(5000) # Sleep for 5 seconds
proc periodicSyncMessage(rm: ReliabilityManager) {.async.} =
## Periodically sends a sync message to maintain connectivity.
##
## This is an internal function and should not be called directly.
while true:
discard rm.wrapOutgoingMessage("") # Empty content for sync messages
await sleepAsync(30000) # Sleep for 30 seconds
proc startPeriodicTasks*(rm: ReliabilityManager) =
## Starts the periodic tasks for buffer sweeping and sync message sending.
##
## This procedure should be called after creating a ReliabilityManager to enable automatic maintenance.
asyncCheck rm.periodicBufferSweep()
asyncCheck rm.periodicSyncMessage()
# # To demonstrate how to use the ReliabilityManager
# proc processMessage*(rm: ReliabilityManager, message: string): seq[MessageID] =
# let wrappedMsg = checkAndLogError(rm.wrapOutgoingMessage(message), "Failed to wrap message")
# let (_, missingDeps) = checkAndLogError(rm.unwrapReceivedMessage(wrappedMsg), "Failed to unwrap message")
# return missingDeps
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void] =
## Resets the ReliabilityManager to its initial state.
##
## This procedure clears all buffers and resets the Lamport timestamp.
##
## Returns:
## A Result indicating success or an error if the Bloom filter initialization fails.
withLock rm.lock:
let bloomFilterResult = newRollingBloomFilter(rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate, rm.config.bloomFilterWindow)
if bloomFilterResult.isErr:
return err[void](bloomFilterResult.error)
rm.lamportTimestamp = 0
rm.messageHistory.setLen(0)
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.bloomFilter = bloomFilterResult.value
return ok()
proc `=destroy`(rm: var ReliabilityManager) =
## Destructor for ReliabilityManager. Ensures proper cleanup of resources.
deinitLock(rm.lock)
when isMainModule:
# Example usage and basic tests
@ -147,6 +250,7 @@ when isMainModule:
else:
echo "Error wrapping message: ", msgResult.error
#rm.startPeriodicTasks()
rm.startPeriodicTasks()
# In a real application, you'd keep the program running to allow periodic tasks to execute
else:
echo "Error creating ReliabilityManager: ", rmResult.error

View File

@ -15,10 +15,21 @@ proc newRollingBloomFilter*(capacity: int, errorRate: float, window: Duration):
return err[RollingBloomFilter](reInternalError)
proc add*(rbf: var RollingBloomFilter, messageId: MessageID) =
## Adds a message ID to the rolling bloom filter.
##
## Parameters:
## - messageId: The ID of the message to add.
rbf.filter.insert(messageId)
rbf.messages.add(TimestampedMessageID(id: messageId, timestamp: getTime()))
proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool =
## Checks if a message ID is in the rolling bloom filter.
##
## Parameters:
## - messageId: The ID of the message to check.
##
## Returns:
## True if the message ID is probably in the filter, false otherwise.
rbf.filter.lookup(messageId)
proc clean*(rbf: var RollingBloomFilter) =
@ -53,6 +64,13 @@ proc generateUniqueID*(): MessageID =
result = $hash($timestamp & $randomPart)
proc serializeMessage*(msg: Message): Result[string] =
## Serializes a Message object to a JSON string.
##
## Parameters:
## - msg: The Message object to serialize.
##
## Returns:
## A Result containing either the serialized JSON string or an error.
try:
let jsonNode = %*{
"senderId": msg.senderId,
@ -67,6 +85,13 @@ proc serializeMessage*(msg: Message): Result[string] =
return err[string](reSerializationError)
proc deserializeMessage*(data: string): Result[Message] =
## Deserializes a JSON string to a Message object.
##
## Parameters:
## - data: The JSON string to deserialize.
##
## Returns:
## A Result containing either the deserialized Message object or an error.
try:
let jsonNode = parseJson(data)
return ok(Message(
@ -81,14 +106,26 @@ proc deserializeMessage*(data: string): Result[Message] =
return err[Message](reDeserializationError)
proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] =
## Retrieves the current message history from the ReliabilityManager.
##
## Returns:
## A sequence of MessageIDs representing the current message history.
withLock rm.lock:
return rm.messageHistory
proc getOutgoingBufferSize*(rm: ReliabilityManager): int =
## Returns the current size of the outgoing message buffer.
##
## Returns:
## The number of messages in the outgoing buffer.
withLock rm.lock:
return rm.outgoingBuffer.len
proc getIncomingBufferSize*(rm: ReliabilityManager): int =
## Returns the current size of the incoming message buffer.
##
## Returns:
## The number of messages in the incoming buffer.
withLock rm.lock:
return rm.incomingBuffer.len
@ -101,6 +138,17 @@ proc logInfo*(msg: string) =
info "ReliabilityInfo", message = msg
proc checkAndLogError*[T](res: Result[T], errorMsg: string): T =
## Checks the result of an operation, logs any errors, and returns the value or raises an exception.
##
## Parameters:
## - res: A Result[T] object to check.
## - errorMsg: A message to log if an error occurred.
##
## Returns:
## The value contained in the Result if it was successful.
##
## Raises:
## An exception with the error message if the Result contains an error.
if res.isOk:
return res.value
else: