State network content retrieval (#2033)
* initial rework of contentLoop for state network * prepare content for storage and retrieval * modify test to account for retrieval data
This commit is contained in:
parent
402a3eda73
commit
47b254d8b5
|
@ -98,6 +98,33 @@ type
|
||||||
ContractCodeRetrieval* = object
|
ContractCodeRetrieval* = object
|
||||||
code*: Bytecode
|
code*: Bytecode
|
||||||
|
|
||||||
|
OfferContentValueType* = enum
|
||||||
|
accountTrieNodeOffer,
|
||||||
|
contractTrieNodeOffer,
|
||||||
|
contractCodeOffer,
|
||||||
|
|
||||||
|
OfferContentValue* = object
|
||||||
|
case contentType*: ContentType
|
||||||
|
of unused:
|
||||||
|
discard
|
||||||
|
of accountTrieNode:
|
||||||
|
accountTrieNode*: AccountTrieNodeOffer
|
||||||
|
of contractTrieNode:
|
||||||
|
contractTrieNode*: ContractTrieNodeOffer
|
||||||
|
of contractCode:
|
||||||
|
contractCode*: ContractCodeOffer
|
||||||
|
|
||||||
|
RetrievalContentValue* = object
|
||||||
|
case contentType*: ContentType
|
||||||
|
of unused:
|
||||||
|
discard
|
||||||
|
of accountTrieNode:
|
||||||
|
accountTrieNode*: AccountTrieNodeRetrieval
|
||||||
|
of contractTrieNode:
|
||||||
|
contractTrieNode*: ContractTrieNodeRetrieval
|
||||||
|
of contractCode:
|
||||||
|
contractCode*: ContractCodeRetrieval
|
||||||
|
|
||||||
func encode*(contentKey: ContentKey): ByteList =
|
func encode*(contentKey: ContentKey): ByteList =
|
||||||
doAssert(contentKey.contentType != unused)
|
doAssert(contentKey.contentType != unused)
|
||||||
ByteList.init(SSZ.encode(contentKey))
|
ByteList.init(SSZ.encode(contentKey))
|
||||||
|
@ -125,6 +152,60 @@ func toContentId*(contentKey: ByteList): ContentId =
|
||||||
func toContentId*(contentKey: ContentKey): ContentId =
|
func toContentId*(contentKey: ContentKey): ContentId =
|
||||||
toContentId(encode(contentKey))
|
toContentId(encode(contentKey))
|
||||||
|
|
||||||
|
func offerContentToRetrievalContent*(offerContent: OfferContentValue): RetrievalContentValue =
|
||||||
|
case offerContent.contentType:
|
||||||
|
of unused:
|
||||||
|
raiseAssert "Converting content with unused content type"
|
||||||
|
of accountTrieNode:
|
||||||
|
RetrievalContentValue(
|
||||||
|
contentType: accountTrieNode,
|
||||||
|
accountTrieNode: AccountTrieNodeRetrieval(node: offerContent.accountTrieNode.proof[^1])
|
||||||
|
) # TODO implement properly
|
||||||
|
of contractTrieNode:
|
||||||
|
RetrievalContentValue(
|
||||||
|
contentType: contractTrieNode,
|
||||||
|
contractTrieNode: ContractTrieNodeRetrieval(node: offerContent.contractTrieNode.storageProof[^1])
|
||||||
|
) # TODO implement properly
|
||||||
|
of contractCode:
|
||||||
|
RetrievalContentValue(
|
||||||
|
contentType: contractCode,
|
||||||
|
contractCode: ContractCodeRetrieval(code: offerContent.contractCode.code)
|
||||||
|
)
|
||||||
|
|
||||||
|
func encode*(content: RetrievalContentValue): seq[byte] =
|
||||||
|
case content.contentType:
|
||||||
|
of unused:
|
||||||
|
raiseAssert "Encoding content with unused content type"
|
||||||
|
of accountTrieNode:
|
||||||
|
SSZ.encode(content.accountTrieNode)
|
||||||
|
of contractTrieNode:
|
||||||
|
SSZ.encode(content.contractTrieNode)
|
||||||
|
of contractCode:
|
||||||
|
SSZ.encode(content.contractCode)
|
||||||
|
|
||||||
|
func decodeKV*(contentKey: ByteList, contentValue: seq[byte]): Opt[(ContentKey, OfferContentValue)] =
|
||||||
|
const empty = Opt.none((ContentKey, OfferContentValue))
|
||||||
|
let
|
||||||
|
key = contentKey.decode().valueOr:
|
||||||
|
return empty
|
||||||
|
value = case key.contentType:
|
||||||
|
of unused:
|
||||||
|
return empty
|
||||||
|
of accountTrieNode:
|
||||||
|
let val = decodeSsz(contentValue, AccountTrieNodeOffer).valueOr:
|
||||||
|
return empty
|
||||||
|
OfferContentValue(contentType: accountTrieNode, accountTrieNode: val)
|
||||||
|
of contractTrieNode:
|
||||||
|
let val = decodeSsz(contentValue, ContractTrieNodeOffer).valueOr:
|
||||||
|
return empty
|
||||||
|
OfferContentValue(contentType: contractTrieNode, contractTrieNode: val)
|
||||||
|
of contractCode:
|
||||||
|
let val = decodeSsz(contentValue, ContractCodeOffer).valueOr:
|
||||||
|
return empty
|
||||||
|
OfferContentValue(contentType: contractCode, contractCode: val)
|
||||||
|
|
||||||
|
Opt.some((key, value))
|
||||||
|
|
||||||
func packNibbles*(nibbles: seq[byte]): Nibbles =
|
func packNibbles*(nibbles: seq[byte]): Nibbles =
|
||||||
doAssert(nibbles.len() <= MAX_UNPACKED_NIBBLES_LEN, "Can't pack more than 64 nibbles")
|
doAssert(nibbles.len() <= MAX_UNPACKED_NIBBLES_LEN, "Can't pack more than 64 nibbles")
|
||||||
|
|
||||||
|
|
|
@ -60,70 +60,39 @@ proc getContent*(n: StateNetwork, key: ContentKey):
|
||||||
# domain types.
|
# domain types.
|
||||||
return Opt.some(contentResult.content)
|
return Opt.some(contentResult.content)
|
||||||
|
|
||||||
proc validateAccountTrieNode(key: ContentKey, contentValue: seq[byte]): bool =
|
proc validateAccountTrieNode(key: ContentKey, contentValue: OfferContentValue): bool =
|
||||||
let value = decodeSsz(contentValue, AccountTrieNodeOffer).valueOr:
|
|
||||||
warn "Received invalid account trie proof", error
|
|
||||||
return false
|
|
||||||
true
|
true
|
||||||
|
|
||||||
proc validateContractTrieNode(key: ContentKey, contentValue: seq[byte]): bool =
|
proc validateContractTrieNode(key: ContentKey, contentValue: OfferContentValue): bool =
|
||||||
let value = decodeSsz(contentValue, ContractTrieNodeOffer).valueOr:
|
|
||||||
warn "Received invalid contract trie proof", error
|
|
||||||
return false
|
|
||||||
true
|
true
|
||||||
|
|
||||||
proc validateContractCode(key: ContentKey, contentValue: seq[byte]): bool =
|
proc validateContractCode(key: ContentKey, contentValue: OfferContentValue): bool =
|
||||||
let value = decodeSsz(contentValue, ContractCodeOffer).valueOr:
|
|
||||||
warn "Received invalid contract code", error
|
|
||||||
return false
|
|
||||||
true
|
true
|
||||||
|
|
||||||
proc validateContent*(
|
proc validateContent*(
|
||||||
n: StateNetwork,
|
contentKey: ContentKey,
|
||||||
contentKey: ByteList,
|
contentValue: OfferContentValue): bool =
|
||||||
contentValue: seq[byte]): bool =
|
case contentKey.contentType:
|
||||||
let key = contentKey.decode().valueOr:
|
|
||||||
return false
|
|
||||||
|
|
||||||
case key.contentType:
|
|
||||||
of unused:
|
of unused:
|
||||||
warn "Received content with unused content type"
|
warn "Received content with unused content type"
|
||||||
false
|
false
|
||||||
of accountTrieNode:
|
of accountTrieNode:
|
||||||
validateAccountTrieNode(key, contentValue)
|
validateAccountTrieNode(contentKey, contentValue)
|
||||||
of contractTrieNode:
|
of contractTrieNode:
|
||||||
validateContractTrieNode(key, contentValue)
|
validateContractTrieNode(contentKey, contentValue)
|
||||||
of contractCode:
|
of contractCode:
|
||||||
validateContractCode(key, contentValue)
|
validateContractCode(contentKey, contentValue)
|
||||||
|
|
||||||
proc validateContent(
|
|
||||||
n: StateNetwork,
|
|
||||||
contentKeys: ContentKeysList,
|
|
||||||
contentValues: seq[seq[byte]]): bool =
|
|
||||||
for i, contentValue in contentValues:
|
|
||||||
let contentKey = contentKeys[i]
|
|
||||||
if n.validateContent(contentKey, contentValue):
|
|
||||||
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
|
|
||||||
error "Received offered content with invalid content key", contentKey
|
|
||||||
return false
|
|
||||||
|
|
||||||
n.portalProtocol.storeContent(contentKey, contentId, contentValue)
|
|
||||||
|
|
||||||
info "Received offered content validated successfully", contentKey
|
|
||||||
else:
|
|
||||||
error "Received offered content failed validation", contentKey
|
|
||||||
return false
|
|
||||||
|
|
||||||
proc recursiveGossipAccountTrieNode(
|
proc recursiveGossipAccountTrieNode(
|
||||||
p: PortalProtocol,
|
p: PortalProtocol,
|
||||||
maybeSrcNodeId: Opt[NodeId],
|
maybeSrcNodeId: Opt[NodeId],
|
||||||
decodedKey: ContentKey,
|
decodedKey: ContentKey,
|
||||||
contentValue: seq[byte]
|
decodedValue: AccountTrieNodeOffer
|
||||||
): Future[void] {.async.} =
|
): Future[void] {.async.} =
|
||||||
var nibbles = decodedKey.accountTrieNodeKey.path.unpackNibbles()
|
var
|
||||||
let decodedValue = decodeSsz(contentValue, AccountTrieNodeOffer).valueOr:
|
nibbles = decodedKey.accountTrieNodeKey.path.unpackNibbles()
|
||||||
raiseAssert "Received offered content failed validation"
|
proof = decodedValue.proof
|
||||||
var proof = decodedValue.proof
|
|
||||||
discard nibbles.pop()
|
discard nibbles.pop()
|
||||||
discard (distinctBase proof).pop()
|
discard (distinctBase proof).pop()
|
||||||
let
|
let
|
||||||
|
@ -144,33 +113,29 @@ proc recursiveGossipContractTrieNode(
|
||||||
p: PortalProtocol,
|
p: PortalProtocol,
|
||||||
maybeSrcNodeId: Opt[NodeId],
|
maybeSrcNodeId: Opt[NodeId],
|
||||||
decodedKey: ContentKey,
|
decodedKey: ContentKey,
|
||||||
contentValue: seq[byte]
|
decodedValue: ContractTrieNodeOffer
|
||||||
): Future[void] {.async.} =
|
): Future[void] {.async.} =
|
||||||
return
|
return
|
||||||
|
|
||||||
proc gossipContent*(
|
proc gossipContent*(
|
||||||
p: PortalProtocol,
|
p: PortalProtocol,
|
||||||
maybeSrcNodeId: Opt[NodeId],
|
maybeSrcNodeId: Opt[NodeId],
|
||||||
contentKeys: ContentKeysList,
|
contentKey: ByteList,
|
||||||
contentValues: seq[seq[byte]]
|
decodedKey: ContentKey,
|
||||||
|
contentValue: seq[byte],
|
||||||
|
decodedValue: OfferContentValue
|
||||||
): Future[void] {.async.} =
|
): Future[void] {.async.} =
|
||||||
for i, contentValue in contentValues:
|
case decodedKey.contentType:
|
||||||
let
|
of unused:
|
||||||
contentKey = contentKeys[i]
|
raiseAssert "Gossiping content with unused content type"
|
||||||
decodedKey = contentKey.decode().valueOr:
|
of accountTrieNode:
|
||||||
raiseAssert "Received offered content with invalid content key"
|
await recursiveGossipAccountTrieNode(p, maybeSrcNodeId, decodedKey, decodedValue.accountTrieNode)
|
||||||
case decodedKey.contentType:
|
of contractTrieNode:
|
||||||
of unused:
|
await recursiveGossipContractTrieNode(p, maybeSrcNodeId, decodedKey, decodedValue.contractTrieNode)
|
||||||
warn("Gossiping content with unused content type")
|
of contractCode:
|
||||||
continue
|
await p.neighborhoodGossipDiscardPeers(
|
||||||
of accountTrieNode:
|
maybeSrcNodeId, ContentKeysList.init(@[contentKey]), @[contentValue]
|
||||||
await recursiveGossipAccountTrieNode(p, maybeSrcNodeId, decodedKey, contentValue)
|
)
|
||||||
of contractTrieNode:
|
|
||||||
await recursiveGossipContractTrieNode(p, maybeSrcNodeId, decodedKey, contentValue)
|
|
||||||
of contractCode:
|
|
||||||
await neighborhoodGossipDiscardPeers(
|
|
||||||
p, maybeSrcNodeId, ContentKeysList.init(@[contentKey]), @[contentValue]
|
|
||||||
)
|
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type StateNetwork,
|
T: type StateNetwork,
|
||||||
|
@ -201,13 +166,32 @@ proc processContentLoop(n: StateNetwork) {.async.} =
|
||||||
try:
|
try:
|
||||||
while true:
|
while true:
|
||||||
let (maybeSrcNodeId, contentKeys, contentValues) = await n.contentQueue.popFirst()
|
let (maybeSrcNodeId, contentKeys, contentValues) = await n.contentQueue.popFirst()
|
||||||
if n.validateContent(contentKeys, contentValues):
|
for i, contentValue in contentValues:
|
||||||
await gossipContent(
|
let
|
||||||
n.portalProtocol,
|
contentKey = contentKeys[i]
|
||||||
maybeSrcNodeId,
|
(decodedKey, decodedValue) = decodeKV(contentKey, contentValue).valueOr:
|
||||||
contentKeys,
|
error "Unable to decode offered Key/Value"
|
||||||
contentValues
|
continue
|
||||||
|
if validateContent(decodedKey, decodedValue):
|
||||||
|
let
|
||||||
|
valueForRetrieval = decodedValue.offerContentToRetrievalContent().encode()
|
||||||
|
contentId = n.portalProtocol.toContentId(contentKey).valueOr:
|
||||||
|
error "Received offered content with invalid content key", contentKey
|
||||||
|
continue
|
||||||
|
|
||||||
|
n.portalProtocol.storeContent(contentKey, contentId, valueForRetrieval)
|
||||||
|
info "Received offered content validated successfully", contentKey
|
||||||
|
|
||||||
|
await gossipContent(
|
||||||
|
n.portalProtocol,
|
||||||
|
maybeSrcNodeId,
|
||||||
|
contentKey,
|
||||||
|
decodedKey,
|
||||||
|
contentValue,
|
||||||
|
decodedValue
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
error "Received offered content failed validation", contentKey
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "processContentLoop canceled"
|
trace "processContentLoop canceled"
|
||||||
|
|
||||||
|
|
|
@ -39,30 +39,49 @@ procSuite "State Network Gossip":
|
||||||
proto.start()
|
proto.start()
|
||||||
clients.add(proto)
|
clients.add(proto)
|
||||||
|
|
||||||
for i, pair in recursiveGossipSteps[0..^2]:
|
for i in 0..numOfClients-1:
|
||||||
let
|
let
|
||||||
currentNode = clients[i]
|
currentNode = clients[i]
|
||||||
nextNode = clients[i+1]
|
nextNode = clients[i+1]
|
||||||
key = ByteList.init(pair.content_key.hexToSeqByte())
|
|
||||||
decodedKey = key.decode().valueOr:
|
|
||||||
raiseAssert "Cannot decode key"
|
|
||||||
nextKey = ByteList.init(recursiveGossipSteps[1].content_key.hexToSeqByte())
|
|
||||||
decodedNextKey = nextKey.decode().valueOr:
|
|
||||||
raiseAssert "Cannot decode key"
|
|
||||||
value = pair.content_value.hexToSeqByte()
|
|
||||||
nextValue = recursiveGossipSteps[1].content_value.hexToSeqByte()
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
currentNode.portalProtocol.addNode(nextNode.portalProtocol.localNode) == Added
|
currentNode.portalProtocol.addNode(nextNode.portalProtocol.localNode) == Added
|
||||||
(await currentNode.portalProtocol.ping(nextNode.portalProtocol.localNode)).isOk()
|
(await currentNode.portalProtocol.ping(nextNode.portalProtocol.localNode)).isOk()
|
||||||
|
|
||||||
await currentNode.portalProtocol.gossipContent(Opt.none(NodeId), ContentKeysList.init(@[key]), @[value])
|
for i in 0..numOfClients-1:
|
||||||
await sleepAsync(100.milliseconds)
|
let
|
||||||
let gossipedValue = await nextNode.getContent(decodedNextKey)
|
pair = recursiveGossipSteps[i]
|
||||||
|
currentNode = clients[i]
|
||||||
|
nextNode = clients[i+1]
|
||||||
|
|
||||||
check:
|
key = ByteList.init(pair.content_key.hexToSeqByte())
|
||||||
gossipedValue.isSome()
|
decodedKey = key.decode().valueOr:
|
||||||
gossipedValue.get() == nextValue
|
raiseAssert "Cannot decode key"
|
||||||
|
|
||||||
|
nextKey = ByteList.init(recursiveGossipSteps[1].content_key.hexToSeqByte())
|
||||||
|
decodedNextKey = nextKey.decode().valueOr:
|
||||||
|
raiseAssert "Cannot decode key"
|
||||||
|
|
||||||
|
value = pair.content_value.hexToSeqByte()
|
||||||
|
decodedValue = SSZ.decode(value, AccountTrieNodeOffer)
|
||||||
|
offerValue = OfferContentValue(contentType: accountTrieNode, accountTrieNode: decodedValue)
|
||||||
|
|
||||||
|
nextValue = recursiveGossipSteps[1].content_value.hexToSeqByte()
|
||||||
|
nextDecodedValue = SSZ.decode(nextValue, AccountTrieNodeOffer)
|
||||||
|
nextOfferValue = OfferContentValue(contentType: accountTrieNode, accountTrieNode: nextDecodedValue)
|
||||||
|
nextRetrievalValue = nextOfferValue.offerContentToRetrievalContent().encode()
|
||||||
|
|
||||||
|
if i == 0:
|
||||||
|
await currentNode.portalProtocol.gossipContent(
|
||||||
|
Opt.none(NodeId),
|
||||||
|
key,
|
||||||
|
decodedKey,
|
||||||
|
value,
|
||||||
|
offerValue
|
||||||
|
)
|
||||||
|
|
||||||
|
await sleepAsync(100.milliseconds) #TODO figure out how to get rid of this sleep
|
||||||
|
|
||||||
|
check (await nextNode.getContent(decodedNextKey)) == Opt.some(nextRetrievalValue)
|
||||||
|
|
||||||
for i in 0..numOfClients:
|
for i in 0..numOfClients:
|
||||||
await clients[i].portalProtocol.baseProtocol.closeWait()
|
await clients[i].portalProtocol.baseProtocol.closeWait()
|
||||||
|
|
Loading…
Reference in New Issue