From 2232ca598e514ab79befac972c9ae0517b1196c3 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 4 Feb 2020 17:16:21 +0100 Subject: [PATCH] don't timeout in pubsub --- libp2p/multistream.nim | 18 +++++++++--------- libp2p/protocols/pubsub/pubsubpeer.nim | 2 ++ libp2p/switch.nim | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 061d3be..c8b2ff6 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -32,19 +32,19 @@ type protocol*: LPProtocol match*: Matcher - MultisteamSelect* = ref object of RootObj + MultistreamSelect* = ref object of RootObj handlers*: seq[HandlerHolder] codec*: string na: string ls: string -proc newMultistream*(): MultisteamSelect = +proc newMultistream*(): MultistreamSelect = new result result.codec = MSCodec result.ls = Ls result.na = Na -proc select*(m: MultisteamSelect, +proc select*(m: MultistreamSelect, conn: Connection, proto: seq[string]): Future[string] {.async.} = @@ -81,7 +81,7 @@ proc select*(m: MultisteamSelect, trace "selected protocol", protocol = result break -proc select*(m: MultisteamSelect, +proc select*(m: MultistreamSelect, conn: Connection, proto: string): Future[bool] {.async.} = if proto.len > 0: @@ -89,10 +89,10 @@ proc select*(m: MultisteamSelect, else: result = (await m.select(conn, @[])) == Codec -proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = +proc select*(m: MultistreamSelect, conn: Connection): Future[bool] = m.select(conn, "") -proc list*(m: MultisteamSelect, +proc list*(m: MultistreamSelect, conn: Connection): Future[seq[string]] {.async.} = ## list remote protos requests on connection if not await m.select(conn): @@ -108,7 +108,7 @@ proc list*(m: MultisteamSelect, result = list -proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = +proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = trace "handle: starting multistream handling" try: while not conn.closed: @@ -152,7 +152,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = finally: trace "leaving multistream loop" -proc addHandler*[T: LPProtocol](m: MultisteamSelect, +proc addHandler*[T: LPProtocol](m: MultistreamSelect, codec: string, protocol: T, matcher: Matcher = nil) = @@ -167,7 +167,7 @@ proc addHandler*[T: LPProtocol](m: MultisteamSelect, protocol: protocol, match: matcher)) -proc addHandler*[T: LPProtoHandler](m: MultisteamSelect, +proc addHandler*[T: LPProtoHandler](m: MultistreamSelect, codec: string, handler: T, matcher: Matcher = nil) = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 4988e7a..6fdbc81 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -43,6 +43,8 @@ proc isConnected*(p: PubSubPeer): bool = proc `conn=`*(p: PubSubPeer, conn: Connection) = trace "attaching send connection for peer", peer = p.id p.sendConn = conn + p.sendConn.timeout = InfiniteDuration + p.onConnect.fire() proc handle*(p: PubSubPeer, conn: Connection) {.async.} = diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 44ba0d5..23f4074 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -40,7 +40,7 @@ type transports*: seq[Transport] protocols*: seq[LPProtocol] muxers*: Table[string, MuxerProvider] - ms*: MultisteamSelect + ms*: MultistreamSelect identity*: Identify streamHandler*: StreamHandler secureManagers*: Table[string, Secure]