Add Portal protocol config and add config options to fluffy cli (#928)

This commit is contained in:
Kim De Mey 2022-01-18 09:01:22 +01:00 committed by GitHub
parent f19a64a2fe
commit 81ebfd2b2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 30 deletions

View File

@ -54,7 +54,7 @@ type
defaultValue: defaultListenAddress defaultValue: defaultListenAddress
defaultValueDesc: $defaultListenAddressDesc defaultValueDesc: $defaultListenAddressDesc
desc: "Listening address for the Discovery v5 traffic" desc: "Listening address for the Discovery v5 traffic"
name: "listen-address" }: ValidIpAddress name: "listen-address" .}: ValidIpAddress
# Note: This will add bootstrap nodes for both Discovery v5 network and each # Note: This will add bootstrap nodes for both Discovery v5 network and each
# enabled Portal network. No distinction is made on bootstrap nodes per # enabled Portal network. No distinction is made on bootstrap nodes per
@ -66,7 +66,7 @@ type
bootstrapNodesFile* {. bootstrapNodesFile* {.
desc: "Specifies a line-delimited file of ENR URIs to bootstrap Discovery v5 and Portal networks from" desc: "Specifies a line-delimited file of ENR URIs to bootstrap Discovery v5 and Portal networks from"
defaultValue: "" defaultValue: ""
name: "bootstrap-file" }: InputFile name: "bootstrap-file" .}: InputFile
nat* {. nat* {.
desc: "Specify method to use for determining public address. " & desc: "Specify method to use for determining public address. " &
@ -92,7 +92,7 @@ type
desc: "The directory where fluffy will store the content data" desc: "The directory where fluffy will store the content data"
defaultValue: defaultDataDir() defaultValue: defaultDataDir()
defaultValueDesc: $defaultDataDirDesc defaultValueDesc: $defaultDataDirDesc
name: "data-dir" }: OutDir name: "data-dir" .}: OutDir
metricsEnabled* {. metricsEnabled* {.
defaultValue: false defaultValue: false
@ -113,18 +113,18 @@ type
rpcEnabled* {. rpcEnabled* {.
desc: "Enable the JSON-RPC server" desc: "Enable the JSON-RPC server"
defaultValue: false defaultValue: false
name: "rpc" }: bool name: "rpc" .}: bool
rpcPort* {. rpcPort* {.
desc: "HTTP port for the JSON-RPC server" desc: "HTTP port for the JSON-RPC server"
defaultValue: 8545 defaultValue: 8545
name: "rpc-port" }: Port name: "rpc-port" .}: Port
rpcAddress* {. rpcAddress* {.
desc: "Listening address of the RPC server" desc: "Listening address of the RPC server"
defaultValue: defaultAdminListenAddress defaultValue: defaultAdminListenAddress
defaultValueDesc: $defaultAdminListenAddressDesc defaultValueDesc: $defaultAdminListenAddressDesc
name: "rpc-address" }: ValidIpAddress name: "rpc-address" .}: ValidIpAddress
bridgeUri* {. bridgeUri* {.
defaultValue: none(string) defaultValue: none(string)
@ -140,6 +140,24 @@ type
desc: "URI of eth client where to proxy unimplemented rpc methods to" desc: "URI of eth client where to proxy unimplemented rpc methods to"
name: "proxy-uri" .}: ClientConfig name: "proxy-uri" .}: ClientConfig
tableIpLimit* {.
hidden
desc: "Maximum amount of nodes with the same IP in the routing tables"
defaultValue: DefaultTableIpLimit
name: "table-ip-limit" .}: uint
bucketIpLimit* {.
hidden
desc: "Maximum amount of nodes with the same IP in the routing tables buckets"
defaultValue: DefaultBucketIpLimit
name: "bucket-ip-limit" .}: uint
bitsPerHop* {.
hidden
desc: "Kademlia's b variable, increase for less hops per lookup"
defaultValue: DefaultBitsPerHop
name: "bits-per-hop" .}: int
case cmd* {. case cmd* {.
command command
defaultValue: noCommand .}: PortalCmd defaultValue: noCommand .}: PortalCmd

View File

@ -19,7 +19,7 @@ import
rpc_portal_debug_api], rpc_portal_debug_api],
./network/state/[state_network, state_content], ./network/state/[state_network, state_content],
./network/history/[history_network, history_content], ./network/history/[history_network, history_content],
./network/wire/portal_stream, ./network/wire/[portal_stream, portal_protocol_config],
./content_db ./content_db
proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
@ -73,11 +73,13 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
# One instance of PortalStream and thus UtpDiscv5Protocol is shared over all # One instance of PortalStream and thus UtpDiscv5Protocol is shared over all
# the Portal networks. # the Portal networks.
portalConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
portalStream = PortalStream.new(d) portalStream = PortalStream.new(d)
stateNetwork = StateNetwork.new(d, db, portalStream, stateNetwork = StateNetwork.new(d, db, portalStream,
bootstrapRecords = bootstrapRecords) bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
historyNetwork = HistoryNetwork.new(d, db, portalStream, historyNetwork = HistoryNetwork.new(d, db, portalStream,
bootstrapRecords = bootstrapRecords) bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
# TODO: If no new network key is generated then we should first check if an # TODO: If no new network key is generated then we should first check if an
# enr file exists, and in the case it does read out the seqNum from it and # enr file exists, and in the case it does read out the seqNum from it and

View File

@ -10,7 +10,7 @@ import
stew/results, chronos, stew/results, chronos,
eth/p2p/discoveryv5/[protocol, enr], eth/p2p/discoveryv5/[protocol, enr],
../../content_db, ../../content_db,
../wire/[portal_protocol, portal_stream], ../wire/[portal_protocol, portal_stream, portal_protocol_config],
./history_content ./history_content
const const
@ -53,10 +53,12 @@ proc new*(
contentDB: ContentDB, contentDB: ContentDB,
portalStream: PortalStream, portalStream: PortalStream,
dataRadius = UInt256.high(), dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = []): T = bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new( let portalProtocol = PortalProtocol.new(
baseProtocol, historyProtocolId, contentDB, toContentIdHandler, baseProtocol, historyProtocolId, contentDB, toContentIdHandler,
portalStream, dataRadius, bootstrapRecords) portalStream, dataRadius, bootstrapRecords,
config = portalConfig)
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB) return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)

