cleanup `==` and add one to peerinfo

This commit is contained in:
Dmitriy Ryajov 2020-07-14 14:00:40 -06:00
parent b0a9916d4d
commit 7192b5756a
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 32 additions and 22 deletions

View File

@ -133,3 +133,16 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
result = p.key
else:
result = some(p.privateKey.getKey().tryGet())
func `==`*(a, b: PeerInfo): bool =
# override equiality to support both nil and peerInfo comparisons
# this in the future will allow us to recycle refs
let
aptr = cast[pointer](a)
bptr = cast[pointer](b)
if isNil(aptr) and isNil(bptr):
return true
if aptr == bptr and a.peerId == b.peerId:
return true

View File

@ -32,14 +32,15 @@ type
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from
proto*: string # the protocol that this peer joined from
sendConn: Connection
peerInfo*: PeerInfo
handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -53,27 +54,18 @@ func `==`*(a, b: PubSubPeer): bool =
let
aptr = cast[pointer](a)
bptr = cast[pointer](b)
if aptr == nil:
if bptr == nil:
true
else:
false
elif bptr == nil:
false
else:
if a.peerInfo == nil:
if b.peerInfo == nil:
true
else:
false
else:
if b.peerInfo == nil:
false
else:
a.peerInfo.id == b.peerInfo.id
if isNil(aptr) and isNil(bptr):
return true
if aptr == bptr and a.peerInfo == b.peerInfo:
return true
proc id*(p: PubSubPeer): string = p.peerInfo.id
proc inUse*(p: PubSubPeer): bool =
p.refs > 0
proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn))
@ -82,6 +74,7 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
trace "attaching send connection for peer", peer = p.id
p.sendConn = conn
p.onConnect.fire()
p.refs.inc()
proc conn*(p: PubSubPeer): Connection =
p.sendConn
@ -104,6 +97,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "handling pubsub rpc", peer = p.id, closed = conn.closed
try:
try:
p.refs.inc()
while not conn.closed:
trace "waiting for data", peer = p.id, closed = conn.closed
let data = await conn.readLp(64 * 1024)
@ -135,6 +129,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc
finally:
p.refs.dec()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
logScope:
@ -181,6 +177,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
p.sendConn = nil
p.onConnect.clear()
p.refs.dec()
raise exc
proc sendMsg*(p: PubSubPeer,