Add concurrency to the content offers of neighborhoodGossip proc (#1027)

* Add concurrency to the content offers of neighborhoodGossip proc

And remove some whitespace

* Remove more whitespace and adjust for 80 char line limit

* Update fluffy grafana dashboard to include gossip offer results
This commit is contained in:
Kim De Mey 2022-04-01 18:01:50 +02:00 committed by GitHub
parent 84ff179cd9
commit 9d656e99c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 299 additions and 36 deletions

View File

@ -16,7 +16,7 @@
"editable": true, "editable": true,
"gnetId": null, "gnetId": null,
"graphTooltip": 0, "graphTooltip": 0,
"id": 12, "id": 13,
"links": [], "links": [],
"panels": [ "panels": [
{ {
@ -1397,7 +1397,7 @@
"y": 40 "y": 40
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 8, "id": 44,
"legend": { "legend": {
"avg": false, "avg": false,
"current": false, "current": false,
@ -1425,17 +1425,25 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "portal_message_decoding_failures_total", "expr": "rate(portal_gossip_offers_successful_total[$__rate_interval])",
"interval": "", "interval": "",
"legendFormat": "", "legendFormat": "portal_gossip_offers_successful[{{protocol_id}}]",
"refId": "A" "refId": "A"
},
{
"exemplar": true,
"expr": "rate(portal_gossip_offers_failed_total[$__rate_interval])",
"hide": false,
"interval": "",
"legendFormat": "portal_gossip_offers_failed[{{protocol_id}}]",
"refId": "B"
} }
], ],
"thresholds": [], "thresholds": [],
"timeFrom": null, "timeFrom": null,
"timeRegions": [], "timeRegions": [],
"timeShift": null, "timeShift": null,
"title": "Portal message decoding failures", "title": "Neighborhood gossip content offers",
"tooltip": { "tooltip": {
"shared": true, "shared": true,
"sort": 0, "sort": 0,
@ -1451,7 +1459,7 @@
}, },
"yaxes": [ "yaxes": [
{ {
"$$hashKey": "object:595", "$$hashKey": "object:4139",
"format": "short", "format": "short",
"label": null, "label": null,
"logBase": 1, "logBase": 1,
@ -1460,7 +1468,7 @@
"show": true "show": true
}, },
{ {
"$$hashKey": "object:596", "$$hashKey": "object:4140",
"format": "short", "format": "short",
"label": null, "label": null,
"logBase": 1, "logBase": 1,
@ -1578,6 +1586,102 @@
"alignLevel": null "alignLevel": null
} }
}, },
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 48
},
"hiddenSeries": false,
"id": 8,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.9",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "portal_message_decoding_failures_total",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Portal message decoding failures",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:595",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:596",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{ {
"aliasColors": {}, "aliasColors": {},
"bars": false, "bars": false,
@ -1640,7 +1744,9 @@
} }
], ],
"thresholds": [], "thresholds": [],
"timeFrom": null,
"timeRegions": [], "timeRegions": [],
"timeShift": null,
"title": "uTP outgoing connections", "title": "uTP outgoing connections",
"tooltip": { "tooltip": {
"shared": true, "shared": true,
@ -1742,7 +1848,9 @@
} }
], ],
"thresholds": [], "thresholds": [],
"timeFrom": null,
"timeRegions": [], "timeRegions": [],
"timeShift": null,
"title": "uTP Packets", "title": "uTP Packets",
"tooltip": { "tooltip": {
"shared": true, "shared": true,
@ -1781,9 +1889,105 @@
"align": false, "align": false,
"alignLevel": null "alignLevel": null
} }
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 69
},
"hiddenSeries": false,
"id": 42,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.9",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "utp_established_connections",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "uTP established connections",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:3811",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:3812",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
} }
], ],
"refresh": "5s", "refresh": false,
"schemaVersion": 27, "schemaVersion": 27,
"style": "dark", "style": "dark",
"tags": [], "tags": [],
@ -1797,6 +2001,6 @@
"timepicker": {}, "timepicker": {},
"timezone": "", "timezone": "",
"title": "Fluffy Dashboard", "title": "Fluffy Dashboard",
"uid": "iWQQPuPnk", "uid": "iWQQPuPnkadsf",
"version": 9 "version": 4
} }

