diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 47ae862b6..775090ccd 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -50,7 +50,6 @@ const Help = """ const PayloadV1* {.booldefine.} = false - DefaultTopic* = "/waku/2/default-waku/proto" # XXX Connected is a bit annoying, because incoming connections don't trigger state change # Could poll connection pool or something here, I suppose @@ -175,7 +174,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) = echo &"{chatLine}" c.prompt = false showChatPrompt(c) - trace "Printing message", topic=DefaultTopic, chatLine, + trace "Printing message", topic=DefaultPubsubTopic, chatLine, contentTopic = msg.contentTopic else: debug "Invalid encoded WakuMessage payload", @@ -194,7 +193,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) = c.prompt = false showChatPrompt(c) - trace "Printing message", topic=DefaultTopic, chatLine, + trace "Printing message", topic=DefaultPubsubTopic, chatLine, contentTopic = msg.contentTopic proc readNick(transp: StreamTransport): Future[string] {.async.} = @@ -243,9 +242,9 @@ proc publish(c: Chat, line: string) = c.node.wakuRlnRelay.lastEpoch = message.proof.epoch if not c.node.wakuLightPush.isNil(): # Attempt lightpush - asyncSpawn c.node.lightpushPublish(DefaultTopic, message) + asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message) else: - asyncSpawn c.node.publish(DefaultTopic, message, handler) + asyncSpawn c.node.publish(DefaultPubsubTopic, message, handler) else: warn "Payload encoding failed", error = encodedPayload.error else: @@ -272,9 +271,9 @@ proc publish(c: Chat, line: string) = if not c.node.wakuLightPush.isNil(): # Attempt lightpush - asyncSpawn c.node.lightpushPublish(DefaultTopic, message) + asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message) else: - asyncSpawn c.node.publish(DefaultTopic, message) + asyncSpawn c.node.publish(DefaultPubsubTopic, message) # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -327,7 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} = if not c.node.wakuFilter.isNil(): echo "unsubscribing from content filters..." - await c.node.unsubscribe(pubsubTopic=DefaultTopic, contentTopics=c.contentTopic) + await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic) echo "quitting..." @@ -502,12 +501,12 @@ proc processInput(rfd: AsyncFD) {.async.} = node.setFilterPeer(parseRemotePeerInfo(conf.filternode)) - proc filterHandler(pubsubTopic: string, msg: WakuMessage) {.gcsafe.} = + proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) - await node.subscribe(pubsubTopic=DefaultTopic, contentTopics=chat.contentTopic, filterHandler) + await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler) # Subscribe to a topic, if relay is mounted if conf.relay: @@ -522,7 +521,7 @@ proc processInput(rfd: AsyncFD) {.async.} = else: trace "Invalid encoded WakuMessage", error = decoded.error - let topic = cast[Topic](DefaultTopic) + let topic = DefaultPubsubTopic node.subscribe(topic, handler) when defined(rln) or defined(rlnzerokit): diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index b51a4d540..8cabd16e3 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -32,7 +32,6 @@ logScope: ################## const - DefaultTopic* = chat2.DefaultTopic DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue ######### @@ -92,7 +91,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"]) - await cmb.nodev2.publish(DefaultTopic, msg) + await cmb.nodev2.publish(DefaultPubsubTopic, msg) proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} = if cmb.seen.containsOrAdd(msg.payload.hash()): @@ -195,13 +194,13 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = # Bridging # Handle messages on Waku v2 and bridge to Matterbridge - proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = + proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe, raises: [Defect].} = let msg = WakuMessage.decode(data) if msg.isOk(): trace "Bridging message from Chat2 to Matterbridge", msg=msg[] cmb.toMatterbridge(msg[]) - cmb.nodev2.subscribe(DefaultTopic, relayHandler) + cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler) proc stop*(cmb: Chat2MatterBridge) {.async.} = info "Stopping Chat2MatterBridge" diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index ad1a4cb43..fc208b0d0 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -302,7 +302,7 @@ proc start*(bridge: WakuBridge) {.async.} = bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived) # Handle messages on Waku v2 and bridge to Waku v1 - proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.decode(data) if msg.isOk() and msg.get().isBridgeable(): try: diff --git a/tests/v2/test_peer_exchange.nim b/tests/v2/test_peer_exchange.nim index 8a6ef4bb5..668f83e72 100644 --- a/tests/v2/test_peer_exchange.nim +++ b/tests/v2/test_peer_exchange.nim @@ -11,7 +11,8 @@ import import ../../waku/v2/node/waku_node, ../../waku/v2/utils/peers, - ../test_helpers + ../test_helpers, + ./testlib/common procSuite "Peer Exchange": asyncTest "GossipSub (relay) peer exchange": @@ -43,7 +44,7 @@ procSuite "Peer Exchange": check: # Node 3 is informed of node 2 via peer exchange peer == node1.switch.peerInfo.peerId - topic == defaultTopic + topic == DefaultPubsubTopic peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1 if (not completionFut.completed()): diff --git a/tests/v2/test_rest_relay_api.nim b/tests/v2/test_rest_relay_api.nim index a0fbcbc34..85af8630c 100644 --- a/tests/v2/test_rest_relay_api.nim +++ b/tests/v2/test_rest_relay_api.nim @@ -45,7 +45,7 @@ suite "REST API - Relay": restServer.start() let pubSubTopics = @[ - PubSubTopicString("pubsub-topic-1"), + PubSubTopicString("pubsub-topic-1"), PubSubTopicString("pubsub-topic-2"), PubSubTopicString("pubsub-topic-3") ] @@ -204,7 +204,7 @@ suite "REST API - Relay": ] discard await client.relayPostSubscriptionsV1(newTopics) - let response = await client.relayPostMessagesV1(defaultTopic, RelayWakuMessage( + let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( payload: Base64String.encode("TEST-PAYLOAD"), contentTopic: some(ContentTopicString(defaultContentTopic)), timestamp: some(int64(2022)) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index d7957493c..9086a61ef 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -56,7 +56,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() let pushHandlerFuture = newFuture[(string, WakuMessage)]() - proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = pushHandlerFuture.complete((pubsubTopic, message)) let @@ -99,7 +99,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = pushHandlerFuture.complete() let @@ -151,7 +151,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = pushHandlerFuture.complete() let @@ -216,7 +216,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = pushHandlerFuture.complete() let diff --git a/tests/v2/test_waku_lightpush.nim b/tests/v2/test_waku_lightpush.nim index 874021202..3ecda1509 100644 --- a/tests/v2/test_waku_lightpush.nim +++ b/tests/v2/test_waku_lightpush.nim @@ -43,7 +43,7 @@ suite "Waku Lightpush": ## Given let handlerFuture = newFuture[(string, WakuMessage)]() - let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = handlerFuture.complete((pubsubTopic, message)) return ok() @@ -87,7 +87,7 @@ suite "Waku Lightpush": let error = "test_failure" let handlerFuture = newFuture[void]() - let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = + let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = handlerFuture.complete() return err(error) diff --git a/tests/v2/test_wakunode_lightpush.nim b/tests/v2/test_wakunode_lightpush.nim index b1cfe1aa5..1ef9faad1 100644 --- a/tests/v2/test_wakunode_lightpush.nim +++ b/tests/v2/test_wakunode_lightpush.nim @@ -43,7 +43,7 @@ procSuite "WakuNode - Lightpush": let message = fakeWakuMessage() var completionFutRelay = newFuture[bool]() - proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} = + proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.decode(data).get() check: pubsubTopic == DefaultPubsubTopic diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index ab5de53df..05e443a2f 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -180,15 +180,15 @@ procSuite "WakuNode - Store": msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2))) msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3))) - require server.wakuStore.store.put(DefaultTopic, msg1).isOk() - require server.wakuStore.store.put(DefaultTopic, msg2).isOk() + require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk() + require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk() # Insert the same message in both node's store let receivedTime3 = now() + getNanosecondTime(10) digest3 = computeDigest(msg3) - require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() - require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() + require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() + require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk() let serverPeer = server.peerInfo.toRemotePeerInfo() diff --git a/tests/v2/testlib/common.nim b/tests/v2/testlib/common.nim index 2f3416580..4684a2955 100644 --- a/tests/v2/testlib/common.nim +++ b/tests/v2/testlib/common.nim @@ -5,9 +5,9 @@ import ../../../waku/v2/protocol/waku_message, ../../../waku/v2/utils/time -const - DefaultPubsubTopic* = "/waku/2/default-waku/proto" - DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto") +export + waku_message.DefaultPubsubTopic, + waku_message.DefaultContentTopic proc now*(): Timestamp = diff --git a/tools/simulation/quicksim2.nim b/tools/simulation/quicksim2.nim index 8321111a2..12490f98b 100644 --- a/tools/simulation/quicksim2.nim +++ b/tools/simulation/quicksim2.nim @@ -21,10 +21,6 @@ template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] const sigWakuPath = sourceDir / ".." / ".." / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim" createRpcSigs(RpcHttpClient, sigWakuPath) -const defaultTopic = "/waku/2/default-waku/proto" - -const defaultContentTopic = ContentTopic("waku/2/default-content/proto") - const topicAmount = 10 #100 proc message(i: int): ProtoBuffer = @@ -45,7 +41,7 @@ for i in 0.. 0: message.timestamp diff --git a/waku/v2/protocol/waku_store/pagination.nim b/waku/v2/protocol/waku_store/pagination.nim index 9d2caf826..ebb98ce0f 100644 --- a/waku/v2/protocol/waku_store/pagination.nim +++ b/waku/v2/protocol/waku_store/pagination.nim @@ -37,7 +37,7 @@ proc computeDigest*(msg: WakuMessage): MessageDigest = # Computes the hash return ctx.finish() -proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T = +proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index. let digest = computeDigest(msg) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 6ed29944d..0fe831279 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -37,8 +37,6 @@ logScope: const WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" - DefaultTopic* = "/waku/2/default-waku/proto" - MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" @@ -230,7 +228,7 @@ proc isValidMessage(msg: WakuMessage): bool = return lowerBound <= msg.timestamp and msg.timestamp <= upperBound -proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = +proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) = if w.store.isNil(): # Messages should not be stored return