diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 415fdbf9f..8cacf3455 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,6 +52,7 @@ jobs: needs: changes if: ${{ needs.changes.outputs.v2 == 'true' || needs.changes.outputs.common == 'true' }} strategy: + fail-fast: false matrix: os: [ubuntu-latest, macos-13] runs-on: ${{ matrix.os }} @@ -82,6 +83,7 @@ jobs: needs: changes if: ${{ needs.changes.outputs.v2 == 'true' || needs.changes.outputs.common == 'true' }} strategy: + fail-fast: false matrix: os: [ubuntu-latest, macos-13] runs-on: ${{ matrix.os }} @@ -107,16 +109,16 @@ jobs: - name: Run tests run: | - if [ ${{ runner.os }} == "Linux" ]; then - sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18 - fi - postgres_enabled=0 if [ ${{ runner.os }} == "Linux" ]; then + sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18 postgres_enabled=1 fi - make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=$postgres_enabled test testwakunode2 + export MAKEFLAGS="-j1" + export NIMFLAGS="--colors:off -d:chronicles_colors:none" + + make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=$postgres_enabled test testwakunode2>>>>>>> master build-docker-image: needs: changes diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a80c006c..addd3e0da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,24 @@ +## v0.28.1 (2024-05-29) + +This patch release fixes the following bug: +- Store node does not retrieve messages because the meta field is missing in queries. + +### Bug Fix + +- Commit that fixes the bug [8b42f199](https://github.com/waku-org/nwaku/commit/8b42f199baf4e00794c4cec4d8601c3f6c330a20) + +This is a patch release that is fully backwards-compatible with release `v0.28.0`. + +It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/): +| Protocol | Spec status | Protocol id | +| ---: | :---: | :--- | +| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` | +| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1`
`/vac/waku/filter-subscribe/2.0.0-beta1`
`/vac/waku/filter-push/2.0.0-beta1` | +| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` | +| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` | +| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` | + + ## v0.28.0 (2024-05-22) ## What's Changed diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index d56a4941b..8e76b6f32 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -169,7 +169,6 @@ proc parseCmdArg*(T: type EthRpcUrl, s: string): T = var wsPattern = re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" if regex.match(s, wsPattern): - echo "here" raise newException( ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL" ) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index aa28c7cf7..b1e89a4fe 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -104,14 +104,14 @@ proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} = var tx: EthSend tx.source = accounts[0] - tx.value = some(ethToWei(10.u256)) + tx.value = some(ethToWei(1000.u256)) tx.to = some(acc) tx.gasPrice = some(gasPrice) # Send 10 eth to acc discard await web3.send(tx) let balance = await web3.provider.eth_getBalance(acc, "latest") - assert(balance == ethToWei(10.u256)) + assert(balance == ethToWei(1000.u256)) return (pk, acc) @@ -138,7 +138,7 @@ proc runAnvil(): Process = let runAnvil = startProcess( anvilPath, args = [ - "--port", "8540", "--gas-limit", "300000000000000", "--balance", "10000", + "--port", "8540", "--gas-limit", "300000000000000", "--balance", "1000000000", "--chain-id", "1337", ], options = {poUsePath}, diff --git a/vendor/nim-bearssl b/vendor/nim-bearssl index 86f212c6a..a806cbfab 160000 --- a/vendor/nim-bearssl +++ b/vendor/nim-bearssl @@ -1 +1 @@ -Subproject commit 86f212c6a5d76b52e20fad2e318cc5436d04fc26 +Subproject commit a806cbfab5fe8de49c76139f8705fff79daf99ee diff --git a/waku/README.md b/waku/README.md index c19408ee4..5b9462775 100644 --- a/waku/README.md +++ b/waku/README.md @@ -6,7 +6,7 @@ This folder contains code related to Waku, both as a node and as a protocol. This is an implementation in Nim of the Waku suite of protocols. -See [specifications](https://rfc.vac.dev/spec/10/). +See [specifications](https://rfc.vac.dev/waku/standards/core/10/waku2). ## How to Build & Run diff --git a/waku/common/utils/nat.nim b/waku/common/utils/nat.nim index e05a345e6..5835a8e7f 100644 --- a/waku/common/utils/nat.nim +++ b/waku/common/utils/nat.nim @@ -9,6 +9,15 @@ import chronicles, eth/net/nat, stew/results, nativesockets logScope: topics = "nat" +## Due to the design of nim-eth/nat module we must ensure it is only initialized once. +## see: https://github.com/waku-org/nwaku/issues/2628 +## Details: nim-eth/nat module starts a meaintenance thread for refreshing the NAT mappings, but everything in the module is global, +## there is no room to store multiple configurations. +## Exact meaning: redirectPorts cannot be called twice in a program lifetime. +## During waku tests we happen to start several node instances in parallel thus resulting in multiple NAT configurations and multiple threads. +## Those threads will dead lock each other in tear down. +var singletonNat: bool = false + proc setupNat*( natConf, clientId: string, tcpPort, udpPort: Port ): Result[ @@ -26,26 +35,35 @@ proc setupNat*( tuple[ip: Option[IpAddress], tcpPort: Option[Port], udpPort: Option[Port]] if strategy != NatNone: - let extIp = getExternalIP(strategy) - if extIP.isSome(): - endpoint.ip = some(extIp.get()) - # RedirectPorts in considered a gcsafety violation - # because it obtains the address of a non-gcsafe proc? - var extPorts: Option[(Port, Port)] - try: - extPorts = ( - {.gcsafe.}: - redirectPorts(tcpPort = tcpPort, udpPort = udpPort, description = clientId) - ) - except CatchableError: - # TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now. - error "unable to determine external ports" - extPorts = none((Port, Port)) + ## Only initialize the NAT module once + ## redirectPorts cannot be called twice in a program lifetime. + ## We can do it as same happens if getExternalIP fails and returns None + if singletonNat: + warn "NAT already initialized, skipping as cannot be done multiple times" + else: + singletonNat = true + let extIp = getExternalIP(strategy) + if extIP.isSome(): + endpoint.ip = some(extIp.get()) + # RedirectPorts in considered a gcsafety violation + # because it obtains the address of a non-gcsafe proc? + var extPorts: Option[(Port, Port)] + try: + extPorts = ( + {.gcsafe.}: + redirectPorts( + tcpPort = tcpPort, udpPort = udpPort, description = clientId + ) + ) + except CatchableError: + # TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now. + error "unable to determine external ports" + extPorts = none((Port, Port)) - if extPorts.isSome(): - let (extTcpPort, extUdpPort) = extPorts.get() - endpoint.tcpPort = some(extTcpPort) - endpoint.udpPort = some(extUdpPort) + if extPorts.isSome(): + let (extTcpPort, extUdpPort) = extPorts.get() + endpoint.tcpPort = some(extTcpPort) + endpoint.udpPort = some(extUdpPort) else: # NatNone if not natConf.startsWith("extip:"): return err("not a valid NAT mechanism: " & $natConf) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 439f07bc9..bc3c08bb1 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -366,13 +366,6 @@ type WakuNodeConf* = object name: "filternode" .}: string - filterTimeout* {. - desc: - "Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.", - defaultValue: 14400, # 4 hours - name: "filter-timeout" - .}: int64 - filterSubscriptionTimeout* {. desc: "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.", diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e49de6a59..679041580 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -380,7 +380,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = pm.peerStore.hasPeer(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): - reason = "no shards in common" + let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]" + let otherShardsString = "[ " & metadata.shards.join(", ") & "]" + reason = + "no shards in common: my_shards = " & myShardsString & " others_shards = " & + otherShardsString break guardClauses return diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2d699d34e..77cba5567 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -225,7 +225,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = return proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - debug "waku.relay received", + notice "waku.relay received", my_peer_id = node.peerId, pubsubTopic = topic, msg_hash = topic.computeMessageHash(msg).to0xHex(), @@ -356,10 +356,10 @@ proc publish*( #TODO instead of discard return error when 0 peers received the message discard await node.wakuRelay.publish(pubsubTopic, message) - trace "waku.relay published", + notice "waku.relay published", peerId = node.peerId, pubsubTopic = pubsubTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex(), + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), publishTime = getNowInNanosecondTime() return ok() @@ -952,7 +952,7 @@ proc mountLightPush*( if publishedCount == 0: ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - debug "Lightpush request has not been published to any peers", + notice "Lightpush request has not been published to any peers", msg_hash = msgHash return ok() @@ -994,7 +994,7 @@ proc lightpushPublish*( ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): - debug "publishing message with lightpush", + notice "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, @@ -1002,7 +1002,7 @@ proc lightpushPublish*( return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) if not node.wakuLightPush.isNil(): - debug "publishing message with self hosted lightpush", + notice "publishing message with self hosted lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index fa395c62e..f7b05b748 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -29,7 +29,7 @@ const WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30) # Metrics reporting - WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1) + WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(30) # Message validation # 20 seconds maximum allowable sender timestamp "drift" @@ -103,7 +103,7 @@ proc handleMessage*( else: getNanosecondTime(getTime().toUnixFloat()) - trace "handling message", + notice "archive handling message", msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, @@ -117,7 +117,7 @@ proc handleMessage*( waku_archive_errors.inc(labelValues = [insertFailure]) error "failed to insert message", error = error - debug "message archived", + notice "message archived", msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 3afd79c1f..00bee087b 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -883,7 +883,7 @@ proc acquireDatabaseLock*( proc releaseDatabaseLock*( s: PostgresDriver, lockId: int = 841886 ): Future[ArchiveDriverResult[void]] {.async.} = - ## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time) + ## Release an advisory lock (useful to avoid more than one application running migrations at the same time) let unlocked = ( await s.getStr( fmt""" @@ -930,6 +930,16 @@ proc addPartition( "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " & "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" + # Lock the db + (await self.acquireDatabaseLock()).isOkOr: + error "failed to acquire lock", error = error + return err("failed to lock the db") + + defer: + (await self.releaseDatabaseLock()).isOkOr: + error "failed to release lock", error = error + return err("failed to unlock the db.") + (await self.performWriteQuery(createPartitionQuery)).isOkOr: if error.contains("already exists"): debug "skip create new partition as it already exists: ", skipped_error = $error diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index b817282f5..e9072ceef 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -94,8 +94,6 @@ method getMessagesV2*( maxPageSize = DefaultPageSize, ascendingOrder = true, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = - echo "here" - let cursor = cursor.map(toDbCursor) let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit( diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 100791fa8..eff3d0990 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -152,7 +152,7 @@ proc handleSubscribeRequest*( return FilterSubscribeResponse.ok(request.requestId) proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = - trace "pushing message to subscribed peer", peer = peer + trace "pushing message to subscribed peer", peer_id = shortLog(peer) if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): # Check that peer has not been removed from peer store @@ -176,7 +176,7 @@ proc pushToPeers( let msgHash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() - debug "pushing message to subscribed peers", + notice "pushing message to subscribed peers", pubsubTopic = messagePush.pubsubTopic, contentTopic = messagePush.wakuMessage.contentTopic, target_peer_ids = targetPeerIds, @@ -216,7 +216,7 @@ proc handleMessage*( ) {.async.} = let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash + notice "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash let handleMessageStartTime = Moment.now() @@ -225,8 +225,10 @@ proc handleMessage*( let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) if subscribedPeers.len == 0: - debug "no subscribed peers found", - pubsubTopic = pubsubTopic, contentTopic = message.contentTopic + notice "no subscribed peers found", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + msg_hash = msgHash return let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message) @@ -242,7 +244,7 @@ proc handleMessage*( target_peer_ids = subscribedPeers.mapIt(shortLog(it)) waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - debug "pushed message succesfully to all subscribers", + notice "pushed message succesfully to all subscribers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, msg_hash = msgHash, diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 02bca0582..8cbfcf36b 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -50,11 +50,11 @@ proc handleRequest*( pubSubTopic = request.get().pubSubTopic message = request.get().message waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - debug "push request", - peerId = peerId, + notice "lightpush request", + peer_id = peerId, requestId = requestId, pubsubTopic = pubsubTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) isSuccess = handleRes.isOk() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index f5772f5c4..03d5b596e 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -206,15 +206,17 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = pubsubTopic = pubsubTopic, error = $error return ValidationResult.Reject - let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() - # now sequentially validate the message - for (validator, _) in w.wakuValidators: + for (validator, errorMessage) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() error "protocol generateOrderedValidator reject waku validator", - msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes + msg_hash = msgHash, + pubsubTopic = pubsubTopic, + validatorRes = validatorRes, + error = errorMessage return validatorRes @@ -305,8 +307,8 @@ proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage ): Future[int] {.async.} = let data = message.encode().buffer - let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic return await procCall GossipSub(w).publish(pubsubTopic, data)