Export required types

This commit is contained in:
Jazz Turner-Baggs 2025-08-25 15:11:02 -07:00
parent 7883bf7bfa
commit 77c30db82e
No known key found for this signature in database

View File

@ -2,6 +2,8 @@ import std/[times, locks, tables, sets, options]
import chronos, results, chronicles
import ./[message, protobuf, reliability_utils, rolling_bloom_filter]
export message, reliability_utils, protobuf
proc newReliabilityManager*(
config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
@ -47,12 +49,12 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} =
capacity: bfResult.get().capacity,
minCapacity: (
bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0
).int,
maxCapacity: (
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
).int,
messages: @[],
)
).int,
maxCapacity: (
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
).int,
messages: @[],
)
)
else:
error "Failed to deserialize bloom filter", error = bfResult.error
@ -112,14 +114,16 @@ proc wrapOutgoingMessage*(
let msg = SdsMessage(
messageId: messageId,
lamportTimestamp: channel.lamportTimestamp,
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory, channelId),
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory,
channelId),
channelId: channelId,
content: message,
bloomFilter: bfResult.get(),
)
channel.outgoingBuffer.add(
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
UnacknowledgedMessage(message: msg, sendTime: getTime(),
resendAttempts: 0)
)
# Add to causal history and bloom filter
@ -132,7 +136,8 @@ proc wrapOutgoingMessage*(
channelId = channelId, msg = getCurrentExceptionMsg()
return err(ReliabilityError.reSerializationError)
proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gcsafe.} =
proc processIncomingBuffer(rm: ReliabilityManager,
channelId: SdsChannelID) {.gcsafe.} =
withLock rm.lock:
if channelId notin rm.channels:
error "Channel does not exist", channelId = channelId
@ -176,7 +181,8 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc
proc unwrapReceivedMessage*(
rm: ReliabilityManager, message: seq[byte]
): Result[
tuple[message: seq[byte], missingDeps: seq[SdsMessageID], channelId: SdsChannelID],
tuple[message: seq[byte], missingDeps: seq[SdsMessageID],
channelId: SdsChannelID],
ReliabilityError,
] =
## Unwraps a received message and processes its reliability metadata.
@ -234,7 +240,8 @@ proc unwrapReceivedMessage*(
return err(ReliabilityError.reDeserializationError)
proc markDependenciesMet*(
rm: ReliabilityManager, messageIds: seq[SdsMessageID], channelId: SdsChannelID
rm: ReliabilityManager, messageIds: seq[SdsMessageID],
channelId: SdsChannelID
): Result[void, ReliabilityError] =
## Marks the specified message dependencies as met.
##
@ -330,7 +337,8 @@ proc periodicBufferSweep(
except Exception:
error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg()
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
await sleepAsync(chronos.milliseconds(
rm.config.bufferSweepInterval.inMilliseconds))
proc periodicSyncMessage(
rm: ReliabilityManager
@ -351,7 +359,8 @@ proc startPeriodicTasks*(rm: ReliabilityManager) =
asyncSpawn rm.periodicBufferSweep()
asyncSpawn rm.periodicSyncMessage()
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] =
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void,
ReliabilityError] =
## Resets the ReliabilityManager to its initial state.
##
## This procedure clears all buffers and resets the Lamport timestamp.