Some more Options to Opt and similar changes (#1611)
Includes nim-eth bump.
This commit is contained in:
parent
7dbcf94280
commit
8ebac4c878
|
@ -344,21 +344,21 @@ proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Opt[T] =
|
|||
|
||||
proc get(db: ContentDB, T: type BlockBody, contentId: ContentId,
|
||||
header: BlockHeader): Opt[T] =
|
||||
let encoded = db.get(contentId)
|
||||
if encoded.isNone():
|
||||
let encoded = db.get(contentId).valueOr:
|
||||
return Opt.none(T)
|
||||
|
||||
let timestamp = Moment.init(header.timestamp.toUnix(), Second)
|
||||
let body =
|
||||
if isShanghai(chainConfig, timestamp):
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded.get(), PortalBlockBodyShanghai))
|
||||
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
|
||||
else:
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
|
||||
let
|
||||
timestamp = Moment.init(header.timestamp.toUnix(), Second)
|
||||
body =
|
||||
if isShanghai(chainConfig, timestamp):
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded, PortalBlockBodyShanghai))
|
||||
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))
|
||||
else:
|
||||
BlockBody.fromPortalBlockBodyOrRaise(
|
||||
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))
|
||||
|
||||
Opt.some(body)
|
||||
|
||||
|
@ -733,17 +733,13 @@ proc validateContent(
|
|||
for i, contentItem in contentItems:
|
||||
let contentKey = contentKeys[i]
|
||||
if await n.validateContent(contentItem, contentKey):
|
||||
let contentIdOpt = n.portalProtocol.toContentId(contentKey)
|
||||
if contentIdOpt.isNone():
|
||||
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
|
||||
error "Received offered content with invalid content key", contentKey
|
||||
return false
|
||||
|
||||
let contentId = contentIdOpt.get()
|
||||
|
||||
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
|
||||
|
||||
info "Received offered content validated successfully", contentKey
|
||||
|
||||
else:
|
||||
error "Received offered content failed validation", contentKey
|
||||
return false
|
||||
|
|
|
@ -223,7 +223,7 @@ proc addNode*(p: PortalProtocol, r: Record): bool =
|
|||
else:
|
||||
false
|
||||
|
||||
proc getNode*(p: PortalProtocol, id: NodeId): Option[Node] =
|
||||
proc getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
|
||||
p.routingTable.getNode(id)
|
||||
|
||||
func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
|
||||
|
@ -316,15 +316,13 @@ proc handleFindContent(
|
|||
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
|
||||
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
|
||||
|
||||
let contentIdResult = p.toContentId(fc.contentKey)
|
||||
|
||||
if contentIdResult.isErr:
|
||||
let contentId = p.toContentId(fc.contentKey).valueOr:
|
||||
# Return empty response when content key validation fails
|
||||
# TODO: Better would be to return no message at all? Needs changes on
|
||||
# discv5 layer.
|
||||
return @[]
|
||||
|
||||
let contentResult = p.dbGet(fc.contentKey, contentIdResult.get())
|
||||
let contentResult = p.dbGet(fc.contentKey, contentId)
|
||||
|
||||
if contentResult.isOk():
|
||||
let content = contentResult.get()
|
||||
|
@ -340,7 +338,7 @@ proc handleFindContent(
|
|||
# Don't have the content, send closest neighbours to content id.
|
||||
let
|
||||
closestNodes = p.routingTable.neighbours(
|
||||
NodeId(contentIdResult.get()), seenOnly = true)
|
||||
NodeId(contentId), seenOnly = true)
|
||||
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
|
||||
portal_content_enrs_packed.observe(enrs.len().int64)
|
||||
|
||||
|
@ -438,8 +436,8 @@ proc getInitialRadius(rc: RadiusConfig): UInt256 =
|
|||
# In case of a dynamic radius we start from the maximum value to quickly
|
||||
# gather as much data as possible, and also make sure each data piece in
|
||||
# the database is in our range after a node restart.
|
||||
# Alternative would be to store node the radius in database, and initialize it
|
||||
# from database after a restart
|
||||
# Alternative would be to store node the radius in database, and initialize
|
||||
# it from database after a restart
|
||||
return UInt256.high()
|
||||
|
||||
proc new*(T: type PortalProtocol,
|
||||
|
@ -609,25 +607,20 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||
return err("Trying to connect to node with unknown address")
|
||||
|
||||
# uTP protocol uses BE for all values in the header, incl. connection id
|
||||
let connectionResult =
|
||||
await p.stream.connectTo(
|
||||
nodeAddress.unsafeGet(),
|
||||
uint16.fromBytesBE(m.connectionId)
|
||||
)
|
||||
|
||||
if connectionResult.isErr():
|
||||
debug "uTP connection error while trying to find content",
|
||||
error = connectionResult.error
|
||||
let socket =
|
||||
(await p.stream.connectTo(
|
||||
nodeAddress.unsafeGet(),
|
||||
uint16.fromBytesBE(m.connectionId)
|
||||
)).valueOr:
|
||||
debug "uTP connection error for find content", error
|
||||
return err("Error connecting uTP socket")
|
||||
|
||||
let socket = connectionResult.get()
|
||||
|
||||
try:
|
||||
# Read all bytes from the socket
|
||||
# This will either end with a FIN, or because the read action times out.
|
||||
# A FIN does not necessarily mean that the data read is complete. Further
|
||||
# validation is required, using a length prefix here might be beneficial for
|
||||
# this.
|
||||
# A FIN does not necessarily mean that the data read is complete.
|
||||
# Further validation is required, using a length prefix here might be
|
||||
# beneficial for this.
|
||||
let readFut = socket.read()
|
||||
|
||||
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
|
||||
|
@ -649,7 +642,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
|||
else :
|
||||
debug "Socket read time-out",
|
||||
socketKey = socket.socketKey
|
||||
# Note: This might look a bit strange, be not doing a socket.close()
|
||||
# Note: This might look a bit strange, but not doing a socket.close()
|
||||
# here as this is already done internally. utp_socket `checkTimeouts`
|
||||
# already does a socket.destroy() on timeout. Might want to change the
|
||||
# API on this later though.
|
||||
|
@ -771,19 +764,14 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||
id = o.dst.id
|
||||
return err("Trying to connect to node with unknown address")
|
||||
|
||||
let connectionResult =
|
||||
await p.stream.connectTo(
|
||||
nodeAddress.unsafeGet(),
|
||||
uint16.fromBytesBE(m.connectionId)
|
||||
)
|
||||
|
||||
if connectionResult.isErr():
|
||||
debug "Utp connection error while trying to offer content",
|
||||
error = connectionResult.error
|
||||
let socket =
|
||||
(await p.stream.connectTo(
|
||||
nodeAddress.unsafeGet(),
|
||||
uint16.fromBytesBE(m.connectionId)
|
||||
)).valueOr:
|
||||
debug "uTP connection error for offer content", error
|
||||
return err("Error connecting uTP socket")
|
||||
|
||||
let socket = connectionResult.get()
|
||||
|
||||
template lenu32(x: untyped): untyped =
|
||||
uint32(len(x))
|
||||
|
||||
|
@ -797,10 +785,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
|
||||
output.write(content)
|
||||
|
||||
let dataWritten = await socket.write(output.getOutput)
|
||||
if dataWritten.isErr:
|
||||
debug "Error writing requested data",
|
||||
error = dataWritten.error
|
||||
let dataWritten = (await socket.write(output.getOutput)).valueOr:
|
||||
debug "Error writing requested data", error
|
||||
# No point in trying to continue writing data
|
||||
socket.close()
|
||||
return err("Error writing requested data")
|
||||
|
@ -825,10 +811,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
|
|||
# When data turns out missing, add a 0 size varint
|
||||
output.write(toBytes(0'u8, Leb128).toOpenArray())
|
||||
|
||||
let dataWritten = await socket.write(output.getOutput)
|
||||
if dataWritten.isErr:
|
||||
debug "Error writing requested data",
|
||||
error = dataWritten.error
|
||||
let dataWritten = (await socket.write(output.getOutput)).valueOr:
|
||||
debug "Error writing requested data", error
|
||||
# No point in trying to continue writing data
|
||||
socket.close()
|
||||
return err("Error writing requested data")
|
||||
|
@ -1143,12 +1127,9 @@ proc neighborhoodGossip*(
|
|||
|
||||
# Just taking the first content item as target id.
|
||||
# TODO: come up with something better?
|
||||
let contentIdOpt = p.toContentId(contentList[0].contentKey)
|
||||
if contentIdOpt.isNone():
|
||||
let contentId = p.toContentId(contentList[0].contentKey).valueOr:
|
||||
return 0
|
||||
|
||||
let contentId = contentIdOpt.get()
|
||||
|
||||
# For selecting the closest nodes to whom to gossip the content a mixed
|
||||
# approach is taken:
|
||||
# 1. Select the closest neighbours in the routing table
|
||||
|
@ -1163,7 +1144,6 @@ proc neighborhoodGossip*(
|
|||
# in its propagation than when looking only for nodes in the own routing
|
||||
# table, but at the same time avoid unnecessary node lookups.
|
||||
# It might still cause issues in data getting propagated in a wider id range.
|
||||
|
||||
const maxGossipNodes = 8
|
||||
|
||||
let closestLocalNodes = p.routingTable.neighbours(
|
||||
|
@ -1308,7 +1288,7 @@ proc stop*(p: PortalProtocol) =
|
|||
worker.cancel()
|
||||
p.offerWorkers = @[]
|
||||
|
||||
proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
|
||||
proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} =
|
||||
## Resolve a `Node` based on provided `NodeId`.
|
||||
##
|
||||
## This will first look in the own routing table. If the node is known, it
|
||||
|
@ -1316,14 +1296,14 @@ proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
|
|||
## does not reply, a lookup is done to see if it can find a (newer) record of
|
||||
## the node on the network.
|
||||
if id == p.localNode.id:
|
||||
return some(p.localNode)
|
||||
return Opt.some(p.localNode)
|
||||
|
||||
let node = p.routingTable.getNode(id)
|
||||
let node = p.getNode(id)
|
||||
if node.isSome():
|
||||
let nodesMessage = await p.findNodes(node.get(), @[0'u16])
|
||||
# TODO: Handle failures better. E.g. stop on different failures than timeout
|
||||
if nodesMessage.isOk() and nodesMessage[].len > 0:
|
||||
return some(nodesMessage[][0])
|
||||
return Opt.some(nodesMessage[][0])
|
||||
|
||||
let discovered = await p.lookup(id)
|
||||
for n in discovered:
|
||||
|
@ -1331,11 +1311,12 @@ proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
|
|||
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
|
||||
return node
|
||||
else:
|
||||
return some(n)
|
||||
return Opt.some(n)
|
||||
|
||||
return node
|
||||
|
||||
proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UInt256)]] {.async.} =
|
||||
proc resolveWithRadius*(
|
||||
p: PortalProtocol, id: NodeId): Future[Opt[(Node, UInt256)]] {.async.} =
|
||||
## Resolve a `Node` based on provided `NodeId`, also try to establish what
|
||||
## is known radius of found node.
|
||||
##
|
||||
|
@ -1349,30 +1330,24 @@ proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UIn
|
|||
##
|
||||
|
||||
let n = await p.resolve(id)
|
||||
|
||||
if n.isNone():
|
||||
return none((Node, UInt256))
|
||||
return Opt.none((Node, UInt256))
|
||||
|
||||
let node = n.unsafeGet()
|
||||
|
||||
let r = p.radiusCache.get(id)
|
||||
|
||||
if r.isSome():
|
||||
return some((node, r.unsafeGet()))
|
||||
return Opt.some((node, r.unsafeGet()))
|
||||
|
||||
let pongResult = await p.ping(node)
|
||||
|
||||
if pongResult.isOk():
|
||||
let maybeRadius = p.radiusCache.get(id)
|
||||
|
||||
# After successful ping radius should already be in cache, but for the unlikely
|
||||
# case that it is not, check it just to be sure.
|
||||
# TODO: rafactor ping to return node radius.
|
||||
# After successful ping radius should already be in cache, but for the
|
||||
# unlikely case that it is not, check it just to be sure.
|
||||
# TODO: refactor ping to return node radius.
|
||||
if maybeRadius.isNone():
|
||||
return none((Node, UInt256))
|
||||
|
||||
# If pong is successful, radius of the node should definitly be in local
|
||||
# radius cache
|
||||
return some((node, maybeRadius.unsafeGet()))
|
||||
return Opt.none((Node, UInt256))
|
||||
else:
|
||||
return Opt.some((node, maybeRadius.unsafeGet()))
|
||||
else:
|
||||
return none((Node, UInt256))
|
||||
return Opt.none((Node, UInt256))
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 6b8a7b009eb94bfdac1410f243865984bdd7c4f2
|
||||
Subproject commit 26ae539598e31efbaa016f6694b9a60ea08fc4b6
|
Loading…
Reference in New Issue