diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 875e781a8..964a4ed3c 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -564,16 +564,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = # Subscribe to a topic, if relay is mounted if conf.relay: - proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = trace "Hit subscribe handler", topic - let decoded = WakuMessage.decode(data) - - if decoded.isOk(): - if decoded.get().contentTopic == chat.contentTopic: - chat.printReceivedMessage(decoded.get()) - else: - trace "Invalid encoded WakuMessage", error = decoded.error + if msg.contentTopic == chat.contentTopic: + chat.printReceivedMessage(msg) let topic = DefaultPubsubTopic node.subscribe(topic, handler) diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 9d5e6e1aa..cfa2716a0 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -194,15 +194,14 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = # Always mount relay for bridge # `triggerSelf` is false on a `bridge` to avoid duplicates - await cmb.nodev2.mountRelay(triggerSelf = false) + await cmb.nodev2.mountRelay() + cmb.nodev2.wakuRelay.triggerSelf = false # Bridging # Handle messages on Waku v2 and bridge to Matterbridge - proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - trace "Bridging message from Chat2 to Matterbridge", msg=msg[] - cmb.toMatterbridge(msg[]) + proc relayHandler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + trace "Bridging message from Chat2 to Matterbridge", msg=msg + cmb.toMatterbridge(msg) cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 7fb6b40a2..7e95f49a6 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -306,13 +306,8 @@ proc subscribeAndHandleMessages(node: WakuNode, msgPerContentTopic: ContentTopicMessageTableRef) = # handle function - proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let messageRes = WakuMessage.decode(data) - if messageRes.isErr(): - warn "could not decode message", data=data, pubsubTopic=pubsubTopic - - let message = messageRes.get() - trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic + proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + trace "rx message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic # If we reach a table limit size, remove c topics with the least messages. let tableSize = 100 @@ -322,10 +317,10 @@ proc subscribeAndHandleMessages(node: WakuNode, # TODO: Will overflow at some point # +1 if content topic existed, init to 1 otherwise - if msgPerContentTopic.hasKey(message.contentTopic): - msgPerContentTopic[message.contentTopic] += 1 + if msgPerContentTopic.hasKey(msg.contentTopic): + msgPerContentTopic[msg.contentTopic] += 1 else: - msgPerContentTopic[message.contentTopic] = 1 + msgPerContentTopic[msg.contentTopic] = 1 node.subscribe(pubsubTopic, handler) diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index bed56bb82..324028e31 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -264,7 +264,8 @@ proc start*(bridge: WakuBridge) {.async.} = # Always mount relay for bridge. # `triggerSelf` is false on a `bridge` to avoid duplicates - await bridge.nodev2.mountRelay(triggerSelf = false) + await bridge.nodev2.mountRelay() + bridge.nodev2.wakuRelay.triggerSelf = false # Bridging # Handle messages on Waku v1 and bridge to Waku v2 @@ -275,12 +276,11 @@ proc start*(bridge: WakuBridge) {.async.} = bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived) # Handle messages on Waku v2 and bridge to Waku v1 - proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk() and msg.get().isBridgeable(): + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + if msg.isBridgeable(): try: - trace "Bridging message from V2 to V1", msg=msg.tryGet() - bridge.toWakuV1(msg.tryGet()) + trace "Bridging message from V2 to V1", msg=msg + bridge.toWakuV1(msg) except ValueError: trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg waku_bridge_dropped.inc(labelValues = ["value_error"]) diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index b8e1cb913..26c23e39a 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -83,14 +83,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = # any content topic can be chosen. make sure it matches the publisher let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") - proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let message = WakuMessage.decode(data).value - let payloadStr = string.fromBytes(message.payload) - if message.contentTopic == contentTopic: + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + let payloadStr = string.fromBytes(msg.payload) + if msg.contentTopic == contentTopic: notice "message received", payload=payloadStr, pubsubTopic=pubsubTopic, - contentTopic=message.contentTopic, - timestamp=message.timestamp + contentTopic=msg.contentTopic, + timestamp=msg.timestamp node.subscribe(pubSubTopic, handler) when isMainModule: diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index df612b51e..fe1ce1510 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -60,14 +60,11 @@ suite "WakuNode": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node2.subscribe(pubSubTopic, relayHandler) diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index 88e6a45a3..d2eddc55e 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -43,10 +43,9 @@ suite "WakuNode - Lightpush": let message = fakeWakuMessage() var completionFutRelay = newFuture[bool]() - proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data).get() + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = check: - pubsubTopic == DefaultPubsubTopic + topic == DefaultPubsubTopic msg == message completionFutRelay.complete(true) destNode.subscribe(DefaultPubsubTopic, relayHandler) diff --git a/tests/v2/waku_relay/test_waku_relay.nim b/tests/v2/waku_relay/test_waku_relay.nim index 9e2b4a4a0..ff6df1d69 100644 --- a/tests/v2/waku_relay/test_waku_relay.nim +++ b/tests/v2/waku_relay/test_waku_relay.nim @@ -16,14 +16,14 @@ import ../testlib/wakucore -proc noopRawHandler(): PubsubRawHandler = - var handler: PubsubRawHandler - handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard +proc noopRawHandler(): WakuRelayHandler = + var handler: WakuRelayHandler + handler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = discard handler -proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} = - let proto = WakuRelay.new(switch, triggerSelf = self).tryGet() +proc newTestWakuRelay(switch = newTestSwitch()): Future[WakuRelay] {.async.} = + let proto = WakuRelay.new(switch).tryGet() await proto.start() let protocolMatcher = proc(proto: string): bool {.gcsafe.} = @@ -85,7 +85,7 @@ suite "Waku Relay": topics.contains(networkC) ## When - nodeA.unsubscribeAll(networkA) + nodeA.unsubscribe(networkA) ## Then check: diff --git a/tests/v2/waku_relay/test_wakunode_relay.nim b/tests/v2/waku_relay/test_wakunode_relay.nim index 4a57e4a35..59447345f 100644 --- a/tests/v2/waku_relay/test_wakunode_relay.nim +++ b/tests/v2/waku_relay/test_wakunode_relay.nim @@ -94,14 +94,11 @@ suite "WakuNode - Relay": ) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node3.subscribe(pubSubTopic, relayHandler) @@ -182,19 +179,14 @@ suite "WakuNode - Relay": node2.wakuRelay.addValidator(pubSubTopic, validator) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - debug "relayed pubsub topic:", topic - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - # check that only messages with contentTopic1 is relayed (but not contentTopic2) - val.contentTopic == contentTopic1 + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + # check that only messages with contentTopic1 is relayed (but not contentTopic2) + msg.contentTopic == contentTopic1 # relay handler is called completionFut.complete(true) - node3.subscribe(pubSubTopic, relayHandler) await sleepAsync(500.millis) @@ -269,14 +261,11 @@ suite "WakuNode - Relay": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node1.subscribe(pubSubTopic, relayHandler) @@ -313,14 +302,11 @@ suite "WakuNode - Relay": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node1.subscribe(pubSubTopic, relayHandler) @@ -361,14 +347,11 @@ suite "WakuNode - Relay": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node1.subscribe(pubSubTopic, relayHandler) @@ -404,14 +387,11 @@ suite "WakuNode - Relay": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node1.subscribe(pubSubTopic, relayHandler) @@ -447,14 +427,11 @@ suite "WakuNode - Relay": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + check: + topic == pubSubTopic + msg.contentTopic == contentTopic + msg.payload == payload completionFut.complete(true) node1.subscribe(pubSubTopic, relayHandler) @@ -468,3 +445,54 @@ suite "WakuNode - Relay": (await completionFut.withTimeout(5.seconds)) == true await node1.stop() await node2.stop() + + asyncTest "Bad peers with low reputation are disconnected": + # Create 5 nodes + let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + # subscribe all nodes to a topic + let topic = "topic" + for node in nodes: node.wakuRelay.subscribe(topic, nil) + await sleepAsync(500.millis) + + # connect nodes in full mesh + for i in 0..<5: + for j in 0..<5: + if i == j: + continue + let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo()) + require connOk + + # connection triggers different actions, wait for them + await sleepAsync(1.seconds) + + # all peers are connected in a mesh, 4 conns each + for i in 0..<5: + check: + nodes[i].peerManager.switch.connManager.getConnections().len == 4 + + # node[0] publishes wrong messages (random bytes not decoding into WakuMessage) + for j in 0..<50: + discard await nodes[0].wakuRelay.publish(topic, urandom(1*(10^3))) + + # long wait, must be higher than the configured decayInterval (how often score is updated) + await sleepAsync(20.seconds) + + # all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes) + for i in 1..<5: + check: + nodes[i].wakuRelay.peerStats[nodes[0].switch.peerInfo.peerId].score == -249999.9 + + # nodes[0] was blacklisted from all other peers, no connections + check: + nodes[0].peerManager.switch.connManager.getConnections().len == 0 + + # the rest of the nodes now have 1 conn less (kicked nodes[0] out) + for i in 1..<5: + check: + nodes[i].peerManager.switch.connManager.getConnections().len == 3 + + # Stop all nodes + await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim index 1839478be..9462241ef 100644 --- a/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim @@ -83,12 +83,10 @@ procSuite "WakuNode - RLN relay": await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: - completionFut.complete(true) + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + debug "The received topic:", topic + if topic == rlnRelayPubSubTopic: + completionFut.complete(true) # mount the relay handler node3.subscribe(rlnRelayPubSubTopic, relayHandler) @@ -172,12 +170,10 @@ procSuite "WakuNode - RLN relay": # define a custom relay handler var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: - completionFut.complete(true) + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + debug "The received topic:", topic + if topic == rlnRelayPubSubTopic: + completionFut.complete(true) # mount the relay handler node3.subscribe(rlnRelayPubSubTopic, relayHandler) @@ -302,20 +298,17 @@ procSuite "WakuNode - RLN relay": var completionFut2 = newFuture[bool]() var completionFut3 = newFuture[bool]() var completionFut4 = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isOk(): - let wm = msg.value() - debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: - if wm == wm1: - completionFut1.complete(true) - if wm == wm2: - completionFut2.complete(true) - if wm == wm3: - completionFut3.complete(true) - if wm == wm4: - completionFut4.complete(true) + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + debug "The received topic:", topic + if topic == rlnRelayPubSubTopic: + if msg == wm1: + completionFut1.complete(true) + if msg == wm2: + completionFut2.complete(true) + if msg == wm3: + completionFut3.complete(true) + if msg == wm4: + completionFut4.complete(true) # mount the relay handler for node3 diff --git a/tests/wakubridge/test_wakubridge.nim b/tests/wakubridge/test_wakubridge.nim index c89d67e4e..cc7cc4725 100644 --- a/tests/wakubridge/test_wakubridge.nim +++ b/tests/wakubridge/test_wakubridge.nim @@ -88,21 +88,20 @@ procSuite "WakuBridge": waitFor bridge.start() waitFor v2Node.start() - await v2Node.mountRelay(@[DefaultBridgeTopic], triggerSelf = false) + await v2Node.mountRelay(@[DefaultBridgeTopic]) + v2Node.wakuRelay.triggerSelf = false discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode())) waitFor waku_node.connectToNodes(v2Node, @[bridge.nodev2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - - if msg.isOk() and msg.value().version == 1: + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + if msg.version == 1: check: # Message fields are as expected - msg.value().contentTopic == contentTopic # Topic translation worked - string.fromBytes(msg.value().payload).contains("from V1") + msg.contentTopic == contentTopic # Topic translation worked + string.fromBytes(msg.payload).contains("from V1") completionFut.complete(true) diff --git a/waku/v2/node/jsonrpc/relay/handlers.nim b/waku/v2/node/jsonrpc/relay/handlers.nim index 81ded0507..31f50834e 100644 --- a/waku/v2/node/jsonrpc/relay/handlers.nim +++ b/waku/v2/node/jsonrpc/relay/handlers.nim @@ -66,7 +66,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC # Unsubscribe all handlers from requested topics for topic in topics: - node.unsubscribeAll(topic) + node.unsubscribe(topic) cache.unsubscribe(topic) return true diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index d79400891..2f7cb47c0 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -359,7 +359,7 @@ proc new*(T: type PeerManager, storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, - outPeersTarget: max(maxConnections div 10, 10), + outPeersTarget: max(maxConnections div 2, 10), maxFailedAttempts: maxFailedAttempts, colocationLimit: colocationLimit) diff --git a/waku/v2/node/rest/relay/handlers.nim b/waku/v2/node/rest/relay/handlers.nim index fdd01156d..6274f685a 100644 --- a/waku/v2/node/rest/relay/handlers.nim +++ b/waku/v2/node/rest/relay/handlers.nim @@ -88,7 +88,7 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak # Unsubscribe all handlers from requested topics for topic in req: - node.unsubscribeAll(string(topic)) + node.unsubscribe(string(topic)) cache.unsubscribe(string(topic)) # Successfully unsubscribed from all requested topics diff --git a/waku/v2/node/rest/relay/topic_cache.nim b/waku/v2/node/rest/relay/topic_cache.nim index 2d2feb67a..301165087 100644 --- a/waku/v2/node/rest/relay/topic_cache.nim +++ b/waku/v2/node/rest/relay/topic_cache.nim @@ -23,7 +23,7 @@ type TopicCache* = MessageCache[PubSubTopic] ##### Message handler -type TopicCacheMessageHandler* = SubscriptionHandler +type TopicCacheMessageHandler* = WakuRelayHandler proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 1b0413610..2dde32f7b 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -269,14 +269,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) - let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.decode(data) - if msg.isErr(): - return - - await traceHandler(topic, msg.value) - await filterHandler(topic, msg.value) - await archiveHandler(topic, msg.value) + let defaultHandler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + await traceHandler(topic, msg) + await filterHandler(topic, msg) + await archiveHandler(topic, msg) node.wakuRelay.subscribe(topic, defaultHandler) @@ -302,27 +298,16 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) = node.registerRelayDefaultHandler(topic) node.wakuRelay.subscribe(topic, handler) -proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) = - ## Unsubscribes a handler from a PubSub topic. +proc unsubscribe*(node: WakuNode, topic: PubsubTopic) = + ## Unsubscribes from a specific PubSub topic. + if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return - debug "unsubscribe", oubsubTopic= topic + info "unsubscribe", pubsubTopic=topic - let wakuRelay = node.wakuRelay - wakuRelay.unsubscribe(@[(topic, handler)]) - -proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) = - ## Unsubscribes all handlers registered on a specific PubSub topic. - - if node.wakuRelay.isNil(): - error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." - return - - info "unsubscribeAll", topic=topic - - node.wakuRelay.unsubscribeAll(topic) + node.wakuRelay.unsubscribe(topic) proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = @@ -370,7 +355,6 @@ proc startRelay*(node: WakuNode) {.async.} = proc mountRelay*(node: WakuNode, topics: seq[string] = @[], - triggerSelf = true, peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} = if not node.wakuRelay.isNil(): error "wakuRelay already mounted, skipping" @@ -379,10 +363,7 @@ proc mountRelay*(node: WakuNode, ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) info "mounting relay protocol" - let initRes = WakuRelay.new( - node.switch, - triggerSelf = triggerSelf - ) + let initRes = WakuRelay.new(node.switch) if initRes.isErr(): error "failed mounting relay protocol", error=initRes.error return diff --git a/waku/v2/waku_relay/protocol.nim b/waku/v2/waku_relay/protocol.nim index aad03741b..2bd054769 100644 --- a/waku/v2/waku_relay/protocol.nim +++ b/waku/v2/waku_relay/protocol.nim @@ -16,6 +16,7 @@ import libp2p/multihash, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, libp2p/switch import @@ -29,19 +30,100 @@ logScope: const WakuRelayCodec* = "/vac/waku/relay/2.0.0" +# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters +const TopicParameters = TopicParams( + topicWeight: 1, -type WakuRelayResult*[T] = Result[T, string] - -type - PubsubRawHandler* = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} - SubscriptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} + # p1: favours peers already in the mesh + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1.seconds, + timeInMeshCap: 10.0, + + # p2: rewards fast peers + firstMessageDeliveriesWeight: 1.0, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10.0, + + # p3: penalizes lazy peers. safe low value + meshMessageDeliveriesWeight: 0.0, + meshMessageDeliveriesDecay: 0.0, + meshMessageDeliveriesCap: 0, + meshMessageDeliveriesThreshold: 0, + meshMessageDeliveriesWindow: 0.milliseconds, + meshMessageDeliveriesActivation: 0.seconds, + + # p3b: tracks history of prunes + meshFailurePenaltyWeight: 0.0, + meshFailurePenaltyDecay: 0.0, + + # p4: penalizes invalid messages. highly penalize + # peers sending wrong messages + invalidMessageDeliveriesWeight: -100.0, + invalidMessageDeliveriesDecay: 0.5 + ) + +# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters +const GossipsubParameters = GossipSubParams( + explicit: true, + pruneBackoff: chronos.minutes(1), + unsubscribeBackoff: chronos.seconds(5), + floodPublish: true, + gossipFactor: 0.25, + + d: 6, + dLow: 4, + dHigh: 12, + dScore: 6, + dOut: 3, + dLazy: 6, + + heartbeatInterval: chronos.seconds(1), + historyLength: 6, + historyGossip: 3, + fanoutTTL: chronos.minutes(1), + seenTTL: chronos.minutes(2), + + # no gossip is sent to peers below this score + gossipThreshold: -100, + + # no self-published msgs are sent to peers below this score + publishThreshold: -1000, + + # used to trigger disconnections + ignore peer if below this score + graylistThreshold: -10000, + + # grafts better peers if the mesh median score drops below this. unset. + opportunisticGraftThreshold: 0, + + # how often peer scoring is updated + decayInterval: chronos.seconds(12), + + # below this we consider the parameter to be zero + decayToZero: 0.01, + + # remember peer score during x after it disconnects + retainScore: chronos.minutes(10), + + # p5: application specific, unset + appSpecificWeight: 0.0, + + # p6: penalizes peers sharing more than threshold ips + ipColocationFactorWeight: -50.0, + ipColocationFactorThreshold: 5.0, + + # p7: penalizes bad behaviour (weight and decay) + behaviourPenaltyWeight: -10.0, + behaviourPenaltyDecay: 0.986, + + # triggers disconnections of bad peers aka score