diff --git a/nim_chat_poc.nimble b/nim_chat_poc.nimble index 7c10bce..fa4866a 100644 --- a/nim_chat_poc.nimble +++ b/nim_chat_poc.nimble @@ -5,13 +5,18 @@ author = "jazzz" description = "An example of the chat sdk in Nim" license = "MIT" srcDir = "src" -bin = @["nim_chat_poc"] +bin = @["nim_chat_poc", "dev"] # Basic build task task initialize, "Initialize the project after cloning": exec "./initialize.sh" +# # Clean +# task cleandeps, "Remove and refresh dependencies": +# rm -rf ~/.nimble/pkgs2/sds-* +# rm -rf ~/.nimble/pkgcache/githubcom_jazzznimsds* + # Dependencies @@ -27,3 +32,4 @@ requires "confutils" requires "eth" requires "regex" requires "web3" +requires "file:///Users/jazzz/dev/nim-sds#dev" diff --git a/protos/private_v1.proto b/protos/private_v1.proto index f5eb691..9d12c4f 100644 --- a/protos/private_v1.proto +++ b/protos/private_v1.proto @@ -14,7 +14,7 @@ message Placeholder { message PrivateV1Frame { string conversation_id = 1; bytes sender = 2; - + int64 timestamp = 3; // Sender reported timestamp oneof frame_type { common_frames.ContentFrame content = 10; Placeholder placeholder = 11; diff --git a/src/chat_sdk/client.nim b/src/chat_sdk/client.nim index 33998d5..66ad952 100644 --- a/src/chat_sdk/client.nim +++ b/src/chat_sdk/client.nim @@ -6,6 +6,7 @@ import # Foreign chronicles, chronos, + sds, sequtils, std/tables, std/sequtils, @@ -34,8 +35,10 @@ logScope: ################################################# type - MessageCallback[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.} - NewConvoCallback = proc(conversation: Conversation): Future[void] {.async.} + MessageCallback*[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.} + NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.} + ReadReceiptCallback* = proc(conversation: Conversation, + msgId: string): Future[void] {.async.} type KeyEntry* = object @@ -53,6 +56,7 @@ type Client* = ref object newMessageCallbacks: seq[MessageCallback[ContentFrame]] newConvoCallbacks: seq[NewConvoCallback] + readReceiptCallbacks: seq[ReadReceiptCallback] ################################################# # Constructors @@ -64,6 +68,8 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError, try: let waku = initWakuClient(cfg) + let rm = newReliabilityManager().valueOr: + raise newException(ValueError, fmt"SDS InitializationError") var q = QueueRef(queue: newAsyncQueue[ChatPayload](10)) var c = Client(ident: createIdentity(name), @@ -130,6 +136,14 @@ proc notifyNewConversation(client: Client, convo: Conversation) = for cb in client.newConvoCallbacks: discard cb(convo) +proc onReadReceipt*(client: Client, callback: ReadReceiptCallback) = + client.readReceiptCallbacks.add(callback) + +proc notifyReadReceipt(client: Client, convo: Conversation, + messageId: MessageId) = + for cb in client.readReceiptCallbacks: + discard cb(convo, messageId) + ################################################# # Functional ################################################# @@ -144,7 +158,7 @@ proc createIntroBundle*(self: var Client): IntroBundle = self.keyStore[ephemeralKey.getPublicKey().bytes().bytesToHex()] = KeyEntry( keyType: "ephemeral", privateKey: ephemeralKey, - timestamp: getTimestamp() + timestamp: getCurrentTimestamp() ) result = IntroBundle( @@ -189,11 +203,19 @@ proc newPrivateConversation*(client: Client, participant: @(destPubkey.bytes()), participantEphemeralId: introBundle.ephemeralId, discriminator: "test" - ) + ) + + + let env = wrapEnv(encrypt(InboxV1Frame(invitePrivateV1: invite, recipient: "")), convoId) - let convo = initPrivateV1(client.identity(), destPubkey, "default") + let deliveryAckCb = proc( + conversation: Conversation, + msgId: string): Future[void] {.async.} = + client.notifyReadReceipt(conversation, msgId) + + let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb) client.addConversation(convo) # TODO: Subscribe to new content topic diff --git a/src/chat_sdk/conversation_store.nim b/src/chat_sdk/conversation_store.nim index 38383f6..4eb7493 100644 --- a/src/chat_sdk/conversation_store.nim +++ b/src/chat_sdk/conversation_store.nim @@ -4,6 +4,7 @@ import ./conversations/convo_type import crypto import identity import proto_types +import types type ConvoId = string @@ -16,3 +17,5 @@ type proc notifyNewMessage(self: Self, convo: Conversation, content: ContentFrame) + proc notifyReadReceipt(self: Self, convo: Conversation, + msgId: MessageId) diff --git a/src/chat_sdk/conversations/private_v1.nim b/src/chat_sdk/conversations/private_v1.nim index 6ee2e99..00d2762 100644 --- a/src/chat_sdk/conversations/private_v1.nim +++ b/src/chat_sdk/conversations/private_v1.nim @@ -1,10 +1,12 @@ - +import blake2 import chronicles import chronos +import sds import std/[sequtils, strutils, strformat] import std/algorithm import sugar +import tables import ../conversation_store import ../crypto @@ -13,15 +15,16 @@ import ../delivery/waku_client import ../[ identity, proto_types, + types, utils ] import convo_type - type PrivateV1* = ref object of Conversation # Placeholder for PrivateV1 conversation type + sdsClient: ReliabilityManager owner: Identity topic: string participants: seq[PublicKey] @@ -48,27 +51,81 @@ proc derive_topic(participants: seq[PublicKey], discriminator: string): string = ## Derives a topic from the participants' public keys. return "/convo/private/" & getConvoIdRaw(participants, discriminator) +proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string = + let s = fmt"{self.getConvoId()}|{msgBytes}" + result = getBlake2b(s, 16, "") + + proc encrypt*(convo: PrivateV1, frame: PrivateV1Frame): EncryptedPayload = result = EncryptedPayload(plaintext: Plaintext(payload: encode(frame))) proc decrypt*(convo: PrivateV1, enc: EncryptedPayload): PrivateV1Frame = result = decode(enc.plaintext.payload, PrivateV1Frame).get() + +proc wireCallbacks(convo: PrivateV1, deliveryAckCb: proc( + conversation: Conversation, + msgId: string): Future[void] {.async.} = nil) = + ## Accepts lambdas/functions to be called from Reliability Manager callbacks. + let funcMsg = proc(messageId: SdsMessageID, + channelId: SdsChannelID) {.gcsafe.} = + debug "sds message ready", messageId = messageId, + channelId = channelId + + let funcDeliveryAck = proc(messageId: SdsMessageID, + channelId: SdsChannelID) {.gcsafe.} = + debug "sds message ack", messageId = messageId, + channelId = channelId, cb = repr(deliveryAckCb) + + if deliveryAckCb != nil: + asyncSpawn deliveryAckCb(convo, messageId) + + let funcDroppedMsg = proc(messageId: SdsMessageID, missingDeps: seq[ + SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + debug "sds message missing", messageId = messageId, + missingDeps = missingDeps, channelId = channelId + + convo.sdsClient.setCallbacks( + funcMsg, funcDeliveryAck, funcDroppedMsg + ) + + + proc initPrivateV1*(owner: Identity, participant: PublicKey, - discriminator: string = "default"): PrivateV1 = + discriminator: string = "default", deliveryAckCb: proc( + conversation: Conversation, + msgId: string): Future[void] {.async.} = nil): + PrivateV1 = var participants = @[owner.getPubkey(), participant]; - return PrivateV1( + var rm = newReliabilityManager().valueOr: + raise newException(ValueError, fmt"sds initialization: {repr(error)}") + + result = PrivateV1( + sdsClient: rm, owner: owner, topic: derive_topic(participants, discriminator), participants: participants, discriminator: discriminator ) + result.wireCallbacks(deliveryAckCb) + + result.sdsClient.ensureChannel(result.getConvoId()).isOkOr: + raise newException(ValueError, "bad sds channel") + proc sendFrame(self: PrivateV1, ds: WakuClient, msg: PrivateV1Frame): Future[void]{.async.} = - let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg))) + + let frameBytes = encode(msg) + let msgId = self.calcMsgId(frameBytes) + let sdsPayload = self.sdsClient.wrapOutgoingMessage(frameBytes, msgId, + self.getConvoId()).valueOr: + raise newException(ValueError, fmt"sds wrapOutgoingMessage failed: {repr(error)}") + + let encryptedBytes = EncryptedPayload(plaintext: Plaintext( + payload: sdsPayload)) discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope( self.getConvoId())) @@ -81,8 +138,19 @@ proc handleFrame*[T: ConversationStore](convo: PrivateV1, client: T, ## Dispatcher for Incoming `PrivateV1Frames`. ## Calls further processing depending on the kind of frame. - let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result - let frame = convo.decrypt(enc) # TODO: handle result + let enc = decode(bytes, EncryptedPayload).valueOr: + raise newException(ValueError, fmt"Failed to decode EncryptedPayload: {repr(error)}") + + # TODO: Decrypt the payload + let (frameData, missingDeps, channelId) = convo.sdsClient.unwrapReceivedMessage( + enc.plaintext.payload).valueOr: + raise newException(ValueError, fmt"Failed to unwrap SDS message:{repr(error)}") + + debug "sds unwrap", convo = convo.id(), missingDeps = missingDeps, + channelId = channelId + + let frame = decode(frameData, PrivateV1Frame).valueOr: + raise newException(ValueError, "Failed to decode SdsM: " & error) if frame.sender == @(convo.owner.getPubkey().bytes()): notice "Self Message", convo = convo.id() @@ -102,7 +170,7 @@ method sendMessage*(convo: PrivateV1, ds: WakuClient, try: let frame = PrivateV1Frame(sender: @(convo.owner.getPubkey().bytes()), - content: content_frame) + timestamp: getCurrentTimestamp(), content: content_frame) await convo.sendFrame(ds, frame) except Exception as e: diff --git a/src/chat_sdk/delivery/waku_client.nim b/src/chat_sdk/delivery/waku_client.nim index 16dab9c..f882346 100644 --- a/src/chat_sdk/delivery/waku_client.nim +++ b/src/chat_sdk/delivery/waku_client.nim @@ -118,7 +118,7 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode = builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet() let node = builder.build().tryGet() - node.mountMetadata(cfg.clusterId, @[1'u16, 2'u16]).expect("failed to mount waku metadata protocol") + node.mountMetadata(cfg.clusterId, cfg.shardId).expect("failed to mount waku metadata protocol") result = node diff --git a/src/chat_sdk/inbox.nim b/src/chat_sdk/inbox.nim index 7ede48b..df1ca53 100644 --- a/src/chat_sdk/inbox.nim +++ b/src/chat_sdk/inbox.nim @@ -72,8 +72,12 @@ proc createPrivateV1FromInvite*[T: ConversationStore](client: T, let destPubkey = loadPublicKeyFromBytes(invite.initiator).valueOr: raise newException(ValueError, "Invalid public key in intro bundle.") - let convo = initPrivateV1(client.identity(), destPubkey, "default") + let deliveryAckCb = proc( + conversation: Conversation, + msgId: string): Future[void] {.async.} = + client.notifyReadReceipt(conversation, msgId) + let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb) notice "Creating PrivateV1 conversation", client = client.getId(), topic = convo.getConvoId() client.addConversation(convo) diff --git a/src/chat_sdk/types.nim b/src/chat_sdk/types.nim index bb42ea2..6f44be7 100644 --- a/src/chat_sdk/types.nim +++ b/src/chat_sdk/types.nim @@ -1 +1,4 @@ type ChatError* = string + + +type MessageId* = string diff --git a/src/chat_sdk/utils.nim b/src/chat_sdk/utils.nim index 043075a..237f5f4 100644 --- a/src/chat_sdk/utils.nim +++ b/src/chat_sdk/utils.nim @@ -4,11 +4,11 @@ import crypto import blake2 import strutils -proc getTimestamp*(): Timestamp = +proc getCurrentTimestamp*(): Timestamp = result = waku_core.getNanosecondTime(getTime().toUnix()) -proc hash_func*(s: string): string = +proc hash_func*(s: string | seq[byte]): string = # This should be Blake2s but it does not exist so substituting with Blake2b result = getBlake2b(s, 4, "") diff --git a/src/dev.nim b/src/dev.nim index a9b767e..8db0a06 100644 --- a/src/dev.nim +++ b/src/dev.nim @@ -1,6 +1,191 @@ -## Utilties for development and debugging +import chronos +import chronicles +import strformat +import tables + +import sds + +import std/typetraits +import sequtils + +logScope: + topics = "dev" + +type A = ref object + asg: string proc dir*[T](obj: T) = echo "Object of type: ", T for name, value in fieldPairs(obj): - echo " ", name, ": ", value + if type(value) is string: + echo " ", name, ": ", value + if type(value) is Table: + echo "hmmmm" + +proc `$`(rm: ReliabilityManager): string = + result = "RM" + + + +type Msg = object + id*: string + shouldDrop: bool + data*: seq[byte] + + + + +proc send(rm: ReliabilityManager, channel: string, id: string, msg: seq[ + byte]): seq[byte] = + result = rm.wrapOutgoingMessage(msg, id, channel).valueOr: + raise newException(ValueError, "Bad outgoing message") + +proc recv(rm: ReliabilityManager, bytes: seq[byte]) = + let (unwrapped, missingDeps, channelId) = rm.unwrapReceivedMessage(bytes).valueOr: + raise newException(ValueError, "Bad unwrap") + info "RECV", channel = channelId, data = unwrapped, mdeps = missingDeps + + + +proc main() {.async.} = + + var messageSentCount = 0 + let CHANNEL = "CHANNEL" + let msg = @[byte(1), 2, 3] + let msgId = "test-msg-1" + + var rm = newReliabilityManager().valueOr: + raise newException(ValueError, fmt"SDS InitializationError") + + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + echo "OnMsgReady" + discard, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + echo "OnMsgSent" + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], + channelId: SdsChannelID) {.gcsafe.} = + echo "OnMissing" + discard, + ) + + + let sourceMsgs = @[Msg(id: "1", shouldDrop: false, data: @[byte(1), 2, 3]), + Msg(id: "2", shouldDrop: true, data: @[byte(0), 5, 6]), + Msg(id: "3", shouldDrop: false, data: @[byte(7), 8, 9]), + ] + rm.ensureChannel(CHANNEL).isOkOr: + raise newException(ValueError, "Bad channel") + + + let encodedMessage = sourceMsgs.map(proc(m: Msg): seq[byte] = + rm.wrapOutgoingMessage(m.data, m.id, CHANNEL).valueOr: + raise newException(ValueError, "Bad outgoing message")) + + var i = 0 + var droppedMessages: seq[seq[byte]] = @[] + for x in encodedMessage: + if i mod 2 == 0: + droppedMessages.add(x) + inc(i) + + + + # let droppedMessages = encodedMessage.keepIf(proc(item: seq[ + # byte]): bool = + # items.find(item) mod 2 == 0 + # ) + # let droppedMessages = enumerate(encodedMessage).filter(proc(x: seq[ + # byte]): bool = x[0] mod 2 == 0) + + for s in droppedMessages: + info "DROPPED", len = len(s) + + for m in droppedMessages: + let (unwrapped, missingDeps, channelId) = rm.unwrapReceivedMessage(m).valueOr: + raise newException(ValueError, "Bad unwrap") + + info "RECV", channel = channelId, data = unwrapped, mdeps = missingDeps + + +proc messageSequence(){.async.} = + var messageSentCount = 0 + let CHANNEL = "CHANNEL" + + + var raya = newReliabilityManager().valueOr: + raise newException(ValueError, fmt"SDS InitializationError") + + var saro = newReliabilityManager().valueOr: + raise newException(ValueError, fmt"SDS InitializationError") + + raya.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + debug "OnMsgReady", client = "raya", id = messageId + discard, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + debug "OnMsgSent", client = "raya", id = messageId + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], + channelId: SdsChannelID) {.gcsafe.} = + info "OnMissing", client = "raya", id = messageId + discard, + ) + + saro.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + debug "OnMsgReady", client = "saro", id = messageId + discard, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + debug "OnMsgSent", client = "saro", id = messageId + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], + channelId: SdsChannelID) {.gcsafe.} = + info "OnMissing", client = "saro", id = messageId + discard, + ) + + raya.ensureChannel(CHANNEL).isOkOr: + raise newException(ValueError, "Bad channel") + + saro.ensureChannel(CHANNEL).isOkOr: + raise newException(ValueError, "Bad channel") + + + let s1 = saro.send(CHANNEL, "s1", @[byte(1), 1, 1]) + raya.recv(s1) + + let r1 = raya.send(CHANNEL, "r1", @[byte(2), 1, 2]) + saro.recv(r1) + + let r2 = raya.send(CHANNEL, "r2", @[byte(2), 2, 2]) + # saro.recv(r2) + + let r3 = raya.send(CHANNEL, "r3", @[byte(2), 3, 2]) + saro.recv(r3) + + let s2 = saro.send(CHANNEL, "s2", @[byte(1), 2, 1]) + raya.recv(s2) + + let s3 = saro.send(CHANNEL, "s3", @[byte(1), 3, 1]) + raya.recv(s3) + + + let r4 = raya.send(CHANNEL, "r4", @[byte(2), 4, 2]) + saro.recv(r4) + saro.recv(r2) + + let r5 = raya.send(CHANNEL, "r5", @[byte(2), 5, 2]) + saro.recv(r5) + + let r6 = raya.send(CHANNEL, "r6", @[byte(2), 6, 2]) + saro.recv(r6) + + saro.recv(r4) + saro.recv(r3) + + saro.recv(r4) +when isMainModule: + waitFor messageSequence() + echo ">>>" diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim index 14364c6..a207e43 100644 --- a/src/nim_chat_poc.nim +++ b/src/nim_chat_poc.nim @@ -9,7 +9,6 @@ import chat_sdk/utils import content_types/all - const SELF_DEFINED = 99 type ImageFrame {.proto3.} = object @@ -69,6 +68,11 @@ proc main() {.async.} = await convo.sendMessage(saro.ds, initImage( "https://waku.org/theme/image/logo-black.svg")) ) + + saro.onReadReceipt(proc(convo: Conversation, msgId: string) {.async.} = + echo " Saro -- Read Receipt for " & msgId + ) + await saro.start() var raya = newClient("Raya", cfg_raya) @@ -82,6 +86,10 @@ proc main() {.async.} = echo " ------> Raya :: New Conversation: " & convo.id() await convo.sendMessage(raya.ds, initTextFrame("Hello").toContentFrame()) ) + raya.onReadReceipt(proc(convo: Conversation, msgId: string) {.async.} = + echo " raya -- Read Receipt for " & msgId + ) + await raya.start()