nwaku/waku/waku_sync/protocol.nim

521 lines
15 KiB
Nim

{.push raises: [].}
import
std/[options, sugar, sequtils],
stew/byteutils,
results,
chronicles,
chronos,
metrics,
libp2p/utility,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_archive,
../waku_store/[client, common],
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common,
./session
logScope:
topics = "waku sync"
type WakuSync* = ref object of LPProtocol
storage: NegentropyStorage
maxFrameSize: int # Negentropy param to limit the size of payloads
peerManager: PeerManager
syncInterval: timer.Duration # Time between each syncronisation attempt
syncRange: timer.Duration # Amount of time in the past to sync
relayJitter: Duration # Amount of time since the present to ignore when syncing
transferCallBack: Option[TransferCallback] # Callback for message transfers.
pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
pruneStart: Timestamp # Last pruning start timestamp
pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
periodicSyncFut: Future[void]
periodicPruneFut: Future[void]
proc storageSize*(self: WakuSync): int =
self.storage.len
proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
return
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
trace "inserting message into waku sync storage ",
msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp
self.storage.insert(msg.timestamp, msgHash).isOkOr:
error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error
proc messageIngress*(
self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage
) =
if msg.ephemeral:
return
trace "inserting message into waku sync storage ",
msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp
if self.storage.insert(msg.timestamp, msgHash).isErr():
error "failed to insert message ", msg_hash = msgHash.to0xHex()
proc calculateRange(
jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): (int64, int64) =
## Calculates the start and end time of a sync session
var now = getNowInNanosecondTime()
# Because of message jitter inherent to Relay protocol
now -= jitter.nanos
let syncRange = syncRange.nanos
let syncStart = now - syncRange
let syncEnd = now
return (syncStart, syncEnd)
proc request(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (syncStart, syncEnd) = calculateRange(self.relayJitter)
let initialized =
?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)
debug "sync session initialized",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
frameSize = self.maxFrameSize,
timeStart = syncStart,
timeEnd = syncEnd
var hashes: seq[WakuMessageHash]
var reconciled = initialized
while true:
let sent = ?await reconciled.send()
trace "sync payload sent",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = reconciled.payload
let received = ?await sent.listenBack()
trace "sync payload received",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = received.payload
reconciled = (?received.clientReconcile(hashes)).valueOr:
let completed = error # Result[Reconciled, Completed]
?await completed.clientTerminate()
debug "sync session ended gracefully",
client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId
return ok(hashes)
continue
proc storeSynchronization*(
self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} =
let peer = peerInfo.valueOr:
self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
error "sync session ended",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
return err("Sync request error: " & error)
return ok((hashes, peer))
proc handleSyncSession(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (syncStart, syncEnd) = calculateRange(self.relayJitter)
let initialized =
?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)
var sent = initialized
while true:
let received = ?await sent.listenBack()
trace "sync payload received",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = received.payload
let reconciled = (?received.serverReconcile()).valueOr:
let completed = error # Result[Reconciled, Completed]
let hashes = await completed.serverTerminate()
return ok(hashes)
sent = ?await reconciled.send()
trace "sync payload sent",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = reconciled.payload
continue
proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, closure.} =
debug "sync session requested",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
let hashes = (await self.handleSyncSession(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
#TODO send error code and desc to client
return
if hashes.len > 0 and self.transferCallBack.isSome():
let callback = self.transferCallBack.get()
(await callback(hashes, conn.peerId)).isOkOr:
error "transfer callback failed", error = $error
debug "sync session ended gracefully",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
self.handler = handle
self.codec = WakuSyncCodec
proc createPruneCallback(
self: WakuSync, wakuArchive: WakuArchive
): Result[PruneCallBack, string] =
if wakuArchive.isNil():
return err ("waku archive unavailable")
let callback: PruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
let archiveCursor =
if cursor.isSome():
some(cursor.get())
else:
none(ArchiveCursor)
let query = ArchiveQuery(
includeData: true,
cursor: archiveCursor,
startTime: some(pruneStart),
endTime: some(pruneStop),
pageSize: 100,
)
let catchable = catch:
await wakuArchive.findMessages(query)
if catchable.isErr():
return err("archive error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("archive error: " & $error)
let elements = collect(newSeq):
for (hash, msg) in response.hashes.zip(response.messages):
(hash, msg.timestamp)
let cursor = response.cursor
return ok((elements, cursor))
return ok(callback)
proc createTransferCallback(
self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
): Result[TransferCallback, string] =
if wakuArchive.isNil():
return err("waku archive unavailable")
if wakuStoreClient.isNil():
return err("waku store client unavailable")
let callback: TransferCallback = proc(
hashes: seq[WakuMessageHash], peerId: PeerId
): Future[Result[void, string]] {.async: (raises: []), closure.} =
var query = StoreQueryRequest()
query.includeData = true
query.messageHashes = hashes
query.paginationLimit = some(uint64(100))
while true:
let catchable = catch:
await wakuStoreClient.query(query, peerId)
if catchable.isErr():
return err("store client error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("store client error: " & $error)
query.paginationCursor = response.paginationCursor
for kv in response.messages:
let handleRes = catch:
await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
if handleRes.isErr():
error "message transfer failed", error = handleRes.error.msg
# Messages can be synced next time since they are not added to storage yet.
continue
self.messageIngress(kv.pubsubTopic.get(), kv.messageHash, kv.message.get())
if query.paginationCursor.isNone():
break
return ok()
return ok(callback)
proc initFillStorage(
self: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
if wakuArchive.isNil():
return err("waku archive unavailable")
let endTime = getNowInNanosecondTime()
let starTime = endTime - self.syncRange.nanos
var query = ArchiveQuery(
includeData: true,
cursor: none(ArchiveCursor),
startTime: some(starTime),
endTime: some(endTime),
pageSize: 100,
)
while true:
let response = (await wakuArchive.findMessages(query)).valueOr:
return err($error)
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let topic = response.topics[i]
let msg = response.messages[i]
self.messageIngress(topic, hash, msg)
if response.cursor.isNone():
break
query.cursor = response.cursor
return ok()
proc new*(
T: type WakuSync,
peerManager: PeerManager,
maxFrameSize: int = DefaultMaxFrameSize,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
wakuArchive: WakuArchive,
wakuStoreClient: WakuStoreClient,
pruneCallback: Option[PruneCallback] = none(PruneCallback),
transferCallback: Option[TransferCallback] = none(TransferCallback),
): Future[Result[T, string]] {.async.} =
let storage = NegentropyStorage.new().valueOr:
return err("negentropy storage creation failed")
var sync = WakuSync(
storage: storage,
peerManager: peerManager,
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
syncRange: syncRange,
relayJitter: relayJitter,
pruneOffset: syncInterval div 100,
)
sync.initProtocolHandler()
sync.pruneCallBack = pruneCallback
if sync.pruneCallBack.isNone():
let res = sync.createPruneCallback(wakuArchive)
if res.isErr():
error "pruning callback creation error", error = res.error
else:
sync.pruneCallBack = some(res.get())
sync.transferCallBack = transferCallback
if sync.transferCallBack.isNone():
let res = sync.createTransferCallback(wakuArchive, wakuStoreClient)
if res.isErr():
error "transfer callback creation error", error = res.error
else:
sync.transferCallBack = some(res.get())
let res = await sync.initFillStorage(wakuArchive)
if res.isErr():
warn "will not sync messages before this point in time", error = res.error
info "WakuSync protocol initialized"
return ok(sync)
proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
debug "periodic sync initialized", interval = $self.syncInterval
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic sync started"
var
hashes: seq[WakuMessageHash]
peer: RemotePeerInfo
tries = 3
while true:
let res = (await self.storeSynchronization()).valueOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "sync failed", error = $error
break
hashes = res[0]
peer = res[1]
break
if hashes.len > 0:
tries = 3
while true:
(await callback(hashes, peer.peerId)).isOkOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "transfer callback failed", error = $error
break
break
debug "periodic sync done", hashSynced = hashes.len
continue
proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
debug "periodic prune initialized", interval = $self.syncInterval
# Default T minus 60m
self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos
await sleepAsync(self.syncInterval)
# Default T minus 55m
var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic prune started",
startTime = self.pruneStart - self.pruneOffset.nanos,
endTime = pruneStop,
storageSize = self.storage.len
var (elements, cursor) =
(newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))
var tries = 3
while true:
(elements, cursor) = (
await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
).valueOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "pruning callback failed", error = $error
break
if elements.len == 0:
# no elements to remove, stop
break
for (hash, timestamp) in elements:
self.storage.erase(timestamp, hash).isOkOr:
error "storage erase failed",
timestamp = timestamp, msg_hash = hash.to0xHex(), error = $error
continue
if cursor.isNone():
# no more pages, stop
break
self.pruneStart = pruneStop
pruneStop = getNowInNanosecondTime() - self.syncRange.nanos
debug "periodic prune done", storageSize = self.storage.len
continue
proc start*(self: WakuSync) =
self.started = true
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())
info "WakuSync protocol started"
proc stopWait*(self: WakuSync) {.async.} =
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicSyncFut.cancelAndWait()
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicPruneFut.cancelAndWait()
info "WakuSync protocol stopped"