From 211a4da63feee1c451bd8b3be569c440a4372cc8 Mon Sep 17 00:00:00 2001
From: jm-clius
Date: Fri, 3 Oct 2025 16:44:08 +0100
Subject: [PATCH] feat: integrate sds-r with message channels

---
 sds.md | 231 ++++++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 173 insertions(+), 58 deletions(-)

diff --git a/sds.md b/sds.md
index 1b1a902c03..acb683ccf2 100644
--- a/sds.md
+++ b/sds.md
@@ -58,6 +58,9 @@ other participants using the corresponding message ID.
 * **Participant ID:**
 Each participant has a globally unique,
 immutable ID visible to other participants in the communication.
+* **Sender ID:**
+The *Participant ID* of the original sender of a message,
+often coupled with a *Message ID*.

 ## Wire protocol

@@ -75,15 +78,20 @@ syntax = "proto3";

 message HistoryEntry {
   string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
   optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash
+
+  optional string sender_id = 3; // Participant ID of the original message sender. Only populated if using the optional SDS Repair extension.
 }

 message Message {
   string sender_id = 1; // Participant ID of the message sender
   string message_id = 2; // Unique identifier of the message
   string channel_id = 3; // Identifier of the channel to which the message belongs
-  optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
+  optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
   repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
   optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
+
+  repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.
+
   optional bytes content = 20; // Actual content of the message
 }
 ```

@@ -111,7 +119,11 @@ Its importance is expected to increase once a p2p retrieval mechanism is added t
 Each participant MUST maintain:

 * A Lamport timestamp for each channel of communication,
-initialized to current epoch time in nanosecond resolution.
+initialized to current epoch time in millisecond resolution.
+The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps)
+to maintain a logical ordering of events while staying close to the current epoch time.
+This allows messages from new joiners to be correctly ordered with other recent messages,
+without these new participants first having to synchronize past messages to discover the current Lamport timestamp.
 * A bloom filter for received message IDs per channel.
 The bloom filter SHOULD be rolled over and recomputed
 once it reaches a predefined capacity of message IDs.
@@ -144,8 +156,11 @@ the `lamport_timestamp`, `causal_history` and `bloom_filter` fields.

 Before broadcasting a message:

-* the participant MUST increase its local Lamport timestamp by `1` and
-include this in the `lamport_timestamp` field.
+* the participant MUST set its local Lamport timestamp
+to the maximum between the current value + `1`
+and the current epoch time in milliseconds.
+In other words, the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`.
+* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field.
 * the participant MUST determine the preceding few message IDs in the local history
 and include these in an ordered list in the `causal_history` field.
 The number of message IDs to include in the `causal_history` depends on the application.
@@ -250,7 +265,8 @@ participants SHOULD periodically send sync messages to maintain state.
 These sync messages:

 * MUST be sent with empty content
-* MUST include an incremented Lamport timestamp
+* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`,
+where `timeNowInMs` is the current epoch time in milliseconds.
 * MUST include causal history and bloom filter according to regular message rules
 * MUST NOT be added to the unacknowledged outgoing buffer
 * MUST NOT be included in causal histories of subsequent messages
@@ -281,82 +297,181 @@ Upon reception,
 ephemeral messages SHOULD be delivered immediately
 without buffering for causal dependencies or including in the local log.

-## Implementation Suggestions
+### SDS Repair (SDS-R)

-This section provides practical guidance based on the js-waku implementation of SDS.
+SDS Repair (SDS-R) is an optional extension module for SDS,
+allowing participants in a communication to collectively repair gaps in causal history (missing messages),
+preferably within a limited time window.
+Since SDS-R acts as coordinated rebroadcasting of missing messages,
+which involves all participants of the communication,
+it is best suited to the limited use case of repairing relatively recent missing dependencies.
+It is not meant to replace mechanisms for long-term consistency,
+such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node).

-### Default Configuration Values
+#### SDS-R message fields

-The js-waku implementation uses the following defaults:
-- **Bloom filter capacity**: 10,000 messages
-- **Bloom filter error rate**: 0.001 (0.1% false positive rate)
-- **Causal history size**: 200 message IDs
-- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged
+SDS-R adds the following fields to SDS messages:
+
+* `sender_id` in `HistoryEntry`:
+the original message sender's participant ID.
+This is used to determine the group of participants who will respond to a repair request.
+* `repair_request` in `Message`:
+a capped list of history entries missing for the message sender
+and for which it is requesting a repair.
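+
+As a non-normative illustration,
+the `sender_id` and `repair_request` extensions could be mirrored as follows in TypeScript
+(field names follow the protobuf definitions in [Wire protocol](#wire-protocol); `Uint8Array` stands in for protobuf `bytes`):
+
+```typescript
+// Sketch of the SDS message shape with the optional SDS-R extension fields.
+interface HistoryEntry {
+  messageId: string;          // Unique identifier of the SDS message
+  retrievalHint?: Uint8Array; // Optional retrieval information
+  senderId?: string;          // SDS-R only: participant ID of the original sender
+}
+
+interface Message {
+  senderId: string;
+  messageId: string;
+  channelId: string;
+  lamportTimestamp?: bigint;     // uint64 on the wire
+  causalHistory: HistoryEntry[];
+  bloomFilter?: Uint8Array;
+  repairRequest: HistoryEntry[]; // SDS-R only: capped list of missing entries
+  content?: Uint8Array;
+}
+```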
-With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes),
-each message carries 200 × 64 bytes = 12.8 KB of causal history overhead.
+#### SDS-R participant state

-### External Task Scheduling
+SDS-R adds the following to each participant state:

-The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods:
+* Outgoing **repair request buffer**:
+a list of locally missing `HistoryEntry`s,
+each mapped to a future request timestamp, `T_req`,
+after which this participant will request a repair if the dependency is still missing at that point.
+`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing.
+[Determining `T_req`](#determine-t_req) is described below.
+We RECOMMEND that the outgoing repair request buffer be chronologically ordered in ascending order of `T_req`.
+* Incoming **repair request buffer**:
+a list of locally available `HistoryEntry`s
+that were requested for repair by a remote participant
+AND for which this participant might be an eligible responder,
+each mapped to a future response timestamp, `T_resp`,
+after which this participant will rebroadcast the requested `Message` if no other participant has rebroadcast it by that point.
+`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested.
+[Determining `T_resp`](#determine-t_resp) is described below.
+We describe below how a participant can [determine whether it is an eligible responder](#determine-response-group) for a specific repair request.
+* Augmented **local history log**:
+for each message ID kept in the local log for which the participant could be a repair responder,
+the full SDS `Message` MUST be cached rather than just the message ID,
+in case this participant is called upon to rebroadcast the message.
+We describe below how a participant can [determine whether it is an eligible responder](#determine-response-group) for a specific message.
-- `processTasks()`: Process queued send/receive operations
-- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies
-- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry
-- `pushOutgoingSyncMessage(callback)`: Send a sync message
+
+**_Note:_** The required state can likely be significantly reduced in the future by requiring that a responding participant _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler but heavier approach of caching the entire received `Message` content in local history.

-The implementation does not include internal timers,
-allowing applications to integrate SDS with their existing scheduling infrastructure.
+#### SDS-R global state

-### Message Processing
+For a specific channel (that is, within a specific SDS-controlled communication),
+the following SDS-R configuration state SHOULD be common for all participants in the conversation:

-#### Handling Missing Dependencies
+* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired.
+We RECOMMEND a value of at least 30 seconds.
+* `T_max`: the _maximum_ time period over which missing causal entries can be repaired.
+We RECOMMEND a value of between 120 and 600 seconds.

-When `sweepIncomingBuffer()` returns missing dependencies,
-the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing:
-- `messageId`: The missing message identifier
-- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash)
+Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request,
+participants in a single channel MAY be divided into discrete response groups.
+Participants will only respond to a repair request if they are in the response group for that request.
+The global `num_response_groups` variable configures the number of response groups for this communication.
+Its use is described below.
+A reasonable default value for `num_response_groups` is one response group for every `128` participants.
+In other words, if the (roughly) expected number of participants is expressed as `num_participants`, then
+`num_response_groups = (num_participants div 128) + 1`.
+That is, if there are fewer than 128 participants in a communication,
+they will all belong to the same response group.
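+
+For illustration,
+this default could be computed as follows
+(a non-normative sketch; `div` above denotes integer division):
+
+```typescript
+// Sketch: recommended default, num_response_groups = (num_participants div 128) + 1.
+function defaultNumResponseGroups(numParticipants: number): number {
+  return Math.floor(numParticipants / 128) + 1;
+}
+
+defaultNumResponseGroups(100); // 1 (a single response group)
+defaultNumResponseGroups(500); // 4
+```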
+
-#### Timeout for Lost Messages
+We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application,
-The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout.
+based on the expected number of group participants and volume of traffic.
-When configured, the implementation emits an `InMessageLost` event after the timeout expires.
+
+**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants.

-### Events Emitted
+#### SDS-R send message

-The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for:
-- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout)
-- **Outgoing messages**: sent, acknowledged, possibly acknowledged
-- **Sync messages**: sent, received
-- **Errors**: task execution failures
+SDS-R adds the following steps when sending a message.

-### SDK Usage: ReliableChannel
+Before broadcasting a message:

+* the participant SHOULD populate the `repair_request` field in the message
+with _eligible_ entries from the outgoing repair request buffer.
+An entry is eligible to be included in a `repair_request`
+if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`).
+The maximum number of repair request entries to include is up to the application.
+We RECOMMEND that this quota be filled by the eligible entries from the outgoing repair request buffer with the lowest `T_req`.
+We RECOMMEND a maximum of 3 entries.
+If there are no eligible entries in the buffer, this optional field MUST be left unset.

-The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration:
+#### SDS-R receive message

-#### Configuration
+On receiving a message:

+* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer.
+This ensures that the participant does not request repairs for dependencies that have now been met.
+* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer.
+This ensures that the participant does not respond to repair requests that another participant has already responded to.
+* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer.
+For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp.
+It MUST compute the `T_req` for each such `HistoryEntry` according to the steps outlined in [_Determine T_req_](#determine-t_req).
+* for each item in the `repair_request` field:
+  - the participant MUST remove entries matching the requested message ID from its own outgoing repair request buffer.
+  This limits the number of participants that will request a common missing dependency.
+  - if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request,
+  it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry.
+  It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp).
+  It MUST determine whether it is an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group).
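+
+As a non-normative illustration,
+the bookkeeping above could be sketched as follows in TypeScript
+(types and helper signatures are assumptions;
+`computeTReq`, `computeTResp` and `isEligibleResponder` follow the subsections below):
+
+```typescript
+// Sketch of SDS-R bookkeeping on message reception.
+type MessageId = string;
+
+interface HistoryEntry { messageId: MessageId; senderId?: string }
+
+interface ReceivedMessage {
+  messageId: MessageId;
+  senderId: string;
+  causalHistory: HistoryEntry[];
+  repairRequest: HistoryEntry[];
+}
+
+// message ID -> T_req / T_resp (epoch milliseconds)
+const outgoingRepairRequests = new Map<MessageId, number>();
+const incomingRepairRequests = new Map<MessageId, number>();
+
+declare function computeTReq(selfId: string, messageId: MessageId): number;
+declare function computeTResp(selfId: string, entry: HistoryEntry): number;
+declare function isEligibleResponder(selfId: string, entry: HistoryEntry): boolean;
+declare function isInLocalHistory(messageId: MessageId): boolean;
+declare function isUnmetDependency(entry: HistoryEntry): boolean;
+
+function onMessageReceived(selfId: string, msg: ReceivedMessage): void {
+  // The received message may itself repair a pending request of ours...
+  outgoingRepairRequests.delete(msg.messageId);
+  // ...and makes any pending response of ours for the same message redundant.
+  incomingRepairRequests.delete(msg.messageId);
+
+  // Track unmet causal dependencies that are not yet scheduled for repair.
+  for (const dep of msg.causalHistory) {
+    if (isUnmetDependency(dep) && !outgoingRepairRequests.has(dep.messageId)) {
+      outgoingRepairRequests.set(dep.messageId, computeTReq(selfId, dep.messageId));
+    }
+  }
+
+  // Handle the sender's repair requests.
+  for (const req of msg.repairRequest) {
+    // Another participant has already requested this entry; drop our own request.
+    outgoingRepairRequests.delete(req.messageId);
+    if (isInLocalHistory(req.messageId) && isEligibleResponder(selfId, req)) {
+      incomingRepairRequests.set(req.messageId, computeTResp(selfId, req));
+    }
+  }
+}
+```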
-The ReliableChannel uses these default intervals:
-- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff)
-- **Retry interval**: 30 seconds for unacknowledged messages
-- **Max retry attempts**: 10 attempts before giving up
-- **Store query interval**: 10 seconds for missing message retrieval
+#### Determine T_req

-#### Task Scheduling Implementation
+A participant determines the repair request timestamp, `T_req`,
+for a missing `HistoryEntry` as follows:

-The SDK automatically schedules SDS periodic tasks:
-- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others
-- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages
-- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval
-- **Process tasks**: Called immediately after sending/receiving messages and during sync
+```
+T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
+```

-#### Integration with Waku Protocols
+where `current_time` is the current timestamp,
+`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`),
+`message_id` is the missing `HistoryEntry`'s message ID,
+and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state).

-ReliableChannel integrates SDS with Waku:
-- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes)
-- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application
-- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history
-- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default)
+This allows `T_req` to be pseudorandomly and linearly distributed as a backoff between `T_min` and `T_max` from the current time.
+
+> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.
+
+#### Determine T_resp
+
+A participant determines the repair response timestamp, `T_resp`,
+for a `HistoryEntry` that it could repair as follows:
+
+```
+distance = hash(participant_id) XOR hash(sender_id)
+T_resp = current_time + distance * hash(message_id) % T_max
+```
+
+where `current_time` is the current timestamp,
+`participant_id` is the participant's _own_ (local) participant ID,
+`sender_id` is the requested `HistoryEntry` sender ID,
+`message_id` is the requested `HistoryEntry` message ID,
+and `T_max` is as set out in [SDS-R global state](#sds-r-global-state).
+
+We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`.
+If this participant is the original sender, the `distance` will be `0`.
+The original sender therefore has a response backoff of `0`, making it the most likely responder.
+The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from the current time.
+
+> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.
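+
+As a non-normative illustration,
+the two backoff computations could be implemented as follows
+(the spec does not fix a hash function; a 32-bit FNV-1a over the concatenated inputs is used purely for illustration,
+and all times are epoch milliseconds):
+
+```typescript
+// Sketch of the T_req / T_resp backoff computations.
+function fnv1a32(input: string): number {
+  let h = 0x811c9dc5;
+  for (let i = 0; i < input.length; i++) {
+    h ^= input.charCodeAt(i);
+    h = Math.imul(h, 0x01000193) >>> 0;
+  }
+  return h >>> 0;
+}
+
+// T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
+function computeTReq(
+  selfId: string, messageId: string, tMinMs: number, tMaxMs: number,
+): number {
+  const backoff = (fnv1a32(selfId + messageId) % (tMaxMs - tMinMs)) + tMinMs;
+  return Date.now() + backoff;
+}
+
+// distance = hash(participant_id) XOR hash(sender_id)
+// T_resp   = current_time + distance * hash(message_id) % T_max
+function computeTResp(
+  selfId: string, senderId: string, messageId: string, tMaxMs: number,
+): number {
+  const distance = BigInt(fnv1a32(selfId) ^ fnv1a32(senderId));
+  const backoff = (distance * BigInt(fnv1a32(messageId))) % BigInt(tMaxMs);
+  return Date.now() + Number(backoff); // zero backoff for the original sender
+}
+```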
+
+#### Determine response group
+
+Given a message with `sender_id` and `message_id`,
+a participant with `participant_id` is in the response group for that message if
+
+```
+hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups
+```
+
+where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state).
+This ensures that a participant will always be in the response group for its own published messages.
+It also allows participants to determine immediately on first reception of a message or a history entry
+whether they are in the associated response group.
+
+#### SDS-R incoming repair request buffer sweep
+
+An SDS-R participant MUST periodically check whether any requests in the *incoming repair request buffer* are due for a response.
+For each item in the buffer,
+the participant SHOULD broadcast the corresponding `Message` from local history
+if its response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`).
+
+#### SDS-R periodic sync message
+
+If the participant is due to send a periodic sync message,
+it SHOULD send the message according to [SDS-R send message](#sds-r-send-message)
+if there are any eligible items in the outgoing repair request buffer,
+regardless of whether other participants have also recently broadcast a periodic sync message.
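+
+As a non-normative illustration,
+the response-group check and the periodic buffer sweep could be sketched as follows in TypeScript
+(`fnv1a32` is the illustrative hash from the previous sketch; `rebroadcast` and the buffer type are assumptions):
+
+```typescript
+// Sketch: response-group membership and the incoming repair request buffer sweep.
+declare function fnv1a32(input: string): number;       // illustrative hash
+declare function rebroadcast(messageId: string): void; // assumed transport hook
+
+// hash(participant_id, message_id) % n == hash(sender_id, message_id) % n
+function isInResponseGroup(
+  selfId: string, senderId: string, messageId: string, numResponseGroups: number,
+): boolean {
+  return fnv1a32(selfId + messageId) % numResponseGroups ===
+    fnv1a32(senderId + messageId) % numResponseGroups;
+}
+
+// Periodically rebroadcast entries whose T_resp has expired.
+function sweepIncomingRepairRequests(buffer: Map<string, number>): void {
+  const now = Date.now();
+  for (const [messageId, tResp] of buffer) {
+    if (tResp <= now) {
+      rebroadcast(messageId); // the full `Message` is cached in local history
+      buffer.delete(messageId); // assumption: clear the entry once served
+    }
+  }
+}
+```

 ## Copyright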