Track incoming connections (#181)

* call write until all is written out

* wip: rework with proper half-closed

* add eof and closed handling

* wip

* close connection on chronos close

* don't use read

* make noise work again

* don't reraise just yet

* fixes after backporting

* remove on transport close cleanup

* revert back allread

* rust interop fixes

* read from stream

* inc count before closing

* rebasing master

* store incomming connections

* fix merge

* remove unneeded changes

* use internal close flag to indicate disposal
This commit is contained in:
Dmitriy Ryajov 2020-05-21 11:33:48 -06:00 committed by GitHub
parent 7900fd9f61
commit ba53c08b3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 34 additions and 16 deletions

View File

@ -106,6 +106,11 @@ method init(f: FloodSub) =
f.handler = handler f.handler = handler
f.codec = FloodSubCodec f.codec = FloodSubCodec
method subscribeToPeer*(p: FloodSub,
conn: Connection) {.async.} =
await procCall PubSub(p).subscribeToPeer(conn)
asyncCheck p.handleConn(conn, FloodSubCodec)
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) {.async.} = data: seq[byte]) {.async.} =

View File

@ -101,6 +101,11 @@ method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
for t in g.fanout.keys: for t in g.fanout.keys:
g.fanout[t].excl(peer.id) g.fanout[t].excl(peer.id)
method subscribeToPeer*(p: GossipSub,
conn: Connection) {.async.} =
await procCall PubSub(p).subscribeToPeer(conn)
asyncCheck p.handleConn(conn, GossipSubCodec)
method subscribeTopic*(g: GossipSub, method subscribeTopic*(g: GossipSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,

View File

@ -110,6 +110,16 @@ proc getPeer(p: PubSub,
peer.observers = p.observers peer.observers = p.observers
result = peer result = peer
proc internalClenaup(p: PubSub, conn: Connection) {.async.} =
# handle connection close
if conn.closed:
return
var peer = p.getPeer(conn.peerInfo, p.codec)
await conn.closeEvent.wait()
trace "connection closed, cleaning up peer", peer = conn.peerInfo.id
await p.cleanUpHelper(peer)
method handleConn*(p: PubSub, method handleConn*(p: PubSub,
conn: Connection, conn: Connection,
proto: string) {.base, async.} = proto: string) {.base, async.} =
@ -141,15 +151,7 @@ method handleConn*(p: PubSub,
peer.handler = handler peer.handler = handler
await peer.handle(conn) # spawn peer read loop await peer.handle(conn) # spawn peer read loop
trace "pubsub peer handler ended, cleaning up" trace "pubsub peer handler ended, cleaning up"
await p.cleanUpHelper(peer) await p.internalClenaup(conn)
proc internalClenaup(p: PubSub, conn: Connection) {.async.} =
# handle connection close
var peer = p.getPeer(conn.peerInfo, p.codec)
await conn.closeEvent.wait()
trace "connection closed, cleaning up peer", peer = conn.peerInfo.id
await p.cleanUpHelper(peer)
method subscribeToPeer*(p: PubSub, method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async.} = conn: Connection) {.base, async.} =

View File

@ -71,7 +71,8 @@ method atEof*(s: ChronosStream): bool {.inline.} =
s.client.atEof() s.client.atEof()
method close*(s: ChronosStream) {.async.} = method close*(s: ChronosStream) {.async.} =
if not s.closed: if not s.isClosed:
s.isClosed = true
trace "shutting chronos stream", address = $s.client.remoteAddress() trace "shutting chronos stream", address = $s.client.remoteAddress()
if not s.client.closed(): if not s.client.closed():
try: try:

View File

@ -439,6 +439,15 @@ proc newSwitch*(peerInfo: PeerInfo,
muxer.connection.peerInfo = await s.identify(stream) muxer.connection.peerInfo = await s.identify(stream)
await stream.close() await stream.close()
# store muxer for connection
s.muxed[muxer.connection.peerInfo.id] = muxer
# store muxed connection
s.connections[muxer.connection.peerInfo.id] = muxer.connection
# try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo)
for k in secureManagers.keys: for k in secureManagers.keys:
trace "adding secure manager ", codec = secureManagers[k].codec trace "adding secure manager ", codec = secureManagers[k].codec
result.secureManagers[k] = secureManagers[k] result.secureManagers[k] = secureManagers[k]

View File

@ -24,10 +24,6 @@ import utils, ../../libp2p/[errors,
import ../helpers import ../helpers
proc createGossipSub(): GossipSub =
var peerInfo = PeerInfo.init(PrivateKey.random(RSA).get())
result = newPubSub(GossipSub, peerInfo)
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
if sender == receiver: if sender == receiver:
return return
@ -35,7 +31,7 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# this is for testing purposes only # this is for testing purposes only
# peers can be inside `mesh` and `fanout`, not just `gossipsub` # peers can be inside `mesh` and `fanout`, not just `gossipsub`
var ceil = 15 var ceil = 15
let fsub = cast[GossipSub](sender.pubSub.get()) let fsub = GossipSub(sender.pubSub.get())
while (not fsub.gossipsub.hasKey(key) or while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub[key].contains(receiver.peerInfo.id)) and not fsub.gossipsub[key].contains(receiver.peerInfo.id)) and
(not fsub.mesh.hasKey(key) or (not fsub.mesh.hasKey(key) or
@ -277,7 +273,7 @@ suite "GossipSub":
check: check:
"foobar" in gossipSub1.gossipsub "foobar" in gossipSub1.gossipsub
await passed.wait(5.seconds) await passed.wait(1.seconds)
trace "test done, stopping..." trace "test done, stopping..."