diff --git a/.gitmodules b/.gitmodules index cca78e2a0..a1757842c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -168,4 +168,4 @@ ignore = untracked path = vendor/negentropy url = https://github.com/waku-org/negentropy.git - branch = master + branch = master \ No newline at end of file diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim index c0d88b5fb..124c543e1 100644 --- a/tests/waku_sync/sync_utils.nim +++ b/tests/waku_sync/sync_utils.nim @@ -11,7 +11,7 @@ proc newTestWakuSync*( const DefaultFrameSize = 153600 let peerManager = PeerManager.new(switch) - proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, some(handler)) + proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, 0, some(handler)) assert proto != nil proto.start() diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index cd7ad8a3a..be1290206 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -429,10 +429,10 @@ suite "Waku Sync": check: s2.insert(msg2.timestamp, msgHash2).isOk() - let subrange1Res = SubRange.new(s1, 0, int64.high) + let subrange1Res = SubRange.new(s1, 0, uint64.high) assert subrange1Res.isOk(), $subrange1Res.error let subrange1 = subrange1Res.value - let subrange2Res = SubRange.new(s2, 0, int64.high) + let subrange2Res = SubRange.new(s2, 0, uint64.high) assert subrange2Res.isOk(), $subrange2Res.error let subrange2 = subrange2Res.value diff --git a/waku/waku_sync/codec.nim b/waku/waku_sync/codec.nim index 96cb8a728..bc50eb30f 100644 --- a/waku/waku_sync/codec.nim +++ b/waku/waku_sync/codec.nim @@ -59,6 +59,7 @@ proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] = else: req.hashes = newSeqOfCap[WakuMessageHash](buffer.len) for buf in buffer: - req.messageHashes.add(WakuMessageHash.fromBytes(buf)) + let msg: WakuMessageHash = fromBytes(buf) + req.hashes.add(msg) return ok(req) diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim index 0d674d5da..37502f805 100644 --- a/waku/waku_sync/common.nim +++ b/waku/waku_sync/common.nim @@ -3,10 +3,16 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[options] +import std/[options], chronos import ../waku_core +const DefaultSyncInterval*: timer.Duration = Hour const WakuSyncCodec* = "/vac/waku/sync/1.0.0" +const DefaultFrameSize* = 153600 + +type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {. + async: (raises: []), closure +.} type SyncPayload* = object rangeStart*: Option[uint64] @@ -14,6 +20,6 @@ type SyncPayload* = object frameSize*: Option[uint64] - negentropy*: seq[byte] + negentropy*: seq[byte] # negentropy protocol payload hashes*: seq[WakuMessageHash] diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 42ce64b36..13632ade9 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -27,21 +27,14 @@ import logScope: topics = "waku sync" -const DefaultSyncInterval: timer.Duration = Hour -const DefaultFrameSize = 153600 - -type - WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {. - async: (raises: []), closure, gcsafe - .} - - WakuSync* = ref object of LPProtocol - storage: Storage - peerManager: PeerManager - maxFrameSize: int # Not sure if this should be protocol defined or not... - syncInterval: timer.Duration - callback: Option[WakuSyncCallback] - periodicSyncFut: Future[void] +type WakuSync* = ref object of LPProtocol + storage: Storage # Negentropy protocol storage + peerManager: PeerManager + maxFrameSize: int # Not sure if this should be protocol defined or not... + syncInterval: timer.Duration # Time between each syncronisation attempt + relayJitter: int64 # Time delay until all messages are mostly received network wide + callback: Option[WakuSyncCallback] # Callback with the result of the syncronisation + periodicSyncFut: Future[void] proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: @@ -50,24 +43,69 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) # because what if messages is received via gossip and sync as well? # Might 2 entries to be inserted into storage which is inefficient. let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) - info "inserting message into storage ", hash = msgHash, timestamp = msg.timestamp + + trace "inserting message into storage ", + hash = msgHash.toHex(), timestamp = msg.timestamp if self.storage.insert(msg.timestamp, msgHash).isErr(): debug "failed to insert message ", hash = msgHash.toHex() +proc calculateRange(relayJitter: int64): (int64, int64) = + var now = getNowInNanosecondTime() + + # Because of message jitter inherent to GossipSub + now -= relayJitter + + let range = getNanosecondTime(3600) # 1 hour + + let start = now - range + let `end` = now + + return (start, `end`) + proc request( self: WakuSync, conn: Connection ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let syncSession = SyncSession( - sessType: SyncSessionType.CLIENT, - curState: SyncSessionState.INIT, - frameSize: DefaultFrameSize, - rangeStart: 0, #TODO: Pass start of this hour?? - rangeEnd: times.getTime().toUnix(), - ) - let hashes = (await syncSession.HandleClientSession(conn, self.storage)).valueOr: - return err(error) - return ok(hashes) + let (start, `end`) = calculateRange(self.relayJitter) + + let frameSize = DefaultFrameSize + + let initialized = ?clientInitialize(self.storage, conn, frameSize, start, `end`) + + debug "sync session initialized", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + frameSize = frameSize, + timeStart = start, + timeEnd = `end` + + var hashes: seq[WakuMessageHash] + var reconciled = initialized + + while true: + let sent = ?await reconciled.send() + + trace "sync payload sent", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + payload = reconciled.payload + + let received = ?await sent.listenBack() + + trace "sync payload received", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + payload = received.payload + + reconciled = (?received.clientReconcile(hashes)).valueOr: + let completed = error # Result[Reconciled, Completed] + + ?await completed.clientTerminate() + + debug "sync session ended gracefully", + client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId + + return ok(hashes) proc sync*( self: WakuSync @@ -79,6 +117,10 @@ proc sync*( return err("Cannot establish sync connection") let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + debug "sync session ended", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + error = $error return err("Sync request error: " & error) return ok((hashes, peer)) @@ -91,24 +133,66 @@ proc sync*( return err("Cannot establish sync connection") let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + debug "sync session ended", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + error = $error + return err("Sync request error: " & error) return ok(hashes) +proc handleLoop( + self: WakuSync, conn: Connection +): Future[Result[seq[WakuMessageHash], string]] {.async.} = + let (start, `end`) = calculateRange(self.relayJitter) + + let frameSize = DefaultFrameSize + + let initialized = ?serverInitialize(self.storage, conn, frameSize, start, `end`) + + var sent = initialized + + while true: + let received = ?await sent.listenBack() + + trace "sync payload received", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + payload = received.payload + + let reconciled = (?received.serverReconcile()).valueOr: + let completed = error # Result[Reconciled, Completed] + + let hashes = await completed.serverTerminate() + + return ok(hashes) + + sent = ?await reconciled.send() + + trace "sync payload sent", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + payload = reconciled.payload + proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let syncSession = SyncSession( - sessType: SyncSessionType.SERVER, - curState: SyncSessionState.INIT, - frameSize: DefaultFrameSize, - rangeStart: 0, #TODO: Pass start of this hour?? - rangeEnd: 0, - ) - debug "Server sync session requested", remotePeer = $conn.peerId + debug "sync session requested", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId - await syncSession.HandleServerSession(conn, self.storage) + let hashes = (await self.handleLoop(conn)).valueOr: + debug "sync session ended", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + error = $error - debug "Server sync session ended" + #TODO send error code and desc to client + return + + #TODO handle the hashes that the server need from the client + + debug "sync session ended gracefully", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId self.handler = handle self.codec = WakuSyncCodec @@ -118,10 +202,11 @@ proc new*( peerManager: PeerManager, maxFrameSize: int = DefaultFrameSize, syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: int64 = 20, #Default gossipsub jitter in network. callback: Option[WakuSyncCallback] = none(WakuSyncCallback), ): T = let storage = Storage.new().valueOr: - error "storage creation failed" + debug "storage creation failed" return let sync = WakuSync( @@ -130,11 +215,12 @@ proc new*( maxFrameSize: maxFrameSize, syncInterval: syncInterval, callback: callback, + relayJitter: relayJitter, ) sync.initProtocolHandler() - info "Created WakuSync protocol" + info "WakuSync protocol initialized" return sync @@ -143,7 +229,7 @@ proc periodicSync(self: WakuSync) {.async.} = await sleepAsync(self.syncInterval) let (hashes, peer) = (await self.sync()).valueOr: - error "periodic sync error", error = error + debug "periodic sync error", error = error continue let callback = self.callback.valueOr: @@ -157,9 +243,9 @@ proc start*(self: WakuSync) = # start periodic-sync only if interval is set. self.periodicSyncFut = self.periodicSync() + info "WakuSync protocol started" + proc stopWait*(self: WakuSync) {.async.} = await self.periodicSyncFut.cancelAndWait() -#[ TODO:Fetch from storageManager?? - proc storageSize*(self: WakuSync): int = - return self.storage.size() ]# + info "WakuSync protocol stopped" diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim index 3fc6b5544..7ebe9e708 100644 --- a/waku/waku_sync/session.nim +++ b/waku/waku_sync/session.nim @@ -3,149 +3,234 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/options, stew/results, chronicles, chronos, libp2p/stream/connection +import std/options, stew/results, chronos, libp2p/stream/connection -import ../common/nimchronos, ../waku_core, ./raw_bindings, ./storage_manager +import + ../common/nimchronos, + ../common/protobuf, + ../waku_core, + ./raw_bindings, + ./common, + ./codec -logScope: - topics = "waku sync" +#TODO add states for protocol negotiation -type SyncSessionType* = enum - CLIENT = 1 - SERVER = 2 +### Type State ### -type SyncSessionState* = enum - INIT = 1 - NEGENTROPY_SYNC = 2 - COMPLETE = 3 +type ClientSync* = object + haveHashes: seq[WakuMessageHash] -type SyncSession* = ref object - sessType*: SyncSessionType - curState*: SyncSessionState - frameSize*: int - rangeStart*: int64 - rangeEnd*: int64 - negentropy*: NegentropySubRange +type ServerSync* = object -#[ - Session State Machine - 1. negotiate sync params - 2. start negentropy sync - 3. find out local needhashes - 4. If client, share peer's needhashes to peer -]# +type Reconciled*[T] = object + sync: T + negentropy: NegentropySubRange + connection: Connection + frameSize: int + payload*: SyncPayload -proc initializeNegentropy( - self: SyncSession, storage: Storage, syncStartTime: int64, syncEndTime: int64 -): Result[void, string] = - #TODO Create a subrange - let subrange = SubRange.new(storage, uint64(syncStartTime), uint64(syncEndTime)).valueOr: - return err(error) - let negentropy = NegentropySubrange.new(subrange, self.frameSize).valueOr: - return err(error) +type Sent*[T] = object + sync: T + negentropy: NegentropySubRange + connection: Connection + frameSize: int - self.negentropy = negentropy +type Received*[T] = object + sync: T + negentropy: NegentropySubRange + connection: Connection + frameSize: int + payload*: SyncPayload + +type Completed*[T] = object + sync: T + negentropy: NegentropySubRange + connection: Connection + haveHashes: seq[WakuMessageHash] + +### State Transition ### + +proc clientInitialize*( + store: Storage, + conn: Connection, + frameSize = DefaultFrameSize, + start = int64.low, + `end` = int64.high, +): Result[Reconciled[ClientSync], string] = + let subrange = ?SubRange.new(store, uint64(start), uint64(`end`)) + + let negentropy = ?NegentropySubrange.new(subrange, frameSize) + + let negentropyPayload = ?negentropy.initiate() + + let payload = SyncPayload(negentropy: seq[byte](negentropyPayload)) + + let sync = ClientSync() + + return ok( + Reconciled[ClientSync]( + sync: sync, + negentropy: negentropy, + connection: conn, + frameSize: frameSize, + payload: payload, + ) + ) + +proc serverInitialize*( + store: Storage, + conn: Connection, + frameSize = DefaultFrameSize, + start = int64.low, + `end` = int64.high, +): Result[Sent[ServerSync], string] = + let subrange = ?SubRange.new(store, uint64(start), uint64(`end`)) + + let negentropy = ?NegentropySubrange.new(subrange, frameSize) + + let sync = ServerSync() + + return ok( + Sent[ServerSync]( + sync: sync, negentropy: negentropy, connection: conn, frameSize: frameSize + ) + ) + +proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} = + let writeRes = catch: + await self.connection.writeLP(self.payload.encode().buffer) + + if writeRes.isErr(): + return err("send connection write error: " & writeRes.error.msg) + + return ok( + Sent[T]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + ) + ) + +proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} = + let readRes = catch: + await self.connection.readLp(-1) + + let buffer: seq[byte] = + if readRes.isOk(): + readRes.get() + else: + return err("listenBack connection read error: " & readRes.error.msg) + + # can't otherwise the compiler complains + #let payload = SyncPayload.decode(buffer).valueOr: + #return err($error) + + let decodeRes = SyncPayload.decode(buffer) + + let payload = + if decodeRes.isOk(): + decodeRes.get() + else: + let decodeError: ProtobufError = decodeRes.error + let errMsg = $decodeError + return err("listenBack decoding error: " & errMsg) + + return ok( + Received[T]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + +proc clientReconcile*( + self: Received[ClientSync], needHashes: var seq[WakuMessageHash] +): Result[Result[Reconciled[ClientSync], Completed[ClientSync]], string] = + var haves = self.sync.haveHashes + + let responseOpt = + ?self.negentropy.clientReconcile( + NegentropyPayload(self.payload.negentropy), haves, needHashes + ) + + let sync = ClientSync(haveHashes: haves) + + let response = responseOpt.valueOr: + let res = Result[Reconciled[ClientSync], Completed[ClientSync]].err( + Completed[ClientSync]( + sync: sync, negentropy: self.negentropy, connection: self.connection + ) + ) + + return ok(res) + + let payload = SyncPayload(negentropy: seq[byte](response), hashes: haves) + + let res = Result[Reconciled[ClientSync], Completed[ClientSync]].ok( + Reconciled[ClientSync]( + sync: sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + + return ok(res) + +proc serverReconcile*( + self: Received[ServerSync] +): Result[Result[Reconciled[ServerSync], Completed[ServerSync]], string] = + if self.payload.negentropy.len == 0: + let res = Result[Reconciled[ServerSync], Completed[ServerSync]].err( + Completed[ServerSync]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + haveHashes: self.payload.hashes, + ) + ) + + return ok(res) + + let response = + ?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy)) + + let payload = SyncPayload(negentropy: seq[byte](response)) + + let res = Result[Reconciled[ServerSync], Completed[ServerSync]].ok( + Reconciled[ServerSync]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + + return ok(res) + +proc clientTerminate*( + self: Completed[ClientSync] +): Future[Result[void, string]] {.async.} = + let payload = SyncPayload(hashes: self.sync.haveHashes) + + let writeRes = catch: + await self.connection.writeLp(payload.encode().buffer) + + if writeRes.isErr(): + return err("clientTerminate connection write error: " & writeRes.error.msg) + + self.negentropy.delete() return ok() -proc HandleClientSession*( - self: SyncSession, conn: Connection, storage: Storage -): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - if self - .initializeNegentropy( - storage, - timestampInSeconds(getNowInNanosecondTime()), - # now , TODO: this needs to be tuned maybe consider 20 seconds jitter in network. - int64.high, #timestampInSeconds(getNowInNanosecondTime()) - 60 * 60, # 1 hour - ) - .isErr(): - return - defer: - self.negentropy.delete() +proc serverTerminate*( + self: Completed[ServerSync] +): Future[seq[WakuMessageHash]] {.async.} = + self.negentropy.delete() - let payload = self.negentropy.initiate().valueOr: - return err(error) - debug "Client sync session initialized", remotePeer = conn.peerId - - let writeRes = catch: - await conn.writeLP(seq[byte](payload)) - - trace "request sent to server", payload = toHex(seq[byte](payload)) - - if writeRes.isErr(): - return err(writeRes.error.msg) - - var - haveHashes: seq[WakuMessageHash] # Send it across to Server at the end of sync - needHashes: seq[WakuMessageHash] - - while true: - let readRes = catch: - await conn.readLp(self.frameSize) - - let buffer: seq[byte] = readRes.valueOr: - return err(error.msg) - - trace "Received Sync request from peer", payload = toHex(buffer) - - let request = NegentropyPayload(buffer) - - let responseOpt = self.negentropy.clientReconcile(request, haveHashes, needHashes).valueOr: - return err(error) - - let response = responseOpt.valueOr: - debug "Closing connection, client sync session is done" - await conn.close() - break - - trace "Sending Sync response to peer", payload = toHex(seq[byte](response)) - - let writeRes = catch: - await conn.writeLP(seq[byte](response)) - - if writeRes.isErr(): - return err(writeRes.error.msg) - - return ok(needHashes) - -proc HandleServerSession*( - self: SyncSession, conn: Connection, storage: Storage -) {.async, gcsafe.} = - #TODO: Pass sync time based on data in request?? - #TODO: Return error rather than closing stream abruptly? - if self - .initializeNegentropy( - storage, - timestampInSeconds(getNowInNanosecondTime()), - int64.high, #timestampInSeconds(getNowInNanosecondTime()) - 60 * 60, - ) - .isErr(): - return - defer: - self.negentropy.delete() - - while not conn.isClosed: - let requestRes = catch: - await conn.readLp(self.frameSize) - - let buffer = requestRes.valueOr: - if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError: - debug "Connection reading error", error = error.msg - - break - - #TODO: Once we receive needHashes or endOfSync, we should close this stream. - let request = NegentropyPayload(buffer) - - let response = self.negentropy.serverReconcile(request).valueOr: - error "Reconciliation error", error = error - break - - let writeRes = catch: - await conn.writeLP(seq[byte](response)) - - if writeRes.isErr(): - error "Connection write error", error = writeRes.error.msg - break - - return + return self.haveHashes