From 31f9c0dfee0d51dd592652d8b068c604ca28299d Mon Sep 17 00:00:00 2001 From: Simon-Pierre Date: Wed, 27 May 2026 10:13:48 -0400 Subject: [PATCH] pubsub topic instead of content topic --- apps/chat2disco/chat2disco.nim | 72 ++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/apps/chat2disco/chat2disco.nim b/apps/chat2disco/chat2disco.nim index cb5260802..837b3c1cd 100644 --- a/apps/chat2disco/chat2disco.nim +++ b/apps/chat2disco/chat2disco.nim @@ -65,6 +65,7 @@ const Help = """ type ChatRoom = object serviceId*: string + pubsubTopic*: string contentTopic*: string discovered*: seq[RemotePeerInfo] @@ -77,6 +78,7 @@ type prompt: bool rooms: Table[string, ChatRoom] currentRoom: string + relayHandler: WakuRelayHandler PrivateKey* = crypto.PrivateKey Topic* = waku_core.PubsubTopic @@ -181,7 +183,7 @@ proc publish(c: Chat, line: string) = ) try: - (waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr: + (waitFor c.node.publish(some(room.pubsubTopic), message)).isOkOr: error "failed to publish message", error = error except CatchableError: error "caught error publishing message: ", error = getCurrentExceptionMsg() @@ -189,6 +191,7 @@ proc publish(c: Chat, line: string) = proc createRoom(c: Chat, roomName: string) {.async.} = let serviceIdStr = "/waku/chat-room/" & roomName & "/1.0.0" let contentTopic = "/chat2disco/1/" & roomName & "/proto" + let pubsubTopic = "/chat2disco/room/" & roomName let serviceInfo = ServiceInfo(id: serviceIdStr, data: @[]) @@ -204,15 +207,32 @@ proc createRoom(c: Chat, roomName: string) {.async.} = echo "Connected to discovered peers" c.rooms[roomName] = ChatRoom( - serviceId: serviceIdStr, contentTopic: contentTopic, discovered: peers + serviceId: serviceIdStr, pubsubTopic: pubsubTopic, + contentTopic: contentTopic, discovered: peers ) else: echo "Warning: Kademlia not available. Room created locally only." - c.rooms[roomName] = - ChatRoom(serviceId: serviceIdStr, contentTopic: contentTopic, discovered: @[]) + c.rooms[roomName] = ChatRoom( + serviceId: serviceIdStr, pubsubTopic: pubsubTopic, + contentTopic: contentTopic, discovered: @[] + ) + + # Unsubscribe from previous room's pubsub topic + if c.currentRoom.len > 0 and c.currentRoom in c.rooms: + let oldTopic = c.rooms[c.currentRoom].pubsubTopic + c.node.unsubscribe((kind: PubsubUnsub, topic: oldTopic)).isOkOr: + error "failed to unsubscribe from previous room", topic = oldTopic, error = error c.currentRoom = roomName - echo &"Created/joined room '{roomName}'. Content topic: {contentTopic}" + + # Subscribe to this room's pubsub topic + c.node.subscribe( + (kind: PubsubSub, topic: pubsubTopic), c.relayHandler + ).isOkOr: + error "failed to subscribe to room pubsub topic", + topic = pubsubTopic, error = error + + echo &"Created/joined room '{roomName}'. Pubsub topic: {pubsubTopic}" # TODO Implement proc writeAndPrint(c: Chat) {.async.} = @@ -230,7 +250,21 @@ proc writeAndPrint(c: Chat) {.async.} = continue if roomName in c.rooms: + # Unsubscribe from current room, subscribe to target room + if c.currentRoom.len > 0 and c.currentRoom != roomName and c.currentRoom in c.rooms: + let oldTopic = c.rooms[c.currentRoom].pubsubTopic + c.node.unsubscribe((kind: PubsubUnsub, topic: oldTopic)).isOkOr: + error "failed to unsubscribe from room", topic = oldTopic, error = error + c.currentRoom = roomName + + let newTopic = c.rooms[roomName].pubsubTopic + c.node.subscribe( + (kind: PubsubSub, topic: newTopic), c.relayHandler + ).isOkOr: + error "failed to subscribe to room pubsub topic", + topic = newTopic, error = error + echo &"Switched to room '{roomName}'" else: await c.createRoom(roomName) @@ -241,7 +275,7 @@ proc writeAndPrint(c: Chat) {.async.} = echo "Joined rooms:" for name, room in c.rooms: let marker = if name == c.currentRoom: " *" else: "" - echo &" {name} ({room.discovered.len} peers){marker}" + echo &" {name} (pubsub: {room.pubsubTopic}, {room.discovered.len} peers){marker}" elif line.startsWith("/nick"): c.nick = await readNick(c.transp) echo "You are now known as " & c.nick @@ -394,20 +428,12 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = prompt: false, ) - let roomName = await readRoom(transp) - if roomName.len > 0: - await chat.createRoom(roomName) - - let peerInfo = node.switch.peerInfo - let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId - echo &"Listening on\n {listenStr}" - - # Subscribe to relay topic + # Define relay handler for pubsub-topic-based routing if conf.relay: proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = var matched = false for roomName, room in chat.rooms: - if msg.contentTopic == room.contentTopic: + if topic == room.pubsubTopic: let chatLine = getChatLine(msg.payload) let prefix = if chat.rooms.len > 1: @@ -431,11 +457,15 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = chat.prompt = false showChatPrompt(chat) - node.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler) - ).isOkOr: - error "failed to subscribe to pubsub topic", - topic = DefaultPubsubTopic, error = error + chat.relayHandler = WakuRelayHandler(handler) + + let roomName = await readRoom(transp) + if roomName.len > 0: + await chat.createRoom(roomName) + + let peerInfo = node.switch.peerInfo + let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + echo &"Listening on\n {listenStr}" if conf.metricsLogging: startMetricsLog()