diff --git a/library/config_json.nim b/library/config_json.nim new file mode 100644 index 0000000..95c529e --- /dev/null +++ b/library/config_json.nim @@ -0,0 +1,82 @@ +## JSON parser for ReliabilityConfig — used by the FFI constructor +## SdsNewReliabilityManagerWithConfig. +## +## Schema: a JSON object where every field is optional. Missing fields fall +## back to the Default* constants in sds/types/reliability_config.nim. +## Duration fields use the suffix "Ms" and are integer milliseconds. +## +## Empty input ("" or NULL on the C side) returns the default config. + +import std/[json, times] +import results +import sds/types/reliability_config + +proc getJsonInt(node: JsonNode, key: string, default: int): int = + if node.hasKey(key) and node[key].kind == JInt: + return node[key].getInt() + return default + +proc getJsonFloat(node: JsonNode, key: string, default: float): float = + if not node.hasKey(key): + return default + case node[key].kind + of JFloat: node[key].getFloat() + of JInt: node[key].getInt().float + else: default + +proc getJsonDurationMs( + node: JsonNode, key: string, default: Duration +): Duration = + if node.hasKey(key) and node[key].kind == JInt: + return initDuration(milliseconds = node[key].getInt()) + return default + +proc parseReliabilityConfig*( + jsonStr: string +): Result[ReliabilityConfig, string] = + ## Parses a JSON string into a ReliabilityConfig. Empty input returns the + ## default config. Unknown keys are ignored. Type-mismatched values fall + ## back to defaults rather than failing. + if jsonStr.len == 0: + return ok(ReliabilityConfig.init()) + + var node: JsonNode + try: + node = parseJson(jsonStr) + except JsonParsingError, ValueError, Exception: + return err("invalid JSON: " & getCurrentExceptionMsg()) + + if node.isNil or node.kind != JObject: + return err("config must be a JSON object") + + ok( + ReliabilityConfig.init( + bloomFilterCapacity = + getJsonInt(node, "bloomFilterCapacity", DefaultBloomFilterCapacity), + bloomFilterErrorRate = + getJsonFloat(node, "bloomFilterErrorRate", DefaultBloomFilterErrorRate), + maxMessageHistory = + getJsonInt(node, "maxMessageHistory", DefaultMaxMessageHistory), + maxCausalHistory = + getJsonInt(node, "maxCausalHistory", DefaultMaxCausalHistory), + resendInterval = + getJsonDurationMs(node, "resendIntervalMs", DefaultResendInterval), + maxResendAttempts = + getJsonInt(node, "maxResendAttempts", DefaultMaxResendAttempts), + syncMessageInterval = getJsonDurationMs( + node, "syncMessageIntervalMs", DefaultSyncMessageInterval + ), + bufferSweepInterval = getJsonDurationMs( + node, "bufferSweepIntervalMs", DefaultBufferSweepInterval + ), + repairTMin = getJsonDurationMs(node, "repairTMinMs", DefaultRepairTMin), + repairTMax = getJsonDurationMs(node, "repairTMaxMs", DefaultRepairTMax), + numResponseGroups = + getJsonInt(node, "numResponseGroups", DefaultNumResponseGroups), + maxRepairRequests = + getJsonInt(node, "maxRepairRequests", DefaultMaxRepairRequests), + repairSweepInterval = getJsonDurationMs( + node, "repairSweepIntervalMs", DefaultRepairSweepInterval + ), + ) + ) diff --git a/library/libsds.h b/library/libsds.h index 0d9840e..553d469 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -28,6 +28,37 @@ typedef void (*SdsRetrievalHintProvider) (const char* messageId, char** hint, si void* SdsNewReliabilityManager(SdsCallBack callback, void* userData); +// Construct a Reliability Manager with an explicit participant ID and a +// JSON-encoded ReliabilityConfig. +// +// participantId: stable, non-empty identifier for SDS-R. Pass NULL or "" to +// disable SDS-R (the manager will not request or answer +// repairs). It MUST be set-once at construction; do not change +// it across the lifetime of the manager. +// configJson: JSON object with optional fields for ReliabilityConfig. +// Pass NULL or "" to use the full default config. Missing +// fields fall back to per-field defaults. Duration fields use +// the suffix "Ms" (integer milliseconds). +// +// Recognised JSON keys: +// bloomFilterCapacity (int, default 10000) +// bloomFilterErrorRate (float, default 0.001) +// maxMessageHistory (int, default 1000) +// maxCausalHistory (int, default 10) +// resendIntervalMs (int, default 60000) +// maxResendAttempts (int, default 5) +// syncMessageIntervalMs (int, default 30000) +// bufferSweepIntervalMs (int, default 60000) +// repairTMinMs (int, default 30000) +// repairTMaxMs (int, default 300000) +// numResponseGroups (int, default 1) +// maxRepairRequests (int, default 3) +// repairSweepIntervalMs (int, default 5000) +void* SdsNewReliabilityManagerWithConfig(const char* participantId, + const char* configJson, + SdsCallBack callback, + void* userData); + void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData); void SdsSetRetrievalHintProvider(void* ctx, SdsRetrievalHintProvider callback, void* userData); @@ -59,6 +90,13 @@ int SdsMarkDependenciesMet(void* ctx, int SdsStartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData); +// Removes a channel and frees its per-channel state (buffers, bloom filter, +// message cache, SDS-R repair entries). Safe to call on a channel that does +// not exist; returns RET_OK in that case. +int SdsRemoveChannel(void* ctx, + const char* channelId, + SdsCallBack callback, + void* userData); #ifdef __cplusplus diff --git a/library/libsds.nim b/library/libsds.nim index af05857..fc90003 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -178,12 +178,17 @@ proc initializeLibrary() {.exported.} = ################################################################################ ### Exported procs -proc SdsNewReliabilityManager( - callback: SdsCallBack, userData: pointer -): pointer {.dynlib, exportc, cdecl.} = +proc createManager( + participantId: cstring, + configJson: cstring, + callback: SdsCallBack, + userData: pointer, +): pointer = + ## Shared implementation for SdsNewReliabilityManager and + ## SdsNewReliabilityManagerWithConfig. Either argument may be NULL or empty + ## to indicate "use defaults". initializeLibrary() - ## Creates a new instance of the Reliability Manager. if isNil(callback): echo "error: missing callback in NewReliabilityManager" return nil @@ -204,11 +209,14 @@ proc SdsNewReliabilityManager( repairReadyCb: onRepairReady(ctx), ) + let pId: cstring = if participantId.isNil: cstring"" else: participantId + let cfg: cstring = if configJson.isNil: cstring"" else: configJson + let retCode = handleRequest( ctx, RequestType.LIFECYCLE, SdsLifecycleRequest.createShared( - SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, nil, appCallbacks + SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, "", appCallbacks, pId, cfg ), callback, userData, @@ -219,6 +227,26 @@ proc SdsNewReliabilityManager( return ctx +proc SdsNewReliabilityManager( + callback: SdsCallBack, userData: pointer +): pointer {.dynlib, exportc, cdecl.} = + ## Back-compat shim. Constructs a manager with empty participantId (SDS-R + ## disabled) and the default ReliabilityConfig. New code should use + ## SdsNewReliabilityManagerWithConfig. + return createManager(nil, nil, callback, userData) + +proc SdsNewReliabilityManagerWithConfig( + participantId: cstring, + configJson: cstring, + callback: SdsCallBack, + userData: pointer, +): pointer {.dynlib, exportc, cdecl.} = + ## Creates a new instance of the Reliability Manager with an explicit + ## participantId (required for SDS-R) and a JSON-encoded ReliabilityConfig. + ## Either argument may be NULL or empty to fall back to defaults; missing + ## fields inside the JSON also fall back to per-field defaults. + return createManager(participantId, configJson, callback, userData) + proc SdsSetEventCallback( ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ) {.dynlib, exportc.} = @@ -384,5 +412,37 @@ proc SdsStartPeriodicTasks( userData, ) +proc SdsRemoveChannel( + ctx: ptr SdsContext, + channelId: cstring, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + ## Removes a channel and its associated state from the Reliability Manager. + ## Use this when a user leaves a channel to release per-channel buffers, + ## bloom filter, message cache, and SDS-R repair state. + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if channelId == nil: + let msg = "libsds error: " & "channel ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + if $channelId == "": + let msg = "libsds error: " & "channel ID is empty string" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared( + SdsLifecycleMsgType.REMOVE_CHANNEL, channelId + ), + callback, + userData, + ) + ### End of exported procs ################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim index a0f3adb..363d442 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -2,38 +2,54 @@ import std/json import chronos, chronicles, results import library/alloc +import library/config_json import sds type SdsLifecycleMsgType* = enum CREATE_RELIABILITY_MANAGER RESET_RELIABILITY_MANAGER START_PERIODIC_TASKS + REMOVE_CHANNEL type SdsLifecycleRequest* = object operation: SdsLifecycleMsgType channelId: cstring appCallbacks: AppCallbacks + participantId: cstring + configJson: cstring proc createShared*( T: type SdsLifecycleRequest, op: SdsLifecycleMsgType, channelId: cstring = "", appCallbacks: AppCallbacks = nil, + participantId: cstring = "", + configJson: cstring = "", ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].appCallbacks = appCallbacks ret[].channelId = channelId.alloc() + ret[].participantId = participantId.alloc() + ret[].configJson = configJson.alloc() return ret proc destroyShared(self: ptr SdsLifecycleRequest) = deallocShared(self[].channelId) + deallocShared(self[].participantId) + deallocShared(self[].configJson) deallocShared(self) proc createReliabilityManager( - appCallbacks: AppCallbacks = nil + participantId: string, + configJson: string, + appCallbacks: AppCallbacks = nil, ): Future[Result[ReliabilityManager, string]] {.async.} = - let rm = newReliabilityManager().valueOr: + let config = parseReliabilityConfig(configJson).valueOr: + error "Failed to parse reliability config", error = error + return err("Failed to parse reliability config: " & error) + + let rm = newReliabilityManager(config, participantId).valueOr: error "Failed creating reliability manager", error = error return err("Failed creating reliability manager: " & $error) @@ -53,7 +69,11 @@ proc process*( case self.operation of CREATE_RELIABILITY_MANAGER: - rm[] = (await createReliabilityManager(self.appCallbacks)).valueOr: + rm[] = ( + await createReliabilityManager( + $self.participantId, $self.configJson, self.appCallbacks + ) + ).valueOr: error "CREATE_RELIABILITY_MANAGER failed", error = error return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) of RESET_RELIABILITY_MANAGER: @@ -62,5 +82,9 @@ proc process*( return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) of START_PERIODIC_TASKS: rm[].startPeriodicTasks() + of REMOVE_CHANNEL: + removeChannel(rm[], $self.channelId).isOkOr: + error "REMOVE_CHANNEL failed", error = error + return err("error processing REMOVE_CHANNEL request: " & $error) return ok("")