Move Portal wire protocol out of state specific code (#843)
* Allow for custom protocol id in portal wire protocol * Move Portal wire protocol out of state specific code
This commit is contained in:
parent
11c120e400
commit
6f6345a022
|
@ -14,7 +14,7 @@ import
|
|||
eth/keys, eth/net/nat,
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
./conf, ./rpc/[eth_api, bridge_client, discovery_api],
|
||||
./network/state/[portal_network, content]
|
||||
./network/state/[state_network, state_content]
|
||||
|
||||
proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
|
||||
try:
|
||||
|
@ -53,7 +53,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
|||
|
||||
d.open()
|
||||
|
||||
let portal = PortalNetwork.new(d, newEmptyInMemoryStorage())
|
||||
let stateNetwork = StateNetwork.new(d, newEmptyInMemoryStorage())
|
||||
|
||||
if config.metricsEnabled:
|
||||
let
|
||||
|
@ -79,7 +79,7 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
|
|||
let bridgeClient = initializeBridgeClient(config.bridgeUri)
|
||||
|
||||
d.start()
|
||||
portal.start()
|
||||
stateNetwork.start()
|
||||
|
||||
runForever()
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import
|
|||
eth/[keys, net/nat],
|
||||
eth/p2p/discoveryv5/[enr, node],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
./state/messages, ./state/portal_protocol
|
||||
./wire/messages, ./wire/portal_protocol
|
||||
|
||||
type
|
||||
PortalCmd* = enum
|
||||
|
@ -170,7 +170,7 @@ proc run(config: DiscoveryConf) =
|
|||
|
||||
d.open()
|
||||
|
||||
let portal = PortalProtocol.new(d, testHandler)
|
||||
let portal = PortalProtocol.new(d, "portal".toBytes(), testHandler)
|
||||
|
||||
if config.metricsEnabled:
|
||||
let
|
||||
|
|
|
@ -1,17 +1,21 @@
|
|||
import
|
||||
std/[options, sugar],
|
||||
stew/results,
|
||||
stew/[results, byteutils],
|
||||
eth/p2p/discoveryv5/[protocol, node],
|
||||
./content, ./portal_protocol
|
||||
../wire/portal_protocol,
|
||||
./state_content
|
||||
|
||||
const
|
||||
StateProtocolId* = "portal:state".toBytes()
|
||||
|
||||
# TODO expose function in domain specific way i.e operating od state network
|
||||
# objects i.e nodes, tries, hashes
|
||||
type PortalNetwork* = ref object
|
||||
storage: ContentStorage
|
||||
type StateNetwork* = ref object
|
||||
portalProtocol*: PortalProtocol
|
||||
storage: ContentStorage
|
||||
|
||||
proc getHandler(storage: ContentStorage): ContentHandler =
|
||||
return (proc (contentKey: content.ByteList): ContentResult =
|
||||
return (proc (contentKey: state_content.ByteList): ContentResult =
|
||||
let maybeContent = storage.getContent(contentKey)
|
||||
if (maybeContent.isSome()):
|
||||
ContentResult(kind: ContentFound, content: maybeContent.unsafeGet())
|
||||
|
@ -22,7 +26,7 @@ proc getHandler(storage: ContentStorage): ContentHandler =
|
|||
# 1. Return proper domain types instead of bytes
|
||||
# 2. First check if item is in storage instead of doing lookup
|
||||
# 3. Put item into storage (if in radius) after succesful lookup
|
||||
proc getContent*(p:PortalNetwork, key: ContentKey):
|
||||
proc getContent*(p: StateNetwork, key: ContentKey):
|
||||
Future[Option[seq[byte]]] {.async.} =
|
||||
let keyAsBytes = encodeKeyAsList(key)
|
||||
let id = contentIdAsUint256(toContentId(keyAsBytes))
|
||||
|
@ -31,15 +35,15 @@ proc getContent*(p:PortalNetwork, key: ContentKey):
|
|||
# types from here
|
||||
return result.map(x => x.asSeq())
|
||||
|
||||
proc new*(T: type PortalNetwork, baseProtocol: protocol.Protocol,
|
||||
proc new*(T: type StateNetwork, baseProtocol: protocol.Protocol,
|
||||
storage: ContentStorage , dataRadius = UInt256.high()): T =
|
||||
let portalProto =
|
||||
PortalProtocol.new(baseProtocol, getHandler(storage), dataRadius)
|
||||
let portalProtocol = PortalProtocol.new(
|
||||
baseProtocol, StateProtocolId, getHandler(storage), dataRadius)
|
||||
|
||||
return PortalNetwork(storage: storage, portalProtocol: portalProto)
|
||||
return StateNetwork(portalProtocol: portalProtocol, storage: storage)
|
||||
|
||||
proc start*(p: PortalNetwork) =
|
||||
proc start*(p: StateNetwork) =
|
||||
p.portalProtocol.start()
|
||||
|
||||
proc stop*(p: PortalNetwork) =
|
||||
proc stop*(p: StateNetwork) =
|
||||
p.portalProtocol.stop()
|
|
@ -9,17 +9,16 @@
|
|||
|
||||
import
|
||||
std/[sequtils, sets, algorithm],
|
||||
stew/[results, byteutils], chronicles, chronos, nimcrypto/hash,
|
||||
stew/results, chronicles, chronos, nimcrypto/hash,
|
||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
|
||||
./messages
|
||||
|
||||
export messages
|
||||
|
||||
logScope:
|
||||
topics = "portal"
|
||||
topics = "portal_wire"
|
||||
|
||||
const
|
||||
PortalProtocolId* = "portal".toBytes()
|
||||
Alpha = 3 ## Kademlia concurrency factor
|
||||
LookupRequestLimit = 3 ## Amount of distances requested in a single Findnode
|
||||
## message for a lookup or query
|
||||
|
@ -51,6 +50,7 @@ type
|
|||
proc(contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.}
|
||||
|
||||
PortalProtocol* = ref object of TalkProtocol
|
||||
protocolId: seq[byte]
|
||||
routingTable: RoutingTable
|
||||
baseProtocol*: protocol.Protocol
|
||||
dataRadius*: UInt256
|
||||
|
@ -179,7 +179,9 @@ proc messageHandler*(protocol: TalkProtocol, request: seq[byte],
|
|||
else:
|
||||
@[]
|
||||
|
||||
proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol,
|
||||
proc new*(T: type PortalProtocol,
|
||||
baseProtocol: protocol.Protocol,
|
||||
protocolId: seq[byte],
|
||||
contentHandler: ContentHandler,
|
||||
dataRadius = UInt256.high()): T =
|
||||
let proto = PortalProtocol(
|
||||
|
@ -188,9 +190,10 @@ proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol,
|
|||
protocolHandler: messageHandler,
|
||||
baseProtocol: baseProtocol,
|
||||
dataRadius: dataRadius,
|
||||
handleContentRequest: contentHandler)
|
||||
handleContentRequest: contentHandler,
|
||||
protocolId: protocolId)
|
||||
|
||||
proto.baseProtocol.registerTalkProtocol(PortalProtocolId, proto).expect(
|
||||
proto.baseProtocol.registerTalkProtocol(proto.protocolId, proto).expect(
|
||||
"Only one protocol should have this id")
|
||||
|
||||
return proto
|
||||
|
@ -202,11 +205,10 @@ proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol,
|
|||
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
||||
p: PortalProtocol,
|
||||
toNode: Node,
|
||||
protocol: seq[byte],
|
||||
request: Request
|
||||
): Future[PortalResult[Response]] {.async.} =
|
||||
let respResult =
|
||||
await talkreq(p.baseProtocol, toNode, protocol, encodeMessage(request))
|
||||
await talkreq(p.baseProtocol, toNode, p.protocolId, encodeMessage(request))
|
||||
|
||||
return respResult
|
||||
.flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x))
|
||||
|
@ -226,8 +228,7 @@ proc ping*(p: PortalProtocol, dst: Node):
|
|||
dataRadius: p.dataRadius)
|
||||
|
||||
trace "Send message request", dstId = dst.id, kind = MessageKind.ping
|
||||
return await reqResponse[PingMessage, PongMessage](
|
||||
p, dst, PortalProtocolId, ping)
|
||||
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
||||
|
||||
proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
|
||||
Future[PortalResult[NodesMessage]] {.async.} =
|
||||
|
@ -235,16 +236,14 @@ proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
|
|||
|
||||
trace "Send message request", dstId = dst.id, kind = MessageKind.findnode
|
||||
# TODO Add nodes validation
|
||||
return await reqResponse[FindNodeMessage, NodesMessage](
|
||||
p, dst, PortalProtocolId, fn)
|
||||
return await reqResponse[FindNodeMessage, NodesMessage](p, dst, fn)
|
||||
|
||||
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||
Future[PortalResult[FoundContentMessage]] {.async.} =
|
||||
let fc = FindContentMessage(contentKey: contentKey)
|
||||
|
||||
trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent
|
||||
return await reqResponse[FindContentMessage, FoundContentMessage](
|
||||
p, dst, PortalProtocolId, fc)
|
||||
return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, fc)
|
||||
|
||||
proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||
Future[PortalResult[AcceptMessage]] {.async.} =
|
||||
|
@ -252,8 +251,7 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
|||
|
||||
trace "Send message request", dstId = dst.id, kind = MessageKind.offer
|
||||
|
||||
return await reqResponse[OfferMessage, AcceptMessage](
|
||||
p, dst, PortalProtocolId, offer)
|
||||
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
|
||||
|
||||
# TODO: Actually have to parse the offer message and get the uTP connection
|
||||
# id, and initiate an uTP stream with given uTP connection id to get the data
|
|
@ -8,9 +8,9 @@
|
|||
{. warning[UnusedImport]:off .}
|
||||
|
||||
import
|
||||
./test_portal_encoding,
|
||||
./test_portal,
|
||||
./test_content_network,
|
||||
./test_discovery_rpc,
|
||||
./test_portal_wire_encoding,
|
||||
./test_portal_wire_protocol,
|
||||
./test_custom_distance,
|
||||
./test_state_network,
|
||||
./test_discovery_rpc,
|
||||
./test_bridge_parser
|
||||
|
|
|
@ -9,9 +9,9 @@
|
|||
|
||||
import
|
||||
unittest2, stint, stew/[byteutils, results], eth/p2p/discoveryv5/enr,
|
||||
../network/state/messages
|
||||
../network/wire/messages
|
||||
|
||||
suite "Portal Protocol Message Encodings":
|
||||
suite "Portal Wire Protocol Message Encodings":
|
||||
test "Ping Request":
|
||||
var dataRadius: UInt256
|
||||
let
|
|
@ -8,12 +8,14 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
chronos, testutils/unittests, stew/shims/net,
|
||||
chronos, testutils/unittests, stew/shims/net, stew/byteutils,
|
||||
eth/keys, eth/p2p/discoveryv5/routing_table, nimcrypto/[hash, sha2],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
../network/state/portal_protocol,
|
||||
../network/wire/portal_protocol,
|
||||
./test_helpers
|
||||
|
||||
const protocolId = "portal".toBytes()
|
||||
|
||||
type Default2NodeTest = ref object
|
||||
node1: discv5_protocol.Protocol
|
||||
node2: discv5_protocol.Protocol
|
||||
|
@ -31,8 +33,8 @@ proc defaultTestCase(rng: ref BrHmacDrbgContext): Default2NodeTest =
|
|||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
proto1 = PortalProtocol.new(node1, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, testHandler)
|
||||
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, testHandler)
|
||||
|
||||
Default2NodeTest(node1: node1, node2: node2, proto1: proto1, proto2: proto2)
|
||||
|
||||
|
@ -42,7 +44,7 @@ proc stopTest(test: Default2NodeTest) {.async.} =
|
|||
await test.node1.closeWait()
|
||||
await test.node2.closeWait()
|
||||
|
||||
procSuite "Portal Tests":
|
||||
procSuite "Portal Wire Protocol Tests":
|
||||
let rng = newRng()
|
||||
|
||||
asyncTest "Portal Ping/Pong":
|
||||
|
@ -139,9 +141,9 @@ procSuite "Portal Tests":
|
|||
node3 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20304))
|
||||
|
||||
proto1 = PortalProtocol.new(node1, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, testHandler)
|
||||
proto3 = PortalProtocol.new(node3, testHandler)
|
||||
proto1 = PortalProtocol.new(node1, protocolId, testHandler)
|
||||
proto2 = PortalProtocol.new(node2, protocolId, testHandler)
|
||||
proto3 = PortalProtocol.new(node3, protocolId, testHandler)
|
||||
|
||||
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
|
||||
check proto1.addNode(node2.localNode) == Added
|
|
@ -11,7 +11,8 @@ import
|
|||
eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
|
||||
../../nimbus/[genesis, chain_config, config, db/db_chain],
|
||||
../network/state/portal_protocol, ../network/state/content, ../network/state/portal_network,
|
||||
../network/wire/portal_protocol,
|
||||
../network/state/[state_content, state_network],
|
||||
./test_helpers
|
||||
|
||||
proc genesisToTrie(filePath: string): HexaryTrie =
|
||||
|
@ -33,7 +34,7 @@ proc genesisToTrie(filePath: string): HexaryTrie =
|
|||
# Trie exists already in flat db, but need to provide the root
|
||||
initHexaryTrie(chainDB.db, header.stateRoot, chainDB.pruneTrie)
|
||||
|
||||
procSuite "Content Network":
|
||||
procSuite "State Content Network":
|
||||
let rng = newRng()
|
||||
asyncTest "Test Share Full State":
|
||||
let
|
||||
|
@ -44,8 +45,8 @@ procSuite "Content Network":
|
|||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
|
||||
proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie))
|
||||
proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie))
|
||||
proto1 = StateNetwork.new(node1, ContentStorage(trie: trie))
|
||||
proto2 = StateNetwork.new(node2, ContentStorage(trie: trie))
|
||||
|
||||
check proto2.portalProtocol.addNode(node1.localNode) == Added
|
||||
|
||||
|
@ -60,7 +61,7 @@ procSuite "Content Network":
|
|||
let
|
||||
contentKey = ContentKey(
|
||||
networkId: 0'u16,
|
||||
contentType: content.ContentType.Account,
|
||||
contentType: state_content.ContentType.Account,
|
||||
nodeHash: nodeHash)
|
||||
|
||||
let foundContent = await proto2.getContent(contentKey)
|
||||
|
@ -85,9 +86,9 @@ procSuite "Content Network":
|
|||
rng, PrivateKey.random(rng[]), localAddress(20304))
|
||||
|
||||
|
||||
proto1 = PortalNetwork.new(node1, ContentStorage(trie: trie))
|
||||
proto2 = PortalNetwork.new(node2, ContentStorage(trie: trie))
|
||||
proto3 = PortalNetwork.new(node3, ContentStorage(trie: trie))
|
||||
proto1 = StateNetwork.new(node1, ContentStorage(trie: trie))
|
||||
proto2 = StateNetwork.new(node2, ContentStorage(trie: trie))
|
||||
proto3 = StateNetwork.new(node3, ContentStorage(trie: trie))
|
||||
|
||||
|
||||
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
|
||||
|
@ -107,7 +108,7 @@ procSuite "Content Network":
|
|||
|
||||
let contentKey = ContentKey(
|
||||
networkId: 0'u16,
|
||||
contentType: content.ContentType.Account,
|
||||
contentType: state_content.ContentType.Account,
|
||||
nodeHash: nodeHash)
|
||||
|
||||
let foundContent = await proto1.getContent(contentKey)
|
Loading…
Reference in New Issue