From 106c02321cdcc550f4231ffb647d881a77633e59 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 14 May 2025 16:28:27 +0800 Subject: [PATCH] chore: compile the segmentation logic. --- chat_sdk/segmentation.nim | 472 +++++++++++++++++++++----------------- 1 file changed, 257 insertions(+), 215 deletions(-) diff --git a/chat_sdk/segmentation.nim b/chat_sdk/segmentation.nim index 222ab6f..ff38105 100644 --- a/chat_sdk/segmentation.nim +++ b/chat_sdk/segmentation.nim @@ -1,12 +1,23 @@ import math, times, sequtils, strutils, options import nimcrypto # For Keccak256 hashing -import logging # Placeholder for logging import results import leopard # Import nim-leopard for Reed-Solomon +import chronicles # External dependencies (still needed) # import protobuf # Nim protobuf library (e.g., protobuf-nim) +# SegmentMessage type (unchanged) +type + SegmentMessage* = ref object + entireMessageHash*: seq[byte] + index*: uint32 + segmentsCount*: uint32 + paritySegmentIndex*: uint32 + paritySegmentsCount*: uint32 + payload*: seq[byte] + + # Placeholder types (unchanged) type WakuNewMessage = object @@ -22,7 +33,8 @@ type sigPubKey: seq[byte] Persistence = object - # Placeholder for persistence interface + completedMessages: Table[seq[byte], bool] # Mock storage for completed message hashes + segments: Table[(seq[byte], seq[byte]), seq[SegmentMessage]] # Stores segments by (hash, sigPubKey) # Error definitions (unchanged) const @@ -37,15 +49,6 @@ const SegmentsParityRate = 0.125 SegmentsReedsolomonMaxCount = 256 -# SegmentMessage type (unchanged) -type - SegmentMessage* = ref object - entireMessageHash*: seq[byte] - index*: uint32 - segmentsCount*: uint32 - paritySegmentIndex*: uint32 - paritySegmentsCount*: uint32 - payload*: seq[byte] # Validation methods (unchanged) proc isValid*(s: SegmentMessage): bool = @@ -59,219 +62,258 @@ type MessageSender* = ref object messaging: Messaging persistence: Persistence - logger: Logger Messaging = object maxMessageSize: int +proc initPersistence(): Persistence = + Persistence( + completedMessages: initTable[seq[byte], bool](), + segments: initTable[(seq[byte], seq[byte]), seq[SegmentMessage]]() + ) + +proc isMessageAlreadyCompleted(p: Persistence, hash: seq[byte]): Result[bool, string] = + return ok(p.completedMessages.getOrDefault(hash, false)) + +proc saveMessageSegment(p: var Persistence, segment: SegmentMessage, sigPubKey: seq[byte], timestamp: int64): Result[void, string] = + let key = (segment.entireMessageHash, sigPubKey) + # Initialize or append to the segments list + if not p.segments.hasKey(key): + p.segments[key] = @[] + p.segments[key].add(segment) + return ok() + +proc getMessageSegments(p: Persistence, hash: seq[byte], sigPubKey: seq[byte]): Result[seq[SegmentMessage], string] = + let key = (hash, sigPubKey) + return ok(p.segments.getOrDefault(key, @[])) + +proc completeMessageSegments(p: var Persistence, hash: seq[byte], sigPubKey: seq[byte], timestamp: int64): Result[void, string] = + p.completedMessages[hash] = true + return ok() + +# Replicate message (unchanged) +proc replicateMessageWithNewPayload(message: WakuNewMessage, payload: seq[byte]): Result[WakuNewMessage, string] = + var copiedMessage = WakuNewMessage(payload: payload) + return ok(copiedMessage) + +proc protoMarshal(msg: SegmentMessage): Result[seq[byte], string] = + return err("protoMarshal not implemented") + +proc protoUnmarshal(data: seq[byte], msg: var SegmentMessage): Result[void, string] = + return err("protoUnmarshal not implemented") + +# Segment message into smaller chunks (updated with nim-leopard) +proc segmentMessageInternal(newMessage: WakuNewMessage, segmentSize: int): Result[seq[WakuNewMessage], string] = + if newMessage.payload.len <= segmentSize: + return ok(@[newMessage]) + + let entireMessageHash = keccak256.digest(newMessage.payload) + let entirePayloadSize = newMessage.payload.len + + let segmentsCount = int(ceil(entirePayloadSize.float / segmentSize.float)) + let paritySegmentsCount = int(floor(segmentsCount.float * SegmentsParityRate)) + + var segmentPayloads = newSeq[seq[byte]](segmentsCount + paritySegmentsCount) + var segmentMessages = newSeq[WakuNewMessage](segmentsCount) + + for i in 0.. entirePayloadSize: + endIndex = entirePayloadSize + + let segmentPayload = newMessage.payload[start.. SegmentsReedsolomonMaxCount: + return ok(segmentMessages) + + # Align last segment payload for Reed-Solomon (leopard requires fixed-size shards) + let lastSegmentPayload = segmentPayloads[segmentsCount-1] + segmentPayloads[segmentsCount-1] = newSeq[byte](segmentSize) + segmentPayloads[segmentsCount-1][0.. 0: + payloads[firstSegmentMessage.segmentsCount - 1] = lastNonParitySegmentPayload + + # Combine payload + var entirePayload = newSeq[byte]() + for i in 0.. entirePayloadSize: -# end = entirePayloadSize - -# let segmentPayload = newMessage.payload[start.. SegmentsReedsolomonMaxCount: -# return ok(segmentMessages) - -# # Align last segment payload for Reed-Solomon (leopard requires fixed-size shards) -# let lastSegmentPayload = segmentPayloads[segmentsCount-1] -# segmentPayloads[segmentsCount-1] = newSeq[byte](segmentSize) -# copy(lastSegmentPayload, segmentPayloads[segmentsCount-1]) - -# # Allocate space for parity shards -# for i in segmentsCount..<(segmentsCount + paritySegmentsCount): -# segmentPayloads[i] = newSeq[byte](segmentSize) - -# # Use nim-leopard for Reed-Solomon encoding -# let encodeResult = leopard.encode(segmentPayloads, segmentsCount, paritySegmentsCount) -# if encodeResult.isErr: -# return err("failed to encode segments with leopard: " & encodeResult.error) - -# # Create parity messages -# for i in segmentsCount..<(segmentsCount + paritySegmentsCount): -# let parityIndex = i - segmentsCount -# let segmentWithMetadata = SegmentMessage( -# entireMessageHash: entireMessageHash.data, -# segmentsCount: 0, -# paritySegmentIndex: uint32(parityIndex), -# paritySegmentsCount: uint32(paritySegmentsCount), -# payload: segmentPayloads[i] -# ) - -# let marshaledSegment = protoMarshal(segmentWithMetadata) -# if marshaledSegment.isErr: -# return err("failed to marshal parity SegmentMessage: " & marshaledSegment.error) - -# let segmentMessage = replicateMessageWithNewPayload(newMessage, marshaledSegment.get()) -# if segmentMessage.isErr: -# return err("failed to replicate parity message: " & segmentMessage.error) - -# segmentMessages.add(segmentMessage.get()) - -# return ok(segmentMessages) - -# # Handle SegmentationLayerV2 (updated with nim-leopard) -# proc handleSegmentationLayerV2*(s: MessageSender, message: StatusMessage): Result[void, string] = -# let logger = s.logger.withFields( -# "site", "handleSegmentationLayerV2", -# "hash", message.transportLayer.hash.toHex -# ) - -# var segmentMessage = SegmentMessage() -# let unmarshalResult = protoUnmarshal(message.transportLayer.payload, segmentMessage) -# if unmarshalResult.isErr: -# return err("failed to unmarshal SegmentMessage: " & unmarshalResult.error) - -# logger.debug("handling message segment", -# "EntireMessageHash", segmentMessage.entireMessageHash.toHex, -# "Index", $segmentMessage.index, -# "SegmentsCount", $segmentMessage.segmentsCount, -# "ParitySegmentIndex", $segmentMessage.paritySegmentIndex, -# "ParitySegmentsCount", $segmentMessage.paritySegmentsCount -# ) - -# let alreadyCompleted = s.persistence.isMessageAlreadyCompleted(segmentMessage.entireMessageHash) -# if alreadyCompleted.isErr: -# return err(alreadyCompleted.error) -# if alreadyCompleted.get(): -# return err(ErrMessageSegmentsAlreadyCompleted) - -# if not segmentMessage.isValid(): -# return err(ErrMessageSegmentsInvalidCount) - -# let saveResult = s.persistence.saveMessageSegment(segmentMessage, message.transportLayer.sigPubKey, getTime().toUnix) -# if saveResult.isErr: -# return err(saveResult.error) - -# let segments = s.persistence.getMessageSegments(segmentMessage.entireMessageHash, message.transportLayer.sigPubKey) -# if segments.isErr: -# return err(segments.error) - -# if segments.get().len == 0: -# return err("unexpected state: no segments found after save operation") - -# let firstSegmentMessage = segments.get()[0] -# let lastSegmentMessage = segments.get()[^1] - -# if firstSegmentMessage.isParityMessage() or segments.get().len != int(firstSegmentMessage.segmentsCount): -# return err(ErrMessageSegmentsIncomplete) - -# var payloads = newSeq[seq[byte]](firstSegmentMessage.segmentsCount + lastSegmentMessage.paritySegmentsCount) -# let payloadSize = firstSegmentMessage.payload.len - -# let restoreUsingParityData = lastSegmentMessage.isParityMessage() -# if not restoreUsingParityData: -# for i, segment in segments.get(): -# payloads[i] = segment.payload -# else: -# var lastNonParitySegmentPayload: seq[byte] -# for segment in segments.get(): -# if not segment.isParityMessage(): -# if segment.index == firstSegmentMessage.segmentsCount - 1: -# payloads[segment.index] = newSeq[byte](payloadSize) -# copy(segment.payload, payloads[segment.index]) -# lastNonParitySegmentPayload = segment.payload -# else: -# payloads[segment.index] = segment.payload -# else: -# payloads[firstSegmentMessage.segmentsCount + segment.paritySegmentIndex] = segment.payload - -# # Use nim-leopard for Reed-Solomon reconstruction -# let reconstructResult = leopard.decode(payloads, int(firstSegmentMessage.segmentsCount), int(lastSegmentMessage.paritySegmentsCount)) -# if reconstructResult.isErr: -# return err("failed to reconstruct payloads with leopard: " & reconstructResult.error) - -# # Verify by checking hash (leopard doesn't have a direct verify function) -# var tempPayload = newSeq[byte]() -# for i in 0.. 0: -# payloads[firstSegmentMessage.segmentsCount - 1] = lastNonParitySegmentPayload - -# # Combine payload -# var entirePayload = newSeq[byte]() -# for i in 0..