Add data radius cache per portal protocol (#930)
This commit is contained in:
parent
261c0b51a7
commit
69b0cc07b6
|
@ -118,7 +118,9 @@ template toSszType*(x: UInt256): array[32, byte] =
|
|||
toBytesLE(x)
|
||||
|
||||
func fromSszBytes*(T: type UInt256, data: openArray[byte]):
|
||||
T {.raises: [MalformedSszError, Defect].} =
|
||||
# TODO: These two SSZ Errors (order matters ugh!) is because of raisesssz
|
||||
# pragma on raiseIncorrectSize(). Need to fix that there.
|
||||
T {.raises: [Defect, MalformedSszError, SszSizeMismatchError].} =
|
||||
if data.len != sizeof(result):
|
||||
raiseIncorrectSize T
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import
|
|||
stew/results, chronicles, chronos, nimcrypto/hash, bearssl,
|
||||
ssz_serialization,
|
||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
||||
nodes_verification],
|
||||
nodes_verification, lru],
|
||||
../../content_db,
|
||||
"."/[portal_stream, portal_protocol_config],
|
||||
./messages
|
||||
|
@ -41,6 +41,8 @@ type
|
|||
|
||||
PortalProtocolId* = array[2, byte]
|
||||
|
||||
RadiusCache* = LRUCache[NodeId, UInt256]
|
||||
|
||||
PortalProtocol* = ref object of TalkProtocol
|
||||
protocolId: PortalProtocolId
|
||||
routingTable*: RoutingTable
|
||||
|
@ -53,6 +55,7 @@ type
|
|||
refreshLoop: Future[void]
|
||||
revalidateLoop: Future[void]
|
||||
stream: PortalStream
|
||||
radiusCache: RadiusCache
|
||||
|
||||
PortalResult*[T] = Result[T, cstring]
|
||||
|
||||
|
@ -86,7 +89,18 @@ func inRange*(p: PortalProtocol, contentId: ContentId): bool =
|
|||
let distance = p.routingTable.distance(p.localNode.id, contentId)
|
||||
distance <= p.dataRadius
|
||||
|
||||
func handlePing(p: PortalProtocol, ping: PingMessage): seq[byte] =
|
||||
func handlePing(
|
||||
p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] =
|
||||
# TODO: This should become custom per Portal Network
|
||||
# TODO: Need to think about the effect of malicious actor sending lots of
|
||||
# pings from different nodes to clear the LRU.
|
||||
let customPayloadDecoded =
|
||||
try: SSZ.decode(ping.customPayload.asSeq(), CustomPayload)
|
||||
except MalformedSszError, SszSizeMismatchError:
|
||||
# invalid custom payload, send empty back
|
||||
return @[]
|
||||
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
|
||||
|
||||
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
||||
let p = PongMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
||||
customPayload: ByteList(SSZ.encode(customPayload)))
|
||||
|
@ -209,7 +223,7 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
|||
|
||||
case message.kind
|
||||
of MessageKind.ping:
|
||||
p.handlePing(message.ping)
|
||||
p.handlePing(message.ping, srcId)
|
||||
of MessageKind.findnodes:
|
||||
p.handleFindNodes(message.findNodes)
|
||||
of MessageKind.findcontent:
|
||||
|
@ -248,7 +262,8 @@ proc new*(T: type PortalProtocol,
|
|||
toContentId: toContentId,
|
||||
stream: stream,
|
||||
dataRadius: dataRadius,
|
||||
bootstrapRecords: @bootstrapRecords)
|
||||
bootstrapRecords: @bootstrapRecords,
|
||||
radiusCache: RadiusCache.init(256))
|
||||
|
||||
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
|
||||
"Only one protocol should have this id")
|
||||
|
@ -287,7 +302,7 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
|||
|
||||
return messageResponse
|
||||
|
||||
proc ping*(p: PortalProtocol, dst: Node):
|
||||
proc pingImpl*(p: PortalProtocol, dst: Node):
|
||||
Future[PortalResult[PongMessage]] {.async.} =
|
||||
let customPayload = CustomPayload(dataRadius: p.dataRadius)
|
||||
let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
||||
|
@ -332,6 +347,23 @@ proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]
|
|||
|
||||
ok(records)
|
||||
|
||||
proc ping*(p: PortalProtocol, dst: Node):
|
||||
Future[PortalResult[PongMessage]] {.async.} =
|
||||
let pongResponse = await p.pingImpl(dst)
|
||||
|
||||
if pongResponse.isOK():
|
||||
let pong = pongResponse.get()
|
||||
# TODO: This should become custom per Portal Network
|
||||
let customPayloadDecoded =
|
||||
try: SSZ.decode(pong.customPayload.asSeq(), CustomPayload)
|
||||
except MalformedSszError, SszSizeMismatchError:
|
||||
# invalid custom payload
|
||||
return err("Pong message contains invalid custom payload")
|
||||
|
||||
p.radiusCache.put(dst.id, customPayloadDecoded.dataRadius)
|
||||
|
||||
return pongResponse
|
||||
|
||||
proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||
Future[PortalResult[FoundContent]] {.async.} =
|
||||
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
|
||||
|
|
Loading…
Reference in New Issue