diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index ee92fbd56..9763b6a15 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -99,7 +99,7 @@ proc processContentLoop(n: StateNetwork) {.async.} = try: while true: # Just dropping state date for now - discard await n.portalProtocol.stream.contentQueue.popFirst() + discard await n.contentQueue.popFirst() except CancelledError: trace "processContentLoop canceled" diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index 263e04dc4..40e57653d 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -239,12 +239,14 @@ proc readContentOffer( else: # Invalid data, stop reading content, but still process data received # so far. - debug "Reading content item failed, content offer failed", contentKeys = offer.contentKeys + debug "Reading content item failed, content offer failed", + contentKeys = offer.contentKeys break else: # Read timed out, stop further reading, but still process data received # so far. - debug "Reading data from socket timed out, content offer failed", contentKeys = offer.contentKeys + debug "Reading data from socket timed out, content offer failed", + contentKeys = offer.contentKeys break if socket.atEof(): @@ -291,7 +293,9 @@ proc allowedConnection( proc (x: ContentOffer): bool = x.connectionId == connectionId and x.nodeId == address.nodeId) -proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] = +proc handleIncomingConnection( + server: UtpRouter[NodeAddress], + socket: UtpSocket[NodeAddress]): Future[void] = let manager = getUserData[NodeAddress, StreamManager](server) for stream in manager.streams: @@ -318,7 +322,10 @@ proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[ fut.complete() return fut -proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool = +proc allowIncomingConnection( + r: UtpRouter[NodeAddress], + remoteAddress: NodeAddress, + connectionId: uint16): bool = let manager = getUserData[NodeAddress, StreamManager](r) for stream in manager.streams: # stream.pruneAllowedConnections() @@ -326,23 +333,22 @@ proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddre return true proc new*(T: type StreamManager, d: protocol.Protocol): T = - let socketConfig = SocketConfig.init( - # Setting to none means that incoming sockets are in Connected state, which - # means they can send and receive data. - incomingSocketReceiveTimeout = none(Duration), - payloadSize = uint32(maxUtpPayloadSize) - ) - - let manager = StreamManager(streams: @[], rng: d.rng) - - let utpOverDiscV5Protocol = UtpDiscv5Protocol.new( - d, - utpProtocolId, - handleIncomingConnection , - manager, - allowIncomingConnection, - socketConfig - ) + let + socketConfig = SocketConfig.init( + # Setting to none means that incoming sockets are in Connected state, which + # means they can send and receive data. + incomingSocketReceiveTimeout = none(Duration), + payloadSize = uint32(maxUtpPayloadSize) + ) + manager = StreamManager(streams: @[], rng: d.rng) + utpOverDiscV5Protocol = UtpDiscv5Protocol.new( + d, + utpProtocolId, + handleIncomingConnection , + manager, + allowIncomingConnection, + socketConfig + ) manager.transport = utpOverDiscV5Protocol