Add MessageIdProvider for `WakuRelay` (#803)

* Add MessageIdProvider

* Fix unit test. Changelog.
This commit is contained in:
Hanno Cornelius 2022-01-10 15:07:01 +01:00 committed by GitHub
parent 685d43b2b3
commit 62dbb3d0f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 12 deletions

View File

@ -15,6 +15,9 @@ which is a sequence of string.
- All `HistoryResponse` messages are now auto-paginated to a maximum of 100 messages per response - All `HistoryResponse` messages are now auto-paginated to a maximum of 100 messages per response
- Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size. - Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size.
### Fixes
- Added GossipSub `MessageIdProvider` for `11/WAKU2-RELAY` messages.
## 2021-11-05 v0.6 ## 2021-11-05 v0.6
Some useful features and fixes in this release, include: Some useful features and fixes in this release, include:

View File

@ -128,8 +128,10 @@ procSuite "Waku v2 JSON-RPC API":
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port)) node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
pubSubTopic = "polling" pubSubTopic = "polling"
contentTopic = defaultContentTopic contentTopic = defaultContentTopic
payload = @[byte 9] payload1 = @[byte 9]
message = WakuMessage(payload: payload, contentTopic: contentTopic) message1 = WakuMessage(payload: payload1, contentTopic: contentTopic)
payload2 = @[byte 8]
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic)
await node1.start() await node1.start()
node1.mountRelay(@[pubSubTopic]) node1.mountRelay(@[pubSubTopic])
@ -157,7 +159,7 @@ procSuite "Waku v2 JSON-RPC API":
await client.connect("127.0.0.1", rpcPort) await client.connect("127.0.0.1", rpcPort)
# First see if we can retrieve messages published on the default topic (node is already subscribed) # First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(defaultTopic, message) await node2.publish(defaultTopic, message1)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -166,7 +168,7 @@ procSuite "Waku v2 JSON-RPC API":
check: check:
messages.len == 1 messages.len == 1
messages[0].contentTopic == contentTopic messages[0].contentTopic == contentTopic
messages[0].payload == payload messages[0].payload == payload1
# Ensure that read messages are cleared from cache # Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)
@ -184,7 +186,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true response == true
# Now publish a message on node1 and see if we receive it on node3 # Now publish a message on node1 and see if we receive it on node3
await node1.publish(pubSubTopic, message) await node1.publish(pubSubTopic, message2)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -193,7 +195,7 @@ procSuite "Waku v2 JSON-RPC API":
check: check:
messages.len == 1 messages.len == 1
messages[0].contentTopic == contentTopic messages[0].contentTopic == contentTopic
messages[0].payload == payload messages[0].payload == payload2
# Ensure that read messages are cleared from cache # Ensure that read messages are cleared from cache
messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic)

View File

@ -1,7 +1,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[options, tables, strutils, sequtils, os], std/[hashes, options, tables, strutils, sequtils, os],
chronos, chronicles, metrics, chronos, chronicles, metrics,
stew/shims/net as stewNet, stew/shims/net as stewNet,
stew/byteutils, stew/byteutils,
@ -9,9 +9,9 @@ import
eth/p2p/discoveryv5/enr, eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/ping, libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/[gossipsub, rpc/messages],
libp2p/nameresolving/dnsresolver, libp2p/nameresolving/dnsresolver,
libp2p/builders, libp2p/[builders, multihash],
libp2p/transports/[transport, tcptransport, wstransport], libp2p/transports/[transport, tcptransport, wstransport],
../protocol/[waku_relay, waku_message], ../protocol/[waku_relay, waku_message],
../protocol/waku_store/waku_store, ../protocol/waku_store/waku_store,
@ -632,12 +632,18 @@ proc mountRelay*(node: WakuNode,
relayMessages = true, relayMessages = true,
triggerSelf = true) triggerSelf = true)
# @TODO: Better error handling: CatchableError is raised by `waitFor` # @TODO: Better error handling: CatchableError is raised by `waitFor`
{.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = {.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
func msgIdProvider(m: messages.Message): seq[byte] =
let mh = MultiHash.digest("sha2-256", m.data)
if mh.isOk():
return mh[].data.buffer
else:
return ($m.data.hash).toBytes()
let wakuRelay = WakuRelay.init( let wakuRelay = WakuRelay.init(
switch = node.switch, switch = node.switch,
# Use default msgIdProvider = msgIdProvider,
#msgIdProvider = msgIdProvider,
triggerSelf = triggerSelf, triggerSelf = triggerSelf,
sign = false, sign = false,
verifySignature = false, verifySignature = false,