diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index a2af2f7ec..51bc4e04f 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -505,16 +505,18 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): id = dst.id return err("Trying to connect to node with unknown address") - let socketRes = await p.stream.transport.connectTo( - nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId)) - if socketRes.isErr(): - # TODO: get proper error mapped - return err("Error connecting to uTP socket") - let socket = socketRes.get() - if not socket.isConnected(): - socket.close() - return err("Portal uTP socket is not in connected state") + let connectionResult = + await p.stream.connectTo( + nodeAddress.unsafeGet(), + uint16.fromBytesBE(m.connectionId) + ) + if connectionResult.isErr(): + error "Utp connection error while trying to find content", + msg = connectionResult.error + return err("Error connecting uTP socket") + + let socket = connectionResult.get() # Read all bytes from the socket # This will either end with a FIN, or because the read action times out. # A FIN does not necessarily mean that the data read is complete. Further @@ -570,17 +572,20 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList): error "Trying to connect to node with unknown address", id = dst.id return err("Trying to connect to node with unknown address") + + let connectionResult = + await p.stream.connectTo( + nodeAddress.unsafeGet(), + uint16.fromBytesBE(m.connectionId) + ) - let clientSocketRes = await p.stream.transport.connectTo( - nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId)) - if clientSocketRes.isErr(): - # TODO: get proper error mapped - return err("Error connecting to uTP socket") - let clientSocket = clientSocketRes.get() - if not clientSocket.isConnected(): - clientSocket.close() - return err("Portal uTP socket is not in connected state") + if connectionResult.isErr(): + error "Utp connection error while trying to offer content", + msg = connectionResult.error + return err("Error connecting uTP socket") + let clientSocket = connectionResult.get() + for contentKey in requestedContentKeys: let contentIdOpt = p.toContentId(contentKey) if contentIdOpt.isSome(): diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index e7e6ff9bd..8a72b5c26 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -41,7 +41,7 @@ type {.gcsafe, raises: [Defect].} PortalStream* = ref object - transport*: UtpDiscv5Protocol + transport: UtpDiscv5Protocol # TODO: # Decide on what's the better collection to use and set some limits in them # on how many uTP transfers allowed to happen concurrently. @@ -99,6 +99,34 @@ proc addContentRequest*( return connectionId +proc connectTo*( + stream: PortalStream, + nodeAddress: NodeAddress, + connectionId: uint16): Future[Result[UtpSocket[NodeAddress], string]] {.async.} = + let socketRes = await stream.transport.connectTo(nodeAddress, connectionId) + + if socketRes.isErr(): + case socketRes.error.kind + of SocketAlreadyExists: + # This error means that there is already socket to this nodeAddress with given + # connection id, in our use case it most probably means that other side sent us + # connection id which is already used. + # For now we just fail connection and return an error. Another strategy to consider + # would be to check what is the connection status, and then re-use it, or + # close it and retry connection. + let msg = "Socket to " & $nodeAddress & "with connection id: " & $connectionId & " already exists" + return err(msg) + of ConnectionTimedOut: + # Another strategy for handling this error would be to retry connecting a few times + # before giving up. But we know (as we control the uTP impl) that this error will only + # be returned when a SYN packet was re-sent 3 times and failed to be acked. This + # should be enough for us to known that the remote host is not reachable. + let msg = "uTP timeout while trying to connect to " & $nodeAddress + return err(msg) + + let socket = socketRes.get() + return ok(socket) + proc writeAndClose( socket: UtpSocket[NodeAddress], stream: PortalStream, request: ContentRequest) {.async.} =