From 5f2117d081886d2c58a2832dd3e5323dd39cb2e3 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 17 Aug 2022 09:32:06 +0200 Subject: [PATCH] Refactor portal stream (#1191) --- fluffy/fluffy.nim | 27 +--- fluffy/network/history/history_network.nim | 20 ++- fluffy/network/state/state_network.nim | 18 ++- fluffy/network/wire/portal_protocol.nim | 6 +- fluffy/network/wire/portal_stream.nim | 155 +++++++++++++-------- fluffy/tests/test_history_network.nim | 17 +-- fluffy/tests/test_portal_wire_protocol.nim | 24 ++-- fluffy/tests/test_state_network.nim | 24 ++-- fluffy/tools/portalcli.nim | 15 +- 9 files changed, 159 insertions(+), 147 deletions(-) diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index 997287d06..bb9ef726f 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -102,30 +102,13 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = config.bitsPerHop, config.radiusConfig ) - stateNetwork = StateNetwork.new(d, db, - bootstrapRecords = bootstrapRecords, portalConfig = portalConfig) - historyNetwork = HistoryNetwork.new(d, db, + streamManager = StreamManager.new(d) + + stateNetwork = StateNetwork.new(d, db, streamManager, bootstrapRecords = bootstrapRecords, portalConfig = portalConfig) - # One instance of UtpDiscv5Protocol is shared over all the PortalStreams. - let - socketConfig = SocketConfig.init( - incomingSocketReceiveTimeout = none(Duration), - payloadSize = uint32(maxUtpPayloadSize)) - streamTransport = UtpDiscv5Protocol.new( - d, - utpProtocolId, - registerIncomingSocketCallback(@[ - stateNetwork.portalProtocol.stream, - historyNetwork.portalProtocol.stream]), - # for now we do not use user data in callbacks - nil, - allowRegisteredIdCallback(@[ - stateNetwork.portalProtocol.stream, - historyNetwork.portalProtocol.stream]), - socketConfig) - stateNetwork.setStreamTransport(streamTransport) - historyNetwork.setStreamTransport(streamTransport) + historyNetwork = HistoryNetwork.new(d, db, streamManager, + bootstrapRecords = bootstrapRecords, portalConfig = portalConfig) # TODO: If no new network key is generated then we should first check if an # enr file exists, and in the case it does read out the seqNum from it and diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 48d9b94b9..29418c7d9 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -40,13 +40,11 @@ type HistoryNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB + contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] processContentLoop: Future[void] Block* = (BlockHeader, BlockBody) -func setStreamTransport*(n: HistoryNetwork, transport: UtpDiscv5Protocol) = - setTransport(n.portalProtocol.stream, transport) - func toContentIdHandler(contentKey: ByteList): Option[ContentId] = some(toContentId(contentKey)) @@ -740,20 +738,30 @@ proc new*( T: type HistoryNetwork, baseProtocol: protocol.Protocol, contentDB: ContentDB, + streamManager: StreamManager, bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = + + let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + + let s = streamManager.registerNewStream(cq) + let portalProtocol = PortalProtocol.new( baseProtocol, historyProtocolId, contentDB, - toContentIdHandler, dbGetHandler, bootstrapRecords, + toContentIdHandler, dbGetHandler, s, bootstrapRecords, config = portalConfig) - return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB) + return HistoryNetwork( + portalProtocol: portalProtocol, + contentDB: contentDB, + contentQueue: cq + ) proc processContentLoop(n: HistoryNetwork) {.async.} = try: while true: let (contentKeys, contentItems) = - await n.portalProtocol.stream.contentQueue.popFirst() + await n.contentQueue.popFirst() # content passed here can have less items then contentKeys, but not more. for i, contentItem in contentItems: diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index d6dd8114b..ee92fbd56 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -23,11 +23,9 @@ const type StateNetwork* = ref object portalProtocol*: PortalProtocol contentDB*: ContentDB + contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] processContentLoop: Future[void] -func setStreamTransport*(n: StateNetwork, transport: UtpDiscv5Protocol) = - setTransport(n.portalProtocol.stream, transport) - func toContentIdHandler(contentKey: ByteList): Option[ContentId] = toContentId(contentKey) @@ -77,15 +75,25 @@ proc new*( T: type StateNetwork, baseProtocol: protocol.Protocol, contentDB: ContentDB, + streamManager: StreamManager, bootstrapRecords: openArray[Record] = [], portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = + + let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + + let s = streamManager.registerNewStream(cq) + let portalProtocol = PortalProtocol.new( baseProtocol, stateProtocolId, contentDB, - toContentIdHandler, dbGetHandler, + toContentIdHandler, dbGetHandler, s, bootstrapRecords, stateDistanceCalculator, config = portalConfig) - return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB) + return StateNetwork( + portalProtocol: portalProtocol, + contentDB: contentDB, + contentQueue: cq + ) proc processContentLoop(n: StateNetwork) {.async.} = try: diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 5f6d56793..06ca8c31a 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -444,6 +444,7 @@ proc new*(T: type PortalProtocol, contentDB: ContentDB, toContentId: ToContentIdHandler, dbGet: DbGetHandler, + stream: PortalStream, bootstrapRecords: openArray[Record] = [], distanceCalculator: DistanceCalculator = XorDistanceCalculator, config: PortalProtocolConfig = defaultPortalProtocolConfig @@ -464,16 +465,13 @@ proc new*(T: type PortalProtocol, radiusConfig: config.radiusConfig, dataRadius: initialRadius, bootstrapRecords: @bootstrapRecords, + stream: stream, radiusCache: RadiusCache.init(256), offerQueue: newAsyncQueue[OfferRequest](concurrentOffers)) proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect( "Only one protocol should have this id") - let stream = PortalStream.new(udata = proto, rng = proto.baseProtocol.rng) - - proto.stream = stream - proto # Sends the discv5 talkreq nessage with provided Portal message, awaits and diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index bebc42ea7..263e04dc4 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -22,7 +22,7 @@ logScope: topics = "portal_stream" const - utpProtocolId* = "utp".toBytes() + utpProtocolId = "utp".toBytes() defaultConnectionTimeout = 15.seconds defaultContentReadTimeout = 60.seconds @@ -32,7 +32,7 @@ const # discv5 ordinary message packets, for which below calculation applies. talkReqOverhead = getTalkReqOverhead(utpProtocolId) utpHeaderOverhead = 20 - maxUtpPayloadSize* = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead + maxUtpPayloadSize = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead type ContentRequest = object @@ -67,9 +67,13 @@ type connectionTimeout: Duration contentReadTimeout*: Duration rng: ref HmacDrbgContext - udata: pointer contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])] + StreamManager* = ref object + transport: UtpDiscv5Protocol + streams: seq[PortalStream] + rng: ref HmacDrbgContext + proc pruneAllowedConnections(stream: PortalStream) = # Prune requests and offers that didn't receive a connection request # before `connectionTimeout`. @@ -79,10 +83,6 @@ proc pruneAllowedConnections(stream: PortalStream) = stream.contentOffers.keepIf(proc(x: ContentOffer): bool = x.timeout > now) -proc getUserData*[T](stream: PortalStream): T = - ## Obtain user data stored in ``stream`` object. - cast[T](stream.udata) - proc addContentOffer*( stream: PortalStream, nodeId: NodeId, contentKeys: ContentKeysList): Bytes2 = stream.pruneAllowedConnections() @@ -264,56 +264,23 @@ proc readContentOffer( # socket, and let the specific networks handle that. await stream.contentQueue.put((offer.contentKeys, contentItems)) -proc new*( +proc new( T: type PortalStream, - udata: ref, - connectionTimeout = defaultConnectionTimeout, - contentReadTimeout = defaultContentReadTimeout, - rng = newRng()): T = - GC_ref(udata) - let - stream = PortalStream( - udata: cast[pointer](udata), - connectionTimeout: connectionTimeout, - contentReadTimeout: contentReadTimeout, - contentQueue: newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50), - rng: rng) + transport: UtpDiscv5Protocol, + contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])], + connectionTimeout: Duration, + contentReadTimeout: Duration, + rng: ref HmacDrbgContext): T = + let stream = PortalStream( + transport: transport, + connectionTimeout: connectionTimeout, + contentReadTimeout: contentReadTimeout, + contentQueue: contentQueue, + rng: rng + ) stream -func setTransport*(stream: PortalStream, transport: UtpDiscv5Protocol) = - stream.transport = transport - -# TODO: I think I'd like it more if we weren't to capture the stream. -proc registerIncomingSocketCallback*( - streams: seq[PortalStream]): AcceptConnectionCallback[NodeAddress] = - return ( - proc(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] = - for stream in streams: - # Note: Connection id of uTP SYN is different from other packets, it is - # actually the peers `send_conn_id`, opposed to `receive_conn_id` for all - # other packets. - for i, request in stream.contentRequests: - if request.connectionId == socket.connectionId and - request.nodeId == socket.remoteAddress.nodeId: - let fut = socket.writeContentRequest(stream, request) - stream.contentRequests.del(i) - return fut - - for i, offer in stream.contentOffers: - if offer.connectionId == socket.connectionId and - offer.nodeId == socket.remoteAddress.nodeId: - let fut = socket.readContentOffer(stream, offer) - stream.contentOffers.del(i) - return fut - - # TODO: Is there a scenario where this can happen, - # considering `allowRegisteredIdCallback`? If not, doAssert? - var fut = newFuture[void]("fluffy.AcceptConnectionCallback") - fut.complete() - return fut - ) - proc allowedConnection( stream: PortalStream, address: NodeAddress, connectionId: uint16): bool = return @@ -324,12 +291,78 @@ proc allowedConnection( proc (x: ContentOffer): bool = x.connectionId == connectionId and x.nodeId == address.nodeId) -proc allowRegisteredIdCallback*( - streams: seq[PortalStream]): AllowConnectionCallback[NodeAddress] = - return ( - proc(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool = - for stream in streams: - # stream.pruneAllowedConnections() - if allowedConnection(stream, remoteAddress, connectionId): - return true +proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] = + let manager = getUserData[NodeAddress, StreamManager](server) + + for stream in manager.streams: + # Note: Connection id of uTP SYN is different from other packets, it is + # actually the peers `send_conn_id`, opposed to `receive_conn_id` for all + # other packets. + for i, request in stream.contentRequests: + if request.connectionId == socket.connectionId and + request.nodeId == socket.remoteAddress.nodeId: + let fut = socket.writeContentRequest(stream, request) + stream.contentRequests.del(i) + return fut + + for i, offer in stream.contentOffers: + if offer.connectionId == socket.connectionId and + offer.nodeId == socket.remoteAddress.nodeId: + let fut = socket.readContentOffer(stream, offer) + stream.contentOffers.del(i) + return fut + + # TODO: Is there a scenario where this can happen, + # considering `allowRegisteredIdCallback`? If not, doAssert? + var fut = newFuture[void]("fluffy.AcceptConnectionCallback") + fut.complete() + return fut + +proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool = + let manager = getUserData[NodeAddress, StreamManager](r) + for stream in manager.streams: + # stream.pruneAllowedConnections() + if allowedConnection(stream, remoteAddress, connectionId): + return true + +proc new*(T: type StreamManager, d: protocol.Protocol): T = + let socketConfig = SocketConfig.init( + # Setting to none means that incoming sockets are in Connected state, which + # means they can send and receive data. + incomingSocketReceiveTimeout = none(Duration), + payloadSize = uint32(maxUtpPayloadSize) ) + + let manager = StreamManager(streams: @[], rng: d.rng) + + let utpOverDiscV5Protocol = UtpDiscv5Protocol.new( + d, + utpProtocolId, + handleIncomingConnection , + manager, + allowIncomingConnection, + socketConfig + ) + + manager.transport = utpOverDiscV5Protocol + + return manager + +proc registerNewStream*( + m : StreamManager, + contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])], + connectionTimeout = defaultConnectionTimeout, + contentReadTimeout = defaultContentReadTimeout): PortalStream = + + let s = PortalStream.new( + m.transport, + contentQueue, + connectionTimeout, + contentReadTimeout, + m.rng + ) + + m.streams.add(s) + + return s + diff --git a/fluffy/tests/test_history_network.nim b/fluffy/tests/test_history_network.nim index 74c80a75f..b12c72391 100644 --- a/fluffy/tests/test_history_network.nim +++ b/fluffy/tests/test_history_network.nim @@ -25,21 +25,8 @@ proc newHistoryNode(rng: ref HmacDrbgContext, port: int): HistoryNode = let node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port)) db = ContentDB.new("", uint32.high, inMemory = true) - socketConfig = SocketConfig.init( - incomingSocketReceiveTimeout = none(Duration), - payloadSize = uint32(maxUtpPayloadSize) - ) - hn = HistoryNetwork.new(node, db) - streamTransport = UtpDiscv5Protocol.new( - node, - utpProtocolId, - registerIncomingSocketCallback(@[hn.portalProtocol.stream]), - nil, - allowRegisteredIdCallback(@[hn.portalProtocol.stream]), - socketConfig - ) - - hn.setStreamTransport(streamTransport) + streamManager = StreamManager.new(node) + hn = HistoryNetwork.new(node, db, streamManager) return HistoryNode(discoveryProtocol: node, historyNetwork: hn) diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index d883a9234..838bd1b26 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -41,22 +41,14 @@ proc initPortalProtocol( let d = initDiscoveryNode(rng, privKey, address, bootstrapRecords) db = ContentDB.new("", uint32.high, inMemory = true) + manager = StreamManager.new(d) + q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + stream = manager.registerNewStream(q) + proto = PortalProtocol.new( - d, protocolId, db, toContentId, dbGetHandler, + d, protocolId, db, toContentId, dbGetHandler, stream, bootstrapRecords = bootstrapRecords) - socketConfig = SocketConfig.init( - incomingSocketReceiveTimeout = none(Duration), - payloadSize = uint32(maxUtpPayloadSize)) - streamTransport = UtpDiscv5Protocol.new( - d, utpProtocolId, - registerIncomingSocketCallback(@[proto.stream]), - nil, - allowRegisteredIdCallback(@[proto.stream]), - socketConfig) - - proto.stream.setTransport(streamTransport) - return proto proc stopPortalProtocol(proto: PortalProtocol) {.async.} = @@ -347,8 +339,12 @@ procSuite "Portal Wire Protocol Tests": dbLimit = 100_000'u32 db = ContentDB.new("", dbLimit, inMemory = true) + m = StreamManager.new(node1) + q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + stream = m.registerNewStream(q) + proto1 = PortalProtocol.new( - node1, protocolId, db, toContentId, dbGetHandler) + node1, protocolId, db, toContentId, dbGetHandler, stream) let item = genByteSeq(10_000) var distances: seq[UInt256] = @[] diff --git a/fluffy/tests/test_state_network.nim b/fluffy/tests/test_state_network.nim index 45ff678d5..529938a1e 100644 --- a/fluffy/tests/test_state_network.nim +++ b/fluffy/tests/test_state_network.nim @@ -11,7 +11,7 @@ import eth/[keys, trie/db, trie/hexary], eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table, ../../nimbus/[genesis, chain_config, config, db/db_chain, db/state_db], - ../network/wire/portal_protocol, + ../network/wire/[portal_protocol, portal_stream], ../network/state/[state_content, state_network], ../content_db, ./test_helpers @@ -42,11 +42,13 @@ procSuite "State Content Network": node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) + sm1 = StreamManager.new(node1) node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) + sm2 = StreamManager.new(node2) - proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2) check proto2.portalProtocol.addNode(node1.localNode) == Added @@ -96,14 +98,17 @@ procSuite "State Content Network": trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) + sm1 = StreamManager.new(node1) node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) + sm2 = StreamManager.new(node2) node3 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20304)) + sm3 = StreamManager.new(node3) - proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) - proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2) + proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true), sm3) # Node1 knows about Node2, and Node2 knows about Node3 which hold all content check proto1.portalProtocol.addNode(node2.localNode) == Added @@ -156,12 +161,13 @@ procSuite "State Content Network": let node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) + sm1 = StreamManager.new(node1) node2 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20303)) + sm2 = StreamManager.new(node2) - - proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true)) - proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true)) + proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1) + proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2) check (await node1.ping(node2.localNode)).isOk() check (await node2.ping(node1.localNode)).isOk() diff --git a/fluffy/tools/portalcli.nim b/fluffy/tools/portalcli.nim index 316cf7e3e..d5212fcdf 100644 --- a/fluffy/tools/portalcli.nim +++ b/fluffy/tools/portalcli.nim @@ -231,19 +231,12 @@ proc run(config: PortalCliConf) = let db = ContentDB.new("", config.storageSize, inMemory = true) + sm = StreamManager.new(d) + cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50) + stream = sm.registerNewStream(cq) portal = PortalProtocol.new(d, config.protocolId, db, - testContentIdHandler, dbGetHandler, + testContentIdHandler, dbGetHandler, stream, bootstrapRecords = bootstrapRecords) - socketConfig = SocketConfig.init( - incomingSocketReceiveTimeout = none(Duration)) - streamTransport = UtpDiscv5Protocol.new( - d, - utpProtocolId, - registerIncomingSocketCallback(@[portal.stream]), - nil, - allowRegisteredIdCallback(@[portal.stream]), socketConfig) - - setTransport(portal.stream, streamTransport) if config.metricsEnabled: let