mirror of https://github.com/waku-org/nwaku.git
521 lines
15 KiB
Nim
521 lines
15 KiB
Nim
|
{.push raises: [].}
|
||
|
|
||
|
import
|
||
|
std/[options, sugar, sequtils],
|
||
|
stew/byteutils,
|
||
|
results,
|
||
|
chronicles,
|
||
|
chronos,
|
||
|
metrics,
|
||
|
libp2p/utility,
|
||
|
libp2p/protocols/protocol,
|
||
|
libp2p/stream/connection,
|
||
|
libp2p/crypto/crypto,
|
||
|
eth/p2p/discoveryv5/enr
|
||
|
import
|
||
|
../common/nimchronos,
|
||
|
../common/enr,
|
||
|
../waku_core,
|
||
|
../waku_archive,
|
||
|
../waku_store/[client, common],
|
||
|
../waku_enr,
|
||
|
../node/peer_manager/peer_manager,
|
||
|
./raw_bindings,
|
||
|
./common,
|
||
|
./session
|
||
|
|
||
|
logScope:
|
||
|
topics = "waku sync"
|
||
|
|
||
|
# State of the Waku Sync protocol for one node. It extends `LPProtocol`, whose
# `handler` and `codec` are filled in by `initProtocolHandler`.
type WakuSync* = ref object of LPProtocol
  # Negentropy storage that receives a (timestamp, hash) pair per message.
  storage: NegentropyStorage
  # Negentropy parameter limiting the size of payloads.
  maxFrameSize: int

  # Used to select and dial the peers to sync with.
  peerManager: PeerManager

  # Time between each synchronisation attempt.
  syncInterval: timer.Duration
  # Amount of time in the past to sync.
  syncRange: timer.Duration
  # Amount of time since the present to ignore when syncing.
  relayJitter: Duration
  # Callback for message transfers.
  transferCallBack: Option[TransferCallback]

  # Callback with the result of the archive query.
  pruneCallBack: Option[PruneCallBack]
  # Last pruning start timestamp.
  pruneStart: Timestamp
  # Offset to prune a bit more than necessary.
  pruneOffset: timer.Duration

  # Handles of the background loops launched by `start`.
  periodicSyncFut: Future[void]
  periodicPruneFut: Future[void]
|
||
|
|
||
|
proc storageSize*(self: WakuSync): int =
  ## Returns the length reported by the sync storage.
  result = self.storage.len
|
||
|
|
||
|
proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
  ## Computes the hash of `msg` on `pubsubTopic` and inserts it, keyed by the
  ## message timestamp, into the sync storage. Ephemeral messages are ignored.
  ## An insertion failure is logged, not propagated.
  if not msg.ephemeral:
    let digest: WakuMessageHash = computeMessageHash(pubsubTopic, msg)

    trace "inserting message into waku sync storage ",
      msg_hash = digest.to0xHex(), timestamp = msg.timestamp

    let inserted = self.storage.insert(msg.timestamp, digest)

    if inserted.isErr():
      error "failed to insert message ",
        msg_hash = digest.to0xHex(), error = $inserted.error
|
||
|
|
||
|
proc messageIngress*(
    self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage
) =
  ## Inserts an already-hashed message into the sync storage, keyed by the
  ## message timestamp. Ephemeral messages are ignored.
  ##
  ## `pubsubTopic` is not used by this overload; the caller supplies `msgHash`
  ## directly. An insertion failure is logged, not propagated.
  if msg.ephemeral:
    return

  trace "inserting message into waku sync storage ",
    msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp

  # Log the storage error too, matching the hash-computing overload.
  self.storage.insert(msg.timestamp, msgHash).isOkOr:
    error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error
|
||
|
|
||
|
proc calculateRange(
    jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): (int64, int64) =
  ## Calculates the start and end time of a sync session, in nanoseconds.
  ##
  ## The end is the current time minus `jitter` (message jitter is inherent to
  ## the Relay protocol); the start lies `syncRange` before that end.
  let windowEnd = getNowInNanosecondTime() - jitter.nanos
  let windowStart = windowEnd - syncRange.nanos

  (windowStart, windowEnd)
