This commit is contained in:
KonradStaniec 2022-08-22 12:23:26 +02:00 committed by GitHub
parent daac75796f
commit 88263e664b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 22 deletions

View File

@ -99,7 +99,7 @@ proc processContentLoop(n: StateNetwork) {.async.} =
try: try:
while true: while true:
# Just dropping state date for now # Just dropping state date for now
discard await n.portalProtocol.stream.contentQueue.popFirst() discard await n.contentQueue.popFirst()
except CancelledError: except CancelledError:
trace "processContentLoop canceled" trace "processContentLoop canceled"

View File

@ -239,12 +239,14 @@ proc readContentOffer(
else: else:
# Invalid data, stop reading content, but still process data received # Invalid data, stop reading content, but still process data received
# so far. # so far.
debug "Reading content item failed, content offer failed", contentKeys = offer.contentKeys debug "Reading content item failed, content offer failed",
contentKeys = offer.contentKeys
break break
else: else:
# Read timed out, stop further reading, but still process data received # Read timed out, stop further reading, but still process data received
# so far. # so far.
debug "Reading data from socket timed out, content offer failed", contentKeys = offer.contentKeys debug "Reading data from socket timed out, content offer failed",
contentKeys = offer.contentKeys
break break
if socket.atEof(): if socket.atEof():
@ -291,7 +293,9 @@ proc allowedConnection(
proc (x: ContentOffer): bool = proc (x: ContentOffer): bool =
x.connectionId == connectionId and x.nodeId == address.nodeId) x.connectionId == connectionId and x.nodeId == address.nodeId)
proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] = proc handleIncomingConnection(
server: UtpRouter[NodeAddress],
socket: UtpSocket[NodeAddress]): Future[void] =
let manager = getUserData[NodeAddress, StreamManager](server) let manager = getUserData[NodeAddress, StreamManager](server)
for stream in manager.streams: for stream in manager.streams:
@ -318,7 +322,10 @@ proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[
fut.complete() fut.complete()
return fut return fut
proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool = proc allowIncomingConnection(
r: UtpRouter[NodeAddress],
remoteAddress: NodeAddress,
connectionId: uint16): bool =
let manager = getUserData[NodeAddress, StreamManager](r) let manager = getUserData[NodeAddress, StreamManager](r)
for stream in manager.streams: for stream in manager.streams:
# stream.pruneAllowedConnections() # stream.pruneAllowedConnections()
@ -326,16 +333,15 @@ proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddre
return true return true
proc new*(T: type StreamManager, d: protocol.Protocol): T = proc new*(T: type StreamManager, d: protocol.Protocol): T =
let socketConfig = SocketConfig.init( let
socketConfig = SocketConfig.init(
# Setting to none means that incoming sockets are in Connected state, which # Setting to none means that incoming sockets are in Connected state, which
# means they can send and receive data. # means they can send and receive data.
incomingSocketReceiveTimeout = none(Duration), incomingSocketReceiveTimeout = none(Duration),
payloadSize = uint32(maxUtpPayloadSize) payloadSize = uint32(maxUtpPayloadSize)
) )
manager = StreamManager(streams: @[], rng: d.rng)
let manager = StreamManager(streams: @[], rng: d.rng) utpOverDiscV5Protocol = UtpDiscv5Protocol.new(
let utpOverDiscV5Protocol = UtpDiscv5Protocol.new(
d, d,
utpProtocolId, utpProtocolId,
handleIncomingConnection , handleIncomingConnection ,