From 2ab9c3d363dc665c35643d3a6e831a91adcfe6ad Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Fri, 29 Nov 2024 09:09:41 -0500 Subject: [PATCH] feat: remove Waku Sync 1.0 & Negentropy (#3185) --- .gitmodules | 5 - Makefile | 31 -- tests/waku_sync/sync_utils.nim | 37 -- tests/waku_sync/test_all.nim | 3 - tests/waku_sync/test_bindings.nim | 141 ------- tests/waku_sync/test_protocol.nim | 374 ------------------ vendor/negentropy | 1 - waku/factory/node_factory.nim | 11 - waku/node/waku_node.nim | 48 --- waku/waku_api/rest/admin/handlers.nim | 13 - waku/waku_sync.nim | 5 - waku/waku_sync/codec.nim | 57 --- waku/waku_sync/protocol.nim | 531 -------------------------- waku/waku_sync/raw_bindings.nim | 513 ------------------------- waku/waku_sync/session.nim | 240 ------------ waku/waku_sync/storage_manager.nim | 76 ---- 16 files changed, 2086 deletions(-) delete mode 100644 tests/waku_sync/sync_utils.nim delete mode 100644 tests/waku_sync/test_all.nim delete mode 100644 tests/waku_sync/test_bindings.nim delete mode 100644 tests/waku_sync/test_protocol.nim delete mode 160000 vendor/negentropy delete mode 100644 waku/waku_sync.nim delete mode 100644 waku/waku_sync/codec.nim delete mode 100644 waku/waku_sync/protocol.nim delete mode 100644 waku/waku_sync/raw_bindings.nim delete mode 100644 waku/waku_sync/session.nim delete mode 100644 waku/waku_sync/storage_manager.nim diff --git a/.gitmodules b/.gitmodules index 5650701b4..12e4bc390 100644 --- a/.gitmodules +++ b/.gitmodules @@ -169,11 +169,6 @@ url = https://github.com/nim-lang/db_connector.git ignore = untracked branch = master -[submodule "vendor/negentropy"] - ignore = untracked - path = vendor/negentropy - url = https://github.com/waku-org/negentropy.git - branch = master [submodule "vendor/nph"] ignore = untracked branch = master diff --git a/Makefile b/Makefile index a2deeac37..1d8531a05 100644 --- a/Makefile +++ b/Makefile @@ -178,35 +178,6 @@ clean-librln: # Extend clean target clean: | clean-librln -###################### -### NEGENTROPY ### -###################### -.PHONY: negentropy - -LIBNEGENTROPY_BUILDDIR := $(CURDIR)/vendor/negentropy/cpp -LIBNEGENTROPY_FILE := libnegentropy.a - -deps: | negentropy - -clean: | negentropy-clean - -$(LIBNEGENTROPY_FILE): - $(MAKE) -C $(LIBNEGENTROPY_BUILDDIR) && \ - cp $(LIBNEGENTROPY_BUILDDIR)/${LIBNEGENTROPY_FILE} ${LIBNEGENTROPY_FILE} - -negentropy: | $(LIBNEGENTROPY_FILE) - ## Pass libnegentropy and it's deps to linker. - $(eval LIBNEGENTROPY_PATH := $(shell if [ -f "$(LIBNEGENTROPY_FILE)" ]; then echo "$(LIBNEGENTROPY_FILE)"; else echo "./$(LIBNEGENTROPY_FILE)"; fi)) - $(eval NIM_PARAMS += --passL:$(LIBNEGENTROPY_PATH) --passL:-lcrypto --passL:-lssl --passL:-lstdc++) -ifeq ($(detected_OS),Darwin) - $(eval NIM_PARAMS += --passL:-L/opt/homebrew/lib/) -endif - -negentropy-clean: - $(MAKE) -C $(LIBNEGENTROPY_BUILDDIR) clean && \ - rm ${LIBNEGENTROPY_FILE} - - ################# ## Waku Common ## ################# @@ -480,7 +451,6 @@ cwaku_example: | build libwaku ./examples/cbindings/base64.c \ -lwaku -Lbuild/ \ -pthread -ldl -lm \ - -lnegentropy -Lvendor/negentropy/cpp/ \ -lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \ -lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \ vendor/nim-libbacktrace/libbacktrace_wrapper.o \ @@ -493,7 +463,6 @@ cppwaku_example: | build libwaku ./examples/cpp/base64.cpp \ -lwaku -Lbuild/ \ -pthread -ldl -lm \ - -lnegentropy -Lvendor/negentropy/cpp/ \ -lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \ -lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \ vendor/nim-libbacktrace/libbacktrace_wrapper.o \ diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim deleted file mode 100644 index be0e44d7e..000000000 --- a/tests/waku_sync/sync_utils.nim +++ /dev/null @@ -1,37 +0,0 @@ -{.used.} - -import std/options, chronos, chronicles, libp2p/crypto/crypto - -import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore - -proc newTestWakuSync*( - switch: Switch, - transfer: Option[TransferCallback] = none(TransferCallback), - prune: Option[PruneCallback] = none(PruneCallback), - interval: Duration = DefaultSyncInterval, -): Future[WakuSync] {.async.} = - let peerManager = PeerManager.new(switch) - - let fakePruneCallback = proc( - pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] - ): Future[ - Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure.} = - return ok((@[], none(WakuMessageHash))) - - let res = await WakuSync.new( - peerManager = peerManager, - relayJitter = 0.seconds, - syncInterval = interval, - wakuArchive = nil, - wakuStoreClient = nil, - pruneCallback = some(fakePruneCallback), - transferCallback = none(TransferCallback), - ) - - let proto = res.get() - - proto.start() - switch.mount(proto) - - return proto diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim deleted file mode 100644 index b5801e4ac..000000000 --- a/tests/waku_sync/test_all.nim +++ /dev/null @@ -1,3 +0,0 @@ -{.used.} - -import ./test_protocol, ./test_bindings diff --git a/tests/waku_sync/test_bindings.nim b/tests/waku_sync/test_bindings.nim deleted file mode 100644 index f2099ea50..000000000 --- a/tests/waku_sync/test_bindings.nim +++ /dev/null @@ -1,141 +0,0 @@ -{.used.} - -import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random - -import - ../../waku/ - [node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings], - ../testlib/[wakucore], - ./sync_utils - -random.randomize() - -#TODO clean this up - -suite "Bindings": - var storage {.threadvar.}: NegentropyStorage - var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)] - - setup: - let storageRes = NegentropyStorage.new() - assert storageRes.isOk(), $storageRes.error - storage = storageRes.get() - - messages = @[] - for _ in 0 ..< 10: - let msg = fakeWakuMessage() - let hash = computeMessageHash(DefaultPubsubTopic, msg) - messages.add((hash, msg)) - - teardown: - storage.delete() - - test "storage insert": - check: - storage.len() == 0 - - let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) - - assert insRes.isOk(), $insRes.error - - check: - storage.len() == 1 - - test "storage erase": - let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) - assert insRes.isOk(), $insRes.error - - check: - storage.len() == 1 - - var delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) - assert delRes.isOk() - - check: - storage.len() == 0 - - delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) - assert delRes.isErr() - - check: - storage.len() == 0 - - test "subrange": - for (hash, msg) in messages: - let insRes = storage.insert(msg.timestamp, hash) - assert insRes.isOk(), $insRes.error - - check: - storage.len() == 10 - - let subrangeRes = NegentropySubRangeStorage.new(storage) - assert subrangeRes.isOk(), subrangeRes.error - let subrange = subrangeRes.get() - - check: - subrange.len() == 10 - - #[ test "storage memory size": - for (hash, msg) in messages: - let insRes = storage.insert(msg.timestamp, hash) - assert insRes.isOk(), $insRes.error - - check: - storage.len() == 10 - - for (hash, msg) in messages: - let delRes = storage.erase(msg.timestamp, hash) - assert delRes.isOk(), $delRes.error - - check: - storage.len() == 0 - - #TODO validate that the occupied memory didn't grow. ]# - - test "reconcile server differences": - for (hash, msg) in messages: - let insRes = storage.insert(msg.timestamp, hash) - assert insRes.isOk(), $insRes.error - - let clientNegentropyRes = Negentropy.new(storage, 0) - - let storageRes = NegentropyStorage.new() - assert storageRes.isOk(), $storageRes.error - let serverStorage = storageRes.get() - - for (hash, msg) in messages: - let insRes = serverStorage.insert(msg.timestamp, hash) - assert insRes.isOk(), $insRes.error - - # the extra msg - let msg = fakeWakuMessage() - let hash = computeMessageHash(DefaultPubsubTopic, msg) - let insRes = serverStorage.insert(msg.timestamp, hash) - assert insRes.isOk(), $insRes.error - - let serverNegentropyRes = Negentropy.new(serverStorage, 0) - - assert clientNegentropyRes.isOk(), $clientNegentropyRes.error - assert serverNegentropyRes.isOk(), $serverNegentropyRes.error - - let clientNegentropy = clientNegentropyRes.get() - let serverNegentropy = serverNegentropyRes.get() - - let initRes = clientNegentropy.initiate() - assert initRes.isOk(), $initRes.error - let init = initRes.get() - - let reconRes = serverNegentropy.serverReconcile(init) - assert reconRes.isOk(), $reconRes.error - let srecon = reconRes.get() - - var - haves: seq[WakuMessageHash] - needs: seq[WakuMessageHash] - let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs) - assert creconRes.isOk(), $creconRes.error - let reconOpt = creconRes.get() - - check: - reconOpt.isNone() - needs[0] == hash diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim deleted file mode 100644 index c203471fb..000000000 --- a/tests/waku_sync/test_protocol.nim +++ /dev/null @@ -1,374 +0,0 @@ -{.used.} - -import - std/[options, sets], - testutils/unittests, - chronos, - chronicles, - libp2p/crypto/crypto, - stew/byteutils, - std/random - -import - ../../waku/[ - node/peer_manager, - waku_core, - waku_core/message/digest, - waku_sync, - waku_sync/raw_bindings, - ], - ../testlib/[wakucore, testasync], - ./sync_utils - -random.randomize() - -suite "Waku Sync": - var serverSwitch {.threadvar.}: Switch - var clientSwitch {.threadvar.}: Switch - - var server {.threadvar.}: WakuSync - var client {.threadvar.}: WakuSync - - var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo] - - asyncSetup: - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - server = await newTestWakuSync(serverSwitch) - client = await newTestWakuSync(clientSwitch) - - serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo()) - - asyncTeardown: - await sleepAsync(10.milliseconds) - - await allFutures(server.stop(), client.stop()) - await allFutures(serverSwitch.stop(), clientSwitch.stop()) - - suite "Protocol": - asyncTest "sync 2 nodes both empty": - let hashes = await client.storeSynchronization(serverPeerInfo) - assert hashes.isOk(), hashes.error - check: - hashes.value[0].len == 0 - - asyncTest "sync 2 nodes empty client full server": - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) - - server.messageIngress(DefaultPubsubTopic, msg1) - server.messageIngress(DefaultPubsubTopic, msg2) - server.messageIngress(DefaultPubsubTopic, msg3) - - var hashes = await client.storeSynchronization(serverPeerInfo) - - assert hashes.isOk(), hashes.error - check: - hashes.value[0].len == 3 - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0] - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0] - computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0] - - asyncTest "sync 2 nodes full client empty server": - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) - - client.messageIngress(DefaultPubsubTopic, msg1) - client.messageIngress(DefaultPubsubTopic, msg2) - client.messageIngress(DefaultPubsubTopic, msg3) - - var hashes = await client.storeSynchronization(serverPeerInfo) - assert hashes.isOk(), hashes.error - check: - hashes.value[0].len == 0 - - asyncTest "sync 2 nodes different hashes": - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - - server.messageIngress(DefaultPubsubTopic, msg1) - client.messageIngress(DefaultPubsubTopic, msg1) - server.messageIngress(DefaultPubsubTopic, msg2) - - var syncRes = await client.storeSynchronization(serverPeerInfo) - - check: - syncRes.isOk() - - var hashes = syncRes.get() - - check: - hashes[0].len == 1 - hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) - - #Assuming message is fetched from peer - client.messageIngress(DefaultPubsubTopic, msg2) - - syncRes = await client.storeSynchronization(serverPeerInfo) - - check: - syncRes.isOk() - - hashes = syncRes.get() - - check: - hashes[0].len == 0 - - asyncTest "sync 2 nodes same hashes": - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - - server.messageIngress(DefaultPubsubTopic, msg1) - client.messageIngress(DefaultPubsubTopic, msg1) - server.messageIngress(DefaultPubsubTopic, msg2) - client.messageIngress(DefaultPubsubTopic, msg2) - - let hashes = await client.storeSynchronization(serverPeerInfo) - assert hashes.isOk(), $hashes.error - check: - hashes.value[0].len == 0 - - asyncTest "sync 2 nodes 100K msgs": - var i = 0 - let msgCount = 100000 - var diffIndex = rand(msgCount) - var diffMsg: WakuMessage - while i < msgCount: - let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) - if i != diffIndex: - client.messageIngress(DefaultPubsubTopic, msg) - else: - diffMsg = msg - server.messageIngress(DefaultPubsubTopic, msg) - i += 1 - - let hashes = await client.storeSynchronization(serverPeerInfo) - assert hashes.isOk(), $hashes.error - - check: - hashes.value[0].len == 1 - hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg) - - asyncTest "sync 2 nodes 100K msgs 10K diffs": - var i = 0 - let msgCount = 100000 - var diffCount = 10000 - - var diffMsgHashes: seq[WakuMessageHash] - var randIndexes: seq[int] - while i < diffCount: - let randInt = rand(msgCount) - if randInt in randIndexes: - continue - randIndexes.add(randInt) - i += 1 - - i = 0 - var tmpDiffCnt = diffCount - while i < msgCount: - let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) - if tmpDiffCnt > 0 and i in randIndexes: - diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg)) - tmpDiffCnt = tmpDiffCnt - 1 - else: - client.messageIngress(DefaultPubsubTopic, msg) - - server.messageIngress(DefaultPubsubTopic, msg) - i += 1 - - let hashes = await client.storeSynchronization(serverPeerInfo) - assert hashes.isOk(), $hashes.error - - check: - hashes.value[0].len == diffCount - toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes) - - asyncTest "sync 3 nodes 2 client 1 server": - ## Setup - let client2Switch = newTestSwitch() - await client2Switch.start() - let client2 = await newTestWakuSync(client2Switch) - - let msgCount = 10000 - var i = 0 - - while i < msgCount: - i += 1 - let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) - if i mod 2 == 0: - client2.messageIngress(DefaultPubsubTopic, msg) - else: - client.messageIngress(DefaultPubsubTopic, msg) - server.messageIngress(DefaultPubsubTopic, msg) - - let fut1 = client.storeSynchronization(serverPeerInfo) - let fut2 = client2.storeSynchronization(serverPeerInfo) - waitFor allFutures(fut1, fut2) - - let hashes1 = fut1.read() - let hashes2 = fut2.read() - - assert hashes1.isOk(), $hashes1.error - assert hashes2.isOk(), $hashes2.error - - check: - hashes1.value[0].len == int(msgCount / 2) - hashes2.value[0].len == int(msgCount / 2) - - await client2.stop() - await client2Switch.stop() - - asyncTest "sync 6 nodes varying sync diffs": - ## Setup - let - client2Switch = newTestSwitch() - client3Switch = newTestSwitch() - client4Switch = newTestSwitch() - client5Switch = newTestSwitch() - - await allFutures( - client2Switch.start(), - client3Switch.start(), - client4Switch.start(), - client5Switch.start(), - ) - - let - client2 = await newTestWakuSync(client2Switch) - client3 = await newTestWakuSync(client3Switch) - client4 = await newTestWakuSync(client4Switch) - client5 = await newTestWakuSync(client5Switch) - - let msgCount = 100000 - var i = 0 - - while i < msgCount: - let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) - if i < msgCount - 1: - client.messageIngress(DefaultPubsubTopic, msg) - if i < msgCount - 10: - client2.messageIngress(DefaultPubsubTopic, msg) - if i < msgCount - 100: - client3.messageIngress(DefaultPubsubTopic, msg) - if i < msgCount - 1000: - client4.messageIngress(DefaultPubsubTopic, msg) - if i < msgCount - 10000: - client5.messageIngress(DefaultPubsubTopic, msg) - server.messageIngress(DefaultPubsubTopic, msg) - i += 1 - - var timeBefore = getNowInNanosecondTime() - let hashes1 = await client.storeSynchronization(serverPeerInfo) - var timeAfter = getNowInNanosecondTime() - var syncTime = (timeAfter - timeBefore) - debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime - assert hashes1.isOk(), $hashes1.error - check: - hashes1.value[0].len == 1 - - timeBefore = getNowInNanosecondTime() - let hashes2 = await client2.storeSynchronization(serverPeerInfo) - timeAfter = getNowInNanosecondTime() - syncTime = (timeAfter - timeBefore) - debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime - assert hashes2.isOk(), $hashes2.error - check: - hashes2.value[0].len == 10 - - timeBefore = getNowInNanosecondTime() - let hashes3 = await client3.storeSynchronization(serverPeerInfo) - timeAfter = getNowInNanosecondTime() - syncTime = (timeAfter - timeBefore) - debug "sync time in seconds", - msgsTotal = msgCount, diff = 100, syncTime = syncTime - assert hashes3.isOk(), $hashes3.error - check: - hashes3.value[0].len == 100 - - timeBefore = getNowInNanosecondTime() - let hashes4 = await client4.storeSynchronization(serverPeerInfo) - timeAfter = getNowInNanosecondTime() - syncTime = (timeAfter - timeBefore) - debug "sync time in seconds", - msgsTotal = msgCount, diff = 1000, syncTime = syncTime - assert hashes4.isOk(), $hashes4.error - check: - hashes4.value[0].len == 1000 - - timeBefore = getNowInNanosecondTime() - let hashes5 = await client5.storeSynchronization(serverPeerInfo) - timeAfter = getNowInNanosecondTime() - syncTime = (timeAfter - timeBefore) - debug "sync time in seconds", - msgsTotal = msgCount, diff = 10000, syncTime = syncTime - assert hashes5.isOk(), $hashes5.error - check: - hashes5.value[0].len == 10000 - - await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop()) - await allFutures( - client2Switch.stop(), - client3Switch.stop(), - client4Switch.stop(), - client5Switch.stop(), - ) - - asyncTest "sync 3 nodes cyclic": - let - node1Switch = newTestSwitch() - node2Switch = newTestSwitch() - node3Switch = newTestSwitch() - - await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start()) - - let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo()) - let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo()) - let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo()) - - let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let hash1 = computeMessageHash(DefaultPubsubTopic, msg1) - let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let hash2 = computeMessageHash(DefaultPubsubTopic, msg2) - let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) - let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - - let - node1 = await newTestWakuSync(node1Switch) - node2 = await newTestWakuSync(node2Switch) - node3 = await newTestWakuSync(node3Switch) - - node1.messageIngress(DefaultPubsubTopic, msg1) - node2.messageIngress(DefaultPubsubTopic, msg1) - node2.messageIngress(DefaultPubsubTopic, msg2) - node3.messageIngress(DefaultPubsubTopic, msg3) - - let f1 = node1.storeSynchronization(node2PeerInfo) - let f2 = node2.storeSynchronization(node3PeerInfo) - let f3 = node3.storeSynchronization(node1PeerInfo) - - waitFor allFutures(f1, f2, f3) - - let hashes1 = f1.read() - let hashes2 = f2.read() - let hashes3 = f3.read() - - assert hashes1.isOk(), hashes1.error - assert hashes2.isOk(), hashes2.error - assert hashes3.isOk(), hashes3.error - - check: - hashes1.get()[0].len == 1 - hashes2.get()[0].len == 1 - hashes3.get()[0].len == 1 - - hashes1.get()[0][0] == hash2 - hashes2.get()[0][0] == hash3 - hashes3.get()[0][0] == hash1 - - await allFutures(node1.stop(), node2.stop(), node3.stop()) - await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop()) diff --git a/vendor/negentropy b/vendor/negentropy deleted file mode 160000 index 449b304d6..000000000 --- a/vendor/negentropy +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 449b304d67e9401bd96451cb349eb6b17b713f67 diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 009cceceb..2ed2ad2e7 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -313,17 +313,6 @@ proc setupProtocols( if conf.store and conf.storeResume: node.setupStoreResume() - if conf.storeSync: - ( - await node.mountWakuSync( - int(conf.storeSyncMaxPayloadSize), - conf.storeSyncRange.seconds(), - conf.storeSyncInterval.seconds(), - conf.storeSyncRelayJitter.seconds(), - ) - ).isOkOr: - return err("failed to mount waku sync protocol: " & $error) - # NOTE Must be mounted after relay if conf.lightpush: try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7daf1a8df..50b65bfc5 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -39,7 +39,6 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, - ../waku_sync, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -108,7 +107,6 @@ type wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata wakuSharding*: Sharding - wakuSync*: WakuSync enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -177,42 +175,6 @@ proc connectToNodes*( proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} = await peer_manager.disconnectNode(node.peerManager, remotePeer) -## Waku Sync - -proc mountWakuSync*( - node: WakuNode, - maxFrameSize: int = DefaultMaxFrameSize, - syncRange: timer.Duration = DefaultSyncRange, - syncInterval: timer.Duration = DefaultSyncInterval, - relayJitter: Duration = DefaultGossipSubJitter, -): Future[Result[void, string]] {.async.} = - if not node.wakuSync.isNil(): - return err("already mounted") - - node.wakuSync = ( - await WakuSync.new( - peerManager = node.peerManager, - maxFrameSize = maxFrameSize, - syncRange = syncRange, - syncInterval = syncInterval, - relayJitter = relayJitter, - wakuArchive = node.wakuArchive, - wakuStoreClient = node.wakuStoreClient, - ) - ).valueOr: - return err("initialization failed: " & error) - - let catchable = catch: - node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) - - if catchable.isErr(): - return err("switch mounting failed: " & catchable.error.msg) - - if node.started: - node.wakuSync.start() - - return ok() - ## Waku Metadata proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = @@ -269,19 +231,12 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) - proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async.} = - if node.wakuSync.isNil(): - return - - node.wakuSync.messageIngress(topic, msg) - let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) - await syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) @@ -1349,9 +1304,6 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() - if not node.wakuSync.isNil(): - node.wakuSync.start() - ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index b35ac6eae..1e24ad56d 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -16,7 +16,6 @@ import ../../../waku_relay, ../../../waku_peer_exchange, ../../../waku_node, - ../../../waku_sync, ../../../node/peer_manager, ../responses, ../serdes, @@ -106,18 +105,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, pxPeers) - if not node.wakuSync.isNil(): - # Map WakuSync peers to WakuPeers and add to return list - let syncPeers = node.peerManager.wakuPeerStore.peers(WakuSyncCodec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuSyncCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, syncPeers) - let resp = RestApiResponse.jsonResponse(peers, status = Http200) if resp.isErr(): error "An error ocurred while building the json respose: ", error = resp.error diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim deleted file mode 100644 index 61cb2df4e..000000000 --- a/waku/waku_sync.nim +++ /dev/null @@ -1,5 +0,0 @@ -{.push raises: [].} - -import ./waku_sync/protocol, ./waku_sync/common - -export common, protocol diff --git a/waku/waku_sync/codec.nim b/waku/waku_sync/codec.nim deleted file mode 100644 index 6e3d9bd63..000000000 --- a/waku/waku_sync/codec.nim +++ /dev/null @@ -1,57 +0,0 @@ -{.push raises: [].} - -import std/options, stew/arrayops -import ../common/protobuf, ../waku_core, ./common - -proc encode*(req: SyncPayload): ProtoBuffer = - var pb = initProtoBuffer() - - if req.syncRange.isSome(): - pb.write3(31, req.syncRange.get()[0]) - pb.write3(32, req.syncRange.get()[1]) - - if req.frameSize.isSome(): - pb.write3(33, req.frameSize.get()) - - if req.negentropy.len > 0: - pb.write3(1, req.negentropy) - - if req.hashes.len > 0: - for hash in req.hashes: - pb.write3(20, hash) - - return pb - -proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] = - var req = SyncPayload() - let pb = initProtoBuffer(buffer) - - var rangeStart: uint64 - var rangeEnd: uint64 - if ?pb.getField(31, rangeStart) and ?pb.getField(32, rangeEnd): - req.syncRange = some((rangeStart, rangeEnd)) - else: - req.syncRange = none((uint64, uint64)) - - var frame: uint64 - if ?pb.getField(33, frame): - req.frameSize = some(frame) - else: - req.frameSize = none(uint64) - - var negentropy: seq[byte] - if ?pb.getField(1, negentropy): - req.negentropy = negentropy - else: - req.negentropy = @[] - - var buffer: seq[seq[byte]] - if not ?pb.getRepeatedField(20, buffer): - req.hashes = @[] - else: - req.hashes = newSeqOfCap[WakuMessageHash](buffer.len) - for buf in buffer: - let msg: WakuMessageHash = fromBytes(buf) - req.hashes.add(msg) - - return ok(req) diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim deleted file mode 100644 index a76a6f173..000000000 --- a/waku/waku_sync/protocol.nim +++ /dev/null @@ -1,531 +0,0 @@ -{.push raises: [].} - -import - std/[options, sugar, sequtils], - stew/byteutils, - results, - chronicles, - chronos, - metrics, - libp2p/utility, - libp2p/protocols/protocol, - libp2p/stream/connection, - libp2p/crypto/crypto, - eth/p2p/discoveryv5/enr -import - ../common/nimchronos, - ../common/enr, - ../waku_core, - ../waku_archive, - ../waku_store/[client, common], - ../waku_enr, - ../node/peer_manager/peer_manager, - ./raw_bindings, - ./common, - ./session - -logScope: - topics = "waku sync" - -type WakuSync* = ref object of LPProtocol - storage: NegentropyStorage - maxFrameSize: int # Negentropy param to limit the size of payloads - - peerManager: PeerManager - - syncInterval: timer.Duration # Time between each syncronisation attempt - syncRange: timer.Duration # Amount of time in the past to sync - relayJitter: Duration # Amount of time since the present to ignore when syncing - transferCallBack: Option[TransferCallback] # Callback for message transfers. - - pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query - pruneStart: Timestamp # Last pruning start timestamp - pruneOffset: timer.Duration # Offset to prune a bit more than necessary. - - periodicSyncFut: Future[void] - periodicPruneFut: Future[void] - -proc storageSize*(self: WakuSync): int = - self.storage.len - -proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = - if msg.ephemeral: - return - - let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) - - trace "inserting message into waku sync storage ", - msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp - - self.storage.insert(msg.timestamp, msgHash).isOkOr: - error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error - -proc messageIngress*( - self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage -) = - if msg.ephemeral: - return - - trace "inserting message into waku sync storage ", - msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp - - self.storage.insert(msg.timestamp, msgHash).isOkOr: - error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = error - -proc calculateRange( - jitter: Duration = 20.seconds, syncRange: Duration = 1.hours -): (int64, int64) = - ## Calculates the start and end time of a sync session - - var now = getNowInNanosecondTime() - - # Because of message jitter inherent to Relay protocol - now -= jitter.nanos - - let syncRange = syncRange.nanos - - let syncStart = now - syncRange - let syncEnd = now - - return (syncStart, syncEnd) - -proc request( - self: WakuSync, conn: Connection -): Future[Result[seq[WakuMessageHash], string]] {.async.} = - let (syncStart, syncEnd) = calculateRange(self.relayJitter) - - let initialized = - ?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd) - - debug "sync session initialized", - client = self.peerManager.switch.peerInfo.peerId, - server = conn.peerId, - frameSize = self.maxFrameSize, - timeStart = syncStart, - timeEnd = syncEnd - - var hashes: seq[WakuMessageHash] - var reconciled = initialized - - while true: - let sent = ?await reconciled.send() - - trace "sync payload sent", - client = self.peerManager.switch.peerInfo.peerId, - server = conn.peerId, - payload = reconciled.payload - - let received = ?await sent.listenBack() - - trace "sync payload received", - client = self.peerManager.switch.peerInfo.peerId, - server = conn.peerId, - payload = received.payload - - reconciled = (?received.clientReconcile(hashes)).valueOr: - let completed = error # Result[Reconciled, Completed] - - ?await completed.clientTerminate() - - debug "sync session ended gracefully", - client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId - - trace "hashes to sync", - client = self.peerManager.switch.peerInfo.peerId, msg_hashes = $hashes - - return ok(hashes) - - continue - -proc storeSynchronization*( - self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) -): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} = - let peer = peerInfo.valueOr: - self.peerManager.selectPeer(WakuSyncCodec).valueOr: - return err("No suitable peer found for sync") - - let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) - - let conn: Connection = connOpt.valueOr: - return err("Cannot establish sync connection") - - let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: - error "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error - - return err("Sync request error: " & error) - - return ok((hashes, peer)) - -proc handleSyncSession( - self: WakuSync, conn: Connection -): Future[Result[seq[WakuMessageHash], string]] {.async.} = - let (syncStart, syncEnd) = calculateRange(self.relayJitter) - - let initialized = - ?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd) - - var sent = initialized - - while true: - let received = ?await sent.listenBack() - - trace "sync payload received", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - payload = received.payload - - let reconciled = (?received.serverReconcile()).valueOr: - let completed = error # Result[Reconciled, Completed] - - let hashes = await completed.serverTerminate() - - trace "hashes to sync", - server = self.peerManager.switch.peerInfo.peerId, msg_hashes = $hashes - - return ok(hashes) - - sent = ?await reconciled.send() - - trace "sync payload sent", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - payload = reconciled.payload - - continue - -proc initProtocolHandler(self: WakuSync) = - proc handle(conn: Connection, proto: string) {.async, closure.} = - debug "sync session requested", - server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId - - let hashes = (await self.handleSyncSession(conn)).valueOr: - debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error - - #TODO send error code and desc to client - return - - if hashes.len > 0 and self.transferCallBack.isSome(): - let callback = self.transferCallBack.get() - - (await callback(hashes, conn.peerId)).isOkOr: - error "transfer callback failed", error = $error - - debug "sync session ended gracefully", - server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId - - self.handler = handle - self.codec = WakuSyncCodec - -proc createPruneCallback( - self: WakuSync, wakuArchive: WakuArchive -): Result[PruneCallBack, string] = - if wakuArchive.isNil(): - return err ("waku archive unavailable") - - let callback: PruneCallback = proc( - pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] - ): Future[ - Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure.} = - let archiveCursor = - if cursor.isSome(): - some(cursor.get()) - else: - none(ArchiveCursor) - - let query = ArchiveQuery( - includeData: true, - cursor: archiveCursor, - startTime: some(pruneStart), - endTime: some(pruneStop), - pageSize: 100, - ) - - let catchable = catch: - await wakuArchive.findMessages(query) - - if catchable.isErr(): - return err("archive error: " & catchable.error.msg) - - let res = catchable.get() - let response = res.valueOr: - return err("archive error: " & $error) - - let elements = collect(newSeq): - for (hash, msg) in response.hashes.zip(response.messages): - (hash, msg.timestamp) - - let cursor = response.cursor - - return ok((elements, cursor)) - - return ok(callback) - -proc createTransferCallback( - self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient -): Result[TransferCallback, string] = - if wakuArchive.isNil(): - return err("waku archive unavailable") - - if wakuStoreClient.isNil(): - return err("waku store client unavailable") - - let callback: TransferCallback = proc( - hashes: seq[WakuMessageHash], peerId: PeerId - ): Future[Result[void, string]] {.async: (raises: []), closure.} = - var query = StoreQueryRequest() - query.includeData = true - query.messageHashes = hashes - query.paginationLimit = some(uint64(100)) - - while true: - let catchable = catch: - await wakuStoreClient.query(query, peerId) - - if catchable.isErr(): - return err("store client error: " & catchable.error.msg) - - let res = catchable.get() - let response = res.valueOr: - return err("store client error: " & $error) - - query.paginationCursor = response.paginationCursor - - for kv in response.messages: - let handleRes = catch: - await wakuArchive.syncMessageIngress( - kv.messageHash, kv.pubsubTopic.get(), kv.message.get() - ) - - if handleRes.isErr(): - error "message transfer failed", error = handleRes.error.msg - # Messages can be synced next time since they are not added to storage yet. - continue - - self.messageIngress(kv.pubsubTopic.get(), kv.messageHash, kv.message.get()) - - if query.paginationCursor.isNone(): - break - - return ok() - - return ok(callback) - -proc initFillStorage( - self: WakuSync, wakuArchive: WakuArchive -): Future[Result[void, string]] {.async.} = - if wakuArchive.isNil(): - return err("waku archive unavailable") - - let endTime = getNowInNanosecondTime() - let starTime = endTime - self.syncRange.nanos - - var query = ArchiveQuery( - includeData: true, - cursor: none(ArchiveCursor), - startTime: some(starTime), - endTime: some(endTime), - pageSize: 100, - ) - - while true: - let response = (await wakuArchive.findMessages(query)).valueOr: - return err($error) - - for i in 0 ..< response.hashes.len: - let hash = response.hashes[i] - let topic = response.topics[i] - let msg = response.messages[i] - - self.messageIngress(topic, hash, msg) - - if response.cursor.isNone(): - break - - query.cursor = response.cursor - - return ok() - -proc new*( - T: type WakuSync, - peerManager: PeerManager, - maxFrameSize: int = DefaultMaxFrameSize, - syncRange: timer.Duration = DefaultSyncRange, - syncInterval: timer.Duration = DefaultSyncInterval, - relayJitter: Duration = DefaultGossipSubJitter, - wakuArchive: WakuArchive, - wakuStoreClient: WakuStoreClient, - pruneCallback: Option[PruneCallback] = none(PruneCallback), - transferCallback: Option[TransferCallback] = none(TransferCallback), -): Future[Result[T, string]] {.async.} = - let storage = NegentropyStorage.new().valueOr: - return err("negentropy storage creation failed") - - var sync = WakuSync( - storage: storage, - peerManager: peerManager, - maxFrameSize: maxFrameSize, - syncInterval: syncInterval, - syncRange: syncRange, - relayJitter: relayJitter, - pruneOffset: syncInterval div 10, # 10% offset - ) - - sync.initProtocolHandler() - - sync.pruneCallBack = pruneCallback - - if sync.pruneCallBack.isNone(): - let res = sync.createPruneCallback(wakuArchive) - - if res.isErr(): - error "pruning callback creation error", error = res.error - else: - sync.pruneCallBack = some(res.get()) - - sync.transferCallBack = transferCallback - - if sync.transferCallBack.isNone(): - let res = sync.createTransferCallback(wakuArchive, wakuStoreClient) - - if res.isErr(): - error "transfer callback creation error", error = res.error - else: - sync.transferCallBack = some(res.get()) - - let res = await sync.initFillStorage(wakuArchive) - if res.isErr(): - warn "will not sync messages before this point in time", error = res.error - - info "WakuSync protocol initialized" - - return ok(sync) - -proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = - debug "periodic sync initialized", interval = $self.syncInterval - - # to stagger the intervals - await sleepAsync((self.syncInterval div 2)) - - while true: # infinite loop - await sleepAsync(self.syncInterval) - - debug "periodic sync started" - - var - hashes: seq[WakuMessageHash] - peer: RemotePeerInfo - tries = 3 - - while true: - let res = (await self.storeSynchronization()).valueOr: - # we either try again or log an error and break - if tries > 0: - tries -= 1 - await sleepAsync(RetryDelay) - continue - else: - error "sync failed", error = $error - break - - hashes = res[0] - peer = res[1] - break - - if hashes.len > 0: - tries = 3 - while true: - (await callback(hashes, peer.peerId)).isOkOr: - # we either try again or log an error and break - if tries > 0: - tries -= 1 - await sleepAsync(RetryDelay) - continue - else: - error "transfer callback failed", error = $error - break - - break - - debug "periodic sync done", hashSynced = hashes.len - - continue - -proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = - debug "periodic prune initialized", interval = $self.syncInterval - - # Default T minus 60m - self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos - - await sleepAsync(self.syncInterval) - - # Default T minus 55m - var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos - - while true: # infinite loop - await sleepAsync(self.syncInterval) - - debug "periodic prune started", - startTime = self.pruneStart - self.pruneOffset.nanos, - endTime = pruneStop, - storageSize = self.storage.len - - var (elements, cursor) = - (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) - - var tries = 3 - while true: - (elements, cursor) = ( - await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor) - ).valueOr: - # we either try again or log an error and break - if tries > 0: - tries -= 1 - await sleepAsync(RetryDelay) - continue - else: - error "pruning callback failed", error = $error - break - - if elements.len == 0: - # no elements to remove, stop - break - - for (hash, timestamp) in elements: - self.storage.erase(timestamp, hash).isOkOr: - error "storage erase failed", - timestamp = timestamp, msg_hash = hash.to0xHex(), error = $error - continue - - if cursor.isNone(): - # no more pages, stop - break - - self.pruneStart = pruneStop - pruneStop = getNowInNanosecondTime() - self.syncRange.nanos - - debug "periodic prune done", storageSize = self.storage.len - - continue - -proc start*(self: WakuSync) = - self.started = true - - if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: - self.periodicSyncFut = self.periodicSync(self.transferCallBack.get()) - - if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: - self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get()) - - info "WakuSync protocol started" - -proc stopWait*(self: WakuSync) {.async.} = - if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: - await self.periodicSyncFut.cancelAndWait() - - if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: - await self.periodicPruneFut.cancelAndWait() - - info "WakuSync protocol stopped" diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim deleted file mode 100644 index ce5ddd070..000000000 --- a/waku/waku_sync/raw_bindings.nim +++ /dev/null @@ -1,513 +0,0 @@ -{.push raises: [].} - -from os import DirSep - -import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results -import ../waku_core/message - -const negentropyPath = - currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep & - "vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep - -const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" - -logScope: - topics = "waku sync" - -type Buffer = object - len*: uint64 - `ptr`*: ptr uint8 - -type BindingResult = object - output: Buffer - have_ids_len: uint - need_ids_len: uint - have_ids: ptr Buffer - need_ids: ptr Buffer - error: cstring - -proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = - assert buffer.len == 32 - - var hash: WakuMessageHash - - copyMem(hash[0].addr, buffer.ptr, 32) - - return hash - -proc toBuffer(x: openArray[byte]): Buffer = - ## converts the input to a Buffer object - ## the Buffer object is used to communicate data with the rln lib - var temp = @x - let baseAddr = cast[pointer](x) - let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) - return output - -proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] = - var bufLen: uint64 - if isNone(len): - bufLen = buffer.len - else: - bufLen = len.get() - if bufLen == 0: - return @[] - trace "length of buffer is", len = bufLen - let bytes = newSeq[byte](bufLen) - copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen) - return bytes - -proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] = - var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr) - var mySequence = newSeq[Buffer](buffLen) - for i in 0 .. buffLen - 1: - mySequence[i] = uncheckedArr[i] - return mySequence - -### Storage ### - -type NegentropyStorage* = distinct pointer - -proc get_last_error(): cstring {.header: NEGENTROPY_HEADER, importc: "get_last_error".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27 -proc storage_init( - db_path: cstring, name: cstring -): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41 -proc raw_insert( - storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer -): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43 -proc raw_erase( - storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer -): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29 -proc free( - storage: NegentropyStorage -) {.header: NEGENTROPY_HEADER, importc: "storage_delete".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 -proc size( - storage: NegentropyStorage -): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".} - -### Negentropy ### - -type RawNegentropy* = distinct pointer - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33 -proc constructNegentropy( - storage: NegentropyStorage, frameSizeLimit: uint64 -): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37 -proc raw_initiate( - negentropy: RawNegentropy, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39 -proc raw_setInitiator( - negentropy: RawNegentropy -) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45 -proc raw_reconcile( - negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "reconcile".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51 -proc raw_reconcile_with_ids( - negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35 -proc free( - negentropy: RawNegentropy -) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53 -proc free_result( - r: ptr BindingResult -) {.header: NEGENTROPY_HEADER, importc: "free_result".} - -### SubRange ### - -type NegentropySubRangeStorage* = distinct pointer - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57 -proc subrange_init( - storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64 -): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".} - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59 -proc free( - subrange: NegentropySubRangeStorage -) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".} - -# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 -proc size( - subrange: NegentropySubRangeStorage -): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".} - -### Negentropy with NegentropySubRangeStorage ### - -type RawNegentropySubRange = distinct pointer - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61 -proc constructNegentropyWithSubRange( - subrange: NegentropySubRangeStorage, frameSizeLimit: uint64 -): RawNegentropySubRange {. - header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new" -.} - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65 -proc raw_initiate_subrange( - negentropy: RawNegentropySubRange, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".} - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67 -proc raw_reconcile_subrange( - negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".} - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69 -proc raw_reconcile_with_ids_subrange( - negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult -): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".} - -# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63 -proc free( - negentropy: RawNegentropySubRange -) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".} - -### Wrappings ### - -### Storage ### - -proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.} - -proc new*(T: type NegentropyStorage): Result[T, string] = - #TODO db name and path - let storage = storage_init("", "") - - #[ TODO: Uncomment once we move to lmdb - if storage == nil: - return err("storage initialization failed") ]# - - if storage == nil: - return err($get_last_error()) - - return ok(storage) - -proc delete*(storage: NegentropyStorage) = - storage.free() - -proc erase*( - storage: NegentropyStorage, id: int64, hash: WakuMessageHash -): Result[void, string] = - var buffer = toBuffer(hash) - var bufPtr = addr(buffer) - let res = raw_erase(storage, uint64(id), bufPtr) - - #TODO error handling once we move to lmdb - - if res: - return ok() - else: - return err($get_last_error()) - -proc insert*( - storage: NegentropyStorage, id: int64, hash: WakuMessageHash -): Result[void, string] = - var buffer = toBuffer(hash) - var bufPtr = addr(buffer) - let res = raw_insert(storage, uint64(id), bufPtr) - - #TODO error handling once we move to lmdb - - if res: - return ok() - else: - return err($get_last_error()) - -proc len*(storage: NegentropyStorage): int = - int(storage.size) - -### SubRange ### - -proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.} - -proc new*( - T: type NegentropySubRangeStorage, - storage: NegentropyStorage, - startTime: uint64 = uint64.low, - endTime: uint64 = uint64.high, -): Result[T, string] = - let subrange = subrange_init(storage, startTime, endTime) - - #[ TODO: Uncomment once we move to lmdb - if storage == nil: - return err("storage initialization failed") ]# - - if subrange == nil: - return err($get_last_error()) - - return ok(subrange) - -proc delete*(subrange: NegentropySubRangeStorage) = - subrange.free() - -proc len*(subrange: NegentropySubRangeStorage): int = - int(subrange.size) - -### Interface ### - -type - Negentropy* = ref object of RootObj - - NegentropyWithSubRange = ref object of Negentropy - inner: RawNegentropySubRange - - NegentropyWithStorage = ref object of Negentropy - inner: RawNegentropy - - NegentropyPayload* = distinct seq[byte] - -method delete*(self: Negentropy) {.base, gcsafe.} = - discard - -method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} = - discard - -method serverReconcile*( - self: Negentropy, query: NegentropyPayload -): Result[NegentropyPayload, string] {.base.} = - discard - -method clientReconcile*( - self: Negentropy, - query: NegentropyPayload, - haves: var seq[WakuMessageHash], - needs: var seq[WakuMessageHash], -): Result[Option[NegentropyPayload], string] {.base.} = - discard - -### Impl. ### - -proc new*( - T: type Negentropy, - storage: NegentropyStorage | NegentropySubRangeStorage, - frameSizeLimit: int, -): Result[T, string] = - if storage is NegentropyStorage: - let raw_negentropy = - constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit)) - - if cast[pointer](raw_negentropy) == nil: - return err($get_last_error()) - - let negentropy = NegentropyWithStorage(inner: raw_negentropy) - return ok(negentropy) - elif storage is NegentropySubRangeStorage: - let raw_negentropy = constructNegentropyWithSubRange( - NegentropySubRangeStorage(storage), uint64(frameSizeLimit) - ) - - if cast[pointer](raw_negentropy) == nil: - return err($get_last_error()) - - let negentropy = NegentropyWithSubRange(inner: raw_negentropy) - return ok(negentropy) - -method delete*(self: NegentropyWithSubRange) = - self.inner.free() - -method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] = - ## Client inititate a sync session with a server by sending a payload - var myResult {.noinit.}: BindingResult = BindingResult() - var myResultPtr = addr myResult - - let ret = self.inner.raw_initiate_subrange(myResultPtr) - if ret < 0 or myResultPtr == nil: - error "negentropy initiate failed with code ", code = ret - return err($get_last_error()) - let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) - free_result(myResultPtr) - trace "received return from initiate", len = myResultPtr.output.len - - return ok(NegentropyPayload(bytes)) - -method serverReconcile*( - self: NegentropyWithSubRange, query: NegentropyPayload -): Result[NegentropyPayload, string] = - ## Server response to a negentropy payload. - ## Always return an answer. - - let queryBuf = toBuffer(seq[byte](query)) - var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error - var myResult {.noinit.}: BindingResult = BindingResult() - var myResultPtr = addr myResult - - let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr) - if ret < 0: - error "raw_reconcile failed with code ", code = ret - return err($get_last_error()) - trace "received return from raw_reconcile", len = myResultPtr.output.len - - let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) - trace "outputBytes len", len = outputBytes.len - free_result(myResultPtr) - - return ok(NegentropyPayload(outputBytes)) - -method clientReconcile*( - self: NegentropyWithSubRange, - query: NegentropyPayload, - haves: var seq[WakuMessageHash], - needs: var seq[WakuMessageHash], -): Result[Option[NegentropyPayload], string] = - ## Client response to a negentropy payload. - ## May return an answer, if not the sync session done. - - let cQuery = toBuffer(seq[byte](query)) - - var myResult {.noinit.}: BindingResult = BindingResult() - myResult.have_ids_len = 0 - myResult.need_ids_len = 0 - var myResultPtr = addr myResult - - let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr) - if ret < 0: - error "raw_reconcile failed with code ", code = ret - return err($get_last_error()) - - let output = bufferToBytes(addr myResult.output) - - var - have_hashes: seq[Buffer] - need_hashes: seq[Buffer] - - if myResult.have_ids_len > 0: - have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) - if myResult.need_ids_len > 0: - need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) - - trace "have and need hashes ", - have_count = have_hashes.len, need_count = need_hashes.len - - for i in 0 .. have_hashes.len - 1: - var hash = toWakuMessageHash(have_hashes[i]) - trace "have hashes ", index = i, msg_hash = hash.to0xHex() - haves.add(hash) - - for i in 0 .. need_hashes.len - 1: - var hash = toWakuMessageHash(need_hashes[i]) - trace "need hashes ", index = i, msg_hash = hash.to0xHex() - needs.add(hash) - - trace "return ", output = output, len = output.len - - free_result(myResultPtr) - - if output.len < 1: - return ok(none(NegentropyPayload)) - - return ok(some(NegentropyPayload(output))) - -method delete*(self: NegentropyWithStorage) = - self.inner.free() - -method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] = - ## Client inititate a sync session with a server by sending a payload - var myResult {.noinit.}: BindingResult = BindingResult() - var myResultPtr = addr myResult - - let ret = self.inner.raw_initiate(myResultPtr) - if ret < 0 or myResultPtr == nil: - error "negentropy initiate failed with code ", code = ret - return err($get_last_error()) - let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) - free_result(myResultPtr) - trace "received return from initiate", len = myResultPtr.output.len - - return ok(NegentropyPayload(bytes)) - -method serverReconcile*( - self: NegentropyWithStorage, query: NegentropyPayload -): Result[NegentropyPayload, string] = - ## Server response to a negentropy payload. - ## Always return an answer. - - let queryBuf = toBuffer(seq[byte](query)) - var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error - var myResult {.noinit.}: BindingResult = BindingResult() - var myResultPtr = addr myResult - - let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr) - if ret < 0: - error "raw_reconcile failed with code ", code = ret - return err($get_last_error()) - trace "received return from raw_reconcile", len = myResultPtr.output.len - - let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) - trace "outputBytes len", len = outputBytes.len - free_result(myResultPtr) - - return ok(NegentropyPayload(outputBytes)) - -method clientReconcile*( - self: NegentropyWithStorage, - query: NegentropyPayload, - haves: var seq[WakuMessageHash], - needs: var seq[WakuMessageHash], -): Result[Option[NegentropyPayload], string] = - ## Client response to a negentropy payload. - ## May return an answer, if not the sync session done. - - let cQuery = toBuffer(seq[byte](query)) - - var myResult {.noinit.}: BindingResult = BindingResult() - myResult.have_ids_len = 0 - myResult.need_ids_len = 0 - var myResultPtr = addr myResult - - let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr) - if ret < 0: - error "raw_reconcile failed with code ", code = ret - return err($get_last_error()) - - let output = bufferToBytes(addr myResult.output) - - var - have_hashes: seq[Buffer] - need_hashes: seq[Buffer] - - if myResult.have_ids_len > 0: - have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) - if myResult.need_ids_len > 0: - need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) - - trace "have and need hashes ", - have_count = have_hashes.len, need_count = need_hashes.len - - for i in 0 .. have_hashes.len - 1: - var hash = toWakuMessageHash(have_hashes[i]) - trace "have hashes ", index = i, msg_hash = hash.to0xHex() - haves.add(hash) - - for i in 0 .. need_hashes.len - 1: - var hash = toWakuMessageHash(need_hashes[i]) - trace "need hashes ", index = i, msg_hash = hash.to0xHex() - needs.add(hash) - - trace "return ", output = output, len = output.len - - free_result(myResultPtr) - - if output.len < 1: - return ok(none(NegentropyPayload)) - - return ok(some(NegentropyPayload(output))) diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim deleted file mode 100644 index ff6d741ef..000000000 --- a/waku/waku_sync/session.nim +++ /dev/null @@ -1,240 +0,0 @@ -{.push raises: [].} - -import std/options, results, chronos, libp2p/stream/connection - -import - ../common/nimchronos, - ../common/protobuf, - ../waku_core, - ./raw_bindings, - ./common, - ./codec - -#TODO add states for protocol negotiation - -### Type State ### - -type ClientSync* = object - haveHashes: seq[WakuMessageHash] - -type ServerSync* = object - -# T is either ClientSync or ServerSync - -type Reconciled*[T] = object - sync: T - negentropy: Negentropy - connection: Connection - frameSize: int - payload*: SyncPayload - -type Sent*[T] = object - sync: T - negentropy: Negentropy - connection: Connection - frameSize: int - -type Received*[T] = object - sync: T - negentropy: Negentropy - connection: Connection - frameSize: int - payload*: SyncPayload - -type Completed*[T] = object - sync: T - negentropy: Negentropy - connection: Connection - haveHashes: seq[WakuMessageHash] - -### State Transition ### - -proc clientInitialize*( - store: NegentropyStorage, - conn: Connection, - frameSize = DefaultMaxFrameSize, - start = int64.low, - `end` = int64.high, -): Result[Reconciled[ClientSync], string] = - let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`)) - - let negentropy = ?Negentropy.new(subrange, frameSize) - - let negentropyPayload = ?negentropy.initiate() - - let payload = SyncPayload(negentropy: seq[byte](negentropyPayload)) - - let sync = ClientSync() - - return ok( - Reconciled[ClientSync]( - sync: sync, - negentropy: negentropy, - connection: conn, - frameSize: frameSize, - payload: payload, - ) - ) - -proc serverInitialize*( - store: NegentropyStorage, - conn: Connection, - frameSize = DefaultMaxFrameSize, - syncStart = int64.low, - syncEnd = int64.high, -): Result[Sent[ServerSync], string] = - let subrange = - ?NegentropySubRangeStorage.new(store, uint64(syncStart), uint64(syncEnd)) - - let negentropy = ?Negentropy.new(subrange, frameSize) - - let sync = ServerSync() - - return ok( - Sent[ServerSync]( - sync: sync, negentropy: negentropy, connection: conn, frameSize: frameSize - ) - ) - -proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} = - let writeRes = catch: - await self.connection.writeLP(self.payload.encode().buffer) - - if writeRes.isErr(): - return err("send connection write error: " & writeRes.error.msg) - - return ok( - Sent[T]( - sync: self.sync, - negentropy: self.negentropy, - connection: self.connection, - frameSize: self.frameSize, - ) - ) - -proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} = - let readRes = catch: - await self.connection.readLp(-1) - - let buffer: seq[byte] = - if readRes.isOk(): - readRes.get() - else: - return err("listenBack connection read error: " & readRes.error.msg) - - # can't otherwise the compiler complains - #let payload = SyncPayload.decode(buffer).valueOr: - #return err($error) - - let decodeRes = SyncPayload.decode(buffer) - - let payload = - if decodeRes.isOk(): - decodeRes.get() - else: - let decodeError: ProtobufError = decodeRes.error - let errMsg = $decodeError - return err("listenBack decoding error: " & errMsg) - - return ok( - Received[T]( - sync: self.sync, - negentropy: self.negentropy, - connection: self.connection, - frameSize: self.frameSize, - payload: payload, - ) - ) - -# Aliasing for readability -type ContinueOrCompleted[T] = Result[Reconciled[T], Completed[T]] -type Continue[T] = Reconciled[T] - -proc clientReconcile*( - self: Received[ClientSync], needHashes: var seq[WakuMessageHash] -): Result[ContinueOrCompleted[ClientSync], string] = - var haves = self.sync.haveHashes - - let responseOpt = - ?self.negentropy.clientReconcile( - NegentropyPayload(self.payload.negentropy), haves, needHashes - ) - - let sync = ClientSync(haveHashes: haves) - - let response = responseOpt.valueOr: - let res = ContinueOrCompleted[ClientSync].err( - Completed[ClientSync]( - sync: sync, negentropy: self.negentropy, connection: self.connection - ) - ) - - return ok(res) - - let payload = SyncPayload(negentropy: seq[byte](response)) - - let res = ContinueOrCompleted[ClientSync].ok( - Continue[ClientSync]( - sync: sync, - negentropy: self.negentropy, - connection: self.connection, - frameSize: self.frameSize, - payload: payload, - ) - ) - - return ok(res) - -proc serverReconcile*( - self: Received[ServerSync] -): Result[ContinueOrCompleted[ServerSync], string] = - if self.payload.negentropy.len == 0: - let res = ContinueOrCompleted[ServerSync].err( - Completed[ServerSync]( - sync: self.sync, - negentropy: self.negentropy, - connection: self.connection, - haveHashes: self.payload.hashes, - ) - ) - - return ok(res) - - let response = - ?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy)) - - let payload = SyncPayload(negentropy: seq[byte](response)) - - let res = ContinueOrCompleted[ServerSync].ok( - Continue[ServerSync]( - sync: self.sync, - negentropy: self.negentropy, - connection: self.connection, - frameSize: self.frameSize, - payload: payload, - ) - ) - - return ok(res) - -proc clientTerminate*( - self: Completed[ClientSync] -): Future[Result[void, string]] {.async.} = - let payload = SyncPayload(hashes: self.sync.haveHashes) - - let writeRes = catch: - await self.connection.writeLp(payload.encode().buffer) - - if writeRes.isErr(): - return err("clientTerminate connection write error: " & writeRes.error.msg) - - self.negentropy.delete() - - return ok() - -proc serverTerminate*( - self: Completed[ServerSync] -): Future[seq[WakuMessageHash]] {.async.} = - self.negentropy.delete() - - return self.haveHashes diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim deleted file mode 100644 index 528da2b64..000000000 --- a/waku/waku_sync/storage_manager.nim +++ /dev/null @@ -1,76 +0,0 @@ -# Unused yet. Kept for future use. -#[ when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import std/[times, tables, options], chronicles, chronos, stew/results - -import ./raw_bindings, ../waku_core/time - -logScope: - topics = "waku sync" - -type WakuSyncStorageManager* = ref object - storages: OrderedTable[string, Storage] - # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH - maxHours: int64 - -proc new*( - T: type WakuSyncStorageManager, - hoursToStore: times.Duration = initDuration(minutes = 120), -): T = - return WakuSyncStorageManager(maxHours: hoursToStore.inHours) - -proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] = - if self.storages.len() == 0: - return ok(none(Storage)) - var storageToFetch: Storage - #is there a more effective way to fetch last element? - for k, storage in self.storages: - storageToFetch = storage - - return ok(some(storageToFetch)) - -proc deleteOldestStorage*(self: WakuSyncStorageManager) = - var storageToDelete: Storage - var time: string - #is there a more effective way to fetch first element? - for k, storage in self.storages: - storageToDelete = storage - time = k - break - - if self.storages.pop(time, storageToDelete): - delete(storageToDelete) - -proc retrieveStorage*( - self: WakuSyncStorageManager, time: Timestamp -): Result[Option[Storage], string] = - var timestamp: Timestamp - if time == 0: - timestamp = timestampInSeconds(getNowInNanosecondTime()) - debug "timestamp not provided, using now to fetch storage", timestamp = timestamp - else: - timestamp = timestampInSeconds(time) - let tsTime = times.fromUnix(timestamp) - let dateTime = times.format(tsTime, "yyyyMMddHH", utc()) - - var storage: Storage = self.storages.getOrDefault(dateTime) - if storage == nil: - #create a new storage - # TODO: May need synchronization?? - # Limit number of storages to configured duration - let hours = self.storages.len() - if hours == self.maxHours: - #Need to delete oldest storage at this point, but what if that is being synced? - self.deleteOldestStorage() - info "number of storages reached, deleting the oldest" - info "creating a new storage for ", time = dateTime - storage = Storage.new().valueOr: - error "storage creation failed" - return err(error) - self.storages[dateTime] = storage - - return ok(some(storage)) - ]#