|
||
|
|
||
|
proc request(
    self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
  ## Runs the client side of a sync session over `conn`.
  ##
  ## After `clientInitialize`, the loop alternates `send` and `listenBack`,
  ## feeding every received payload to `clientReconcile`, which accumulates
  ## into `hashes`. When reconciliation reports completion the session is
  ## terminated and the accumulated hashes are returned. Any failing step
  ## returns its error through `?`.
  # Use the configured sync range; relying on the default parameter would
  # silently ignore `self.syncRange`.
  let (syncStart, syncEnd) = calculateRange(self.relayJitter, self.syncRange)

  let initialized =
    ?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)

  debug "sync session initialized",
    client = self.peerManager.switch.peerInfo.peerId,
    server = conn.peerId,
    frameSize = self.maxFrameSize,
    timeStart = syncStart,
    timeEnd = syncEnd

  var hashes: seq[WakuMessageHash]
  var reconciled = initialized

  while true:
    let sent = ?await reconciled.send()

    trace "sync payload sent",
      client = self.peerManager.switch.peerInfo.peerId,
      server = conn.peerId,
      payload = reconciled.payload

    let received = ?await sent.listenBack()

    trace "sync payload received",
      client = self.peerManager.switch.peerInfo.peerId,
      server = conn.peerId,
      payload = received.payload

    # The inner Result carries the next `Reconciled` state on success and the
    # `Completed` state in its error slot once there is nothing left to do.
    reconciled = (?received.clientReconcile(hashes)).valueOr:
      let completed = error # Result[Reconciled, Completed]

      ?await completed.clientTerminate()

      debug "sync session ended gracefully",
        client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId

      return ok(hashes)
|
||
|
|
||
|
proc storeSynchronization*(
    self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} =
  ## Runs one client-side sync session.
  ##
  ## Uses `peerInfo` when given, otherwise asks the peer manager for a peer
  ## supporting `WakuSyncCodec`. The peer is dialed and `request` is run over
  ## the resulting connection. On success, returns the hashes produced by the
  ## session together with the peer that served it; otherwise an error string.
  let peer = peerInfo.valueOr:
    self.peerManager.selectPeer(WakuSyncCodec).valueOr:
      return err("No suitable peer found for sync")

  let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)

  let conn: Connection = connOpt.valueOr:
    return err("Cannot establish sync connection")

  let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
    # This node initiated the session, so it is the client and the remote
    # peer is the server (same labelling as in `request`).
    error "sync session ended",
      client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId, error

    return err("Sync request error: " & error)

  return ok((hashes, peer))
|
||
|
|
||
|
proc handleSyncSession(
    self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
  ## Runs the server side of a sync session over `conn`.
  ##
  ## After `serverInitialize`, the loop alternates `listenBack` and `send`,
  ## passing every received payload to `serverReconcile`. When reconciliation
  ## reports completion, `serverTerminate` yields the hashes that are
  ## returned. Any failing step returns its error through `?`.
  # Use the configured sync range; relying on the default parameter would
  # silently ignore `self.syncRange`.
  let (syncStart, syncEnd) = calculateRange(self.relayJitter, self.syncRange)

  let initialized =
    ?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)

  var sent = initialized

  while true:
    let received = ?await sent.listenBack()

    trace "sync payload received",
      server = self.peerManager.switch.peerInfo.peerId,
      client = conn.peerId,
      payload = received.payload

    # The inner Result carries the next `Reconciled` state on success and the
    # `Completed` state in its error slot once the session is over.
    let reconciled = (?received.serverReconcile()).valueOr:
      let completed = error # Result[Reconciled, Completed]

      let hashes = await completed.serverTerminate()

      return ok(hashes)

    sent = ?await reconciled.send()

    trace "sync payload sent",
      server = self.peerManager.switch.peerInfo.peerId,
      client = conn.peerId,
      payload = reconciled.payload
