From 7192b5756a77e99daceed0a9dcaac5b0e2e8d16a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 14 Jul 2020 14:00:40 -0600 Subject: [PATCH] cleanup `==` and add one to peerinfo --- libp2p/peerinfo.nim | 13 ++++++++ libp2p/protocols/pubsub/pubsubpeer.nim | 41 ++++++++++++-------------- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index b885714d9..0ce53d13d 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -133,3 +133,16 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = result = p.key else: result = some(p.privateKey.getKey().tryGet()) + +func `==`*(a, b: PeerInfo): bool = + # override equiality to support both nil and peerInfo comparisons + # this in the future will allow us to recycle refs + let + aptr = cast[pointer](a) + bptr = cast[pointer](b) + + if isNil(aptr) and isNil(bptr): + return true + + if aptr == bptr and a.peerId == b.peerId: + return true diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index f5fcd1719..a15591e87 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -32,18 +32,19 @@ type onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} PubSubPeer* = ref object of RootObj - proto*: string # the protocol that this peer joined from + proto*: string # the protocol that this peer joined from sendConn: Connection peerInfo*: PeerInfo handler*: RPCHandler - sentRpcCache: TimedCache[string] # cache for already sent messages - recvdRpcCache: TimedCache[string] # cache for already received messages + sentRpcCache: TimedCache[string] # cache for already sent messages + recvdRpcCache: TimedCache[string] # cache for already received messages onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr + refs: int # how many active connections this peer has RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} -func hash*(p: PubSubPeer): Hash = +func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash @@ -53,27 +54,18 @@ func `==`*(a, b: PubSubPeer): bool = let aptr = cast[pointer](a) bptr = cast[pointer](b) - if aptr == nil: - if bptr == nil: - true - else: - false - elif bptr == nil: - false - else: - if a.peerInfo == nil: - if b.peerInfo == nil: - true - else: - false - else: - if b.peerInfo == nil: - false - else: - a.peerInfo.id == b.peerInfo.id + + if isNil(aptr) and isNil(bptr): + return true + + if aptr == bptr and a.peerInfo == b.peerInfo: + return true proc id*(p: PubSubPeer): string = p.peerInfo.id +proc inUse*(p: PubSubPeer): bool = + p.refs > 0 + proc connected*(p: PubSubPeer): bool = not(isNil(p.sendConn)) @@ -82,6 +74,7 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = trace "attaching send connection for peer", peer = p.id p.sendConn = conn p.onConnect.fire() + p.refs.inc() proc conn*(p: PubSubPeer): Connection = p.sendConn @@ -104,6 +97,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: try: + p.refs.inc() while not conn.closed: trace "waiting for data", peer = p.id, closed = conn.closed let data = await conn.readLp(64 * 1024) @@ -135,6 +129,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg raise exc + finally: + p.refs.dec() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = logScope: @@ -181,6 +177,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = p.sendConn = nil p.onConnect.clear() + p.refs.dec() raise exc proc sendMsg*(p: PubSubPeer,