mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 00:43:06 +00:00
Merge branch 'master' into rlnv2-only
This commit is contained in:
commit
3b2413951b
12
.github/workflows/ci.yml
vendored
12
.github/workflows/ci.yml
vendored
@ -52,6 +52,7 @@ jobs:
|
||||
needs: changes
|
||||
if: ${{ needs.changes.outputs.v2 == 'true' || needs.changes.outputs.common == 'true' }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, macos-13]
|
||||
runs-on: ${{ matrix.os }}
|
||||
@ -82,6 +83,7 @@ jobs:
|
||||
needs: changes
|
||||
if: ${{ needs.changes.outputs.v2 == 'true' || needs.changes.outputs.common == 'true' }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, macos-13]
|
||||
runs-on: ${{ matrix.os }}
|
||||
@ -107,16 +109,16 @@ jobs:
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
if [ ${{ runner.os }} == "Linux" ]; then
|
||||
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
|
||||
fi
|
||||
|
||||
postgres_enabled=0
|
||||
if [ ${{ runner.os }} == "Linux" ]; then
|
||||
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
|
||||
postgres_enabled=1
|
||||
fi
|
||||
|
||||
make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=$postgres_enabled test testwakunode2
|
||||
export MAKEFLAGS="-j1"
|
||||
export NIMFLAGS="--colors:off -d:chronicles_colors:none"
|
||||
|
||||
make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=$postgres_enabled test testwakunode2>>>>>>> master
|
||||
|
||||
build-docker-image:
|
||||
needs: changes
|
||||
|
||||
21
CHANGELOG.md
21
CHANGELOG.md
@ -1,3 +1,24 @@
|
||||
## v0.28.1 (2024-05-29)
|
||||
|
||||
This patch release fixes the following bug:
|
||||
- Store node does not retrieve messages because the meta field is missing in queries.
|
||||
|
||||
### Bug Fix
|
||||
|
||||
- Commit that fixes the bug [8b42f199](https://github.com/waku-org/nwaku/commit/8b42f199baf4e00794c4cec4d8601c3f6c330a20)
|
||||
|
||||
This is a patch release that is fully backwards-compatible with release `v0.28.0`.
|
||||
|
||||
It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
|
||||
| Protocol | Spec status | Protocol id |
|
||||
| ---: | :---: | :--- |
|
||||
| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` |
|
||||
| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1` <br />`/vac/waku/filter-subscribe/2.0.0-beta1` <br />`/vac/waku/filter-push/2.0.0-beta1` |
|
||||
| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` |
|
||||
| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
|
||||
| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` |
|
||||
|
||||
|
||||
## v0.28.0 (2024-05-22)
|
||||
|
||||
## What's Changed
|
||||
|
||||
@ -169,7 +169,6 @@ proc parseCmdArg*(T: type EthRpcUrl, s: string): T =
|
||||
var wsPattern =
|
||||
re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
|
||||
if regex.match(s, wsPattern):
|
||||
echo "here"
|
||||
raise newException(
|
||||
ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL"
|
||||
)
|
||||
|
||||
@ -104,14 +104,14 @@ proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} =
|
||||
|
||||
var tx: EthSend
|
||||
tx.source = accounts[0]
|
||||
tx.value = some(ethToWei(10.u256))
|
||||
tx.value = some(ethToWei(1000.u256))
|
||||
tx.to = some(acc)
|
||||
tx.gasPrice = some(gasPrice)
|
||||
|
||||
# Send 10 eth to acc
|
||||
discard await web3.send(tx)
|
||||
let balance = await web3.provider.eth_getBalance(acc, "latest")
|
||||
assert(balance == ethToWei(10.u256))
|
||||
assert(balance == ethToWei(1000.u256))
|
||||
|
||||
return (pk, acc)
|
||||
|
||||
@ -138,7 +138,7 @@ proc runAnvil(): Process =
|
||||
let runAnvil = startProcess(
|
||||
anvilPath,
|
||||
args = [
|
||||
"--port", "8540", "--gas-limit", "300000000000000", "--balance", "10000",
|
||||
"--port", "8540", "--gas-limit", "300000000000000", "--balance", "1000000000",
|
||||
"--chain-id", "1337",
|
||||
],
|
||||
options = {poUsePath},
|
||||
|
||||
2
vendor/nim-bearssl
vendored
2
vendor/nim-bearssl
vendored
@ -1 +1 @@
|
||||
Subproject commit 86f212c6a5d76b52e20fad2e318cc5436d04fc26
|
||||
Subproject commit a806cbfab5fe8de49c76139f8705fff79daf99ee
|
||||
@ -6,7 +6,7 @@ This folder contains code related to Waku, both as a node and as a protocol.
|
||||
|
||||
This is an implementation in Nim of the Waku suite of protocols.
|
||||
|
||||
See [specifications](https://rfc.vac.dev/spec/10/).
|
||||
See [specifications](https://rfc.vac.dev/waku/standards/core/10/waku2).
|
||||
|
||||
## How to Build & Run
|
||||
|
||||
|
||||
@ -9,6 +9,15 @@ import chronicles, eth/net/nat, stew/results, nativesockets
|
||||
logScope:
|
||||
topics = "nat"
|
||||
|
||||
## Due to the design of nim-eth/nat module we must ensure it is only initialized once.
|
||||
## see: https://github.com/waku-org/nwaku/issues/2628
|
||||
## Details: nim-eth/nat module starts a meaintenance thread for refreshing the NAT mappings, but everything in the module is global,
|
||||
## there is no room to store multiple configurations.
|
||||
## Exact meaning: redirectPorts cannot be called twice in a program lifetime.
|
||||
## During waku tests we happen to start several node instances in parallel thus resulting in multiple NAT configurations and multiple threads.
|
||||
## Those threads will dead lock each other in tear down.
|
||||
var singletonNat: bool = false
|
||||
|
||||
proc setupNat*(
|
||||
natConf, clientId: string, tcpPort, udpPort: Port
|
||||
): Result[
|
||||
@ -26,26 +35,35 @@ proc setupNat*(
|
||||
tuple[ip: Option[IpAddress], tcpPort: Option[Port], udpPort: Option[Port]]
|
||||
|
||||
if strategy != NatNone:
|
||||
let extIp = getExternalIP(strategy)
|
||||
if extIP.isSome():
|
||||
endpoint.ip = some(extIp.get())
|
||||
# RedirectPorts in considered a gcsafety violation
|
||||
# because it obtains the address of a non-gcsafe proc?
|
||||
var extPorts: Option[(Port, Port)]
|
||||
try:
|
||||
extPorts = (
|
||||
{.gcsafe.}:
|
||||
redirectPorts(tcpPort = tcpPort, udpPort = udpPort, description = clientId)
|
||||
)
|
||||
except CatchableError:
|
||||
# TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now.
|
||||
error "unable to determine external ports"
|
||||
extPorts = none((Port, Port))
|
||||
## Only initialize the NAT module once
|
||||
## redirectPorts cannot be called twice in a program lifetime.
|
||||
## We can do it as same happens if getExternalIP fails and returns None
|
||||
if singletonNat:
|
||||
warn "NAT already initialized, skipping as cannot be done multiple times"
|
||||
else:
|
||||
singletonNat = true
|
||||
let extIp = getExternalIP(strategy)
|
||||
if extIP.isSome():
|
||||
endpoint.ip = some(extIp.get())
|
||||
# RedirectPorts in considered a gcsafety violation
|
||||
# because it obtains the address of a non-gcsafe proc?
|
||||
var extPorts: Option[(Port, Port)]
|
||||
try:
|
||||
extPorts = (
|
||||
{.gcsafe.}:
|
||||
redirectPorts(
|
||||
tcpPort = tcpPort, udpPort = udpPort, description = clientId
|
||||
)
|
||||
)
|
||||
except CatchableError:
|
||||
# TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now.
|
||||
error "unable to determine external ports"
|
||||
extPorts = none((Port, Port))
|
||||
|
||||
if extPorts.isSome():
|
||||
let (extTcpPort, extUdpPort) = extPorts.get()
|
||||
endpoint.tcpPort = some(extTcpPort)
|
||||
endpoint.udpPort = some(extUdpPort)
|
||||
if extPorts.isSome():
|
||||
let (extTcpPort, extUdpPort) = extPorts.get()
|
||||
endpoint.tcpPort = some(extTcpPort)
|
||||
endpoint.udpPort = some(extUdpPort)
|
||||
else: # NatNone
|
||||
if not natConf.startsWith("extip:"):
|
||||
return err("not a valid NAT mechanism: " & $natConf)
|
||||
|
||||
@ -366,13 +366,6 @@ type WakuNodeConf* = object
|
||||
name: "filternode"
|
||||
.}: string
|
||||
|
||||
filterTimeout* {.
|
||||
desc:
|
||||
"Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.",
|
||||
defaultValue: 14400, # 4 hours
|
||||
name: "filter-timeout"
|
||||
.}: int64
|
||||
|
||||
filterSubscriptionTimeout* {.
|
||||
desc:
|
||||
"Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
|
||||
|
||||
@ -380,7 +380,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
||||
pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
|
||||
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
|
||||
):
|
||||
reason = "no shards in common"
|
||||
let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & "]"
|
||||
let otherShardsString = "[ " & metadata.shards.join(", ") & "]"
|
||||
reason =
|
||||
"no shards in common: my_shards = " & myShardsString & " others_shards = " &
|
||||
otherShardsString
|
||||
break guardClauses
|
||||
|
||||
return
|
||||
|
||||
@ -225,7 +225,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
|
||||
return
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
debug "waku.relay received",
|
||||
notice "waku.relay received",
|
||||
my_peer_id = node.peerId,
|
||||
pubsubTopic = topic,
|
||||
msg_hash = topic.computeMessageHash(msg).to0xHex(),
|
||||
@ -356,10 +356,10 @@ proc publish*(
|
||||
#TODO instead of discard return error when 0 peers received the message
|
||||
discard await node.wakuRelay.publish(pubsubTopic, message)
|
||||
|
||||
trace "waku.relay published",
|
||||
notice "waku.relay published",
|
||||
peerId = node.peerId,
|
||||
pubsubTopic = pubsubTopic,
|
||||
hash = pubsubTopic.computeMessageHash(message).to0xHex(),
|
||||
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
|
||||
publishTime = getNowInNanosecondTime()
|
||||
|
||||
return ok()
|
||||
@ -952,7 +952,7 @@ proc mountLightPush*(
|
||||
if publishedCount == 0:
|
||||
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
debug "Lightpush request has not been published to any peers",
|
||||
notice "Lightpush request has not been published to any peers",
|
||||
msg_hash = msgHash
|
||||
|
||||
return ok()
|
||||
@ -994,7 +994,7 @@ proc lightpushPublish*(
|
||||
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
if not node.wakuLightpushClient.isNil():
|
||||
debug "publishing message with lightpush",
|
||||
notice "publishing message with lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
@ -1002,7 +1002,7 @@ proc lightpushPublish*(
|
||||
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
|
||||
|
||||
if not node.wakuLightPush.isNil():
|
||||
debug "publishing message with self hosted lightpush",
|
||||
notice "publishing message with self hosted lightpush",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
target_peer_id = peer.peerId,
|
||||
|
||||
@ -29,7 +29,7 @@ const
|
||||
WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)
|
||||
|
||||
# Metrics reporting
|
||||
WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)
|
||||
WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(30)
|
||||
|
||||
# Message validation
|
||||
# 20 seconds maximum allowable sender timestamp "drift"
|
||||
@ -103,7 +103,7 @@ proc handleMessage*(
|
||||
else:
|
||||
getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
trace "handling message",
|
||||
notice "archive handling message",
|
||||
msg_hash = msgHashHex,
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = msg.contentTopic,
|
||||
@ -117,7 +117,7 @@ proc handleMessage*(
|
||||
waku_archive_errors.inc(labelValues = [insertFailure])
|
||||
error "failed to insert message", error = error
|
||||
|
||||
debug "message archived",
|
||||
notice "message archived",
|
||||
msg_hash = msgHashHex,
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = msg.contentTopic,
|
||||
|
||||
@ -883,7 +883,7 @@ proc acquireDatabaseLock*(
|
||||
proc releaseDatabaseLock*(
|
||||
s: PostgresDriver, lockId: int = 841886
|
||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
|
||||
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
|
||||
let unlocked = (
|
||||
await s.getStr(
|
||||
fmt"""
|
||||
@ -930,6 +930,16 @@ proc addPartition(
|
||||
"CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
|
||||
"messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"
|
||||
|
||||
# Lock the db
|
||||
(await self.acquireDatabaseLock()).isOkOr:
|
||||
error "failed to acquire lock", error = error
|
||||
return err("failed to lock the db")
|
||||
|
||||
defer:
|
||||
(await self.releaseDatabaseLock()).isOkOr:
|
||||
error "failed to release lock", error = error
|
||||
return err("failed to unlock the db.")
|
||||
|
||||
(await self.performWriteQuery(createPartitionQuery)).isOkOr:
|
||||
if error.contains("already exists"):
|
||||
debug "skip create new partition as it already exists: ", skipped_error = $error
|
||||
|
||||
@ -94,8 +94,6 @@ method getMessagesV2*(
|
||||
maxPageSize = DefaultPageSize,
|
||||
ascendingOrder = true,
|
||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} =
|
||||
echo "here"
|
||||
|
||||
let cursor = cursor.map(toDbCursor)
|
||||
|
||||
let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit(
|
||||
|
||||
@ -152,7 +152,7 @@ proc handleSubscribeRequest*(
|
||||
return FilterSubscribeResponse.ok(request.requestId)
|
||||
|
||||
proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
||||
trace "pushing message to subscribed peer", peer = peer
|
||||
trace "pushing message to subscribed peer", peer_id = shortLog(peer)
|
||||
|
||||
if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
|
||||
# Check that peer has not been removed from peer store
|
||||
@ -176,7 +176,7 @@ proc pushToPeers(
|
||||
let msgHash =
|
||||
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
|
||||
|
||||
debug "pushing message to subscribed peers",
|
||||
notice "pushing message to subscribed peers",
|
||||
pubsubTopic = messagePush.pubsubTopic,
|
||||
contentTopic = messagePush.wakuMessage.contentTopic,
|
||||
target_peer_ids = targetPeerIds,
|
||||
@ -216,7 +216,7 @@ proc handleMessage*(
|
||||
) {.async.} =
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
|
||||
debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
|
||||
notice "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
|
||||
|
||||
let handleMessageStartTime = Moment.now()
|
||||
|
||||
@ -225,8 +225,10 @@ proc handleMessage*(
|
||||
let subscribedPeers =
|
||||
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
||||
if subscribedPeers.len == 0:
|
||||
debug "no subscribed peers found",
|
||||
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
|
||||
notice "no subscribed peers found",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
msg_hash = msgHash
|
||||
return
|
||||
|
||||
let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message)
|
||||
@ -242,7 +244,7 @@ proc handleMessage*(
|
||||
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
|
||||
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
|
||||
else:
|
||||
debug "pushed message succesfully to all subscribers",
|
||||
notice "pushed message succesfully to all subscribers",
|
||||
pubsubTopic = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
msg_hash = msgHash,
|
||||
|
||||
@ -50,11 +50,11 @@ proc handleRequest*(
|
||||
pubSubTopic = request.get().pubSubTopic
|
||||
message = request.get().message
|
||||
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||
debug "push request",
|
||||
peerId = peerId,
|
||||
notice "lightpush request",
|
||||
peer_id = peerId,
|
||||
requestId = requestId,
|
||||
pubsubTopic = pubsubTopic,
|
||||
hash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
|
||||
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
|
||||
isSuccess = handleRes.isOk()
|
||||
|
||||
@ -206,15 +206,17 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
|
||||
pubsubTopic = pubsubTopic, error = $error
|
||||
return ValidationResult.Reject
|
||||
|
||||
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
|
||||
|
||||
# now sequentially validate the message
|
||||
for (validator, _) in w.wakuValidators:
|
||||
for (validator, errorMessage) in w.wakuValidators:
|
||||
let validatorRes = await validator(pubsubTopic, msg)
|
||||
|
||||
if validatorRes != ValidationResult.Accept:
|
||||
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
|
||||
error "protocol generateOrderedValidator reject waku validator",
|
||||
msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes
|
||||
msg_hash = msgHash,
|
||||
pubsubTopic = pubsubTopic,
|
||||
validatorRes = validatorRes,
|
||||
error = errorMessage
|
||||
|
||||
return validatorRes
|
||||
|
||||
@ -305,8 +307,8 @@ proc publish*(
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[int] {.async.} =
|
||||
let data = message.encode().buffer
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
|
||||
debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
|
||||
|
||||
return await procCall GossipSub(w).publish(pubsubTopic, data)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user