View File

@ -10,7 +10,7 @@ import
stew/results, chronos, stew/results, chronos,
eth/p2p/discoveryv5/[protocol, enr], eth/p2p/discoveryv5/[protocol, enr],
../../content_db, ../../content_db,
../wire/[portal_protocol, portal_stream], ../wire/[portal_protocol, portal_stream, portal_protocol_config],
./state_content, ./state_content,
./state_distance ./state_distance
@ -53,10 +53,12 @@ proc new*(
contentDB: ContentDB, contentDB: ContentDB,
portalStream: PortalStream, portalStream: PortalStream,
dataRadius = UInt256.high(), dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = []): T = bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let portalProtocol = PortalProtocol.new( let portalProtocol = PortalProtocol.new(
baseProtocol, stateProtocolId, contentDB, toContentIdHandler, baseProtocol, stateProtocolId, contentDB, toContentIdHandler,
portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator) portalStream, dataRadius, bootstrapRecords, stateDistanceCalculator,
config = portalConfig)
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB) return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)

View File

@ -17,7 +17,7 @@ import
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
nodes_verification], nodes_verification],
../../content_db, ../../content_db,
./portal_stream, "."/[portal_stream, portal_protocol_config],
./messages ./messages
export messages, routing_table export messages, routing_table
@ -233,13 +233,16 @@ proc new*(T: type PortalProtocol,
stream: PortalStream, stream: PortalStream,
dataRadius = UInt256.high(), dataRadius = UInt256.high(),
bootstrapRecords: openArray[Record] = [], bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig
): T = ): T =
let proto = PortalProtocol( let proto = PortalProtocol(
protocolHandler: messageHandler, protocolHandler: messageHandler,
protocolId: protocolId, protocolId: protocolId,
routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop, routingTable: RoutingTable.init(
DefaultTableIpLimits, baseProtocol.rng, distanceCalculator), baseProtocol.localNode, config.bitsPerHop, config.tableIpLimits,
baseProtocol.rng, distanceCalculator),
baseProtocol: baseProtocol, baseProtocol: baseProtocol,
contentDB: contentDB, contentDB: contentDB,
toContentId: toContentId, toContentId: toContentId,
@ -354,7 +357,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
# validation is required, using a length prefix here might be beneficial for # validation is required, using a length prefix here might be beneficial for
# this. # this.
let readData = socket.read() let readData = socket.read()
if await readData.withTimeout(1.seconds): if await readData.withTimeout(p.stream.readTimeout):
let content = readData.read let content = readData.read
await socket.closeWait() await socket.closeWait()
return ok(FoundContent(kind: Content, content: ByteList(content))) return ok(FoundContent(kind: Content, content: ByteList(content)))

View File

@ -0,0 +1,26 @@
import
eth/p2p/discoveryv5/routing_table
type
PortalProtocolConfig* = object
tableIpLimits*: TableIpLimits
bitsPerHop*: int
const
defaultPortalProtocolConfig* = PortalProtocolConfig(
tableIpLimits: DefaultTableIpLimits,
bitsPerHop: DefaultBitsPerHop)
proc init*(
T: type PortalProtocolConfig,
tableIpLimit: uint,
bucketIpLimit: uint,
bitsPerHop: int): T =
PortalProtocolConfig(
tableIpLimits: TableIpLimits(
tableIpLimit: tableIpLimit,
bucketIpLimit: bucketIpLimit),
bitsPerHop: bitsPerHop
)

View File

@ -18,7 +18,8 @@ export utp_discv5_protocol
const const
utpProtocolId = "utp".toBytes() utpProtocolId = "utp".toBytes()
connectionTimeout = 5.seconds defaultConnectionTimeout = 5.seconds
defaultReadTimeout = 2.seconds
type type
ContentRequest = object ContentRequest = object
@ -50,6 +51,8 @@ type
# received data? # received data?
contentRequests: seq[ContentRequest] contentRequests: seq[ContentRequest]
contentOffers: seq[ContentOffer] contentOffers: seq[ContentOffer]
connectionTimeout: Duration
readTimeout*: Duration
rng: ref BrHmacDrbgContext rng: ref BrHmacDrbgContext
proc addContentOffer*( proc addContentOffer*(
@ -63,7 +66,7 @@ proc addContentOffer*(
connectionId: id, connectionId: id,
nodeId: nodeId, nodeId: nodeId,
contentKeys: contentKeys, contentKeys: contentKeys,
timeout: Moment.now() + connectionTimeout) timeout: Moment.now() + stream.connectionTimeout)
stream.contentOffers.add(contentOffer) stream.contentOffers.add(contentOffer)
return connectionId return connectionId
@ -79,7 +82,7 @@ proc addContentRequest*(
connectionId: id, connectionId: id,
nodeId: nodeId, nodeId: nodeId,
content: content, content: content,
timeout: Moment.now() + connectionTimeout) timeout: Moment.now() + stream.connectionTimeout)
stream.contentRequests.add(contentRequest) stream.contentRequests.add(contentRequest)
return connectionId return connectionId
@ -91,14 +94,14 @@ proc writeAndClose(socket: UtpSocket[Node], data: seq[byte]) {.async.} =
await socket.closeWait() await socket.closeWait()
proc readAndClose(socket: UtpSocket[Node]) {.async.} = proc readAndClose(socket: UtpSocket[Node], stream: PortalStream) {.async.} =
# Read all bytes from the socket # Read all bytes from the socket
# This will either end with a FIN, or because the read action times out. # This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete. Further # A FIN does not necessarily mean that the data read is complete. Further
# validation is required, using a length prefix here might be beneficial for # validation is required, using a length prefix here might be beneficial for
# this. # this.
var readData = socket.read() var readData = socket.read()
if await readData.withTimeout(1.seconds): if await readData.withTimeout(stream.readTimeout):
# TODO: Content needs to be validated, stored and also offered again as part # TODO: Content needs to be validated, stored and also offered again as part
# of the neighborhood gossip. This will require access to the specific # of the neighborhood gossip. This will require access to the specific
# Portal wire protocol for the network it was received on. Some async event # Portal wire protocol for the network it was received on. Some async event
@ -137,7 +140,7 @@ proc registerIncomingSocketCallback(
for i, offer in stream.contentOffers: for i, offer in stream.contentOffers:
if offer.connectionId == client.connectionId and if offer.connectionId == client.connectionId and
offer.nodeId == client.remoteAddress.id: offer.nodeId == client.remoteAddress.id:
let fut = client.readAndClose() let fut = client.readAndClose(stream)
stream.contentOffers.del(i) stream.contentOffers.del(i)
return fut return fut
@ -164,9 +167,15 @@ proc allowRegisteredIdCallback(
x.connectionId == connectionId and x.nodeId == remoteAddress.id) x.connectionId == connectionId and x.nodeId == remoteAddress.id)
) )
proc new*(T: type PortalStream, baseProtocol: protocol.Protocol): T = proc new*(
T: type PortalStream, baseProtocol: protocol.Protocol,
connectionTimeout = defaultConnectionTimeout,
readTimeout = defaultReadTimeout): T =
let let
stream = PortalStream(rng: baseProtocol.rng) stream = PortalStream(
connectionTimeout: connectionTimeout,
readTimeout: readTimeout,
rng: baseProtocol.rng)
socketConfig = SocketConfig.init( socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration)) incomingSocketReceiveTimeout = none(Duration))

