diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9f56ee03a..c1864089f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -38,7 +38,7 @@ const GossipSubHistoryGossip* = 3 # heartbeat interval const GossipSubHeartbeatInitialDelay* = 100.millis -const GossipSubHeartbeatInterval* = 1.seconds +const GossipSubHeartbeatInterval* = 5.seconds # TODO: per the spec it should be 1 second # fanout ttl const GossipSubFanoutTTL* = 1.minutes @@ -144,7 +144,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = shuffle(newPeers) - trace "getting peers", topic, peers = peerIds.len + trace "getting peers", topic, peers = newPeers.len for id in newPeers: if g.mesh.peers(topic) >= GossipSubD: @@ -208,9 +208,6 @@ proc dropFanoutPeers(g: GossipSub) = g.fanout.del(topic) trace "dropping fanout topic", topic - for topic in dropping: - g.lastFanoutPubSub.del(topic) - libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(topic).int64, labelValues = [topic]) @@ -245,10 +242,6 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = trace "got gossip peers", peers = result.len break - if allPeers.len == 0: - trace "no peers for topic, skipping", topicID = topic - break - if id in gossipPeers: continue @@ -284,7 +277,7 @@ proc heartbeat(g: GossipSub) {.async.} = except CatchableError as exc: trace "exception ocurred in gossipsub heartbeat", exc = exc.msg - await sleepAsync(5.seconds) + await sleepAsync(GossipSubHeartbeatInterval) method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects @@ -308,7 +301,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = .set(g.fanout.peers(t).int64, labelValues = [t]) method subscribePeer*(p: GossipSub, - conn: Connection) = + conn: Connection) = procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, GossipSubCodec) @@ -316,7 +309,7 @@ method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, peerId: string) {.gcsafe, async.} = - await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) + await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) if subscribe: trace "adding subscription for topic", peer = peerId, name = topic @@ -521,6 +514,13 @@ method publish*(g: GossipSub, g.replenishFanout(topic) peers = g.fanout.getOrDefault(topic) + # even if we couldn't publish, + # we still attempted to publish + # on the topic, so it makes sense + # to update the last topic publish + # time + g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + let msg = Message.init(g.peerInfo, data, topic, g.sign) msgId = g.msgIdProvider(msg) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index dbfe012ba..1a97ffab1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -57,7 +57,7 @@ type cleanupLock: AsyncLock validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] # ref as in smart_ptr - msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) + msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = ## handle peer disconnects @@ -65,10 +65,10 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = if not isNil(peer.peerInfo) and peer.id in p.peers: trace "deleting peer", peer = peer.id p.peers.del(peer.id) + trace "peer disconnected", peer = peer.id # metrics libp2p_pubsub_peers.set(p.peers.len.int64) - trace "peer disconnected", peer = peer.id proc sendSubs*(p: PubSub, peer: PubSubPeer, @@ -120,9 +120,9 @@ method rpcHandler*(p: PubSub, trace "about to subscribe to topic", topicId = s.topic await p.subscribeTopic(s.topic, s.subscribe, peer.id) -proc getPeer(p: PubSub, - peerInfo: PeerInfo, - proto: string): PubSubPeer = +proc getOrCreatePeer(p: PubSub, + peerInfo: PeerInfo, + proto: string): PubSubPeer = if peerInfo.id in p.peers: return p.peers[peerInfo.id] @@ -132,7 +132,6 @@ proc getPeer(p: PubSub, p.peers[peer.id] = peer peer.observers = p.observers - libp2p_pubsub_peers.set(p.peers.len.int64) return peer method handleConn*(p: PubSub, @@ -158,7 +157,7 @@ method handleConn*(p: PubSub, # call pubsub rpc handler await p.rpcHandler(peer, msgs) - let peer = p.getPeer(conn.peerInfo, proto) + let peer = p.getOrCreatePeer(conn.peerInfo, proto) let topics = toSeq(p.topics.keys) if topics.len > 0: await p.sendSubs(peer, topics, true) @@ -177,23 +176,27 @@ method handleConn*(p: PubSub, method subscribePeer*(p: PubSub, conn: Connection) {.base.} = if not(isNil(conn)): - let peer = p.getPeer(conn.peerInfo, p.codec) + let peer = p.getOrCreatePeer(conn.peerInfo, p.codec) trace "subscribing to peer", peerId = conn.peerInfo.id if not peer.connected: peer.conn = conn method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} = - let peer = p.getPeer(peerInfo, p.codec) - trace "unsubscribing from peer", peerId = $peerInfo - if not(isNil(peer.conn)): - await peer.conn.close() + if peerInfo.id in p.peers: + let peer = p.peers[peerInfo.id] - p.handleDisconnect(peer) + trace "unsubscribing from peer", peerId = $peerInfo + if not(isNil(peer.conn)): + await peer.conn.close() -proc connected*(p: PubSub, peer: PeerInfo): bool = - let peer = p.getPeer(peer, p.codec) - if not(isNil(peer)): - return peer.connected + p.handleDisconnect(peer) + +proc connected*(p: PubSub, peerInfo: PeerInfo): bool = + if peerInfo.id in p.peers: + let peer = p.peers[peerInfo.id] + + if not(isNil(peer)): + return peer.connected method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = @@ -205,6 +208,11 @@ method unsubscribe*(p: PubSub, if h == t.handler: p.topics[t.topic].handler.del(i) + # make sure we delete the topic if + # no more handlers are left + if p.topics[t.topic].handler.len <= 0: + p.topics.del(t.topic) + method unsubscribe*(p: PubSub, topic: string, handler: TopicHandler): Future[void] {.base.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 212404dab..db1312e83 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -36,7 +36,6 @@ type sendConn: Connection peerInfo*: PeerInfo handler*: RPCHandler - topics*: seq[string] sentRpcCache: TimedCache[string] # cache for already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages onConnect*: AsyncEvent diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 7f05fe49e..aeff8fa50 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -42,11 +42,11 @@ method initStream*(s: SecureConn) = procCall Connection(s).initStream() method close*(s: SecureConn) {.async.} = - await procCall Connection(s).close() - if not(isNil(s.stream)): await s.stream.close() + await procCall Connection(s).close() + method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} = doAssert(false, "Not implemented!") diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 312ae237e..4c27ff2e5 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -75,15 +75,10 @@ method close*(s: ChronosStream) {.async.} = if not s.isClosed: trace "shutting down chronos stream", address = $s.client.remoteAddress(), oid = s.oid - - # TODO: the sequence here matters - # don't move it after the connections - # close bellow if not s.client.closed(): await s.client.closeWait() await procCall Connection(s).close() - except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 418b0ca79..015eb32a7 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -302,12 +302,11 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = conn.peerInfo.close() finally: await conn.close() + libp2p_peers.set(s.connections.len.int64) if lock.locked(): lock.release() - libp2p_peers.set(s.connections.len.int64) - proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = let connections = s.connections.getOrDefault(peer.id) for connHolder in connections: diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index c555d703c..2fb13d765 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -32,6 +32,7 @@ suite "GossipSub internal": gossipSub.mesh[topic] = initHashSet[string]() var conns = newSeq[Connection]() + gossipSub.gossipsub[topic] = initHashSet[string]() for i in 0..<15: let conn = newBufferStream(noop) conns &= conn @@ -60,6 +61,7 @@ suite "GossipSub internal": gossipSub.mesh[topic] = initHashSet[string]() gossipSub.topics[topic] = Topic() # has to be in topics to rebalance + gossipSub.gossipsub[topic] = initHashSet[string]() var conns = newSeq[Connection]() for i in 0..<15: let conn = newBufferStream(noop) @@ -99,7 +101,6 @@ suite "GossipSub internal": conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler - gossipSub.peers[peerInfo.id].topics &= topic gossipSub.gossipsub[topic].incl(peerInfo.id) check gossipSub.gossipsub[topic].len == 15 diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 572d4f617..757429500 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -86,7 +86,6 @@ suite "GossipSub": nodes[1].addValidator("foobar", validator) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - result = (await validatorFut) and (await handlerFut) await allFuturesThrowing( nodes[0].stop(), @@ -142,7 +141,6 @@ suite "GossipSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) - await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("bar", handler)