View File

@ -38,7 +38,9 @@ func encodeKey(k: ContentKey): (ByteList, ContentId) =
let keyEncoded = encode(k) let keyEncoded = encode(k)
return (keyEncoded, toContentId(keyEncoded)) return (keyEncoded, toContentId(keyEncoded))
func getEncodedKeyForContent(cType: ContentType, chainId: uint16, hash: BlockHash): (ByteList, ContentId) = func getEncodedKeyForContent(
cType: ContentType, chainId: uint16, hash: BlockHash):
(ByteList, ContentId) =
let contentKeyType = ContentKeyType(chainId: chainId, blockHash: hash) let contentKeyType = ContentKeyType(chainId: chainId, blockHash: hash)
let contentKey = let contentKey =
@ -52,15 +54,16 @@ func getEncodedKeyForContent(cType: ContentType, chainId: uint16, hash: BlockHas
return encodeKey(contentKey) return encodeKey(contentKey)
proc validateHeaderBytes*(bytes: seq[byte], hash: BlockHash): Option[BlockHeader] = proc validateHeaderBytes*(
bytes: seq[byte], hash: BlockHash): Option[BlockHeader] =
try: try:
var rlp = rlpFromBytes(bytes) var rlp = rlpFromBytes(bytes)
let blockHeader = rlp.read(BlockHeader) let blockHeader = rlp.read(BlockHeader)
if not (blockHeader.blockHash() == hash): if not (blockHeader.blockHash() == hash):
# TODO: Header with different hash than expected maybe we should punish peer which sent # TODO: Header with different hash than expecte, maybe we should punish
# us this ? # peer which sent us this ?
return none(BlockHeader) return none(BlockHeader)
return some(blockHeader) return some(blockHeader)
@ -69,7 +72,9 @@ proc validateHeaderBytes*(bytes: seq[byte], hash: BlockHash): Option[BlockHeader
# TODO add some logging about failed decoding # TODO add some logging about failed decoding
return none(BlockHeader) return none(BlockHeader)
proc validateBodyBytes*(bytes: seq[byte], txRoot: KeccakHash, ommersHash: KeccakHash): Option[BlockBody] = proc validateBodyBytes*(
bytes: seq[byte], txRoot: KeccakHash, ommersHash: KeccakHash):
Option[BlockBody] =
try: try:
var rlp = rlpFromBytes(bytes) var rlp = rlpFromBytes(bytes)
@ -79,8 +84,9 @@ proc validateBodyBytes*(bytes: seq[byte], txRoot: KeccakHash, ommersHash: Keccak
let calculatedOmmersHash = rlpHash(blockBody.uncles) let calculatedOmmersHash = rlpHash(blockBody.uncles)
if txRoot != calculatedTxRoot or ommersHash != calculatedOmmersHash: if txRoot != calculatedTxRoot or ommersHash != calculatedOmmersHash:
# we got block body (bundle of transactions and uncles) which do not match # we got block body (bundle of transactions and uncles) which do not match
# header. For now just ignore it, but maybe we should penalize peer sending us such data? # header. For now just ignore it, but maybe we should penalize peer
# sending us such data?
return none(BlockBody) return none(BlockBody)
return some(blockBody) return some(blockBody)
@ -89,7 +95,8 @@ proc validateBodyBytes*(bytes: seq[byte], txRoot: KeccakHash, ommersHash: Keccak
# TODO add some logging about failed decoding # TODO add some logging about failed decoding
return none(BlockBody) return none(BlockBody)
proc getContentFromDb(h: HistoryNetwork, T: type, contentId: ContentId): Option[T] = proc getContentFromDb(
h: HistoryNetwork, T: type, contentId: ContentId): Option[T] =
if h.portalProtocol.inRange(contentId): if h.portalProtocol.inRange(contentId):
let contentFromDB = h.contentDB.get(contentId) let contentFromDB = h.contentDB.get(contentId)
if contentFromDB.isSome(): if contentFromDB.isSome():
@ -98,14 +105,17 @@ proc getContentFromDb(h: HistoryNetwork, T: type, contentId: ContentId): Option[
let content = rlp.read(T) let content = rlp.read(T)
return some(content) return some(content)
except CatchableError as e: except CatchableError as e:
# Content in db should always have valid formatting, so this should not happen # Content in db should always have valid formatting, so this should not
# happen
raiseAssert(e.msg) raiseAssert(e.msg)
else: else:
return none(T) return none(T)
else: else:
return none(T) return none(T)
proc getBlockHeader*(h: HistoryNetwork, chainId: uint16, hash: BlockHash): Future[Option[BlockHeader]] {.async.} = proc getBlockHeader*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash):
Future[Option[BlockHeader]] {.async.} =
let (keyEncoded, contentId) = getEncodedKeyForContent(blockHeader, chainId, hash) let (keyEncoded, contentId) = getEncodedKeyForContent(blockHeader, chainId, hash)
let maybeHeaderFromDb = h.getContentFromDb(BlockHeader, contentId) let maybeHeaderFromDb = h.getContentFromDb(BlockHeader, contentId)
@ -128,7 +138,9 @@ proc getBlockHeader*(h: HistoryNetwork, chainId: uint16, hash: BlockHash): Futur
return maybeHeader return maybeHeader
proc getBlock*(h: HistoryNetwork, chainId: uint16, hash: BlockHash): Future[Option[Block]] {.async.} = proc getBlock*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash):
Future[Option[Block]] {.async.} =
let maybeHeader = await h.getBlockHeader(chainId, hash) let maybeHeader = await h.getBlockHeader(chainId, hash)
if maybeHeader.isNone(): if maybeHeader.isNone():

View File

@ -19,7 +19,7 @@ const MAX* = high(Uint256)
# - modulo operations # - modulo operations
# - abs operation # - abs operation
# and the results are eqivalent to function described in spec. # and the results are eqivalent to function described in spec.
# #
# The way it works is as follows. Let say we have integers modulo 8: # The way it works is as follows. Let say we have integers modulo 8:
# [0, 1, 2, 3, 4, 5, 6, 7] # [0, 1, 2, 3, 4, 5, 6, 7]
# and we want to calculate minimal distance between 0 and 5. # and we want to calculate minimal distance between 0 and 5.
@ -27,7 +27,7 @@ const MAX* = high(Uint256)
# From this we know that the shorter distance is the one wraping around 0, which # From this we know that the shorter distance is the one wraping around 0, which
# is equal to 3 # is equal to 3
func stateDistance*(node_id: UInt256, content_id: UInt256): UInt256 = func stateDistance*(node_id: UInt256, content_id: UInt256): UInt256 =
let rawDiff = let rawDiff =
if node_id > content_id: if node_id > content_id:
node_id - content_id node_id - content_id
else: else:
@ -50,7 +50,7 @@ func log2DistanceImpl(value: UInt256): uint16 =
if value == UInt256.one: if value == UInt256.one:
return 0'u16 return 0'u16
var comp = value var comp = value
var ret = 0'u16 var ret = 0'u16
while (comp > 1): while (comp > 1):

View File

@ -53,6 +53,12 @@ declareHistogram portal_content_keys_offered,
declareHistogram portal_content_keys_accepted, declareHistogram portal_content_keys_accepted,
"Portal wire protocol amount of content keys per accept message received", "Portal wire protocol amount of content keys per accept message received",
labels = ["protocol_id"], buckets = contentKeysBuckets labels = ["protocol_id"], buckets = contentKeysBuckets
declareCounter portal_gossip_offers_successful,
"Portal wire protocol successful content offers from neighborhood gossip",
labels = ["protocol_id"]
declareCounter portal_gossip_offers_failed,
"Portal wire protocol failed content offers from neighborhood gossip",
labels = ["protocol_id"]
# Note: These metrics are to get some idea on how many enrs are send on average. # Note: These metrics are to get some idea on how many enrs are send on average.
# Relevant issue: https://github.com/ethereum/portal-network-specs/issues/136 # Relevant issue: https://github.com/ethereum/portal-network-specs/issues/136
@ -93,6 +99,20 @@ const
16 # HMAC 16 # HMAC
discv5MaxSize = 1280 discv5MaxSize = 1280
# These are the concurrent offers per Portal wire protocol that is running.
# Using the `offerQueue` allows for limiting the amount of offers send and
# thus how many streams can be started.
# TODO:
# More thought needs to go into this as it is currently on a per network
# basis. Keep it simple like that? Or limit it better at the stream transport
# level? In the latter case, this might still need to be checked/blocked at
# the very start of sending the offer, because blocking/waiting too long
# between the received accept message and actually starting the stream and
# sending data could give issues due to timeouts on the other side.
# And then there are still limits to be applied also for FindContent and the
# incoming directions.
concurrentOffers = 50
type type
ToContentIdHandler* = ToContentIdHandler* =
proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.} proc(contentKey: ByteList): Option[ContentId] {.raises: [Defect], gcsafe.}
@ -114,6 +134,8 @@ type
revalidateLoop: Future[void] revalidateLoop: Future[void]
stream*: PortalStream stream*: PortalStream
radiusCache: RadiusCache radiusCache: RadiusCache
offerQueue: AsyncQueue[(Node, ContentKeysList)]
offerWorkers: seq[Future[void]]
PortalResult*[T] = Result[T, cstring] PortalResult*[T] = Result[T, cstring]
@ -362,7 +384,8 @@ proc new*(T: type PortalProtocol,
toContentId: toContentId, toContentId: toContentId,
dataRadius: dataRadius, dataRadius: dataRadius,
bootstrapRecords: @bootstrapRecords, bootstrapRecords: @bootstrapRecords,
radiusCache: RadiusCache.init(256)) radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[(Node, ContentKeysList)](concurrentOffers))
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect( proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id") "Only one protocol should have this id")
@ -505,7 +528,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
id = dst.id id = dst.id
return err("Trying to connect to node with unknown address") return err("Trying to connect to node with unknown address")
let connectionResult = let connectionResult =
await p.stream.connectTo( await p.stream.connectTo(
nodeAddress.unsafeGet(), nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId) uint16.fromBytesBE(m.connectionId)
@ -515,7 +538,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
error "Utp connection error while trying to find content", error "Utp connection error while trying to find content",
msg = connectionResult.error msg = connectionResult.error
return err("Error connecting uTP socket") return err("Error connecting uTP socket")
let socket = connectionResult.get() let socket = connectionResult.get()
# Read all bytes from the socket # Read all bytes from the socket
# This will either end with a FIN, or because the read action times out. # This will either end with a FIN, or because the read action times out.
@ -572,10 +595,10 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
error "Trying to connect to node with unknown address", error "Trying to connect to node with unknown address",
id = dst.id id = dst.id
return err("Trying to connect to node with unknown address") return err("Trying to connect to node with unknown address")
let connectionResult = let connectionResult =
await p.stream.connectTo( await p.stream.connectTo(
nodeAddress.unsafeGet(), nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId) uint16.fromBytesBE(m.connectionId)
) )
@ -585,7 +608,7 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
return err("Error connecting uTP socket") return err("Error connecting uTP socket")
let clientSocket = connectionResult.get() let clientSocket = connectionResult.get()
for contentKey in requestedContentKeys: for contentKey in requestedContentKeys:
let contentIdOpt = p.toContentId(contentKey) let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isSome(): if contentIdOpt.isSome():
@ -606,6 +629,19 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
else: else:
return err("No accept response") return err("No accept response")
proc offerWorker(p: PortalProtocol) {.async.} =
while true:
let (node, contentKeys) = await p.offerQueue.popFirst()
let res = await p.offer(node, contentKeys)
if res.isOk():
portal_gossip_offers_successful.inc(labelValues = [$p.protocolId])
else:
portal_gossip_offers_failed.inc(labelValues = [$p.protocolId])
proc offerQueueEmpty*(p: PortalProtocol): bool =
p.offerQueue.empty()
proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.async.} = proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.async.} =
let contentKey = contentKeys[0] # for now only 1 item is considered let contentKey = contentKeys[0] # for now only 1 item is considered
let contentIdOpt = p.toContentId(contentKey) let contentIdOpt = p.toContentId(contentKey)
@ -622,8 +658,7 @@ proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.asyn
NodeId(contentId), k = 6, seenOnly = false) NodeId(contentId), k = 6, seenOnly = false)
for node in closestNodes: for node in closestNodes:
# Not doing anything if this fails await p.offerQueue.addLast((node, contentKeys))
discard await p.offer(node, contentKeys)
proc processContent( proc processContent(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte]) stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
@ -947,12 +982,19 @@ proc start*(p: PortalProtocol) =
p.refreshLoop = refreshLoop(p) p.refreshLoop = refreshLoop(p)
p.revalidateLoop = revalidateLoop(p) p.revalidateLoop = revalidateLoop(p)
for i in 0 ..< concurrentOffers:
p.offerWorkers.add(offerWorker(p))
proc stop*(p: PortalProtocol) = proc stop*(p: PortalProtocol) =
if not p.revalidateLoop.isNil: if not p.revalidateLoop.isNil:
p.revalidateLoop.cancel() p.revalidateLoop.cancel()
if not p.refreshLoop.isNil: if not p.refreshLoop.isNil:
p.refreshLoop.cancel() p.refreshLoop.cancel()
for worker in p.offerWorkers:
worker.cancel()
p.offerWorkers = @[]
proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} = proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
## Resolve a `Node` based on provided `NodeId`. ## Resolve a `Node` based on provided `NodeId`.
## ##