|
||
|
|
||
|
proc initProtocolHandler(self: WakuSync) =
  ## Installs the handler for incoming sync sessions and sets the protocol
  ## codec to `WakuSyncCodec`.
  proc onSyncRequest(conn: Connection, proto: string) {.async, closure.} =
    ## Serves one sync session, then hands any resulting hashes to the
    ## transfer callback when one is configured.
    debug "sync session requested",
      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId

    let session = await self.handleSyncSession(conn)

    if session.isErr():
      debug "sync session ended",
        server = self.peerManager.switch.peerInfo.peerId,
        client = conn.peerId,
        error = session.error

      #TODO send error code and desc to client
      return

    let hashes = session.get()

    if self.transferCallBack.isSome() and hashes.len > 0:
      let transfer = self.transferCallBack.get()
      let transferred = await transfer(hashes, conn.peerId)

      # A failed transfer is only logged; the session still counts as ended.
      if transferred.isErr():
        error "transfer callback failed", error = $transferred.error

    debug "sync session ended gracefully",
      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId

  self.codec = WakuSyncCodec
  self.handler = onSyncRequest
|
||
|
|
||
|
proc createPruneCallback(
    self: WakuSync, wakuArchive: WakuArchive
): Result[PruneCallBack, string] =
  ## Builds the default prune callback on top of `wakuArchive`.
  ##
  ## The callback queries the archive for one page (at most 100 entries) of
  ## messages between `pruneStart` and `pruneStop`, resuming from `cursor`
  ## when one is given. It returns the (hash, timestamp) pairs found plus the
  ## archive's cursor for the following page. Fails when `wakuArchive` is nil.
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  let pruneCb: PruneCallback = proc(
      pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
  ): Future[
      Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
  ] {.async: (raises: []), closure.} =
    let resumeFrom =
      if cursor.isNone():
        none(ArchiveCursor)
      else:
        some(cursor.get())

    let archiveQuery = ArchiveQuery(
      includeData: true, # message bodies are needed for their timestamps
      cursor: resumeFrom,
      startTime: some(pruneStart),
      endTime: some(pruneStop),
      pageSize: 100,
    )

    # `catch` turns an exception raised by the archive into an error value,
    # as this closure is declared with `raises: []`.
    let attempt = catch:
      await wakuArchive.findMessages(archiveQuery)

    if attempt.isErr():
      return err("archive error: " & attempt.error.msg)

    let queryRes = attempt.get()

    if queryRes.isErr():
      return err("archive error: " & $queryRes.error)

    let page = queryRes.get()

    # Pair each hash with the timestamp of the message at the same position.
    let pairs = zip(page.hashes, page.messages).mapIt((it[0], it[1].timestamp))

    return ok((pairs, page.cursor))

  return ok(pruneCb)
|
||
|
|
||
|
proc createTransferCallback(
    self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
): Result[TransferCallback, string] =
  ## Builds the default transfer callback.
  ##
  ## The callback asks `peerId`, through `wakuStoreClient`, for the messages
  ## matching `hashes` in pages of 100. Each returned message is passed to
  ## `wakuArchive.handleMessage` and then inserted into the sync storage.
  ## Fails when either `wakuArchive` or `wakuStoreClient` is nil.
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  if wakuStoreClient.isNil():
    return err("waku store client unavailable")

  let callback: TransferCallback = proc(
      hashes: seq[WakuMessageHash], peerId: PeerId
  ): Future[Result[void, string]] {.async: (raises: []), closure.} =
    var query = StoreQueryRequest()
    query.includeData = true
    query.messageHashes = hashes
    query.paginationLimit = some(uint64(100))

    while true:
      # `catch` turns an exception raised by the store client into an error
      # value, as this closure is declared with `raises: []`.
      let catchable = catch:
        await wakuStoreClient.query(query, peerId)

      if catchable.isErr():
        return err("store client error: " & catchable.error.msg)

      let res = catchable.get()
      let response = res.valueOr:
        return err("store client error: " & $error)

      query.paginationCursor = response.paginationCursor

      for kv in response.messages:
        # The response comes from a remote peer: an entry lacking its topic or
        # message would make `get()` raise a Defect, so skip it instead.
        if kv.pubsubTopic.isNone() or kv.message.isNone():
          warn "store response entry without data, skipping",
            msg_hash = kv.messageHash.to0xHex()
          continue

        let pubsubTopic = kv.pubsubTopic.get()
        let message = kv.message.get()

        let handleRes = catch:
          await wakuArchive.handleMessage(pubsubTopic, message)

        if handleRes.isErr():
          error "message transfer failed", error = handleRes.error.msg
          # Messages can be synced next time since they are not added to storage yet.
          continue

        self.messageIngress(pubsubTopic, kv.messageHash, message)

      # No cursor means the peer has no further page for this query.
      if query.paginationCursor.isNone():
        break

    return ok()

  return ok(callback)
