feat: integrate sds-r with message channels

This commit is contained in:
jm-clius 2025-10-03 16:44:08 +01:00
parent fac33e2fb6
commit 211a4da63f

sds.md | 231

@@ -58,6 +58,9 @@ other participants using the corresponding message ID.
* **Participant ID:**
Each participant has a globally unique, immutable ID
visible to other participants in the communication.
* **Sender ID:**
The *Participant ID* of the original sender of a message,
often coupled with a *Message ID*.
## Wire protocol
@@ -75,15 +78,20 @@ syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; for example, a Waku deterministic message hash or routing payload hash
optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension
}
message Message {
string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.
optional bytes content = 20; // Actual content of the message
}
```
@@ -111,7 +119,11 @@ Its importance is expected to increase once a p2p retrieval mechanism is added t
Each participant MUST maintain:
* A Lamport timestamp for each channel of communication,
initialized to current epoch time in nanosecond resolution.
initialized to current epoch time in millisecond resolution.
The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps)
to maintain a logical ordering of events while staying close to the current epoch time.
This allows messages from new joiners to be correctly ordered relative to other recent messages,
without these new participants first having to synchronize past messages to discover the current Lamport timestamp.
* A bloom filter for received message IDs per channel.
The bloom filter SHOULD be rolled over and
recomputed once it reaches a predefined capacity of message IDs.
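A non-normative sketch in TypeScript of this per-channel participant state; the `ChannelState` shape, the use of a plain `Set` as a stand-in for a real bloom filter, and the capacity value are illustrative assumptions, not part of the protocol.

```
// Illustrative per-channel state; field names and the Set stand-in are assumptions.
interface ChannelState {
  lamportTimestamp: number;   // logical clock, initialized to epoch time in ms
  receivedIds: Set<string>;   // stand-in for a bloom filter of received message IDs
  receivedIdCount: number;    // insertions since the filter was last rolled over
}

const BLOOM_FILTER_CAPACITY = 10_000; // illustrative roll-over threshold

function initChannelState(): ChannelState {
  return {
    lamportTimestamp: Date.now(), // current epoch time in millisecond resolution
    receivedIds: new Set(),
    receivedIdCount: 0,
  };
}

function recordReceivedMessageId(state: ChannelState, messageId: string): void {
  state.receivedIds.add(messageId);
  state.receivedIdCount += 1;
  if (state.receivedIdCount >= BLOOM_FILTER_CAPACITY) {
    // Roll over and recompute the filter once it reaches capacity
    // (recomputation over recent message IDs elided in this sketch).
    state.receivedIds = new Set();
    state.receivedIdCount = 0;
  }
}
```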
@@ -144,8 +156,11 @@ the `lamport_timestamp`, `causal_history` and `bloom_filter` fields.
Before broadcasting a message:
* the participant MUST increase its local Lamport timestamp by `1` and
include this in the `lamport_timestamp` field.
* the participant MUST set its local Lamport timestamp
to the maximum of its current value + `1`
and the current epoch time in milliseconds.
In other words, the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`.
* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field.
* the participant MUST determine the preceding few message IDs in the local history
and include these in an ordered list in the `causal_history` field.
The number of message IDs to include in the `causal_history` depends on the application.
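As an illustration, the following TypeScript sketch applies the Lamport timestamp rule and selects the causal history for an outgoing message; the helper and field names are assumptions, not a normative API.

```
// Illustrative pre-broadcast bookkeeping; names are assumptions.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
}

const CAUSAL_HISTORY_SIZE = 2; // application-defined; generally 2 or 3 message IDs

function prepareBroadcastFields(state: {
  lamportTimestamp: number;
  localHistory: HistoryEntry[]; // local log, oldest first
}): { lamportTimestamp: number; causalHistory: HistoryEntry[] } {
  // max(timeNowInMs, current_lamport_timestamp + 1)
  state.lamportTimestamp = Math.max(Date.now(), state.lamportTimestamp + 1);
  // The preceding few message IDs from the local history, in order.
  const causalHistory = state.localHistory.slice(-CAUSAL_HISTORY_SIZE);
  return { lamportTimestamp: state.lamportTimestamp, causalHistory };
}
```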
@@ -250,7 +265,8 @@ participants SHOULD periodically send sync messages to maintain state.
These sync messages:
* MUST be sent with empty content
* MUST include an incremented Lamport timestamp
* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`,
where `timeNowInMs` is the current epoch time in milliseconds.
* MUST include causal history and bloom filter according to regular message rules
* MUST NOT be added to the unacknowledged outgoing buffer
* MUST NOT be included in causal histories of subsequent messages
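A non-normative sketch of a sync message built under these rules; the `SyncMessage` shape mirrors the protobuf fields above, and the helper names are assumptions.

```
// Illustrative sync message construction; shapes mirror the proto, names are assumptions.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
}

interface SyncMessage {
  senderId: string;
  messageId: string;
  channelId: string;
  lamportTimestamp: number;
  causalHistory: HistoryEntry[];
  bloomFilter?: Uint8Array;
  // content is deliberately absent: sync messages are sent with empty content
}

function buildSyncMessage(
  senderId: string,
  channelId: string,
  newMessageId: () => string,
  state: { lamportTimestamp: number; localHistory: HistoryEntry[]; bloomFilter?: Uint8Array },
): SyncMessage {
  // Lamport timestamp increased to max(timeNowInMs, current_lamport_timestamp + 1).
  state.lamportTimestamp = Math.max(Date.now(), state.lamportTimestamp + 1);
  return {
    senderId,
    messageId: newMessageId(),
    channelId,
    lamportTimestamp: state.lamportTimestamp,
    causalHistory: state.localHistory.slice(-2), // regular causal history rules
    bloomFilter: state.bloomFilter,
  };
}
```

The caller broadcasts the result but does not add it to the unacknowledged outgoing buffer, and never references it in later causal histories.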
@@ -281,82 +297,181 @@ Upon reception,
ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies
or including in the local log.
## Implementation Suggestions
### SDS Repair (SDS-R)
This section provides practical guidance based on the js-waku implementation of SDS.
SDS Repair (SDS-R) is an optional extension module for SDS,
allowing participants in a communication to collectively repair any gaps in causal history (missing messages),
preferably within a limited time window.
Since SDS-R acts as coordinated rebroadcasting of missing messages,
which involves all participants of the communication,
it is best suited to the limited use case of repairing relatively recent missing dependencies.
It is not meant to replace mechanisms for long-term consistency,
such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node).
### Default Configuration Values
#### SDS-R message fields
The js-waku implementation uses the following defaults:
- **Bloom filter capacity**: 10,000 messages
- **Bloom filter error rate**: 0.001 (0.1% false positive rate)
- **Causal history size**: 200 message IDs
- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged
SDS-R adds the following fields to SDS messages:
* `sender_id` in `HistoryEntry`:
the original message sender's participant ID.
This is used to determine the group of participants who will respond to a repair request.
* `repair_request` in `Message`:
a capped list of history entries that are missing for the message sender
and for which it is requesting repair.
With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes),
each message carries 200 × 64 bytes = 12.8 KB of causal history overhead.
#### SDS-R participant state
### External Task Scheduling
SDS-R adds the following to each participant state:
The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods:
* Outgoing **repair request buffer**:
a list of locally missing `HistoryEntry`s,
each mapped to a future request timestamp, `T_req`,
after which this participant will request a repair if the missing dependency has not been repaired by then.
`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing.
[Determining `T_req`](#determine-t_req) is described below.
We RECOMMEND keeping the outgoing repair request buffer sorted in ascending order of `T_req`.
* Incoming **repair request buffer**:
a list of locally available `HistoryEntry`s
that were requested for repair by a remote participant
AND for which this participant might be an eligible responder,
each mapped to a future response timestamp, `T_resp`,
after which this participant will rebroadcast the corresponding requested `Message` if no other participant has rebroadcast the `Message` by then.
`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested.
[Determining `T_resp`](#determine-t_resp) is described below.
We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific repair request.
* Augmented local history log:
for each message ID kept in the local log for which the participant could be a repair responder,
the full SDS `Message` must be cached rather than just the message ID,
in case this participant is called upon to rebroadcast the message.
We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific message.
- `processTasks()`: Process queued send/receive operations
- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies
- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry
- `pushOutgoingSyncMessage(callback)`: Send a sync message
**_Note:_** The required state can likely be significantly reduced in the future by requiring that a responding participant _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler but heavier requirement of caching the entire received `Message` content in local history.
The implementation does not include internal timers,
allowing applications to integrate SDS with their existing scheduling infrastructure.
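A non-normative TypeScript sketch of the additional SDS-R participant state described above; the container shapes and names are assumptions chosen for illustration.

```
// Illustrative SDS-R participant state; shapes and names are assumptions.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string; // populated when the SDS-R extension is in use
}

interface SdsRepairState {
  // Locally missing dependencies, each with the request timestamp T_req
  // after which a repair will be requested; kept sorted by ascending T_req.
  outgoingRepairRequests: Array<{ entry: HistoryEntry; tReq: number }>;
  // Repair requests from remote participants that this participant could answer,
  // each with the response timestamp T_resp after which it would rebroadcast.
  incomingRepairRequests: Array<{ entry: HistoryEntry; tResp: number }>;
  // Augmented local history: full cached Messages (encoded) for message IDs
  // for which this participant may act as a repair responder.
  repairableMessages: Map<string, Uint8Array>;
}
```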
#### SDS-R global state
### Message Processing
For a specific channel (that is, within a specific SDS-controlled communication),
the following SDS-R configuration state SHOULD be common to all participants in the conversation:
#### Handling Missing Dependencies
* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired.
We RECOMMEND a value of at least 30 seconds.
* `T_max`: the _maximum_ time period over which missing causal entries can be repaired.
We RECOMMEND a value between 120 and 600 seconds.
When `sweepIncomingBuffer()` returns missing dependencies,
the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing:
- `messageId`: The missing message identifier
- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash)
Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request,
participants in a single channel MAY be divided into discrete response groups.
Participants will only respond to a repair request if they are in the response group for that request.
The global `num_response_groups` variable configures the number of response groups for this communication.
Its use is described below.
A reasonable default value for `num_response_groups` is one response group for every `128` participants.
In other words, if the (roughly) expected number of participants is `num_participants`, then
`num_response_groups = num_participants div 128 + 1`.
This means that if there are fewer than 128 participants in a communication,
they will all belong to the same response group.
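For example, under this default the response-group count grows by one for every 128 expected participants, as the following sketch shows (integer division as in the formula above):

```
// Illustrative derivation of num_response_groups from the expected participant count.
function numResponseGroups(numParticipants: number): number {
  return Math.floor(numParticipants / 128) + 1; // num_participants div 128 + 1
}

numResponseGroups(50);  // 1: all participants share a single response group
numResponseGroups(300); // 3
```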
#### Timeout for Lost Messages
We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application,
based on the expected number of group participants and the expected volume of traffic.
The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout.
When configured, the implementation emits an `InMessageLost` event after the timeout expires.
**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants.
### Events Emitted
#### SDS-R send message
The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for:
- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout)
- **Outgoing messages**: sent, acknowledged, possibly acknowledged
- **Sync messages**: sent, received
- **Errors**: task execution failures
SDS-R adds the following steps when sending a message:
### SDK Usage: ReliableChannel
Before broadcasting a message,
* the participant SHOULD populate the `repair_request` field in the message
with _eligible_ entries from the outgoing repair request buffer.
An entry is eligible to be included in a `repair_request`
if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`).
The maximum number of repair request entries to include is up to the application.
We RECOMMEND that this quota be filled with the eligible entries from the outgoing repair request buffer that have the lowest `T_req` values.
We RECOMMEND a maximum of 3 entries.
If there are no eligible entries in the buffer, this optional field MUST be left unset.
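The selection of eligible entries could look like the following non-normative sketch; the buffer shape and the cap of 3 follow the recommendations above, while the function name is an assumption.

```
// Illustrative selection of repair_request entries before broadcast.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string;
}

const MAX_REPAIR_REQUEST_ENTRIES = 3; // recommended maximum

function selectRepairRequests(
  outgoingRepairRequests: Array<{ entry: HistoryEntry; tReq: number }>,
  nowMs: number = Date.now(),
): HistoryEntry[] {
  return outgoingRepairRequests
    .filter((item) => item.tReq <= nowMs)      // only entries whose T_req has expired
    .sort((a, b) => a.tReq - b.tReq)           // lowest T_req first
    .slice(0, MAX_REPAIR_REQUEST_ENTRIES)
    .map((item) => item.entry);
}
```

If this returns an empty list, the `repair_request` field is simply left unset.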
The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration:
#### SDS-R receive message
#### Configuration
On receiving a message,
* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer.
This ensures that the participant does not request repairs for dependencies that have now been met.
* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer.
This ensures that the participant does not respond to repair requests that another participant has already responded to.
* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer.
For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp.
It MUST compute the `T_req` for each such `HistoryEntry` according to the steps outlined in [_Determine T_req_](#determine-t_req).
* for each item in the `repair_request` field:
- the participant MUST remove entries matching the requested message ID from its own outgoing repair request buffer.
This limits the number of participants that will request a common missing dependency.
- if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request,
it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry.
It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp).
It MUST determine if it's an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group).
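The following non-normative sketch illustrates this receive-side bookkeeping; the state shape and the injected helpers (`hasInLocalHistory`, `inResponseGroup`, `computeTResp`) are assumptions standing in for the procedures referenced above.

```
// Illustrative SDS-R receive-side bookkeeping; names and shapes are assumptions.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string;
}

interface SdsRepairState {
  outgoingRepairRequests: Array<{ entry: HistoryEntry; tReq: number }>;
  incomingRepairRequests: Array<{ entry: HistoryEntry; tResp: number }>;
}

function onSdsRepairReceive(
  state: SdsRepairState,
  received: { messageId: string; repairRequest: HistoryEntry[] },
  hasInLocalHistory: (messageId: string) => boolean,
  inResponseGroup: (entry: HistoryEntry) => boolean,
  computeTResp: (entry: HistoryEntry) => number,
): void {
  // The received message ID no longer needs to be requested or repaired.
  state.outgoingRepairRequests = state.outgoingRepairRequests.filter(
    (item) => item.entry.messageId !== received.messageId,
  );
  state.incomingRepairRequests = state.incomingRepairRequests.filter(
    (item) => item.entry.messageId !== received.messageId,
  );
  // (Scheduling T_req for newly detected unmet dependencies is elided here.)
  for (const entry of received.repairRequest) {
    // Another participant already requested this repair; drop any duplicate local request.
    state.outgoingRepairRequests = state.outgoingRepairRequests.filter(
      (item) => item.entry.messageId !== entry.messageId,
    );
    // Queue a response only if the message is held locally and this participant is eligible.
    if (hasInLocalHistory(entry.messageId) && inResponseGroup(entry)) {
      state.incomingRepairRequests.push({ entry, tResp: computeTResp(entry) });
    }
  }
}
```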
The ReliableChannel uses these default intervals:
- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff)
- **Retry interval**: 30 seconds for unacknowledged messages
- **Max retry attempts**: 10 attempts before giving up
- **Store query interval**: 10 seconds for missing message retrieval
#### Determine T_req
#### Task Scheduling Implementation
A participant determines the repair request timestamp, `T_req`,
for a missing `HistoryEntry` as follows:
The SDK automatically schedules SDS periodic tasks:
- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others
- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages
- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval
- **Process tasks**: Called immediately after sending/receiving messages and during sync
```
T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
```
#### Integration with Waku Protocols
where `current_time` is the current timestamp,
`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`),
`message_id` is the missing `HistoryEntry`'s message ID,
and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state).
ReliableChannel integrates SDS with Waku:
- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes)
- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application
- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history
- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default)
This allows `T_req` to be pseudorandomly and linearly distributed as a backoff of between `T_min` and `T_max` from the current time.
> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.
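As a non-normative illustration, the sketch below computes `T_req` in TypeScript; the FNV-1a hash, the way the two arguments are concatenated, and the `T_min`/`T_max` values are assumptions, since this specification does not mandate a particular hash function.

```
// Illustrative T_req computation; hash choice and argument encoding are assumptions.
function fnv1a64(input: string): bigint {
  let hash = 0xcbf29ce484222325n; // FNV-1a 64-bit offset basis
  for (const ch of input) {
    hash ^= BigInt(ch.codePointAt(0)!);
    hash = (hash * 0x100000001b3n) & 0xffffffffffffffffn; // FNV prime, truncated to 64 bits
  }
  return hash;
}

const T_MIN_MS = 30_000;  // illustrative T_min (30 s)
const T_MAX_MS = 300_000; // illustrative T_max (300 s)

function computeTReq(participantId: string, messageId: string, nowMs: number = Date.now()): number {
  const h = fnv1a64(`${participantId}:${messageId}`); // hash(participant_id, message_id)
  const backoffMs = Number(h % BigInt(T_MAX_MS - T_MIN_MS)) + T_MIN_MS;
  return nowMs + backoffMs;
}
```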
#### Determine T_resp
A participant determines the repair response timestamp, `T_resp`,
for a `HistoryEntry` that it could repair as follows:
```
distance = hash(participant_id) XOR hash(sender_id)
T_resp = current_time + distance*hash(message_id) % T_max
```
where `current_time` is the current timestamp,
`participant_id` is the participant's _own_ (local) participant ID,
`sender_id` is the requested `HistoryEntry` sender ID,
`message_id` is the requested `HistoryEntry` message ID,
and `T_max` is as set out in [SDS-R global state](#sds-r-global-state).
We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`.
If this participant is the original sender, the `distance` will be `0`.
As a result, the original sender will have a response backoff time of `0`, making it the most likely responder.
The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from the current time.
> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.
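A matching sketch for `T_resp`, reusing the illustrative `fnv1a64` helper from the `T_req` sketch above; as before, the hash function is an assumption.

```
// Illustrative T_resp computation; reuses the fnv1a64 helper from the T_req sketch.
declare function fnv1a64(input: string): bigint;

const T_MAX_MS = 300_000; // illustrative T_max (300 s)

function computeTResp(
  participantId: string, // this participant's own (local) ID
  senderId: string,      // sender ID from the requested HistoryEntry
  messageId: string,     // message ID from the requested HistoryEntry
  nowMs: number = Date.now(),
): number {
  // distance = hash(participant_id) XOR hash(sender_id); 0 for the original sender
  const distance = fnv1a64(participantId) ^ fnv1a64(senderId);
  const backoffMs = Number((distance * fnv1a64(messageId)) % BigInt(T_MAX_MS));
  return nowMs + backoffMs;
}
```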
#### Determine response group
Given a message with `sender_id` and `message_id`,
a participant with `participant_id` is in the response group for that message if
```
hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups
```
where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state).
This ensures that a participant will always be in the response group for its own published messages.
It also allows participants to determine immediately, on first reception of a message or a history entry,
whether they are in the associated response group.
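The check can be implemented as in the following sketch, again assuming the illustrative `fnv1a64` helper from the `T_req` sketch:

```
// Illustrative response-group membership check; hash choice is an assumption.
declare function fnv1a64(input: string): bigint;

function inResponseGroup(
  participantId: string,
  senderId: string,
  messageId: string,
  numResponseGroups: number,
): boolean {
  const groups = BigInt(numResponseGroups);
  return (
    fnv1a64(`${participantId}:${messageId}`) % groups ===
    fnv1a64(`${senderId}:${messageId}`) % groups
  );
}
```

When `participantId` equals `senderId` the two sides are identical, so the original sender is always in its own response group.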
#### SDS-R incoming repair request buffer sweep
An SDS-R participant MUST periodically check whether any requests in the *incoming repair request buffer* are due for a response.
For each item in the buffer,
the participant SHOULD rebroadcast the corresponding `Message` from its local history
if the item's response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`).
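A non-normative sketch of this sweep; the buffer shape and the injected `rebroadcast` callback are assumptions.

```
// Illustrative sweep of the incoming repair request buffer.
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string;
}

function sweepIncomingRepairRequests(
  incoming: Array<{ entry: HistoryEntry; tResp: number }>,
  rebroadcast: (messageId: string) => void, // re-sends the cached Message from local history
  nowMs: number = Date.now(),
): Array<{ entry: HistoryEntry; tResp: number }> {
  for (const item of incoming) {
    if (item.tResp <= nowMs) {
      rebroadcast(item.entry.messageId); // T_resp has expired: respond to the repair request
    }
  }
  // Entries whose T_resp has not yet expired remain buffered for a later sweep.
  return incoming.filter((item) => item.tResp > nowMs);
}
```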
#### SDS-R periodic sync message
If the participant is due to send a periodic sync message,
it SHOULD send the message according to [SDS-R send message](#sds-r-send-message)
if there are any eligible items in the outgoing repair request buffer,
regardless of whether other participants have also recently broadcast a periodic sync message.
## Copyright