Keep connection alive when peer doesn't support pubsub (#754)
This commit is contained in:
parent
dfbfbe6eb6
commit
abbeaab684
|
@ -292,19 +292,11 @@ proc getOrCreatePeer*(
|
||||||
proc getConn(): Future[Connection] {.async.} =
|
proc getConn(): Future[Connection] {.async.} =
|
||||||
return await p.switch.dial(peerId, protos)
|
return await p.switch.dial(peerId, protos)
|
||||||
|
|
||||||
proc dropConn(peer: PubSubPeer) =
|
|
||||||
proc dropConnAsync(peer: PubSubPeer) {.async.} =
|
|
||||||
try:
|
|
||||||
await p.switch.disconnect(peer.peerId)
|
|
||||||
except CatchableError as exc: # never cancelled
|
|
||||||
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
|
|
||||||
asyncSpawn dropConnAsync(peer)
|
|
||||||
|
|
||||||
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
||||||
p.onPubSubPeerEvent(peer, event)
|
p.onPubSubPeerEvent(peer, event)
|
||||||
|
|
||||||
# create new pubsub peer
|
# create new pubsub peer
|
||||||
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, onEvent, protos[0], p.maxMessageSize)
|
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
|
||||||
debug "created new pubsub peer", peerId
|
debug "created new pubsub peer", peerId
|
||||||
|
|
||||||
p.peers[peerId] = pubSubPeer
|
p.peers[peerId] = pubSubPeer
|
||||||
|
|
|
@ -51,7 +51,6 @@ type
|
||||||
|
|
||||||
PubSubPeer* = ref object of RootObj
|
PubSubPeer* = ref object of RootObj
|
||||||
getConn*: GetConn # callback to establish a new send connection
|
getConn*: GetConn # callback to establish a new send connection
|
||||||
dropConn*: DropConn # Function pointer to use to drop connections
|
|
||||||
onEvent*: OnEvent # Connectivity updates for peer
|
onEvent*: OnEvent # Connectivity updates for peer
|
||||||
codec*: string # the protocol that this peer joined from
|
codec*: string # the protocol that this peer joined from
|
||||||
sendConn*: Connection # cached send connection
|
sendConn*: Connection # cached send connection
|
||||||
|
@ -206,9 +205,6 @@ proc connectImpl(p: PubSubPeer) {.async.} =
|
||||||
await connectOnce(p)
|
await connectOnce(p)
|
||||||
except CatchableError as exc: # never cancelled
|
except CatchableError as exc: # never cancelled
|
||||||
debug "Could not establish send connection", msg = exc.msg
|
debug "Could not establish send connection", msg = exc.msg
|
||||||
finally:
|
|
||||||
# drop the connection, else we end up with ghost peers
|
|
||||||
if p.dropConn != nil: p.dropConn(p)
|
|
||||||
|
|
||||||
proc connect*(p: PubSubPeer) =
|
proc connect*(p: PubSubPeer) =
|
||||||
asyncSpawn connectImpl(p)
|
asyncSpawn connectImpl(p)
|
||||||
|
@ -286,14 +282,12 @@ proc new*(
|
||||||
T: typedesc[PubSubPeer],
|
T: typedesc[PubSubPeer],
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
getConn: GetConn,
|
getConn: GetConn,
|
||||||
dropConn: DropConn,
|
|
||||||
onEvent: OnEvent,
|
onEvent: OnEvent,
|
||||||
codec: string,
|
codec: string,
|
||||||
maxMessageSize: int): T =
|
maxMessageSize: int): T =
|
||||||
|
|
||||||
T(
|
T(
|
||||||
getConn: getConn,
|
getConn: getConn,
|
||||||
dropConn: dropConn,
|
|
||||||
onEvent: onEvent,
|
onEvent: onEvent,
|
||||||
codec: codec,
|
codec: codec,
|
||||||
peerId: peerId,
|
peerId: peerId,
|
||||||
|
|
|
@ -22,10 +22,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerId): PubSubPeer =
|
||||||
proc getConn(): Future[Connection] =
|
proc getConn(): Future[Connection] =
|
||||||
p.switch.dial(peerId, GossipSubCodec)
|
p.switch.dial(peerId, GossipSubCodec)
|
||||||
|
|
||||||
proc dropConn(peer: PubSubPeer) =
|
let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
|
||||||
discard # we don't care about it here yet
|
|
||||||
|
|
||||||
let pubSubPeer = PubSubPeer.new(peerId, getConn, dropConn, nil, GossipSubCodec, 1024 * 1024)
|
|
||||||
debug "created new pubsub peer", peerId
|
debug "created new pubsub peer", peerId
|
||||||
|
|
||||||
p.peers[peerId] = pubSubPeer
|
p.peers[peerId] = pubSubPeer
|
||||||
|
|
Loading…
Reference in New Issue