feat: waku store sync 2.0 protocols & tests (#3216)

Simon-Pierre Vivier 2025-01-23 16:13:26 -05:00 committed by GitHub
parent 54a7a68754
commit 6ee494d902
7 changed files with 1007 additions and 12 deletions
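Store Sync v2 splits synchronization into two cooperating libp2p protocols: SyncReconciliation, which runs range-based set reconciliation over message IDs, and SyncTransfer, which sends and receives the messages found to differ. They share three async queues: transfer pushes the IDs of newly archived messages to reconciliation (idsTx/idsRx), while reconciliation pushes hashes we still lack (localWants) and hashes the remote peer lacks (remoteNeeds) to transfer. A minimal wiring sketch under the commit's API (the proc name wireStoreSync and the surrounding setup are illustrative, not part of this commit):

import chronos, libp2p, results
import waku/[node/peer_manager, waku_core, waku_archive]
import waku/waku_store_sync/[common, reconciliation, transfer]

proc wireStoreSync(
    switch: Switch, archive: WakuArchive
): Future[Result[void, string]] {.async.} =
  let peerManager = PeerManager.new(switch)
  let ids = newAsyncQueue[SyncID]() # transfer -> reconciliation: new message IDs
  let localWants = newAsyncQueue[(PeerId, WakuMessageHash)]() # hashes we lack
  let remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() # hashes peers lack

  let recon = (
    await SyncReconciliation.new(
      peerManager = peerManager,
      wakuArchive = archive,
      idsRx = ids,
      localWantsTx = localWants,
      remoteNeedsTx = remoteNeeds,
    )
  ).valueOr:
    return err(error)

  let xfer = SyncTransfer.new(
    peerManager = peerManager,
    wakuArchive = archive,
    idsTx = ids,
    localWantsRx = localWants,
    remoteNeedsRx = remoteNeeds,
  )

  recon.start()
  xfer.start()
  switch.mount(recon)
  switch.mount(xfer)
  return ok()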

View File

@@ -1,6 +1,14 @@
import std/[options, random], chronos, chronicles
import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore
import
waku/[
node/peer_manager,
waku_core,
waku_store_sync/common,
waku_store_sync/reconciliation,
waku_store_sync/transfer,
],
../testlib/wakucore
randomize()
@@ -12,7 +20,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
return hash
#[ proc newTestWakuRecon*(
proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
@@ -25,8 +33,8 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
wakuArchive = nil,
relayJitter = 0.seconds,
idsRx = idsRx,
wantsTx = wantsTx,
needsTx = needsTx,
localWantsTx = wantsTx,
remoteNeedsTx = needsTx,
)
let proto = res.get()
@@ -34,9 +42,9 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
proto.start()
switch.mount(proto)
return proto ]#
return proto
#[ proc newTestWakuTransfer*(
proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
@@ -48,11 +56,11 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
peerManager = peerManager,
wakuArchive = nil,
idsTx = idsTx,
wantsRx = wantsRx,
needsRx = needsRx,
localWantsRx = wantsRx,
remoteNeedsRx = needsRx,
)
proto.start()
switch.mount(proto)
return proto ]#
return proto

View File

@@ -0,0 +1,377 @@
{.used.}
import
std/[options, sets, random, math],
testutils/unittests,
chronos,
libp2p/crypto/crypto,
stew/byteutils
import
../../waku/[
node/peer_manager,
waku_core,
waku_core/message,
waku_core/message/digest,
waku_store_sync/common,
waku_store_sync/storage/range_processing,
waku_store_sync/reconciliation,
waku_store_sync/transfer,
waku_archive/archive,
waku_archive/driver,
waku_archive/common,
],
../testlib/[wakucore, testasync],
../waku_archive/archive_utils,
./sync_utils
suite "Waku Sync: reconciliation":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
var
idsChannel {.threadvar.}: AsyncQueue[SyncID]
localWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
var server {.threadvar.}: SyncReconciliation
var client {.threadvar.}: SyncReconciliation
var serverPeerInfo {.threadvar.}: RemotePeerInfo
var clientPeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
idsChannel = newAsyncQueue[SyncID]()
localWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()
asyncTeardown:
await allFutures(server.stop(), client.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
asyncTest "sync 2 nodes both empty":
check:
idsChannel.len == 0
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), res.error
check:
idsChannel.len == 0
localWants.len == 0
remoteNeeds.len == 0
asyncTest "sync 2 nodes empty client full server":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3)
server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
server.messageIngress(hash3, msg3)
check:
remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == false
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), res.error
check:
remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true
asyncTest "sync 2 nodes full client empty server":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3)
client.messageIngress(hash1, msg1)
client.messageIngress(hash2, msg2)
client.messageIngress(hash3, msg3)
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == false
remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == false
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), res.error
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == true
remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == true
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true
asyncTest "sync 2 nodes different hashes":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash1, msg1)
client.messageIngress(hash3, msg3)
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
localWants.contains((clientPeerInfo.peerId, hash3)) == false
localWants.contains((serverPeerInfo.peerId, hash2)) == false
var syncRes = await client.storeSynchronization(some(serverPeerInfo))
assert syncRes.isOk(), $syncRes.error
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true
localWants.contains((clientPeerInfo.peerId, hash3)) == true
localWants.contains((serverPeerInfo.peerId, hash2)) == true
asyncTest "sync 2 nodes same hashes":
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
server.messageIngress(hash1, msg1)
client.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash2, msg2)
check:
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check:
localWants.len == 0
remoteNeeds.len == 0
asyncTest "sync 2 nodes 100K msgs 1 diff":
let msgCount = 100_000
var diffIndex = rand(msgCount - 1)
var diff: WakuMessageHash
# the sync window is 1 hour; spread the messages evenly across it
let timeSlice = calculateTimeRange()
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
let (part, _) = divmod(timeWindow, msgCount)
var timestamp = timeSlice.a
for i in 0 ..< msgCount:
let msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
if i != diffIndex:
client.messageIngress(hash, msg)
else:
diff = hash
timestamp += Timestamp(part)
check:
localWants.contains((serverPeerInfo.peerId, diff)) == false
remoteNeeds.contains((clientPeerInfo.peerId, diff)) == false
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check:
localWants.contains((serverPeerInfo.peerId, diff)) == true
remoteNeeds.contains((clientPeerInfo.peerId, diff)) == true
asyncTest "sync 2 nodes 10K msgs 1K diffs":
let msgCount = 10_000
var diffCount = 1_000
var diffMsgHashes: HashSet[WakuMessageHash]
var randIndexes: HashSet[int]
# Diffs
for i in 0 ..< diffCount:
var randInt = rand(0 ..< msgCount)
# make sure we actually have the right number of diffs
while randInt in randIndexes:
randInt = rand(0 ..< msgCount)
randIndexes.incl(randInt)
# the sync window is 1 hour; spread the messages evenly across it
let timeSlice = calculateTimeRange()
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
let (part, _) = divmod(timeWindow, msgCount)
var timestamp = timeSlice.a
for i in 0 ..< msgCount:
let
msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic)
hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
if i in randIndexes:
diffMsgHashes.incl(hash)
else:
client.messageIngress(hash, msg)
timestamp += Timestamp(part)
check:
localWants.len == 0
remoteNeeds.len == 0
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
# timing issues make it hard to match exact numbers
check:
localWants.len > 900
remoteNeeds.len > 900
suite "Waku Sync: transfer":
var
serverSwitch {.threadvar.}: Switch
clientSwitch {.threadvar.}: Switch
var
serverDriver {.threadvar.}: ArchiveDriver
clientDriver {.threadvar.}: ArchiveDriver
serverArchive {.threadvar.}: WakuArchive
clientArchive {.threadvar.}: WakuArchive
var
serverIds {.threadvar.}: AsyncQueue[SyncID]
serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
clientIds {.threadvar.}: AsyncQueue[SyncID]
clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
var
server {.threadvar.}: SyncTransfer
client {.threadvar.}: SyncTransfer
var
serverPeerInfo {.threadvar.}: RemotePeerInfo
clientPeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
serverDriver = newSqliteArchiveDriver()
clientDriver = newSqliteArchiveDriver()
serverArchive = newWakuArchive(serverDriver)
clientArchive = newWakuArchive(clientDriver)
let
serverPeerManager = PeerManager.new(serverSwitch)
clientPeerManager = PeerManager.new(clientSwitch)
serverIds = newAsyncQueue[SyncID]()
serverLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
server = SyncTransfer.new(
peerManager = serverPeerManager,
wakuArchive = serverArchive,
idsTx = serverIds,
localWantsRx = serverLocalWants,
remoteNeedsRx = serverRemoteNeeds,
)
clientIds = newAsyncQueue[SyncID]()
clientLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]()
clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
client = SyncTransfer.new(
peerManager = clientPeerManager,
wakuArchive = clientArchive,
idsTx = clientIds,
localWantsRx = clientLocalWants,
remoteNeedsRx = clientRemoteNeeds,
)
server.start()
client.start()
serverSwitch.mount(server)
clientSwitch.mount(client)
serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()
serverPeerManager.addPeer(clientPeerInfo)
clientPeerManager.addPeer(serverPeerInfo)
asyncTeardown:
await allFutures(server.stopWait(), client.stopWait())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
asyncTest "transfer 1 message":
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let msgs = @[msg]
serverDriver = serverDriver.put(DefaultPubsubTopic, msgs)
# add server info and msg hash to client want channel
let want = (serverPeerInfo.peerId, hash)
await clientLocalWants.put(want)
# add client info and msg hash to server need channel
let need = (clientPeerInfo.peerId, hash)
await serverRemoteNeeds.put(need)
# give time for transfer to happen
await sleepAsync(250.milliseconds)
var query = ArchiveQuery()
query.includeData = true
query.hashes = @[hash]
let res = await clientArchive.findMessages(query)
assert res.isOk(), $res.error
check:
msg == res.get().messages[0]

View File

@@ -122,7 +122,7 @@ proc syncMessageIngress*(
msgHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
msg: WakuMessage,
) {.async.} =
): Future[Result[void, string]] {.async.} =
let insertStartTime = getTime().toUnixFloat()
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
@@ -133,7 +133,7 @@ proc syncMessageIngress*(
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return
return err("failed to insert message")
trace "message archived",
msg_hash = msgHash.to0xHex(),
@@ -144,6 +144,8 @@ proc syncMessageIngress*(
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)
return ok()
proc findMessages*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
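syncMessageIngress now surfaces archiving failures as a Result instead of returning silently, so callers can avoid advertising messages that were never stored. The transfer handler in this commit consumes it like so:

(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
  continue # do not forward the ID of a message we failed to archive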

View File

@@ -243,7 +243,7 @@ proc getItemSet(
proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
if buffer.len == 1:
return err("payload too small")
return ok(RangesData())
var
payload = RangesData()
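With this fix a one-byte buffer decodes to an empty RangesData instead of an error; the reconciliation session treats an empty payload (no ranges) as its termination signal. A sketch of the intended round trip, assuming an empty payload delta-encodes to that minimal one-byte form:

let endOfSync = RangesData() # no ranges: nothing left to reconcile
let buf = endOfSync.deltaEncode() # assumed to produce the minimal 1-byte buffer
doAssert RangesData.deltaDecode(buf).get().ranges.len == 0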

View File

@@ -0,0 +1,17 @@
import metrics
const
Reconciliation* = "reconciliation"
Transfer* = "transfer"
Receiving* = "received"
Sending* = "sent"
declarePublicHistogram reconciliation_roundtrips,
"the nubmer of roundtrips for each reconciliation",
buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf]
declarePublicSummary total_bytes_exchanged,
"the number of bytes sent and received by the protocols", ["protocol", "direction"]
declarePublicCounter total_transfer_messages_exchanged,
"the number of messages sent and received by the transfer protocol", ["direction"]

View File

@@ -0,0 +1,367 @@
{.push raises: [].}
import
std/sequtils,
stew/byteutils,
results,
chronicles,
chronos,
metrics,
libp2p/utility,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/protobuf,
../common/paging,
../waku_enr,
../waku_core/codecs,
../waku_core/time,
../waku_core/topics/pubsub_topic,
../waku_core/message/digest,
../waku_core/message/message,
../node/peer_manager/peer_manager,
../waku_archive,
./common,
./codec,
./storage/storage,
./storage/seq_storage,
./storage/range_processing,
./protocols_metrics
logScope:
topics = "waku reconciliation"
const DefaultStorageCap = 50_000
type SyncReconciliation* = ref object of LPProtocol
peerManager: PeerManager
wakuArchive: WakuArchive
storage: SyncStorage
# Receive IDs from transfer protocol for storage
idsRx: AsyncQueue[SyncID]
# Send Hashes to transfer protocol for reception
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)]
# Send Hashes to transfer protocol for transmission
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)]
# params
syncInterval: timer.Duration # Time between each synchronization attempt
syncRange: timer.Duration # Amount of time in the past to sync
relayJitter: Duration # Most recent slice of time to ignore when syncing (allows for gossip propagation)
# futures
periodicSyncFut: Future[void]
periodicPruneFut: Future[void]
idsReceiverFut: Future[void]
proc messageIngress*(
self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage
) =
let msgHash = computeMessageHash(pubsubTopic, msg)
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = msgHash.toHex(), err = error
proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
) =
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = msgHash.toHex(), err = error
proc messageIngress*(self: SyncReconciliation, id: SyncID) =
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = id.hash.toHex(), err = error
proc processRequest(
self: SyncReconciliation, conn: Connection
): Future[Result[void, string]] {.async.} =
var roundTrips = 0
while true:
let readRes = catch:
await conn.readLp(int.high)
let buffer: seq[byte] = readRes.valueOr:
return err("connection read error: " & error.msg)
total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving])
let recvPayload = RangesData.deltaDecode(buffer).valueOr:
return err("payload decoding error: " & error)
roundTrips.inc()
trace "sync payload received",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = recvPayload
if recvPayload.ranges.len == 0 or recvPayload.ranges.allIt(it[1] == RangeType.Skip):
break
var
hashToRecv: seq[WakuMessageHash]
hashToSend: seq[WakuMessageHash]
let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
for hash in hashToSend:
await self.remoteNeedsTx.addLast((conn.peerId, hash))
for hash in hashToRecv:
await self.localWantsTx.addLast((conn.peerId, hash))
let rawPayload = sendPayload.deltaEncode()
total_bytes_exchanged.observe(
rawPayload.len, labelValues = [Reconciliation, Sending]
)
let writeRes = catch:
await conn.writeLP(rawPayload)
if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = sendPayload
if sendPayload.ranges.len == 0 or sendPayload.ranges.allIt(it[1] == RangeType.Skip):
break
continue
reconciliation_roundtrips.observe(roundTrips)
await conn.close()
return ok()
proc initiate(
self: SyncReconciliation, connection: Connection
): Future[Result[void, string]] {.async.} =
let
timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
lower = SyncID(time: timeRange.a, hash: EmptyFingerprint)
upper = SyncID(time: timeRange.b, hash: FullFingerprint)
bounds = lower .. upper
fingerprint = self.storage.computeFingerprint(bounds)
initPayload = RangesData(
ranges: @[(bounds, RangeType.Fingerprint)],
fingerprints: @[fingerprint],
itemSets: @[],
)
let sendPayload = initPayload.deltaEncode()
total_bytes_exchanged.observe(
sendPayload.len, labelValues = [Reconciliation, Sending]
)
let writeRes = catch:
await connection.writeLP(sendPayload)
if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = connection.peerId,
payload = sendPayload
?await self.processRequest(connection)
return ok()
proc storeSynchronization*(
self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[void, string]] {.async.} =
let peer = peerInfo.valueOr:
self.peerManager.selectPeer(WakuReconciliationCodec).valueOr:
return err("no suitable peer found for sync")
let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec)
let conn: Connection = connOpt.valueOr:
return err("cannot establish sync connection")
debug "sync session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
(await self.initiate(conn)).isOkOr:
error "sync session failed",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error
return err("sync request error: " & error)
debug "sync session ended gracefully",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
return ok()
proc initFillStorage(
syncRange: timer.Duration, wakuArchive: WakuArchive
): Future[Result[seq[SyncID], string]] {.async.} =
if wakuArchive.isNil():
return err("waku archive unavailable")
let endTime = getNowInNanosecondTime()
let startTime = endTime - syncRange.nanos
# TODO: a special query for only timestamp and hash?
var query = ArchiveQuery(
includeData: true,
cursor: none(ArchiveCursor),
startTime: some(startTime),
endTime: some(endTime),
pageSize: 100,
direction: PagingDirection.FORWARD,
)
debug "initial storage filling started"
var ids = newSeqOfCap[SyncID](DefaultStorageCap)
# we assume IDs are in order
while true:
let response = (await wakuArchive.findMessages(query)).valueOr:
return err("archive retrival failed: " & $error)
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let msg = response.messages[i]
ids.add(SyncID(time: msg.timestamp, hash: hash))
if response.cursor.isNone():
break
query.cursor = response.cursor
debug "initial storage filling done", elements = ids.len
return ok(ids)
proc new*(
T: type SyncReconciliation,
peerManager: PeerManager,
wakuArchive: WakuArchive,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: timer.Duration = DefaultGossipSubJitter,
idsRx: AsyncQueue[SyncID],
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)],
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[Result[T, string]] {.async.} =
let res = await initFillStorage(syncRange, wakuArchive)
let storage =
if res.isErr():
warn "will not sync messages before this point in time", error = res.error
SeqStorage.new(DefaultStorageCap)
else:
SeqStorage.new(res.get())
var sync = SyncReconciliation(
peerManager: peerManager,
storage: storage,
syncRange: syncRange,
syncInterval: syncInterval,
relayJitter: relayJitter,
idsRx: idsRx,
localWantsTx: localWantsTx,
remoteNeedsTx: remoteNeedsTx,
)
let handler = proc(conn: Connection, proto: string) {.async, closure.} =
(await sync.processRequest(conn)).isOkOr:
error "request processing error", error = error
return
sync.handler = handler
sync.codec = WakuReconciliationCodec
info "Store Reconciliation protocol initialized"
return ok(sync)
proc periodicSync(self: SyncReconciliation) {.async.} =
debug "periodic sync initialized", interval = $self.syncInterval
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic sync started"
(await self.storeSynchronization()).isOkOr:
error "periodic sync failed", err = error
continue
debug "periodic sync done"
proc periodicPrune(self: SyncReconciliation) {.async.} =
debug "periodic prune initialized", interval = $self.syncInterval
# prevent the sync and prune loops from running at the same time
await sleepAsync(self.syncInterval div 2)
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic prune started"
let time = getNowInNanosecondTime() - self.syncRange.nanos
let count = self.storage.prune(time)
debug "periodic prune done", elements_pruned = count
proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
while true: # infinite loop
let id = await self.idsRx.popFirst()
self.messageIngress(id)
proc start*(self: SyncReconciliation) =
if self.started:
return
self.started = true
if self.syncInterval > ZeroDuration:
  self.periodicSyncFut = self.periodicSync()
  self.periodicPruneFut = self.periodicPrune()
self.idsReceiverFut = self.idsReceiverLoop()
info "Store Sync Reconciliation protocol started"
proc stopWait*(self: SyncReconciliation) {.async.} =
if self.syncInterval > ZeroDuration:
  await self.periodicSyncFut.cancelAndWait()
  await self.periodicPruneFut.cancelAndWait()
await self.idsReceiverFut.cancelAndWait()
info "Store Sync Reconciliation protocol stopped"

View File

@@ -0,0 +1,224 @@
{.push raises: [].}
import
std/sets,
results,
chronicles,
chronos,
metrics,
libp2p/utility,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/protobuf,
../waku_enr,
../waku_core/codecs,
../waku_core/time,
../waku_core/topics/pubsub_topic,
../waku_core/message/digest,
../waku_core/message/message,
../waku_core/message/default_values,
../node/peer_manager/peer_manager,
../waku_archive,
../waku_archive/common,
./common,
./codec,
./protocols_metrics
logScope:
topics = "waku transfer"
type SyncTransfer* = ref object of LPProtocol
wakuArchive: WakuArchive
peerManager: PeerManager
# Send IDs to reconciliation protocol for storage
idsTx: AsyncQueue[SyncID]
# Receive Hashes from reconciliation protocol for reception
localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)]
localWantsRxFut: Future[void]
inSessions: Table[PeerId, HashSet[WakuMessageHash]]
# Receive Hashes from reconciliation protocol for transmission
remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)]
remoteNeedsRxFut: Future[void]
outSessions: Table[PeerId, Connection]
proc sendMessage(
conn: Connection, payload: WakuMessageAndTopic
): Future[Result[void, string]] {.async.} =
let rawPayload = payload.encode().buffer
total_bytes_exchanged.observe(rawPayload.len, labelValues = [Transfer, Sending])
let writeRes = catch:
await conn.writeLP(rawPayload)
if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
total_transfer_messages_exchanged.inc(labelValues = [Sending])
return ok()
proc openConnection(
self: SyncTransfer, peerId: PeerId
): Future[Result[Connection, string]] {.async.} =
let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish transfer connection")
debug "transfer session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
return ok(conn)
proc wantsReceiverLoop(self: SyncTransfer) {.async.} =
## Waits for message hashes,
## store the peers and hashes locally as
## "supposed to be received"
while true: # infinite loop
let (peerId, fingerprint) = await self.localWantsRx.popFirst()
self.inSessions.withValue(peerId, value):
value[].incl(fingerprint)
do:
var hashes = initHashSet[WakuMessageHash]()
hashes.incl(fingerprint)
self.inSessions[peerId] = hashes
return
proc needsReceiverLoop(self: SyncTransfer) {.async.} =
## Waits for message hashes,
## open connection to the other peers,
## get the messages from DB and then send them.
while true: # infinite loop
let (peerId, fingerprint) = await self.remoteNeedsRx.popFirst()
if not self.outSessions.hasKey(peerId):
let connection = (await self.openConnection(peerId)).valueOr:
error "failed to establish transfer connection", error = error
continue
self.outSessions[peerId] = connection
let connection = self.outSessions[peerId]
var query = ArchiveQuery()
query.includeData = true
query.hashes = @[fingerprint]
let response = (await self.wakuArchive.findMessages(query)).valueOr:
error "failed to query archive", error = error
continue
let msg =
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])
(await sendMessage(connection, msg)).isOkOr:
error "failed to send message", error = error
continue
return
proc initProtocolHandler(self: SyncTransfer) =
let handler = proc(conn: Connection, proto: string) {.async, closure.} =
while true:
let readRes = catch:
await conn.readLp(int64(DefaultMaxWakuMessageSize))
let buffer: seq[byte] = readRes.valueOr:
# connection closed normally
break
total_bytes_exchanged.observe(buffer.len, labelValues = [Transfer, Receiving])
let payload = WakuMessageAndTopic.decode(buffer).valueOr:
error "decoding error", error = $error
continue
total_transfer_messages_exchanged.inc(labelValues = [Receiving])
let msg = payload.message
let pubsub = payload.pubsub
let hash = computeMessageHash(pubsub, msg)
self.inSessions.withValue(conn.peerId, value):
if value[].missingOrExcl(hash):
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
await conn.close()
break
do:
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
await conn.close()
break
# TODO: verify msg RLN proof...
(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
continue
let id = SyncID(time: msg.timestamp, hash: hash)
await self.idsTx.addLast(id)
continue
debug "transfer session ended",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
return
self.handler = handler
self.codec = WakuTransferCodec
proc new*(
T: type SyncTransfer,
peerManager: PeerManager,
wakuArchive: WakuArchive,
idsTx: AsyncQueue[SyncID],
localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)],
remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)],
): T =
var transfer = SyncTransfer(
peerManager: peerManager,
wakuArchive: wakuArchive,
idsTx: idsTx,
localWantsRx: localWantsRx,
remoteNeedsRx: remoteNeedsRx,
)
transfer.initProtocolHandler()
info "Store Transfer protocol initialized"
return transfer
proc start*(self: SyncTransfer) =
if self.started:
return
self.started = true
self.localWantsRxFut = self.wantsReceiverLoop()
self.remoteNeedsRxFut = self.needsReceiverLoop()
info "Store Sync Transfer protocol started"
proc stopWait*(self: SyncTransfer) {.async.} =
self.started = false
await self.localWantsRxFut.cancelAndWait()
await self.remoteNeedsRxFut.cancelAndWait()
info "Store Sync Transfer protocol stopped"