diff --git a/chat_sdk.nimble b/chat_sdk.nimble index b7d1da0..199cdf5 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -17,3 +17,6 @@ task buildStaticLib, "Build static library for C bindings": task migrate, "Run database migrations": exec "nim c -r apps/run_migration.nim" + +task segment, "Run segmentation": + exec "nim c -r chat_sdk/segmentation.nim" diff --git a/chat_sdk/db.nim b/chat_sdk/db.nim new file mode 100644 index 0000000..cde7bf0 --- /dev/null +++ b/chat_sdk/db.nim @@ -0,0 +1,121 @@ +import std/strutils +import results +import db_connector/db_sqlite +import db_models + +type MessageSegmentsPersistence = object + db: DbConn + +proc removeMessageSegmentsOlderThan*( + self: MessageSegmentsPersistence, timestamp: int64 +): Result[void, string] = + try: + self.db.exec(sql"DELETE FROM message_segments WHERE timestamp < ?", timestamp) + ok() + except DbError as e: + err("remove message segments with error: " & e.msg) + +proc removeMessageSegmentsCompletedOlderThan*( + self: MessageSegmentsPersistence, timestamp: int64 +): Result[void, string] = + try: + self.db.exec( + sql"DELETE FROM message_segments_completed WHERE timestamp < ?", timestamp + ) + ok() + except DbError as e: + err("remove message segments completed with error: " & e.msg) + +proc isMessageAlreadyCompleted*( + self: MessageSegmentsPersistence, hash: seq[byte] +): Result[bool, string] = + try: + let row = self.db.getRow( + sql"SELECT COUNT(*) FROM message_segments_completed WHERE hash = ?", hash + ) + if row.len == 0: + return ok(false) + let count = row[0].parseInt + ok(count > 0) + except CatchableError as e: + err("check message already completed with error: " & e.msg) + +proc saveMessageSegment*( + self: MessageSegmentsPersistence, + segment: SegmentMessage, + sigPubKeyBlob: seq[byte], + timestamp: int64, +): Result[void, string] = + try: + self.db.exec( + sql""" + INSERT INTO message_segments ( + hash, segment_index, segments_count, parity_segment_index, + parity_segments_count, sig_pub_key, payload, timestamp + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + segment.entireMessageHash, + segment.index, + segment.segmentsCount, + segment.paritySegmentIndex, + segment.paritySegmentsCount, + sigPubKeyBlob, + segment.payload, + timestamp, + ) + ok() + except DbError as e: + err("save message segments with error: " & e.msg) + +proc getMessageSegments*(self: MessageSegmentsPersistence, hash: seq[byte], sigPubKeyBlob: seq[byte]): Result[seq[SegmentMessage], string] = + var segments = newSeq[SegmentMessage]() + let query = sql""" + SELECT + hash, segment_index, segments_count, parity_segment_index, parity_segments_count, payload + FROM + message_segments + WHERE + hash = ? AND sig_pub_key = ? + ORDER BY + (segments_count = 0) ASC, + segment_index ASC, + parity_segment_index ASC + """ + + try: + for row in self.db.rows(query, hash, sigPubKeyBlob): + let segment = SegmentMessage( + entireMessageHash: cast[seq[byte]](row[0]), + index: uint32(parseInt(row[1])), + segmentsCount: uint32(parseInt(row[2])), + paritySegmentIndex: uint32(parseInt(row[3])), + paritySegmentsCount: uint32(parseInt(row[4])), + payload: cast[seq[byte]](row[5]) + ) + segments.add(segment) + return ok(segments) + except CatchableError as e: + return err("get Message Segments with error: " & e.msg) + +proc completeMessageSegments*( + self: MessageSegmentsPersistence, + hash: seq[byte], + sigPubKeyBlob: seq[byte], + timestamp: int64 +): Result[void, string] = + try: + self.db.exec(sql"BEGIN") + + # Delete old message segments + self.db.exec(sql"DELETE FROM message_segments WHERE hash = ? AND sig_pub_key = ?", hash, sigPubKeyBlob) + + # Insert completed marker + self.db.exec(sql"INSERT INTO message_segments_completed (hash, sig_pub_key, timestamp) VALUES (?, ?, ?)", + hash, sigPubKeyBlob, timestamp) + + self.db.exec(sql"COMMIT") + return ok() + except DbError as e: + try: + self.db.exec(sql"ROLLBACK") + except: discard + return err("complete segment messages with error: " & e.msg) \ No newline at end of file diff --git a/chat_sdk/db_models.nim b/chat_sdk/db_models.nim new file mode 100644 index 0000000..392760b --- /dev/null +++ b/chat_sdk/db_models.nim @@ -0,0 +1,8 @@ +type + SegmentMessage* = ref object + entireMessageHash*: seq[byte] + index*: uint32 + segmentsCount*: uint32 + paritySegmentIndex*: uint32 + paritySegmentsCount*: uint32 + payload*: seq[byte] diff --git a/chat_sdk/segmentation.nim b/chat_sdk/segmentation.nim index 3a61390..1f027ef 100644 --- a/chat_sdk/segmentation.nim +++ b/chat_sdk/segmentation.nim @@ -1,7 +1,7 @@ import math, times, sequtils, strutils, options -import nimcrypto # For Keccak256 hashing +import nimcrypto import results -import leopard # Import nim-leopard for Reed-Solomon +import leopard import chronicles # External dependencies (still needed) @@ -196,8 +196,7 @@ proc segmentMessageInternal(newMessage: WakuNewMessage, segmentSize: int): Resul return ok(segmentMessages) -# Handle SegmentationLayerV2 (updated with nim-leopard) -proc handleSegmentationLayerV2*(s: MessageSender, message: var StatusMessage): Result[void, string] = +proc handleSegmentationLayer*(s: MessageSender, message: var StatusMessage): Result[void, string] = logScope: site = "handleSegmentationLayerV2" hash = message.transportLayer.hash.toHex @@ -303,10 +302,6 @@ proc handleSegmentationLayerV2*(s: MessageSender, message: var StatusMessage): R message.transportLayer.payload = entirePayload return ok() -# Other procs (unchanged) -proc handleSegmentationLayerV1*(s: MessageSender, message: StatusMessage): Result[void, string] = - # Same as previous translation - discard proc cleanupSegments*(s: MessageSender): Result[void, string] = # Same as previous translation diff --git a/migrations/002_create_message_segments_table.sql b/migrations/002_create_message_segments_table.sql new file mode 100644 index 0000000..df75ee7 --- /dev/null +++ b/migrations/002_create_message_segments_table.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS message_segments ( + hash BLOB NOT NULL, + segment_index INTEGER NOT NULL, + segments_count INTEGER NOT NULL, + payload BLOB NOT NULL, + sig_pub_key BLOB NOT NULL, + timestamp INTEGER DEFAULT 0, + PRIMARY KEY (hash, sig_pub_key, segment_index) ON CONFLICT REPLACE +); + +CREATE TABLE IF NOT EXISTS message_segments_completed ( + hash BLOB NOT NULL, + sig_pub_key BLOB NOT NULL, + timestamp INTEGER DEFAULT 0, + PRIMARY KEY (hash, sig_pub_key) +); + +CREATE INDEX idx_message_segments_timestamp ON message_segments(timestamp); +CREATE INDEX idx_message_segments_completed_timestamp ON message_segments_completed(timestamp); \ No newline at end of file