moving stuff around

Author: SionoiS, 2024-06-03 16:56:03 -04:00
parent 5172d95537
commit 9782b84903
4 changed files with 349 additions and 350 deletions

View File

@@ -0,0 +1,102 @@
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import std/[options, sugar, sequtils], chronos, libp2p/peerId

import ../waku_core, ../waku_archive, ../waku_store/[client, common], ../waku_sync

type
  TransferCallback* = proc(
    hashes: seq[WakuMessageHash], peerId: PeerId
  ): Future[Result[void, string]] {.async: (raises: []), closure.}

  PruneCallback* = proc(
    startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash)
  ): Future[
    Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
  ] {.async: (raises: []), closure.}

# Procs called by wakunode and then passed to WakuStoreSync.new

proc transferHandler*(
    wakuSync: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
): TransferCallback =
  return proc(
      hashes: seq[WakuMessageHash], peerId: PeerId
  ): Future[Result[void, string]] {.async: (raises: []), closure.} =
    var query = StoreQueryRequest()
    query.includeData = true
    query.messageHashes = hashes
    query.paginationLimit = some(uint64(100))

    while true:
      let catchable = catch:
        await wakuStoreClient.query(query, peerId)

      if catchable.isErr():
        return err("store client error: " & catchable.error.msg)

      let res = catchable.get()
      let response = res.valueOr:
        return err("store client error: " & $error)

      query.paginationCursor = response.paginationCursor

      for kv in response.messages:
        let handleRes = catch:
          await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())

        if handleRes.isErr():
          #error "message transfer failed", error = handleRes.error.msg
          # Messages can be synced next time since they are not added to storage yet.
          continue

        wakuSync.ingessMessage(kv.pubsubTopic.get(), kv.message.get())

      if query.paginationCursor.isNone():
        break

    return ok()

proc pruningHandler*(wakuArchive: WakuArchive): PruneCallback =
  return proc(
      pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
  ): Future[
      Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
  ] {.async: (raises: []), closure.} =
    let archiveCursor =
      if cursor.isSome():
        some(ArchiveCursor(hash: cursor.get()))
      else:
        none(ArchiveCursor)

    let query = ArchiveQuery(
      includeData: true,
      cursor: archiveCursor,
      startTime: some(pruneStart),
      endTime: some(pruneStop),
      pageSize: 100,
    )

    let catchable = catch:
      await wakuArchive.findMessages(query)

    if catchable.isErr():
      return err("archive error: " & catchable.error.msg)

    let res = catchable.get()
    let response = res.valueOr:
      return err("archive error: " & $error)

    let elements = collect(newSeq):
      for (hash, msg) in response.hashes.zip(response.messages):
        (hash, msg.timestamp)

    let cursor =
      if response.cursor.isNone():
        none(WakuMessageHash)
      else:
        some(response.cursor.get().hash)

    return ok((elements, cursor))
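
The sketch below shows how a node could turn these two factories into concrete callbacks. It is an editor's illustration, not part of this diff: `node`, `missingHashes` and `somePeerId` are placeholder names, and where the callbacks end up (WakuStoreSync.new, per the comment above) is assumed.

# Editor's sketch (hypothetical wiring, not from this commit). Assumes the node
# already exposes WakuSync, WakuArchive and WakuStoreClient instances.
let transferCb: TransferCallback =
  transferHandler(node.wakuSync, node.wakuArchive, node.wakuStoreClient)
let pruneCb: PruneCallback = pruningHandler(node.wakuArchive)

# Inside an async proc, the transfer callback is awaited with the hashes this
# node is missing and the peer that holds them; failures come back as strings.
(await transferCb(missingHashes, somePeerId)).isOkOr:
  error "message transfer failed", error = $error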

View File

View File

@@ -0,0 +1,214 @@
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import
  std/[options, sugar, sequtils],
  stew/[results, byteutils],
  chronicles,
  chronos,
  metrics,
  libp2p/protocols/protocol,
  libp2p/stream/connection,
  libp2p/crypto/crypto,
  eth/p2p/discoveryv5/enr
import
  ../common/nimchronos,
  ../common/enr,
  ../waku_core,
  ../waku_archive,
  ../waku_store/[client, common],
  ../waku_enr,
  ../node/peer_manager/peer_manager,
  ../waku_sync

logScope:
  topics = "waku store sync"

type WakuStoreSync* = ref object of LPProtocol
  peerManager: PeerManager
  wakuSync: WakuSync # Sync protocol instance used by the periodic sync and prune tasks
  maxFrameSize: int
  syncInterval: timer.Duration # Time between each synchronisation attempt
  relayJitter: Duration # Time delay until all messages are mostly received network wide
  transferCallBack: Option[TransferCallback] # Callback for message transfer.
  pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
  pruneStart: Timestamp # Last pruning start timestamp
  pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
  periodicSyncFut: Future[void]
  periodicPruneFut: Future[void]

proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) =
  var now = getNowInNanosecondTime()

  # Because of message jitter inherent to Relay protocol
  now -= jitter.nanos

  let range = syncRange.nanos

  let start = now - range
  let `end` = now

  return (start, `end`)
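
# Worked example (editor's note, values purely illustrative): with
# relayJitter = 20.seconds and the default syncRange = 1.hours, the window is
#   `end`   = now - 20 s   (shifted back so late Relay deliveries are included)
#   `start` = now - 3620 s (one hour before `end`)
# both returned as nanosecond timestamps.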

proc fillSyncStorage(
    self: WakuStoreSync, wakuSync: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
  if wakuArchive.isNil():
    return ok()

  let endTime = getNowInNanosecondTime()
  let starTime = endTime - self.syncInterval.nanos

  var query = ArchiveQuery(
    includeData: true,
    cursor: none(ArchiveCursor),
    startTime: some(starTime),
    endTime: some(endTime),
    pageSize: 100,
  )

  while true:
    let response = (await wakuArchive.findMessages(query)).valueOr:
      return err($error)

    for (topic, msg) in response.topics.zip(response.messages):
      wakuSync.ingessMessage(topic, msg)

    if response.cursor.isNone():
      break

    query.cursor = response.cursor

  return ok()

# Either create a callback in waku sync or move this logic back to it
proc periodicSync(
    self: WakuStoreSync, wakuSync: WakuSync, callback: TransferCallback
) {.async.} =
  debug "periodic sync initialized", interval = $self.syncInterval

  while true:
    await sleepAsync(self.syncInterval)

    debug "periodic sync started"

    let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
      error "No suitable peer found for sync"
      return

    let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)

    let conn: Connection = connOpt.valueOr:
      error "Cannot establish sync connection"
      return

    let (rangeStart, rangeEnd) = calculateRange(self.relayJitter, self.syncInterval)

    var
      hashes: seq[WakuMessageHash]
      tries = 3

    while true:
      hashes = (
        await wakuSync.request(conn, self.maxFrameSize, rangeStart, rangeEnd)
      ).valueOr:
        error "sync failed", error = $error
        if tries > 0:
          tries -= 1
          await sleepAsync(30.seconds)
          continue
        else:
          break

      break

    if hashes.len > 0:
      tries = 3
      while true:
        (await callback(hashes, peer.peerId)).isOkOr:
          error "transfer callback failed", error = $error
          if tries > 0:
            tries -= 1
            await sleepAsync(30.seconds)
            continue
          else:
            break

        break

    debug "periodic sync done", hashSynced = hashes.len

# Same as above: either create a callback in waku sync or move this logic back to it
proc periodicPrune(
    self: WakuStoreSync, wakuSync: WakuSync, callback: PruneCallback
) {.async.} =
  debug "periodic prune initialized", interval = $self.syncInterval

  await sleepAsync(self.syncInterval)

  var pruneStop = getNowInNanosecondTime()

  while true:
    await sleepAsync(self.syncInterval)

    debug "periodic prune started",
      startTime = self.pruneStart - self.pruneOffset.nanos,
      endTime = pruneStop,
      storageSize = wakuSync.storageSize()

    var (elements, cursor) =
      (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))

    var tries = 3
    while true:
      (elements, cursor) = (
        await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
      ).valueOr:
        error "pruning callback failed", error = $error
        if tries > 0:
          tries -= 1
          await sleepAsync(30.seconds)
          continue
        else:
          break

      if elements.len == 0:
        break

      for (hash, timestamp) in elements:
        wakuSync.eraseMessage(timestamp, hash).isOkOr:
          error "storage erase failed",
            timestamp = timestamp, msg_hash = hash, error = $error
          continue

      if cursor.isNone():
        break

    self.pruneStart = pruneStop
    pruneStop = getNowInNanosecondTime()

    debug "periodic prune done", storageSize = wakuSync.storageSize()

proc start*(self: WakuStoreSync) =
  self.started = true
  self.pruneStart = getNowInNanosecondTime()

  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
    self.periodicSyncFut = self.periodicSync(self.wakuSync, self.transferCallBack.get())

  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
    self.periodicPruneFut = self.periodicPrune(self.wakuSync, self.pruneCallBack.get())

  info "WakuStoreSync protocol started"

proc stopWait*(self: WakuStoreSync) {.async.} =
  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
    await self.periodicSyncFut.cancelAndWait()

  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
    await self.periodicPruneFut.cancelAndWait()

  info "WakuStoreSync protocol stopped"

View File

@@ -30,20 +30,11 @@ logScope:
 type WakuSync* = ref object of LPProtocol
   storage: NegentropyStorage
-  peerManager: PeerManager
+  transferCallBack: TransferCallback
   maxFrameSize: int
-  syncInterval: timer.Duration # Time between each syncronisation attempt
-  relayJitter: Duration # Time delay until all messages are mostly received network wide
-  transferCallBack: Option[TransferCallback] # Callback for message transfer.
-  pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
-  pruneStart: Timestamp # Last pruning start timestamp
-  pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
-  periodicSyncFut: Future[void]
-  periodicPruneFut: Future[void]

 proc storageSize*(self: WakuSync): int =
   self.storage.len
@@ -59,33 +50,29 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage)
   if self.storage.insert(msg.timestamp, msgHash).isErr():
     error "failed to insert message ", msg_hash = msgHash.to0xHex()

-proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) =
-  var now = getNowInNanosecondTime()
-
-  # Because of message jitter inherent to Relay protocol
-  now -= jitter.nanos
-
-  let range = syncRange.nanos
-
-  let start = now - range
-  let `end` = now
-
-  return (start, `end`)
-
-proc request(
-    self: WakuSync, conn: Connection
+proc eraseMessage*(
+    self: WakuSync, timestamp: Timestamp, msgHash: WakuMessageHash
+): Result[void, string] =
+  self.storage.erase(timestamp, msgHash).isOkOr:
+    return err($error)
+
+  return ok()
+
+proc request*(
+    self: WakuSync,
+    conn: Connection,
+    maxFrameSize: int,
+    rangeStart: int64,
+    rangeEnd: int64,
 ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
-  let (start, `end`) = calculateRange(self.relayJitter)
-
   let initialized =
-    ?clientInitialize(self.storage, conn, self.maxFrameSize, start, `end`)
+    ?clientInitialize(self.storage, conn, maxFrameSize, rangeStart, rangeEnd)

   debug "sync session initialized",
-    client = self.peerManager.switch.peerInfo.peerId,
     server = conn.peerId,
-    frameSize = self.maxFrameSize,
-    timeStart = start,
-    timeEnd = `end`
+    frameSize = maxFrameSize,
+    timeStart = rangeStart,
+    timeEnd = rangeEnd

   var hashes: seq[WakuMessageHash]
   var reconciled = initialized
@@ -93,70 +80,32 @@ proc request(
   while true:
     let sent = ?await reconciled.send()

-    trace "sync payload sent",
-      client = self.peerManager.switch.peerInfo.peerId,
-      server = conn.peerId,
-      payload = reconciled.payload
+    trace "sync payload sent", server = conn.peerId, payload = reconciled.payload

     let received = ?await sent.listenBack()

-    trace "sync payload received",
-      client = self.peerManager.switch.peerInfo.peerId,
-      server = conn.peerId,
-      payload = received.payload
+    trace "sync payload received", server = conn.peerId, payload = received.payload

     reconciled = (?received.clientReconcile(hashes)).valueOr:
       let completed = error # Result[Reconciled, Completed]

       ?await completed.clientTerminate()

-      debug "sync session ended gracefully",
-        client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId
+      debug "sync session ended gracefully", server = conn.peerId

       return ok(hashes)

-proc sync*(
-    self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
-): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} =
-  let peer =
-    if peerInfo.isSome():
-      peerInfo.get()
-    else:
-      let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
-        return err("No suitable peer found for sync")
-      peer
-
-  let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
-
-  let conn: Connection = connOpt.valueOr:
-    return err("Cannot establish sync connection")
-
-  let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
-    debug "sync session ended",
-      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
-
-    return err("Sync request error: " & error)
-
-  return ok((hashes, peer))
-
 proc handleLoop(
     self: WakuSync, conn: Connection
 ): Future[Result[seq[WakuMessageHash], string]] {.async.} =
-  let (start, `end`) = calculateRange(self.relayJitter)
-
-  let initialized =
-    ?serverInitialize(self.storage, conn, self.maxFrameSize, start, `end`)
+  let initialized = ?serverInitialize(self.storage, conn, self.maxFrameSize)

   var sent = initialized

   while true:
     let received = ?await sent.listenBack()

-    trace "sync payload received",
-      server = self.peerManager.switch.peerInfo.peerId,
-      client = conn.peerId,
-      payload = received.payload
+    trace "sync payload received", client = conn.peerId, payload = received.payload

     let reconciled = (?received.serverReconcile()).valueOr:
       let completed = error # Result[Reconciled, Completed]
@@ -167,303 +116,37 @@ proc handleLoop(
     sent = ?await reconciled.send()

-    trace "sync payload sent",
-      server = self.peerManager.switch.peerInfo.peerId,
-      client = conn.peerId,
-      payload = reconciled.payload
+    trace "sync payload sent", client = conn.peerId, payload = reconciled.payload

 proc initProtocolHandler(self: WakuSync) =
   proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
-    debug "sync session requested",
-      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
+    debug "sync session requested", client = conn.peerId

     let hashes = (await self.handleLoop(conn)).valueOr:
-      debug "sync session ended",
-        server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
+      debug "sync session ended", client = conn.peerId, error

       #TODO send error code and desc to client
       return

-    if hashes.len > 0 and self.transferCallBack.isSome():
-      let callback = self.transferCallBack.get()
-
-      (await callback(hashes, conn.peerId)).isOkOr:
+    if hashes.len > 0:
+      (await self.transferCallBack(hashes, conn.peerId)).isOkOr:
         error "transfer callback failed", error = $error

-    debug "sync session ended gracefully",
-      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
+    debug "sync session ended gracefully", client = conn.peerId

   self.handler = handle
   self.codec = WakuSyncCodec

-proc initPruningHandler(self: WakuSync, wakuArchive: WakuArchive) =
-  if wakuArchive.isNil():
-    return
-
-  self.pruneCallBack = some(
-    proc(
-        pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
-    ): Future[
-        Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
-    ] {.async: (raises: []), closure.} =
-      let archiveCursor =
-        if cursor.isSome():
-          some(ArchiveCursor(hash: cursor.get()))
-        else:
-          none(ArchiveCursor)
-
-      let query = ArchiveQuery(
-        includeData: true,
-        cursor: archiveCursor,
-        startTime: some(pruneStart),
-        endTime: some(pruneStop),
-        pageSize: 100,
-      )
-
-      let catchable = catch:
-        await wakuArchive.findMessages(query)
-
-      if catchable.isErr():
-        return err("archive error: " & catchable.error.msg)
-
-      let res = catchable.get()
-      let response = res.valueOr:
-        return err("archive error: " & $error)
-
-      let elements = collect(newSeq):
-        for (hash, msg) in response.hashes.zip(response.messages):
-          (hash, msg.timestamp)
-
-      let cursor =
-        if response.cursor.isNone():
-          none(WakuMessageHash)
-        else:
-          some(response.cursor.get().hash)
-
-      return ok((elements, cursor))
-  )
-
-proc initTransferHandler(
-    self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
-) =
-  if wakuArchive.isNil() or wakuStoreClient.isNil():
-    return
-
-  self.transferCallBack = some(
-    proc(
-        hashes: seq[WakuMessageHash], peerId: PeerId
-    ): Future[Result[void, string]] {.async: (raises: []), closure.} =
-      var query = StoreQueryRequest()
-      query.includeData = true
-      query.messageHashes = hashes
-      query.paginationLimit = some(uint64(100))
-
-      while true:
-        let catchable = catch:
-          await wakuStoreClient.query(query, peerId)
-
-        if catchable.isErr():
-          return err("store client error: " & catchable.error.msg)
-
-        let res = catchable.get()
-        let response = res.valueOr:
-          return err("store client error: " & $error)
-
-        query.paginationCursor = response.paginationCursor
-
-        for kv in response.messages:
-          let handleRes = catch:
-            await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
-
-          if handleRes.isErr():
-            error "message transfer failed", error = handleRes.error.msg
-            # Messages can be synced next time since they are not added to storage yet.
-            continue
-
-          self.ingessMessage(kv.pubsubTopic.get(), kv.message.get())
-
-        if query.paginationCursor.isNone():
-          break
-
-      return ok()
-  )
-
-proc initFillStorage(
-    self: WakuSync, wakuArchive: WakuArchive
-): Future[Result[void, string]] {.async.} =
-  if wakuArchive.isNil():
-    return ok()
-
-  let endTime = getNowInNanosecondTime()
-  let starTime = endTime - self.syncInterval.nanos
-
-  var query = ArchiveQuery(
-    includeData: true,
-    cursor: none(ArchiveCursor),
-    startTime: some(starTime),
-    endTime: some(endTime),
-    pageSize: 100,
-  )
-
-  while true:
-    let response = (await wakuArchive.findMessages(query)).valueOr:
-      return err($error)
-
-    for (topic, msg) in response.topics.zip(response.messages):
-      self.ingessMessage(topic, msg)
-
-    if response.cursor.isNone():
-      break
-
-    query.cursor = response.cursor
-
-  return ok()
-
 proc new*(
-    T: type WakuSync,
-    peerManager: PeerManager,
-    maxFrameSize: int = DefaultMaxFrameSize,
-    syncInterval: timer.Duration = DefaultSyncInterval,
-    relayJitter: Duration = DefaultGossipSubJitter,
-    pruning: bool,
-    wakuArchive: WakuArchive,
-    wakuStoreClient: WakuStoreClient,
-): Future[Result[T, string]] {.async.} =
+    T: type WakuSync, transferCallBack: TransferCallback, maxFrameSize: int
+): Result[T, string] =
   let storage = NegentropyStorage.new().valueOr:
     return err("negentropy storage creation failed")
-  let sync = WakuSync(
+  let sync = WakuSync(
+    storage: storage, transferCallBack: transferCallBack, maxFrameSize: maxFrameSize
+  )
-    storage: storage,
-    peerManager: peerManager,
-    maxFrameSize: maxFrameSize,
-    syncInterval: syncInterval,
-    relayJitter: relayJitter,
-    pruneOffset: syncInterval div 2,
-  )

   sync.initProtocolHandler()

-  if pruning:
-    sync.initPruningHandler(wakuArchive)
-
-  sync.initTransferHandler(wakuArchive, wakuStoreClient)
-
-  let res = await sync.initFillStorage(wakuArchive)
-  if res.isErr():
-    return err("initial storage filling error: " & res.error)
-
   info "WakuSync protocol initialized"

   return ok(sync)

-proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
-  debug "periodic sync initialized", interval = $self.syncInterval
-
-  while true:
-    await sleepAsync(self.syncInterval)
-
-    debug "periodic sync started"
-
-    var
-      hashes: seq[WakuMessageHash]
-      peer: RemotePeerInfo
-      tries = 3
-
-    while true:
-      let res = (await self.sync()).valueOr:
-        error "sync failed", error = $error
-        if tries > 0:
-          tries -= 1
-          await sleepAsync(30.seconds)
-          continue
-        else:
-          break
-
-      hashes = res[0]
-      peer = res[1]
-      break
-
-    if hashes.len > 0:
-      tries = 3
-      while true:
-        (await callback(hashes, peer.peerId)).isOkOr:
-          error "transfer callback failed", error = $error
-          if tries > 0:
-            tries -= 1
-            await sleepAsync(30.seconds)
-            continue
-          else:
-            break
-
-        break
-
-    debug "periodic sync done", hashSynced = hashes.len
-
-proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
-  debug "periodic prune initialized", interval = $self.syncInterval
-
-  await sleepAsync(self.syncInterval)
-
-  var pruneStop = getNowInNanosecondTime()
-
-  while true:
-    await sleepAsync(self.syncInterval)
-
-    debug "periodic prune started",
-      startTime = self.pruneStart - self.pruneOffset.nanos,
-      endTime = pruneStop,
-      storageSize = self.storage.len
-
-    var (elements, cursor) =
-      (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))
-
-    var tries = 3
-    while true:
-      (elements, cursor) = (
-        await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
-      ).valueOr:
-        error "pruning callback failed", error = $error
-        if tries > 0:
-          tries -= 1
-          await sleepAsync(30.seconds)
-          continue
-        else:
-          break
-
-      if elements.len == 0:
-        break
-
-      for (hash, timestamp) in elements:
-        self.storage.erase(timestamp, hash).isOkOr:
-          error "storage erase failed",
-            timestamp = timestamp, msg_hash = hash, error = $error
-          continue
-
-      if cursor.isNone():
-        break
-
-    self.pruneStart = pruneStop
-    pruneStop = getNowInNanosecondTime()
-
-    debug "periodic prune done", storageSize = self.storage.len
-
-proc start*(self: WakuSync) =
-  self.started = true
-  self.pruneStart = getNowInNanosecondTime()
-
-  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
-    self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())
-
-  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
-    self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())
-
-  info "WakuSync protocol started"
-
-proc stopWait*(self: WakuSync) {.async.} =
-  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
-    await self.periodicSyncFut.cancelAndWait()
-
-  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
-    await self.periodicPruneFut.cancelAndWait()
-
-  info "WakuSync protocol stopped"