diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 77c8c4390..2be053c49 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -97,13 +97,9 @@ method rpcHandler*(f: FloodSub,
             trace "exception in message handler", exc = exc.msg

       # forward the message to all peers interested in it
-      let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
-      for p in failed:
-        let peer = f.peers.getOrDefault(p)
-        if not(isNil(peer)):
-          f.handleDisconnect(peer) # cleanup failed peers
+      let published = await f.publishHelper(toSendPeers, m.messages)

-      trace "forwared message to peers", peers = published.len
+      trace "forwarded message to peers", peers = published

 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -139,18 +135,15 @@ method publish*(f: FloodSub,
   trace "publishing on topic", name = topic
   inc f.msgSeqno
   let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+  # start the future but do not wait yet

-  let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
-  for p in failed:
-    let peer = f.peers.getOrDefault(p)
-    if not isNil(peer):
-      f.handleDisconnect(peer) # cleanup failed peers
+  let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg])

   libp2p_pubsub_messages_published.inc(labelValues = [topic])

-  trace "published message to peers", peers = published.len,
+  trace "published message to peers", peers = published,
         msg = msg.shortLog()
-  return published.len
+  return published

 method unsubscribe*(f: FloodSub,
                     topics: seq[TopicPair]) {.async.} =
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 6e41e0eb1..0c848771e 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -422,13 +422,9 @@ method rpcHandler*(g: GossipSub,
             trace "exception in message handler", exc = exc.msg

       # forward the message to all peers interested in it
-      let (published, failed) = await g.sendHelper(toSendPeers, m.messages)
-      for p in failed:
-        let peer = g.peers.getOrDefault(p)
-        if not isNil(peer):
-          g.handleDisconnect(peer) # cleanup failed peers
+      let published = await g.publishHelper(toSendPeers, m.messages)

-      trace "forwared message to peers", peers = published.len
+      trace "forwarded message to peers", peers = published

   var respControl: ControlMessage
   if m.control.isSome:
@@ -501,18 +497,13 @@ method publish*(g: GossipSub,
     if msgId notin g.mcache:
       g.mcache.put(msgId, msg)

-    let (published, failed) = await g.sendHelper(peers, @[msg])
-    for p in failed:
-      let peer = g.peers.getOrDefault(p)
-      if not isNil(peer):
-        g.handleDisconnect(peer) # cleanup failed peers
-
-    if published.len > 0:
+    let published = await g.publishHelper(peers, @[msg])
+    if published > 0:
       libp2p_pubsub_messages_published.inc(labelValues = [topic])

-    trace "published message to peers", peers = published.len,
+    trace "published message to peers", peers = published,
           msg = msg.shortLog()

-    return published.len
+    return published

 method start*(g: GossipSub) {.async.} =
   trace "gossipsub start"
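# ------------------------------------------------------------------------------
# Illustrative sketch, not part of the patch: the floodsub and gossipsub hunks
# above replace the "sendHelper + handleDisconnect on failure" pattern at each
# call site with a single publishHelper call (added to pubsub.nim further down
# in this patch), which instead closes the connections of peers that could not
# be reached. The simplified, sequential stand-in below uses invented names
# (DemoPeer, publishToAll) and only assumes chronos.
import chronos

type
  DemoPeer = ref object
    id: string
    healthy: bool       # stand-in for having a usable send connection

proc send(p: DemoPeer, msg: string) {.async.} =
  if not p.healthy:
    raise newException(CatchableError, "send failed")

proc close(p: DemoPeer) {.async.} =
  p.healthy = false     # closing the connection lets the normal disconnect path run

proc publishToAll(peers: seq[DemoPeer], msg: string): Future[int] {.async.} =
  # send to every peer, then close the failed ones rather than
  # touching the peer tables directly from the call site
  var delivered, failed: seq[DemoPeer]
  for p in peers:
    try:
      await p.send(msg)
      delivered.add p
    except CatchableError:
      failed.add p
  for p in failed:
    await p.close()
  return delivered.len

when isMainModule:
  let peers = @[DemoPeer(id: "a", healthy: true),
                DemoPeer(id: "b", healthy: false)]
  doAssert waitFor(publishToAll(peers, "hello")) == 1
# ------------------------------------------------------------------------------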
diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim
index 810a1fad3..eda623168 100644
--- a/libp2p/protocols/pubsub/peertable.nim
+++ b/libp2p/protocols/pubsub/peertable.nim
@@ -8,20 +8,22 @@
 ## those terms.

 import tables, sequtils, sets
-import pubsubpeer
+import pubsubpeer, ../../peerid

 type
-  PeerTable* = Table[string, HashSet[PubSubPeer]]
+  PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map

 proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
-  # unefficient but used only in tests!
   let peers = toSeq(t.getOrDefault(topic))
   peers.any do (peer: PubSubPeer) -> bool:
     peer.id == peerId

 func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
-  # returns true if the peer was added, false if it was already in the collection
-  not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer)
+  # returns true if the peer was added,
+  # false if it was already in the collection
+  not table.mgetOrPut(topic,
+                      initHashSet[PubSubPeer]())
+           .containsOrIncl(peer)

 func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) =
   table.withValue(topic, peers):
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index d94eec1d9..3cf027126 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -48,12 +48,13 @@ type
     handler*: seq[TopicHandler]

   PubSub* = ref object of LPProtocol
-    peerInfo*: PeerInfo                # this peer's info
-    topics*: Table[string, Topic]      # local topics
-    peers*: Table[string, PubSubPeer]  # peerid to peer map
-    triggerSelf*: bool                 # trigger own local handler on publish
-    verifySignature*: bool             # enable signature verification
-    sign*: bool                        # enable message signing
+    peerInfo*: PeerInfo                          # this peer's info
+    topics*: Table[string, Topic]                # local topics
+    peers*: Table[string, PubSubPeer]            # peerid to peer map
+    conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
+    triggerSelf*: bool                           # trigger own local handler on publish
+    verifySignature*: bool                       # enable signature verification
+    sign*: bool                                  # enable message signing
     cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
@@ -63,8 +64,7 @@ type
 method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   ## handle peer disconnects
   ##
-  if not isNil(peer.peerInfo) and
-     peer.id in p.peers and not peer.inUse():
+  if not(isNil(peer)) and peer.peerInfo notin p.conns:
     trace "deleting peer", peer = peer.id
     p.peers.del(peer.id)
     trace "peer disconnected", peer = peer.id
@@ -72,6 +72,24 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   # metrics
   libp2p_pubsub_peers.set(p.peers.len.int64)

+proc onConnClose(p: PubSub, conn: Connection) {.async.} =
+  try:
+    let peer = conn.peerInfo
+    await conn.closeEvent.wait()
+
+    if peer in p.conns:
+      p.conns[peer].excl(conn)
+      if p.conns[peer].len <= 0:
+        p.conns.del(peer)
+
+    if peer.id in p.peers:
+      p.handleDisconnect(p.peers[peer.id])
+
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "exception in onConnClose handler", exc = exc.msg
+
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
                topics: seq[string],
@@ -87,11 +105,14 @@ proc sendSubs*(p: PubSub,
       await peer.sendSubOpts(topics, subscribe)
   except CancelledError as exc:
-    p.handleDisconnect(peer)
+    if not(isNil(peer)) and not(isNil(peer.conn)):
+      await peer.conn.close()
+
     raise exc
   except CatchableError as exc:
     trace "unable to send subscriptions", exc = exc.msg
-    p.handleDisconnect(peer)
+    if not(isNil(peer)) and not(isNil(peer.conn)):
+      await peer.conn.close()

 method subscribeTopic*(p: PubSub,
                        topic: string,
@@ -150,11 +171,19 @@ method handleConn*(p: PubSub,
     await conn.close()
     return

+  # track connection
+  p.conns.mgetOrPut(conn.peerInfo,
+                    initHashSet[Connection]())
+         .incl(conn)
+
+  asyncCheck p.onConnClose(conn)
+
   proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
     # call pubsub rpc handler
     await p.rpcHandler(peer, msgs)

   let peer = p.getOrCreatePeer(conn.peerInfo, proto)
+
   let topics = toSeq(p.topics.keys)
   if topics.len > 0:
     await p.sendSubs(peer, topics, true)
@@ -168,13 +197,20 @@ method handleConn*(p: PubSub,
   except CatchableError as exc:
     trace "exception ocurred in pubsub handle", exc = exc.msg
   finally:
-    p.handleDisconnect(peer)
     await conn.close()

 method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
   if not(isNil(conn)):
-    let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
     trace "subscribing to peer", peerId = conn.peerInfo.id
+
+    # track connection
+    p.conns.mgetOrPut(conn.peerInfo,
+                      initHashSet[Connection]())
+           .incl(conn)
+
+    asyncCheck p.onConnClose(conn)
+
+    let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
     if not peer.connected:
       peer.conn = conn
@@ -183,11 +219,9 @@ method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
     let peer = p.peers[peerInfo.id]

     trace "unsubscribing from peer", peerId = $peerInfo
-    if not(isNil(peer.conn)):
+    if not(isNil(peer)) and not(isNil(peer.conn)):
       await peer.conn.close()

-    p.handleDisconnect(peer)
-
 proc connected*(p: PubSub, peerInfo: PeerInfo): bool =
   if peerInfo.id in p.peers:
     let peer = p.peers[peerInfo.id]
@@ -269,6 +303,18 @@ proc sendHelper*(p: PubSub,

   return (published, failed)

+proc publishHelper*(p: PubSub,
+                    sendPeers: HashSet[PubSubPeer],
+                    msgs: seq[Message]): Future[int] {.async.} =
+  # send messages and cleanup failed peers
+  let (published, failed) = await p.sendHelper(sendPeers, msgs)
+  for f in failed:
+    let peer = p.peers.getOrDefault(f)
+    if not(isNil(peer)) and not(isNil(peer.conn)):
+      await peer.conn.close()
+
+  return published.len
+
 method publish*(p: PubSub,
                 topic: string,
                 data: seq[byte]): Future[int] {.base, async.} =
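# ------------------------------------------------------------------------------
# Illustrative sketch, not part of the patch: pubsub.nim above now records every
# accepted connection in the per-peer `conns` table, spawns a watcher with
# asyncCheck that waits on the connection's closeEvent, and only forgets the
# peer (via handleDisconnect) once its last tracked connection is gone. The
# standalone stand-in below uses invented names (DemoConn, ConnTracker) and
# assumes chronos plus the stdlib tables/sets/hashes modules.
import tables, sets, hashes, chronos

type
  DemoConn = ref object
    id: int
    closeEvent: AsyncEvent

  ConnTracker = ref object
    conns: Table[string, HashSet[DemoConn]]   # peer id -> live connections

proc hash(c: DemoConn): Hash = hash(c.id)

proc onConnClose(t: ConnTracker, peer: string, conn: DemoConn) {.async.} =
  await conn.closeEvent.wait()      # fires when the connection is closed
  if peer in t.conns:
    t.conns[peer].excl(conn)        # forget this particular connection
    if t.conns[peer].len <= 0:
      t.conns.del(peer)             # last one gone -> drop the peer entry

proc track(t: ConnTracker, peer: string, conn: DemoConn) =
  t.conns.mgetOrPut(peer, initHashSet[DemoConn]()).incl(conn)
  asyncCheck t.onConnClose(peer, conn)  # watcher runs in the background

when isMainModule:
  let t = ConnTracker(conns: initTable[string, HashSet[DemoConn]]())
  let a = DemoConn(id: 1, closeEvent: newAsyncEvent())
  let b = DemoConn(id: 2, closeEvent: newAsyncEvent())
  t.track("peerA", a)
  t.track("peerA", b)
  a.closeEvent.fire()
  waitFor sleepAsync(10.milliseconds)
  doAssert "peerA" in t.conns       # still tracked through connection b
  b.closeEvent.fire()
  waitFor sleepAsync(10.milliseconds)
  doAssert "peerA" notin t.conns    # dropped once the last connection closed
# ------------------------------------------------------------------------------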
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 8515a7165..f223ee06c 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -40,7 +40,6 @@ type
     recvdRpcCache: TimedCache[string] # cache for already received messages
     onConnect*: AsyncEvent
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
-    refs: int # how many active connections this peer has

   RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}

@@ -50,9 +49,6 @@ func hash*(p: PubSubPeer): Hash =
 proc id*(p: PubSubPeer): string =
   p.peerInfo.id

-proc inUse*(p: PubSubPeer): bool =
-  p.refs > 0
-
 proc connected*(p: PubSubPeer): bool =
   not(isNil(p.sendConn))

@@ -61,7 +57,6 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
   trace "attaching send connection for peer", peer = p.id
   p.sendConn = conn
   p.onConnect.fire()
-  p.refs.inc()

 proc conn*(p: PubSubPeer): Connection =
   p.sendConn
@@ -86,7 +81,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   debug "starting pubsub read loop for peer", closed = conn.closed
   try:
     try:
-      p.refs.inc()
       while not conn.closed:
         trace "waiting for data", closed = conn.closed
         let data = await conn.readLp(64 * 1024)
@@ -124,8 +118,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
     raise exc
-  finally:
-    p.refs.dec()

 proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
   logScope:
@@ -172,7 +164,6 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
       p.sendConn = nil
       p.onConnect.clear()

-      p.refs.dec()
       raise exc

 proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =
diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim
index 702673f7a..9ab8c262c 100644
--- a/libp2p/standard_setup.nim
+++ b/libp2p/standard_setup.nim
@@ -39,7 +39,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                         msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
                         rng = newRng(),
                         inTimeout: Duration = 1.minutes,
-                        outTimeout: Duration = 5.minutes): Switch =
+                        outTimeout: Duration = 1.minutes): Switch =
   proc createMplex(conn: Connection): Muxer =
     Mplex.init(
       conn,
diff --git a/tests/testmplex.nim b/tests/testmplex.nim
index 82664ccfe..3f84794fe 100644
--- a/tests/testmplex.nim
+++ b/tests/testmplex.nim
@@ -211,7 +211,7 @@ suite "Mplex":
     check:
       waitFor(testResetWrite()) == true

-  test "timeout, channel should reset":
+  test "reset - channel should reset on timeout":
     proc testResetWrite(): Future[bool] {.async.} =
       proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
       let