From f35b8999b32f5cf80091157ef8d1e15e06a815d1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 15 Jul 2020 13:18:55 -0600 Subject: [PATCH] some light cleanup for pub/gossip sub (#273) * move peer table out to its own file * move peer table * cleanup `==` and add one to peerinfo * add peertable * missed equality check --- libp2p/peerinfo.nim | 16 ++++++++++ libp2p/protocols/pubsub/floodsub.nim | 5 +-- libp2p/protocols/pubsub/gossipsub.nim | 43 +++++++------------------- libp2p/protocols/pubsub/peertable.nim | 42 +++++++++++++++++++++++++ libp2p/protocols/pubsub/pubsub.nim | 15 ++------- libp2p/protocols/pubsub/pubsubpeer.nim | 42 ++++++++++++------------- tests/pubsub/testfloodsub.nim | 3 +- tests/pubsub/testgossipsub.nim | 1 + 8 files changed, 99 insertions(+), 68 deletions(-) create mode 100644 libp2p/protocols/pubsub/peertable.nim diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index b885714d9..8c38d3f54 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -133,3 +133,19 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = result = p.key else: result = some(p.privateKey.getKey().tryGet()) + +func `==`*(a, b: PeerInfo): bool = + # override equiality to support both nil and peerInfo comparisons + # this in the future will allow us to recycle refs + let + aptr = cast[pointer](a) + bptr = cast[pointer](b) + + if isNil(aptr) and isNil(bptr): + return true + + if isNil(aptr) or isNil(bptr): + return false + + if aptr == bptr and a.peerId == b.peerId: + return true diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3d8f70bc4..77c8c4390 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -12,6 +12,7 @@ import chronos, chronicles, metrics import pubsub, pubsubpeer, timedcache, + peertable, rpc/[messages, message], ../../stream/connection, ../../peerid, @@ -24,8 +25,8 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub - floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[string] # list of messages forwarded to peers + floodsub*: PeerTable # topic to remote peer map + seen*: TimedCache[string] # list of messages forwarded to peers method subscribeTopic*(f: FloodSub, topic: string, diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5fc2c4b50..0da4ca41f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -12,6 +12,7 @@ import chronos, chronicles, metrics import pubsub, floodsub, pubsubpeer, + peertable, mcache, timedcache, rpc/[messages, message], @@ -68,28 +69,6 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) -func addPeer(table: var PeerTable, topic: string, peer: PubSubPeer): bool = - # returns true if the peer was added, false if it was already in the collection - not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer) - -func removePeer(table: var PeerTable, topic: string, peer: PubSubPeer) = - table.withValue(topic, peers): - peers[].excl(peer) - if peers[].len == 0: - table.del(topic) - -func hasPeer(table: PeerTable, topic: string, peer: PubSubPeer): bool = - (topic in table) and (peer in table[topic]) - -func peers(table: PeerTable, topic: string): int = - if topic in table: - table[topic].len - else: - 0 - -func getPeers(table: Table[string, HashSet[string]], topic: string): HashSet[string] = - table.getOrDefault(topic, initHashSet[string]()) - method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -147,7 +126,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # send a graft message to the peer grafts.add peer discard g.mesh.addPeer(topic, peer) - trace "got peer", peer = peer.id + trace "got peer", peer = $peer if g.mesh.peers(topic) > GossipSubDhi: # prune peers if we've gone over @@ -331,7 +310,7 @@ proc handleGraft(g: GossipSub, grafts: seq[ControlGraft]): seq[ControlPrune] = for graft in grafts: let topic = graft.topicID - trace "processing graft message", topic, peer = peer.id + trace "processing graft message", topic, peer = $peer # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. @@ -344,7 +323,7 @@ proc handleGraft(g: GossipSub, if g.mesh.addPeer(topic, peer): g.fanout.removePeer(topic, peer) else: - trace "Peer already in mesh", topic, peer = peer.id + trace "Peer already in mesh", topic, peer = $peer else: result.add(ControlPrune(topicID: topic)) else: @@ -355,7 +334,7 @@ proc handleGraft(g: GossipSub, proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "processing prune message", peer = peer.id, + trace "processing prune message", peer = $peer, topicID = prune.topicID g.mesh.removePeer(prune.topicID, peer) @@ -366,7 +345,7 @@ proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = for ihave in ihaves: - trace "processing ihave message", peer = peer.id, + trace "processing ihave message", peer = $peer, topicID = ihave.topicID, msgs = ihave.messageIDs @@ -380,7 +359,7 @@ proc handleIWant(g: GossipSub, iwants: seq[ControlIWant]): seq[Message] = for iwant in iwants: for mid in iwant.messageIDs: - trace "processing iwant message", peer = peer.id, + trace "processing iwant message", peer = $peer, messageID = mid let msg = g.mcache.get(mid) if msg.isSome: @@ -452,12 +431,13 @@ method rpcHandler*(g: GossipSub, respControl.iwant.add(g.handleIHave(peer, control.ihave)) respControl.prune.add(g.handleGraft(peer, control.graft)) + let messages = g.handleIWant(peer, control.iwant) if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or respControl.iwant.len > 0: await peer.send( @[RPCMsg(control: some(respControl), - messages: g.handleIWant(peer, control.iwant))]) + messages: messages)]) method subscribe*(g: GossipSub, topic: string, @@ -511,8 +491,9 @@ method publish*(g: GossipSub, msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msgId = g.msgIdProvider(msg) - trace "publishing on topic", - topic, peers = peers.len, msg = msg.shortLog() + trace "created new message", msg + + trace "publishing on topic", topic, peers = peers.len if msgId notin g.mcache: g.mcache.put(msgId, msg) diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim new file mode 100644 index 000000000..810a1fad3 --- /dev/null +++ b/libp2p/protocols/pubsub/peertable.nim @@ -0,0 +1,42 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import tables, sequtils, sets +import pubsubpeer + +type + PeerTable* = Table[string, HashSet[PubSubPeer]] + +proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = + # unefficient but used only in tests! + let peers = toSeq(t.getOrDefault(topic)) + peers.any do (peer: PubSubPeer) -> bool: + peer.id == peerId + +func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = + # returns true if the peer was added, false if it was already in the collection + not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer) + +func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) = + table.withValue(topic, peers): + peers[].excl(peer) + if peers[].len == 0: + table.del(topic) + +func hasPeer*(table: PeerTable, topic: string, peer: PubSubPeer): bool = + (topic in table) and (peer in table[topic]) + +func peers*(table: PeerTable, topic: string): int = + if topic in table: + table[topic].len + else: + 0 + +func getPeers*(table: Table[string, HashSet[string]], topic: string): HashSet[string] = + table.getOrDefault(topic, initHashSet[string]()) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 66027e2d2..4ec749381 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -30,8 +30,6 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) type - PeerTable* = Table[string, HashSet[PubSubPeer]] - SendRes = tuple[published: seq[string], failed: seq[string]] # keep private TopicHandler* = proc(topic: string, @@ -62,20 +60,11 @@ type msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgSeqno*: uint64 -proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = - # unefficient but used only in tests! - let peers = t.getOrDefault(topic) - if peers.len == 0: - false - else: - let ps = toSeq(peers) - ps.any do (peer: PubSubPeer) -> bool: - peer.id == peerId - method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = ## handle peer disconnects ## - if not isNil(peer.peerInfo) and peer.id in p.peers: + if not isNil(peer.peerInfo) and + peer.id in p.peers and not peer.inUse(): trace "deleting peer", peer = peer.id p.peers.del(peer.id) trace "peer disconnected", peer = peer.id diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 612a5ebc8..c59b41930 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -32,14 +32,15 @@ type onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} PubSubPeer* = ref object of RootObj - proto*: string # the protocol that this peer joined from + proto*: string # the protocol that this peer joined from sendConn: Connection peerInfo*: PeerInfo handler*: RPCHandler - sentRpcCache: TimedCache[string] # cache for already sent messages - recvdRpcCache: TimedCache[string] # cache for already received messages + sentRpcCache: TimedCache[string] # cache for already sent messages + recvdRpcCache: TimedCache[string] # cache for already received messages onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr + refs: int # how many active connections this peer has RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -53,27 +54,21 @@ func `==`*(a, b: PubSubPeer): bool = let aptr = cast[pointer](a) bptr = cast[pointer](b) - if aptr == nil: - if bptr == nil: - true - else: - false - elif bptr == nil: - false - else: - if a.peerInfo == nil: - if b.peerInfo == nil: - true - else: - false - else: - if b.peerInfo == nil: - false - else: - a.peerInfo.id == b.peerInfo.id + + if isNil(aptr) and isNil(bptr): + return true + + if isNil(aptr) or isNil(bptr): + return false + + if aptr == bptr and a.peerInfo == b.peerInfo: + return true proc id*(p: PubSubPeer): string = p.peerInfo.id +proc inUse*(p: PubSubPeer): bool = + p.refs > 0 + proc connected*(p: PubSubPeer): bool = not(isNil(p.sendConn)) @@ -82,6 +77,7 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = trace "attaching send connection for peer", peer = p.id p.sendConn = conn p.onConnect.fire() + p.refs.inc() proc conn*(p: PubSubPeer): Connection = p.sendConn @@ -104,6 +100,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: try: + p.refs.inc() while not conn.closed: trace "waiting for data", peer = p.id, closed = conn.closed let data = await conn.readLp(64 * 1024) @@ -141,6 +138,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg raise exc + finally: + p.refs.dec() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = logScope: @@ -187,6 +186,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.sendConn = nil p.onConnect.clear() + p.refs.dec() raise exc proc sendMsg*(p: PubSubPeer, diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 2d21b3118..78d319d35 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -18,7 +18,8 @@ import utils, crypto/crypto, protocols/pubsub/pubsub, protocols/pubsub/floodsub, - protocols/pubsub/rpc/messages] + protocols/pubsub/rpc/messages, + protocols/pubsub/peertable] import ../helpers diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 3bcfe6cf6..5e7a80c1e 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -19,6 +19,7 @@ import utils, ../../libp2p/[errors, crypto/crypto, protocols/pubsub/pubsub, protocols/pubsub/gossipsub, + protocols/pubsub/peertable, protocols/pubsub/rpc/messages] import ../helpers