diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index d5cb601a2..fe62e02a1 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -22,22 +22,22 @@ proc randomHash*(rng: var Rand): WakuMessageHash = proc newTestWakuRecon*( switch: Switch, - idsRx: AsyncQueue[SyncID], - wantsTx: AsyncQueue[PeerId], - needsTx: AsyncQueue[(PeerId, Fingerprint)], - cluster: uint16 = 1, + pubsubTopics: seq[PubsubTopic] = @[], + contentTopics: seq[ContentTopic] = @[], syncRange: timer.Duration = DefaultSyncRange, - shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7], + idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)], + wantsTx: AsyncQueue[PeerId], + needsTx: AsyncQueue[(PeerId, WakuMessageHash)], ): Future[SyncReconciliation] {.async.} = let peerManager = PeerManager.new(switch) let res = await SyncReconciliation.new( - cluster = cluster, - shards = shards, + pubsubTopics = pubsubTopics, + contentTopics = contentTopics, peerManager = peerManager, wakuArchive = nil, - relayJitter = 0.seconds, syncRange = syncRange, + relayJitter = 0.seconds, idsRx = idsRx, localWantsTx = wantsTx, remoteNeedsTx = needsTx, @@ -52,9 +52,9 @@ proc newTestWakuRecon*( proc newTestWakuTransfer*( switch: Switch, - idsTx: AsyncQueue[SyncID], + idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)], wantsRx: AsyncQueue[PeerId], - needsRx: AsyncQueue[(PeerId, Fingerprint)], + needsRx: AsyncQueue[(PeerId, WakuMessageHash)], ): SyncTransfer = let peerManager = PeerManager.new(switch) diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim index 4ca1812dc..fdfd3f2f0 100644 --- a/tests/waku_store_sync/test_codec.nim +++ b/tests/waku_store_sync/test_codec.nim @@ -99,6 +99,8 @@ suite "Waku Store Sync Codec": let range4 = (bounds4, RangeType.ItemSet) let payload = RangesData( + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[], ranges: @[range1, range2, range3, range4], fingerprints: @[], itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4], @@ -139,6 +141,8 @@ suite "Waku Store Sync Codec": ranges.add(range) let payload = RangesData( + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[], ranges: ranges, fingerprints: @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)], @@ -186,8 +190,13 @@ suite "Waku Store Sync Codec": ranges.add((bound, RangeType.ItemSet)) itemSets.add(itemSet) - let payload = - RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) + let payload = RangesData( + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[], + ranges: ranges, + fingerprints: fingerprints, + itemSets: itemSets, + ) let encodedPayload = payload.deltaEncode() diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index 552d9ca4f..bd13716a2 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -39,7 +39,7 @@ suite "Waku Sync: reconciliation": var clientSwitch {.threadvar.}: Switch var - idsChannel {.threadvar.}: AsyncQueue[SyncID] + idsChannel {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)] localWants {.threadvar.}: AsyncQueue[PeerId] remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] @@ -55,13 +55,10 @@ suite "Waku Sync: reconciliation": await allFutures(serverSwitch.start(), clientSwitch.start()) - idsChannel = newAsyncQueue[SyncID]() + idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]() localWants = newAsyncQueue[PeerId]() remoteNeeds = 
newAsyncQueue[(PeerId, WakuMessageHash)]() - server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) - client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) - serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() @@ -72,6 +69,13 @@ suite "Waku Sync: reconciliation": await allFutures(serverSwitch.stop(), clientSwitch.stop()) asyncTest "sync 2 nodes both empty": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + check: idsChannel.len == 0 remoteNeeds.len == 0 @@ -84,6 +88,13 @@ suite "Waku Sync: reconciliation": remoteNeeds.len == 0 asyncTest "sync 2 nodes empty client full server": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) @@ -92,11 +103,13 @@ suite "Waku Sync: reconciliation": hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) - server.messageIngress(hash1, msg1) - server.messageIngress(hash2, msg2) - server.messageIngress(hash3, msg3) + server.messageIngress(hash1, DefaultPubsubTopic, msg1) + server.messageIngress(hash2, DefaultPubsubTopic, msg2) + server.messageIngress(hash3, DefaultPubsubTopic, msg3) check: + remoteNeeds.len == 0 + localWants.len == 0 remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == false remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == false @@ -105,11 +118,19 @@ suite "Waku Sync: reconciliation": assert res.isOk(), res.error check: + remoteNeeds.len == 3 remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == true remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true asyncTest "sync 2 nodes full client empty server": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) @@ -118,11 +139,13 @@ suite "Waku Sync: reconciliation": hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) - client.messageIngress(hash1, msg1) - client.messageIngress(hash2, msg2) - client.messageIngress(hash3, msg3) + client.messageIngress(hash1, DefaultPubsubTopic, msg1) + client.messageIngress(hash2, DefaultPubsubTopic, msg2) + client.messageIngress(hash3, DefaultPubsubTopic, msg3) check: + remoteNeeds.len == 0 + localWants.len == 0 remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == false remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == false remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false @@ -131,11 +154,19 @@ suite "Waku Sync: reconciliation": assert res.isOk(), res.error check: + 
remoteNeeds.len == 3 remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == true remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == true remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true asyncTest "sync 2 nodes different hashes": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) @@ -144,12 +175,14 @@ suite "Waku Sync: reconciliation": hash2 = computeMessageHash(DefaultPubsubTopic, msg2) hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - server.messageIngress(hash1, msg1) - server.messageIngress(hash2, msg2) - client.messageIngress(hash1, msg1) - client.messageIngress(hash3, msg3) + server.messageIngress(hash1, DefaultPubsubTopic, msg1) + server.messageIngress(hash2, DefaultPubsubTopic, msg2) + client.messageIngress(hash1, DefaultPubsubTopic, msg1) + client.messageIngress(hash3, DefaultPubsubTopic, msg3) check: + remoteNeeds.len == 0 + localWants.len == 0 remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false @@ -157,51 +190,82 @@ suite "Waku Sync: reconciliation": assert syncRes.isOk(), $syncRes.error check: + remoteNeeds.len == 2 remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true asyncTest "sync 2 nodes different shards": + server = await newTestWakuRecon( + serverSwitch, + @["/waku/2/rs/2/1", "/waku/2/rs/2/2", "/waku/2/rs/2/3", "/waku/2/rs/2/4"], + @[DefaultContentTopic], + DefaultSyncRange, + idsChannel, + localWants, + remoteNeeds, + ) + + client = await newTestWakuRecon( + clientSwitch, + @["/waku/2/rs/2/3", "/waku/2/rs/2/4", "/waku/2/rs/2/5", "/waku/2/rs/2/6"], + @[DefaultContentTopic], + DefaultSyncRange, + idsChannel, + localWants, + remoteNeeds, + ) + let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) - hash1 = computeMessageHash(DefaultPubsubTopic, msg1) - hash2 = computeMessageHash(DefaultPubsubTopic, msg2) - hash3 = computeMessageHash(DefaultPubsubTopic, msg3) + msg4 = fakeWakuMessage(ts = now() + 3, contentTopic = DefaultContentTopic) + msg5 = fakeWakuMessage(ts = now() + 4, contentTopic = DefaultContentTopic) + msg6 = fakeWakuMessage(ts = now() + 5, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash("/waku/2/rs/2/1", msg1) + hash2 = computeMessageHash("/waku/2/rs/2/2", msg2) + hash3 = computeMessageHash("/waku/2/rs/2/3", msg3) + hash4 = computeMessageHash("/waku/2/rs/2/4", msg4) + hash5 = computeMessageHash("/waku/2/rs/2/5", msg5) + hash6 = computeMessageHash("/waku/2/rs/2/6", msg6) - server.messageIngress(hash1, msg1) - server.messageIngress(hash2, msg2) - client.messageIngress(hash1, msg1) - client.messageIngress(hash3, msg3) + server.messageIngress(hash1, "/waku/2/rs/2/1", msg1) + server.messageIngress(hash2, "/waku/2/rs/2/2", msg2) + server.messageIngress(hash3, "/waku/2/rs/2/3", msg3) + + client.messageIngress(hash4, "/waku/2/rs/2/4", msg4) + client.messageIngress(hash5, "/waku/2/rs/2/5", msg5) + client.messageIngress(hash6, "/waku/2/rs/2/6", msg6) check: - 
remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false - remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false - - server = await newTestWakuRecon( - serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3] - ) - client = await newTestWakuRecon( - clientSwitch, idsChannel, localWants, remoteNeeds, shards = @[4.uint16, 5, 6, 7] - ) + remoteNeeds.len == 0 var syncRes = await client.storeSynchronization(some(serverPeerInfo)) assert syncRes.isOk(), $syncRes.error check: - remoteNeeds.len == 0 + remoteNeeds.len == 2 + remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash4)) == true asyncTest "sync 2 nodes same hashes": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) - server.messageIngress(hash1, msg1) - client.messageIngress(hash1, msg1) - server.messageIngress(hash2, msg2) - client.messageIngress(hash2, msg2) + server.messageIngress(hash1, DefaultPubsubTopic, msg1) + client.messageIngress(hash1, DefaultPubsubTopic, msg1) + server.messageIngress(hash2, DefaultPubsubTopic, msg2) + client.messageIngress(hash2, DefaultPubsubTopic, msg2) check: remoteNeeds.len == 0 @@ -213,6 +277,13 @@ suite "Waku Sync: reconciliation": remoteNeeds.len == 0 asyncTest "sync 2 nodes 100K msgs 1 diff": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let msgCount = 100_000 var diffIndex = rand(msgCount) var diff: WakuMessageHash @@ -228,28 +299,37 @@ suite "Waku Sync: reconciliation": let msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) + server.messageIngress(hash, DefaultPubsubTopic, msg) if i != diffIndex: - client.messageIngress(hash, msg) + client.messageIngress(hash, DefaultPubsubTopic, msg) else: diff = hash timestamp += Timestamp(part) check: + remoteNeeds.len == 0 remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: + remoteNeeds.len == 1 remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + const - msgCount = 200_000 # total messages on the server - diffCount = 100 # messages initially missing on the client + msgCount = 10_000 # total messages on the server + diffCount = 1000 # messages initially missing on the client ## ── choose which messages will be absent from the client ───────────── var missingIdx: HashSet[int] @@ -266,9 +346,9 @@ suite "Waku Sync: reconciliation": msg = fakeWakuMessage(ts = ts, contentTopic = 
DefaultContentTopic) h = computeMessageHash(DefaultPubsubTopic, msg) -      server.messageIngress(h, msg) # every msg is on the server +      server.messageIngress(h, DefaultPubsubTopic, msg) # every msg is on the server if i notin missingIdx: -        client.messageIngress(h, msg) # all but 100 are on the client +        client.messageIngress(h, DefaultPubsubTopic, msg) # all but 1000 are on the client ts += Timestamp(step) ## ── sanity before we start the round ───────────────────────────────── @@ -278,10 +358,17 @@ suite "Waku Sync: reconciliation": let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error -    ## ── verify that ≈100 diffs were queued (allow 10 % slack) ──────────── -    check remoteNeeds.len >= 90 # ≈ 100 × 0.9 +    ## ── verify that ≈1000 diffs were queued (allow 10 % slack) ──────────── +    check remoteNeeds.len >= 900 # ≈ 1000 × 0.9 asyncTest "sync 2 nodes 400K msgs 100k diffs": +    server = await newTestWakuRecon( +      serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) +    client = await newTestWakuRecon( +      clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) + const msgCount = 400_000 diffCount = 100_000 @@ -301,9 +388,9 @@ suite "Waku Sync: reconciliation": msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) h = computeMessageHash(DefaultPubsubTopic, msg) -      server.messageIngress(h, msg) +      server.messageIngress(h, DefaultPubsubTopic, msg) if i notin missingIdx: -        client.messageIngress(h, msg) +        client.messageIngress(h, DefaultPubsubTopic, msg) else: diffMsgHashes.incl h @@ -319,6 +406,13 @@ suite "Waku Sync: reconciliation": check deliveredHash in diffMsgHashes asyncTest "sync 2 nodes 100 msgs 20 diff – 1-second window": +    server = await newTestWakuRecon( +      serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) +    client = await newTestWakuRecon( +      clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) + const msgCount = 100 diffCount = 20 @@ -339,12 +433,12 @@ suite "Waku Sync: reconciliation": for i in 0 ..< msgCount: let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) -      server.messageIngress(hash, msg) +      server.messageIngress(hash, DefaultPubsubTopic, msg) if i in missingIdx: diffMsgHashes.incl hash else: -      client.messageIngress(hash, msg) +      client.messageIngress(hash, DefaultPubsubTopic, msg) ts += Timestamp(step) @@ -360,6 +454,13 @@ suite "Waku Sync: reconciliation": check deliveredHash in diffMsgHashes asyncTest "sync 2 nodes 500k msgs 300k diff – stress window": +    server = await newTestWakuRecon( +      serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) +    client = await newTestWakuRecon( +      clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds +    ) + const msgCount = 500_000 diffCount = 300_000 @@ -385,12 +486,12 @@ suite "Waku Sync: reconciliation": for i in 0 ..< msgCount: let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) -      server.messageIngress(hash, msg) +      server.messageIngress(hash, DefaultPubsubTopic, msg) if i in missingSet: diffMsgHashes.incl hash else: -      client.messageIngress(hash, msg) +      client.messageIngress(hash, DefaultPubsubTopic, msg) ts += Timestamp(step) @@ -406,6 +507,13 @@ suite "Waku Sync: reconciliation": check deliveredHash in diffMsgHashes asyncTest "sync 2 nodes, 40 msgs: 18 in-window diff, 20 out-window ignored": + 
server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + const diffInWin = 18 diffOutWin = 20 @@ -428,7 +536,7 @@ suite "Waku Sync: reconciliation": for _ in 0 ..< diffInWin: let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) + server.messageIngress(hash, DefaultPubsubTopic, msg) inWinHashes.incl hash ts += Timestamp(stepIn) @@ -436,7 +544,7 @@ suite "Waku Sync: reconciliation": for _ in 0 ..< diffOutWin: let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) + server.messageIngress(hash, DefaultPubsubTopic, msg) outWinHashes.incl hash ts += Timestamp(stepOut) @@ -445,11 +553,11 @@ suite "Waku Sync: reconciliation": let oneSec = timer.seconds(1) server = await newTestWakuRecon( - serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + serverSwitch, @[], @[], oneSec, idsChannel, localWants, remoteNeeds ) client = await newTestWakuRecon( - clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + clientSwitch, @[], @[], oneSec, idsChannel, localWants, remoteNeeds ) defer: @@ -467,6 +575,13 @@ suite "Waku Sync: reconciliation": check deliveredHashes notin outWinHashes asyncTest "hash-fingerprint collision, same timestamp – stable sort": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let ts = Timestamp(getNowInNanosecondTime()) var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) @@ -482,13 +597,17 @@ suite "Waku Sync: reconciliation": check h1 != h2 - server.messageIngress(h1, msg1) - client.messageIngress(h2, msg2) + server.messageIngress(h1, DefaultPubsubTopic, msg1) + client.messageIngress(h2, DefaultPubsubTopic, msg2) check remoteNeeds.len == 0 - server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) - client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) defer: server.stop() @@ -508,6 +627,13 @@ suite "Waku Sync: reconciliation": check vec[0].time == ts and vec[1].time == ts asyncTest "malformed message-ID is ignored during reconciliation": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let nowTs = Timestamp(getNowInNanosecondTime()) let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic) @@ -518,13 +644,17 @@ suite "Waku Sync: reconciliation": badHash[i] = 0'u8 let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic) - server.messageIngress(goodHash, goodMsg) - server.messageIngress(badHash, badMsg) + server.messageIngress(goodHash, DefaultPubsubTopic, goodMsg) + server.messageIngress(badHash, DefaultPubsubTopic, 
badMsg) check remoteNeeds.len == 0 - server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) - client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) defer: server.stop() @@ -539,6 +669,13 @@ suite "Waku Sync: reconciliation": check neededHash != badHash asyncTest "malformed ID: future-timestamp msg is ignored": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let nowNs = getNowInNanosecondTime() let tsNow = Timestamp(nowNs) @@ -552,12 +689,16 @@ suite "Waku Sync: reconciliation": let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic) let badHash = computeMessageHash(DefaultPubsubTopic, badMsg) - server.messageIngress(goodHash, goodMsg) - server.messageIngress(badHash, badMsg) + server.messageIngress(goodHash, DefaultPubsubTopic, goodMsg) + server.messageIngress(badHash, DefaultPubsubTopic, badMsg) check remoteNeeds.len == 0 - server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) - client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) defer: server.stop() @@ -572,16 +713,27 @@ suite "Waku Sync: reconciliation": check neededHash != badHash asyncTest "duplicate ID is queued only once": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let ts = Timestamp(getNowInNanosecondTime()) let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) let h = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(h, msg) - server.messageIngress(h, msg) + server.messageIngress(h, DefaultPubsubTopic, msg) + server.messageIngress(h, DefaultPubsubTopic, msg) check remoteNeeds.len == 0 - server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) - client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) defer: server.stop() @@ -595,19 +747,30 @@ suite "Waku Sync: reconciliation": check neededHash == h asyncTest "sync terminates immediately when no diffs exist": + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsChannel, localWants, remoteNeeds + ) + let ts = Timestamp(getNowInNanosecondTime()) let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) - client.messageIngress(hash, 
msg) + server.messageIngress(hash, DefaultPubsubTopic, msg) + client.messageIngress(hash, DefaultPubsubTopic, msg) - let idsQ = newAsyncQueue[SyncID]() + let idsQ = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]() let wantsQ = newAsyncQueue[PeerId]() let needsQ = newAsyncQueue[(PeerId, Fingerprint)]() - server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ) - client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ) + server = await newTestWakuRecon( + serverSwitch, @[], @[], DefaultSyncRange, idsQ, wantsQ, needsQ + ) + client = await newTestWakuRecon( + clientSwitch, @[], @[], DefaultSyncRange, idsQ, wantsQ, needsQ + ) defer: server.stop() @@ -630,10 +793,10 @@ suite "Waku Sync: transfer": clientArchive {.threadvar.}: WakuArchive var - serverIds {.threadvar.}: AsyncQueue[SyncID] + serverIds {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)] serverLocalWants {.threadvar.}: AsyncQueue[PeerId] serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] - clientIds {.threadvar.}: AsyncQueue[SyncID] + clientIds {.threadvar.}: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)] clientLocalWants {.threadvar.}: AsyncQueue[PeerId] clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] @@ -661,7 +824,7 @@ suite "Waku Sync: transfer": serverPeerManager = PeerManager.new(serverSwitch) clientPeerManager = PeerManager.new(clientSwitch) - serverIds = newAsyncQueue[SyncID]() + serverIds = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]() serverLocalWants = newAsyncQueue[PeerId]() serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() @@ -673,7 +836,7 @@ suite "Waku Sync: transfer": remoteNeedsRx = serverRemoteNeeds, ) - clientIds = newAsyncQueue[SyncID]() + clientIds = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)]() clientLocalWants = newAsyncQueue[PeerId]() clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() @@ -733,7 +896,8 @@ suite "Waku Sync: transfer": check: response.messages.len > 0 - asyncTest "Check the exact missing messages are received": + ## Disabled until we impl. 
DOS protection again + #[ asyncTest "Check the exact missing messages are received": let timeSlice = calculateTimeRange() let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) let (part, _) = divmod(timeWindow, 3) @@ -768,4 +932,4 @@ suite "Waku Sync: transfer": check received == expected - check clientIds.len == 0 + check clientIds.len == 0 ]# diff --git a/tests/waku_store_sync/test_range_split.nim b/tests/waku_store_sync/test_range_split.nim index 50ebc39fd..546f2cfa5 100644 --- a/tests/waku_store_sync/test_range_split.nim +++ b/tests/waku_store_sync/test_range_split.nim @@ -2,6 +2,8 @@ import unittest, nimcrypto, std/sequtils, results import ../../waku/waku_store_sync/[reconciliation, common] import ../../waku/waku_store_sync/storage/seq_storage import ../../waku/waku_core/message/digest +import ../../waku/waku_core/topics/pubsub_topic +import ../../waku/waku_core/topics/content_topic proc toDigest(s: string): WakuMessageHash = let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, (s.len - 1))) @@ -18,8 +20,8 @@ suite "Waku Sync – reconciliation": const N = 2_048 const mismatchI = 70 - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var baseHashMismatch: WakuMessageHash var remoteHashMismatch: WakuMessageHash @@ -27,7 +29,10 @@ suite "Waku Sync – reconciliation": for i in 0 ..< N: let ts = 1000 + i let hashLocal = toDigest("msg" & $i) - local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + + local.insert( + SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var hashRemote = hashLocal @@ -35,18 +40,23 @@ suite "Waku Sync – reconciliation": baseHashMismatch = hashLocal remoteHashMismatch = toDigest("msg" & $i & "_x") hashRemote = remoteHashMismatch - remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + + remote.insert( + SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var z: WakuMessageHash let whole = SyncID(time: 1000, hash: z) .. 
SyncID(time: 1000 + N - 1, hash: z) - check local.computeFingerprint(whole) != remote.computeFingerprint(whole) + check local.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic]) != + remote.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic]) - let remoteFp = remote.computeFingerprint(whole) + let remoteFp = + remote.computeFingerprint(whole, @[DefaultPubsubTopic], @[DefaultContentTopic]) let payload = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(whole, RangeType.Fingerprint)], fingerprints: @[remoteFp], itemSets: @[], @@ -75,8 +85,10 @@ suite "Waku Sync – reconciliation": const threshold = 4 const partitions = 2 - let local = SeqStorage.new(@[], threshold = threshold, partitions = partitions) - let remote = SeqStorage.new(@[], threshold = threshold, partitions = partitions) + let local = + SeqStorage.new(@[], @[], @[], threshold = threshold, partitions = partitions) + let remote = + SeqStorage.new(@[], @[], @[], threshold = threshold, partitions = partitions) var mismatchHash: WakuMessageHash for i in 0 ..< 8: @@ -90,8 +102,12 @@ suite "Waku Sync – reconciliation": mismatchHash = toDigest("msg" & $i & "_x") localHash = mismatchHash - discard local.insert (SyncID(time: t, hash: localHash)) - discard remote.insert(SyncID(time: t, hash: remoteHash)) + discard local.insert( + SyncID(time: t, hash: localHash), DefaultPubsubTopic, DefaultContentTopic + ) + discard remote.insert( + SyncID(time: t, hash: remoteHash), DefaultPubsubTopic, DefaultContentTopic + ) var zeroHash: WakuMessageHash let wholeRange = @@ -100,10 +116,15 @@ suite "Waku Sync – reconciliation": var toSend, toRecv: seq[WakuMessageHash] let payload = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(wholeRange, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(wholeRange)], + fingerprints: + @[ + remote.computeFingerprint( + wholeRange, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) @@ -120,15 +141,18 @@ suite "Waku Sync – reconciliation": const N = 2_048 const mismatchI = 70 - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var baseHashMismatch, remoteHashMismatch: WakuMessageHash for i in 0 ..< N: let ts = 1000 + i let hashLocal = toDigest("msg" & $i) - local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + + local.insert( + SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var hashRemote = hashLocal @@ -136,19 +160,32 @@ suite "Waku Sync – reconciliation": baseHashMismatch = hashLocal remoteHashMismatch = toDigest("msg" & $i & "_x") hashRemote = remoteHashMismatch - remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + + remote.insert( + SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var zero: WakuMessageHash let sliceWhole = SyncID(time: 1000, hash: zero) .. 
SyncID(time: 1000 + N - 1, hash: zero) - check local.computeFingerprint(sliceWhole) != remote.computeFingerprint(sliceWhole) + check local.computeFingerprint( + sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) != + remote.computeFingerprint( + sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) let payload1 = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(sliceWhole, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(sliceWhole)], + fingerprints: + @[ + remote.computeFingerprint( + sliceWhole, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) @@ -167,10 +204,15 @@ suite "Waku Sync – reconciliation": check subSlice.a.time != 0 let payload2 = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(subSlice, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(subSlice)], + fingerprints: + @[ + remote.computeFingerprint( + subSlice, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) @@ -192,8 +234,8 @@ suite "Waku Sync – reconciliation": check toRecv2.len == 0 test "second-round payload remote": - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var baseHash: WakuMessageHash var alteredHash: WakuMessageHash @@ -201,7 +243,10 @@ suite "Waku Sync – reconciliation": for i in 0 ..< 8: let ts = 1000 + i let hashLocal = toDigest("msg" & $i) - local.insert(SyncID(time: ts, hash: hashLocal)).isOkOr: + + local.insert( + SyncID(time: ts, hash: hashLocal), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var hashRemote = hashLocal @@ -210,23 +255,33 @@ suite "Waku Sync – reconciliation": alteredHash = toDigest("msg" & $i & "_x") hashRemote = alteredHash - remote.insert(SyncID(time: ts, hash: hashRemote)).isOkOr: + remote.insert( + SyncID(time: ts, hash: hashRemote), DefaultPubsubTopic, DefaultContentTopic + ).isOkOr: assert false, "failed to insert hash: " & $error var zero: WakuMessageHash let slice = SyncID(time: 1000, hash: zero) .. 
SyncID(time: 1007, hash: zero) - check local.computeFingerprint(slice) != remote.computeFingerprint(slice) + check local.computeFingerprint(slice, @[DefaultPubsubTopic], @[DefaultContentTopic]) != + remote.computeFingerprint(slice, @[DefaultPubsubTopic], @[DefaultContentTopic]) var toSend1, toRecv1: seq[WakuMessageHash] - let pay1 = RangesData( - cluster: 0, - shards: @[0], + + let rangeData = RangesData( + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(slice, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(slice)], + fingerprints: + @[ + remote.computeFingerprint( + slice, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) - let rep1 = local.processPayload(pay1, toSend1, toRecv1) + + let rep1 = local.processPayload(rangeData, toSend1, toRecv1) check rep1.ranges.len == 1 check rep1.ranges[0][1] == RangeType.ItemSet diff --git a/tests/waku_store_sync/test_state_transition.nim b/tests/waku_store_sync/test_state_transition.nim index 732a577a9..d94d6bed2 100644 --- a/tests/waku_store_sync/test_state_transition.nim +++ b/tests/waku_store_sync/test_state_transition.nim @@ -2,6 +2,8 @@ import unittest, nimcrypto, std/sequtils import ../../waku/waku_store_sync/[reconciliation, common] import ../../waku/waku_store_sync/storage/seq_storage import ../../waku/waku_core/message/digest +import ../../waku/waku_core/topics/pubsub_topic +import ../../waku/waku_core/topics/content_topic proc toDigest*(s: string): WakuMessageHash = let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high)) @@ -16,30 +18,38 @@ suite "Waku Sync – reconciliation": const N = 2_000 const idx = 137 - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var baseH, altH: WakuMessageHash for i in 0 ..< N: let ts = 1000 + i let h = toDigest("msg" & $i) - discard local.insert(SyncID(time: ts, hash: h)) + discard + local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic) var hr = h if i == idx: baseH = h altH = toDigest("msg" & $i & "x") hr = altH - discard remote.insert(SyncID(time: ts, hash: hr)) + discard remote.insert( + SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic + ) var z: WakuMessageHash let whole = SyncID(time: 1000, hash: z) .. 
SyncID(time: 1000 + N - 1, hash: z) var s1, r1: seq[WakuMessageHash] let p1 = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(whole, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(whole)], + fingerprints: + @[ + remote.computeFingerprint( + whole, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) let rep1 = local.processPayload(p1, s1, r1) @@ -52,10 +62,11 @@ suite "Waku Sync – reconciliation": var s2, r2: seq[WakuMessageHash] let p2 = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(sub, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(sub)], + fingerprints: + @[remote.computeFingerprint(sub, @[DefaultPubsubTopic], @[DefaultContentTopic])], itemSets: @[], ) let rep2 = local.processPayload(p2, s2, r2) @@ -67,15 +78,20 @@ suite "Waku Sync – reconciliation": check s3.len == 1 and s3[0] == altH check r3.len == 1 and r3[0] == baseH - discard local.insert(SyncID(time: mismT, hash: altH)) - discard remote.insert(SyncID(time: mismT, hash: baseH)) + discard local.insert( + SyncID(time: mismT, hash: altH), DefaultPubsubTopic, DefaultContentTopic + ) + discard remote.insert( + SyncID(time: mismT, hash: baseH), DefaultPubsubTopic, DefaultContentTopic + ) var s4, r4: seq[WakuMessageHash] let p3 = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(sub, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(sub)], + fingerprints: + @[remote.computeFingerprint(sub, @[DefaultPubsubTopic], @[DefaultContentTopic])], itemSets: @[], ) let rep3 = local.processPayload(p3, s4, r4) @@ -86,14 +102,15 @@ suite "Waku Sync – reconciliation": const N = 120 const pivot = 60 - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var diffHash: WakuMessageHash for i in 0 ..< N: let ts = 1000 + i let h = toDigest("msg" & $i) - discard local.insert(SyncID(time: ts, hash: h)) + discard + local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic) var hr: WakuMessageHash if i >= pivot: diffHash = toDigest("msg" & $i & "_x") @@ -101,7 +118,9 @@ suite "Waku Sync – reconciliation": else: hr = h - discard remote.insert(SyncID(time: ts, hash: hr)) + discard remote.insert( + SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic + ) var z: WakuMessageHash let sliceA = SyncID(time: 1000, hash: z) .. 
SyncID(time: 1059, hash: z) @@ -109,11 +128,18 @@ suite "Waku Sync – reconciliation": var s, r: seq[WakuMessageHash] let payload = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)], fingerprints: - @[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)], + @[ + remote.computeFingerprint( + sliceA, @[DefaultPubsubTopic], @[DefaultContentTopic] + ), + remote.computeFingerprint( + sliceB, @[DefaultPubsubTopic], @[DefaultContentTopic] + ), + ], itemSets: @[], ) let reply = local.processPayload(payload, s, r) @@ -135,23 +161,31 @@ suite "Waku Sync – reconciliation": for i in 0 ..< N: let ts = 1000 + i let h = toDigest("msg" & $i) - discard local.insert(SyncID(time: ts, hash: h)) + discard + local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic) var hr = h if i == idx: baseH = h altH = toDigest("msg" & $i & "_x") hr = altH - discard remote.insert(SyncID(time: ts, hash: hr)) + discard remote.insert( + SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic + ) var z: WakuMessageHash let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) var toS, toR: seq[WakuMessageHash] let p = RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(slice, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(slice)], + fingerprints: + @[ + remote.computeFingerprint( + slice, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ) let reply = local.processPayload(p, toS, toR) @@ -171,14 +205,15 @@ suite "Waku Sync – reconciliation": const N = 80_000 const bad = N - 10 - let local = SeqStorage.new(@[]) - let remote = SeqStorage.new(@[]) + let local = SeqStorage.new(@[], @[], @[]) + let remote = SeqStorage.new(@[], @[], @[]) var baseH, altH: WakuMessageHash for i in 0 ..< N: let ts = 1000 + i let h = toDigest("msg" & $i) - discard local.insert(SyncID(time: ts, hash: h)) + discard + local.insert(SyncID(time: ts, hash: h), DefaultPubsubTopic, DefaultContentTopic) let hr = if i == bad: @@ -187,7 +222,9 @@ suite "Waku Sync – reconciliation": altH else: h - discard remote.insert(SyncID(time: ts, hash: hr)) + discard remote.insert( + SyncID(time: ts, hash: hr), DefaultPubsubTopic, DefaultContentTopic + ) var slice = SyncID(time: 1000, hash: EmptyFingerprint) .. 
@@ -196,10 +233,15 @@ suite "Waku Sync – reconciliation": proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData = local.processPayload( RangesData( - cluster: 0, - shards: @[0], + pubsubTopics: @[DefaultPubsubTopic], + contentTopics: @[DefaultContentTopic], ranges: @[(s, RangeType.Fingerprint)], - fingerprints: @[remote.computeFingerprint(s)], + fingerprints: + @[ + remote.computeFingerprint( + s, @[DefaultPubsubTopic], @[DefaultContentTopic] + ) + ], itemSets: @[], ), sendQ, @@ -213,7 +255,8 @@ suite "Waku Sync – reconciliation": check rep.ranges.len == 8 check rep.ranges.allIt(it[1] == RangeType.Fingerprint) for (sl, _) in rep.ranges: - if local.computeFingerprint(sl) != remote.computeFingerprint(sl): + if local.computeFingerprint(sl, @[DefaultPubsubTopic], @[DefaultContentTopic]) != + remote.computeFingerprint(sl, @[DefaultPubsubTopic], @[DefaultContentTopic]): slice = sl break @@ -235,8 +278,12 @@ suite "Waku Sync – reconciliation": check qSend.len == 1 and qSend[0] == altH check qRecv.len == 1 and qRecv[0] == baseH - discard local.insert(SyncID(time: slice.a.time, hash: altH)) - discard remote.insert(SyncID(time: slice.a.time, hash: baseH)) + discard local.insert( + SyncID(time: slice.a.time, hash: altH), DefaultPubsubTopic, DefaultContentTopic + ) + discard remote.insert( + SyncID(time: slice.a.time, hash: baseH), DefaultPubsubTopic, DefaultContentTopic + ) var send6, recv6: seq[WakuMessageHash] let rep6 = fpReply(slice, send6, recv6) diff --git a/tests/waku_store_sync/test_storage.nim b/tests/waku_store_sync/test_storage.nim index 9e9a80b29..930d3f7dc 100644 --- a/tests/waku_store_sync/test_storage.nim +++ b/tests/waku_store_sync/test_storage.nim @@ -1,6 +1,6 @@ {.used.} -import std/[options, random], testutils/unittests, chronos +import std/[options, random, sequtils, packedsets], testutils/unittests, chronos import ../../waku/waku_core, @@ -13,23 +13,32 @@ suite "Waku Sync Storage": var rng = initRand() let count = 10_000 var elements = newSeqOfCap[SyncID](count) + var pubsub = newSeqOfCap[PubsubTopic](count) + var content = newSeqOfCap[ContentTopic](count) + + var emptySet = @[0].toPackedSet() + emptySet.excl(0) for i in 0 ..< count: let id = SyncID(time: Timestamp(i), hash: randomHash(rng)) elements.add(id) + pubsub.add(DefaultPubsubTopic) + content.add(DefaultContentTopic) - var storage1 = SeqStorage.new(elements) - var storage2 = SeqStorage.new(elements) + var storage1 = SeqStorage.new(elements, pubsub, content) + var storage2 = SeqStorage.new(elements, pubsub, content) let lb = elements[0] let ub = elements[count - 1] let bounds = lb .. 
ub - let fingerprint1 = storage1.computeFingerprint(bounds) + let fingerprint1 = storage1.computeFingerprint(bounds, @[], @[]) var outputPayload: RangesData - storage2.processFingerprintRange(bounds, fingerprint1, outputPayload) + storage2.processFingerprintRange( + bounds, emptySet, emptySet, fingerprint1, outputPayload + ) let expected = RangesData(ranges: @[(bounds, RangeType.Skip)], fingerprints: @[], itemSets: @[]) @@ -42,6 +51,12 @@ suite "Waku Sync Storage": let count = 1000 var elements1 = newSeqOfCap[SyncID](count) var elements2 = newSeqOfCap[SyncID](count) + var pubsub = newSeqOfCap[PubsubTopic](count) + var content = newSeqOfCap[ContentTopic](count) + + var emptySet = @[0].toPackedSet() + emptySet.excl(0) + var diffs: seq[Fingerprint] for i in 0 ..< count: @@ -53,7 +68,10 @@ suite "Waku Sync Storage": else: diffs.add(id.hash) - var storage1 = SeqStorage.new(elements1) + pubsub.add(DefaultPubsubTopic) + content.add(DefaultContentTopic) + + var storage1 = SeqStorage.new(elements1, pubsub, content) let lb = elements1[0] let ub = elements1[count - 1] @@ -66,7 +84,9 @@ suite "Waku Sync Storage": toRecv: seq[Fingerprint] outputPayload: RangesData - storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload) + storage1.processItemSetRange( + bounds, emptySet, emptySet, itemSet2, toSend, toRecv, outputPayload + ) check: toSend == diffs @@ -79,11 +99,11 @@ suite "Waku Sync Storage": let element1 = SyncID(time: Timestamp(1000), hash: randomHash(rng)) let element2 = SyncID(time: Timestamp(2000), hash: randomHash(rng)) - let res1 = storage.insert(element1) + let res1 = storage.insert(element1, DefaultPubsubTopic, DefaultContentTopic) assert res1.isOk(), $res1.error let count1 = storage.length() - let res2 = storage.insert(element2) + let res2 = storage.insert(element2, DefaultPubsubTopic, DefaultContentTopic) assert res2.isOk(), $res2.error let count2 = storage.length() @@ -96,24 +116,33 @@ suite "Waku Sync Storage": let element = SyncID(time: Timestamp(1000), hash: randomHash(rng)) - let storage = SeqStorage.new(@[element]) + let storage = + SeqStorage.new(@[element], @[DefaultPubsubTopic], @[DefaultContentTopic]) - let res = storage.insert(element) + let res = storage.insert(element, DefaultPubsubTopic, DefaultContentTopic) check: - res.isErr() == true + res.isErr() == false + storage.length() == 1 test "prune elements": var rng = initRand() let count = 1000 var elements = newSeqOfCap[SyncID](count) + var pubsub = newSeqOfCap[PubsubTopic](count) + var content = newSeqOfCap[ContentTopic](count) + + var emptySet = @[0].toPackedSet() + emptySet.excl(0) for i in 0 ..< count: let id = SyncID(time: Timestamp(i), hash: randomHash(rng)) elements.add(id) + pubsub.add(DefaultPubsubTopic) + content.add(DefaultContentTopic) - let storage = SeqStorage.new(elements) + let storage = SeqStorage.new(elements, pubsub, content) let beforeCount = storage.length() @@ -126,6 +155,52 @@ suite "Waku Sync Storage": pruned == 500 afterCount == 500 + test "topics recycling": + var rng = initRand() + let count = 1000 + var elements = newSeqOfCap[SyncID](count) + var pubsub = newSeqOfCap[PubsubTopic](count) + var content = newSeqOfCap[ContentTopic](count) + + var emptySet = @[0].toPackedSet() + emptySet.excl(0) + + for i in 0 ..< (count div 2): + let id = SyncID(time: Timestamp(i), hash: randomHash(rng)) + + elements.add(id) + pubsub.add(DefaultPubsubTopic) + content.add("my/custom/topic") + + for i in (count div 2) ..< count: + let id = SyncID(time: Timestamp(i), hash: randomHash(rng)) + + 
elements.add(id) + pubsub.add(DefaultPubsubTopic) + content.add(DefaultContentTopic) + + let storage = SeqStorage.new(elements, pubsub, content) + + let beforeCount = storage.unusedContentTopicsLen() + + let pruned = storage.prune(Timestamp(500)) + + let afterCount = storage.unusedContentTopicsLen() + + check: + beforeCount == 0 + pruned == 500 + afterCount == 1 + + let id = SyncID(time: Timestamp(1001), hash: randomHash(rng)) + let res = storage.insert(id, DefaultPubsubTopic, "my/other/topic") + assert res.isOk(), $res.error + + let reuseCount = storage.unusedContentTopicsLen() + + check: + reuseCount == 0 + ## disabled tests are rough benchmark #[ test "10M fingerprint": var rng = initRand() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2410f5f98..daf6f1b5c 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -235,6 +235,7 @@ proc setupProtocols( ( await node.mountStoreSync( + conf.clusterId, conf.subscribeShards, conf.contentTopics, confStoreSync.rangeSec, confStoreSync.intervalSec, confStoreSync.relayJitterSec, ) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4ee8af3b0..06cb8e479 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -273,29 +273,24 @@ proc mountMix*( proc mountStoreSync*( node: WakuNode, - storeSyncRange = 3600.uint32, - storeSyncInterval = 300.uint32, - storeSyncRelayJitter = 20.uint32, + cluster: uint16, + shards: seq[uint16], + contentTopics: seq[string], + storeSyncRange: uint32, + storeSyncInterval: uint32, + storeSyncRelayJitter: uint32, ): Future[Result[void, string]] {.async.} = - let idsChannel = newAsyncQueue[SyncID](0) - let wantsChannel = newAsyncQueue[PeerId](0) + let idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)](0) + let wantsChannel = newAsyncQueue[(PeerId)](0) let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0) - var cluster: uint16 - var shards: seq[uint16] - let enrRes = node.enr.toTyped() - if enrRes.isOk(): - let shardingRes = enrRes.get().relaySharding() - if shardingRes.isSome(): - let relayShard = shardingRes.get() - cluster = relayShard.clusterID - shards = relayShard.shardIds + let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it)) let recon = ?await SyncReconciliation.new( - cluster, shards, node.peerManager, node.wakuArchive, storeSyncRange.seconds, - storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel, - needsChannel, + pubsubTopics, contentTopics, node.peerManager, node.wakuArchive, + storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, + idsChannel, wantsChannel, needsChannel, ) node.wakuStoreReconciliation = recon diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index 815ed9d61..a00de8e0b 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -1,6 +1,6 @@ {.push raises: [].} -import std/sequtils, stew/leb128 +import std/sequtils, stew/[leb128, byteutils] import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common @@ -52,18 +52,26 @@ proc deltaEncode*(value: RangesData): seq[byte] = i = 0 j = 0 - # encode cluster - buf = uint64(value.cluster).toBytes(Leb128) + # encode pubsub topics + buf = uint64(value.pubsubTopics.len).toBytes(Leb128) output &= @buf - # encode shards - buf = uint64(value.shards.len).toBytes(Leb128) - output &= @buf - - for shard in value.shards: - buf = uint64(shard).toBytes(Leb128) + for topic in value.pubsubTopics: + buf = 
uint64(topic.len).toBytes(Leb128) output &= @buf + output &= topic.toBytes() + + # encode content topics + buf = uint64(value.contentTopics.len).toBytes(Leb128) + output &= @buf + + for topic in value.contentTopics: + buf = uint64(topic.len).toBytes(Leb128) + output &= @buf + + output &= topic.toBytes() + # the first range is implicit but must be explicit when encoded let (bound, _) = value.ranges[0] @@ -221,37 +229,34 @@ proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] = return ok(recon) -proc getCluster(idx: var int, buffer: seq[byte]): Result[uint16, string] = +proc getTopics(idx: var int, buffer: seq[byte]): Result[seq[string], string] = if idx + VarIntLen > buffer.len: - return err("Cannot decode cluster") + return err("Cannot decode topic count") let slice = buffer[idx ..< idx + VarIntLen] let (val, len) = uint64.fromBytes(slice, Leb128) idx += len + let topicCount = int(val) - return ok(uint16(val)) - -proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] = - if idx + VarIntLen > buffer.len: - return err("Cannot decode shards count") - - let slice = buffer[idx ..< idx + VarIntLen] - let (val, len) = uint64.fromBytes(slice, Leb128) - idx += len - let shardsLen = val - - var shards: seq[uint16] - for i in 0 ..< shardsLen: + var topics: seq[string] + for i in 0 ..< topicCount: if idx + VarIntLen > buffer.len: - return err("Cannot decode shard value. idx: " & $i) + return err("Cannot decode length. Topic index: " & $i) let slice = buffer[idx ..< idx + VarIntLen] let (val, len) = uint64.fromBytes(slice, Leb128) idx += len + let topicLen = int(val) - shards.add(uint16(val)) + if idx + topicLen > buffer.len: + return err("Cannot decode bytes. Topic index: " & $i) - return ok(shards) + let topic = string.fromBytes(buffer[idx ..< idx + topicLen]) + idx += topicLen + + topics.add(topic) + + return ok(topics) proc deltaDecode*( itemSet: var ItemSet, buffer: seq[byte], setLength: int @@ -294,8 +299,8 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = lastTime = Timestamp(0) idx = 0 - payload.cluster = ?getCluster(idx, buffer) - payload.shards = ?getShards(idx, buffer) + payload.pubsubTopics = ?getTopics(idx, buffer) + payload.contentTopics = ?getTopics(idx, buffer) lastTime = ?getTimestamp(idx, buffer) diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim index e2eac0f85..da8a5df95 100644 --- a/waku/waku_store_sync/common.nim +++ b/waku/waku_store_sync/common.nim @@ -26,8 +26,8 @@ type ItemSet = 2 RangesData* = object - cluster*: uint16 - shards*: seq[uint16] + pubsubTopics*: seq[PubsubTopic] + contentTopics*: seq[ContentTopic] ranges*: seq[(Slice[SyncID], RangeType)] fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 0601d2c23..838c49400 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[sequtils, options, packedsets], + std/[sequtils, options, sets], stew/byteutils, results, chronicles, @@ -20,6 +20,7 @@ import ../waku_core/codecs, ../waku_core/time, ../waku_core/topics/pubsub_topic, + ../waku_core/topics/content_topic, ../waku_core/message/digest, ../waku_core/message/message, ../node/peer_manager/peer_manager, @@ -37,8 +38,8 @@ logScope: const DefaultStorageCap = 50_000 type SyncReconciliation* = ref object of LPProtocol - cluster: uint16 - shards: PackedSet[uint16] + 
pubsubTopics: HashSet[PubsubTopic] # Empty set means accept all. See spec. +  contentTopics: HashSet[ContentTopic] # Empty set means accept all. See spec. peerManager: PeerManager @@ -46,10 +47,13 @@ type SyncReconciliation* = ref object of LPProtocol storage: SyncStorage -  # AsyncQueues are used as communication channels between -  # reconciliation and transfer protocols. -  idsRx: AsyncQueue[SyncID] -  localWantsTx: AsyncQueue[PeerId] +  # Receive IDs from transfer protocol for storage +  idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)] + +  # Send peer IDs to transfer protocol for reception +  localWantsTx: AsyncQueue[PeerId] + +  # Send Hashes to transfer protocol for transmission remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)] # params @@ -74,11 +78,14 @@ proc messageIngress*( let id = SyncID(time: msg.timestamp, hash: msgHash) -  self.storage.insert(id).isOkOr: -    error "failed to insert new message", msg_hash = msgHash.toHex(), err = error +  self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr: +    error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error proc messageIngress*( -    self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage +    self: SyncReconciliation, +    msgHash: WakuMessageHash, +    pubsubTopic: PubsubTopic, +    msg: WakuMessage, ) = trace "message ingress", msg_hash = msgHash.toHex(), msg = msg @@ -87,14 +94,69 @@ let id = SyncID(time: msg.timestamp, hash: msgHash) -  self.storage.insert(id).isOkOr: -    error "failed to insert new message", msg_hash = msgHash.toHex(), err = error +  self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr: +    error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error -proc messageIngress*(self: SyncReconciliation, id: SyncID) = -  trace "message ingress", id = id +proc messageIngress*( +    self: SyncReconciliation, +    id: SyncID, +    pubsubTopic: PubsubTopic, +    contentTopic: ContentTopic, +) = +  self.storage.insert(id, pubsubTopic, contentTopic).isOkOr: +    error "failed to insert new message", msg_hash = $id.hash.toHex(), error = $error -  self.storage.insert(id).isOkOr: -    error "failed to insert new message", msg_hash = id.hash.toHex(), err = error +proc preProcessPayload( +    self: SyncReconciliation, payload: RangesData +): Option[RangesData] = +  ## Check the received payload for topics and/or time mismatch. 
+
+  var payload = payload
+
+  # Always use the smallest pubsub topic scope possible
+  if payload.pubsubTopics.len > 0 and self.pubsubTopics.len > 0:
+    let pubsubIntersection = self.pubsubTopics * payload.pubsubTopics.toHashSet()
+
+    if pubsubIntersection.len < 1:
+      return none(RangesData)
+
+    payload.pubsubTopics = pubsubIntersection.toSeq()
+  elif self.pubsubTopics.len > 0:
+    payload.pubsubTopics = self.pubsubTopics.toSeq()
+
+  # Always use the smallest content topic scope possible
+  if payload.contentTopics.len > 0 and self.contentTopics.len > 0:
+    let contentIntersection = self.contentTopics * payload.contentTopics.toHashSet()
+
+    if contentIntersection.len < 1:
+      return none(RangesData)
+
+    payload.contentTopics = contentIntersection.toSeq()
+  elif self.contentTopics.len > 0:
+    payload.contentTopics = self.contentTopics.toSeq()
+
+  let timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
+  let selfLowerBound = timeRange.a
+
+  # Non-skip ranges that end before our own sync window starts cannot match
+  # anything we store; convert them to skip ranges and drop their payloads
+  # before processing.
+  for i in 0 ..< payload.ranges.len:
+    let rangeType = payload.ranges[i][1]
+    if rangeType == RangeType.Skip:
+      continue
+
+    let upperBound = payload.ranges[i][0].b.time
+    if selfLowerBound > upperBound:
+      payload.ranges[i][1] = RangeType.Skip
+
+      if rangeType == RangeType.Fingerprint:
+        payload.fingerprints.delete(0)
+      elif rangeType == RangeType.ItemSet:
+        payload.itemSets.delete(0)
+    else:
+      break
+
+  return some(payload)

 proc processRequest(
     self: SyncReconciliation, conn: Connection
@@ -136,16 +198,23 @@ proc processRequest(
       sendPayload: RangesData
       rawPayload: seq[byte]

-    # Only process the ranges IF the shards and cluster matches
-    if self.cluster == recvPayload.cluster and
-        recvPayload.shards.toPackedSet() == self.shards:
-      sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
+    let preProcessedPayloadRes = self.preProcessPayload(recvPayload)
+    if preProcessedPayloadRes.isSome():
+      let preProcessedPayload = preProcessedPayloadRes.get()
+
+      trace "pre-processed payload",
+        local = self.peerManager.switch.peerInfo.peerId,
+        remote = conn.peerId,
+        payload = preProcessedPayload
+
+      sendPayload =
+        self.storage.processPayload(preProcessedPayload, hashToSend, hashToRecv)

       trace "sync payload processed",
         hash_to_send = hashToSend, hash_to_recv = hashToRecv

-      sendPayload.cluster = self.cluster
-      sendPayload.shards = self.shards.toSeq()
+      sendPayload.pubsubTopics = self.pubsubTopics.toSeq()
+      sendPayload.contentTopics = self.contentTopics.toSeq()

       for hash in hashToSend:
         self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
@@ -187,18 +256,24 @@ proc processRequest(
   return ok()

 proc initiate(
-    self: SyncReconciliation, connection: Connection
+    self: SyncReconciliation,
+    connection: Connection,
+    offset: Duration,
+    syncRange: Duration,
+    pubsubTopics: seq[PubsubTopic],
+    contentTopics: seq[ContentTopic],
 ): Future[Result[void, string]] {.async.} =
   let
-    timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
+    timeRange = calculateTimeRange(offset, syncRange)
     lower = SyncID(time: timeRange.a, hash: EmptyFingerprint)
     upper = SyncID(time: timeRange.b, hash: FullFingerprint)
     bounds = lower .. upper
-    fingerprint = self.storage.computeFingerprint(bounds)
+    fingerprint = self.storage.computeFingerprint(bounds, pubsubTopics, contentTopics)
+
     initPayload = RangesData(
-      cluster: self.cluster,
-      shards: self.shards.toSeq(),
+      pubsubTopics: pubsubTopics,
+      contentTopics: contentTopics,
       ranges: @[(bounds, RangeType.Fingerprint)],
       fingerprints: @[fingerprint],
       itemSets: @[],
@@ -227,7 +302,12 @@ proc initiate(
   return ok()

 proc storeSynchronization*(
-    self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
+    self: SyncReconciliation,
+    peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo),
+    offset: Duration = self.relayJitter,
+    syncRange: Duration = self.syncRange,
+    pubsubTopics: HashSet[PubsubTopic] = self.pubsubTopics,
+    contentTopics: HashSet[ContentTopic] = self.contentTopics,
 ): Future[Result[void, string]] {.async.} =
   let peer = peerInfo.valueOr:
     self.peerManager.selectPeer(WakuReconciliationCodec).valueOr:
@@ -241,7 +321,11 @@ proc storeSynchronization*(
   debug "sync session initialized",
     local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId

-  (await self.initiate(conn)).isOkOr:
+  (
+    await self.initiate(
+      conn, offset, syncRange, pubsubTopics.toSeq(), contentTopics.toSeq()
+    )
+  ).isOkOr:
     error "sync session failed",
       local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error
@@ -254,15 +338,13 @@ proc storeSynchronization*(

 proc initFillStorage(
     syncRange: timer.Duration, wakuArchive: WakuArchive
-): Future[Result[seq[SyncID], string]] {.async.} =
+): Future[Result[SeqStorage, string]] {.async.} =
   if wakuArchive.isNil():
     return err("waku archive unavailable")

   let endTime = getNowInNanosecondTime()
   let starTime = endTime - syncRange.nanos

-  #TODO special query for only timestap and hash ???
-
   var query = ArchiveQuery(
     includeData: true,
     cursor: none(ArchiveCursor),
@@ -274,39 +356,40 @@ proc initFillStorage(

   debug "initial storage filling started"

-  var ids = newSeqOfCap[SyncID](DefaultStorageCap)
-
-  # we assume IDs are in order
+  var storage = SeqStorage.new(DefaultStorageCap)

   while true:
     let response = (await wakuArchive.findMessages(query)).valueOr:
       return err("archive retrival failed: " & $error)

+    # we assume IDs are already in order
     for i in 0 ..< response.hashes.len:
       let hash = response.hashes[i]
       let msg = response.messages[i]
+      let pubsubTopic = response.topics[i]

-      ids.add(SyncID(time: msg.timestamp, hash: hash))
+      let id = SyncID(time: msg.timestamp, hash: hash)
+      discard storage.insert(id, pubsubTopic, msg.contentTopic)

     if response.cursor.isNone():
       break

     query.cursor = response.cursor

-  debug "initial storage filling done", elements = ids.len
+  debug "initial storage filling done", elements = storage.length()

-  return ok(ids)
+  return ok(storage)

 proc new*(
     T: type SyncReconciliation,
-    cluster: uint16,
-    shards: seq[uint16],
+    pubsubTopics: seq[PubSubTopic],
+    contentTopics: seq[ContentTopic],
     peerManager: PeerManager,
     wakuArchive: WakuArchive,
     syncRange: timer.Duration = DefaultSyncRange,
     syncInterval: timer.Duration = DefaultSyncInterval,
     relayJitter: timer.Duration = DefaultGossipSubJitter,
-    idsRx: AsyncQueue[SyncID],
+    idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
     localWantsTx: AsyncQueue[PeerId],
     remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
 ): Future[Result[T, string]] {.async.} =
@@ -316,11 +399,11 @@ proc new*(
       warn "will not sync messages before this point in time", error = res.error
       SeqStorage.new(DefaultStorageCap)
     else:
-      SeqStorage.new(res.get())
+      res.get()

   var sync = SyncReconciliation(
-    cluster: cluster,
-    shards: shards.toPackedSet(),
+    pubsubTopics: pubsubTopics.toHashSet(),
+    contentTopics: contentTopics.toHashSet(),
     peerManager: peerManager,
     storage: storage,
     syncRange: syncRange,
@@ -381,9 +464,9 @@ proc periodicPrune(self: SyncReconciliation) {.async.} =

 proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
   while true: # infinite loop
-    let id = await self.idsRx.popfirst()
+    let (id, pubsub, content) = await self.idsRx.popfirst()

-    self.messageIngress(id)
+    self.messageIngress(id, pubsub, content)

 proc start*(self: SyncReconciliation) =
   if self.started:
diff --git a/waku/waku_store_sync/storage/seq_storage.nim b/waku/waku_store_sync/storage/seq_storage.nim
index b1782c22a..272b3a78a 100644
--- a/waku/waku_store_sync/storage/seq_storage.nim
+++ b/waku/waku_store_sync/storage/seq_storage.nim
@@ -1,8 +1,14 @@
-import std/[algorithm, sequtils, math, options], results, chronos, stew/arrayops
+import
+  std/[algorithm, sequtils, math, options, tables, packedsets, sugar],
+  results,
+  chronos,
+  stew/arrayops

 import
   ../../waku_core/time,
   ../../waku_core/message/digest,
+  ../../waku_core/topics/pubsub_topic,
+  ../../waku_core/topics/content_topic,
   ../common,
   ./range_processing,
   ./storage
@@ -10,43 +16,119 @@ import

 type SeqStorage* = ref object of SyncStorage
   elements: seq[SyncID]
+  pubsubTopicIndexes: seq[int]
+  contentTopicIndexes: seq[int]
+
+  pubsubTopics: seq[PubSubTopic]
+  contentTopics: seq[ContentTopic]
+
+  unusedPubsubTopicSet: PackedSet[int]
+  unusedContentTopicSet: PackedSet[int]
+
   # Numer of parts a range will be splitted into.
   partitionCount: int

   # Number of element in a range for which item sets are used instead of fingerprints.
   lengthThreshold: int

-method length*(self: SeqStorage): int =
+method length*(self: SeqStorage): int {.raises: [].} =
   return self.elements.len

-method insert*(self: SeqStorage, element: SyncID): Result[void, string] {.raises: [].} =
-  let idx = self.elements.lowerBound(element, common.cmp)
+proc pubsubTopicsLen*(self: SeqStorage): int {.raises: [].} =
+  return self.pubsubTopics.len
+
+proc contentTopicsLen*(self: SeqStorage): int {.raises: [].} =
+  return self.contentTopics.len
+
+proc unusedPubsubTopicsLen*(self: SeqStorage): int {.raises: [].} =
+  return self.unusedPubsubTopicSet.len
+
+proc unusedContentTopicsLen*(self: SeqStorage): int {.raises: [].} =
+  return self.unusedContentTopicSet.len
+
+proc getPubsubTopicIndex(self: SeqStorage, pubsubTopic: PubSubTopic): int =
+  for i, selfTopic in self.pubsubTopics:
+    if pubsubTopic == selfTopic:
+      return i
+
+  if self.unusedPubsubTopicSet.len > 0:
+    let unusedIdx = self.unusedPubsubTopicSet.toSeq()[0]
+    self.unusedPubsubTopicSet.excl(unusedIdx)
+    self.pubsubTopics[unusedIdx] = pubsubTopic
+    return unusedIdx
+
+  let newIdx = self.pubsubTopics.len
+  self.pubsubTopics.add(pubsubTopic)
+  return newIdx
+
+proc getContentTopicIndex(self: SeqStorage, contentTopic: ContentTopic): int =
+  for i, selfTopic in self.contentTopics:
+    if contentTopic == selfTopic:
+      return i
+
+  if self.unusedContentTopicSet.len > 0:
+    let unusedIdx = self.unusedContentTopicSet.toSeq()[0]
+    self.unusedContentTopicSet.excl(unusedIdx)
+    self.contentTopics[unusedIdx] = contentTopic
+    return unusedIdx
+
+  let newIdx = self.contentTopics.len
+  self.contentTopics.add(contentTopic)
+  return newIdx
+
+proc insertAt(
+    self: SeqStorage,
+    idx: int,
+    element: SyncID,
+    pubsubTopic: PubSubTopic,
+    contentTopic: ContentTopic,
+) =
   if idx < self.elements.len and self.elements[idx] == element:
-    return err("duplicate element")
+    # duplicate element, ignore
+    return

   self.elements.insert(element, idx)

+  let pubsubIndex = self.getPubsubTopicIndex(pubsubTopic)
+  let contentIndex = self.getContentTopicIndex(contentTopic)
+
+  self.pubsubTopicIndexes.insert(pubsubIndex, idx)
+  self.contentTopicIndexes.insert(contentIndex, idx)
+
+method insert*(
+    self: SeqStorage,
+    element: SyncID,
+    pubsubTopic: PubSubTopic,
+    contentTopic: ContentTopic,
+): Result[void, string] {.raises: [].} =
+  let idx = self.elements.lowerBound(element, common.cmp)
+  self.insertAt(idx, element, pubsubTopic, contentTopic)
+
   return ok()

 method batchInsert*(
-    self: SeqStorage, elements: seq[SyncID]
+    self: SeqStorage,
+    elements: seq[SyncID],
+    pubsubTopics: seq[PubSubTopic],
+    contentTopics: seq[ContentTopic],
 ): Result[void, string] {.raises: [].} =
   ## Insert the sorted seq of new elements.

   if elements.len == 1:
-    return self.insert(elements[0])
-
-  #TODO custom impl. ???
+    return self.insert(elements[0], pubsubTopics[0], contentTopics[0])

   if not elements.isSorted(common.cmp):
     return err("seq not sorted")

-  var merged = newSeqOfCap[SyncID](self.elements.len + elements.len)
+  var idx = 0
+  for i in 0 ..< elements.len:
+    let element = elements[i]
+    let pubsubTopic = pubsubTopics[i]
+    let contentTopic = contentTopics[i]

-  merged.merge(self.elements, elements, common.cmp)
+    # lowerBound on a slice returns a slice-relative index; offset by idx
+    idx += self.elements[idx ..< self.elements.len].lowerBound(element, common.cmp)

-  self.elements = merged.deduplicate(true)
+    self.insertAt(idx, element, pubsubTopic, contentTopic)

   return ok()
@@ -62,11 +144,32 @@ method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} =
   let idx = self.elements.lowerBound(bound, common.cmp)

   self.elements.delete(0 ..< idx)
+  self.pubsubTopicIndexes.delete(0 ..< idx)
+  self.contentTopicIndexes.delete(0 ..< idx)
+
+  # Free unused content topics
+  let contentIdxSet = self.contentTopicIndexes.toPackedSet()
+  var contentTopicSet: PackedSet[int]
+  for i in 0 ..< self.contentTopics.len:
+    contentTopicSet.incl(i)
+
+  self.unusedContentTopicSet = contentTopicSet - contentIdxSet
+
+  # Free unused pubsub topics
+  let pubsubIdxSet = self.pubsubTopicIndexes.toPackedSet()
+  var pubsubTopicSet: PackedSet[int]
+  for i in 0 ..< self.pubsubTopics.len:
+    pubsubTopicSet.incl(i)
+
+  self.unusedPubsubTopicSet = pubsubTopicSet - pubsubIdxSet

   return idx

 proc computefingerprintFromSlice(
-    self: SeqStorage, sliceOpt: Option[Slice[int]]
+    self: SeqStorage,
+    sliceOpt: Option[Slice[int]],
+    pubsubTopicSet: PackedSet[int],
+    contentTopicSet: PackedSet[int],
 ): Fingerprint =
   ## XOR all hashes of a slice of the storage.
@@ -77,7 +180,21 @@ proc computefingerprintFromSlice(

   let idxSlice = sliceOpt.get()

-  for id in self.elements[idxSlice]:
+  let elementSlice = self.elements[idxSlice]
+  let pubsubSlice = self.pubsubTopicIndexes[idxSlice]
+  let contentSlice = self.contentTopicIndexes[idxSlice]
+
+  for i in 0 ..< elementSlice.len:
+    let id = elementSlice[i]
+    let pubsub = pubsubSlice[i]
+    let content = contentSlice[i]
+
+    if pubsubTopicSet.len > 0 and pubsub notin pubsubTopicSet:
+      continue
+
+    if contentTopicSet.len > 0 and content notin contentTopicSet:
+      continue
+
     fingerprint = fingerprint xor id.hash

   return fingerprint
@@ -85,8 +202,6 @@ proc computefingerprintFromSlice(
 proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
   ## Given bounds find the corresponding indices in this storage

-  #TODO can thoses 2 binary search be combined for efficiency ???
-
   let lower = self.elements.lowerBound(slice.a, common.cmp)

   var upper = self.elements.upperBound(slice.b, common.cmp)
@@ -101,21 +216,58 @@ proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
   return some(lower ..< upper)

 method computeFingerprint*(
-    self: SeqStorage, bounds: Slice[SyncID]
+    self: SeqStorage,
+    bounds: Slice[SyncID],
+    pubsubTopics: seq[PubsubTopic],
+    contentTopics: seq[ContentTopic],
 ): Fingerprint {.raises: [].} =
   let idxSliceOpt = self.findIdxBounds(bounds)
-  return self.computefingerprintFromSlice(idxSliceOpt)
+
+  var pubsubTopicSet = initPackedSet[int]()
+  for inputTopic in pubsubTopics:
+    for i, localTopic in self.pubsubTopics:
+      if inputTopic == localTopic:
+        pubsubTopicSet.incl(i)
+
+  var contentTopicSet = initPackedSet[int]()
+  for inputTopic in contentTopics:
+    for i, localTopic in self.contentTopics:
+      if inputTopic == localTopic:
+        contentTopicSet.incl(i)
+
+  return self.computefingerprintFromSlice(idxSliceOpt, pubsubTopicSet, contentTopicSet)
+
+proc getFilteredElements(
+    self: SeqStorage,
+    slice: Slice[int],
+    pubsubTopicSet: PackedSet[int],
+    contentTopicSet: PackedSet[int],
+): seq[SyncID] =
+  let elements = collect(newSeq):
+    for i in slice:
+      if pubsubTopicSet.len > 0 and self.pubsubTopicIndexes[i] notin pubsubTopicSet:
+        continue
+
+      if contentTopicSet.len > 0 and self.contentTopicIndexes[i] notin contentTopicSet:
+        continue
+
+      self.elements[i]
+
+  elements

 proc processFingerprintRange*(
     self: SeqStorage,
     inputBounds: Slice[SyncID],
+    pubsubTopicSet: PackedSet[int],
+    contentTopicSet: PackedSet[int],
     inputFingerprint: Fingerprint,
     output: var RangesData,
 ) {.raises: [].} =
   ## Compares fingerprints and partition new ranges.

   let idxSlice = self.findIdxBounds(inputBounds)
-  let ourFingerprint = self.computeFingerprintFromSlice(idxSlice)
+  let ourFingerprint =
+    self.computeFingerprintFromSlice(idxSlice, pubsubTopicSet, contentTopicSet)

   if ourFingerprint == inputFingerprint:
     output.ranges.add((inputBounds, RangeType.Skip))
@@ -131,7 +283,8 @@ proc processFingerprintRange*(
     if slice.len <= self.lengthThreshold:
       output.ranges.add((inputBounds, RangeType.ItemSet))
-      let state = ItemSet(elements: self.elements[slice], reconciled: false)
+      let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
+      let state = ItemSet(elements: elements, reconciled: false)
       output.itemSets.add(state)
       return
@@ -149,11 +302,13 @@ proc processFingerprintRange*(
       if slice.len <= self.lengthThreshold:
         output.ranges.add((partitionBounds, RangeType.ItemSet))
-        let state = ItemSet(elements: self.elements[slice], reconciled: false)
+        let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
+        let state = ItemSet(elements: elements, reconciled: false)
         output.itemSets.add(state)
         continue

-      let fingerprint = self.computeFingerprintFromSlice(some(slice))
+      let fingerprint =
+        self.computeFingerprintFromSlice(some(slice), pubsubTopicSet, contentTopicSet)
       output.ranges.add((partitionBounds, RangeType.Fingerprint))
       output.fingerprints.add(fingerprint)
       continue
@@ -161,6 +316,8 @@ proc processFingerprintRange*(
 proc processItemSetRange*(
     self: SeqStorage,
     inputBounds: Slice[SyncID],
+    pubsubTopicSet: PackedSet[int],
+    contentTopicSet: PackedSet[int],
     inputItemSet: ItemSet,
     hashToSend: var seq[Fingerprint],
     hashToRecv: var seq[Fingerprint],
@@ -190,6 +347,16 @@ proc processItemSetRange*(
   while (j < m):
     let ourElement = self.elements[j]
+    let pubsub = self.pubsubTopicIndexes[j]
+    let content = self.contentTopicIndexes[j]
+
+    if pubsubTopicSet.len > 0 and pubsub notin pubsubTopicSet:
+      j.inc()
+      continue
+
+    if contentTopicSet.len > 0 and content notin contentTopicSet:
+      j.inc()
+      continue

     if i >= n:
       # in case we have more elements
@@ -217,7 +384,8 @@ proc processItemSetRange*(

   if not inputItemSet.reconciled:
     output.ranges.add((inputBounds, RangeType.ItemSet))
-    let state = ItemSet(elements: self.elements[slice], reconciled: true)
+    let elements = self.getFilteredElements(slice, pubsubTopicSet, contentTopicSet)
+    let state = ItemSet(elements: elements, reconciled: true)
     output.itemSets.add(state)
   else:
     output.ranges.add((inputBounds, RangeType.Skip))
@@ -234,6 +402,18 @@ method processPayload*(
     i = 0
     j = 0

+  var pubsubTopicSet = initPackedSet[int]()
+  for inputTopic in input.pubsubTopics:
+    for i, localTopic in self.pubsubTopics:
+      if inputTopic == localTopic:
+        pubsubTopicSet.incl(i)
+
+  var contentTopicSet = initPackedSet[int]()
+  for inputTopic in input.contentTopics:
+    for i, localTopic in self.contentTopics:
+      if inputTopic == localTopic:
+        contentTopicSet.incl(i)
+
   for (bounds, rangeType) in input.ranges:
     case rangeType
     of RangeType.Skip:
@@ -244,14 +424,18 @@ method processPayload*(
       let fingerprint = input.fingerprints[i]
       i.inc()

-      self.processFingerprintRange(bounds, fingerprint, output)
+      self.processFingerprintRange(
+        bounds, pubsubTopicSet, contentTopicSet, fingerprint, output
+      )

       continue
     of RangeType.ItemSet:
       let itemSet = input.itemsets[j]
       j.inc()

-      self.processItemSetRange(bounds, itemSet, hashToSend, hashToRecv, output)
+      self.processItemSetRange(
+        bounds, pubsubTopicSet, contentTopicSet, itemSet, hashToSend, hashToRecv, output
+      )

       continue
@@ -295,8 +479,41 @@ proc new*(T: type SeqStorage, capacity: int, threshold = 100, partitions = 8): T
   )

 proc new*(
-    T: type SeqStorage, elements: seq[SyncID], threshold = 100, partitions = 8
+    T: type SeqStorage,
+    elements: seq[SyncID],
+    pubsubTopics: seq[PubsubTopic],
+    contentTopics: seq[ContentTopic],
+    threshold = 100,
+    partitions = 8,
 ): T =
+  var idx = 0
+  var uniquePubsubTopics = initOrderedTable[PubsubTopic, int]()
+  for pubsub in pubsubTopics:
+    if pubsub notin uniquePubsubTopics:
+      uniquePubsubTopics[pubsub] = idx
+      idx.inc()
+
+  let pubsubTopicIndexes = collect(newSeq):
+    for pubsub in pubsubTopics:
+      uniquePubsubTopics[pubsub]
+
+  idx = 0
+  var uniqueContentTopics = initOrderedTable[ContentTopic, int]()
+  for content in contentTopics:
+    if content notin uniqueContentTopics:
+      uniqueContentTopics[content] = idx
+      idx.inc()
+
+  let contentTopicIndexes = collect(newSeq):
+    for content in contentTopics:
+      uniqueContentTopics[content]
+
   return SeqStorage(
-    elements: elements, lengthThreshold: threshold, partitionCount: partitions
+    elements: elements,
+    pubsubTopics: uniquePubsubTopics.keys.toSeq(),
+    contentTopics: uniqueContentTopics.keys.toSeq(),
+    pubsubTopicIndexes: pubsubTopicIndexes,
+    contentTopicIndexes: contentTopicIndexes,
+    lengthThreshold: threshold,
+    partitionCount: partitions,
   )
diff --git a/waku/waku_store_sync/storage/storage.nim b/waku/waku_store_sync/storage/storage.nim
index 043261460..4d22d0163 100644
--- a/waku/waku_store_sync/storage/storage.nim
+++ b/waku/waku_store_sync/storage/storage.nim
@@ -1,16 +1,23 @@
 import results

-import ../../waku_core/time, ../common
+import
+  ../../waku_core/time,
+  ../../waku_core/topics/content_topic,
+  ../../waku_core/topics/pubsub_topic,
+  ../common

 type SyncStorage* = ref object of RootObj

 method insert*(
-    self: SyncStorage, element: SyncID
+    self: SyncStorage, element: SyncID, pubsubTopic: PubsubTopic, topic: ContentTopic
 ): Result[void, string] {.base, gcsafe, raises: [].} =
   return err("insert method not implemented for SyncStorage")

 method batchInsert*(
-    self: SyncStorage, elements: seq[SyncID]
+    self: SyncStorage,
+    elements: seq[SyncID],
+    pubsubTopics: seq[PubsubTopic],
+    contentTopics: seq[ContentTopic],
 ): Result[void, string] {.base, gcsafe, raises: [].} =
   return err("batchInsert method not implemented for SyncStorage")
@@ -20,17 +27,26 @@ method prune*(
   -1

 method computeFingerprint*(
-    self: SyncStorage, bounds: Slice[SyncID]
+    self: SyncStorage,
+    bounds: Slice[SyncID],
+    pubsubTopics: seq[PubsubTopic],
+    contentTopics: seq[ContentTopic],
 ): Fingerprint {.base, gcsafe, raises: [].} =
-  return EmptyFingerprint
+  return FullFingerprint

 method processPayload*(
     self: SyncStorage,
-    payload: RangesData,
+    input: RangesData,
     hashToSend: var seq[Fingerprint],
     hashToRecv: var seq[Fingerprint],
 ): RangesData {.base, gcsafe, raises: [].} =
-  return RangesData()
+  return RangesData(
+    pubsubTopics: @["InsertPubsubTopicHere"],
+    contentTopics: @["InsertContentTopicHere"],
+    ranges: @[],
+    fingerprints: @[FullFingerprint],
+    itemSets: @[],
+  )

 method length*(self: SyncStorage): int {.base, gcsafe, raises: [].} =
   -1
diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim
index 4dd663e98..f7f06bfd9 100644
--- a/waku/waku_store_sync/transfer.nim
+++ b/waku/waku_store_sync/transfer.nim
@@ -16,6 +16,8 @@ import
   ../common/protobuf,
   ../waku_enr,
   ../waku_core/codecs,
+  ../waku_core/topics/pubsub_topic,
+  ../waku_core/topics/content_topic,
   ../waku_core/message/digest,
   ../waku_core/message/message,
   ../waku_core/message/default_values,
@@ -34,7 +36,7 @@ type SyncTransfer* = ref object of LPProtocol
   peerManager: PeerManager

   # Send IDs to reconciliation protocol for storage
-  idsTx: AsyncQueue[SyncID]
+  idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]

   # Receive Hashes from reconciliation protocol for reception
   localWantsRx: AsyncQueue[PeerId]
@@ -179,7 +181,8 @@ proc initProtocolHandler(self: SyncTransfer) =
         continue

       let id = SyncID(time: msg.timestamp, hash: hash)
-      await self.idsTx.addLast(id)
+
+      await self.idsTx.addLast((id, pubsub, msg.contentTopic))

       continue
@@ -197,7 +200,7 @@ proc new*(
     T: type SyncTransfer,
     peerManager: PeerManager,
     wakuArchive: WakuArchive,
-    idsTx: AsyncQueue[SyncID],
+    idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
     localWantsRx: AsyncQueue[PeerId],
     remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)],
 ): T =
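
For reference, the codec change replaces the fixed-width `cluster`/`shards` fields with two length-prefixed topic lists: a LEB128 varint topic count, then for each topic a varint byte length followed by the raw UTF-8 bytes. The sketch below models that wire format in isolation; `putVarint`/`getVarint`, `encodeTopics`, and `decodeTopics` are simplified stand-ins for the `stew/leb128` and `stew/byteutils` helpers the codec actually uses, not the module's API:

```nim
proc putVarint(output: var seq[byte], x: uint64) =
  ## Append x as an unsigned LEB128 varint.
  var v = x
  while true:
    var b = byte(v and 0x7F)
    v = v shr 7
    if v != 0:
      b = b or 0x80 # more bytes follow
    output.add(b)
    if v == 0:
      break

proc getVarint(buffer: seq[byte], idx: var int): uint64 =
  ## Read an unsigned LEB128 varint starting at idx, advancing idx.
  var shift = 0
  while true:
    let b = buffer[idx]
    inc idx
    result = result or (uint64(b and 0x7F) shl shift)
    if (b and 0x80) == 0:
      break
    shift += 7

proc encodeTopics(topics: seq[string]): seq[byte] =
  ## Topic count, then per topic: byte length followed by raw bytes.
  result.putVarint(uint64(topics.len))
  for topic in topics:
    result.putVarint(uint64(topic.len))
    for c in topic:
      result.add(byte(c))

proc decodeTopics(buffer: seq[byte], idx: var int): seq[string] =
  let count = buffer.getVarint(idx)
  for _ in 0 ..< count:
    let topicLen = int(buffer.getVarint(idx))
    var topic = newString(topicLen)
    for i in 0 ..< topicLen:
      topic[i] = char(buffer[idx + i])
    idx += topicLen
    result.add(topic)

when isMainModule:
  let topics = @["/waku/2/rs/1/0", "/app/1/chat/proto"]
  var idx = 0
  doAssert decodeTopics(encodeTopics(topics), idx) == topics
```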
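In `preProcessPayload`, topic scoping follows one rule per the spec comment: an empty topic set means accept all, otherwise the two peers reconcile over the intersection of their scopes, and a disjoint intersection aborts the session. A minimal model of that rule, with `narrowScope` as a hypothetical stand-alone helper:

```nim
import std/[options, sets, sequtils]

proc narrowScope(local, remote: seq[string]): Option[seq[string]] =
  ## Empty scope means "accept all"; otherwise sync over the intersection,
  ## and an empty intersection means there is nothing to reconcile.
  if remote.len > 0 and local.len > 0:
    let intersection = local.toHashSet() * remote.toHashSet()
    if intersection.len < 1:
      return none(seq[string])
    return some(intersection.toSeq())
  elif local.len > 0:
    return some(local)
  return some(remote)

when isMainModule:
  doAssert narrowScope(@["a", "b"], @["b", "c"]).get() == @["b"]
  doAssert narrowScope(@[], @["x"]).get() == @["x"] # we accept all: remote scope wins
  doAssert narrowScope(@["a"], @["c"]).isNone()     # disjoint scopes: abort
```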
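`SeqStorage` avoids storing one topic string per element: each element carries an `int` index into a small table of unique topics, and after a `prune` any index no longer referenced goes into a free set (`unusedPubsubTopicSet`/`unusedContentTopicSet`) so later inserts reuse slots instead of growing the table. A self-contained sketch of that interning scheme, with illustrative names (`TopicTable`, `intern`, `freeUnreferenced`) that are not part of the module:

```nim
import std/[packedsets, sequtils]

type TopicTable = object
  topics: seq[string]    # unique topic strings, addressed by index
  unused: PackedSet[int] # freed slots available for reuse

proc intern(t: var TopicTable, topic: string): int =
  ## Return the existing index, reuse a freed slot, or append a new one.
  for i, existing in t.topics:
    if existing == topic:
      return i
  if t.unused.len > 0:
    let idx = t.unused.toSeq()[0]
    t.unused.excl(idx)
    t.topics[idx] = topic
    return idx
  t.topics.add(topic)
  return t.topics.high

proc freeUnreferenced(t: var TopicTable, live: PackedSet[int]) =
  ## After a prune, mark every slot not referenced by a live element as reusable.
  for i in 0 ..< t.topics.len:
    if i notin live:
      t.unused.incl(i)

when isMainModule:
  var table = TopicTable()
  doAssert table.intern("/waku/2/rs/1/0") == 0
  doAssert table.intern("/waku/2/rs/1/1") == 1
  doAssert table.intern("/waku/2/rs/1/0") == 0 # deduplicated
  table.freeUnreferenced([0].toPackedSet())    # only slot 0 still referenced
  doAssert table.intern("/waku/2/rs/1/7") == 1 # freed slot 1 is reused
```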
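Finally, the filtered fingerprint computed by `computefingerprintFromSlice` is just the XOR of the hashes of every in-range element whose topic indices pass the (possibly empty) filter sets. A reduced model under simplified types, omitting the `Option` slice handling of the real proc:

```nim
import std/packedsets

type
  Fingerprint = array[32, byte]
  Element = object
    hash: Fingerprint
    pubsubIdx: int  # index into the storage's pubsub topic table
    contentIdx: int # index into the storage's content topic table

proc fingerprint(
    elements: openArray[Element], pubsubSet, contentSet: PackedSet[int]
): Fingerprint =
  ## XOR the hashes of all elements passing the filters;
  ## an empty filter set excludes nothing.
  for e in elements:
    if pubsubSet.len > 0 and e.pubsubIdx notin pubsubSet:
      continue
    if contentSet.len > 0 and e.contentIdx notin contentSet:
      continue
    for i in 0 ..< result.len:
      result[i] = result[i] xor e.hash[i]

when isMainModule:
  var a, b: Element
  a.hash[0] = 0x0F
  b.hash[0] = 0xF0
  b.pubsubIdx = 1
  # No filter: both hashes are XORed together.
  doAssert fingerprint([a, b], initPackedSet[int](), initPackedSet[int]())[0] == 0xFF
  # Filter to pubsub index 0: only `a` contributes.
  doAssert fingerprint([a, b], [0].toPackedSet(), initPackedSet[int]())[0] == 0x0F
```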