View File

@ -284,6 +284,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
--metrics \ --metrics \
--metrics-address="127.0.0.1" \ --metrics-address="127.0.0.1" \
--metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \ --metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \
--bits-per-hop=5 \
${EXTRA_ARGS} \ ${EXTRA_ARGS} \
> "${DATA_DIR}/log${NUM_NODE}.txt" 2>&1 & > "${DATA_DIR}/log${NUM_NODE}.txt" 2>&1 &

View File

@ -53,7 +53,12 @@ procSuite "Portal testnet tests":
let routingTableInfo = await client.discv5_routingTableInfo() let routingTableInfo = await client.discv5_routingTableInfo()
var start: seq[NodeId] var start: seq[NodeId]
let nodes = foldl(routingTableInfo.buckets, a & b, start) let nodes = foldl(routingTableInfo.buckets, a & b, start)
if i == 0: # bootstrap node has all nodes (however not all verified) if i == 0:
# bootstrap node has all nodes (however not all verified), however this
# is highly dependent on the bits per hop and the amount of nodes
# launched and can thus easily fail.
# TODO: Set up the network with multiple bootstrap nodes to have a more
# robust set-up.
check nodes.len == config.nodeCount - 1 check nodes.len == config.nodeCount - 1
else: # Other nodes will have bootstrap node at this point, and maybe more else: # Other nodes will have bootstrap node at this point, and maybe more
check nodes.len > 0 check nodes.len > 0