pubsub topic instead of content topic

This commit is contained in:
Simon-Pierre 2026-05-27 10:13:48 -04:00
parent d6c2d61c87
commit 31f9c0dfee
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951

View File

@ -65,6 +65,7 @@ const Help = """
type
ChatRoom = object
serviceId*: string
pubsubTopic*: string
contentTopic*: string
discovered*: seq[RemotePeerInfo]
@ -77,6 +78,7 @@ type
prompt: bool
rooms: Table[string, ChatRoom]
currentRoom: string
relayHandler: WakuRelayHandler
PrivateKey* = crypto.PrivateKey
Topic* = waku_core.PubsubTopic
@ -181,7 +183,7 @@ proc publish(c: Chat, line: string) =
)
try:
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
(waitFor c.node.publish(some(room.pubsubTopic), message)).isOkOr:
error "failed to publish message", error = error
except CatchableError:
error "caught error publishing message: ", error = getCurrentExceptionMsg()
@ -189,6 +191,7 @@ proc publish(c: Chat, line: string) =
proc createRoom(c: Chat, roomName: string) {.async.} =
let serviceIdStr = "/waku/chat-room/" & roomName & "/1.0.0"
let contentTopic = "/chat2disco/1/" & roomName & "/proto"
let pubsubTopic = "/chat2disco/room/" & roomName
let serviceInfo = ServiceInfo(id: serviceIdStr, data: @[])
@ -204,15 +207,32 @@ proc createRoom(c: Chat, roomName: string) {.async.} =
echo "Connected to discovered peers"
c.rooms[roomName] = ChatRoom(
serviceId: serviceIdStr, contentTopic: contentTopic, discovered: peers
serviceId: serviceIdStr, pubsubTopic: pubsubTopic,
contentTopic: contentTopic, discovered: peers
)
else:
echo "Warning: Kademlia not available. Room created locally only."
c.rooms[roomName] =
ChatRoom(serviceId: serviceIdStr, contentTopic: contentTopic, discovered: @[])
c.rooms[roomName] = ChatRoom(
serviceId: serviceIdStr, pubsubTopic: pubsubTopic,
contentTopic: contentTopic, discovered: @[]
)
# Unsubscribe from previous room's pubsub topic
if c.currentRoom.len > 0 and c.currentRoom in c.rooms:
let oldTopic = c.rooms[c.currentRoom].pubsubTopic
c.node.unsubscribe((kind: PubsubUnsub, topic: oldTopic)).isOkOr:
error "failed to unsubscribe from previous room", topic = oldTopic, error = error
c.currentRoom = roomName
echo &"Created/joined room '{roomName}'. Content topic: {contentTopic}"
# Subscribe to this room's pubsub topic
c.node.subscribe(
(kind: PubsubSub, topic: pubsubTopic), c.relayHandler
).isOkOr:
error "failed to subscribe to room pubsub topic",
topic = pubsubTopic, error = error
echo &"Created/joined room '{roomName}'. Pubsub topic: {pubsubTopic}"
# TODO Implement
proc writeAndPrint(c: Chat) {.async.} =
@ -230,7 +250,21 @@ proc writeAndPrint(c: Chat) {.async.} =
continue
if roomName in c.rooms:
# Unsubscribe from current room, subscribe to target room
if c.currentRoom.len > 0 and c.currentRoom != roomName and c.currentRoom in c.rooms:
let oldTopic = c.rooms[c.currentRoom].pubsubTopic
c.node.unsubscribe((kind: PubsubUnsub, topic: oldTopic)).isOkOr:
error "failed to unsubscribe from room", topic = oldTopic, error = error
c.currentRoom = roomName
let newTopic = c.rooms[roomName].pubsubTopic
c.node.subscribe(
(kind: PubsubSub, topic: newTopic), c.relayHandler
).isOkOr:
error "failed to subscribe to room pubsub topic",
topic = newTopic, error = error
echo &"Switched to room '{roomName}'"
else:
await c.createRoom(roomName)
@ -241,7 +275,7 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "Joined rooms:"
for name, room in c.rooms:
let marker = if name == c.currentRoom: " *" else: ""
echo &" {name} ({room.discovered.len} peers){marker}"
echo &" {name} (pubsub: {room.pubsubTopic}, {room.discovered.len} peers){marker}"
elif line.startsWith("/nick"):
c.nick = await readNick(c.transp)
echo "You are now known as " & c.nick
@ -394,20 +428,12 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
prompt: false,
)
let roomName = await readRoom(transp)
if roomName.len > 0:
await chat.createRoom(roomName)
let peerInfo = node.switch.peerInfo
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
echo &"Listening on\n {listenStr}"
# Subscribe to relay topic
# Define relay handler for pubsub-topic-based routing
if conf.relay:
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
var matched = false
for roomName, room in chat.rooms:
if msg.contentTopic == room.contentTopic:
if topic == room.pubsubTopic:
let chatLine = getChatLine(msg.payload)
let prefix =
if chat.rooms.len > 1:
@ -431,11 +457,15 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
chat.prompt = false
showChatPrompt(chat)
node.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler)
).isOkOr:
error "failed to subscribe to pubsub topic",
topic = DefaultPubsubTopic, error = error
chat.relayHandler = WakuRelayHandler(handler)
let roomName = await readRoom(transp)
if roomName.len > 0:
await chat.createRoom(roomName)
let peerInfo = node.switch.peerInfo
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
echo &"Listening on\n {listenStr}"
if conf.metricsLogging:
startMetricsLog()