Added delivery acknowledgements

This commit is contained in:
Jazz Turner-Baggs 2025-09-05 15:35:47 -07:00
parent 1d2aa3453f
commit c151548516
11 changed files with 321 additions and 22 deletions

View File

@ -5,13 +5,18 @@ author = "jazzz"
description = "An example of the chat sdk in Nim"
license = "MIT"
srcDir = "src"
bin = @["nim_chat_poc"]
bin = @["nim_chat_poc", "dev"]
# Basic build task
task initialize, "Initialize the project after cloning":
exec "./initialize.sh"
# # Clean
# task cleandeps, "Remove and refresh dependencies":
# rm -rf ~/.nimble/pkgs2/sds-*
# rm -rf ~/.nimble/pkgcache/githubcom_jazzznimsds*
# Dependencies
@ -27,3 +32,4 @@ requires "confutils"
requires "eth"
requires "regex"
requires "web3"
requires "file:///Users/jazzz/dev/nim-sds#dev"

View File

@ -14,7 +14,7 @@ message Placeholder {
message PrivateV1Frame {
string conversation_id = 1;
bytes sender = 2;
int64 timestamp = 3; // Sender reported timestamp
oneof frame_type {
common_frames.ContentFrame content = 10;
Placeholder placeholder = 11;

View File

@ -6,6 +6,7 @@
import # Foreign
chronicles,
chronos,
sds,
sequtils,
std/tables,
std/sequtils,
@ -34,8 +35,10 @@ logScope:
#################################################
type
MessageCallback[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.}
NewConvoCallback = proc(conversation: Conversation): Future[void] {.async.}
MessageCallback*[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.}
NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}
ReadReceiptCallback* = proc(conversation: Conversation,
msgId: string): Future[void] {.async.}
type KeyEntry* = object
@ -53,6 +56,7 @@ type Client* = ref object
newMessageCallbacks: seq[MessageCallback[ContentFrame]]
newConvoCallbacks: seq[NewConvoCallback]
readReceiptCallbacks: seq[ReadReceiptCallback]
#################################################
# Constructors
@ -64,6 +68,8 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError,
try:
let waku = initWakuClient(cfg)
let rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")
var q = QueueRef(queue: newAsyncQueue[ChatPayload](10))
var c = Client(ident: createIdentity(name),
@ -130,6 +136,14 @@ proc notifyNewConversation(client: Client, convo: Conversation) =
for cb in client.newConvoCallbacks:
discard cb(convo)
proc onReadReceipt*(client: Client, callback: ReadReceiptCallback) =
client.readReceiptCallbacks.add(callback)
proc notifyReadReceipt(client: Client, convo: Conversation,
messageId: MessageId) =
for cb in client.readReceiptCallbacks:
discard cb(convo, messageId)
#################################################
# Functional
#################################################
@ -144,7 +158,7 @@ proc createIntroBundle*(self: var Client): IntroBundle =
self.keyStore[ephemeralKey.getPublicKey().bytes().bytesToHex()] = KeyEntry(
keyType: "ephemeral",
privateKey: ephemeralKey,
timestamp: getTimestamp()
timestamp: getCurrentTimestamp()
)
result = IntroBundle(
@ -189,11 +203,19 @@ proc newPrivateConversation*(client: Client,
participant: @(destPubkey.bytes()),
participantEphemeralId: introBundle.ephemeralId,
discriminator: "test"
)
)
let env = wrapEnv(encrypt(InboxV1Frame(invitePrivateV1: invite,
recipient: "")), convoId)
let convo = initPrivateV1(client.identity(), destPubkey, "default")
let deliveryAckCb = proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} =
client.notifyReadReceipt(conversation, msgId)
let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb)
client.addConversation(convo)
# TODO: Subscribe to new content topic

View File

@ -4,6 +4,7 @@ import ./conversations/convo_type
import crypto
import identity
import proto_types
import types
type ConvoId = string
@ -16,3 +17,5 @@ type
proc notifyNewMessage(self: Self, convo: Conversation,
content: ContentFrame)
proc notifyReadReceipt(self: Self, convo: Conversation,
msgId: MessageId)

View File

@ -1,10 +1,12 @@
import blake2
import chronicles
import chronos
import sds
import std/[sequtils, strutils, strformat]
import std/algorithm
import sugar
import tables
import ../conversation_store
import ../crypto
@ -13,15 +15,16 @@ import ../delivery/waku_client
import ../[
identity,
proto_types,
types,
utils
]
import convo_type
type
PrivateV1* = ref object of Conversation
# Placeholder for PrivateV1 conversation type
sdsClient: ReliabilityManager
owner: Identity
topic: string
participants: seq[PublicKey]
@ -48,27 +51,81 @@ proc derive_topic(participants: seq[PublicKey], discriminator: string): string =
## Derives a topic from the participants' public keys.
return "/convo/private/" & getConvoIdRaw(participants, discriminator)
proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string =
let s = fmt"{self.getConvoId()}|{msgBytes}"
result = getBlake2b(s, 16, "")
proc encrypt*(convo: PrivateV1, frame: PrivateV1Frame): EncryptedPayload =
result = EncryptedPayload(plaintext: Plaintext(payload: encode(frame)))
proc decrypt*(convo: PrivateV1, enc: EncryptedPayload): PrivateV1Frame =
result = decode(enc.plaintext.payload, PrivateV1Frame).get()
proc wireCallbacks(convo: PrivateV1, deliveryAckCb: proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} = nil) =
## Accepts lambdas/functions to be called from Reliability Manager callbacks.
let funcMsg = proc(messageId: SdsMessageID,
channelId: SdsChannelID) {.gcsafe.} =
debug "sds message ready", messageId = messageId,
channelId = channelId
let funcDeliveryAck = proc(messageId: SdsMessageID,
channelId: SdsChannelID) {.gcsafe.} =
debug "sds message ack", messageId = messageId,
channelId = channelId, cb = repr(deliveryAckCb)
if deliveryAckCb != nil:
asyncSpawn deliveryAckCb(convo, messageId)
let funcDroppedMsg = proc(messageId: SdsMessageID, missingDeps: seq[
SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
debug "sds message missing", messageId = messageId,
missingDeps = missingDeps, channelId = channelId
convo.sdsClient.setCallbacks(
funcMsg, funcDeliveryAck, funcDroppedMsg
)
proc initPrivateV1*(owner: Identity, participant: PublicKey,
discriminator: string = "default"): PrivateV1 =
discriminator: string = "default", deliveryAckCb: proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} = nil):
PrivateV1 =
var participants = @[owner.getPubkey(), participant];
return PrivateV1(
var rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"sds initialization: {repr(error)}")
result = PrivateV1(
sdsClient: rm,
owner: owner,
topic: derive_topic(participants, discriminator),
participants: participants,
discriminator: discriminator
)
result.wireCallbacks(deliveryAckCb)
result.sdsClient.ensureChannel(result.getConvoId()).isOkOr:
raise newException(ValueError, "bad sds channel")
proc sendFrame(self: PrivateV1, ds: WakuClient,
msg: PrivateV1Frame): Future[void]{.async.} =
let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg)))
let frameBytes = encode(msg)
let msgId = self.calcMsgId(frameBytes)
let sdsPayload = self.sdsClient.wrapOutgoingMessage(frameBytes, msgId,
self.getConvoId()).valueOr:
raise newException(ValueError, fmt"sds wrapOutgoingMessage failed: {repr(error)}")
let encryptedBytes = EncryptedPayload(plaintext: Plaintext(
payload: sdsPayload))
discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope(
self.getConvoId()))
@ -81,8 +138,19 @@ proc handleFrame*[T: ConversationStore](convo: PrivateV1, client: T,
## Dispatcher for Incoming `PrivateV1Frames`.
## Calls further processing depending on the kind of frame.
let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result
let frame = convo.decrypt(enc) # TODO: handle result
let enc = decode(bytes, EncryptedPayload).valueOr:
raise newException(ValueError, fmt"Failed to decode EncryptedPayload: {repr(error)}")
# TODO: Decrypt the payload
let (frameData, missingDeps, channelId) = convo.sdsClient.unwrapReceivedMessage(
enc.plaintext.payload).valueOr:
raise newException(ValueError, fmt"Failed to unwrap SDS message:{repr(error)}")
debug "sds unwrap", convo = convo.id(), missingDeps = missingDeps,
channelId = channelId
let frame = decode(frameData, PrivateV1Frame).valueOr:
raise newException(ValueError, "Failed to decode SdsM: " & error)
if frame.sender == @(convo.owner.getPubkey().bytes()):
notice "Self Message", convo = convo.id()
@ -102,7 +170,7 @@ method sendMessage*(convo: PrivateV1, ds: WakuClient,
try:
let frame = PrivateV1Frame(sender: @(convo.owner.getPubkey().bytes()),
content: content_frame)
timestamp: getCurrentTimestamp(), content: content_frame)
await convo.sendFrame(ds, frame)
except Exception as e:

View File

@ -118,7 +118,7 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet()
let node = builder.build().tryGet()
node.mountMetadata(cfg.clusterId, @[1'u16, 2'u16]).expect("failed to mount waku metadata protocol")
node.mountMetadata(cfg.clusterId, cfg.shardId).expect("failed to mount waku metadata protocol")
result = node

View File

@ -72,8 +72,12 @@ proc createPrivateV1FromInvite*[T: ConversationStore](client: T,
let destPubkey = loadPublicKeyFromBytes(invite.initiator).valueOr:
raise newException(ValueError, "Invalid public key in intro bundle.")
let convo = initPrivateV1(client.identity(), destPubkey, "default")
let deliveryAckCb = proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} =
client.notifyReadReceipt(conversation, msgId)
let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb)
notice "Creating PrivateV1 conversation", client = client.getId(),
topic = convo.getConvoId()
client.addConversation(convo)

