Allow for passing Portal specific bootstrap nodes (#844)
* Allow for passing Portal specific bootstrap nodes * Fix to also replaceNode when decodeMessage fails * Add portal bootstrap node tests and reorder test cases
This commit is contained in:
parent
6f6345a022
commit
785a3b47b0
|
@ -39,7 +39,7 @@ type
|
|||
name: "listen-address" }: ValidIpAddress
|
||||
|
||||
bootnodes* {.
|
||||
desc: "ENR URI of node to bootstrap discovery with. Argument may be repeated"
|
||||
desc: "ENR URI of node to bootstrap Discovery v5 with. Argument may be repeated"
|
||||
name: "bootnode" .}: seq[Record]
|
||||
|
||||
nat* {.
|
||||
|
@ -60,6 +60,12 @@ type
|
|||
defaultValue: PrivateKey.random(keys.newRng()[])
|
||||
name: "nodekey" .}: PrivateKey
|
||||
|
||||
# Note: This will add bootstrap nodes for each enabled Portal network.
|
||||
# No distinction is being made on bootstrap nodes for a specific network.
|
||||
portalBootnodes* {.
|
||||
desc: "ENR URI of node to bootstrap the Portal protocols with. Argument may be repeated"
|
||||
name: "portal-bootnode" .}: seq[Record]
|
||||
|
||||
metricsEnabled* {.
|
||||
defaultValue: false
|
||||
desc: "Enable the metrics server"
|
||||
|
|
|
@ -53,7 +53,8 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
|||
|
||||
d.open()
|
||||
|
||||
let stateNetwork = StateNetwork.new(d, newEmptyInMemoryStorage())
|
||||
let stateNetwork = StateNetwork.new(d, newEmptyInMemoryStorage(),
|
||||
bootstrapRecords = config.portalBootnodes)
|
||||
|
||||
if config.metricsEnabled:
|
||||
let
|
||||
|
|
|
@ -60,6 +60,10 @@ type
|
|||
defaultValue: PrivateKey.random(keys.newRng()[])
|
||||
name: "nodekey" .}: PrivateKey
|
||||
|
||||
portalBootnodes* {.
|
||||
desc: "ENR URI of node to bootstrap the Portal protocol with. Argument may be repeated"
|
||||
name: "portal-bootnode" .}: seq[Record]
|
||||
|
||||
metricsEnabled* {.
|
||||
defaultValue: false
|
||||
desc: "Enable the metrics server"
|
||||
|
@ -170,7 +174,8 @@ proc run(config: DiscoveryConf) =
|
|||
|
||||
d.open()
|
||||
|
||||
let portal = PortalProtocol.new(d, "portal".toBytes(), testHandler)
|
||||
let portal = PortalProtocol.new(d, "portal".toBytes(), testHandler,
|
||||
bootstrapRecords = config.portalBootnodes)
|
||||
|
||||
if config.metricsEnabled:
|
||||
let
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import
|
||||
std/[options, sugar],
|
||||
stew/[results, byteutils],
|
||||
eth/p2p/discoveryv5/[protocol, node],
|
||||
eth/p2p/discoveryv5/[protocol, node, enr],
|
||||
../wire/portal_protocol,
|
||||
./state_content
|
||||
|
||||
|
@ -36,9 +36,11 @@ proc getContent*(p: StateNetwork, key: ContentKey):
|
|||
return result.map(x => x.asSeq())
|
||||
|
||||
proc new*(T: type StateNetwork, baseProtocol: protocol.Protocol,
|
||||
storage: ContentStorage , dataRadius = UInt256.high()): T =
|
||||
storage: ContentStorage , dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openarray[Record] = []): T =
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, StateProtocolId, getHandler(storage), dataRadius)
|
||||
baseProtocol, StateProtocolId, getHandler(storage), dataRadius,
|
||||
bootstrapRecords)
|
||||
|
||||
return StateNetwork(portalProtocol: portalProtocol, storage: storage)
|
||||
|
||||
|
|
|
@ -125,7 +125,7 @@ proc encodeMessage*[T: SomeMessage](m: T): seq[byte] =
|
|||
proc decodeMessage*(body: openarray[byte]): Result[Message, cstring] =
|
||||
# Decodes to the specific `Message` type.
|
||||
if body.len < 1:
|
||||
return err("No message data")
|
||||
return err("No message data, peer might not support this talk protocol")
|
||||
|
||||
var kind: MessageKind
|
||||
if not checkedEnumAssign(kind, body[0]):
|
||||
|
|
|
@ -55,6 +55,7 @@ type
|
|||
baseProtocol*: protocol.Protocol
|
||||
dataRadius*: UInt256
|
||||
handleContentRequest: ContentHandler
|
||||
bootstrapRecords*: seq[Record]
|
||||
lastLookup: chronos.Moment
|
||||
refreshLoop: Future[void]
|
||||
revalidateLoop: Future[void]
|
||||
|
@ -74,6 +75,15 @@ type
|
|||
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
||||
p.routingTable.addNode(node)
|
||||
|
||||
proc addNode*(p: PortalProtocol, r: Record): bool =
|
||||
let node = newNode(r)
|
||||
if node.isOk():
|
||||
p.addNode(node[]) == Added
|
||||
else:
|
||||
false
|
||||
|
||||
func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
|
||||
|
||||
proc neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
|
||||
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
|
||||
|
||||
|
@ -183,15 +193,17 @@ proc new*(T: type PortalProtocol,
|
|||
baseProtocol: protocol.Protocol,
|
||||
protocolId: seq[byte],
|
||||
contentHandler: ContentHandler,
|
||||
dataRadius = UInt256.high()): T =
|
||||
dataRadius = UInt256.high(),
|
||||
bootstrapRecords: openarray[Record] = []): T =
|
||||
let proto = PortalProtocol(
|
||||
protocolHandler: messageHandler,
|
||||
protocolId: protocolId,
|
||||
routingTable: RoutingTable.init(baseProtocol.localNode, DefaultBitsPerHop,
|
||||
DefaultTableIpLimits, baseProtocol.rng),
|
||||
protocolHandler: messageHandler,
|
||||
baseProtocol: baseProtocol,
|
||||
dataRadius: dataRadius,
|
||||
handleContentRequest: contentHandler,
|
||||
protocolId: protocolId)
|
||||
bootstrapRecords: @bootstrapRecords)
|
||||
|
||||
proto.baseProtocol.registerTalkProtocol(proto.protocolId, proto).expect(
|
||||
"Only one protocol should have this id")
|
||||
|
@ -200,28 +212,32 @@ proc new*(T: type PortalProtocol,
|
|||
|
||||
# Sends the discv5 talkreq nessage with provided Portal message, awaits and
|
||||
# validates the proper response, and updates the Portal Network routing table.
|
||||
# In discoveryv5 bootstrap nodes are not replaced in case of failure, but
|
||||
# for now the Portal protocol has no notion of bootstrap nodes.
|
||||
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
||||
p: PortalProtocol,
|
||||
toNode: Node,
|
||||
request: Request
|
||||
): Future[PortalResult[Response]] {.async.} =
|
||||
let respResult =
|
||||
let talkresp =
|
||||
await talkreq(p.baseProtocol, toNode, p.protocolId, encodeMessage(request))
|
||||
|
||||
return respResult
|
||||
# Note: Failure of `decodeMessage` might also simply mean that the peer is
|
||||
# not supporting the specific talk protocol, as according to specification
|
||||
# an empty response needs to be send in that case.
|
||||
# See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05
|
||||
let messageResponse = talkresp
|
||||
.flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x))
|
||||
.flatMap(proc (m: Message): Result[Response, cstring] =
|
||||
let reqResult = getInnerMessageResult[Response](
|
||||
getInnerMessageResult[Response](
|
||||
m, cstring"Invalid message response received")
|
||||
if reqResult.isOk():
|
||||
p.routingTable.setJustSeen(toNode)
|
||||
else:
|
||||
p.routingTable.replaceNode(toNode)
|
||||
reqResult
|
||||
)
|
||||
|
||||
if messageResponse.isOk():
|
||||
p.routingTable.setJustSeen(toNode)
|
||||
else:
|
||||
p.routingTable.replaceNode(toNode)
|
||||
|
||||
return messageResponse
|
||||
|
||||
proc ping*(p: PortalProtocol, dst: Node):
|
||||
Future[PortalResult[PongMessage]] {.async.} =
|
||||
let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
||||
|
@ -495,9 +511,16 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
|
|||
## Perform a query for a random target, return all nodes discovered.
|
||||
p.query(NodeId.random(p.baseProtocol.rng[]))
|
||||
|
||||
proc seedTable(p: PortalProtocol) =
|
||||
# TODO: Just picking something here for now. Should definitely add portal
|
||||
# protocol info k:v pair in the ENRs and filter on that.
|
||||
proc seedTable*(p: PortalProtocol) =
|
||||
## Seed the table with nodes from the discv5 table and with specifically
|
||||
## provided bootstrap nodes. The latter are then supposed to be nodes
|
||||
## supporting the wire protocol for the specific content network.
|
||||
# Note: We allow replacing the bootstrap nodes in the routing table as it is
|
||||
# possible that some of these are not supporting the specific portal network.
|
||||
|
||||
# TODO: Picking some nodes from discv5 routing table now. Should definitely
|
||||
# add supported Portal network info in a k:v pair in the ENRs and filter on
|
||||
# that.
|
||||
let closestNodes = p.baseProtocol.neighbours(
|
||||
NodeId.random(p.baseProtocol.rng[]), seenOnly = true)
|
||||
|
||||
|
@ -507,6 +530,15 @@ proc seedTable(p: PortalProtocol) =
|
|||
else:
|
||||
debug "Node from discv5 routing table could not be added", uri = toURI(node.record)
|
||||
|
||||
# Seed the table with bootstrap nodes.
|
||||
for record in p.bootstrapRecords:
|
||||
if p.addNode(record):
|
||||
debug "Added bootstrap node", uri = toURI(record),
|
||||
protocolId = p.protocolId
|
||||
else:
|
||||
error "Bootstrap node could not be added", uri = toURI(record),
|
||||
protocolId = p.protocolId
|
||||
|
||||
proc populateTable(p: PortalProtocol) {.async.} =
|
||||
## Do a set of initial lookups to quickly populate the table.
|
||||
# start with a self target query (neighbour nodes)
|
||||
|
|
|
@ -47,7 +47,7 @@ proc stopTest(test: Default2NodeTest) {.async.} =
|
|||
procSuite "Portal Wire Protocol Tests":
|
||||
let rng = newRng()
|
||||
|
||||
asyncTest "Portal Ping/Pong":
|
||||
asyncTest "Ping/Pong":
|
||||
let test = defaultTestCase(rng)
|
||||
|
||||
let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode)
|
||||
|
@ -59,36 +59,7 @@ procSuite "Portal Wire Protocol Tests":
|
|||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Portal correctly mark node as seen after request":
|
||||
let test = defaultTestCase(rng)
|
||||
|
||||
let initialNeighbours = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
|
||||
check:
|
||||
len(initialNeighbours) == 0
|
||||
|
||||
discard test.proto1.addNode(test.proto2.baseProtocol.localNode)
|
||||
|
||||
let allNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
let seenNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true)
|
||||
|
||||
check:
|
||||
len(allNeighboursAfterAdd) == 1
|
||||
len(seenNeighboursAfterAdd) == 0
|
||||
|
||||
let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode)
|
||||
|
||||
let allNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
let seenNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true)
|
||||
|
||||
check:
|
||||
pong.isOk()
|
||||
len(allNeighboursAfterPing) == 1
|
||||
len(seenNeighboursAfterPing) == 1
|
||||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Portal FindNode/Nodes":
|
||||
asyncTest "FindNode/Nodes":
|
||||
let test = defaultTestCase(rng)
|
||||
|
||||
block: # Find itself
|
||||
|
@ -132,7 +103,75 @@ procSuite "Portal Wire Protocol Tests":
|
|||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Portal lookup nodes":
|
||||
asyncTest "FindContent/FoundContent - send enrs":
|
||||
let test = defaultTestCase(rng)
|
||||
|
||||
# ping in one direction to add, ping in the other to update as seen.
|
||||
check (await test.node1.ping(test.node2.localNode)).isOk()
|
||||
check (await test.node2.ping(test.node1.localNode)).isOk()
|
||||
|
||||
# Start the portal protocol to seed nodes from the discoveryv5 routing
|
||||
# table.
|
||||
test.proto2.start()
|
||||
|
||||
let contentKey = List.init(@[1'u8], 2048)
|
||||
|
||||
# content does not exist so this should provide us with the closest nodes
|
||||
# to the content, which is the only node in the routing table.
|
||||
let foundContent = await test.proto1.findContent(test.proto2.baseProtocol.localNode,
|
||||
contentKey)
|
||||
|
||||
check:
|
||||
foundContent.isOk()
|
||||
foundContent.get().enrs.len() == 1
|
||||
foundContent.get().payload.len() == 0
|
||||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Offer/Accept":
|
||||
let test = defaultTestCase(rng)
|
||||
let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])]))
|
||||
|
||||
let accept = await test.proto1.offer(
|
||||
test.proto2.baseProtocol.localNode, contentKeys)
|
||||
|
||||
check:
|
||||
accept.isOk()
|
||||
accept.get().connectionId.len == 2
|
||||
accept.get().contentKeys.len == contentKeys.len
|
||||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Correctly mark node as seen after request":
|
||||
let test = defaultTestCase(rng)
|
||||
|
||||
let initialNeighbours = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
|
||||
check:
|
||||
len(initialNeighbours) == 0
|
||||
|
||||
discard test.proto1.addNode(test.proto2.baseProtocol.localNode)
|
||||
|
||||
let allNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
let seenNeighboursAfterAdd = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true)
|
||||
|
||||
check:
|
||||
len(allNeighboursAfterAdd) == 1
|
||||
len(seenNeighboursAfterAdd) == 0
|
||||
|
||||
let pong = await test.proto1.ping(test.proto2.baseProtocol.localNode)
|
||||
|
||||
let allNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = false)
|
||||
let seenNeighboursAfterPing = test.proto1.neighbours(test.proto1.baseProtocol.localNode.id, seenOnly = true)
|
||||
|
||||
check:
|
||||
pong.isOk()
|
||||
len(allNeighboursAfterPing) == 1
|
||||
len(seenNeighboursAfterPing) == 1
|
||||
|
||||
await test.stopTest()
|
||||
|
||||
asyncTest "Lookup nodes":
|
||||
let
|
||||
node1 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20302))
|
||||
|
@ -161,42 +200,47 @@ procSuite "Portal Wire Protocol Tests":
|
|||
await node2.closeWait()
|
||||
await node3.closeWait()
|
||||
|
||||
asyncTest "Valid Bootstrap Node":
|
||||
let
|
||||
node1 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20302))
|
||||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
asyncTest "Portal FindContent/FoundContent - send enrs":
|
||||
let test = defaultTestCase(rng)
|
||||
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, testHandler,
|
||||
bootstrapRecords = [node1.localNode.record])
|
||||
|
||||
# ping in one direction to add, ping in the other to update as seen.
|
||||
check (await test.node1.ping(test.node2.localNode)).isOk()
|
||||
check (await test.node2.ping(test.node1.localNode)).isOk()
|
||||
proto1.start()
|
||||
proto2.start()
|
||||
|
||||
# Start the portal protocol to seed nodes from the discoveryv5 routing
|
||||
# table.
|
||||
test.proto2.start()
|
||||
check proto2.neighbours(proto2.localNode.id).len == 1
|
||||
|
||||
let contentKey = List.init(@[1'u8], 2048)
|
||||
proto1.stop()
|
||||
proto2.stop()
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
|
||||
# content does not exist so this should provide us with the closest nodes
|
||||
# to the content, which is the only node in the routing table.
|
||||
let foundContent = await test.proto1.findContent(test.proto2.baseProtocol.localNode,
|
||||
contentKey)
|
||||
asyncTest "Invalid Bootstrap Node":
|
||||
let
|
||||
node1 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20302))
|
||||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
check:
|
||||
foundContent.isOk()
|
||||
foundContent.get().enrs.len() == 1
|
||||
foundContent.get().payload.len() == 0
|
||||
# No portal protocol for node1, hence an invalid bootstrap node
|
||||
proto2 = PortalProtocol.new(node2, protocolId, testHandler,
|
||||
bootstrapRecords = [node1.localNode.record])
|
||||
|
||||
await test.stopTest()
|
||||
# seedTable to add node1 to the routing table
|
||||
proto2.seedTable()
|
||||
check proto2.neighbours(proto2.localNode.id).len == 1
|
||||
|
||||
asyncTest "Portal Offer/Accept":
|
||||
let test = defaultTestCase(rng)
|
||||
let contentKeys = ContentKeysList(List(@[ByteList(@[byte 0x01, 0x02, 0x03])]))
|
||||
# This should fail and drop node1 from the routing table
|
||||
await proto2.revalidateNode(node1.localNode)
|
||||
|
||||
let accept = await test.proto1.offer(
|
||||
test.proto2.baseProtocol.localNode, contentKeys)
|
||||
check proto2.neighbours(proto2.localNode.id).len == 0
|
||||
|
||||
check:
|
||||
accept.isOk()
|
||||
accept.get().connectionId.len == 2
|
||||
accept.get().contentKeys.len == contentKeys.len
|
||||
|
||||
await test.stopTest()
|
||||
proto2.stop()
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
|
|
Loading…
Reference in New Issue