From 81ebfd2b2a1cf96ebee4a63c9e187b0647c79683 Mon Sep 17 00:00:00 2001
From: Kim De Mey
Date: Tue, 18 Jan 2022 09:01:22 +0100
Subject: [PATCH] Add Portal protocol config and add config options to fluffy cli (#928)

---
 fluffy/conf.nim                              | 32 +++++++++++++++----
 fluffy/fluffy.nim                            |  8 +++--
 fluffy/network/history/history_network.nim   |  8 +++--
 fluffy/network/state/state_network.nim       |  8 +++--
 fluffy/network/wire/portal_protocol.nim      | 13 +++++---
 .../network/wire/portal_protocol_config.nim  | 26 +++++++++++++++
 fluffy/network/wire/portal_stream.nim        | 25 ++++++++++-----
 fluffy/scripts/launch_local_testnet.sh       |  1 +
 fluffy/scripts/test_portal_testnet.nim       |  7 +++-
 9 files changed, 98 insertions(+), 30 deletions(-)
 create mode 100644 fluffy/network/wire/portal_protocol_config.nim

diff --git a/fluffy/conf.nim b/fluffy/conf.nim
index bb4bd30b8..4ced96dc6 100644
--- a/fluffy/conf.nim
+++ b/fluffy/conf.nim
@@ -54,7 +54,7 @@ type
       defaultValue: defaultListenAddress
       defaultValueDesc: $defaultListenAddressDesc
       desc: "Listening address for the Discovery v5 traffic"
-      name: "listen-address" }: ValidIpAddress
+      name: "listen-address" .}: ValidIpAddress
 
     # Note: This will add bootstrap nodes for both Discovery v5 network and each
     # enabled Portal network. No distinction is made on bootstrap nodes per
@@ -66,7 +66,7 @@ type
     bootstrapNodesFile* {.
       desc: "Specifies a line-delimited file of ENR URIs to bootstrap Discovery v5 and Portal networks from"
       defaultValue: ""
-      name: "bootstrap-file" }: InputFile
+      name: "bootstrap-file" .}: InputFile
 
     nat* {.
       desc: "Specify method to use for determining public address. " &
@@ -92,7 +92,7 @@ type
       desc: "The directory where fluffy will store the content data"
      defaultValue: defaultDataDir()
       defaultValueDesc: $defaultDataDirDesc
-      name: "data-dir" }: OutDir
+      name: "data-dir" .}: OutDir
 
     metricsEnabled* {.
       defaultValue: false
@@ -113,18 +113,18 @@ type
     rpcEnabled* {.
       desc: "Enable the JSON-RPC server"
       defaultValue: false
-      name: "rpc" }: bool
+      name: "rpc" .}: bool
 
     rpcPort* {.
       desc: "HTTP port for the JSON-RPC server"
       defaultValue: 8545
-      name: "rpc-port" }: Port
+      name: "rpc-port" .}: Port
 
     rpcAddress* {.
       desc: "Listening address of the RPC server"
       defaultValue: defaultAdminListenAddress
       defaultValueDesc: $defaultAdminListenAddressDesc
-      name: "rpc-address" }: ValidIpAddress
+      name: "rpc-address" .}: ValidIpAddress
 
     bridgeUri* {.
       defaultValue: none(string)
@@ -140,6 +140,24 @@ type
       desc: "URI of eth client where to proxy unimplemented rpc methods to"
       name: "proxy-uri" .}: ClientConfig
 
+    tableIpLimit* {.
+      hidden
+      desc: "Maximum number of nodes with the same IP in the routing tables"
+      defaultValue: DefaultTableIpLimit
+      name: "table-ip-limit" .}: uint
+
+    bucketIpLimit* {.
+      hidden
+      desc: "Maximum number of nodes with the same IP in the routing table buckets"
+      defaultValue: DefaultBucketIpLimit
+      name: "bucket-ip-limit" .}: uint
+
+    bitsPerHop* {.
+      hidden
+      desc: "Kademlia's b variable, increase for fewer hops per lookup"
+      defaultValue: DefaultBitsPerHop
+      name: "bits-per-hop" .}: int
+
     case cmd* {.
command defaultValue: noCommand .}: PortalCmd @@ -182,7 +200,7 @@ proc parseCmdArg*(T: type PrivateKey, p: TaintedString): T proc completeCmdArg*(T: type PrivateKey, val: TaintedString): seq[string] = return @[] -proc parseCmdArg*(T: type ClientConfig, p: TaintedString): T +proc parseCmdArg*(T: type ClientConfig, p: TaintedString): T {.raises: [Defect, ConfigurationError].} = let uri = parseUri(p) if (uri.scheme == "http" or uri.scheme == "https"): diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index d11b4388c..4aac4707e 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -19,7 +19,7 @@ import rpc_portal_debug_api], ./network/state/[state_network, state_content], ./network/history/[history_network, history_content], - ./network/wire/portal_stream, + ./network/wire/[portal_stream, portal_protocol_config], ./content_db proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = @@ -73,11 +73,13 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = # One instance of PortalStream and thus UtpDiscv5Protocol is shared over all # the Portal networks. + portalConfig = PortalProtocolConfig.init( + config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop) portalStream = PortalStream.new(d) stateNetwork = StateNetwork.new(d, db, portalStream, - bootstrapRecords = bootstrapRecords) + bootstrapRecords = bootstrapRecords, portalConfig = portalConfig) historyNetwork = HistoryNetwork.new(d, db, portalStream, - bootstrapRecords = bootstrapRecords) + bootstrapRecords = bootstrapRecords, portalConfig = portalConfig) # TODO: If no new network key is generated then we should first check if an # enr file exists, and in the case it does read out the seqNum from it and diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 15b9cbb82..0774b3bda 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -10,7 +10,7 @@ import stew/results, chronos, eth/p2p/discoveryv5/[protocol, enr], ../../content_db, - ../wire/[portal_protocol, portal_stream], + ../wire/[portal_protocol, portal_stream, portal_protocol_config], ./history_content const @@ -53,10 +53,12 @@ proc new*( contentDB: ContentDB, portalStream: PortalStream, dataRadius = UInt256.high(), - bootstrapRecords: openArray[Record] = []): T = + bootstrapRecords: openArray[Record] = [], + portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = let portalProtocol = PortalProtocol.new( baseProtocol, historyProtocolId, contentDB, toContentIdHandler, - portalStream, dataRadius, bootstrapRecords) + portalStream, dataRadius, bootstrapRecords, + config = portalConfig) return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 966989c39..620289984 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -10,7 +10,7 @@ import stew/results, chronos, eth/p2p/discoveryv5/[protocol, enr], ../../content_db, - ../wire/[portal_protocol, portal_stream], + ../wire/[portal_protocol, portal_stream, portal_protocol_config], ./state_content, ./state_distance @@ -53,10 +53,12 @@ proc new*( contentDB: ContentDB, portalStream: PortalStream, dataRadius = UInt256.high(), - bootstrapRecords: openArray[Record] = []): T = + bootstrapRecords: openArray[Record] = [], + portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T = let portalProtocol = PortalProtocol.new( 
baseProtocol, stateProtocolId, contentDB, toContentIdHandler, - portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator) + portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator, + config = portalConfig) return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 3aa86569e..049e297fe 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -17,7 +17,7 @@ import eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification], ../../content_db, - ./portal_stream, + "."/[portal_stream, portal_protocol_config], ./messages export messages, routing_table @@ -233,13 +233,16 @@ proc new*(T: type PortalProtocol, stream: PortalStream, dataRadius = UInt256.high(), bootstrapRecords: openArray[Record] = [], - distanceCalculator: DistanceCalculator = XorDistanceCalculator + distanceCalculator: DistanceCalculator = XorDistanceCalculator, + config: PortalProtocolConfig = defaultPortalProtocolConfig ): T = + let proto = PortalProtocol( protocolHandler: messageHandler, protocolId: protocolId, - routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop, - DefaultTableIpLimits, baseProtocol.rng, distanceCalculator), + routingTable: RoutingTable.init( + baseProtocol.localNode, config.bitsPerHop, config.tableIpLimits, + baseProtocol.rng, distanceCalculator), baseProtocol: baseProtocol, contentDB: contentDB, toContentId: toContentId, @@ -354,7 +357,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): # validation is required, using a length prefix here might be beneficial for # this. let readData = socket.read() - if await readData.withTimeout(1.seconds): + if await readData.withTimeout(p.stream.readTimeout): let content = readData.read await socket.closeWait() return ok(FoundContent(kind: Content, content: ByteList(content))) diff --git a/fluffy/network/wire/portal_protocol_config.nim b/fluffy/network/wire/portal_protocol_config.nim new file mode 100644 index 000000000..8edc4ae3c --- /dev/null +++ b/fluffy/network/wire/portal_protocol_config.nim @@ -0,0 +1,26 @@ +import + eth/p2p/discoveryv5/routing_table + +type + PortalProtocolConfig* = object + tableIpLimits*: TableIpLimits + bitsPerHop*: int + +const + defaultPortalProtocolConfig* = PortalProtocolConfig( + tableIpLimits: DefaultTableIpLimits, + bitsPerHop: DefaultBitsPerHop) + +proc init*( + T: type PortalProtocolConfig, + tableIpLimit: uint, + bucketIpLimit: uint, + bitsPerHop: int): T = + + PortalProtocolConfig( + tableIpLimits: TableIpLimits( + tableIpLimit: tableIpLimit, + bucketIpLimit: bucketIpLimit), + bitsPerHop: bitsPerHop + ) + diff --git a/fluffy/network/wire/portal_stream.nim b/fluffy/network/wire/portal_stream.nim index 478bef0ed..15da0dc87 100644 --- a/fluffy/network/wire/portal_stream.nim +++ b/fluffy/network/wire/portal_stream.nim @@ -18,7 +18,8 @@ export utp_discv5_protocol const utpProtocolId = "utp".toBytes() - connectionTimeout = 5.seconds + defaultConnectionTimeout = 5.seconds + defaultReadTimeout = 2.seconds type ContentRequest = object @@ -50,6 +51,8 @@ type # received data? 
contentRequests: seq[ContentRequest] contentOffers: seq[ContentOffer] + connectionTimeout: Duration + readTimeout*: Duration rng: ref BrHmacDrbgContext proc addContentOffer*( @@ -63,7 +66,7 @@ proc addContentOffer*( connectionId: id, nodeId: nodeId, contentKeys: contentKeys, - timeout: Moment.now() + connectionTimeout) + timeout: Moment.now() + stream.connectionTimeout) stream.contentOffers.add(contentOffer) return connectionId @@ -79,7 +82,7 @@ proc addContentRequest*( connectionId: id, nodeId: nodeId, content: content, - timeout: Moment.now() + connectionTimeout) + timeout: Moment.now() + stream.connectionTimeout) stream.contentRequests.add(contentRequest) return connectionId @@ -91,14 +94,14 @@ proc writeAndClose(socket: UtpSocket[Node], data: seq[byte]) {.async.} = await socket.closeWait() -proc readAndClose(socket: UtpSocket[Node]) {.async.} = +proc readAndClose(socket: UtpSocket[Node], stream: PortalStream) {.async.} = # Read all bytes from the socket # This will either end with a FIN, or because the read action times out. # A FIN does not necessarily mean that the data read is complete. Further # validation is required, using a length prefix here might be beneficial for # this. var readData = socket.read() - if await readData.withTimeout(1.seconds): + if await readData.withTimeout(stream.readTimeout): # TODO: Content needs to be validated, stored and also offered again as part # of the neighborhood gossip. This will require access to the specific # Portal wire protocol for the network it was received on. Some async event @@ -137,7 +140,7 @@ proc registerIncomingSocketCallback( for i, offer in stream.contentOffers: if offer.connectionId == client.connectionId and offer.nodeId == client.remoteAddress.id: - let fut = client.readAndClose() + let fut = client.readAndClose(stream) stream.contentOffers.del(i) return fut @@ -164,9 +167,15 @@ proc allowRegisteredIdCallback( x.connectionId == connectionId and x.nodeId == remoteAddress.id) ) -proc new*(T: type PortalStream, baseProtocol: protocol.Protocol): T = +proc new*( + T: type PortalStream, baseProtocol: protocol.Protocol, + connectionTimeout = defaultConnectionTimeout, + readTimeout = defaultReadTimeout): T = let - stream = PortalStream(rng: baseProtocol.rng) + stream = PortalStream( + connectionTimeout: connectionTimeout, + readTimeout: readTimeout, + rng: baseProtocol.rng) socketConfig = SocketConfig.init( incomingSocketReceiveTimeout = none(Duration)) diff --git a/fluffy/scripts/launch_local_testnet.sh b/fluffy/scripts/launch_local_testnet.sh index 62b6f297f..805b19544 100755 --- a/fluffy/scripts/launch_local_testnet.sh +++ b/fluffy/scripts/launch_local_testnet.sh @@ -284,6 +284,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do --metrics \ --metrics-address="127.0.0.1" \ --metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \ + --bits-per-hop=5 \ ${EXTRA_ARGS} \ > "${DATA_DIR}/log${NUM_NODE}.txt" 2>&1 & diff --git a/fluffy/scripts/test_portal_testnet.nim b/fluffy/scripts/test_portal_testnet.nim index 916b562db..85279d280 100644 --- a/fluffy/scripts/test_portal_testnet.nim +++ b/fluffy/scripts/test_portal_testnet.nim @@ -53,7 +53,12 @@ procSuite "Portal testnet tests": let routingTableInfo = await client.discv5_routingTableInfo() var start: seq[NodeId] let nodes = foldl(routingTableInfo.buckets, a & b, start) - if i == 0: # bootstrap node has all nodes (however not all verified) + if i == 0: + # bootstrap node has all nodes (however not all verified), however this + # is highly dependent on the bits per hop and the amount of nodes 
+ # launched and can thus easily fail. + # TODO: Set up the network with multiple bootstrap nodes to have a more + # robust set-up. check nodes.len == config.nodeCount - 1 else: # Other nodes will have bootstrap node at this point, and maybe more check nodes.len > 0
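
Reviewer note (appended after the patch, not part of it): the sketch below shows how the three new hidden CLI options end up in the per-network routing tables, mirroring the flow this patch adds in fluffy.nim. PortalProtocolConfig.init and the portalConfig argument are the pieces introduced by the patch; the import path and the literal values are illustrative assumptions only, the actual defaults being DefaultTableIpLimit, DefaultBucketIpLimit and DefaultBitsPerHop from eth/p2p/discoveryv5/routing_table.

  # Sketch only: equivalent to what fluffy.nim now does with the parsed
  # CLI values (config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop).
  import
    ./network/wire/portal_protocol_config   # path as used from fluffy.nim

  let portalConfig = PortalProtocolConfig.init(
    tableIpLimit = 10,   # --table-ip-limit, illustrative value
    bucketIpLimit = 2,   # --bucket-ip-limit, illustrative value
    bitsPerHop = 5)      # --bits-per-hop, the value set in launch_local_testnet.sh

  # Passing this as portalConfig = portalConfig to StateNetwork.new and
  # HistoryNetwork.new (as done in fluffy.nim above) makes each network's
  # RoutingTable.init use these limits instead of the discv5 defaults
  # (DefaultTableIpLimits, DefaultBitsPerHop).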