ad-hoc sync parameters

This commit is contained in:
SionoiS 2025-02-12 15:54:32 -05:00
parent fb9536da3f
commit 56b7bf7e59
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951

View File

@ -240,21 +240,25 @@ proc processRequest(
return ok() return ok()
proc initiate( proc initiate(
self: SyncReconciliation, connection: Connection self: SyncReconciliation,
connection: Connection,
offset: Duration,
syncRange: Duration,
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
): Future[Result[void, string]] {.async.} = ): Future[Result[void, string]] {.async.} =
let let
timeRange = calculateTimeRange(self.relayJitter, self.syncRange) timeRange = calculateTimeRange(offset, syncRange)
lower = SyncID(time: timeRange.a, hash: EmptyFingerprint) lower = SyncID(time: timeRange.a, hash: EmptyFingerprint)
upper = SyncID(time: timeRange.b, hash: FullFingerprint) upper = SyncID(time: timeRange.b, hash: FullFingerprint)
bounds = lower .. upper bounds = lower .. upper
fingerprint = self.storage.computeFingerprint( fingerprint = self.storage.computeFingerprint(bounds, pubsubTopics, contentTopics)
bounds, self.pubsubTopics.toSeq(), self.contentTopics.toSeq()
)
initPayload = RangesData( initPayload = RangesData(
cluster: self.cluster, cluster: self.cluster,
pubsubTopics: self.pubsubTopics.toSeq(), pubsubTopics: pubsubTopics,
contentTopics: self.contentTopics.toSeq(), contentTopics: contentTopics,
ranges: @[(bounds, RangeType.Fingerprint)], ranges: @[(bounds, RangeType.Fingerprint)],
fingerprints: @[fingerprint], fingerprints: @[fingerprint],
itemSets: @[], itemSets: @[],
@ -282,7 +286,12 @@ proc initiate(
return ok() return ok()
proc storeSynchronization*( proc storeSynchronization*(
self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) self: SyncReconciliation,
peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo),
offset: Duration = self.relayJitter,
syncRange: Duration = self.syncRange,
pubsubTopics: HashSet[PubsubTopic] = self.pubsubTopics,
contentTopics: HashSet[ContentTopic] = self.contentTopics,
): Future[Result[void, string]] {.async.} = ): Future[Result[void, string]] {.async.} =
let peer = peerInfo.valueOr: let peer = peerInfo.valueOr:
self.peerManager.selectPeer(WakuReconciliationCodec).valueOr: self.peerManager.selectPeer(WakuReconciliationCodec).valueOr:
@ -296,7 +305,11 @@ proc storeSynchronization*(
debug "sync session initialized", debug "sync session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
(await self.initiate(conn)).isOkOr: (
await self.initiate(
conn, offset, syncRange, pubsubTopics.toSeq(), contentTopics.toSeq()
)
).isOkOr:
error "sync session failed", error "sync session failed",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error
@ -316,8 +329,6 @@ proc initFillStorage(
let endTime = getNowInNanosecondTime() let endTime = getNowInNanosecondTime()
let starTime = endTime - syncRange.nanos let starTime = endTime - syncRange.nanos
#TODO special query for only timestap and hash ???
var query = ArchiveQuery( var query = ArchiveQuery(
includeData: true, includeData: true,
cursor: none(ArchiveCursor), cursor: none(ArchiveCursor),
@ -331,12 +342,11 @@ proc initFillStorage(
var storage = SeqStorage.new(DefaultStorageCap) var storage = SeqStorage.new(DefaultStorageCap)
# we assume IDs are in order
while true: while true:
let response = (await wakuArchive.findMessages(query)).valueOr: let response = (await wakuArchive.findMessages(query)).valueOr:
return err("archive retrival failed: " & $error) return err("archive retrival failed: " & $error)
# we assume IDs are already in order
for i in 0 ..< response.hashes.len: for i in 0 ..< response.hashes.len:
let hash = response.hashes[i] let hash = response.hashes[i]
let msg = response.messages[i] let msg = response.messages[i]