From 785a3b47b0fd19efe4c46fba2eb3a548348d7af5 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Thu, 23 Sep 2021 14:26:41 +0200 Subject: [PATCH] Allow for passing Portal specific bootstrap nodes (#844) * Allow for passing Portal specific bootstrap nodes * Fix to also replaceNode when decodeMessage fails * Add portal bootstrap node tests and reorder test cases --- fluffy/conf.nim | 8 +- fluffy/fluffy.nim | 3 +- fluffy/network/portalcli.nim | 7 +- fluffy/network/state/state_network.nim | 8 +- fluffy/network/wire/messages.nim | 2 +- fluffy/network/wire/portal_protocol.nim | 64 ++++++-- fluffy/tests/test_portal_wire_protocol.nim | 166 +++++++++++++-------- 7 files changed, 174 insertions(+), 84 deletions(-) diff --git a/fluffy/conf.nim b/fluffy/conf.nim index 598ff4168..b329372f0 100644 --- a/fluffy/conf.nim +++ b/fluffy/conf.nim @@ -39,7 +39,7 @@ type name: "listen-address" }: ValidIpAddress bootnodes* {. - desc: "ENR URI of node to bootstrap discovery with. Argument may be repeated" + desc: "ENR URI of node to bootstrap Discovery v5 with. Argument may be repeated" name: "bootnode" .}: seq[Record] nat* {. @@ -60,6 +60,12 @@ type defaultValue: PrivateKey.random(keys.newRng()[]) name: "nodekey" .}: PrivateKey + # Note: This will add bootstrap nodes for each enabled Portal network. + # No distinction is being made on bootstrap nodes for a specific network. + portalBootnodes* {. + desc: "ENR URI of node to bootstrap the Portal protocols with. Argument may be repeated" + name: "portal-bootnode" .}: seq[Record] + metricsEnabled* {. defaultValue: false desc: "Enable the metrics server" diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index 0d8cd2a6e..e65803526 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -53,7 +53,8 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} = d.open() - let stateNetwork = StateNetwork.new(d, newEmptyInMemoryStorage()) + let stateNetwork = StateNetwork.new(d, newEmptyInMemoryStorage(), + bootstrapRecords = config.portalBootnodes) if config.metricsEnabled: let diff --git a/fluffy/network/portalcli.nim b/fluffy/network/portalcli.nim index 9db84cecf..272a17d0e 100644 --- a/fluffy/network/portalcli.nim +++ b/fluffy/network/portalcli.nim @@ -60,6 +60,10 @@ type defaultValue: PrivateKey.random(keys.newRng()[]) name: "nodekey" .}: PrivateKey + portalBootnodes* {. + desc: "ENR URI of node to bootstrap the Portal protocol with. Argument may be repeated" + name: "portal-bootnode" .}: seq[Record] + metricsEnabled* {. defaultValue: false desc: "Enable the metrics server" @@ -170,7 +174,8 @@ proc run(config: DiscoveryConf) = d.open() - let portal = PortalProtocol.new(d, "portal".toBytes(), testHandler) + let portal = PortalProtocol.new(d, "portal".toBytes(), testHandler, + bootstrapRecords = config.portalBootnodes) if config.metricsEnabled: let diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index 556736fb2..45eb15f39 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -1,7 +1,7 @@ import std/[options, sugar], stew/[results, byteutils], - eth/p2p/discoveryv5/[protocol, node], + eth/p2p/discoveryv5/[protocol, node, enr], ../wire/portal_protocol, ./state_content @@ -36,9 +36,11 @@ proc getContent*(p: StateNetwork, key: ContentKey): return result.map(x => x.asSeq()) proc new*(T: type StateNetwork, baseProtocol: protocol.Protocol, - storage: ContentStorage , dataRadius = UInt256.high()): T = + storage: ContentStorage , dataRadius = UInt256.high(), + bootstrapRecords: openarray[Record] = []): T = let portalProtocol = PortalProtocol.new( - baseProtocol, StateProtocolId, getHandler(storage), dataRadius) + baseProtocol, StateProtocolId, getHandler(storage), dataRadius, + bootstrapRecords) return StateNetwork(portalProtocol: portalProtocol, storage: storage) diff --git a/fluffy/network/wire/messages.nim b/fluffy/network/wire/messages.nim index 856bcf1f7..685b9221d 100644 --- a/fluffy/network/wire/messages.nim +++ b/fluffy/network/wire/messages.nim @@ -125,7 +125,7 @@ proc encodeMessage*[T: SomeMessage](m: T): seq[byte] = proc decodeMessage*(body: openarray[byte]): Result[Message, cstring] = # Decodes to the specific `Message` type. if body.len < 1: - return err("No message data") + return err("No message data, peer might not support this talk protocol") var kind: MessageKind if not checkedEnumAssign(kind, body[0]): diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 48973ab50..28eb9db8b 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -55,6 +55,7 @@ type baseProtocol*: protocol.Protocol dataRadius*: UInt256 handleContentRequest: ContentHandler + bootstrapRecords*: seq[Record] lastLookup: chronos.Moment refreshLoop: Future[void] revalidateLoop: Future[void] @@ -74,6 +75,15 @@ type proc addNode*(p: PortalProtocol, node: Node): NodeStatus = p.routingTable.addNode(node) +proc addNode*(p: PortalProtocol, r: Record): bool = + let node = newNode(r) + if node.isOk(): + p.addNode(node[]) == Added + else: + false + +func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode + proc neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] = p.routingTable.neighbours(id = id, seenOnly = seenOnly) @@ -183,15 +193,17 @@ proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, protocolId: seq[byte], contentHandler: ContentHandler, - dataRadius = UInt256.high()): T = + dataRadius = UInt256.high(), + bootstrapRecords: openarray[Record] = []): T = let proto = PortalProtocol( + protocolHandler: messageHandler, + protocolId: protocolId, routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop, DefaultTableIpLimits, baseProtocol.rng), - protocolHandler: messageHandler, baseProtocol: baseProtocol, dataRadius: dataRadius, handleContentRequest: contentHandler, - protocolId: protocolId) + bootstrapRecords: @bootstrapRecords) proto.baseProtocol.registerTalkProtocol(proto.protocolId, proto).expect( "Only one protocol should have this id") @@ -200,28 +212,32 @@ proc new*(T: type PortalProtocol, # Sends the discv5 talkreq nessage with provided Portal message, awaits and # validates the proper response, and updates the Portal Network routing table. -# In discoveryv5 bootstrap nodes are not replaced in case of failure, but -# for now the Portal protocol has no notion of bootstrap nodes. proc reqResponse[Request: SomeMessage, Response: SomeMessage]( p: PortalProtocol, toNode: Node, request: Request ): Future[PortalResult[Response]] {.async.} = - let respResult = + let talkresp = await talkreq(p.baseProtocol, toNode, p.protocolId, encodeMessage(request)) - return respResult + # Note: Failure of `decodeMessage` might also simply mean that the peer is + # not supporting the specific talk protocol, as according to specification + # an empty response needs to be send in that case. + # See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05 + let messageResponse = talkresp .flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x)) .flatMap(proc (m: Message): Result[Response, cstring] = - let reqResult = getInnerMessageResult[Response]( + getInnerMessageResult[Response]( m, cstring"Invalid message response received") - if reqResult.isOk(): - p.routingTable.setJustSeen(toNode) - else: - p.routingTable.replaceNode(toNode) - reqResult ) + if messageResponse.isOk(): + p.routingTable.setJustSeen(toNode) + else: + p.routingTable.replaceNode(toNode) + + return messageResponse + proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.async.} = let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum, @@ -495,9 +511,16 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] = ## Perform a query for a random target, return all nodes discovered. p.query(NodeId.random(p.baseProtocol.rng[])) -proc seedTable(p: PortalProtocol) = - # TODO: Just picking something here for now. Should definitely add portal - # protocol info k:v pair in the ENRs and filter on that. +proc seedTable*(p: PortalProtocol) = + ## Seed the table with nodes from the discv5 table and with specifically + ## provided bootstrap nodes. The latter are then supposed to be nodes + ## supporting the wire protocol for the specific content network. + # Note: We allow replacing the bootstrap nodes in the routing table as it is + # possible that some of these are not supporting the specific portal network. + + # TODO: Picking some nodes from discv5 routing table now. Should definitely + # add supported Portal network info in a k:v pair in the ENRs and filter on + # that. let closestNodes = p.baseProtocol.neighbours( NodeId.random(p.baseProtocol.rng[]), seenOnly = true) @@ -507,6 +530,15 @@ proc seedTable(p: PortalProtocol) = else: debug "Node from discv5 routing table could not be added", uri = toURI(node.record) + # Seed the table with bootstrap nodes. + for record in p.bootstrapRecords: + if p.addNode(record): + debug "Added bootstrap node", uri = toURI(record), + protocolId = p.protocolId + else: + error "Bootstrap node could not be added", uri = toURI(record), + protocolId = p.protocolId + proc populateTable(p: PortalProtocol) {.async.} = ## Do a set of initial lookups to quickly populate the table. # start with a self target query (neighbour nodes) diff --git a/fluffy/tests/test_portal_wire_protocol.nim b/fluffy/tests/test_portal_wire_protocol.nim index 8ea69e090..db95a05e6 100644 --- a/fluffy/tests/test_portal_wire_protocol.nim +++ b/fluffy/tests/test_portal_wire_protocol.nim @@ -47,7 +47,7 @@ proc stopTest(test: Default2NodeTest) {.async.} = procSuite "Portal Wire Protocol Tests": let rng = newRng() - asyncTest "Portal Ping/Pong": + asyncTest "Ping/Pong": let test = defaultTestCase(rng) let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode) @@ -59,36 +59,7 @@ procSuite "Portal Wire Protocol Tests": await test.stopTest() - asyncTest "Portal correctly mark node as seen after request": - let test = defaultTestCase(rng) - - let initialNeighbours = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) - - check: - len(initialNeighbours) == 0 - - discard test.proto1.addNode(test.proto2.baseProtocol.localNode) - - let allNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) - let seenNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true) - - check: - len(allNeighboursAfterAdd) == 1 - len(seenNeighboursAfterAdd) == 0 - - let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode) - - let allNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) - let seenNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true) - - check: - pong.isOk() - len(allNeighboursAfterPing) == 1 - len(seenNeighboursAfterPing) == 1 - - await test.stopTest() - - asyncTest "Portal FindNode/Nodes": + asyncTest "FindNode/Nodes": let test = defaultTestCase(rng) block: # Find itself @@ -132,7 +103,75 @@ procSuite "Portal Wire Protocol Tests": await test.stopTest() - asyncTest "Portal lookup nodes": + asyncTest "FindContent/FoundContent - send enrs": + let test = defaultTestCase(rng) + + # ping in one direction to add, ping in the other to update as seen. + check (await test.node1.ping(test.node2.localNode)).isOk() + check (await test.node2.ping(test.node1.localNode)).isOk() + + # Start the portal protocol to seed nodes from the discoveryv5 routing + # table. + test.proto2.start() + + let contentKey = List.init(@[1'u8], 2048) + + # content does not exist so this should provide us with the closest nodes + # to the content, which is the only node in the routing table. + let foundContent = await test.proto1.findContent(test.proto2.baseProtocol.localNode, + contentKey) + + check: + foundContent.isOk() + foundContent.get().enrs.len() == 1 + foundContent.get().payload.len() == 0 + + await test.stopTest() + + asyncTest "Offer/Accept": + let test = defaultTestCase(rng) + let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])])) + + let accept = await test.proto1.offer( + test.proto2.baseProtocol.localNode, contentKeys) + + check: + accept.isOk() + accept.get().connectionId.len == 2 + accept.get().contentKeys.len == contentKeys.len + + await test.stopTest() + + asyncTest "Correctly mark node as seen after request": + let test = defaultTestCase(rng) + + let initialNeighbours = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) + + check: + len(initialNeighbours) == 0 + + discard test.proto1.addNode(test.proto2.baseProtocol.localNode) + + let allNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) + let seenNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true) + + check: + len(allNeighboursAfterAdd) == 1 + len(seenNeighboursAfterAdd) == 0 + + let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode) + + let allNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false) + let seenNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true) + + check: + pong.isOk() + len(allNeighboursAfterPing) == 1 + len(seenNeighboursAfterPing) == 1 + + await test.stopTest() + + asyncTest "Lookup nodes": let node1 = initDiscoveryNode( rng, PrivateKey.random(rng[]), localAddress(20302)) @@ -161,42 +200,47 @@ procSuite "Portal Wire Protocol Tests": await node2.closeWait() await node3.closeWait() + asyncTest "Valid Bootstrap Node": + let + node1 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20302)) + node2 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20303)) - asyncTest "Portal FindContent/FoundContent - send enrs": - let test = defaultTestCase(rng) + proto1 = PortalProtocol.new(node1, protocolId, testHandler) + proto2 = PortalProtocol.new(node2, protocolId, testHandler, + bootstrapRecords = [node1.localNode.record]) - # ping in one direction to add, ping in the other to update as seen. - check (await test.node1.ping(test.node2.localNode)).isOk() - check (await test.node2.ping(test.node1.localNode)).isOk() + proto1.start() + proto2.start() - # Start the portal protocol to seed nodes from the discoveryv5 routing - # table. - test.proto2.start() + check proto2.neighbours(proto2.localNode.id).len == 1 - let contentKey = List.init(@[1'u8], 2048) + proto1.stop() + proto2.stop() + await node1.closeWait() + await node2.closeWait() - # content does not exist so this should provide us with the closest nodes - # to the content, which is the only node in the routing table. - let foundContent = await test.proto1.findContent(test.proto2.baseProtocol.localNode, - contentKey) + asyncTest "Invalid Bootstrap Node": + let + node1 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20302)) + node2 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20303)) - check: - foundContent.isOk() - foundContent.get().enrs.len() == 1 - foundContent.get().payload.len() == 0 + # No portal protocol for node1, hence an invalid bootstrap node + proto2 = PortalProtocol.new(node2, protocolId, testHandler, + bootstrapRecords = [node1.localNode.record]) - await test.stopTest() + # seedTable to add node1 to the routing table + proto2.seedTable() + check proto2.neighbours(proto2.localNode.id).len == 1 - asyncTest "Portal Offer/Accept": - let test = defaultTestCase(rng) - let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])])) + # This should fail and drop node1 from the routing table + await proto2.revalidateNode(node1.localNode) - let accept = await test.proto1.offer( - test.proto2.baseProtocol.localNode, contentKeys) + check proto2.neighbours(proto2.localNode.id).len == 0 - check: - accept.isOk() - accept.get().connectionId.len == 2 - accept.get().contentKeys.len == contentKeys.len - - await test.stopTest() + proto2.stop() + await node1.closeWait() + await node2.closeWait()