don't timeout in pubsub

This commit is contained in:
Dmitriy Ryajov 2020-02-04 17:16:21 +01:00
parent 1bd933cd5a
commit 2232ca598e
3 changed files with 12 additions and 10 deletions

View File

@ -32,19 +32,19 @@ type
protocol*: LPProtocol protocol*: LPProtocol
match*: Matcher match*: Matcher
MultisteamSelect* = ref object of RootObj MultistreamSelect* = ref object of RootObj
handlers*: seq[HandlerHolder] handlers*: seq[HandlerHolder]
codec*: string codec*: string
na: string na: string
ls: string ls: string
proc newMultistream*(): MultisteamSelect = proc newMultistream*(): MultistreamSelect =
new result new result
result.codec = MSCodec result.codec = MSCodec
result.ls = Ls result.ls = Ls
result.na = Na result.na = Na
proc select*(m: MultisteamSelect, proc select*(m: MultistreamSelect,
conn: Connection, conn: Connection,
proto: seq[string]): proto: seq[string]):
Future[string] {.async.} = Future[string] {.async.} =
@ -81,7 +81,7 @@ proc select*(m: MultisteamSelect,
trace "selected protocol", protocol = result trace "selected protocol", protocol = result
break break
proc select*(m: MultisteamSelect, proc select*(m: MultistreamSelect,
conn: Connection, conn: Connection,
proto: string): Future[bool] {.async.} = proto: string): Future[bool] {.async.} =
if proto.len > 0: if proto.len > 0:
@ -89,10 +89,10 @@ proc select*(m: MultisteamSelect,
else: else:
result = (await m.select(conn, @[])) == Codec result = (await m.select(conn, @[])) == Codec
proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
m.select(conn, "") m.select(conn, "")
proc list*(m: MultisteamSelect, proc list*(m: MultistreamSelect,
conn: Connection): Future[seq[string]] {.async.} = conn: Connection): Future[seq[string]] {.async.} =
## list remote protos requests on connection ## list remote protos requests on connection
if not await m.select(conn): if not await m.select(conn):
@ -108,7 +108,7 @@ proc list*(m: MultisteamSelect,
result = list result = list
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: starting multistream handling" trace "handle: starting multistream handling"
try: try:
while not conn.closed: while not conn.closed:
@ -152,7 +152,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
finally: finally:
trace "leaving multistream loop" trace "leaving multistream loop"
proc addHandler*[T: LPProtocol](m: MultisteamSelect, proc addHandler*[T: LPProtocol](m: MultistreamSelect,
codec: string, codec: string,
protocol: T, protocol: T,
matcher: Matcher = nil) = matcher: Matcher = nil) =
@ -167,7 +167,7 @@ proc addHandler*[T: LPProtocol](m: MultisteamSelect,
protocol: protocol, protocol: protocol,
match: matcher)) match: matcher))
proc addHandler*[T: LPProtoHandler](m: MultisteamSelect, proc addHandler*[T: LPProtoHandler](m: MultistreamSelect,
codec: string, codec: string,
handler: T, handler: T,
matcher: Matcher = nil) = matcher: Matcher = nil) =

View File

@ -43,6 +43,8 @@ proc isConnected*(p: PubSubPeer): bool =
proc `conn=`*(p: PubSubPeer, conn: Connection) = proc `conn=`*(p: PubSubPeer, conn: Connection) =
trace "attaching send connection for peer", peer = p.id trace "attaching send connection for peer", peer = p.id
p.sendConn = conn p.sendConn = conn
p.sendConn.timeout = InfiniteDuration
p.onConnect.fire() p.onConnect.fire()
proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc handle*(p: PubSubPeer, conn: Connection) {.async.} =

View File

@ -40,7 +40,7 @@ type
transports*: seq[Transport] transports*: seq[Transport]
protocols*: seq[LPProtocol] protocols*: seq[LPProtocol]
muxers*: Table[string, MuxerProvider] muxers*: Table[string, MuxerProvider]
ms*: MultisteamSelect ms*: MultistreamSelect
identity*: Identify identity*: Identify
streamHandler*: StreamHandler streamHandler*: StreamHandler
secureManagers*: Table[string, Secure] secureManagers*: Table[string, Secure]