feat: waku sync full topic support (#3275)

Simon-Pierre Vivier 2025-09-12 08:12:35 -04:00 committed by GitHub
parent a1bbb61f47
commit 9327da5a7b
14 changed files with 980 additions and 310 deletions

View File

@ -22,22 +22,22 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[PeerId],
needsTx: AsyncQueue[(PeerId, Fingerprint)],
cluster: uint16 = 1,
pubsubTopics: seq[PubsubTopic] = @[],
contentTopics: seq[ContentTopic] = @[],
syncRange: timer.Duration = DefaultSyncRange,
shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7],
idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
wantsTx: AsyncQueue[PeerId],
needsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[SyncReconciliation] {.async.} =
let peerManager = PeerManager.new(switch)
let res = await SyncReconciliation.new(
cluster = cluster,
shards = shards,
pubsubTopics = pubsubTopics,
contentTopics = contentTopics,
peerManager = peerManager,
wakuArchive = nil,
relayJitter = 0.seconds,
syncRange = syncRange,
relayJitter = 0.seconds,
idsRx = idsRx,
localWantsTx = wantsTx,
remoteNeedsTx = needsTx,
@ -52,9 +52,9 @@ proc newTestWakuRecon*(
proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
wantsRx: AsyncQueue[PeerId],
needsRx: AsyncQueue[(PeerId, Fingerprint)],
needsRx: AsyncQueue[(PeerId, WakuMessageHash)],
): SyncTransfer =
let peerManager = PeerManager.new(switch)
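
The test helpers' channels now carry both topics with every sync ID, and the cluster/shard pair is replaced by explicit topic lists. A minimal sketch of the new queue wiring, assuming chronos and stand-in aliases for the waku types (including PeerId, which really comes from libp2p):

import chronos

type
  PubsubTopic = string
  ContentTopic = string
  PeerId = string # stand-in; the real type comes from libp2p
  WakuMessageHash = array[32, byte]
  SyncID = object
    time: int64
    hash: WakuMessageHash

let
  idsRx = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]() # IDs now travel with their topics
  wantsTx = newAsyncQueue[PeerId]() # peer IDs flagged for reception
  needsTx = newAsyncQueue[(PeerId, WakuMessageHash)]() # hashes a peer needs from us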

View File

@ -99,6 +99,8 @@ suite "Waku Store Sync Codec":
let range4 = (bounds4, RangeType.ItemSet)
let payload = RangesData(
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[],
ranges: @[range1, range2, range3, range4],
fingerprints: @[],
itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4],
@ -139,6 +141,8 @@ suite "Waku Store Sync Codec":
ranges.add(range)
let payload = RangesData(
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[],
ranges: ranges,
fingerprints:
@[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)],
@ -186,8 +190,13 @@ suite "Waku Store Sync Codec":
ranges.add((bound, RangeType.ItemSet))
itemSets.add(itemSet)
let payload =
RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets)
let payload = RangesData(
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[],
ranges: ranges,
fingerprints: fingerprints,
itemSets: itemSets,
)
let encodedPayload = payload.deltaEncode()
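
Every payload in these codec tests now sets the two topic fields explicitly. A hedged round-trip sketch using the same codec API, where `bounds` and `itemSet1` stand for values built exactly as in the tests above (an empty contentTopics list means no content-topic narrowing):

let payload = RangesData(
  pubsubTopics: @[DefaultPubsubTopic],
  contentTopics: @[], # empty list: no narrowing on content topics
  ranges: @[(bounds, RangeType.ItemSet)],
  fingerprints: @[],
  itemSets: @[itemSet1],
)
let encoded = payload.deltaEncode()
let decoded = RangesData.deltaDecode(encoded).expect("valid payload")
assert decoded.pubsubTopics == payload.pubsubTopics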

View File

@ -39,7 +39,7 @@ suite "Waku Sync: reconciliation":
var clientSwitch {.threadvar.}: Switch
var
idsChannel {.threadvar.}: AsyncQueue[SyncID]
idsChannel {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
localWants {.threadvar.}: AsyncQueue[PeerId]
remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
@ -55,13 +55,10 @@ suite "Waku Sync: reconciliation":
await allFutures(serverSwitch.start(), clientSwitch.start())
idsChannel = newAsyncQueue[SyncID]()
idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]()
localWants = newAsyncQueue[PeerId]()
remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()
@ -72,6 +69,13 @@ suite "Waku Sync: reconciliation":
await allFutures(serverSwitch.stop(), clientSwitch.stop())
asyncTest "sync 2 nodes both empty":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
check:
idsChannel.len == 0
remoteNeeds.len == 0
@ -84,6 +88,13 @@ suite "Waku Sync: reconciliation":
remoteNeeds.len == 0
asyncTest "sync 2 nodes empty client full server":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
@ -92,11 +103,13 @@ suite "Waku Sync: reconciliation":
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3)
server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
server.messageIngress(hash3, msg3)
server.messageIngress(hash1, DefaultPubsubTopic, msg1)
server.messageIngress(hash2, DefaultPubsubTopic, msg2)
server.messageIngress(hash3, DefaultPubsubTopic, msg3)
check:
remoteNeeds.len == 0
localWants.len == 0
remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == false
@ -105,11 +118,19 @@ suite "Waku Sync: reconciliation":
assert res.isOk(), res.error
check:
remoteNeeds.len == 3
remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true
asyncTest "sync 2 nodes full client empty server":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
@ -118,11 +139,13 @@ suite "Waku Sync: reconciliation":
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3)
client.messageIngress(hash1, msg1)
client.messageIngress(hash2, msg2)
client.messageIngress(hash3, msg3)
client.messageIngress(hash1, DefaultPubsubTopic, msg1)
client.messageIngress(hash2, DefaultPubsubTopic, msg2)
client.messageIngress(hash3, DefaultPubsubTopic, msg3)
check:
remoteNeeds.len == 0
localWants.len == 0
remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == false
remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == false
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
@ -131,11 +154,19 @@ suite "Waku Sync: reconciliation":
assert res.isOk(), res.error
check:
remoteNeeds.len == 3
remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == true
remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == true
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true
asyncTest "sync 2 nodes different hashes":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
@ -144,12 +175,14 @@ suite "Waku Sync: reconciliation":
hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash1, msg1)
client.messageIngress(hash3, msg3)
server.messageIngress(hash1, DefaultPubsubTopic, msg1)
server.messageIngress(hash2, DefaultPubsubTopic, msg2)
client.messageIngress(hash1, DefaultPubsubTopic, msg1)
client.messageIngress(hash3, DefaultPubsubTopic, msg3)
check:
remoteNeeds.len == 0
localWants.len == 0
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
@ -157,51 +190,82 @@ suite "Waku Sync: reconciliation":
assert syncRes.isOk(), $syncRes.error
check:
remoteNeeds.len == 2
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true
asyncTest "sync 2 nodes different shards":
server = await newTestWakuRecon(
serverSwitch,
@["/waku/2/rs/2/1", "/waku/2/rs/2/2", "/waku/2/rs/2/3", "/waku/2/rs/2/4"],
@[DefaultContentTopic],
DefaultSyncRange,
idsChannel,
localWants,
remoteNeeds,
)
client = await newTestWakuRecon(
clientSwitch,
@["/waku/2/rs/2/3", "/waku/2/rs/2/4", "/waku/2/rs/2/5", "/waku/2/rs/2/6"],
@[DefaultContentTopic],
DefaultSyncRange,
idsChannel,
localWants,
remoteNeeds,
)
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
msg4 = fakeWakuMessage(ts = now() + 3, contentTopic = DefaultContentTopic)
msg5 = fakeWakuMessage(ts = now() + 4, contentTopic = DefaultContentTopic)
msg6 = fakeWakuMessage(ts = now() + 5, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash("/waku/2/rs/2/1", msg1)
hash2 = computeMessageHash("/waku/2/rs/2/2", msg2)
hash3 = computeMessageHash("/waku/2/rs/2/3", msg3)
hash4 = computeMessageHash("/waku/2/rs/2/4", msg4)
hash5 = computeMessageHash("/waku/2/rs/2/5", msg5)
hash6 = computeMessageHash("/waku/2/rs/2/6", msg6)
server.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash1, msg1)
client.messageIngress(hash3, msg3)
server.messageIngress(hash1, "/waku/2/rs/2/1", msg1)
server.messageIngress(hash2, "/waku/2/rs/2/2", msg2)
server.messageIngress(hash3, "/waku/2/rs/2/3", msg3)
client.messageIngress(hash4, "/waku/2/rs/2/4", msg4)
client.messageIngress(hash5, "/waku/2/rs/2/5", msg5)
client.messageIngress(hash6, "/waku/2/rs/2/6", msg6)
check:
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false
remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false
server = await newTestWakuRecon(
serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3]
)
client = await newTestWakuRecon(
clientSwitch, idsChannel, localWants, remoteNeeds, shards = @[4.uint16, 5, 6, 7]
)
remoteNeeds.len == 0
var syncRes = await client.storeSynchronization(some(serverPeerInfo))
assert syncRes.isOk(), $syncRes.error
check:
remoteNeeds.len == 0
remoteNeeds.len == 2
remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true
remoteNeeds.contains((serverPeerInfo.peerId, hash4)) == true
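
The shard-specific pubsub topics used here follow the static-sharding convention /waku/2/rs/<cluster>/<shard>, so the server (shards 1-4) and client (shards 3-6) only overlap on shards 3 and 4, hence exactly two expected needs. A self-contained sketch of that derivation:

import std/[sequtils, strformat]

proc shardTopic(cluster, shard: uint16): string =
  # Static-sharding pubsub topic convention.
  &"/waku/2/rs/{cluster}/{shard}"

let serverTopics = @[1'u16, 2, 3, 4].mapIt(shardTopic(2, it))
let clientTopics = @[3'u16, 4, 5, 6].mapIt(shardTopic(2, it))
let common = serverTopics.filterIt(it in clientTopics)
assert common == @["/waku/2/rs/2/3", "/waku/2/rs/2/4"]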
asyncTest "sync 2 nodes same hashes":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let
msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic)
msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic)
hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1)
hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
server.messageIngress(hash1, msg1)
client.messageIngress(hash1, msg1)
server.messageIngress(hash2, msg2)
client.messageIngress(hash2, msg2)
server.messageIngress(hash1, DefaultPubsubTopic, msg1)
client.messageIngress(hash1, DefaultPubsubTopic, msg1)
server.messageIngress(hash2, DefaultPubsubTopic, msg2)
client.messageIngress(hash2, DefaultPubsubTopic, msg2)
check:
remoteNeeds.len == 0
@ -213,6 +277,13 @@ suite "Waku Sync: reconciliation":
remoteNeeds.len == 0
asyncTest "sync 2 nodes 100K msgs 1 diff":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let msgCount = 100_000
var diffIndex = rand(msgCount)
var diff: WakuMessageHash
@ -228,28 +299,37 @@ suite "Waku Sync: reconciliation":
let msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
if i != diffIndex:
client.messageIngress(hash, msg)
client.messageIngress(hash, DefaultPubsubTopic, msg)
else:
diff = hash
timestamp += Timestamp(part)
check:
remoteNeeds.len == 0
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
check:
remoteNeeds.len == 1
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true
asyncTest "sync 2 nodes 10K msgs 1K diffs":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
const
msgCount = 200_000 # total messages on the server
diffCount = 100 # messages initially missing on the client
msgCount = 10_000 # total messages on the server
diffCount = 1000 # messages initially missing on the client
## ── choose which messages will be absent from the client ─────────────
var missingIdx: HashSet[int]
@ -266,9 +346,9 @@ suite "Waku Sync: reconciliation":
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(h, msg) # every msg is on the server
server.messageIngress(h, DefaultPubsubTopic, msg) # every msg is on the server
if i notin missingIdx:
client.messageIngress(h, msg) # all but 100 are on the client
client.messageIngress(h, DefaultPubsubTopic, msg) # all but 1000 are on the client
ts += Timestamp(step)
## ── sanity before we start the round ─────────────────────────────────
@ -278,10 +358,17 @@ suite "Waku Sync: reconciliation":
let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error
## ── verify that ≈100 diffs were queued (allow 10 % slack) ────────────
check remoteNeeds.len >= 90 # ≈ 100 × 0.9
## ── verify that ≈1000 diffs were queued (allow 10 % slack) ────────────
check remoteNeeds.len >= 900 # ≈ 1000 × 0.9
asyncTest "sync 2 nodes 400K msgs 100k diffs":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
const
msgCount = 400_000
diffCount = 100_000
@ -301,9 +388,9 @@ suite "Waku Sync: reconciliation":
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(h, msg)
server.messageIngress(h, DefaultPubsubTopic, msg)
if i notin missingIdx:
client.messageIngress(h, msg)
client.messageIngress(h, DefaultPubsubTopic, msg)
else:
diffMsgHashes.incl h
@ -319,6 +406,13 @@ suite "Waku Sync: reconciliation":
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes 100 msgs 20 diff 1-second window":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
const
msgCount = 100
diffCount = 20
@ -339,12 +433,12 @@ suite "Waku Sync: reconciliation":
for i in 0 ..< msgCount:
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
if i in missingIdx:
diffMsgHashes.incl hash
else:
client.messageIngress(hash, msg)
client.messageIngress(hash, DefaultPubsubTopic, msg)
ts += Timestamp(step)
@ -360,6 +454,13 @@ suite "Waku Sync: reconciliation":
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes 500k msgs 300k diff stress window":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
const
msgCount = 500_000
diffCount = 300_000
@ -385,12 +486,12 @@ suite "Waku Sync: reconciliation":
for i in 0 ..< msgCount:
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
if i in missingSet:
diffMsgHashes.incl hash
else:
client.messageIngress(hash, msg)
client.messageIngress(hash, DefaultPubsubTopic, msg)
ts += Timestamp(step)
@ -406,6 +507,13 @@ suite "Waku Sync: reconciliation":
check deliveredHash in diffMsgHashes
asyncTest "sync 2 nodes, 40 msgs: 18 in-window diff, 20 out-window ignored":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
const
diffInWin = 18
diffOutWin = 20
@ -428,7 +536,7 @@ suite "Waku Sync: reconciliation":
for _ in 0 ..< diffInWin:
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
inWinHashes.incl hash
ts += Timestamp(stepIn)
@ -436,7 +544,7 @@ suite "Waku Sync: reconciliation":
for _ in 0 ..< diffOutWin:
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
outWinHashes.incl hash
ts += Timestamp(stepOut)
@ -445,11 +553,11 @@ suite "Waku Sync: reconciliation":
let oneSec = timer.seconds(1)
server = await newTestWakuRecon(
serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
serverSwitch, @[], @[], oneSec, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
clientSwitch, @[], @[], oneSec, idsChannel, localWants, remoteNeeds
)
defer:
@ -467,6 +575,13 @@ suite "Waku Sync: reconciliation":
check deliveredHashes notin outWinHashes
asyncTest "hash-fingerprint collision, same timestamp stable sort":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let ts = Timestamp(getNowInNanosecondTime())
var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
@ -482,13 +597,17 @@ suite "Waku Sync: reconciliation":
check h1 != h2
server.messageIngress(h1, msg1)
client.messageIngress(h2, msg2)
server.messageIngress(h1, DefaultPubsubTopic, msg1)
client.messageIngress(h2, DefaultPubsubTopic, msg2)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
defer:
server.stop()
@ -508,6 +627,13 @@ suite "Waku Sync: reconciliation":
check vec[0].time == ts and vec[1].time == ts
asyncTest "malformed message-ID is ignored during reconciliation":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let nowTs = Timestamp(getNowInNanosecondTime())
let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic)
@ -518,13 +644,17 @@ suite "Waku Sync: reconciliation":
badHash[i] = 0'u8
let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic)
server.messageIngress(goodHash, goodMsg)
server.messageIngress(badHash, badMsg)
server.messageIngress(goodHash, DefaultPubsubTopic, goodMsg)
server.messageIngress(badHash, DefaultPubsubTopic, badMsg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
defer:
server.stop()
@ -539,6 +669,13 @@ suite "Waku Sync: reconciliation":
check neededHash != badHash
asyncTest "malformed ID: future-timestamp msg is ignored":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let nowNs = getNowInNanosecondTime()
let tsNow = Timestamp(nowNs)
@ -552,12 +689,16 @@ suite "Waku Sync: reconciliation":
let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic)
let badHash = computeMessageHash(DefaultPubsubTopic, badMsg)
server.messageIngress(goodHash, goodMsg)
server.messageIngress(badHash, badMsg)
server.messageIngress(goodHash, DefaultPubsubTopic, goodMsg)
server.messageIngress(badHash, DefaultPubsubTopic, badMsg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
defer:
server.stop()
@ -572,16 +713,27 @@ suite "Waku Sync: reconciliation":
check neededHash != badHash
asyncTest "duplicate ID is queued only once":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let ts = Timestamp(getNowInNanosecondTime())
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let h = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(h, msg)
server.messageIngress(h, msg)
server.messageIngress(h, DefaultPubsubTopic, msg)
server.messageIngress(h, DefaultPubsubTopic, msg)
check remoteNeeds.len == 0
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
defer:
server.stop()
@ -595,19 +747,30 @@ suite "Waku Sync: reconciliation":
check neededHash == h
asyncTest "sync terminates immediately when no diffs exist":
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds
)
let ts = Timestamp(getNowInNanosecondTime())
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
server.messageIngress(hash, msg)
client.messageIngress(hash, msg)
server.messageIngress(hash, DefaultPubsubTopic, msg)
client.messageIngress(hash, DefaultPubsubTopic, msg)
let idsQ = newAsyncQueue[SyncID]()
let idsQ = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]()
let wantsQ = newAsyncQueue[PeerId]()
let needsQ = newAsyncQueue[(PeerId, Fingerprint)]()
server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ)
client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ)
server = await newTestWakuRecon(
serverSwitch, @[], @[], DefaultSyncRange, idsQ, wantsQ, needsQ
)
client = await newTestWakuRecon(
clientSwitch, @[], @[], DefaultSyncRange, idsQ, wantsQ, needsQ
)
defer:
server.stop()
@ -630,10 +793,10 @@ suite "Waku Sync: transfer":
clientArchive {.threadvar.}: WakuArchive
var
serverIds {.threadvar.}: AsyncQueue[SyncID]
serverIds {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
serverLocalWants {.threadvar.}: AsyncQueue[PeerId]
serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
clientIds {.threadvar.}: AsyncQueue[SyncID]
clientIds {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
clientLocalWants {.threadvar.}: AsyncQueue[PeerId]
clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)]
@ -661,7 +824,7 @@ suite "Waku Sync: transfer":
serverPeerManager = PeerManager.new(serverSwitch)
clientPeerManager = PeerManager.new(clientSwitch)
serverIds = newAsyncQueue[SyncID]()
serverIds = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]()
serverLocalWants = newAsyncQueue[PeerId]()
serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
@ -673,7 +836,7 @@ suite "Waku Sync: transfer":
remoteNeedsRx = serverRemoteNeeds,
)
clientIds = newAsyncQueue[SyncID]()
clientIds = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]()
clientLocalWants = newAsyncQueue[PeerId]()
clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]()
@ -733,7 +896,8 @@ suite "Waku Sync: transfer":
check:
response.messages.len > 0
asyncTest "Check the exact missing messages are received":
## Disabled until we implement DoS protection again
#[ asyncTest "Check the exact missing messages are received":
let timeSlice = calculateTimeRange()
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
let (part, _) = divmod(timeWindow, 3)
@ -768,4 +932,4 @@ suite "Waku Sync: transfer":
check received == expected
check clientIds.len == 0
check clientIds.len == 0 ]#
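
Between reconciliation and transfer, the queues are the whole contract: IDs (with topics) flow one way for storage, wants and needs flow the other for message exchange. A minimal consumer-side sketch with the same stand-in types as earlier:

import chronos

type
  PubsubTopic = string
  ContentTopic = string
  SyncID = object
    time: int64
    hash: array[32, byte]

proc drainIds(
    ids: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
) {.async.} =
  # Transfer-side sketch: pop each reconciled ID together with its
  # topics so the message can be archived under the right topic pair.
  while true:
    let (id, pubsubTopic, contentTopic) = await ids.popFirst()
    discard (id, pubsubTopic, contentTopic) # hand off to archive ingress here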

View File

@ -2,6 +2,8 @@ import unittest, nimcrypto, std/sequtils, results
import ../../waku/waku_store_sync/[reconciliation, common]
import ../../waku/waku_store_sync/storage/seq_storage
import ../../waku/waku_core/message/digest
import ../../waku/waku_core/topics/pubsub_topic
import ../../waku/waku_core/topics/content_topic
proc toDigest(s: string): WakuMessageHash =
let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, (s.len - 1)))
@ -18,8 +20,8 @@ suite "Waku Sync reconciliation":
const N = 2_048
const mismatchI = 70
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var baseHashMismatch: WakuMessageHash
var remoteHashMismatch: WakuMessageHash
@ -27,7 +29,10 @@ suite "Waku Sync reconciliation":
for i in 0 ..< N:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
local.insert(
SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
@ -35,18 +40,23 @@ suite "Waku Sync reconciliation":
baseHashMismatch = hashLocal
remoteHashMismatch = toDigest("msg" & $i & "_x")
hashRemote = remoteHashMismatch
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
remote.insert(
SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var z: WakuMessageHash
let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
check local.computeFingerprint(whole) != remote.computeFingerprint(whole)
check local.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic]) !=
remote.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic])
let remoteFp = remote.computeFingerprint(whole)
let remoteFp =
remote.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic])
let payload = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(whole, RangeType.Fingerprint)],
fingerprints: @[remoteFp],
itemSets: @[],
@ -75,8 +85,10 @@ suite "Waku Sync reconciliation":
const threshold = 4
const partitions = 2
let local = SeqStorage.new(@[], threshold = threshold, partitions = partitions)
let remote = SeqStorage.new(@[], threshold = threshold, partitions = partitions)
let local =
SeqStorage.new(@[], @[], @[], threshold = threshold, partitions = partitions)
let remote =
SeqStorage.new(@[], @[], @[], threshold = threshold, partitions = partitions)
var mismatchHash: WakuMessageHash
for i in 0 ..< 8:
@ -90,8 +102,12 @@ suite "Waku Sync reconciliation":
mismatchHash = toDigest("msg" & $i & "_x")
localHash = mismatchHash
discard local.insert (SyncID(time: t, hash: localHash))
discard remote.insert(SyncID(time: t, hash: remoteHash))
discard local.insert(
SyncID(time: t, hash: localHash), DefaultPubsubTopic, DefaultContentTopic
)
discard remote.insert(
SyncID(time: t, hash: remoteHash), DefaultPubsubTopic, DefaultContentTopic
)
var zeroHash: WakuMessageHash
let wholeRange =
@ -100,10 +116,15 @@ suite "Waku Sync reconciliation":
var toSend, toRecv: seq[WakuMessageHash]
let payload = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(wholeRange, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(wholeRange)],
fingerprints:
@[
remote.computeFingerprint(
wholeRange, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
@ -120,15 +141,18 @@ suite "Waku Sync reconciliation":
const N = 2_048
const mismatchI = 70
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var baseHashMismatch, remoteHashMismatch: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
local.insert(
SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
@ -136,19 +160,32 @@ suite "Waku Sync reconciliation":
baseHashMismatch = hashLocal
remoteHashMismatch = toDigest("msg" & $i & "_x")
hashRemote = remoteHashMismatch
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
remote.insert(
SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var zero: WakuMessageHash
let sliceWhole =
SyncID(time: 1000, hash: zero) .. SyncID(time: 1000 + N - 1, hash: zero)
check local.computeFingerprint(sliceWhole) != remote.computeFingerprint(sliceWhole)
check local.computeFingerprint(
sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic]
) !=
remote.computeFingerprint(
sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
let payload1 = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(sliceWhole, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sliceWhole)],
fingerprints:
@[
remote.computeFingerprint(
sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
@ -167,10 +204,15 @@ suite "Waku Sync reconciliation":
check subSlice.a.time != 0
let payload2 = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(subSlice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(subSlice)],
fingerprints:
@[
remote.computeFingerprint(
subSlice, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
@ -192,8 +234,8 @@ suite "Waku Sync reconciliation":
check toRecv2.len == 0
test "second-round payload remote":
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var baseHash: WakuMessageHash
var alteredHash: WakuMessageHash
@ -201,7 +243,10 @@ suite "Waku Sync reconciliation":
for i in 0 ..< 8:
let ts = 1000 + i
let hashLocal = toDigest("msg" & $i)
local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr:
local.insert(
SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var hashRemote = hashLocal
@ -210,23 +255,33 @@ suite "Waku Sync reconciliation":
alteredHash = toDigest("msg" & $i & "_x")
hashRemote = alteredHash
remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr:
remote.insert(
SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic
).isOkOr:
assert false, "failed to insert hash: " & $error
var zero: WakuMessageHash
let slice = SyncID(time: 1000, hash: zero) .. SyncID(time: 1007, hash: zero)
check local.computeFingerprint(slice) != remote.computeFingerprint(slice)
check local.computeFingerprint(slice, @[DefaultPubsubTopic], @[DefaultContentTopic]) !=
remote.computeFingerprint(slice, @[DefaultPubsubTopic], @[DefaultContentTopic])
var toSend1, toRecv1: seq[WakuMessageHash]
let pay1 = RangesData(
cluster: 0,
shards: @[0],
let rangeData = RangesData(
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(slice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(slice)],
fingerprints:
@[
remote.computeFingerprint(
slice, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
let rep1 = local.processPayload(pay1, toSend1, toRecv1)
let rep1 = local.processPayload(rangeData, toSend1, toRecv1)
check rep1.ranges.len == 1
check rep1.ranges[0][1] == RangeType.ItemSet

View File

@ -2,6 +2,8 @@ import unittest, nimcrypto, std/sequtils
import ../../waku/waku_store_sync/[reconciliation, common]
import ../../waku/waku_store_sync/storage/seq_storage
import ../../waku/waku_core/message/digest
import ../../waku/waku_core/topics/pubsub_topic
import ../../waku/waku_core/topics/content_topic
proc toDigest*(s: string): WakuMessageHash =
let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high))
@ -16,30 +18,38 @@ suite "Waku Sync reconciliation":
const N = 2_000
const idx = 137
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var baseH, altH: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
discard
local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic)
var hr = h
if i == idx:
baseH = h
altH = toDigest("msg" & $i & "x")
hr = altH
discard remote.insert(SyncID(time: ts, hash: hr))
discard remote.insert(
SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic
)
var z: WakuMessageHash
let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
var s1, r1: seq[WakuMessageHash]
let p1 = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(whole, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(whole)],
fingerprints:
@[
remote.computeFingerprint(
whole, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
let rep1 = local.processPayload(p1, s1, r1)
@ -52,10 +62,11 @@ suite "Waku Sync reconciliation":
var s2, r2: seq[WakuMessageHash]
let p2 = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(sub, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sub)],
fingerprints:
@[remote.computeFingerprint(sub, @[DefaultPubsubTopic], @[DefaultContentTopic])],
itemSets: @[],
)
let rep2 = local.processPayload(p2, s2, r2)
@ -67,15 +78,20 @@ suite "Waku Sync reconciliation":
check s3.len == 1 and s3[0] == altH
check r3.len == 1 and r3[0] == baseH
discard local.insert(SyncID(time: mismT, hash: altH))
discard remote.insert(SyncID(time: mismT, hash: baseH))
discard local.insert(
SyncID(time: mismT, hash: altH), DefaultPubsubTopic, DefaultContentTopic
)
discard remote.insert(
SyncID(time: mismT, hash: baseH), DefaultPubsubTopic, DefaultContentTopic
)
var s4, r4: seq[WakuMessageHash]
let p3 = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(sub, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(sub)],
fingerprints:
@[remote.computeFingerprint(sub, @[DefaultPubsubTopic], @[DefaultContentTopic])],
itemSets: @[],
)
let rep3 = local.processPayload(p3, s4, r4)
@ -86,14 +102,15 @@ suite "Waku Sync reconciliation":
const N = 120
const pivot = 60
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var diffHash: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
discard
local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic)
var hr: WakuMessageHash
if i >= pivot:
diffHash = toDigest("msg" & $i & "_x")
@ -101,7 +118,9 @@ suite "Waku Sync reconciliation":
else:
hr = h
discard remote.insert(SyncID(time: ts, hash: hr))
discard remote.insert(
SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic
)
var z: WakuMessageHash
let sliceA = SyncID(time: 1000, hash: z) .. SyncID(time: 1059, hash: z)
@ -109,11 +128,18 @@ suite "Waku Sync reconciliation":
var s, r: seq[WakuMessageHash]
let payload = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)],
fingerprints:
@[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)],
@[
remote.computeFingerprint(
sliceA, @[DefaultPubsubTopic], @[DefaultContentTopic]
),
remote.computeFingerprint(
sliceB, @[DefaultPubsubTopic], @[DefaultContentTopic]
),
],
itemSets: @[],
)
let reply = local.processPayload(payload, s, r)
@ -135,23 +161,31 @@ suite "Waku Sync reconciliation":
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
discard
local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic)
var hr = h
if i == idx:
baseH = h
altH = toDigest("msg" & $i & "_x")
hr = altH
discard remote.insert(SyncID(time: ts, hash: hr))
discard remote.insert(
SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic
)
var z: WakuMessageHash
let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
var toS, toR: seq[WakuMessageHash]
let p = RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(slice, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(slice)],
fingerprints:
@[
remote.computeFingerprint(
slice, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
)
let reply = local.processPayload(p, toS, toR)
@ -171,14 +205,15 @@ suite "Waku Sync reconciliation":
const N = 80_000
const bad = N - 10
let local = SeqStorage.new(@[])
let remote = SeqStorage.new(@[])
let local = SeqStorage.new(@[], @[], @[])
let remote = SeqStorage.new(@[], @[], @[])
var baseH, altH: WakuMessageHash
for i in 0 ..< N:
let ts = 1000 + i
let h = toDigest("msg" & $i)
discard local.insert(SyncID(time: ts, hash: h))
discard
local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic)
let hr =
if i == bad:
@ -187,7 +222,9 @@ suite "Waku Sync reconciliation":
altH
else:
h
discard remote.insert(SyncID(time: ts, hash: hr))
discard remote.insert(
SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic
)
var slice =
SyncID(time: 1000, hash: EmptyFingerprint) ..
@ -196,10 +233,15 @@ suite "Waku Sync reconciliation":
proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData =
local.processPayload(
RangesData(
cluster: 0,
shards: @[0],
pubsubTopics: @[DefaultPubsubTopic],
contentTopics: @[DefaultContentTopic],
ranges: @[(s, RangeType.Fingerprint)],
fingerprints: @[remote.computeFingerprint(s)],
fingerprints:
@[
remote.computeFingerprint(
s, @[DefaultPubsubTopic], @[DefaultContentTopic]
)
],
itemSets: @[],
),
sendQ,
@ -213,7 +255,8 @@ suite "Waku Sync reconciliation":
check rep.ranges.len == 8
check rep.ranges.allIt(it[1] == RangeType.Fingerprint)
for (sl, _) in rep.ranges:
if local.computeFingerprint(sl) != remote.computeFingerprint(sl):
if local.computeFingerprint(sl, @[DefaultPubsubTopic], @[DefaultContentTopic]) !=
remote.computeFingerprint(sl, @[DefaultPubsubTopic], @[DefaultContentTopic]):
slice = sl
break
@ -235,8 +278,12 @@ suite "Waku Sync reconciliation":
check qSend.len == 1 and qSend[0] == altH
check qRecv.len == 1 and qRecv[0] == baseH
discard local.insert(SyncID(time: slice.a.time, hash: altH))
discard remote.insert(SyncID(time: slice.a.time, hash: baseH))
discard local.insert(
SyncID(time: slice.a.time, hash: altH), DefaultPubsubTopic, DefaultContentTopic
)
discard remote.insert(
SyncID(time: slice.a.time, hash: baseH), DefaultPubsubTopic, DefaultContentTopic
)
var send6, recv6: seq[WakuMessageHash]
let rep6 = fpReply(slice, send6, recv6)
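
Throughout these tests, SeqStorage.new now takes three parallel sequences — IDs plus one pubsub and one content topic per ID — instead of IDs alone. A sketch of building that shape, with stand-in types (the content topic string is a made-up example):

type
  PubsubTopic = string
  ContentTopic = string
  SyncID = object
    time: int64
    hash: array[32, byte]

var
  ids: seq[SyncID]
  pubsub: seq[PubsubTopic]
  content: seq[ContentTopic]

for i in 0 ..< 8:
  var h: array[32, byte]
  h[0] = byte(i)
  ids.add SyncID(time: int64(1000 + i), hash: h)
  pubsub.add "/waku/2/rs/0/0" # pubsub[i] pairs with ids[i]
  content.add "/app/1/sync-test/proto" # content[i] pairs with ids[i]

# SeqStorage.new(ids, pubsub, content) would then index each ID under its topic pair.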

View File

@ -1,6 +1,6 @@
{.used.}
import std/[options, random], testutils/unittests, chronos
import std/[options, random, sequtils, packedsets], testutils/unittests, chronos
import
../../waku/waku_core,
@ -13,23 +13,32 @@ suite "Waku Sync Storage":
var rng = initRand()
let count = 10_000
var elements = newSeqOfCap[SyncID](count)
var pubsub = newSeqOfCap[PubsubTopic](count)
var content = newSeqOfCap[ContentTopic](count)
var emptySet = @[0].toPackedSet()
emptySet.excl(0)
for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
pubsub.add(DefaultPubsubTopic)
content.add(DefaultContentTopic)
var storage1 = SeqStorage.new(elements)
var storage2 = SeqStorage.new(elements)
var storage1 = SeqStorage.new(elements, pubsub, content)
var storage2 = SeqStorage.new(elements, pubsub, content)
let lb = elements[0]
let ub = elements[count - 1]
let bounds = lb .. ub
let fingerprint1 = storage1.computeFingerprint(bounds)
let fingerprint1 = storage1.computeFingerprint(bounds, @[], @[])
var outputPayload: RangesData
storage2.processFingerprintRange(bounds, fingerprint1, outputPayload)
storage2.processFingerprintRange(
bounds, emptySet, emptySet, fingerprint1, outputPayload
)
let expected =
RangesData(ranges: @[(bounds, RangeType.Skip)], fingerprints: @[], itemSets: @[])
@ -42,6 +51,12 @@ suite "Waku Sync Storage":
let count = 1000
var elements1 = newSeqOfCap[SyncID](count)
var elements2 = newSeqOfCap[SyncID](count)
var pubsub = newSeqOfCap[PubsubTopic](count)
var content = newSeqOfCap[ContentTopic](count)
var emptySet = @[0].toPackedSet()
emptySet.excl(0)
var diffs: seq[Fingerprint]
for i in 0 ..< count:
@ -53,7 +68,10 @@ suite "Waku Sync Storage":
else:
diffs.add(id.hash)
var storage1 = SeqStorage.new(elements1)
pubsub.add(DefaultPubsubTopic)
content.add(DefaultContentTopic)
var storage1 = SeqStorage.new(elements1, pubsub, content)
let lb = elements1[0]
let ub = elements1[count - 1]
@ -66,7 +84,9 @@ suite "Waku Sync Storage":
toRecv: seq[Fingerprint]
outputPayload: RangesData
storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload)
storage1.processItemSetRange(
bounds, emptySet, emptySet, itemSet2, toSend, toRecv, outputPayload
)
check:
toSend == diffs
@ -79,11 +99,11 @@ suite "Waku Sync Storage":
let element1 = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let element2 = SyncID(time: Timestamp(2000), hash: randomHash(rng))
let res1 = storage.insert(element1)
let res1 = storage.insert(element1, DefaultPubsubTopic, DefaultContentTopic)
assert res1.isOk(), $res1.error
let count1 = storage.length()
let res2 = storage.insert(element2)
let res2 = storage.insert(element2, DefaultPubsubTopic, DefaultContentTopic)
assert res2.isOk(), $res2.error
let count2 = storage.length()
@ -96,24 +116,33 @@ suite "Waku Sync Storage":
let element = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let storage = SeqStorage.new(@[element])
let storage =
SeqStorage.new(@[element], @[DefaultPubsubTopic], @[DefaultContentTopic])
let res = storage.insert(element)
let res = storage.insert(element, DefaultPubsubTopic, DefaultContentTopic)
check:
res.isErr() == true
res.isErr() == false
storage.length() == 1
test "prune elements":
var rng = initRand()
let count = 1000
var elements = newSeqOfCap[SyncID](count)
var pubsub = newSeqOfCap[PubsubTopic](count)
var content = newSeqOfCap[ContentTopic](count)
var emptySet = @[0].toPackedSet()
emptySet.excl(0)
for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
pubsub.add(DefaultPubsubTopic)
content.add(DefaultContentTopic)
let storage = SeqStorage.new(elements)
let storage = SeqStorage.new(elements, pubsub, content)
let beforeCount = storage.length()
@ -126,6 +155,52 @@ suite "Waku Sync Storage":
pruned == 500
afterCount == 500
test "topics recycling":
var rng = initRand()
let count = 1000
var elements = newSeqOfCap[SyncID](count)
var pubsub = newSeqOfCap[PubsubTopic](count)
var content = newSeqOfCap[ContentTopic](count)
var emptySet = @[0].toPackedSet()
emptySet.excl(0)
for i in 0 ..< (count div 2):
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
pubsub.add(DefaultPubsubTopic)
content.add("my/custom/topic")
for i in (count div 2) ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
pubsub.add(DefaultPubsubTopic)
content.add(DefaultContentTopic)
let storage = SeqStorage.new(elements, pubsub, content)
let beforeCount = storage.unusedContentTopicsLen()
let pruned = storage.prune(Timestamp(500))
let afterCount = storage.unusedContentTopicsLen()
check:
beforeCount == 0
pruned == 500
afterCount == 1
let id = SyncID(time: Timestamp(1001), hash: randomHash(rng))
let res = storage.insert(id, DefaultPubsubTopic, "my/other/topic")
assert res.isOk(), $res.error
let reuseCount = storage.unusedContentTopicsLen()
check:
reuseCount == 0
## disabled tests below are rough benchmarks
#[ test "10M fingerprint":
var rng = initRand()

View File

@ -235,6 +235,7 @@ proc setupProtocols(
(
await node.mountStoreSync(
conf.clusterId, conf.subscribeShards, conf.contentTopics,
confStoreSync.rangeSec, confStoreSync.intervalSec,
confStoreSync.relayJitterSec,
)

View File

@ -273,29 +273,24 @@ proc mountMix*(
proc mountStoreSync*(
node: WakuNode,
storeSyncRange = 3600.uint32,
storeSyncInterval = 300.uint32,
storeSyncRelayJitter = 20.uint32,
cluster: uint16,
shards: seq[uint16],
contentTopics: seq[string],
storeSyncRange: uint32,
storeSyncInterval: uint32,
storeSyncRelayJitter: uint32,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](0)
let wantsChannel = newAsyncQueue[PeerId](0)
let idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)](0)
let wantsChannel = newAsyncQueue[(PeerId)](0)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
var cluster: uint16
var shards: seq[uint16]
let enrRes = node.enr.toTyped()
if enrRes.isOk():
let shardingRes = enrRes.get().relaySharding()
if shardingRes.isSome():
let relayShard = shardingRes.get()
cluster = relayShard.clusterID
shards = relayShard.shardIds
let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))
let recon =
?await SyncReconciliation.new(
cluster, shards, node.peerManager, node.wakuArchive, storeSyncRange.seconds,
storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel,
needsChannel,
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
idsChannel, wantsChannel, needsChannel,
)
node.wakuStoreReconciliation = recon
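
mountStoreSync no longer reads cluster and shards back out of the node's ENR; the caller passes them in, and they are mapped to pubsub topics before reconciliation starts. A self-contained sketch of that mapping (stand-in RelayShard; the real type lives in waku_core):

import std/[sequtils, strformat]

type RelayShard = object # stand-in mirroring the waku_core type
  clusterId: uint16
  shardId: uint16

proc `$`(r: RelayShard): string =
  &"/waku/2/rs/{r.clusterId}/{r.shardId}"

let
  cluster = 1'u16
  shards = @[0'u16, 1]
  pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))

assert pubsubTopics == @["/waku/2/rs/1/0", "/waku/2/rs/1/1"]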

View File

@ -1,6 +1,6 @@
{.push raises: [].}
import std/sequtils, stew/leb128
import std/sequtils, stew/[leb128, byteutils]
import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common
@ -52,18 +52,26 @@ proc deltaEncode*(value: RangesData): seq[byte] =
i = 0
j = 0
# encode cluster
buf = uint64(value.cluster).toBytes(Leb128)
# encode pubsub topics
buf = uint64(value.pubsubTopics.len).toBytes(Leb128)
output &= @buf
# encode shards
buf = uint64(value.shards.len).toBytes(Leb128)
output &= @buf
for shard in value.shards:
buf = uint64(shard).toBytes(Leb128)
for topic in value.pubsubTopics:
buf = uint64(topic.len).toBytes(Leb128)
output &= @buf
output &= topic.toBytes()
# encode content topics
buf = uint64(value.contentTopics.len).toBytes(Leb128)
output &= @buf
for topic in value.contentTopics:
buf = uint64(topic.len).toBytes(Leb128)
output &= @buf
output &= topic.toBytes()
# the first range is implicit but must be explicit when encoded
let (bound, _) = value.ranges[0]
@ -221,37 +229,34 @@ proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] =
return ok(recon)
proc getCluster(idx: var int, buffer: seq[byte]): Result[uint16, string] =
proc getTopics(idx: var int, buffer: seq[byte]): Result[seq[string], string] =
if idx + VarIntLen > buffer.len:
return err("Cannot decode cluster")
return err("Cannot decode topic count")
let slice = buffer[idx ..< idx + VarIntLen]
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
let topicCount = int(val)
return ok(uint16(val))
proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] =
if idx + VarIntLen > buffer.len:
return err("Cannot decode shards count")
let slice = buffer[idx ..< idx + VarIntLen]
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
let shardsLen = val
var shards: seq[uint16]
for i in 0 ..< shardsLen:
var topics: seq[string]
for i in 0 ..< topicCount:
if idx + VarIntLen > buffer.len:
return err("Cannot decode shard value. idx: " & $i)
return err("Cannot decode length. Topic index: " & $i)
let slice = buffer[idx ..< idx + VarIntLen]
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
let topicLen = int(val)
shards.add(uint16(val))
if idx + topicLen > buffer.len:
return err("Cannot decode bytes. Topic index: " & $i)
return ok(shards)
let topic = string.fromBytes(buffer[idx ..< idx + topicLen])
idx += topicLen
topics.add(topic)
return ok(topics)
proc deltaDecode*(
itemSet: var ItemSet, buffer: seq[byte], setLength: int
@ -294,8 +299,8 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
lastTime = Timestamp(0)
idx = 0
payload.cluster = ?getCluster(idx, buffer)
payload.shards = ?getShards(idx, buffer)
payload.pubsubTopics = ?getTopics(idx, buffer)
payload.contentTopics = ?getTopics(idx, buffer)
lastTime = ?getTimestamp(idx, buffer)
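
On the wire, the old fixed cluster/shard header is replaced by two length-prefixed string lists: a LEB128 topic count, then for each topic a LEB128 byte length followed by the raw topic bytes — once for pubsub topics, once for content topics. A self-contained sketch of that layout (hand-rolled varint for illustration; the codec itself uses stew/leb128):

proc putVarint(buf: var seq[byte], x: uint64) =
  # Minimal unsigned LEB128 encoder, enough for this illustration.
  var v = x
  while true:
    var b = byte(v and 0x7F)
    v = v shr 7
    if v != 0:
      b = b or 0x80
    buf.add b
    if v == 0:
      break

proc encodeTopics(topics: seq[string]): seq[byte] =
  putVarint(result, uint64(topics.len)) # topic count
  for t in topics:
    putVarint(result, uint64(t.len)) # per-topic byte length
    for c in t:
      result.add byte(c) # raw topic bytes

# "/waku/2/rs/2/0" is 14 bytes, so a one-topic list encodes as
# 0x01, 0x0E, then the 14 ASCII bytes: 16 bytes total.
assert encodeTopics(@["/waku/2/rs/2/0"]).len == 16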

View File

@ -26,8 +26,8 @@ type
ItemSet = 2
RangesData* = object
cluster*: uint16
shards*: seq[uint16]
pubsubTopics*: seq[PubsubTopic]
contentTopics*: seq[ContentTopic]
ranges*: seq[(Slice[SyncID], RangeType)]
fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[sequtils, options, packedsets],
std/[sequtils, options, sets],
stew/byteutils,
results,
chronicles,
@ -20,6 +20,7 @@ import
../waku_core/codecs,
../waku_core/time,
../waku_core/topics/pubsub_topic,
../waku_core/topics/content_topic,
../waku_core/message/digest,
../waku_core/message/message,
../node/peer_manager/peer_manager,
@ -37,8 +38,8 @@ logScope:
const DefaultStorageCap = 50_000
type SyncReconciliation* = ref object of LPProtocol
cluster: uint16
shards: PackedSet[uint16]
pubsubTopics: HashSet[PubsubTopic] # Empty set means accept all. See spec.
contentTopics: HashSet[ContentTopic] # Empty set means accept all. See spec.
peerManager: PeerManager
@ -46,10 +47,13 @@ type SyncReconciliation* = ref object of LPProtocol
storage: SyncStorage
# AsyncQueues are used as communication channels between
# reconciliation and transfer protocols.
idsRx: AsyncQueue[SyncID]
localWantsTx: AsyncQueue[PeerId]
# Receive IDs from transfer protocol for storage
idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
# Send peer IDs to the transfer protocol for reception
localWantsTx: AsyncQueue[(PeerId)]
# Send Hashes to transfer protocol for transmission
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)]
# params
@ -74,11 +78,14 @@ proc messageIngress*(
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = msgHash.toHex(), err = error
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
self: SyncReconciliation,
msgHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
msg: WakuMessage,
) =
trace "message ingress", msg_hash = msgHash.toHex(), msg = msg
@ -87,14 +94,69 @@ proc messageIngress*(
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = msgHash.toHex(), err = error
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
proc messageIngress*(self: SyncReconciliation, id: SyncID) =
trace "message ingress", id = id
proc messageIngress*(
self: SyncReconciliation,
id: SyncID,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
) =
self.storage.insert(id, pubsubTopic, contentTopic).isOkOr:
error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = id.hash.toHex(), err = error
proc preProcessPayload(
self: SyncReconciliation, payload: RangesData
): Option[RangesData] =
## Check the received payload for topic and/or time-range mismatches.
var payload = payload
# Always use the smallest pubsub topic scope possible
if payload.pubsubTopics.len > 0 and self.pubsubTopics.len > 0:
let pubsubIntersection = self.pubsubTopics * payload.pubsubTopics.toHashSet()
if pubsubIntersection.len < 1:
return none(RangesData)
payload.pubsubTopics = pubsubIntersection.toSeq()
elif self.pubsubTopics.len > 0:
payload.pubsubTopics = self.pubsubTopics.toSeq()
# Always use the smallest content topic scope possible
if payload.contentTopics.len > 0 and self.contentTopics.len > 0:
let contentIntersection = self.contentTopics * payload.contentTopics.toHashSet()
if contentIntersection.len < 1:
return none(RangesData)
payload.contentTopics = contentIntersection.toSeq()
elif self.contentTopics.len > 0:
payload.contentTopics = self.contentTopics.toSeq()
let timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
let selfLowerBound = timeRange.a
# For non-skip ranges, check whether they end before our own sync window;
# if so, convert them to skip ranges (and drop their payload) before processing.
for i in 0 ..< payload.ranges.len:
  let rangeType = payload.ranges[i][1]
  if rangeType == RangeType.Skip:
    continue
let upperBound = payload.ranges[i][0].b.time
if selfLowerBound > upperBound:
payload.ranges[i][1] = RangeType.Skip
if rangeType == RangeType.Fingerprint:
payload.fingerprints.delete(0)
elif rangeType == RangeType.ItemSet:
payload.itemSets.delete(0)
else:
break
return some(payload)
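
The scoping rule above is plain set intersection with "empty means accept all": if both sides name topics, sync only the overlap and bail out when it is empty; if only one side does, its list wins. A stand-alone sketch of the same narrowing over std/sets:

import std/[sets, sequtils]

proc narrow(mine, theirs: HashSet[string]): (bool, seq[string]) =
  # Mirrors preProcessPayload's topic scoping (sketch only).
  if mine.len > 0 and theirs.len > 0:
    let common = mine * theirs # intersection
    if common.len < 1:
      return (false, @[]) # disjoint scopes: nothing to reconcile
    return (true, common.toSeq())
  elif mine.len > 0:
    return (true, mine.toSeq())
  (true, theirs.toSeq())

assert narrow(toHashSet(["a"]), toHashSet(["b"]))[0] == false
assert narrow(initHashSet[string](), toHashSet(["b"])) == (true, @["b"])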
proc processRequest(
self: SyncReconciliation, conn: Connection
@ -136,16 +198,23 @@ proc processRequest(
sendPayload: RangesData
rawPayload: seq[byte]
# Only process the ranges IF the shards and cluster matches
if self.cluster == recvPayload.cluster and
recvPayload.shards.toPackedSet() == self.shards:
sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
let preProcessedPayloadRes = self.preProcessPayload(recvPayload)
if preProcessedPayloadRes.isSome():
let preProcessedPayload = preProcessedPayloadRes.get()
trace "pre-processed payload",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = preProcessedPayload
sendPayload =
self.storage.processPayload(preProcessedPayload, hashToSend, hashToRecv)
trace "sync payload processed",
hash_to_send = hashToSend, hash_to_recv = hashToRecv
sendPayload.cluster = self.cluster
sendPayload.shards = self.shards.toSeq()
sendPayload.pubsubTopics = self.pubsubTopics.toSeq()
sendPayload.contentTopics = self.contentTopics.toSeq()
for hash in hashToSend:
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
@ -187,18 +256,24 @@ proc processRequest(
return ok()
proc initiate(
self: SyncReconciliation, connection: Connection
self: SyncReconciliation,
connection: Connection,
offset: Duration,
syncRange: Duration,
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
): Future[Result[void, string]] {.async.} =
let
timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
timeRange = calculateTimeRange(offset, syncRange)
lower = SyncID(time: timeRange.a, hash: EmptyFingerprint)
upper = SyncID(time: timeRange.b, hash: FullFingerprint)
bounds = lower .. upper
fingerprint = self.storage.computeFingerprint(bounds)
fingerprint = self.storage.computeFingerprint(bounds, pubsubTopics, contentTopics)
initPayload = RangesData(
cluster: self.cluster,
shards: self.shards.toSeq(),
pubsubTopics: pubsubTopics,
contentTopics: contentTopics,
ranges: @[(bounds, RangeType.Fingerprint)],
fingerprints: @[fingerprint],
itemSets: @[],
@ -227,7 +302,12 @@ proc initiate(
return ok()
proc storeSynchronization*(
self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
self: SyncReconciliation,
peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo),
offset: Duration = self.relayJitter,
syncRange: Duration = self.syncRange,
pubsubTopics: HashSet[PubsubTopic] = self.pubsubTopics,
contentTopics: HashSet[ContentTopic] = self.contentTopics,
): Future[Result[void, string]] {.async.} =
let peer = peerInfo.valueOr:
self.peerManager.selectPeer(WakuReconciliationCodec).valueOr:
@ -241,7 +321,11 @@ proc storeSynchronization*(
debug "sync session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
(await self.initiate(conn)).isOkOr:
(
await self.initiate(
conn, offset, syncRange, pubsubTopics.toSeq(), contentTopics.toSeq()
)
).isOkOr:
error "sync session failed",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error
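# A hedged usage sketch (hypothetical `recon` and `peer` values, inside an
# async proc): a bare call keeps the node's configured jitter, range and
# topic scope, while any of the new defaulted parameters can be overridden
# per session, e.g. to resync a single pubsub topic over the last hour.
let res = await recon.storeSynchronization(
  peerInfo = some(peer),
  syncRange = 1.hours,
  pubsubTopics = toHashSet([DefaultPubsubTopic]),
)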
@ -254,15 +338,13 @@ proc storeSynchronization*(
proc initFillStorage(
syncRange: timer.Duration, wakuArchive: WakuArchive
): Future[Result[seq[SyncID], string]] {.async.} =
): Future[Result[SeqStorage, string]] {.async.} =
if wakuArchive.isNil():
return err("waku archive unavailable")
let endTime = getNowInNanosecondTime()
let startTime = endTime - syncRange.nanos
#TODO special query for only timestamp and hash ???
var query = ArchiveQuery(
includeData: true,
cursor: none(ArchiveCursor),
@ -274,39 +356,40 @@ proc initFillStorage(
debug "initial storage filling started"
var ids = newSeqOfCap[SyncID](DefaultStorageCap)
# we assume IDs are in order
var storage = SeqStorage.new(DefaultStorageCap)
while true:
let response = (await wakuArchive.findMessages(query)).valueOr:
return err("archive retrival failed: " & $error)
# we assume IDs are already in order
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let msg = response.messages[i]
let pubsubTopic = response.topics[i]
ids.add(SyncID(time: msg.timestamp, hash: hash))
let id = SyncID(time: msg.timestamp, hash: hash)
discard storage.insert(id, pubsubTopic, msg.contentTopic)
if response.cursor.isNone():
break
query.cursor = response.cursor
debug "initial storage filling done", elements = ids.len
debug "initial storage filling done", elements = storage.length()
return ok(ids)
return ok(storage)
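# The drain loop above, reduced to a self-contained sketch: page through a
# cursor-based source until no cursor is returned. `fetchPage` is a toy
# stand-in for wakuArchive.findMessages.
import std/options

type Page = object
  items: seq[int]
  cursor: Option[int]

proc fetchPage(cursor: Option[int]): Page =
  if cursor.isNone():
    Page(items: @[1, 2], cursor: some(2)) # first page points to the next
  else:
    Page(items: @[3], cursor: none(int)) # last page ends the loop

var all: seq[int]
var cursor = none(int)
while true:
  let page = fetchPage(cursor)
  all.add(page.items)
  if page.cursor.isNone():
    break
  cursor = page.cursor
assert all == @[1, 2, 3]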
proc new*(
T: type SyncReconciliation,
cluster: uint16,
shards: seq[uint16],
pubsubTopics: seq[PubSubTopic],
contentTopics: seq[ContentTopic],
peerManager: PeerManager,
wakuArchive: WakuArchive,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: timer.Duration = DefaultGossipSubJitter,
idsRx: AsyncQueue[SyncID],
idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
localWantsTx: AsyncQueue[PeerId],
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[Result[T, string]] {.async.} =
@ -316,11 +399,11 @@ proc new*(
warn "will not sync messages before this point in time", error = res.error
SeqStorage.new(DefaultStorageCap)
else:
SeqStorage.new(res.get())
res.get()
var sync = SyncReconciliation(
cluster: cluster,
shards: shards.toPackedSet(),
pubsubTopics: pubsubTopics.toHashSet(),
contentTopics: contentTopics.toHashSet(),
peerManager: peerManager,
storage: storage,
syncRange: syncRange,
@ -381,9 +464,9 @@ proc periodicPrune(self: SyncReconciliation) {.async.} =
proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
while true: # infinite loop
let id = await self.idsRx.popfirst()
let (id, pubsub, content) = await self.idsRx.popfirst()
self.messageIngress(id)
self.messageIngress(id, pubsub, content)
proc start*(self: SyncReconciliation) =
if self.started:

View File

@ -1,8 +1,14 @@
import std/[algorithm, sequtils, math, options], results, chronos, stew/arrayops
import
std/[algorithm, sequtils, math, options, tables, packedsets, sugar],
results,
chronos,
stew/arrayops
import
../../waku_core/time,
../../waku_core/message/digest,
../../waku_core/topics/pubsub_topic,
../../waku_core/topics/content_topic,
../common,
./range_processing,
./storage
@ -10,43 +16,119 @@ import
type SeqStorage* = ref object of SyncStorage
elements: seq[SyncID]
pubsubTopicIndexes: seq[int]
contentTopicIndexes: seq[int]
pubsubTopics: seq[PubSubTopic]
contentTopics: seq[ContentTopic]
unusedPubsubTopicSet: PackedSet[int]
unusedContentTopicSet: PackedSet[int]
# Number of parts a range will be split into.
partitionCount: int
# Number of elements in a range at or below which item sets are used instead of fingerprints.
lengthThreshold: int
method length*(self: SeqStorage): int =
method length*(self: SeqStorage): int {.raises: [].} =
return self.elements.len
method insert*(self: SeqStorage, element: SyncID): Result[void, string] {.raises: [].} =
let idx = self.elements.lowerBound(element, common.cmp)
proc pubsubTopicsLen*(self: SeqStorage): int {.raises: [].} =
return self.pubsubTopics.len
proc contentTopicsLen*(self: SeqStorage): int {.raises: [].} =
return self.contentTopics.len
proc unusedPubsubTopicsLen*(self: SeqStorage): int {.raises: [].} =
return self.unusedPubsubTopicSet.len
proc unusedContentTopicsLen*(self: SeqStorage): int {.raises: [].} =
return self.unusedContentTopicSet.len
proc getPubsubTopicIndex(self: SeqStorage, pubsubTopic: PubSubTopic): int =
for i, selfTopic in self.pubsubTopics:
if pubsubTopic == selfTopic:
return i
if self.unusedPubsubTopicSet.len > 0:
let unusedIdx = self.unusedPubsubTopicSet.toSeq()[0]
self.unusedPubsubTopicSet.excl(unusedIdx)
self.pubsubTopics[unusedIdx] = pubsubTopic
return unusedIdx
let newIdx = self.pubsubTopics.len
self.pubsubTopics.add(pubsubTopic)
return newIdx
proc getContentTopicIndex(self: SeqStorage, contentTopic: ContentTopic): int =
for i, selfTopic in self.contentTopics:
if contentTopic == selfTopic:
return i
if self.unusedContentTopicSet.len > 0:
let unusedIdx = self.unusedContentTopicSet.toSeq()[0]
self.unusedContentTopicSet.excl(unusedIdx)
self.contentTopics[unusedIdx] = contentTopic
return unusedIdx
let newIdx = self.contentTopics.len
self.contentTopics.add(contentTopic)
return newIdx
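# Self-contained sketch of the slot-reuse scheme in the two lookups above:
# a known topic returns its index, a freed slot is recycled when available,
# otherwise the topic is appended. Names here are illustrative.
import std/[packedsets, sequtils]

var topics = @["t1", "t2"]
var unused = initPackedSet[int]()

proc internTopic(topic: string): int =
  for i, t in topics:
    if t == topic:
      return i # already interned
  if unused.len > 0:
    let idx = unused.toSeq()[0] # recycle a slot freed by pruning
    unused.excl(idx)
    topics[idx] = topic
    return idx
  topics.add(topic)
  return topics.len - 1

unused.incl(0) # pretend prune freed slot 0
assert internTopic("t3") == 0 # recycled
assert internTopic("t4") == 2 # appended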
proc insertAt(
self: SeqStorage,
idx: int,
element: SyncID,
pubsubTopic: PubSubTopic,
contentTopic: ContentTopic,
) =
if idx < self.elements.len and self.elements[idx] == element:
return err("duplicate element")
# duplicate element, ignore
return
self.elements.insert(element, idx)
let pubsubIndex = self.getPubsubTopicIndex(pubsubTopic)
let contentIndex = self.getContentTopicIndex(contentTopic)
self.pubsubTopicIndexes.insert(pubsubIndex, idx)
self.contentTopicIndexes.insert(contentIndex, idx)
method insert*(
self: SeqStorage,
element: SyncID,
pubsubTopic: PubSubTopic,
contentTopic: ContentTopic,
): Result[void, string] {.raises: [].} =
let idx = self.elements.lowerBound(element, common.cmp)
self.insertAt(idx, element, pubsubTopic, contentTopic)
return ok()
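# Minimal sketch of the aligned insert: lowerBound finds one index, and all
# parallel seqs are updated at that same position so element i stays paired
# with its topic indexes. ints stand in for SyncID and the topic indexes.
import std/algorithm

var elements = @[10, 30]
var topicIdxs = @[0, 1]

proc insertAligned(element, topicIdx: int) =
  let idx = elements.lowerBound(element, system.cmp[int])
  if idx < elements.len and elements[idx] == element:
    return # duplicate element, ignore
  elements.insert(element, idx)
  topicIdxs.insert(topicIdx, idx)

insertAligned(20, 0)
assert elements == @[10, 20, 30]
assert topicIdxs == @[0, 0, 1]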
method batchInsert*(
self: SeqStorage, elements: seq[SyncID]
self: SeqStorage,
elements: seq[SyncID],
pubsubTopics: seq[PubSubTopic],
contentTopics: seq[ContentTopic],
): Result[void, string] {.raises: [].} =
## Insert the sorted seq of new elements.
if elements.len == 1:
return self.insert(elements[0])
#TODO custom impl. ???
return self.insert(elements[0], pubsubTopics[0], contentTopics[0])
if not elements.isSorted(common.cmp):
return err("seq not sorted")
var merged = newSeqOfCap[SyncID](self.elements.len + elements.len)
var idx = 0
for i in 0 ..< elements.len:
let element = elements[i]
let pubsubTopic = pubsubTopics[i]
let contentTopic = contentTopics[i]
merged.merge(self.elements, elements, common.cmp)
idx += self.elements[idx ..< self.elements.len].lowerBound(element, common.cmp)
self.elements = merged.deduplicate(true)
self.insertAt(idx, element, pubsubTopic, contentTopic)
return ok()
@ -62,11 +144,32 @@ method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} =
let idx = self.elements.lowerBound(bound, common.cmp)
self.elements.delete(0 ..< idx)
self.pubsubTopicIndexes.delete(0 ..< idx)
self.contentTopicIndexes.delete(0 ..< idx)
# Free unused content topics
let contentIdxSet = self.contentTopicIndexes.toPackedSet()
var contentTopicSet: PackedSet[int]
for i in 0 ..< self.contentTopics.len:
contentTopicSet.incl(i)
self.unusedContentTopicSet = contentTopicSet - contentIdxSet
# Free unused pubsub topics
let pubsubIdxSet = self.pubsubTopicIndexes.toPackedSet()
var pubsubTopicSet: PackedSet[int]
for i in 0 ..< self.pubsubTopics.len:
pubsubTopicSet.incl(i)
self.unusedPubsubTopicSet = pubsubTopicSet - pubsubIdxSet
return idx
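# Sketch of the slot-freeing step above: build the full index set, subtract
# the indexes still referenced after pruning, and what remains is reusable.
import std/packedsets

let surviving = @[1, 1, 3] # topic indexes still referenced by elements
var all = initPackedSet[int]()
for i in 0 ..< 4: # four topic slots were allocated
  all.incl(i)

let unused = all - surviving.toPackedSet()
assert 0 in unused and 2 in unused
assert 1 notin unused and 3 notin unused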
proc computefingerprintFromSlice(
self: SeqStorage, sliceOpt: Option[Slice[int]]
self: SeqStorage,
sliceOpt: Option[Slice[int]],
pubsubTopicSet: PackedSet[int],
contentTopicSet: PackedSet[int],
): Fingerprint =
## XOR all hashes of a slice of the storage.
@ -77,7 +180,21 @@ proc computefingerprintFromSlice(
let idxSlice = sliceOpt.get()
for id in self.elements[idxSlice]:
let elementSlice = self.elements[idxSlice]
let pubsubSlice = self.pubsubTopicIndexes[idxSlice]
let contentSlice = self.contentTopicIndexes[idxSlice]
for i in 0 ..< elementSlice.len:
let id = elementSlice[i]
let pubsub = pubsubSlice[i]
let content = contentSlice[i]
if pubsubTopicSet.len > 0 and pubsub notin pubsubTopicSet:
continue
if contentTopicSet.len > 0 and content notin contentTopicSet:
continue
fingerprint = fingerprint xor id.hash
return fingerprint
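# Self-contained sketch of the filtered fingerprint: XOR every hash whose
# topic index is in the requested set; an empty set means "no filter".
# uint64 stands in for the 32-byte Fingerprint.
import std/packedsets

let hashes = @[0xA'u64, 0xB'u64, 0xC'u64]
let topicIdxs = @[0, 1, 0]
let wanted = [0].toPackedSet()

var fingerprint = 0'u64
for i in 0 ..< hashes.len:
  if wanted.len > 0 and topicIdxs[i] notin wanted:
    continue # element outside the requested topic scope
  fingerprint = fingerprint xor hashes[i]
assert fingerprint == (0xA'u64 xor 0xC'u64)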
@ -85,8 +202,6 @@ proc computefingerprintFromSlice(
proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
## Given bounds, find the corresponding indices in this storage.
#TODO can those 2 binary searches be combined for efficiency ???
let lower = self.elements.lowerBound(slice.a, common.cmp)
var upper = self.elements.upperBound(slice.b, common.cmp)
@ -101,21 +216,58 @@ proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
return some(lower ..< upper)
method computeFingerprint*(
self: SeqStorage, bounds: Slice[SyncID]
self: SeqStorage,
bounds: Slice[SyncID],
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
): Fingerprint {.raises: [].} =
let idxSliceOpt = self.findIdxBounds(bounds)
return self.computefingerprintFromSlice(idxSliceOpt)
var pubsubTopicSet = initPackedSet[int]()
for inputTopic in pubsubTopics:
for i, localTopic in self.pubsubTopics:
if inputTopic == localTopic:
pubsubTopicSet.incl(i)
var contentTopicSet = initPackedSet[int]()
for inputTopic in contentTopics:
for i, localTopic in self.contentTopics:
if inputTopic == localTopic:
contentTopicSet.incl(i)
return self.computefingerprintFromSlice(idxSliceOpt, pubsubTopicSet, contentTopicSet)
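# Sketch of the name-to-index translation above: remote topic names are
# mapped into the local storage's index space; a name unknown locally simply
# contributes nothing, so it can never match an element.
import std/[packedsets, sequtils]

let localTopics = @["waku/a", "waku/b"]
let remoteTopics = @["waku/b", "waku/z"] # "waku/z" is unknown locally

var topicSet = initPackedSet[int]()
for remote in remoteTopics:
  for i, local in localTopics:
    if remote == local:
      topicSet.incl(i)
assert topicSet.toSeq() == @[1]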
proc getFilteredElements(
self: SeqStorage,
slice: Slice[int],
pubsubTopicSet: PackedSet[int],
contentTopicSet: PackedSet[int],
): seq[SyncID] =
let elements = collect(newSeq):
for i in slice:
if pubsubTopicSet.len > 0 and self.pubsubTopicIndexes[i] notin pubsubTopicSet:
continue
if contentTopicSet.len > 0 and self.contentTopicIndexes[i] notin contentTopicSet:
continue
self.elements[i]
elements
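# The collect-based filter above, as a runnable std-only sketch: one pass,
# skipping any element whose topic index falls outside the requested set.
import std/[sugar, packedsets]

let elems = @["e0", "e1", "e2"]
let topicIdxs = @[0, 1, 0]
let wanted = [0].toPackedSet()

let filtered = collect(newSeq):
  for i in 0 ..< elems.len:
    if wanted.len > 0 and topicIdxs[i] notin wanted:
      continue
    elems[i]
assert filtered == @["e0", "e2"]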
proc processFingerprintRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
pubsubTopicSet: PackedSet[int],
contentTopicSet: PackedSet[int],
inputFingerprint: Fingerprint,
output: var RangesData,
) {.raises: [].} =
## Compares fingerprints and partition new ranges.
let idxSlice = self.findIdxBounds(inputBounds)
let ourFingerprint = self.computeFingerprintFromSlice(idxSlice)
let ourFingerprint =
self.computeFingerprintFromSlice(idxSlice, pubsubTopicSet, contentTopicSet)
if ourFingerprint == inputFingerprint:
output.ranges.add((inputBounds, RangeType.Skip))
@ -131,7 +283,8 @@ proc processFingerprintRange*(
if slice.len <= self.lengthThreshold:
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: false)
let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
let state = ItemSet(elements: elements, reconciled: false)
output.itemSets.add(state)
return
@ -149,11 +302,13 @@ proc processFingerprintRange*(
if slice.len <= self.lengthThreshold:
output.ranges.add((partitionBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: false)
let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
let state = ItemSet(elements: elements, reconciled: false)
output.itemSets.add(state)
continue
let fingerprint = self.computeFingerprintFromSlice(some(slice))
let fingerprint =
self.computeFingerprintFromSlice(some(slice), pubsubTopicSet, contentTopicSet)
output.ranges.add((partitionBounds, RangeType.Fingerprint))
output.fingerprints.add(fingerprint)
continue
@ -161,6 +316,8 @@ proc processFingerprintRange*(
proc processItemSetRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
pubsubTopicSet: PackedSet[int],
contentTopicSet: PackedSet[int],
inputItemSet: ItemSet,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
@ -190,6 +347,16 @@ proc processItemSetRange*(
while (j < m):
let ourElement = self.elements[j]
let pubsub = self.pubsubTopicIndexes[j]
let content = self.contentTopicIndexes[j]
if pubsubTopicSet.len > 0 and pubsub notin pubsubTopicSet:
j.inc()
continue
if contentTopicSet.len > 0 and content notin contentTopicSet:
j.inc()
continue
if i >= n:
# in case we have more elements
@ -217,7 +384,8 @@ proc processItemSetRange*(
if not inputItemSet.reconciled:
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: true)
let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
let state = ItemSet(elements: elements, reconciled: true)
output.itemSets.add(state)
else:
output.ranges.add((inputBounds, RangeType.Skip))
@ -234,6 +402,18 @@ method processPayload*(
i = 0
j = 0
var pubsubTopicSet = initPackedSet[int]()
for inputTopic in input.pubsubTopics:
for i, localTopic in self.pubsubTopics:
if inputTopic == localTopic:
pubsubTopicSet.incl(i)
var contentTopicSet = initPackedSet[int]()
for inputTopic in input.contentTopics:
for i, localTopic in self.contentTopics:
if inputTopic == localTopic:
contentTopicSet.incl(i)
for (bounds, rangeType) in input.ranges:
case rangeType
of RangeType.Skip:
@ -244,14 +424,18 @@ method processPayload*(
let fingerprint = input.fingerprints[i]
i.inc()
self.processFingerprintRange(bounds, fingerprint, output)
self.processFingerprintRange(
bounds, pubsubTopicSet, contentTopicSet, fingerprint, output
)
continue
of RangeType.ItemSet:
let itemSet = input.itemsets[j]
j.inc()
self.processItemSetRange(bounds, itemSet, hashToSend, hashToRecv, output)
self.processItemSetRange(
bounds, pubsubTopicSet, contentTopicSet, itemSet, hashToSend, hashToRecv, output
)
continue
@ -295,8 +479,41 @@ proc new*(T: type SeqStorage, capacity: int, threshold = 100, partitions = 8): T
)
proc new*(
T: type SeqStorage, elements: seq[SyncID], threshold = 100, partitions = 8
T: type SeqStorage,
elements: seq[SyncID],
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
threshold = 100,
partitions = 8,
): T =
var idx = 0
var uniquePubsubTopics = initOrderedTable[PubsubTopic, int]()
for pubsub in pubsubTopics:
if pubsub notin uniquePubsubTopics:
uniquePubsubTopics[pubsub] = idx
idx.inc()
let pubsubTopicIndexes = collect(newSeq):
for pubsub in pubsubTopics:
uniquePubsubTopics[pubsub]
idx = 0
var uniqueContentTopics = initOrderedTable[ContentTopic, int]()
for content in contentTopics:
if content notin uniqueContentTopics:
uniqueContentTopics[content] = idx
idx.inc()
let contentTopicIndexes = collect(newSeq):
for content in contentTopics:
uniqueContentTopics[content]
return SeqStorage(
elements: elements, lengthThreshold: threshold, partitionCount: partitions
elements: elements,
pubsubTopics: uniquePubsubTopics.keys.toSeq(),
contentTopics: uniqueContentTopics.keys.toSeq(),
pubsubTopicIndexes: pubsubTopicIndexes,
contentTopicIndexes: contentTopicIndexes,
lengthThreshold: threshold,
partitionCount: partitions,
)
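# Std-only sketch of the constructor's interning pass above: assign each
# distinct topic a stable index in first-seen order, then re-express the
# per-element topic list as indexes.
import std/[tables, sequtils, sugar]

let pubsubTopics = @["a", "b", "a", "c"]

var idx = 0
var unique = initOrderedTable[string, int]()
for topic in pubsubTopics:
  if topic notin unique:
    unique[topic] = idx
    idx.inc()

let indexes = collect(newSeq):
  for topic in pubsubTopics:
    unique[topic]

assert toSeq(unique.keys) == @["a", "b", "c"]
assert indexes == @[0, 1, 0, 2]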

View File

@ -1,16 +1,23 @@
import results
import ../../waku_core/time, ../common
import
../../waku_core/time,
../../waku_core/topics/content_topic,
../../waku_core/topics/pubsub_topic,
../common
type SyncStorage* = ref object of RootObj
method insert*(
self: SyncStorage, element: SyncID
self: SyncStorage, element: SyncID, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
): Result[void, string] {.base, gcsafe, raises: [].} =
return err("insert method not implemented for SyncStorage")
method batchInsert*(
self: SyncStorage, elements: seq[SyncID]
self: SyncStorage,
elements: seq[SyncID],
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
): Result[void, string] {.base, gcsafe, raises: [].} =
return err("batchInsert method not implemented for SyncStorage")
@ -20,17 +27,26 @@ method prune*(
-1
method computeFingerprint*(
self: SyncStorage, bounds: Slice[SyncID]
self: SyncStorage,
bounds: Slice[SyncID],
pubsubTopics: seq[PubsubTopic],
contentTopics: seq[ContentTopic],
): Fingerprint {.base, gcsafe, raises: [].} =
return EmptyFingerprint
return FullFingerprint
method processPayload*(
self: SyncStorage,
payload: RangesData,
input: RangesData,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
): RangesData {.base, gcsafe, raises: [].} =
return RangesData()
return RangesData(
pubsubTopics: @["InsertPubsubTopicHere"],
contentTopics: @["InsertContentTopicHere"],
ranges: @[],
fingerprints: @[FullFingerprint],
itemSets: @[],
)
method length*(self: SyncStorage): int {.base, gcsafe, raises: [].} =
-1

View File

@ -16,6 +16,8 @@ import
../common/protobuf,
../waku_enr,
../waku_core/codecs,
../waku_core/topics/pubsub_topic,
../waku_core/topics/content_topic,
../waku_core/message/digest,
../waku_core/message/message,
../waku_core/message/default_values,
@ -34,7 +36,7 @@ type SyncTransfer* = ref object of LPProtocol
peerManager: PeerManager
# Send IDs to reconciliation protocol for storage
idsTx: AsyncQueue[SyncID]
idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]
# Receive Hashes from reconciliation protocol for reception
localWantsRx: AsyncQueue[PeerId]
@ -179,7 +181,8 @@ proc initProtocolHandler(self: SyncTransfer) =
continue
let id = SyncID(time: msg.timestamp, hash: hash)
await self.idsTx.addLast(id)
await self.idsTx.addLast((id, pubsub, msg.contentTopic))
continue
@ -197,7 +200,7 @@ proc new*(
T: type SyncTransfer,
peerManager: PeerManager,
wakuArchive: WakuArchive,
idsTx: AsyncQueue[SyncID],
idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
localWantsRx: AsyncQueue[PeerId],
remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)],
): T =