mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-27 04:26:07 +00:00
parent
75f2c76ad0
commit
4cb4f50bf3
@ -505,16 +505,18 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||||||
id = dst.id
|
id = dst.id
|
||||||
return err("Trying to connect to node with unknown address")
|
return err("Trying to connect to node with unknown address")
|
||||||
|
|
||||||
let socketRes = await p.stream.transport.connectTo(
|
let connectionResult =
|
||||||
nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId))
|
await p.stream.connectTo(
|
||||||
if socketRes.isErr():
|
nodeAddress.unsafeGet(),
|
||||||
# TODO: get proper error mapped
|
uint16.fromBytesBE(m.connectionId)
|
||||||
return err("Error connecting to uTP socket")
|
)
|
||||||
let socket = socketRes.get()
|
|
||||||
if not socket.isConnected():
|
|
||||||
socket.close()
|
|
||||||
return err("Portal uTP socket is not in connected state")
|
|
||||||
|
|
||||||
|
if connectionResult.isErr():
|
||||||
|
error "Utp connection error while trying to find content",
|
||||||
|
msg = connectionResult.error
|
||||||
|
return err("Error connecting uTP socket")
|
||||||
|
|
||||||
|
let socket = connectionResult.get()
|
||||||
# Read all bytes from the socket
|
# Read all bytes from the socket
|
||||||
# This will either end with a FIN, or because the read action times out.
|
# This will either end with a FIN, or because the read action times out.
|
||||||
# A FIN does not necessarily mean that the data read is complete. Further
|
# A FIN does not necessarily mean that the data read is complete. Further
|
||||||
@ -570,17 +572,20 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
|||||||
error "Trying to connect to node with unknown address",
|
error "Trying to connect to node with unknown address",
|
||||||
id = dst.id
|
id = dst.id
|
||||||
return err("Trying to connect to node with unknown address")
|
return err("Trying to connect to node with unknown address")
|
||||||
|
|
||||||
|
let connectionResult =
|
||||||
|
await p.stream.connectTo(
|
||||||
|
nodeAddress.unsafeGet(),
|
||||||
|
uint16.fromBytesBE(m.connectionId)
|
||||||
|
)
|
||||||
|
|
||||||
let clientSocketRes = await p.stream.transport.connectTo(
|
if connectionResult.isErr():
|
||||||
nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId))
|
error "Utp connection error while trying to offer content",
|
||||||
if clientSocketRes.isErr():
|
msg = connectionResult.error
|
||||||
# TODO: get proper error mapped
|
return err("Error connecting uTP socket")
|
||||||
return err("Error connecting to uTP socket")
|
|
||||||
let clientSocket = clientSocketRes.get()
|
|
||||||
if not clientSocket.isConnected():
|
|
||||||
clientSocket.close()
|
|
||||||
return err("Portal uTP socket is not in connected state")
|
|
||||||
|
|
||||||
|
let clientSocket = connectionResult.get()
|
||||||
|
|
||||||
for contentKey in requestedContentKeys:
|
for contentKey in requestedContentKeys:
|
||||||
let contentIdOpt = p.toContentId(contentKey)
|
let contentIdOpt = p.toContentId(contentKey)
|
||||||
if contentIdOpt.isSome():
|
if contentIdOpt.isSome():
|
||||||
|
@ -41,7 +41,7 @@ type
|
|||||||
{.gcsafe, raises: [Defect].}
|
{.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
PortalStream* = ref object
|
PortalStream* = ref object
|
||||||
transport*: UtpDiscv5Protocol
|
transport: UtpDiscv5Protocol
|
||||||
# TODO:
|
# TODO:
|
||||||
# Decide on what's the better collection to use and set some limits in them
|
# Decide on what's the better collection to use and set some limits in them
|
||||||
# on how many uTP transfers allowed to happen concurrently.
|
# on how many uTP transfers allowed to happen concurrently.
|
||||||
@ -99,6 +99,34 @@ proc addContentRequest*(
|
|||||||
|
|
||||||
return connectionId
|
return connectionId
|
||||||
|
|
||||||
|
proc connectTo*(
|
||||||
|
stream: PortalStream,
|
||||||
|
nodeAddress: NodeAddress,
|
||||||
|
connectionId: uint16): Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
|
||||||
|
let socketRes = await stream.transport.connectTo(nodeAddress, connectionId)
|
||||||
|
|
||||||
|
if socketRes.isErr():
|
||||||
|
case socketRes.error.kind
|
||||||
|
of SocketAlreadyExists:
|
||||||
|
# This error means that there is already socket to this nodeAddress with given
|
||||||
|
# connection id, in our use case it most probably means that other side sent us
|
||||||
|
# connection id which is already used.
|
||||||
|
# For now we just fail connection and return an error. Another strategy to consider
|
||||||
|
# would be to check what is the connection status, and then re-use it, or
|
||||||
|
# close it and retry connection.
|
||||||
|
let msg = "Socket to " & $nodeAddress & "with connection id: " & $connectionId & " already exists"
|
||||||
|
return err(msg)
|
||||||
|
of ConnectionTimedOut:
|
||||||
|
# Another strategy for handling this error would be to retry connecting a few times
|
||||||
|
# before giving up. But we know (as we control the uTP impl) that this error will only
|
||||||
|
# be returned when a SYN packet was re-sent 3 times and failed to be acked. This
|
||||||
|
# should be enough for us to known that the remote host is not reachable.
|
||||||
|
let msg = "uTP timeout while trying to connect to " & $nodeAddress
|
||||||
|
return err(msg)
|
||||||
|
|
||||||
|
let socket = socketRes.get()
|
||||||
|
return ok(socket)
|
||||||
|
|
||||||
proc writeAndClose(
|
proc writeAndClose(
|
||||||
socket: UtpSocket[NodeAddress], stream: PortalStream,
|
socket: UtpSocket[NodeAddress], stream: PortalStream,
|
||||||
request: ContentRequest) {.async.} =
|
request: ContentRequest) {.async.} =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user