View File

@ -1 +1,4 @@
type ChatError* = string
type MessageId* = string

View File

@ -4,11 +4,11 @@ import crypto
import blake2
import strutils
proc getTimestamp*(): Timestamp =
proc getCurrentTimestamp*(): Timestamp =
result = waku_core.getNanosecondTime(getTime().toUnix())
proc hash_func*(s: string): string =
proc hash_func*(s: string | seq[byte]): string =
# This should be Blake2s but it does not exist so substituting with Blake2b
result = getBlake2b(s, 4, "")

View File

@ -1,6 +1,191 @@
## Utilties for development and debugging
import chronos
import chronicles
import strformat
import tables
import sds
import std/typetraits
import sequtils
logScope:
topics = "dev"
type A = ref object
asg: string
proc dir*[T](obj: T) =
echo "Object of type: ", T
for name, value in fieldPairs(obj):
echo " ", name, ": ", value
if type(value) is string:
echo " ", name, ": ", value
if type(value) is Table:
echo "hmmmm"
proc `$`(rm: ReliabilityManager): string =
result = "RM"
type Msg = object
id*: string
shouldDrop: bool
data*: seq[byte]
proc send(rm: ReliabilityManager, channel: string, id: string, msg: seq[
byte]): seq[byte] =
result = rm.wrapOutgoingMessage(msg, id, channel).valueOr:
raise newException(ValueError, "Bad outgoing message")
proc recv(rm: ReliabilityManager, bytes: seq[byte]) =
let (unwrapped, missingDeps, channelId) = rm.unwrapReceivedMessage(bytes).valueOr:
raise newException(ValueError, "Bad unwrap")
info "RECV", channel = channelId, data = unwrapped, mdeps = missingDeps
proc main() {.async.} =
var messageSentCount = 0
let CHANNEL = "CHANNEL"
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
var rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")
rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
echo "OnMsgReady"
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
echo "OnMsgSent"
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID],
channelId: SdsChannelID) {.gcsafe.} =
echo "OnMissing"
discard,
)
let sourceMsgs = @[Msg(id: "1", shouldDrop: false, data: @[byte(1), 2, 3]),
Msg(id: "2", shouldDrop: true, data: @[byte(0), 5, 6]),
Msg(id: "3", shouldDrop: false, data: @[byte(7), 8, 9]),
]
rm.ensureChannel(CHANNEL).isOkOr:
raise newException(ValueError, "Bad channel")
let encodedMessage = sourceMsgs.map(proc(m: Msg): seq[byte] =
rm.wrapOutgoingMessage(m.data, m.id, CHANNEL).valueOr:
raise newException(ValueError, "Bad outgoing message"))
var i = 0
var droppedMessages: seq[seq[byte]] = @[]
for x in encodedMessage:
if i mod 2 == 0:
droppedMessages.add(x)
inc(i)
# let droppedMessages = encodedMessage.keepIf(proc(item: seq[
# byte]): bool =
# items.find(item) mod 2 == 0
# )
# let droppedMessages = enumerate(encodedMessage).filter(proc(x: seq[
# byte]): bool = x[0] mod 2 == 0)
for s in droppedMessages:
info "DROPPED", len = len(s)
for m in droppedMessages:
let (unwrapped, missingDeps, channelId) = rm.unwrapReceivedMessage(m).valueOr:
raise newException(ValueError, "Bad unwrap")
info "RECV", channel = channelId, data = unwrapped, mdeps = missingDeps
proc messageSequence(){.async.} =
var messageSentCount = 0
let CHANNEL = "CHANNEL"
var raya = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")
var saro = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")
raya.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
debug "OnMsgReady", client = "raya", id = messageId
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
debug "OnMsgSent", client = "raya", id = messageId
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID],
channelId: SdsChannelID) {.gcsafe.} =
info "OnMissing", client = "raya", id = messageId
discard,
)
saro.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
debug "OnMsgReady", client = "saro", id = messageId
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
debug "OnMsgSent", client = "saro", id = messageId
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID],
channelId: SdsChannelID) {.gcsafe.} =
info "OnMissing", client = "saro", id = messageId
discard,
)
raya.ensureChannel(CHANNEL).isOkOr:
raise newException(ValueError, "Bad channel")
saro.ensureChannel(CHANNEL).isOkOr:
raise newException(ValueError, "Bad channel")
let s1 = saro.send(CHANNEL, "s1", @[byte(1), 1, 1])
raya.recv(s1)
let r1 = raya.send(CHANNEL, "r1", @[byte(2), 1, 2])
saro.recv(r1)
let r2 = raya.send(CHANNEL, "r2", @[byte(2), 2, 2])
# saro.recv(r2)
let r3 = raya.send(CHANNEL, "r3", @[byte(2), 3, 2])
saro.recv(r3)
let s2 = saro.send(CHANNEL, "s2", @[byte(1), 2, 1])
raya.recv(s2)
let s3 = saro.send(CHANNEL, "s3", @[byte(1), 3, 1])
raya.recv(s3)
let r4 = raya.send(CHANNEL, "r4", @[byte(2), 4, 2])
saro.recv(r4)
saro.recv(r2)
let r5 = raya.send(CHANNEL, "r5", @[byte(2), 5, 2])
saro.recv(r5)
let r6 = raya.send(CHANNEL, "r6", @[byte(2), 6, 2])
saro.recv(r6)
saro.recv(r4)
saro.recv(r3)
saro.recv(r4)
when isMainModule:
waitFor messageSequence()
echo ">>>"

View File

@ -9,7 +9,6 @@ import chat_sdk/utils
import content_types/all
const SELF_DEFINED = 99
type ImageFrame {.proto3.} = object
@ -69,6 +68,11 @@ proc main() {.async.} =
await convo.sendMessage(saro.ds, initImage(
"https://waku.org/theme/image/logo-black.svg"))
)
saro.onReadReceipt(proc(convo: Conversation, msgId: string) {.async.} =
echo " Saro -- Read Receipt for " & msgId
)
await saro.start()
var raya = newClient("Raya", cfg_raya)
@ -82,6 +86,10 @@ proc main() {.async.} =
echo " ------> Raya :: New Conversation: " & convo.id()
await convo.sendMessage(raya.ds, initTextFrame("Hello").toContentFrame())
)
raya.onReadReceipt(proc(convo: Conversation, msgId: string) {.async.} =
echo " raya -- Read Receipt for " & msgId
)
await raya.start()