Fluffy: Add validation and local storage of content in remaining state portal rpc methods (#2723)

* Add validation functions to be used in state portal rpc.

* Add validation to remaining state portal rpc methods.

* Lookup local content in recursiveFindContent rpc methods.

* portal_stateFindContent and portal_stateOffer no longer store in db.
This commit is contained in:
bhartnett 2024-10-10 21:24:39 +08:00 committed by GitHub
parent 69f646f417
commit 4ae87e6d19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 203 additions and 208 deletions

View File

@ -96,9 +96,11 @@ func encode*(contentKey: ContentKey): ContentKeyByteList {.inline.} =
func decode*(
T: type ContentKey, contentKey: ContentKeyByteList
): Result[T, string] {.inline.} =
decodeSsz(contentKey.asSeq(), T)
let key = ?decodeSsz(contentKey.asSeq(), T)
if key.contentType == unused:
return err("ContentKey contentType: unused")
ok(key)
func toContentId*(contentKey: ContentKeyByteList): ContentId {.inline.} =
# TODO: Should we try to parse the content key here for invalid ones?
let idHash = sha256.digest(contentKey.asSeq())
readUintBE[256](idHash.data)

View File

@ -93,34 +93,6 @@ proc gossipOffer*(
srcNodeId: Opt[NodeId],
keyBytes: ContentKeyByteList,
offerBytes: seq[byte],
key: AccountTrieNodeKey,
offer: AccountTrieNodeOffer,
) {.async: (raises: [CancelledError]).} =
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
debug "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
proc gossipOffer*(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
keyBytes: ContentKeyByteList,
offerBytes: seq[byte],
key: ContractTrieNodeKey,
offer: ContractTrieNodeOffer,
) {.async: (raises: [CancelledError]).} =
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
debug "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
proc gossipOffer*(
p: PortalProtocol,
srcNodeId: Opt[NodeId],
keyBytes: ContentKeyByteList,
offerBytes: seq[byte],
key: ContractCodeKey,
offer: ContractCodeOffer,
) {.async: (raises: [CancelledError]).} =
let peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
@ -136,7 +108,7 @@ proc recursiveGossipOffer*(
key: AccountTrieNodeKey,
offer: AccountTrieNodeOffer,
): Future[ContentKeyByteList] {.async: (raises: [CancelledError]).} =
await gossipOffer(p, srcNodeId, keyBytes, offerBytes, key, offer)
await gossipOffer(p, srcNodeId, keyBytes, offerBytes)
# root node, recursive gossip is finished
if key.path.unpackNibbles().len() == 0:
@ -161,7 +133,7 @@ proc recursiveGossipOffer*(
key: ContractTrieNodeKey,
offer: ContractTrieNodeOffer,
): Future[ContentKeyByteList] {.async: (raises: [CancelledError]).} =
await gossipOffer(p, srcNodeId, keyBytes, offerBytes, key, offer)
await gossipOffer(p, srcNodeId, keyBytes, offerBytes)
# root node, recursive gossip is finished
if key.path.unpackNibbles().len() == 0:

View File

@ -11,6 +11,7 @@ import
results,
chronos,
chronicles,
eth/common/hashes,
eth/p2p/discoveryv5/[protocol, enr],
../../database/content_db,
../history/history_network,
@ -19,7 +20,7 @@ import
./state_validation,
./state_gossip
export results, state_content
export results, state_content, hashes
logScope:
topics = "portal_state"
@ -47,11 +48,10 @@ proc new*(
historyNetwork = Opt.none(HistoryNetwork),
validateStateIsCanonical = true,
): T =
let cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
let s = streamManager.registerNewStream(cq)
let portalProtocol = PortalProtocol.new(
let
cq = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)
s = streamManager.registerNewStream(cq)
portalProtocol = PortalProtocol.new(
baseProtocol,
getProtocolId(portalNetwork, PortalSubnetwork.state),
toContentIdHandler,
@ -63,7 +63,7 @@ proc new*(
config = portalConfig,
)
return StateNetwork(
StateNetwork(
portalProtocol: portalProtocol,
contentDB: contentDB,
contentQueue: cq,
@ -108,7 +108,7 @@ proc getContent(
n.portalProtocol.storeContent(contentKeyBytes, contentId, contentValueBytes)
return Opt.some(contentValue)
Opt.some(contentValue)
proc getAccountTrieNode*(
n: StateNetwork, key: AccountTrieNodeKey
@ -132,11 +132,11 @@ proc getContractCode*(
proc getStateRootByBlockNumOrHash*(
n: StateNetwork, blockNumOrHash: uint64 | Hash32
): Future[Opt[Hash32]] {.async: (raises: [CancelledError]).} =
if n.historyNetwork.isNone():
let hn = n.historyNetwork.valueOr:
warn "History network is not available"
return Opt.none(Hash32)
let header = (await n.historyNetwork.get().getVerifiedBlockHeader(blockNumOrHash)).valueOr:
let header = (await hn.getVerifiedBlockHeader(blockNumOrHash)).valueOr:
warn "Failed to get block header from history", blockNumOrHash
return Opt.none(Hash32)
@ -150,10 +150,10 @@ proc processOffer*(
contentKey: AccountTrieNodeKey | ContractTrieNodeKey | ContractCodeKey,
V: type ContentOfferType,
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
let contentValue = V.decode(contentValueBytes).valueOr:
let
contentValue = V.decode(contentValueBytes).valueOr:
return err("Unable to decode offered content value")
let res =
validationRes =
if n.validateStateIsCanonical:
let stateRoot = (await n.getStateRootByBlockNumOrHash(contentValue.blockHash)).valueOr:
return err("Failed to get state root by block hash")
@ -162,8 +162,8 @@ proc processOffer*(
# Skip state root validation
validateOffer(Opt.none(Hash32), contentKey, contentValue)
if res.isErr():
return err("Offered content failed validation: " & res.error())
if validationRes.isErr():
return err("Offered content failed validation: " & validationRes.error())
let contentId = n.portalProtocol.toContentId(contentKeyBytes).valueOr:
return err("Received offered content with invalid content key")
@ -174,8 +174,7 @@ proc processOffer*(
debug "Offered content validated successfully", contentKeyBytes
await gossipOffer(
n.portalProtocol, maybeSrcNodeId, contentKeyBytes, contentValueBytes, contentKey,
contentValue,
n.portalProtocol, maybeSrcNodeId, contentKeyBytes, contentValueBytes
)
ok()
@ -185,31 +184,31 @@ proc processContentLoop(n: StateNetwork) {.async: (raises: []).} =
while true:
let (srcNodeId, contentKeys, contentValues) = await n.contentQueue.popFirst()
for i, contentValueBytes in contentValues:
for i, contentBytes in contentValues:
let
contentKeyBytes = contentKeys[i]
contentKey = ContentKey.decode(contentKeyBytes).valueOr:
error "Unable to decode offered content key", contentKeyBytes
continue
let offerRes =
offerRes =
case contentKey.contentType
of unused:
error "Received content with unused content type"
continue
of accountTrieNode:
await n.processOffer(
srcNodeId, contentKeyBytes, contentValueBytes,
contentKey.accountTrieNodeKey, AccountTrieNodeOffer,
srcNodeId, contentKeyBytes, contentBytes, contentKey.accountTrieNodeKey,
AccountTrieNodeOffer,
)
of contractTrieNode:
await n.processOffer(
srcNodeId, contentKeyBytes, contentValueBytes,
srcNodeId, contentKeyBytes, contentBytes,
contentKey.contractTrieNodeKey, ContractTrieNodeOffer,
)
of contractCode:
await n.processOffer(
srcNodeId, contentKeyBytes, contentValueBytes, contentKey.contractCodeKey,
srcNodeId, contentKeyBytes, contentBytes, contentKey.contractCodeKey,
ContractCodeOffer,
)
if offerRes.isOk():

View File

@ -16,7 +16,7 @@ import
export results, hashes, accounts, addresses, rlp
func fromBytes*(T: type Hash32, hash: openArray[byte]): T =
template fromBytes*(T: type Hash32, hash: openArray[byte]): T =
doAssert(hash.len() == 32)
Hash32(array[32, byte].initCopyFrom(hash))
@ -69,14 +69,12 @@ func rlpDecodeContractTrieNode*(contractTrieNode: TrieNode): Result[UInt256, str
except RlpError as e:
err(e.msg)
func toAccount*(accountProof: TrieProof): Result[Account, string] {.inline.} =
template toAccount*(accountProof: TrieProof): Result[Account, string] =
doAssert(accountProof.len() > 0)
rlpDecodeAccountTrieNode(accountProof[^1])
func toSlot*(storageProof: TrieProof): Result[UInt256, string] {.inline.} =
template toSlot*(storageProof: TrieProof): Result[UInt256, string] =
doAssert(storageProof.len() > 0)
rlpDecodeContractTrieNode(storageProof[^1])
func removeLeafKeyEndNibbles*(
@ -93,11 +91,11 @@ func removeLeafKeyEndNibbles*(
unpackedNibbles.dropN(leafPrefix.len()).packNibbles()
func toPath*(hash: Hash32): Nibbles {.inline.} =
template toPath*(hash: Hash32): Nibbles =
Nibbles.init(hash.data, isEven = true)
func toPath*(address: Address): Nibbles {.inline.} =
template toPath*(address: Address): Nibbles =
keccak256(address.data).toPath()
func toPath*(slotKey: UInt256): Nibbles {.inline.} =
template toPath*(slotKey: UInt256): Nibbles =
keccak256(toBytesBE(slotKey)).toPath()

View File

@ -13,10 +13,10 @@ export results, state_content, hashes
from eth/common/eth_types_rlp import rlpHash
proc hashEquals(value: TrieNode | Bytecode, expectedHash: Hash32): bool {.inline.} =
template hashEquals(value: TrieNode | Bytecode, expectedHash: Hash32): bool =
keccak256(value.asSeq()) == expectedHash
proc isValidNextNode(
func isValidNextNode(
thisNodeRlp: Rlp, rlpIdx: int, nextNode: TrieNode
): bool {.raises: RlpError.} =
let hashOrShortRlp = thisNodeRlp.listElem(rlpIdx)
@ -36,7 +36,7 @@ proc isValidNextNode(
nextNode.hashEquals(nextHash)
# TODO: Refactor this function to improve maintainability
proc validateTrieProof*(
func validateTrieProof*(
expectedRootHash: Opt[Hash32],
path: Nibbles,
proof: TrieProof,
@ -117,7 +117,7 @@ proc validateTrieProof*(
else:
ok()
proc validateRetrieval*(
func validateRetrieval*(
key: AccountTrieNodeKey, value: AccountTrieNodeRetrieval
): Result[void, string] =
if value.node.hashEquals(key.nodeHash):
@ -125,7 +125,7 @@ proc validateRetrieval*(
else:
err("hash of account trie node doesn't match the expected node hash")
proc validateRetrieval*(
func validateRetrieval*(
key: ContractTrieNodeKey, value: ContractTrieNodeRetrieval
): Result[void, string] =
if value.node.hashEquals(key.nodeHash):
@ -133,7 +133,7 @@ proc validateRetrieval*(
else:
err("hash of contract trie node doesn't match the expected node hash")
proc validateRetrieval*(
func validateRetrieval*(
key: ContractCodeKey, value: ContractCodeRetrieval
): Result[void, string] =
if value.code.hashEquals(key.codeHash):
@ -141,14 +141,14 @@ proc validateRetrieval*(
else:
err("hash of bytecode doesn't match the expected code hash")
proc validateOffer*(
func validateOffer*(
trustedStateRoot: Opt[Hash32], key: AccountTrieNodeKey, offer: AccountTrieNodeOffer
): Result[void, string] =
?validateTrieProof(trustedStateRoot, key.path, offer.proof)
validateRetrieval(key, offer.toRetrievalValue())
proc validateOffer*(
func validateOffer*(
trustedStateRoot: Opt[Hash32],
key: ContractTrieNodeKey,
offer: ContractTrieNodeOffer,
@ -166,7 +166,7 @@ proc validateOffer*(
validateRetrieval(key, offer.toRetrievalValue())
proc validateOffer*(
func validateOffer*(
trustedStateRoot: Opt[Hash32], key: ContractCodeKey, offer: ContractCodeOffer
): Result[void, string] =
?validateTrieProof(
@ -181,3 +181,47 @@ proc validateOffer*(
return err("hash of bytecode doesn't match the code hash in the account proof")
validateRetrieval(key, offer.toRetrievalValue())
func validateGetContentKey*(
keyBytes: ContentKeyByteList
): Result[(ContentKey, ContentId), string] =
let key = ?ContentKey.decode(keyBytes)
ok((key, toContentId(keyBytes)))
func validateRetrieval*(
key: ContentKey, contentBytes: seq[byte]
): Result[void, string] =
case key.contentType
of unused:
raiseAssert("ContentKey contentType: unused")
of accountTrieNode:
let retrieval = ?AccountTrieNodeRetrieval.decode(contentBytes)
?validateRetrieval(key.accountTrieNodeKey, retrieval)
of contractTrieNode:
let retrieval = ?ContractTrieNodeRetrieval.decode(contentBytes)
?validateRetrieval(key.contractTrieNodeKey, retrieval)
of contractCode:
let retrieval = ?ContractCodeRetrieval.decode(contentBytes)
?validateRetrieval(key.contractCodeKey, retrieval)
func validateOfferGetValue*(
trustedStateRoot: Opt[Hash32], key: ContentKey, contentBytes: seq[byte]
): Result[seq[byte], string] =
let value =
case key.contentType
of unused:
raiseAssert("ContentKey contentType: unused")
of accountTrieNode:
let offer = ?AccountTrieNodeOffer.decode(contentBytes)
?validateOffer(trustedStateRoot, key.accountTrieNodeKey, offer)
offer.toRetrievalValue.encode()
of contractTrieNode:
let offer = ?ContractTrieNodeOffer.decode(contentBytes)
?validateOffer(trustedStateRoot, key.contractTrieNodeKey, offer)
offer.toRetrievalValue.encode()
of contractCode:
let offer = ?ContractCodeOffer.decode(contentBytes)
?validateOffer(trustedStateRoot, key.contractCodeKey, offer)
offer.toRetrievalValue.encode()
ok(value)

View File

@ -36,26 +36,29 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
) -> JsonString:
let
node = toNodeWithAddress(enr)
foundContentResult =
await p.findContent(node, ContentKeyByteList.init(hexToSeqByte(contentKey)))
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
(key, _) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
foundContent = (await p.findContent(node, keyBytes)).valueOr:
raise newException(ValueError, $error)
if foundContentResult.isErr():
raise newException(ValueError, $foundContentResult.error)
else:
let foundContent = foundContentResult.get()
case foundContent.kind
of Content:
let contentValue = foundContent.content
validateRetrieval(key, contentValue).isOkOr:
raise invalidValueErr()
let res = ContentInfo(
content: foundContent.content.to0xHex(), utpTransfer: foundContent.utpTransfer
content: contentValue.to0xHex(), utpTransfer: foundContent.utpTransfer
)
return JrpcConv.encode(res).JsonString
JrpcConv.encode(res).JsonString
of Nodes:
let enrs = foundContent.nodes.map(
proc(n: Node): Record =
n.record
)
let jsonEnrs = JrpcConv.encode(enrs)
return ("{\"enrs\":" & jsonEnrs & "}").JsonString
("{\"enrs\":" & jsonEnrs & "}").JsonString
rpcServer.rpc("portal_stateOffer") do(
enr: Record, contentItems: seq[ContentItem]
@ -65,11 +68,14 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
var contentItemsToOffer: seq[ContentKV]
for contentItem in contentItems:
let
contentKey = hexToSeqByte(contentItem[0])
contentValue = hexToSeqByte(contentItem[1])
contentKV = ContentKV(
contentKey: ContentKeyByteList.init(contentKey), content: contentValue
)
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentItem[0]))
(key, _) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
contentBytes = hexToSeqByte(contentItem[1])
contentKV = ContentKV(contentKey: keyBytes, content: contentBytes)
discard validateOfferGetValue(Opt.none(Hash32), key, contentBytes).valueOr:
raise invalidValueErr()
contentItemsToOffer.add(contentKV)
let offerResult = (await p.offer(node, contentItemsToOffer)).valueOr:
@ -81,92 +87,82 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
contentKey: string
) -> ContentInfo:
let
key = ContentKeyByteList.init(hexToSeqByte(contentKey))
contentId = p.toContentId(key).valueOr:
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
maybeContent = p.dbGet(keyBytes, contentId)
if maybeContent.isSome():
return ContentInfo(content: maybeContent.get().to0xHex(), utpTransfer: false)
contentResult = (await p.contentLookup(key, contentId)).valueOr:
let
foundContent = (await p.contentLookup(keyBytes, contentId)).valueOr:
raise contentNotFoundErr()
contentValue = foundContent.content
return ContentInfo(
content: contentResult.content.to0xHex(), utpTransfer: contentResult.utpTransfer
)
validateRetrieval(key, contentValue).isOkOr:
raise invalidValueErr()
p.storeContent(keyBytes, contentId, contentValue)
ContentInfo(content: contentValue.to0xHex(), utpTransfer: foundContent.utpTransfer)
rpcServer.rpc("portal_stateTraceRecursiveFindContent") do(
contentKey: string
) -> TraceContentLookupResult:
let
key = ContentKeyByteList.init(hexToSeqByte(contentKey))
contentId = p.toContentId(key).valueOr:
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
res = await p.traceContentLookup(key, contentId)
maybeContent = p.dbGet(keyBytes, contentId)
if maybeContent.isSome():
return TraceContentLookupResult(content: maybeContent, utpTransfer: false)
# TODO: Might want to restructure the lookup result here. Potentially doing
# the json conversion in this module.
if res.content.isSome():
return res
else:
let
res = await p.traceContentLookup(keyBytes, contentId)
contentValue = res.content.valueOr:
let data = Opt.some(JrpcConv.encode(res.trace).JsonString)
raise contentNotFoundErrWithTrace(data)
rpcServer.rpc("portal_stateStore") do(
contentKey: string, contentValue: string
) -> bool:
validateRetrieval(key, contentValue).isOkOr:
raise invalidValueErr()
p.storeContent(keyBytes, contentId, contentValue)
res
rpcServer.rpc("portal_stateStore") do(contentKey: string, content: string) -> bool:
let
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
key = ContentKey.decode(keyBytes).valueOr:
raise invalidKeyErr()
contentId = p.toContentId(keyBytes).valueOr:
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
contentBytes = hexToSeqByte(content)
contentValue = validateOfferGetValue(Opt.none(Hash32), key, contentBytes).valueOr:
raise invalidValueErr()
contentBytes = hexToSeqByte(contentValue)
valueToStore =
case key.contentType
of unused:
raise invalidKeyErr()
of accountTrieNode:
let offer = AccountTrieNodeOffer.decode(contentBytes).valueOr:
raise invalidValueErr
validateOffer(Opt.none(Hash32), key.accountTrieNodeKey, offer).isOkOr:
raise invalidValueErr
offer.toRetrievalValue.encode()
of contractTrieNode:
let offer = ContractTrieNodeOffer.decode(contentBytes).valueOr:
raise invalidValueErr
validateOffer(Opt.none(Hash32), key.contractTrieNodeKey, offer).isOkOr:
raise invalidValueErr
offer.toRetrievalValue.encode()
of contractCode:
let offer = ContractCodeOffer.decode(contentBytes).valueOr:
raise invalidValueErr
validateOffer(Opt.none(Hash32), key.contractCodeKey, offer).isOkOr:
raise invalidValueErr
offer.toRetrievalValue.encode()
p.storeContent(keyBytes, contentId, valueToStore)
p.storeContent(keyBytes, contentId, contentValue)
rpcServer.rpc("portal_stateLocalContent") do(contentKey: string) -> string:
let
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
key = ContentKey.decode(keyBytes).valueOr:
raise invalidKeyErr()
contentId = p.toContentId(keyBytes).valueOr:
(_, contentId) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
contentResult = p.dbGet(keyBytes, contentId).valueOr:
raise contentNotFoundErr()
return contentResult.to0xHex()
contentResult.to0xHex()
rpcServer.rpc("portal_stateGossip") do(
contentKey: string, contentValue: string
) -> int:
rpcServer.rpc("portal_stateGossip") do(contentKey: string, content: string) -> int:
let
key = hexToSeqByte(contentKey)
content = hexToSeqByte(contentValue)
contentKeys = ContentKeysList(@[ContentKeyByteList.init(key)])
numberOfPeers =
await p.neighborhoodGossip(Opt.none(NodeId), contentKeys, @[content])
keyBytes = ContentKeyByteList.init(hexToSeqByte(contentKey))
(key, contentId) = validateGetContentKey(keyBytes).valueOr:
raise invalidKeyErr()
contentBytes = hexToSeqByte(content)
contentValue = validateOfferGetValue(Opt.none(Hash32), key, contentBytes).valueOr:
raise invalidValueErr()
return numberOfPeers
p.storeContent(keyBytes, contentId, contentValue)
await p.neighborhoodGossip(
Opt.none(NodeId), ContentKeysList(@[keyBytes]), @[contentBytes]
)

View File

@ -262,11 +262,7 @@ procSuite "State Endpoints":
stateNode2.mockStateRootLookup(contentValue.blockHash, stateRoot)
await stateNode1.portalProtocol.gossipOffer(
Opt.none(NodeId),
contentKeyBytes,
contentValueBytes,
contentKey.contractCodeKey,
contentValue,
Opt.none(NodeId), contentKeyBytes, contentValueBytes
)
# wait for gossip to complete

View File

@ -66,11 +66,7 @@ procSuite "State Gossip - Gossip Offer":
check not stateNode2.containsId(contentId)
await stateNode1.portalProtocol.gossipOffer(
Opt.none(NodeId),
contentKeyBytes,
contentValueBytes,
contentKey.accountTrieNodeKey,
contentValue,
Opt.none(NodeId), contentKeyBytes, contentValueBytes
)
# wait for offer to be processed by state node 2
@ -138,11 +134,7 @@ procSuite "State Gossip - Gossip Offer":
check not stateNode2.containsId(contentId)
await stateNode1.portalProtocol.gossipOffer(
Opt.none(NodeId),
contentKeyBytes,
contentValueBytes,
contentKey.contractTrieNodeKey,
contentValue,
Opt.none(NodeId), contentKeyBytes, contentValueBytes
)
# wait for offer to be processed by state node 2
@ -201,11 +193,7 @@ procSuite "State Gossip - Gossip Offer":
check not stateNode2.containsId(contentId)
await stateNode1.portalProtocol.gossipOffer(
Opt.none(NodeId),
contentKeyBytes,
contentValueBytes,
contentKey.contractCodeKey,
contentValue,
Opt.none(NodeId), contentKeyBytes, contentValueBytes
)
# wait for offer to be processed by state node 2