mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-03 06:33:06 +00:00
initial unwrap implementation
This commit is contained in:
parent
6320667dfd
commit
79f42fb461
@ -39,6 +39,12 @@ int WrapOutgoingMessage(void* ctx,
|
|||||||
SdsCallBack callback,
|
SdsCallBack callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
|
|
||||||
|
int UnwrapReceivedMessage(void* ctx,
|
||||||
|
void* message,
|
||||||
|
size_t messageLen,
|
||||||
|
SdsCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
@ -214,5 +214,33 @@ proc WrapOutgoingMessage(
|
|||||||
userData,
|
userData,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
proc UnwrapReceivedMessage(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
message: pointer,
|
||||||
|
messageLen: csize_t,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
if message == nil and messageLen > 0:
|
||||||
|
let msg = "libsds error: " & "message pointer is NULL but length > 0"
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
var msg = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int)
|
||||||
|
|
||||||
|
defer:
|
||||||
|
deallocSharedSeq(msg)
|
||||||
|
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.MESSAGE,
|
||||||
|
SdsMessageRequest.createShared(SdsMessageMsgType.UNWRAP_MESSAGE, msg, messageLen),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
### End of exported procs
|
### End of exported procs
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import ../../../../src/[reliability_utils, reliability, message]
|
|||||||
|
|
||||||
type SdsMessageMsgType* = enum
|
type SdsMessageMsgType* = enum
|
||||||
WRAP_MESSAGE
|
WRAP_MESSAGE
|
||||||
|
UNWRAP_MESSAGE
|
||||||
|
|
||||||
type SdsMessageRequest* = object
|
type SdsMessageRequest* = object
|
||||||
operation: SdsMessageMsgType
|
operation: SdsMessageMsgType
|
||||||
@ -13,6 +14,10 @@ type SdsMessageRequest* = object
|
|||||||
messageLen: csize_t
|
messageLen: csize_t
|
||||||
messageId: cstring
|
messageId: cstring
|
||||||
|
|
||||||
|
type SdsUnwrapResponse* = object
|
||||||
|
message*: seq[byte]
|
||||||
|
missingDeps*: seq[MessageID]
|
||||||
|
|
||||||
proc createShared*(
|
proc createShared*(
|
||||||
T: type SdsMessageRequest,
|
T: type SdsMessageRequest,
|
||||||
op: SdsMessageMsgType,
|
op: SdsMessageMsgType,
|
||||||
@ -48,5 +53,16 @@ proc process*(
|
|||||||
|
|
||||||
# returns a comma-separates string of bytes
|
# returns a comma-separates string of bytes
|
||||||
return ok(wrappedMessage.mapIt($it).join(","))
|
return ok(wrappedMessage.mapIt($it).join(","))
|
||||||
|
of UNWRAP_MESSAGE:
|
||||||
|
let messageBytes = self.message.toSeq()
|
||||||
|
|
||||||
|
let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
|
||||||
|
error "UNWRAP_MESSAGE failed", error = error
|
||||||
|
return err("error processing UNWRAP_MESSAGE request: " & $error)
|
||||||
|
|
||||||
|
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps)
|
||||||
|
|
||||||
|
# return the result as a json string
|
||||||
|
return ok($(%*(res)))
|
||||||
|
|
||||||
return ok("")
|
return ok("")
|
||||||
|
|||||||
@ -2,7 +2,9 @@ import std/[times, locks, tables, sets]
|
|||||||
import chronos, results
|
import chronos, results
|
||||||
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
|
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
|
||||||
|
|
||||||
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] =
|
proc newReliabilityManager*(
|
||||||
|
channelId: string, config: ReliabilityConfig = defaultConfig()
|
||||||
|
): Result[ReliabilityManager, ReliabilityError] =
|
||||||
## Creates a new ReliabilityManager with the specified channel ID and configuration.
|
## Creates a new ReliabilityManager with the specified channel ID and configuration.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -13,14 +15,12 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
|
|||||||
## A Result containing either a new ReliabilityManager instance or an error.
|
## A Result containing either a new ReliabilityManager instance or an error.
|
||||||
if channelId.len == 0:
|
if channelId.len == 0:
|
||||||
return err(reInvalidArgument)
|
return err(reInvalidArgument)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let bloomFilter = newRollingBloomFilter(
|
let bloomFilter = newRollingBloomFilter(
|
||||||
config.bloomFilterCapacity,
|
config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow
|
||||||
config.bloomFilterErrorRate,
|
|
||||||
config.bloomFilterWindow
|
|
||||||
)
|
)
|
||||||
|
|
||||||
let rm = ReliabilityManager(
|
let rm = ReliabilityManager(
|
||||||
lamportTimestamp: 0,
|
lamportTimestamp: 0,
|
||||||
messageHistory: @[],
|
messageHistory: @[],
|
||||||
@ -28,7 +28,7 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
|
|||||||
outgoingBuffer: @[],
|
outgoingBuffer: @[],
|
||||||
incomingBuffer: @[],
|
incomingBuffer: @[],
|
||||||
channelId: channelId,
|
channelId: channelId,
|
||||||
config: config
|
config: config,
|
||||||
)
|
)
|
||||||
initLock(rm.lock)
|
initLock(rm.lock)
|
||||||
return ok(rm)
|
return ok(rm)
|
||||||
@ -40,27 +40,25 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
|
|||||||
while i < rm.outgoingBuffer.len:
|
while i < rm.outgoingBuffer.len:
|
||||||
var acknowledged = false
|
var acknowledged = false
|
||||||
let outMsg = rm.outgoingBuffer[i]
|
let outMsg = rm.outgoingBuffer[i]
|
||||||
|
|
||||||
# Check if message is in causal history
|
# Check if message is in causal history
|
||||||
for msgID in msg.causalHistory:
|
for msgID in msg.causalHistory:
|
||||||
if outMsg.message.messageId == msgID:
|
if outMsg.message.messageId == msgID:
|
||||||
acknowledged = true
|
acknowledged = true
|
||||||
break
|
break
|
||||||
|
|
||||||
# Check bloom filter if not already acknowledged
|
# Check bloom filter if not already acknowledged
|
||||||
if not acknowledged and msg.bloomFilter.len > 0:
|
if not acknowledged and msg.bloomFilter.len > 0:
|
||||||
let bfResult = deserializeBloomFilter(msg.bloomFilter)
|
let bfResult = deserializeBloomFilter(msg.bloomFilter)
|
||||||
if bfResult.isOk:
|
if bfResult.isOk:
|
||||||
var rbf = RollingBloomFilter(
|
var rbf = RollingBloomFilter(
|
||||||
filter: bfResult.get(),
|
filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[]
|
||||||
window: rm.bloomFilter.window,
|
|
||||||
messages: @[]
|
|
||||||
)
|
)
|
||||||
if rbf.contains(outMsg.message.messageId):
|
if rbf.contains(outMsg.message.messageId):
|
||||||
acknowledged = true
|
acknowledged = true
|
||||||
else:
|
else:
|
||||||
logError("Failed to deserialize bloom filter")
|
logError("Failed to deserialize bloom filter")
|
||||||
|
|
||||||
if acknowledged:
|
if acknowledged:
|
||||||
if rm.onMessageSent != nil:
|
if rm.onMessageSent != nil:
|
||||||
rm.onMessageSent(outMsg.message.messageId)
|
rm.onMessageSent(outMsg.message.messageId)
|
||||||
@ -68,7 +66,9 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
|
|||||||
else:
|
else:
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): Result[seq[byte], ReliabilityError] =
|
proc wrapOutgoingMessage*(
|
||||||
|
rm: ReliabilityManager, message: seq[byte], messageId: MessageID
|
||||||
|
): Result[seq[byte], ReliabilityError] =
|
||||||
## Wraps an outgoing message with reliability metadata.
|
## Wraps an outgoing message with reliability metadata.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -84,7 +84,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
|
|||||||
withLock rm.lock:
|
withLock rm.lock:
|
||||||
try:
|
try:
|
||||||
rm.updateLamportTimestamp(getTime().toUnix)
|
rm.updateLamportTimestamp(getTime().toUnix)
|
||||||
|
|
||||||
# Serialize current bloom filter
|
# Serialize current bloom filter
|
||||||
var bloomBytes: seq[byte]
|
var bloomBytes: seq[byte]
|
||||||
let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
|
let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
|
||||||
@ -100,15 +100,13 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
|
|||||||
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
|
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
|
||||||
channelId: rm.channelId,
|
channelId: rm.channelId,
|
||||||
content: message,
|
content: message,
|
||||||
bloomFilter: bloomBytes
|
bloomFilter: bloomBytes,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add to outgoing buffer
|
# Add to outgoing buffer
|
||||||
rm.outgoingBuffer.add(UnacknowledgedMessage(
|
rm.outgoingBuffer.add(
|
||||||
message: msg,
|
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
|
||||||
sendTime: getTime(),
|
)
|
||||||
resendAttempts: 0
|
|
||||||
))
|
|
||||||
|
|
||||||
# Add to causal history and bloom filter
|
# Add to causal history and bloom filter
|
||||||
rm.bloomFilter.add(msg.messageId)
|
rm.bloomFilter.add(msg.messageId)
|
||||||
@ -156,7 +154,7 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
|
|||||||
if rm.onMessageReady != nil:
|
if rm.onMessageReady != nil:
|
||||||
rm.onMessageReady(msg.messageId)
|
rm.onMessageReady(msg.messageId)
|
||||||
processed.incl(msgId)
|
processed.incl(msgId)
|
||||||
|
|
||||||
# Add any dependent messages that might now be ready
|
# Add any dependent messages that might now be ready
|
||||||
if msgId in dependencies:
|
if msgId in dependencies:
|
||||||
for dependentId in dependencies[msgId]:
|
for dependentId in dependencies[msgId]:
|
||||||
@ -170,7 +168,11 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
|
|||||||
|
|
||||||
rm.incomingBuffer = newIncomingBuffer
|
rm.incomingBuffer = newIncomingBuffer
|
||||||
|
|
||||||
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] =
|
proc unwrapReceivedMessage*(
|
||||||
|
rm: ReliabilityManager, message: seq[byte]
|
||||||
|
): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] {.
|
||||||
|
gcsafe
|
||||||
|
.} =
|
||||||
## Unwraps a received message and processes its reliability metadata.
|
## Unwraps a received message and processes its reliability metadata.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -182,7 +184,7 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
|
|||||||
let msgResult = deserializeMessage(message)
|
let msgResult = deserializeMessage(message)
|
||||||
if not msgResult.isOk:
|
if not msgResult.isOk:
|
||||||
return err(msgResult.error)
|
return err(msgResult.error)
|
||||||
|
|
||||||
let msg = msgResult.get
|
let msg = msgResult.get
|
||||||
if rm.bloomFilter.contains(msg.messageId):
|
if rm.bloomFilter.contains(msg.messageId):
|
||||||
return ok((msg.content, @[]))
|
return ok((msg.content, @[]))
|
||||||
@ -225,7 +227,9 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
|
|||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|
||||||
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void, ReliabilityError] =
|
proc markDependenciesMet*(
|
||||||
|
rm: ReliabilityManager, messageIds: seq[MessageID]
|
||||||
|
): Result[void, ReliabilityError] =
|
||||||
## Marks the specified message dependencies as met.
|
## Marks the specified message dependencies as met.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -240,16 +244,19 @@ proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): R
|
|||||||
rm.bloomFilter.add(msgId)
|
rm.bloomFilter.add(msgId)
|
||||||
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
|
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
|
||||||
rm.processIncomingBuffer()
|
rm.processIncomingBuffer()
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|
||||||
proc setCallbacks*(rm: ReliabilityManager,
|
proc setCallbacks*(
|
||||||
onMessageReady: proc(messageId: MessageID) {.gcsafe.},
|
rm: ReliabilityManager,
|
||||||
onMessageSent: proc(messageId: MessageID) {.gcsafe.},
|
onMessageReady: proc(messageId: MessageID) {.gcsafe.},
|
||||||
onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.},
|
onMessageSent: proc(messageId: MessageID) {.gcsafe.},
|
||||||
onPeriodicSync: PeriodicSyncCallback = nil) =
|
onMissingDependencies:
|
||||||
|
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.},
|
||||||
|
onPeriodicSync: PeriodicSyncCallback = nil,
|
||||||
|
) =
|
||||||
## Sets the callback functions for various events in the ReliabilityManager.
|
## Sets the callback functions for various events in the ReliabilityManager.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -268,7 +275,7 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} =
|
|||||||
withLock rm.lock:
|
withLock rm.lock:
|
||||||
let now = getTime()
|
let now = getTime()
|
||||||
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
|
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for unackMsg in rm.outgoingBuffer:
|
for unackMsg in rm.outgoingBuffer:
|
||||||
let elapsed = now - unackMsg.sendTime
|
let elapsed = now - unackMsg.sendTime
|
||||||
@ -298,7 +305,7 @@ proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledErr
|
|||||||
rm.cleanBloomFilter()
|
rm.cleanBloomFilter()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logError("Error in periodic buffer sweep: " & e.msg)
|
logError("Error in periodic buffer sweep: " & e.msg)
|
||||||
|
|
||||||
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
|
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
|
||||||
|
|
||||||
proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} =
|
proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} =
|
||||||
@ -333,10 +340,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
|
|||||||
rm.outgoingBuffer.setLen(0)
|
rm.outgoingBuffer.setLen(0)
|
||||||
rm.incomingBuffer.setLen(0)
|
rm.incomingBuffer.setLen(0)
|
||||||
rm.bloomFilter = newRollingBloomFilter(
|
rm.bloomFilter = newRollingBloomFilter(
|
||||||
rm.config.bloomFilterCapacity,
|
rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate,
|
||||||
rm.config.bloomFilterErrorRate,
|
rm.config.bloomFilterWindow,
|
||||||
rm.config.bloomFilterWindow
|
|
||||||
)
|
)
|
||||||
return ok()
|
return ok()
|
||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|||||||
@ -24,9 +24,10 @@ type
|
|||||||
channelId*: string
|
channelId*: string
|
||||||
config*: ReliabilityConfig
|
config*: ReliabilityConfig
|
||||||
lock*: Lock
|
lock*: Lock
|
||||||
onMessageReady*: proc(messageId: MessageID)
|
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
|
||||||
onMessageSent*: proc(messageId: MessageID)
|
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
|
||||||
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID])
|
onMissingDependencies*:
|
||||||
|
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
|
||||||
onPeriodicSync*: proc()
|
onPeriodicSync*: proc()
|
||||||
|
|
||||||
ReliabilityError* = enum
|
ReliabilityError* = enum
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user