diff --git a/Makefile b/Makefile index 5ce8fe208..97d557e39 100644 --- a/Makefile +++ b/Makefile @@ -40,8 +40,8 @@ ifeq ($(detected_OS),Windows) NIM_PARAMS += --passL:"-L$(MINGW_PATH)/lib" NIM_PARAMS += --passL:"-Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc" NIM_PARAMS += --passL:"-Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream" - - LIBS = -static -lws2_32 -lbcrypt -liphlpapi -luserenv -lntdll -lminiupnpc -lnatpmp -lpq + + LIBS = -static -lws2_32 -lbcrypt -liphlpapi -luserenv -lntdll -lminiupnpc -lnatpmp -lpq NIM_PARAMS += $(foreach lib,$(LIBS),--passL:"$(lib)") endif @@ -83,7 +83,7 @@ HEAPTRACKER_INJECT ?= 0 ifeq ($(HEAPTRACKER), 1) # Needed to make nimbus-build-system use the Nim's 'heaptrack_support' branch DOCKER_NIM_COMMIT := NIM_COMMIT=heaptrack_support -TARGET := prod-with-heaptrack +TARGET := heaptrack-build ifeq ($(HEAPTRACKER_INJECT), 1) # the Nim compiler will load 'libheaptrack_inject.so' @@ -340,6 +340,17 @@ docker-image: --target $(TARGET) \ --tag $(DOCKER_IMAGE_NAME) . +docker-quick-image: MAKE_TARGET ?= wakunode2 +docker-quick-image: DOCKER_IMAGE_TAG ?= $(MAKE_TARGET)-$(GIT_VERSION) +docker-quick-image: DOCKER_IMAGE_NAME ?= wakuorg/nwaku:$(DOCKER_IMAGE_TAG) +docker-quick-image: NIM_PARAMS := $(NIM_PARAMS) -d:chronicles_colors:none -d:insecure -d:postgres --passL:$(LIBRLN_FILE) --passL:-lm +docker-quick-image: | build deps librln wakunode2 + docker build \ + --build-arg="MAKE_TARGET=$(MAKE_TARGET)" \ + --tag $(DOCKER_IMAGE_NAME) \ + --file docker/binaries/Dockerfile.bn.amd64 \ + . + docker-push: docker push $(DOCKER_IMAGE_NAME) @@ -367,6 +378,14 @@ docker-liteprotocoltester: --file apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile \ . +docker-quick-liteprotocoltester: DOCKER_LPT_TAG ?= latest +docker-quick-liteprotocoltester: DOCKER_LPT_NAME ?= wakuorg/liteprotocoltester:$(DOCKER_LPT_TAG) +docker-quick-liteprotocoltester: | liteprotocoltester + docker build \ + --tag $(DOCKER_LPT_NAME) \ + --file apps/liteprotocoltester/Dockerfile.liteprotocoltester \ + . + docker-liteprotocoltester-push: docker push $(DOCKER_LPT_NAME) @@ -497,5 +516,4 @@ release-notes: release-notes |\ sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, -# asked here: https://github.com/bvieira/sv4git/discussions/101 - +# asked here: https://github.com/bvieira/sv4git/discussions/101 \ No newline at end of file diff --git a/ci/Jenkinsfile.release b/ci/Jenkinsfile.release index fcc353be8..1a2125402 100644 --- a/ci/Jenkinsfile.release +++ b/ci/Jenkinsfile.release @@ -78,7 +78,7 @@ pipeline { "--build-arg=NIMFLAGS='${params.NIMFLAGS} -d:postgres ' " + "--build-arg=LOG_LEVEL='${params.LOWEST_LOG_LEVEL_ALLOWED}' " + "--build-arg=DEBUG='${params.DEBUG ? "1" : "0"} ' " + - "--target=${params.HEAPTRACK ? "prod-with-heaptrack" : "prod"} ." + "--target=${params.HEAPTRACK ? "heaptrack-build" : "prod"} ." ) } } } diff --git a/tests/waku_rln_relay/test_waku_rln_relay.nim b/tests/waku_rln_relay/test_waku_rln_relay.nim index bc1c3f640..95ec7b4c7 100644 --- a/tests/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/waku_rln_relay/test_waku_rln_relay.nim @@ -722,13 +722,13 @@ suite "Waku rln relay": # validate messages # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) let - msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time)) + msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1) # wm2 is published within the same Epoch as wm1 and should be found as spam - msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time)) + msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2) # a valid message should be validated successfully - msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time)) + msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3) # wm4 has no rln proof and should not be validated - msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time)) + msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4) check: msgValidate1 == MessageValidationResult.Valid @@ -778,9 +778,9 @@ suite "Waku rln relay": # validate messages # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) let - msgValidate1 = wakuRlnRelay1.validateMessageAndUpdateLog(wm1, some(time)) + msgValidate1 = wakuRlnRelay1.validateMessageAndUpdateLog(wm1) # since this message is from a different sender, it should be validated successfully - msgValidate2 = wakuRlnRelay1.validateMessageAndUpdateLog(wm2, some(time)) + msgValidate2 = wakuRlnRelay1.validateMessageAndUpdateLog(wm2) check: msgValidate1 == MessageValidationResult.Valid diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index dd91e95bd..fc176258a 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -487,13 +487,6 @@ procSuite "WakuNode - RLN relay": await node3.stop() xasyncTest "clearNullifierLog: should clear epochs > MaxEpochGap": -<<<<<<< HEAD -======= - ## This is skipped because is flaky and made CI randomly fail but is useful to run manually -<<<<<<< HEAD ->>>>>>> deprecate_sync_strategy -======= ->>>>>>> deprecate_sync_strategy # Given two nodes let contentTopic = ContentTopic("/waku/2/default-content/proto") diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index a81ad6e2f..e7fd82b57 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -1,4 +1,4 @@ -import std/[options, random], chronos +import std/[options, random], chronos, chronicles import waku/[ @@ -23,7 +23,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash = proc newTestWakuRecon*( switch: Switch, idsRx: AsyncQueue[SyncID], - wantsTx: AsyncQueue[(PeerId, Fingerprint)], + wantsTx: AsyncQueue[PeerId], needsTx: AsyncQueue[(PeerId, Fingerprint)], cluster: uint16 = 1, shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7], @@ -51,7 +51,7 @@ proc newTestWakuRecon*( proc newTestWakuTransfer*( switch: Switch, idsTx: AsyncQueue[SyncID], - wantsRx: AsyncQueue[(PeerId, Fingerprint)], + wantsRx: AsyncQueue[PeerId], needsRx: AsyncQueue[(PeerId, Fingerprint)], ): SyncTransfer = let peerManager = PeerManager.new(switch) diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index df14de6a1..efdd6a885 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -27,7 +27,7 @@ suite "Waku Sync: reconciliation": var idsChannel {.threadvar.}: AsyncQueue[SyncID] - localWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + localWants {.threadvar.}: AsyncQueue[PeerId] remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] var server {.threadvar.}: SyncReconciliation @@ -43,7 +43,7 @@ suite "Waku Sync: reconciliation": await allFutures(serverSwitch.start(), clientSwitch.start()) idsChannel = newAsyncQueue[SyncID]() - localWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + localWants = newAsyncQueue[PeerId]() remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) @@ -61,7 +61,6 @@ suite "Waku Sync: reconciliation": asyncTest "sync 2 nodes both empty": check: idsChannel.len == 0 - localWants.len == 0 remoteNeeds.len == 0 let res = await client.storeSynchronization(some(serverPeerInfo)) @@ -69,7 +68,6 @@ suite "Waku Sync: reconciliation": check: idsChannel.len == 0 - localWants.len == 0 remoteNeeds.len == 0 asyncTest "sync 2 nodes empty client full server": @@ -141,8 +139,6 @@ suite "Waku Sync: reconciliation": check: remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false - localWants.contains((clientPeerInfo.peerId, hash3)) == false - localWants.contains((serverPeerInfo.peerId, hash2)) == false var syncRes = await client.storeSynchronization(some(serverPeerInfo)) assert syncRes.isOk(), $syncRes.error @@ -150,8 +146,6 @@ suite "Waku Sync: reconciliation": check: remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true - localWants.contains((clientPeerInfo.peerId, hash3)) == true - localWants.contains((serverPeerInfo.peerId, hash2)) == true asyncTest "sync 2 nodes different shards": let @@ -170,8 +164,6 @@ suite "Waku Sync: reconciliation": check: remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false - localWants.contains((clientPeerInfo.peerId, hash3)) == false - localWants.contains((serverPeerInfo.peerId, hash2)) == false server = await newTestWakuRecon( serverSwitch, idsChannel, localWants, remoteNeeds, shards = @[0.uint16, 1, 2, 3] @@ -185,7 +177,6 @@ suite "Waku Sync: reconciliation": check: remoteNeeds.len == 0 - localWants.len == 0 asyncTest "sync 2 nodes same hashes": let @@ -200,14 +191,12 @@ suite "Waku Sync: reconciliation": client.messageIngress(hash2, msg2) check: - localWants.len == 0 remoteNeeds.len == 0 let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: - localWants.len == 0 remoteNeeds.len == 0 asyncTest "sync 2 nodes 100K msgs 1 diff": @@ -236,14 +225,12 @@ suite "Waku Sync: reconciliation": timestamp += Timestamp(part) check: - localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == false remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: - localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == true remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": @@ -286,7 +273,6 @@ suite "Waku Sync: reconciliation": continue check: - localWants.len == 0 remoteNeeds.len == 0 let res = await client.storeSynchronization(some(serverPeerInfo)) @@ -294,7 +280,6 @@ suite "Waku Sync: reconciliation": # timimg issue make it hard to match exact numbers check: - localWants.len > 900 remoteNeeds.len > 900 suite "Waku Sync: transfer": @@ -310,10 +295,10 @@ suite "Waku Sync: transfer": var serverIds {.threadvar.}: AsyncQueue[SyncID] - serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + serverLocalWants {.threadvar.}: AsyncQueue[PeerId] serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] clientIds {.threadvar.}: AsyncQueue[SyncID] - clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + clientLocalWants {.threadvar.}: AsyncQueue[PeerId] clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] var @@ -341,7 +326,7 @@ suite "Waku Sync: transfer": clientPeerManager = PeerManager.new(clientSwitch) serverIds = newAsyncQueue[SyncID]() - serverLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + serverLocalWants = newAsyncQueue[PeerId]() serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() server = SyncTransfer.new( @@ -353,7 +338,7 @@ suite "Waku Sync: transfer": ) clientIds = newAsyncQueue[SyncID]() - clientLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + clientLocalWants = newAsyncQueue[PeerId]() clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() client = SyncTransfer.new( @@ -389,8 +374,8 @@ suite "Waku Sync: transfer": serverDriver = serverDriver.put(DefaultPubsubTopic, msgs) - # add server info and msg hash to client want channel - let want = (serverPeerInfo.peerId, hash) + # add server info to client want channel + let want = serverPeerInfo.peerId await clientLocalWants.put(want) # add client info and msg hash to server need channel diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index bdab61a75..a3546f1f8 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -120,7 +120,9 @@ suite "Waku v2 Rest API - Admin": check: getRes.status == 200 $getRes.contentType == $MIMETYPE_JSON - getRes.data.len() == 0 + getRes.data.len() == 1 + getRes.data[0].multiaddr == nonExistentPeer + getRes.data[0].connected == CannotConnect asyncTest "Get filter data": await allFutures( @@ -274,7 +276,7 @@ suite "Waku v2 Rest API - Admin": check: postRes.status == 200 - let getRes = await client.getConnectedRelayPeers() + let getRes = await client.getRelayPeers() check: getRes.status == 200 @@ -286,13 +288,13 @@ suite "Waku v2 Rest API - Admin": # Check peer 3 # Todo: investigate why the test setup missing remote peer's shard info - # let getRes2 = await client.getConnectedRelayPeersByShard(0) + # let getRes2 = await client.getRelayPeersByShard(0) # check: # getRes2.status == 200 # $getRes2.contentType == $MIMETYPE_JSON # getRes2.data.peers.len() == 2 - let getRes3 = await client.getConnectedRelayPeersByShard(99) + let getRes3 = await client.getRelayPeersByShard(99) check: getRes3.status == 200 $getRes3.contentType == $MIMETYPE_JSON diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index caa84db63..78b07ed9b 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -196,6 +196,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = except CatchableError: return err("failed to create switch: " & getCurrentExceptionMsg()) + let netConfig = builder.netConfig.get() + let peerManager = PeerManager.new( switch = switch, storage = builder.peerStorage.get(nil), @@ -203,12 +205,13 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, + dnsNameServers = netConfig.dnsNameServers, ) var node: WakuNode try: node = WakuNode.new( - netConfig = builder.netConfig.get(), + netConfig = netConfig, enr = builder.record.get(), switch = switch, peerManager = peerManager, diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 08f11f1c5..b5275d00b 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -155,6 +155,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul dns4DomainName = dns4DomainName, discv5UdpPort = discv5UdpPort, wakuFlags = some(wakuFlags), + dnsNameServers = conf.dnsAddrsNameServers, ) return netConfigRes diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9760d1580..c40db3b54 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -59,6 +59,7 @@ type Waku* = ref object wakuDiscv5*: WakuDiscoveryV5 dynamicBootstrapNodes: seq[RemotePeerInfo] dnsRetryLoopHandle: Future[void] + networkConnLoopHandle: Future[void] discoveryMngr: DiscoveryManager node*: WakuNode @@ -363,6 +364,15 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return +# The network connectivity loop checks periodically whether the node is online or not +# and triggers any change that depends on the network connectivity state +proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} = + while true: + await sleepAsync(15.seconds) + + # Update online state + await waku.node.peerManager.updateOnlineState() + proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = debug "Retrieve dynamic bootstrap nodes" @@ -400,6 +410,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() + # Start network connectivity check loop + waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop() + return ok() # Waku shutdown @@ -411,6 +424,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = if not waku.metricsServer.isNil(): await waku.metricsServer.stop() + if not waku.networkConnLoopHandle.isNil(): + await waku.networkConnLoopHandle.cancelAndWait() + if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() diff --git a/waku/node/config.nim b/waku/node/config.nim index 311e26771..51aadb48d 100644 --- a/waku/node/config.nim +++ b/waku/node/config.nim @@ -15,6 +15,7 @@ type NetConfig* = object extIp*: Option[IpAddress] extPort*: Option[Port] dns4DomainName*: Option[string] + dnsNameServers*: seq[IpAddress] announcedAddresses*: seq[MultiAddress] extMultiAddrs*: seq[MultiAddress] enrMultiAddrs*: seq[MultiAddress] @@ -75,6 +76,7 @@ proc init*( discv5UdpPort = none(Port), clusterId: uint16 = 0, wakuFlags = none(CapabilitiesBitfield), + dnsNameServers = newSeq[IpAddress](), ): NetConfigResult = ## Initialize and validate waku node network configuration @@ -165,6 +167,7 @@ proc init*( extPort: extPort, wssEnabled: wssEnabled, dns4DomainName: dns4DomainName, + dnsNameServers: dnsNameServers, announcedAddresses: announcedAddresses, extMultiAddrs: extMultiAddrs, enrMultiaddrs: enrMultiaddrs, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 602718d5d..75c72449a 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -8,6 +8,7 @@ import libp2p/multistream, libp2p/muxers/muxer, libp2p/nameresolving/nameresolver, + libp2p/nameresolving/dnsresolver, libp2p/peerstore import @@ -73,6 +74,8 @@ const # Max peers that we allow from the same IP DefaultColocationLimit* = 5 + DNSCheckDomain = "one.one.one.one" + type ConnectionChangeHandler* = proc( peerId: PeerId, peerEvent: PeerEventKind ): Future[void] {.gcsafe, raises: [Defect].} @@ -95,11 +98,16 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag onConnectionChange*: ConnectionChangeHandler + dnsNameServers*: seq[IpAddress] + online: bool #~~~~~~~~~~~~~~~~~~~# # Helper Functions # #~~~~~~~~~~~~~~~~~~~# +template isOnline*(self: PeerManager): bool = + self.online + proc calculateBackoff( initialBackoffInSec: int, backoffFactor: int, failedAttempts: int ): timer.Duration = @@ -535,7 +543,38 @@ proc getStreamByPeerIdAndProtocol*( return ok(streamRes.get()) +proc checkInternetConnectivity( + nameServerIps: seq[IpAddress], timeout = 2.seconds +): Future[bool] {.async.} = + var nameServers: seq[TransportAddress] + for ip in nameServerIps: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + # Resolve domain IP + let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC) + + if resolved.len > 0: + return true + else: + return false + +proc updateOnlineState*(pm: PeerManager) {.async.} = + let numConnectedPeers = + pm.switch.peerStore.peers().countIt(it.connectedness == Connected) + + if numConnectedPeers > 0: + pm.online = true + else: + pm.online = await checkInternetConnectivity(pm.dnsNameServers) + proc connectToRelayPeers*(pm: PeerManager) {.async.} = + # only attempt if current node is online + if not pm.isOnline(): + error "connectToRelayPeers: won't attempt new connections - node is offline" + return + var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len @@ -778,6 +817,10 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = if pm.wakuMetadata.shards.len == 0: return + if not pm.isOnline(): + error "manageRelayPeers: won't attempt new connections - node is offline" + return + var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects var peersToDisconnect: int @@ -1005,6 +1048,7 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + dnsNameServers = newSeq[IpAddress](), ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity let maxConnections = switch.connManager.inSema.size @@ -1055,6 +1099,8 @@ proc new*( maxFailedAttempts: maxFailedAttempts, colocationLimit: colocationLimit, shardedPeerManagement: shardedPeerManagement, + dnsNameServers: dnsNameServers, + online: true, ) proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index a544bdc80..ce86c3c57 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -212,7 +212,7 @@ proc mountStoreSync*( storeSyncRelayJitter = 20, ): Future[Result[void, string]] {.async.} = let idsChannel = newAsyncQueue[SyncID](0) - let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0) + let wantsChannel = newAsyncQueue[PeerId](0) let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0) var cluster: uint16 diff --git a/waku/waku_api/rest/admin/client.nim b/waku/waku_api/rest/admin/client.nim index 4b46ca136..7d45544e2 100644 --- a/waku/waku_api/rest/admin/client.nim +++ b/waku/waku_api/rest/admin/client.nim @@ -28,6 +28,10 @@ proc getPeerById*( rest, endpoint: "/admin/v1/peer/{peerId}", meth: HttpMethod.MethodGet .} +proc getServicePeers*(): RestResponse[seq[WakuPeer]] {. + rest, endpoint: "/admin/v1/peers/service", meth: HttpMethod.MethodGet +.} + proc getConnectedPeers*(): RestResponse[seq[WakuPeer]] {. rest, endpoint: "/admin/v1/peers/connected", meth: HttpMethod.MethodGet .} @@ -38,16 +42,14 @@ proc getConnectedPeersByShard*( rest, endpoint: "/admin/v1/peers/connected/on/{shardId}", meth: HttpMethod.MethodGet .} -proc getConnectedRelayPeers*(): RestResponse[PeersOfShards] {. - rest, endpoint: "/admin/v1/peers/connected/relay", meth: HttpMethod.MethodGet +proc getRelayPeers*(): RestResponse[PeersOfShards] {. + rest, endpoint: "/admin/v1/peers/relay", meth: HttpMethod.MethodGet .} -proc getConnectedRelayPeersByShard*( +proc getRelayPeersByShard*( shardId: uint16 ): RestResponse[PeersOfShard] {. - rest, - endpoint: "/admin/v1/peers/connected/relay/on/{shardId}", - meth: HttpMethod.MethodGet + rest, endpoint: "/admin/v1/peers/relay/on/{shardId}", meth: HttpMethod.MethodGet .} proc getMeshPeers*(): RestResponse[PeersOfShards] {. diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index ada60e870..9cf6ec131 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -34,12 +34,13 @@ logScope: const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}" +const ROUTE_ADMIN_V1_SERVICE_PEERS* = "/admin/v1/peers/service" # returns all peers + const ROUTE_ADMIN_V1_CONNECTED_PEERS* = "/admin/v1/peers/connected" const ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD* = "/admin/v1/peers/connected/on/{shardId}" -const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS* = "/admin/v1/peers/connected/relay" -const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD* = - "/admin/v1/peers/connected/relay/on/{shardId}" +const ROUTE_ADMIN_V1_RELAY_PEERS* = "/admin/v1/peers/relay" +const ROUTE_ADMIN_V1_RELAY_PEERS_ON_SHARD* = "/admin/v1/peers/relay/on/{shardId}" const ROUTE_ADMIN_V1_MESH_PEERS* = "/admin/v1/peers/mesh" const ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD* = "/admin/v1/peers/mesh/on/{shardId}" @@ -62,46 +63,48 @@ proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) = peer.origin, ) -proc populateAdminPeerInfo(peers: var WakuPeers, node: WakuNode, codec: string) = - let peersForCodec = node.peerManager.switch.peerStore.peers(codec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: codec, - shards: it.getShards(), - connected: it.connectedness, - agent: it.agent, - origin: it.origin, +proc populateAdminPeerInfo( + peers: var WakuPeers, node: WakuNode, codec: Option[string] = none[string]() +) = + if codec.isNone(): + peers = node.peerManager.switch.peerStore.peers().mapIt(WakuPeer.init(it)) + else: + let peersTuples = node.peerManager.switch.peerStore.peers(codec.get()).mapIt( + ( + multiaddr: constructMultiaddrStr(it), + protocol: codec.get(), + shards: it.getShards(), + connected: it.connectedness, + agent: it.agent, + origin: it.origin, + ) ) - ) - tuplesToWakuPeers(peers, peersForCodec) + tuplesToWakuPeers(peers, peersTuples) + +proc populateAdminPeerInfoForAll(node: WakuNode): WakuPeers = + var peers: WakuPeers = @[] + populateAdminPeerInfo(peers, node) + return peers proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPeers = var peers: WakuPeers = @[] for codec in codecs: - populateAdminPeerInfo(peers, node, codec) + populateAdminPeerInfo(peers, node, some(codec)) return peers proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse: - let peers = populateAdminPeerInfoForCodecs( - node, - @[ - WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, - WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec, - WakuReconciliationCodec, - ], - ) + let peers = populateAdminPeerInfoForAll(node) - let resp = RestApiResponse.jsonResponse(peers, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(peers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp router.api(MethodGet, ROUTE_ADMIN_V1_SINGLE_PEER) do( peerId: string @@ -115,19 +118,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = if node.peerManager.switch.peerStore.peerExists(peerIdVal): let peerInfo = node.peerManager.switch.peerStore.getPeer(peerIdVal) let peer = WakuPeer.init(peerInfo) - let resp = RestApiResponse.jsonResponse(peer, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(peer, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp else: return RestApiResponse.notFound(fmt("Peer with ID {peerId} not found")) - router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse: - let allPeers = populateAdminPeerInfoForCodecs( + router.api(MethodGet, ROUTE_ADMIN_V1_SERVICE_PEERS) do() -> RestApiResponse: + let peers = populateAdminPeerInfoForCodecs( node, @[ WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, @@ -136,16 +138,26 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ], ) - let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected) - - let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(peers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp + + router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse: + let allPeers = populateAdminPeerInfoForAll(node) + + let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected) + + let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {error}") + ) + + return resp router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD) do( shardId: uint16 @@ -153,29 +165,21 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = let shard = shardId.valueOr: return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) - let allPeers = populateAdminPeerInfoForCodecs( - node, - @[ - WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, - WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec, - WakuReconciliationCodec, - ], - ) + let allPeers = populateAdminPeerInfoForAll(node) let connectedPeers = allPeers.filterIt( it.connected == Connectedness.Connected and it.shards.contains(shard) ) - let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp - router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS) do() -> RestApiResponse: + router.api(MethodGet, ROUTE_ADMIN_V1_RELAY_PEERS) do() -> RestApiResponse: if node.wakuRelay.isNil(): return RestApiResponse.serviceUnavailable( "Error: Relay Protocol is not mounted to the node" @@ -195,16 +199,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) ) - let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp - router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD) do( + router.api(MethodGet, ROUTE_ADMIN_V1_RELAY_PEERS_ON_SHARD) do( shardId: uint16 ) -> RestApiResponse: let shard = shardId.valueOr: @@ -223,14 +226,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = shard: shard, peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)) ) - let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS) do() -> RestApiResponse: if node.wakuRelay.isNil(): @@ -252,14 +254,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) ) - let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD) do( shardId: uint16 @@ -280,14 +281,13 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = shard: shard, peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)) ) - let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200) - if resp.isErr(): - error "An error occurred while building the json response: ", error = resp.error + let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( - fmt("An error occurred while building the json response: {resp.error}") + fmt("An error occurred while building the json response: {error}") ) - return resp.get() + return resp proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do( diff --git a/waku/waku_rln_relay/conversion_utils.nim b/waku/waku_rln_relay/conversion_utils.nim index f83a9b6bd..7393668e5 100644 --- a/waku/waku_rln_relay/conversion_utils.nim +++ b/waku/waku_rln_relay/conversion_utils.nim @@ -27,9 +27,6 @@ proc inHex*( valueHex = "0" & valueHex return toLowerAscii(valueHex) -proc toUserMessageLimit*(v: UInt256): UserMessageLimit = - return cast[UserMessageLimit](v) - proc encodeLengthPrefix*(input: openArray[byte]): seq[byte] = ## returns length prefixed version of the input ## with the following format [len<8>|input] @@ -79,7 +76,17 @@ proc serialize*( return output proc serialize*(witness: RLNWitnessInput): seq[byte] = - ## Serializes the witness into a byte array according to the RLN protocol format + ## Serializes the RLN witness into a byte array following zerokit's expected format. + ## The serialized format includes: + ## - identity_secret (32 bytes, little-endian with zero padding) + ## - user_message_limit (32 bytes, little-endian with zero padding) + ## - message_id (32 bytes, little-endian with zero padding) + ## - merkle tree depth (8 bytes, little-endian) = path_elements.len / 32 + ## - path_elements (each 32 bytes, ordered bottom-to-top) + ## - merkle tree depth again (8 bytes, little-endian) + ## - identity_path_index (sequence of bits as bytes, 0 = left, 1 = right) + ## - x (32 bytes, little-endian with zero padding) + ## - external_nullifier (32 bytes, little-endian with zero padding) var buffer: seq[byte] buffer.add(@(witness.identity_secret)) buffer.add(@(witness.user_message_limit)) @@ -147,4 +154,23 @@ proc fromEpoch*(epoch: Epoch): uint64 = func `+`*(a, b: Quantity): Quantity {.borrow.} func u256*(n: Quantity): UInt256 {.inline.} = - n.uint64.stuint(256) \ No newline at end of file + n.uint64.stuint(256) + +proc uint64ToField*(n: uint64): array[32, byte] = + ## Converts uint64 to 32-byte little-endian array with zero padding + var bytes = toBytes(n, Endianness.littleEndian) + result[0 ..< bytes.len] = bytes + +proc UInt256ToField*(v: UInt256): array[32, byte] = + return cast[array[32, byte]](v) + +proc seqToField*(s: seq[byte]): array[32, byte] = + result = default(array[32, byte]) + let len = min(s.len, 32) + for i in 0 ..< len: + result[i] = s[i] + +proc uint64ToIndex*(index: MembershipIndex, depth: int): seq[byte] = + result = newSeq[byte](depth) + for i in 0 ..< depth: + result[i] = byte((index shr i) and 1) # LSB-first bit decomposition diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index b5845d170..44949d43d 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -89,25 +89,6 @@ proc setMetadata*( return err("failed to persist rln metadata: " & getCurrentExceptionMsg()) return ok() -proc uint64ToField*(n: uint64): array[32, byte] = - ## Converts uint64 to 32-byte little-endian array with zero padding - var bytes = toBytes(n, Endianness.littleEndian) - result[0 ..< bytes.len] = bytes - -proc UInt256ToField*(v: UInt256): array[32, byte] = - return cast[array[32, byte]](v) - -proc seqToField*(s: seq[byte]): array[32, byte] = - result = default(array[32, byte]) - let len = min(s.len, 32) - for i in 0 ..< len: - result[i] = s[i] - -proc uint64ToIndex(index: MembershipIndex, depth: int): seq[byte] = - result = newSeq[byte](depth) - for i in 0 ..< depth: - result[i] = byte((index shr i) and 1) # LSB-first bit decomposition - proc fetchMerkleProofElements*( g: OnchainGroupManager ): Future[Result[seq[byte], string]] {.async.} = @@ -199,6 +180,11 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async.} = if proofResult.isErr(): error "Failed to fetch Merkle proof", error = proofResult.error g.merkleProofCache = proofResult.get() + + # also need update registerd membership + let memberCount = cast[int64](await wakuRlnContract.nextFreeIndex().call()) + waku_rln_number_registered_memberships.set(float64(memberCount)) + await sleepAsync(rpcDelay) method atomicBatch*( @@ -356,7 +342,6 @@ method generateProof*( except CatchableError: error "Failed to fetch merkle proof", error = getCurrentExceptionMsg() - if (g.merkleProofCache.len mod 32) != 0: return err("Invalid merkle proof cache length") @@ -460,7 +445,7 @@ method verifyProof*( if extNullRes.isErr(): return err("could not construct external nullifier: " & extNullRes.error) normalizedProof.externalNullifier = extNullRes.get() - + let proofBytes = serialize(normalizedProof, input) let proofBuffer = proofBytes.toBuffer() @@ -476,7 +461,6 @@ method verifyProof*( , ) - if not ffiOk: warn "verify_with_roots() returned failure status", proof = proof return err("could not verify the proof") @@ -585,7 +569,6 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} ethRpc.ondisconnect = proc() = asyncSpawn onDisconnect() - waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true return ok() diff --git a/waku/waku_rln_relay/protocol_metrics.nim b/waku/waku_rln_relay/protocol_metrics.nim index 121727809..2210328f4 100644 --- a/waku/waku_rln_relay/protocol_metrics.nim +++ b/waku/waku_rln_relay/protocol_metrics.nim @@ -85,6 +85,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = var cumulativeProofsVerified = 0.float64 var cumulativeProofsGenerated = 0.float64 var cumulativeProofsRemaining = 100.float64 + var cumulativeRegisteredMember = 0.float64 when defined(metrics): logMetrics = proc() = @@ -107,6 +108,9 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = let freshProofsRemainingCount = parseAndAccumulate( waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining ) + let freshRegisteredMemberCount = parseAndAccumulate( + waku_rln_number_registered_memberships, cumulativeRegisteredMember + ) info "Total messages", count = freshMsgCount info "Total spam messages", count = freshSpamCount @@ -116,5 +120,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger = info "Total proofs verified", count = freshProofsVerifiedCount info "Total proofs generated", count = freshProofsGeneratedCount info "Total proofs remaining", count = freshProofsRemainingCount + info "Total registered members", count = freshRegisteredMemberCount return logMetrics diff --git a/waku/waku_rln_relay/protocol_types.nim b/waku/waku_rln_relay/protocol_types.nim index 41372bef3..c6f52e00b 100644 --- a/waku/waku_rln_relay/protocol_types.nim +++ b/waku/waku_rln_relay/protocol_types.nim @@ -52,17 +52,19 @@ type RateLimitProof* = object ## the external nullifier used for the generation of the `proof` (derived from poseidon([epoch, rln_identifier])) externalNullifier*: ExternalNullifier -type - Fr = array[32, byte] # Field element representation (256 bits) +type UInt40* = StUint[40] +type UInt32* = StUint[32] +type + Field = array[32, byte] # Field element representation (256 bits) RLNWitnessInput* = object - identity_secret*: Fr - user_message_limit*: Fr - message_id*: Fr + identity_secret*: Field + user_message_limit*: Field + message_id*: Field path_elements*: seq[byte] identity_path_index*: seq[byte] - x*: Fr - external_nullifier*: Fr + x*: Field + external_nullifier*: Field type ProofMetadata* = object nullifier*: Nullifier diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 8e64f2fde..522f58da2 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -184,7 +184,7 @@ proc absDiff*(e1, e2: Epoch): uint64 = return epoch2 - epoch1 proc validateMessage*( - rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption = none(float64) + rlnPeer: WakuRLNRelay, msg: WakuMessage ): MessageValidationResult = ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., ## the `msg`'s epoch is within MaxEpochGap of the current epoch @@ -206,12 +206,8 @@ proc validateMessage*( # checks if the `msg`'s epoch is far from the current epoch # it corresponds to the validation of rln external nullifier - var epoch: Epoch - if timeOption.isSome(): - epoch = rlnPeer.calcEpoch(timeOption.get()) - else: - # get current rln epoch - epoch = rlnPeer.getCurrentEpoch() + # get current rln epoch + let epoch: Epoch = rlnPeer.getCurrentEpoch() let msgEpoch = proof.epoch @@ -276,12 +272,12 @@ proc validateMessage*( return MessageValidationResult.Valid proc validateMessageAndUpdateLog*( - rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption = none(float64) + rlnPeer: WakuRLNRelay, msg: WakuMessage ): MessageValidationResult = ## validates the message and updates the log to prevent double messaging ## in future messages - let isValidMessage = rlnPeer.validateMessage(msg, timeOption) + let isValidMessage = rlnPeer.validateMessage(msg) let decodeRes = RateLimitProof.init(msg.proof) if decodeRes.isErr(): diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index c08a9e434..d9912a3df 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -46,13 +46,10 @@ type SyncReconciliation* = ref object of LPProtocol storage: SyncStorage - # Receive IDs from transfer protocol for storage + # AsyncQueues are used as communication channels between + # reconciliation and transfer protocols. idsRx: AsyncQueue[SyncID] - - # Send Hashes to transfer protocol for reception - localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)] - - # Send Hashes to transfer protocol for transmission + localWantsTx: AsyncQueue[PeerId] remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)] # params @@ -100,6 +97,9 @@ proc processRequest( roundTrips = 0 diffs = 0 + # Signal to transfer protocol that this reconciliation is starting + await self.localWantsTx.addLast(conn.peerId) + while true: let readRes = catch: await conn.readLp(int.high) @@ -143,7 +143,6 @@ proc processRequest( diffs.inc() for hash in hashToRecv: - self.localWantsTx.addLastNoWait((conn.peerId, hash)) diffs.inc() rawPayload = sendPayload.deltaEncode() @@ -168,6 +167,9 @@ proc processRequest( continue + # Signal to transfer protocol that this reconciliation is done + await self.localWantsTx.addLast(conn.peerId) + reconciliation_roundtrips.observe(roundTrips) reconciliation_differences.observe(diffs) @@ -296,7 +298,7 @@ proc new*( syncInterval: timer.Duration = DefaultSyncInterval, relayJitter: timer.Duration = DefaultGossipSubJitter, idsRx: AsyncQueue[SyncID], - localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)], + localWantsTx: AsyncQueue[PeerId], remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)], ): Future[Result[T, string]] {.async.} = let res = await initFillStorage(syncRange, wakuArchive) diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 5a52cac9c..c1e5d3e37 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -37,9 +37,9 @@ type SyncTransfer* = ref object of LPProtocol idsTx: AsyncQueue[SyncID] # Receive Hashes from reconciliation protocol for reception - localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)] + localWantsRx: AsyncQueue[PeerId] localWantsRxFut: Future[void] - inSessions: Table[PeerId, HashSet[WakuMessageHash]] + inSessions: HashSet[PeerId] # Receive Hashes from reconciliation protocol for transmission remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)] @@ -78,19 +78,14 @@ proc openConnection( return ok(conn) proc wantsReceiverLoop(self: SyncTransfer) {.async.} = - ## Waits for message hashes, - ## store the peers and hashes locally as - ## "supposed to be received" + ## Waits for peer ids of nodes + ## we are reconciliating with while true: # infinite loop - let (peerId, fingerprint) = await self.localWantsRx.popFirst() + let peerId = await self.localWantsRx.popFirst() - self.inSessions.withValue(peerId, value): - value[].incl(fingerprint) - do: - var hashes = initHashSet[WakuMessageHash]() - hashes.incl(fingerprint) - self.inSessions[peerId] = hashes + if self.inSessions.containsOrIncl(peerId): + self.inSessions.excl(peerId) return @@ -137,6 +132,10 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = proc initProtocolHandler(self: SyncTransfer) = let handler = proc(conn: Connection, proto: string) {.async, closure.} = while true: + if not self.inSessions.contains(conn.peerId): + error "unwanted peer, disconnecting", remote = conn.peerId + break + let readRes = catch: await conn.readLp(int64(DefaultMaxWakuMessageSize)) @@ -157,16 +156,6 @@ proc initProtocolHandler(self: SyncTransfer) = let hash = computeMessageHash(pubsub, msg) - self.inSessions.withValue(conn.peerId, value): - if value[].missingOrExcl(hash): - error "unwanted hash received, disconnecting" - self.inSessions.del(conn.peerId) - break - do: - error "unwanted hash received, disconnecting" - self.inSessions.del(conn.peerId) - break - #TODO verify msg RLN proof... (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: @@ -193,7 +182,7 @@ proc new*( peerManager: PeerManager, wakuArchive: WakuArchive, idsTx: AsyncQueue[SyncID], - localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)], + localWantsRx: AsyncQueue[PeerId], remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)], ): T = var transfer = SyncTransfer(