Refactor portal stream (#1191)

KonradStaniec 2022-08-17 09:32:06 +02:00 committed by GitHub
parent f07945d37b
commit 5f2117d081
9 changed files with 159 additions and 147 deletions

View File

@@ -102,30 +102,13 @@ proc run(config: PortalConf) {.raises: [CatchableError, Defect].} =
config.bitsPerHop,
config.radiusConfig
)
stateNetwork = StateNetwork.new(d, db,
bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
historyNetwork = HistoryNetwork.new(d, db,
streamManager = StreamManager.new(d)
stateNetwork = StateNetwork.new(d, db, streamManager,
bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
# One instance of UtpDiscv5Protocol is shared across all the PortalStreams.
let
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration),
payloadSize = uint32(maxUtpPayloadSize))
streamTransport = UtpDiscv5Protocol.new(
d,
utpProtocolId,
registerIncomingSocketCallback(@[
stateNetwork.portalProtocol.stream,
historyNetwork.portalProtocol.stream]),
# for now we do not use user data in callbacks
nil,
allowRegisteredIdCallback(@[
stateNetwork.portalProtocol.stream,
historyNetwork.portalProtocol.stream]),
socketConfig)
stateNetwork.setStreamTransport(streamTransport)
historyNetwork.setStreamTransport(streamTransport)
historyNetwork = HistoryNetwork.new(d, db, streamManager,
bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
# TODO: If no new network key is generated then we should first check if an
# enr file exists, and in the case it does read out the seqNum from it and
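Taken together, the node setup collapses to a single StreamManager shared by all Portal sub-networks. A minimal sketch of the new wiring, condensed from the hunk above (here `d` is the discv5 protocol instance and `db` the ContentDB):

let
  streamManager = StreamManager.new(d)
  stateNetwork = StateNetwork.new(d, db, streamManager,
    bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)
  historyNetwork = HistoryNetwork.new(d, db, streamManager,
    bootstrapRecords = bootstrapRecords, portalConfig = portalConfig)

Each network registers its own PortalStream on the manager's shared uTP transport, so the per-network UtpDiscv5Protocol plumbing is no longer needed at the call site.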

View File

@@ -40,13 +40,11 @@ type
HistoryNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
processContentLoop: Future[void]
Block* = (BlockHeader, BlockBody)
func setStreamTransport*(n: HistoryNetwork, transport: UtpDiscv5Protocol) =
setTransport(n.portalProtocol.stream, transport)
func toContentIdHandler(contentKey: ByteList): Option[ContentId] =
some(toContentId(contentKey))
@@ -740,20 +738,30 @@ proc new*(
T: type HistoryNetwork,
baseProtocol: protocol.Protocol,
contentDB: ContentDB,
streamManager: StreamManager,
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
let s = streamManager.registerNewStream(cq)
let portalProtocol = PortalProtocol.new(
baseProtocol, historyProtocolId, contentDB,
toContentIdHandler, dbGetHandler, bootstrapRecords,
toContentIdHandler, dbGetHandler, s, bootstrapRecords,
config = portalConfig)
return HistoryNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
return HistoryNetwork(
portalProtocol: portalProtocol,
contentDB: contentDB,
contentQueue: cq
)
proc processContentLoop(n: HistoryNetwork) {.async.} =
try:
while true:
let (contentKeys, contentItems) =
await n.portalProtocol.stream.contentQueue.popFirst()
await n.contentQueue.popFirst()
# Content passed here can have fewer items than contentKeys, but not more.
for i, contentItem in contentItems:
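The queue is now the only coupling between the stream and the network: the stream's offer reader puts accepted content on it, and `processContentLoop` pops it off. A minimal sketch of that contract, with the per-item validation elided:

# Producer side (portal_stream.nim), after all items are read from the socket:
await stream.contentQueue.put((offer.contentKeys, contentItems))

# Consumer side (history_network.nim), a long-running async loop:
let (contentKeys, contentItems) = await n.contentQueue.popFirst()
for i, contentItem in contentItems:
  discard # validate and store contentItem under contentKeys[i]

Because the queue is bounded at 50 entries, a slow consumer back-pressures the uTP reads that feed it.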

View File

@@ -23,11 +23,9 @@ const
type StateNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
processContentLoop: Future[void]
func setStreamTransport*(n: StateNetwork, transport: UtpDiscv5Protocol) =
setTransport(n.portalProtocol.stream, transport)
func toContentIdHandler(contentKey: ByteList): Option[ContentId] =
toContentId(contentKey)
@@ -77,15 +75,25 @@ proc new*(
T: type StateNetwork,
baseProtocol: protocol.Protocol,
contentDB: ContentDB,
streamManager: StreamManager,
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig): T =
let cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
let s = streamManager.registerNewStream(cq)
let portalProtocol = PortalProtocol.new(
baseProtocol, stateProtocolId, contentDB,
toContentIdHandler, dbGetHandler,
toContentIdHandler, dbGetHandler, s,
bootstrapRecords, stateDistanceCalculator,
config = portalConfig)
return StateNetwork(portalProtocol: portalProtocol, contentDB: contentDB)
return StateNetwork(
portalProtocol: portalProtocol,
contentDB: contentDB,
contentQueue: cq
)
proc processContentLoop(n: StateNetwork) {.async.} =
try:

View File

@@ -444,6 +444,7 @@ proc new*(T: type PortalProtocol,
contentDB: ContentDB,
toContentId: ToContentIdHandler,
dbGet: DbGetHandler,
stream: PortalStream,
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig
@@ -464,16 +465,13 @@ proc new*(T: type PortalProtocol,
radiusConfig: config.radiusConfig,
dataRadius: initialRadius,
bootstrapRecords: @bootstrapRecords,
stream: stream,
radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers))
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id")
let stream = PortalStream.new(udata = proto, rng = proto.baseProtocol.rng)
proto.stream = stream
proto
# Sends the discv5 talkreq message with the provided Portal message, awaits and
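With the stream injected at construction, the old `GC_ref`/`udata` pattern and the post-construction `proto.stream = stream` assignment both disappear. A sketch of a typical new call site, mirroring the test changes later in this diff:

let
  cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
  stream = streamManager.registerNewStream(cq)
  proto = PortalProtocol.new(
    d, protocolId, db, toContentId, dbGetHandler, stream,
    bootstrapRecords = bootstrapRecords)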

View File

@@ -22,7 +22,7 @@ logScope:
topics = "portal_stream"
const
utpProtocolId* = "utp".toBytes()
utpProtocolId = "utp".toBytes()
defaultConnectionTimeout = 15.seconds
defaultContentReadTimeout = 60.seconds
@@ -32,7 +32,7 @@ const
# discv5 ordinary message packets, for which below calculation applies.
talkReqOverhead = getTalkReqOverhead(utpProtocolId)
utpHeaderOverhead = 20
maxUtpPayloadSize* = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead
maxUtpPayloadSize = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead
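For intuition, a back-of-the-envelope with assumed numbers: discv5 caps packets at 1280 bytes and the uTP header is a fixed 20 bytes; the talkreq overhead below is a placeholder, since the real value comes from `getTalkReqOverhead(utpProtocolId)`:

const
  maxDiscv5PacketSize = 1280  # discv5 spec maximum packet size
  talkReqOverhead = 32        # placeholder; actual value from getTalkReqOverhead
  utpHeaderOverhead = 20      # fixed uTP packet header size
  # 1280 - 32 - 20 = 1228 bytes of uTP payload per packet, so every uTP data
  # packet is guaranteed to fit in a single discv5 talkreq message.
  maxUtpPayloadSize = maxDiscv5PacketSize - talkReqOverhead - utpHeaderOverhead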
type
ContentRequest = object
@@ -67,9 +67,13 @@ type
connectionTimeout: Duration
contentReadTimeout*: Duration
rng: ref HmacDrbgContext
udata: pointer
contentQueue*: AsyncQueue[(ContentKeysList, seq[seq[byte]])]
StreamManager* = ref object
transport: UtpDiscv5Protocol
streams: seq[PortalStream]
rng: ref HmacDrbgContext
proc pruneAllowedConnections(stream: PortalStream) =
# Prune requests and offers that didn't receive a connection request
# before `connectionTimeout`.
@@ -79,10 +83,6 @@ proc pruneAllowedConnections(stream: PortalStream) =
stream.contentOffers.keepIf(proc(x: ContentOffer): bool =
x.timeout > now)
proc getUserData*[T](stream: PortalStream): T =
## Obtain user data stored in ``stream`` object.
cast[T](stream.udata)
proc addContentOffer*(
stream: PortalStream, nodeId: NodeId, contentKeys: ContentKeysList): Bytes2 =
stream.pruneAllowedConnections()
@@ -264,56 +264,23 @@ proc readContentOffer(
# socket, and let the specific networks handle that.
await stream.contentQueue.put((offer.contentKeys, contentItems))
proc new*(
proc new(
T: type PortalStream,
udata: ref,
connectionTimeout = defaultConnectionTimeout,
contentReadTimeout = defaultContentReadTimeout,
rng = newRng()): T =
GC_ref(udata)
let
stream = PortalStream(
udata: cast[pointer](udata),
connectionTimeout: connectionTimeout,
contentReadTimeout: contentReadTimeout,
contentQueue: newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50),
rng: rng)
transport: UtpDiscv5Protocol,
contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])],
connectionTimeout: Duration,
contentReadTimeout: Duration,
rng: ref HmacDrbgContext): T =
let stream = PortalStream(
transport: transport,
connectionTimeout: connectionTimeout,
contentReadTimeout: contentReadTimeout,
contentQueue: contentQueue,
rng: rng
)
stream
func setTransport*(stream: PortalStream, transport: UtpDiscv5Protocol) =
stream.transport = transport
# TODO: I think I'd like it more if we didn't capture the stream.
proc registerIncomingSocketCallback*(
streams: seq[PortalStream]): AcceptConnectionCallback[NodeAddress] =
return (
proc(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] =
for stream in streams:
# Note: The connection id of a uTP SYN is different from other packets: it is
# actually the peer's `send_conn_id`, as opposed to `receive_conn_id` for all
# other packets.
for i, request in stream.contentRequests:
if request.connectionId == socket.connectionId and
request.nodeId == socket.remoteAddress.nodeId:
let fut = socket.writeContentRequest(stream, request)
stream.contentRequests.del(i)
return fut
for i, offer in stream.contentOffers:
if offer.connectionId == socket.connectionId and
offer.nodeId == socket.remoteAddress.nodeId:
let fut = socket.readContentOffer(stream, offer)
stream.contentOffers.del(i)
return fut
# TODO: Is there a scenario where this can happen,
# considering `allowRegisteredIdCallback`? If not, doAssert?
var fut = newFuture[void]("fluffy.AcceptConnectionCallback")
fut.complete()
return fut
)
proc allowedConnection(
stream: PortalStream, address: NodeAddress, connectionId: uint16): bool =
return
@@ -324,12 +291,78 @@ proc allowedConnection(
proc (x: ContentOffer): bool =
x.connectionId == connectionId and x.nodeId == address.nodeId)
proc allowRegisteredIdCallback*(
streams: seq[PortalStream]): AllowConnectionCallback[NodeAddress] =
return (
proc(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool =
for stream in streams:
# stream.pruneAllowedConnections()
if allowedConnection(stream, remoteAddress, connectionId):
return true
proc handleIncomingConnection(server: UtpRouter[NodeAddress], socket: UtpSocket[NodeAddress]): Future[void] =
let manager = getUserData[NodeAddress, StreamManager](server)
for stream in manager.streams:
# Note: The connection id of a uTP SYN is different from other packets: it is
# actually the peer's `send_conn_id`, as opposed to `receive_conn_id` for all
# other packets.
for i, request in stream.contentRequests:
if request.connectionId == socket.connectionId and
request.nodeId == socket.remoteAddress.nodeId:
let fut = socket.writeContentRequest(stream, request)
stream.contentRequests.del(i)
return fut
for i, offer in stream.contentOffers:
if offer.connectionId == socket.connectionId and
offer.nodeId == socket.remoteAddress.nodeId:
let fut = socket.readContentOffer(stream, offer)
stream.contentOffers.del(i)
return fut
# TODO: Is there a scenario where this can happen,
# considering `allowRegisteredIdCallback`? If not, doAssert?
var fut = newFuture[void]("fluffy.AcceptConnectionCallback")
fut.complete()
return fut
proc allowIncomingConnection(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool =
let manager = getUserData[NodeAddress, StreamManager](r)
for stream in manager.streams:
# stream.pruneAllowedConnections()
if allowedConnection(stream, remoteAddress, connectionId):
return true
proc new*(T: type StreamManager, d: protocol.Protocol): T =
let socketConfig = SocketConfig.init(
# Setting to none means that incoming sockets are in Connected state, which
# means they can send and receive data.
incomingSocketReceiveTimeout = none(Duration),
payloadSize = uint32(maxUtpPayloadSize)
)
let manager = StreamManager(streams: @[], rng: d.rng)
let utpOverDiscV5Protocol = UtpDiscv5Protocol.new(
d,
utpProtocolId,
handleIncomingConnection,
manager,
allowIncomingConnection,
socketConfig
)
manager.transport = utpOverDiscV5Protocol
return manager
proc registerNewStream*(
m: StreamManager,
contentQueue: AsyncQueue[(ContentKeysList, seq[seq[byte]])],
connectionTimeout = defaultConnectionTimeout,
contentReadTimeout = defaultContentReadTimeout): PortalStream =
let s = PortalStream.new(
m.transport,
contentQueue,
connectionTimeout,
contentReadTimeout,
m.rng
)
m.streams.add(s)
return s
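The end state: one UtpDiscv5Protocol per node, owned by the StreamManager, with incoming uTP connections demultiplexed across all registered streams by matching (connectionId, nodeId). A minimal usage sketch, assuming two sub-networks on one node:

let
  manager = StreamManager.new(d) # one shared uTP transport over discv5
  stateQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
  historyQueue = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
  stateStream = manager.registerNewStream(stateQueue)
  historyStream = manager.registerNewStream(historyQueue)

On an incoming connection, `handleIncomingConnection` recovers the manager from the router's user data and walks `manager.streams` until it finds the stream that registered the matching content request or offer.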

View File

@@ -25,21 +25,8 @@ proc newHistoryNode(rng: ref HmacDrbgContext, port: int): HistoryNode =
let
node = initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port))
db = ContentDB.new("", uint32.high, inMemory = true)
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration),
payloadSize = uint32(maxUtpPayloadSize)
)
hn = HistoryNetwork.new(node, db)
streamTransport = UtpDiscv5Protocol.new(
node,
utpProtocolId,
registerIncomingSocketCallback(@[hn.portalProtocol.stream]),
nil,
allowRegisteredIdCallback(@[hn.portalProtocol.stream]),
socketConfig
)
hn.setStreamTransport(streamTransport)
streamManager = StreamManager.new(node)
hn = HistoryNetwork.new(node, db, streamManager)
return HistoryNode(discoveryProtocol: node, historyNetwork: hn)

View File

@@ -41,22 +41,14 @@ proc initPortalProtocol(
let
d = initDiscoveryNode(rng, privKey, address, bootstrapRecords)
db = ContentDB.new("", uint32.high, inMemory = true)
manager = StreamManager.new(d)
q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
stream = manager.registerNewStream(q)
proto = PortalProtocol.new(
d, protocolId, db, toContentId, dbGetHandler,
d, protocolId, db, toContentId, dbGetHandler, stream,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration),
payloadSize = uint32(maxUtpPayloadSize))
streamTransport = UtpDiscv5Protocol.new(
d, utpProtocolId,
registerIncomingSocketCallback(@[proto.stream]),
nil,
allowRegisteredIdCallback(@[proto.stream]),
socketConfig)
proto.stream.setTransport(streamTransport)
return proto
proc stopPortalProtocol(proto: PortalProtocol) {.async.} =
@ -347,8 +339,12 @@ procSuite "Portal Wire Protocol Tests":
dbLimit = 100_000'u32
db = ContentDB.new("", dbLimit, inMemory = true)
m = StreamManager.new(node1)
q = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
stream = m.registerNewStream(q)
proto1 = PortalProtocol.new(
node1, protocolId, db, toContentId, dbGetHandler)
node1, protocolId, db, toContentId, dbGetHandler, stream)
let item = genByteSeq(10_000)
var distances: seq[UInt256] = @[]

View File

@@ -11,7 +11,7 @@ import
eth/[keys, trie/db, trie/hexary],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
../../nimbus/[genesis, chain_config, config, db/db_chain, db/state_db],
../network/wire/portal_protocol,
../network/wire/[portal_protocol, portal_stream],
../network/state/[state_content, state_network],
../content_db,
./test_helpers
@ -42,11 +42,13 @@ procSuite "State Content Network":
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
sm1 = StreamManager.new(node1)
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
sm2 = StreamManager.new(node2)
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1)
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2)
check proto2.portalProtocol.addNode(node1.localNode) == Added
@ -96,14 +98,17 @@ procSuite "State Content Network":
trie = genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
sm1 = StreamManager.new(node1)
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
sm2 = StreamManager.new(node2)
node3 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20304))
sm3 = StreamManager.new(node3)
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1)
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2)
proto3 = StateNetwork.new(node3, ContentDB.new("", uint32.high, inMemory = true), sm3)
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
check proto1.portalProtocol.addNode(node2.localNode) == Added
@ -156,12 +161,13 @@ procSuite "State Content Network":
let
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
sm1 = StreamManager.new(node1)
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
sm2 = StreamManager.new(node2)
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true))
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true))
proto1 = StateNetwork.new(node1, ContentDB.new("", uint32.high, inMemory = true), sm1)
proto2 = StateNetwork.new(node2, ContentDB.new("", uint32.high, inMemory = true), sm2)
check (await node1.ping(node2.localNode)).isOk()
check (await node2.ping(node1.localNode)).isOk()

View File

@@ -231,19 +231,12 @@ proc run(config: PortalCliConf) =
let
db = ContentDB.new("", config.storageSize, inMemory = true)
sm = StreamManager.new(d)
cq = newAsyncQueue[(ContentKeysList, seq[seq[byte]])](50)
stream = sm.registerNewStream(cq)
portal = PortalProtocol.new(d, config.protocolId, db,
testContentIdHandler, dbGetHandler,
testContentIdHandler, dbGetHandler, stream,
bootstrapRecords = bootstrapRecords)
socketConfig = SocketConfig.init(
incomingSocketReceiveTimeout = none(Duration))
streamTransport = UtpDiscv5Protocol.new(
d,
utpProtocolId,
registerIncomingSocketCallback(@[portal.stream]),
nil,
allowRegisteredIdCallback(@[portal.stream]), socketConfig)
setTransport(portal.stream, streamTransport)
if config.metricsEnabled:
let