diff --git a/apps/chat2disco/chat2disco.nim b/apps/chat2disco/chat2disco.nim index 837b3c1cd..100d7f173 100644 --- a/apps/chat2disco/chat2disco.nim +++ b/apps/chat2disco/chat2disco.nim @@ -207,14 +207,18 @@ proc createRoom(c: Chat, roomName: string) {.async.} = echo "Connected to discovered peers" c.rooms[roomName] = ChatRoom( - serviceId: serviceIdStr, pubsubTopic: pubsubTopic, - contentTopic: contentTopic, discovered: peers + serviceId: serviceIdStr, + pubsubTopic: pubsubTopic, + contentTopic: contentTopic, + discovered: peers, ) else: echo "Warning: Kademlia not available. Room created locally only." c.rooms[roomName] = ChatRoom( - serviceId: serviceIdStr, pubsubTopic: pubsubTopic, - contentTopic: contentTopic, discovered: @[] + serviceId: serviceIdStr, + pubsubTopic: pubsubTopic, + contentTopic: contentTopic, + discovered: @[], ) # Unsubscribe from previous room's pubsub topic @@ -226,11 +230,8 @@ proc createRoom(c: Chat, roomName: string) {.async.} = c.currentRoom = roomName # Subscribe to this room's pubsub topic - c.node.subscribe( - (kind: PubsubSub, topic: pubsubTopic), c.relayHandler - ).isOkOr: - error "failed to subscribe to room pubsub topic", - topic = pubsubTopic, error = error + c.node.subscribe((kind: PubsubSub, topic: pubsubTopic), c.relayHandler).isOkOr: + error "failed to subscribe to room pubsub topic", topic = pubsubTopic, error = error echo &"Created/joined room '{roomName}'. Pubsub topic: {pubsubTopic}" @@ -251,7 +252,8 @@ proc writeAndPrint(c: Chat) {.async.} = if roomName in c.rooms: # Unsubscribe from current room, subscribe to target room - if c.currentRoom.len > 0 and c.currentRoom != roomName and c.currentRoom in c.rooms: + if c.currentRoom.len > 0 and c.currentRoom != roomName and + c.currentRoom in c.rooms: let oldTopic = c.rooms[c.currentRoom].pubsubTopic c.node.unsubscribe((kind: PubsubUnsub, topic: oldTopic)).isOkOr: error "failed to unsubscribe from room", topic = oldTopic, error = error @@ -259,9 +261,7 @@ proc writeAndPrint(c: Chat) {.async.} = c.currentRoom = roomName let newTopic = c.rooms[roomName].pubsubTopic - c.node.subscribe( - (kind: PubsubSub, topic: newTopic), c.relayHandler - ).isOkOr: + c.node.subscribe((kind: PubsubSub, topic: newTopic), c.relayHandler).isOkOr: error "failed to subscribe to room pubsub topic", topic = newTopic, error = error @@ -373,6 +373,11 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = node.announcedAddresses.setLen(0) ## remove previous addresses node.announcedAddresses.add(addresses) + # Also set the PeerInfo announcedAddrs (newer libp2p short-circuits mappers in expandAddrs + # when non-empty; reduces reliance on the waku capturing mapper during start + hp races). + if not node.switch.isNil and not node.switch.peerInfo.isNil: + node.switch.peerInfo.announcedAddrs = addresses + info "chat2disco node announced addresses updated", announcedAddresses = node.announcedAddresses @@ -383,6 +388,17 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} = node.switch.services = @[Service(holePunchService)] + # Call setup explicitly (bypasses the deprecated switch.add which used to do it). + # Required after the libp2p update so that AutonatService (and AutoRelay via HP) + # populate their addressMapper / handlers before switch.start runs the services + # (otherwise enableAddressMapper=true + nil mapper leads to nil call / SEGV at 589). + try: + holePunchService.setup(node.switch) + except CatchableError as exc: + error "failed to setup hole punch / autonat service (address mappers etc.)", + error = exc.msg + quit(QuitFailure) + if conf.relay: (await node.mountRelay()).isOkOr: error "failed to mount relay", error diff --git a/waku/discovery/autonat_service.nim b/waku/discovery/autonat_service.nim index 16d014708..354740242 100644 --- a/waku/discovery/autonat_service.nim +++ b/waku/discovery/autonat_service.nim @@ -13,6 +13,15 @@ proc getAutonatService*(rng: crypto.Rng): AutonatService = ## minConfidence is used as threshold to determine the state. ## If maxQueueSize > numPeersToAsk past samples are considered ## in the calculation. + ## + ## NOTE: After obtaining the service (and wrapping in HPService if using holepunch), + ## the caller *must* call .setup(theSwitch) on it (or on the HPService) before adding + ## to switch.services and calling switch.start(). This populates the addressMapper + ## proc (and handlers) that autonat.start() assumes when enableAddressMapper (default). + ## Direct assignment to switch.services (as done in factory + chat2disco) bypasses + ## the deprecated switch.add() that used to auto-call setup. Missing setup + enable=true + ## leads to adding a nil mapper and nil call during peerInfo.update inside switch.start + ## (surfaces as SEGV at waku_node:589). let autonatService = AutonatService.new( autonatClient = AutonatClient.new(), rng = rng, diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index 69cf72a5a..a4309ff90 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -88,6 +88,10 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = proc lookup*( self: WakuKademlia, codec: string ): Future[seq[RemotePeerInfo]] {.async: (raises: []).} = + if self.protocol.isNil: + return @[] + # The underlying ServiceDiscovery/KadDHT is started by switch.start() (via ms.start); + # if called very early the protocol may still be starting its loops/handlers. let serviceId = hashServiceId(codec) let catchRes = catch: @@ -144,6 +148,8 @@ proc periodicLookup( while true: await sleepAsync(interval) + if self.protocol.isNil: + continue let services = self.discoveredServices if services.len == 0: continue diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 84e480bf2..81d874b6a 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -89,6 +89,12 @@ proc setupSwitchServices( waku.node.announcedAddresses.setLen(0) ## remove previous addresses waku.node.announcedAddresses.add(addresses) + + # Also set the PeerInfo announcedAddrs (newer libp2p short-circuits mappers in expandAddrs + # when non-empty; reduces reliance on the waku capturing mapper during start + hp races). + if not isNil(waku.node.switch) and not isNil(waku.node.switch.peerInfo): + waku.node.switch.peerInfo.announcedAddrs = addresses + info "waku node announced addresses updated", announcedAddresses = waku.node.announcedAddresses @@ -106,8 +112,24 @@ proc setupSwitchServices( ) let holePunchService = HPService.new(autonatService, autoRelayService) waku.node.switch.services = @[Service(holePunchService)] + # Call setup explicitly (bypasses the deprecated switch.add which used to do it). + # Required after the libp2p update so that AutonatService (and AutoRelay via HP) + # populate their addressMapper / handlers before switch.start (otherwise enableAddressMapper + # + nil mapper leads to nil call / SEGV at waku_node:589). + try: + holePunchService.setup(waku.node.switch) + except CatchableError as exc: + error "failed to setup hole punch / autonat service (address mappers etc.)", + error = exc.msg + # cannot easily quit here; re-raise so caller sees the init failure + raise else: waku.node.switch.services = @[Service(autonatService)] + try: + autonatService.setup(waku.node.switch) + except CatchableError as exc: + error "failed to setup autonat service (address mappers etc.)", error = exc.msg + raise ## Initialisation diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c6989d9ed..98ae0e0ac 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -466,6 +466,8 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] ## Update the Switch addresses node.switch.peerInfo.addrs = newAnnouncedAddresses + # Keep announcedAddrs in sync so expandAddrs prefers it and skips mappers. + node.switch.peerInfo.announcedAddrs = newAnnouncedAddresses for transport in node.switch.transports: for address in transport.addrs: @@ -573,16 +575,22 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRendezvousClient.isNil(): await node.wakuRendezvousClient.start() - if not node.wakuKademlia.isNil(): - node.wakuKademlia.start() - ## The switch uses this mapper to update peer info addrs - ## with announced addrs after start + ## with announced addrs after start (and during start via autonat/hp etc.). + ## Guard + also set announcedAddrs (newer PeerInfo short-circuits mappers entirely + ## in expandAddrs when non-empty) to reduce capture/GC/race surface with the hp + ## onReservation callback that mutates announced during the services-start phase. let addressMapper = proc( listenAddrs: seq[MultiAddress] ): Future[seq[MultiAddress]] {.gcsafe, async: (raises: [CancelledError]).} = - return node.announcedAddresses + if node.isNil or node.switch.isNil or node.switch.peerInfo.isNil: + return listenAddrs + if node.announcedAddresses.len > 0: + return node.announcedAddresses + return listenAddrs node.switch.peerInfo.addressMappers.add(addressMapper) + if node.announcedAddresses.len > 0: + node.switch.peerInfo.announcedAddrs = node.announcedAddresses ## The switch will update addresses after start using the addressMapper ## NOTE: This will dispatch gossipsub start to the WakuRelay.start method override @@ -591,6 +599,14 @@ proc start*(node: WakuNode) {.async.} = # After switch.start, run custom Logos Delivery relay start logic await node.reconnectRelayPeers() + # Start user-level loops for mounted protocols (e.g. wakuKademlia periodic lookup) + # only *after* switch.start() has activated the underlying LPProtocol (ms.start calls + # the disco/kad .start which registers handlers/loops/started, starts transports etc.). + # The 60s sleep in the periodic still protects first work, but ordering is now correct + # and avoids any interleaving with start-time heartbeats/mappers in the new libp2p kad/service-disco. + if not node.wakuKademlia.isNil(): + node.wakuKademlia.start() + node.started = true if not node.wakuFilterClient.isNil():