|
||
|
|
||
|
proc initFillStorage(
    self: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
  ## Seeds the sync storage from `wakuArchive`.
  ##
  ## Pages through every archived message whose time lies within `syncRange`
  ## of the present, 100 at a time, and ingests each one. Returns an error
  ## when the archive is nil or a query fails.
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  let windowEnd = getNowInNanosecondTime()
  let windowStart = windowEnd - self.syncRange.nanos

  var archiveQuery = ArchiveQuery(
    includeData: true,
    cursor: none(ArchiveCursor),
    startTime: some(windowStart),
    endTime: some(windowEnd),
    pageSize: 100,
  )

  while true:
    let page = (await wakuArchive.findMessages(archiveQuery)).valueOr:
      return err($error)

    # Hashes, topics and messages are read at the same index of the response.
    for idx, hash in page.hashes:
      let topic = page.topics[idx]
      let message = page.messages[idx]

      self.messageIngress(topic, hash, message)

    # Without a cursor there is no further page to fetch.
    if page.cursor.isNone():
      break

    archiveQuery.cursor = page.cursor

  return ok()
|
||
|
|
||
|
proc new*(
    T: type WakuSync,
    peerManager: PeerManager,
    maxFrameSize: int = DefaultMaxFrameSize,
    syncRange: timer.Duration = DefaultSyncRange,
    syncInterval: timer.Duration = DefaultSyncInterval,
    relayJitter: Duration = DefaultGossipSubJitter,
    wakuArchive: WakuArchive,
    wakuStoreClient: WakuStoreClient,
    pruneCallback: Option[PruneCallback] = none(PruneCallback),
    transferCallback: Option[TransferCallback] = none(TransferCallback),
): Future[Result[T, string]] {.async.} =
  ## Creates a `WakuSync` instance.
  ##
  ## A prune or transfer callback that is not supplied is built from
  ## `wakuArchive` (and `wakuStoreClient`). If building it fails, the failure
  ## is logged and that callback stays unset. The storage is then pre-filled
  ## from the archive; a failure there is logged as a warning only. The sole
  ## error returned is a failed creation of the negentropy storage.
  let storage = NegentropyStorage.new().valueOr:
    return err("negentropy storage creation failed")

  let sync = WakuSync(
    storage: storage,
    peerManager: peerManager,
    maxFrameSize: maxFrameSize,
    syncInterval: syncInterval,
    syncRange: syncRange,
    relayJitter: relayJitter,
    pruneOffset: syncInterval div 100,
    pruneCallBack: pruneCallback,
    transferCallBack: transferCallback,
  )

  sync.initProtocolHandler()

  if sync.pruneCallBack.isNone():
    let built = sync.createPruneCallback(wakuArchive)

    if built.isOk():
      sync.pruneCallBack = some(built.get())
    else:
      error "pruning callback creation error", error = built.error

  if sync.transferCallBack.isNone():
    let built = sync.createTransferCallback(wakuArchive, wakuStoreClient)

    if built.isOk():
      sync.transferCallBack = some(built.get())
    else:
      error "transfer callback creation error", error = built.error

  let filled = await sync.initFillStorage(wakuArchive)

  if filled.isErr():
    warn "will not sync messages before this point in time", error = filled.error

  info "WakuSync protocol initialized"

  return ok(sync)
|
||
|
|
||
|
proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
  ## Endless loop that, every `syncInterval`, runs a sync session and then
  ## hands the resulting hashes to `callback`.
  ##
  ## Each of the two steps is attempted once and retried up to three more
  ## times, waiting `RetryDelay` between attempts, before its error is logged.
  debug "periodic sync initialized", interval = $self.syncInterval

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic sync started"

    var
      hashes: seq[WakuMessageHash]
      peer: RemotePeerInfo
      retriesLeft = 3

    # Step 1: obtain the hashes from a peer.
    while true:
      let syncRes = await self.storeSynchronization()

      if syncRes.isOk():
        let synced = syncRes.get()
        hashes = synced[0]
        peer = synced[1]
        break

      if retriesLeft <= 0:
        error "sync failed", error = $syncRes.error
        break

      retriesLeft -= 1
      await sleepAsync(RetryDelay)

    # Step 2: transfer the messages; skipped when nothing was obtained.
    if hashes.len > 0:
      retriesLeft = 3

      while true:
        let transferRes = await callback(hashes, peer.peerId)

        if transferRes.isOk():
          break

        if retriesLeft <= 0:
          error "transfer callback failed", error = $transferRes.error
          break

        retriesLeft -= 1
        await sleepAsync(RetryDelay)

    debug "periodic sync done", hashSynced = hashes.len
|
||
|
|
||
|
proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
  ## Endless loop that, every `syncInterval`, erases from the sync storage the
  ## elements `callback` reports between `pruneStart` (widened by
  ## `pruneOffset`) and the current prune stop time.
  ##
  ## `callback` is called page by page. A failing call is retried up to three
  ## times per pass, waiting `RetryDelay` between attempts, before the error
  ## is logged and the pass is abandoned.
  debug "periodic prune initialized", interval = $self.syncInterval

  # Default T minus 60m
  self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos

  await sleepAsync(self.syncInterval)

  # Default T minus 55m
  var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic prune started",
      startTime = self.pruneStart - self.pruneOffset.nanos,
      endTime = pruneStop,
      storageSize = self.storage.len

    var
      pageCursor = none(WakuMessageHash)
      retriesLeft = 3 # shared by all pages of this pass

    while true:
      let pageRes =
        await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, pageCursor)

      if pageRes.isErr():
        # we either try again or log an error and stop
        if retriesLeft <= 0:
          error "pruning callback failed", error = $pageRes.error
          break

        retriesLeft -= 1
        await sleepAsync(RetryDelay)
        continue

      let (entries, nextCursor) = pageRes.get()
      pageCursor = nextCursor

      if entries.len == 0:
        # no elements to remove, stop
        break

      for (hash, timestamp) in entries:
        let erased = self.storage.erase(timestamp, hash)

        # An erase failure is logged and the remaining elements still processed.
        if erased.isErr():
          error "storage erase failed",
            timestamp = timestamp, msg_hash = hash.to0xHex(), error = $erased.error

      if pageCursor.isNone():
        # no more pages, stop
        break

    # NOTE(review): the window advances even when the pass above was abandoned
    # after repeated failures - confirm this is intended.
    self.pruneStart = pruneStop
    pruneStop = getNowInNanosecondTime() - self.syncRange.nanos

    debug "periodic prune done", storageSize = self.storage.len
|
||
|
|
||
|
proc start*(self: WakuSync) =
  ## Marks the protocol as started and launches the periodic sync and prune
  ## loops. A loop is launched only when its callback is set; neither is
  ## launched unless `syncInterval` is greater than zero.
  self.started = true

  if self.syncInterval > ZeroDuration:
    if self.transferCallBack.isSome():
      self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())

    if self.pruneCallBack.isSome():
      self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())

  info "WakuSync protocol started"
|
||
|
|
||
|
proc stopWait*(self: WakuSync) {.async.} =
  ## Cancels the periodic sync and prune loops and waits for them to finish.
  ##
  ## The futures are assigned only by `start`, so each one is checked for nil
  ## first: calling this before `start` (or when a loop was never launched)
  ## is a no-op instead of a nil dereference.
  if not self.periodicSyncFut.isNil():
    await self.periodicSyncFut.cancelAndWait()

  if not self.periodicPruneFut.isNil():
    await self.periodicPruneFut.cancelAndWait()

  info "WakuSync protocol stopped"
|