diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 4ebcc1d9c..7db066cb1 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -240,21 +240,25 @@ proc processRequest( return ok() proc initiate( - self: SyncReconciliation, connection: Connection + self: SyncReconciliation, + connection: Connection, + offset: Duration, + syncRange: Duration, + pubsubTopics: seq[PubsubTopic], + contentTopics: seq[ContentTopic], ): Future[Result[void, string]] {.async.} = let - timeRange = calculateTimeRange(self.relayJitter, self.syncRange) + timeRange = calculateTimeRange(offset, syncRange) lower = SyncID(time: timeRange.a, hash: EmptyFingerprint) upper = SyncID(time: timeRange.b, hash: FullFingerprint) bounds = lower .. upper - fingerprint = self.storage.computeFingerprint( - bounds, self.pubsubTopics.toSeq(), self.contentTopics.toSeq() - ) + fingerprint = self.storage.computeFingerprint(bounds, pubsubTopics, contentTopics) + initPayload = RangesData( cluster: self.cluster, - pubsubTopics: self.pubsubTopics.toSeq(), - contentTopics: self.contentTopics.toSeq(), + pubsubTopics: pubsubTopics, + contentTopics: contentTopics, ranges: @[(bounds, RangeType.Fingerprint)], fingerprints: @[fingerprint], itemSets: @[], @@ -282,7 +286,12 @@ proc initiate( return ok() proc storeSynchronization*( - self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) + self: SyncReconciliation, + peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo), + offset: Duration = self.relayJitter, + syncRange: Duration = self.syncRange, + pubsubTopics: HashSet[PubsubTopic] = self.pubsubTopics, + contentTopics: HashSet[ContentTopic] = self.contentTopics, ): Future[Result[void, string]] {.async.} = let peer = peerInfo.valueOr: self.peerManager.selectPeer(WakuReconciliationCodec).valueOr: @@ -296,7 +305,11 @@ proc storeSynchronization*( debug "sync session initialized", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId - (await self.initiate(conn)).isOkOr: + ( + await self.initiate( + conn, offset, syncRange, pubsubTopics.toSeq(), contentTopics.toSeq() + ) + ).isOkOr: error "sync session failed", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error @@ -316,8 +329,6 @@ proc initFillStorage( let endTime = getNowInNanosecondTime() let starTime = endTime - syncRange.nanos - #TODO special query for only timestap and hash ??? - var query = ArchiveQuery( includeData: true, cursor: none(ArchiveCursor), @@ -331,12 +342,11 @@ proc initFillStorage( var storage = SeqStorage.new(DefaultStorageCap) - # we assume IDs are in order - while true: let response = (await wakuArchive.findMessages(query)).valueOr: return err("archive retrival failed: " & $error) + # we assume IDs are already in order for i in 0 ..< response.hashes.len: let hash = response.hashes[i] let msg = response.messages[i]