View File

@ -100,8 +100,8 @@ proc addContentRequest*(
return connectionId return connectionId
proc connectTo*( proc connectTo*(
stream: PortalStream, stream: PortalStream,
nodeAddress: NodeAddress, nodeAddress: NodeAddress,
connectionId: uint16): Future[Result[UtpSocket[NodeAddress], string]] {.async.} = connectionId: uint16): Future[Result[UtpSocket[NodeAddress], string]] {.async.} =
let socketRes = await stream.transport.connectTo(nodeAddress, connectionId) let socketRes = await stream.transport.connectTo(nodeAddress, connectionId)
@ -112,7 +112,7 @@ proc connectTo*(
# connection id, in our use case it most probably means that other side sent us # connection id, in our use case it most probably means that other side sent us
# connection id which is already used. # connection id which is already used.
# For now we just fail connection and return an error. Another strategy to consider # For now we just fail connection and return an error. Another strategy to consider
# would be to check what is the connection status, and then re-use it, or # would be to check what is the connection status, and then re-use it, or
# close it and retry connection. # close it and retry connection.
let msg = "Socket to " & $nodeAddress & "with connection id: " & $connectionId & " already exists" let msg = "Socket to " & $nodeAddress & "with connection id: " & $connectionId & " already exists"
return err(msg) return err(msg)

View File

@ -154,6 +154,11 @@ proc propagateHistoryDb*(
# TODO: This call will get the content we just stored in the db, so it # TODO: This call will get the content we just stored in the db, so it
# might be an improvement to directly pass it. # might be an improvement to directly pass it.
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])])) await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]))
# Need to be sure that all offers where started. TODO: this is not great.
while not p.offerQueueEmpty():
error "WAITING FOR OFFER QUEUE EMPTY"
await sleepAsync(500.milliseconds)
return ok() return ok()
else: else:
return err(blockData.error) return err(blockData.error)