Merge branch 'master' into testapi-delete
This commit is contained in:
commit
5355c0c1f6
|
@ -154,9 +154,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
|
||||||
let
|
let
|
||||||
haves = b.peers.peersHave(cid)
|
haves = b.peers.peersHave(cid)
|
||||||
|
|
||||||
trace "Current number of peers for block", cid, peers = haves.len
|
|
||||||
if haves.len < b.minPeersPerBlock:
|
if haves.len < b.minPeersPerBlock:
|
||||||
trace "Discovering block", cid
|
|
||||||
try:
|
try:
|
||||||
let
|
let
|
||||||
request = b.discovery
|
request = b.discovery
|
||||||
|
@ -168,7 +166,6 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
|
||||||
let
|
let
|
||||||
peers = await request
|
peers = await request
|
||||||
|
|
||||||
trace "Discovered peers for block", peers = peers.len, cid
|
|
||||||
let
|
let
|
||||||
dialed = await allFinished(
|
dialed = await allFinished(
|
||||||
peers.mapIt( b.network.dialPeer(it.data) ))
|
peers.mapIt( b.network.dialPeer(it.data) ))
|
||||||
|
@ -189,10 +186,9 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
|
||||||
for cid in cids:
|
for cid in cids:
|
||||||
if cid notin b.discoveryQueue:
|
if cid notin b.discoveryQueue:
|
||||||
try:
|
try:
|
||||||
trace "Queueing find block", cid, queue = b.discoveryQueue.len
|
|
||||||
b.discoveryQueue.putNoWait(cid)
|
b.discoveryQueue.putNoWait(cid)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception queueing discovery request", exc = exc.msg
|
warn "Exception queueing discovery request", exc = exc.msg
|
||||||
|
|
||||||
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
|
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
|
||||||
for cid in cids:
|
for cid in cids:
|
||||||
|
|
|
@ -125,7 +125,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
|
||||||
|
|
||||||
proc sendWantHave(
|
proc sendWantHave(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
address: BlockAddress,
|
address: BlockAddress, # pluralize this entire call chain, please
|
||||||
excluded: seq[BlockExcPeerCtx],
|
excluded: seq[BlockExcPeerCtx],
|
||||||
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
|
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
|
||||||
trace "Sending wantHave request to peers", address
|
trace "Sending wantHave request to peers", address
|
||||||
|
@ -139,7 +139,7 @@ proc sendWantHave(
|
||||||
|
|
||||||
proc sendWantBlock(
|
proc sendWantBlock(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
address: BlockAddress,
|
address: BlockAddress, # pluralize this entire call chain, please
|
||||||
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
||||||
trace "Sending wantBlock request to", peer = blockPeer.id, address
|
trace "Sending wantBlock request to", peer = blockPeer.id, address
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
|
@ -154,13 +154,11 @@ proc monitorBlockHandle(
|
||||||
peerId: PeerId) {.async.} =
|
peerId: PeerId) {.async.} =
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trace "Monitoring block handle", address, peerId
|
|
||||||
discard await handle
|
discard await handle
|
||||||
trace "Block handle success", address, peerId
|
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Block handle cancelled", address, peerId
|
trace "Block handle cancelled", address, peerId
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Error block handle, disconnecting peer", address, exc = exc.msg, peerId
|
warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId
|
||||||
|
|
||||||
# TODO: really, this is just a quick and dirty way of
|
# TODO: really, this is just a quick and dirty way of
|
||||||
# preventing hitting the same "bad" peer every time, however,
|
# preventing hitting the same "bad" peer every time, however,
|
||||||
|
@ -217,7 +215,6 @@ proc blockPresenceHandler*(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
peer: PeerId,
|
peer: PeerId,
|
||||||
blocks: seq[BlockPresence]) {.async.} =
|
blocks: seq[BlockPresence]) {.async.} =
|
||||||
trace "Received presence update for peer", peer, blocks = blocks.len
|
|
||||||
let
|
let
|
||||||
peerCtx = b.peers.get(peer)
|
peerCtx = b.peers.get(peer)
|
||||||
wantList = toSeq(b.pendingBlocks.wantList)
|
wantList = toSeq(b.pendingBlocks.wantList)
|
||||||
|
@ -227,12 +224,6 @@ proc blockPresenceHandler*(
|
||||||
|
|
||||||
for blk in blocks:
|
for blk in blocks:
|
||||||
if presence =? Presence.init(blk):
|
if presence =? Presence.init(blk):
|
||||||
logScope:
|
|
||||||
address = $presence.address
|
|
||||||
have = presence.have
|
|
||||||
price = presence.price
|
|
||||||
|
|
||||||
trace "Updating presence"
|
|
||||||
peerCtx.setPresence(presence)
|
peerCtx.setPresence(presence)
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -323,14 +314,12 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
||||||
proc payForBlocks(engine: BlockExcEngine,
|
proc payForBlocks(engine: BlockExcEngine,
|
||||||
peer: BlockExcPeerCtx,
|
peer: BlockExcPeerCtx,
|
||||||
blocksDelivery: seq[BlockDelivery]) {.async.} =
|
blocksDelivery: seq[BlockDelivery]) {.async.} =
|
||||||
trace "Paying for blocks", len = blocksDelivery.len
|
|
||||||
|
|
||||||
let
|
let
|
||||||
sendPayment = engine.network.request.sendPayment
|
sendPayment = engine.network.request.sendPayment
|
||||||
price = peer.price(blocksDelivery.mapIt(it.address))
|
price = peer.price(blocksDelivery.mapIt(it.address))
|
||||||
|
|
||||||
if payment =? engine.wallet.pay(peer, price):
|
if payment =? engine.wallet.pay(peer, price):
|
||||||
trace "Sending payment for blocks", price
|
trace "Sending payment for blocks", price, len = blocksDelivery.len
|
||||||
await sendPayment(peer.id, payment)
|
await sendPayment(peer.id, payment)
|
||||||
|
|
||||||
proc validateBlockDelivery(
|
proc validateBlockDelivery(
|
||||||
|
@ -365,7 +354,7 @@ proc blocksDeliveryHandler*(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
peer: PeerId,
|
peer: PeerId,
|
||||||
blocksDelivery: seq[BlockDelivery]) {.async.} =
|
blocksDelivery: seq[BlockDelivery]) {.async.} =
|
||||||
trace "Got blocks from peer", peer, len = blocksDelivery.len
|
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",")
|
||||||
|
|
||||||
var validatedBlocksDelivery: seq[BlockDelivery]
|
var validatedBlocksDelivery: seq[BlockDelivery]
|
||||||
for bd in blocksDelivery:
|
for bd in blocksDelivery:
|
||||||
|
@ -411,7 +400,6 @@ proc wantListHandler*(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
peer: PeerId,
|
peer: PeerId,
|
||||||
wantList: WantList) {.async.} =
|
wantList: WantList) {.async.} =
|
||||||
trace "Got wantList for peer", peer, items = wantList.entries.len
|
|
||||||
let
|
let
|
||||||
peerCtx = b.peers.get(peer)
|
peerCtx = b.peers.get(peer)
|
||||||
if isNil(peerCtx):
|
if isNil(peerCtx):
|
||||||
|
@ -430,8 +418,6 @@ proc wantListHandler*(
|
||||||
wantType = $e.wantType
|
wantType = $e.wantType
|
||||||
|
|
||||||
if idx < 0: # updating entry
|
if idx < 0: # updating entry
|
||||||
trace "Processing new want list entry"
|
|
||||||
|
|
||||||
let
|
let
|
||||||
have = await e.address in b.localStore
|
have = await e.address in b.localStore
|
||||||
price = @(
|
price = @(
|
||||||
|
@ -442,41 +428,35 @@ proc wantListHandler*(
|
||||||
codex_block_exchange_want_have_lists_received.inc()
|
codex_block_exchange_want_have_lists_received.inc()
|
||||||
|
|
||||||
if not have and e.sendDontHave:
|
if not have and e.sendDontHave:
|
||||||
trace "Adding dont have entry to presence response"
|
|
||||||
presence.add(
|
presence.add(
|
||||||
BlockPresence(
|
BlockPresence(
|
||||||
address: e.address,
|
address: e.address,
|
||||||
`type`: BlockPresenceType.DontHave,
|
`type`: BlockPresenceType.DontHave,
|
||||||
price: price))
|
price: price))
|
||||||
elif have and e.wantType == WantType.WantHave:
|
elif have and e.wantType == WantType.WantHave:
|
||||||
trace "Adding have entry to presence response"
|
|
||||||
presence.add(
|
presence.add(
|
||||||
BlockPresence(
|
BlockPresence(
|
||||||
address: e.address,
|
address: e.address,
|
||||||
`type`: BlockPresenceType.Have,
|
`type`: BlockPresenceType.Have,
|
||||||
price: price))
|
price: price))
|
||||||
elif e.wantType == WantType.WantBlock:
|
elif e.wantType == WantType.WantBlock:
|
||||||
trace "Added entry to peer's want blocks list"
|
|
||||||
peerCtx.peerWants.add(e)
|
peerCtx.peerWants.add(e)
|
||||||
codex_block_exchange_want_block_lists_received.inc()
|
codex_block_exchange_want_block_lists_received.inc()
|
||||||
else:
|
else:
|
||||||
# peer doesn't want this block anymore
|
# peer doesn't want this block anymore
|
||||||
if e.cancel:
|
if e.cancel:
|
||||||
trace "Removing entry from peer want list"
|
|
||||||
peerCtx.peerWants.del(idx)
|
peerCtx.peerWants.del(idx)
|
||||||
else:
|
else:
|
||||||
trace "Updating entry in peer want list"
|
|
||||||
# peer might want to ask for the same cid with
|
# peer might want to ask for the same cid with
|
||||||
# different want params
|
# different want params
|
||||||
peerCtx.peerWants[idx] = e # update entry
|
peerCtx.peerWants[idx] = e # update entry
|
||||||
|
|
||||||
if presence.len > 0:
|
if presence.len > 0:
|
||||||
trace "Sending presence to remote", items = presence.len
|
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
|
||||||
await b.network.request.sendPresence(peer, presence)
|
await b.network.request.sendPresence(peer, presence)
|
||||||
|
|
||||||
trace "Scheduling a task to check want-list", peer
|
|
||||||
if not b.scheduleTask(peerCtx):
|
if not b.scheduleTask(peerCtx):
|
||||||
trace "Unable to schedule task for peer", peer
|
warn "Unable to schedule task for peer", peer
|
||||||
|
|
||||||
proc accountHandler*(
|
proc accountHandler*(
|
||||||
engine: BlockExcEngine,
|
engine: BlockExcEngine,
|
||||||
|
@ -541,8 +521,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerId) =
|
||||||
b.peers.remove(peer)
|
b.peers.remove(peer)
|
||||||
|
|
||||||
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
trace "Handling task for peer", peer = task.id
|
|
||||||
|
|
||||||
# Send to the peer blocks he wants to get,
|
# Send to the peer blocks he wants to get,
|
||||||
# if they present in our local store
|
# if they present in our local store
|
||||||
|
|
||||||
|
@ -559,7 +537,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
if peerWant.address in addresses:
|
if peerWant.address in addresses:
|
||||||
peerWant.inFlight = inFlight
|
peerWant.inFlight = inFlight
|
||||||
|
|
||||||
trace "wantsBlocks", peer = task.id, n = wantsBlocks.len
|
|
||||||
if wantsBlocks.len > 0:
|
if wantsBlocks.len > 0:
|
||||||
# Mark wants as in-flight.
|
# Mark wants as in-flight.
|
||||||
let wantAddresses = wantsBlocks.mapIt(it.address)
|
let wantAddresses = wantsBlocks.mapIt(it.address)
|
||||||
|
@ -567,7 +544,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
wantsBlocks.sort(SortOrder.Descending)
|
wantsBlocks.sort(SortOrder.Descending)
|
||||||
|
|
||||||
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
|
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
|
||||||
trace "Handling lookup for entry", address = e.address
|
|
||||||
if e.address.leaf:
|
if e.address.leaf:
|
||||||
(await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
|
(await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
|
||||||
(blkAndProof: (Block, CodexProof)) =>
|
(blkAndProof: (Block, CodexProof)) =>
|
||||||
|
@ -591,7 +567,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
updateInFlight(failedAddresses, false)
|
updateInFlight(failedAddresses, false)
|
||||||
|
|
||||||
if blocksDelivery.len > 0:
|
if blocksDelivery.len > 0:
|
||||||
trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len
|
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",")
|
||||||
await b.network.request.sendBlocksDelivery(
|
await b.network.request.sendBlocksDelivery(
|
||||||
task.id,
|
task.id,
|
||||||
blocksDelivery
|
blocksDelivery
|
||||||
|
@ -600,7 +576,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||||
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
|
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
|
||||||
|
|
||||||
task.peerWants.keepItIf(it.address notin successAddresses)
|
task.peerWants.keepItIf(it.address notin successAddresses)
|
||||||
trace "Removed entries from peerWants", peerWants = task.peerWants.len
|
|
||||||
|
|
||||||
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
|
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
|
||||||
## process tasks
|
## process tasks
|
||||||
|
@ -611,7 +586,6 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
|
||||||
let
|
let
|
||||||
peerCtx = await b.taskQueue.pop()
|
peerCtx = await b.taskQueue.pop()
|
||||||
|
|
||||||
trace "Got new task from queue", peerId = peerCtx.id
|
|
||||||
await b.taskHandler(peerCtx)
|
await b.taskHandler(peerCtx)
|
||||||
|
|
||||||
info "Exiting blockexc task runner"
|
info "Exiting blockexc task runner"
|
||||||
|
|
|
@ -58,15 +58,13 @@ proc getWantHandle*(
|
||||||
inFlight: inFlight,
|
inFlight: inFlight,
|
||||||
startTime: getMonoTime().ticks)
|
startTime: getMonoTime().ticks)
|
||||||
|
|
||||||
trace "Adding pending future for block", address, inFlight = p.blocks[address].inFlight
|
|
||||||
|
|
||||||
p.updatePendingBlockGauge()
|
p.updatePendingBlockGauge()
|
||||||
return await p.blocks[address].handle.wait(timeout)
|
return await p.blocks[address].handle.wait(timeout)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Blocks cancelled", exc = exc.msg, address
|
trace "Blocks cancelled", exc = exc.msg, address
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Pending WANT failed or expired", exc = exc.msg
|
error "Pending WANT failed or expired", exc = exc.msg
|
||||||
# no need to cancel, it is already cancelled by wait()
|
# no need to cancel, it is already cancelled by wait()
|
||||||
raise exc
|
raise exc
|
||||||
finally:
|
finally:
|
||||||
|
@ -88,8 +86,6 @@ proc resolve*(
|
||||||
|
|
||||||
for bd in blocksDelivery:
|
for bd in blocksDelivery:
|
||||||
p.blocks.withValue(bd.address, blockReq):
|
p.blocks.withValue(bd.address, blockReq):
|
||||||
trace "Resolving block", address = bd.address
|
|
||||||
|
|
||||||
if not blockReq.handle.finished:
|
if not blockReq.handle.finished:
|
||||||
let
|
let
|
||||||
startTime = blockReq.startTime
|
startTime = blockReq.startTime
|
||||||
|
@ -99,7 +95,9 @@ proc resolve*(
|
||||||
blockReq.handle.complete(bd.blk)
|
blockReq.handle.complete(bd.blk)
|
||||||
|
|
||||||
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
|
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
|
||||||
trace "Block retrieval time", retrievalDurationUs, address = bd.address
|
|
||||||
|
if retrievalDurationUs > 500000:
|
||||||
|
warn "High block retrieval time", retrievalDurationUs, address = bd.address
|
||||||
else:
|
else:
|
||||||
trace "Block handle already finished", address = bd.address
|
trace "Block handle already finished", address = bd.address
|
||||||
|
|
||||||
|
@ -112,7 +110,6 @@ proc setInFlight*(
|
||||||
|
|
||||||
p.blocks.withValue(address, pending):
|
p.blocks.withValue(address, pending):
|
||||||
pending[].inFlight = inFlight
|
pending[].inFlight = inFlight
|
||||||
trace "Setting inflight", address, inFlight = pending[].inFlight
|
|
||||||
|
|
||||||
proc isInFlight*(
|
proc isInFlight*(
|
||||||
p: PendingBlocksManager,
|
p: PendingBlocksManager,
|
||||||
|
@ -122,7 +119,6 @@ proc isInFlight*(
|
||||||
|
|
||||||
p.blocks.withValue(address, pending):
|
p.blocks.withValue(address, pending):
|
||||||
result = pending[].inFlight
|
result = pending[].inFlight
|
||||||
trace "Getting inflight", address, inFlight = result
|
|
||||||
|
|
||||||
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
|
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
|
||||||
BlockAddress.init(cid) in p.blocks
|
BlockAddress.init(cid) in p.blocks
|
||||||
|
|
|
@ -96,7 +96,6 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
|
||||||
b.peers.withValue(id, peer):
|
b.peers.withValue(id, peer):
|
||||||
try:
|
try:
|
||||||
await b.inflightSema.acquire()
|
await b.inflightSema.acquire()
|
||||||
trace "Sending message to peer", peer = id
|
|
||||||
await peer[].send(msg)
|
await peer[].send(msg)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
error "Error sending message", peer = id, msg = err.msg
|
error "Error sending message", peer = id, msg = err.msg
|
||||||
|
@ -113,7 +112,6 @@ proc handleWantList(
|
||||||
##
|
##
|
||||||
|
|
||||||
if not b.handlers.onWantList.isNil:
|
if not b.handlers.onWantList.isNil:
|
||||||
trace "Handling want list for peer", peer = peer.id, items = list.entries.len
|
|
||||||
await b.handlers.onWantList(peer.id, list)
|
await b.handlers.onWantList(peer.id, list)
|
||||||
|
|
||||||
proc sendWantList*(
|
proc sendWantList*(
|
||||||
|
@ -128,7 +126,6 @@ proc sendWantList*(
|
||||||
## Send a want message to peer
|
## Send a want message to peer
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Sending want list to peer", peer = id, `type` = $wantType, items = addresses.len
|
|
||||||
let msg = WantList(
|
let msg = WantList(
|
||||||
entries: addresses.mapIt(
|
entries: addresses.mapIt(
|
||||||
WantListEntry(
|
WantListEntry(
|
||||||
|
@ -157,7 +154,6 @@ proc handleBlocksDelivery(
|
||||||
##
|
##
|
||||||
|
|
||||||
if not b.handlers.onBlocksDelivery.isNil:
|
if not b.handlers.onBlocksDelivery.isNil:
|
||||||
trace "Handling blocks for peer", peer = peer.id, items = blocksDelivery.len
|
|
||||||
await b.handlers.onBlocksDelivery(peer.id, blocksDelivery)
|
await b.handlers.onBlocksDelivery(peer.id, blocksDelivery)
|
||||||
|
|
||||||
|
|
||||||
|
@ -178,7 +174,6 @@ proc handleBlockPresence(
|
||||||
##
|
##
|
||||||
|
|
||||||
if not b.handlers.onPresence.isNil:
|
if not b.handlers.onPresence.isNil:
|
||||||
trace "Handling block presence for peer", peer = peer.id, items = presence.len
|
|
||||||
await b.handlers.onPresence(peer.id, presence)
|
await b.handlers.onPresence(peer.id, presence)
|
||||||
|
|
||||||
proc sendBlockPresence*(
|
proc sendBlockPresence*(
|
||||||
|
|
|
@ -45,7 +45,6 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
|
||||||
let
|
let
|
||||||
data = await conn.readLp(MaxMessageSize.int)
|
data = await conn.readLp(MaxMessageSize.int)
|
||||||
msg = Message.protobufDecode(data).mapFailure().tryGet()
|
msg = Message.protobufDecode(data).mapFailure().tryGet()
|
||||||
trace "Got message for peer", peer = b.id
|
|
||||||
await b.handler(b, msg)
|
await b.handler(b, msg)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
warn "Exception in blockexc read loop", msg = err.msg
|
warn "Exception in blockexc read loop", msg = err.msg
|
||||||
|
@ -64,10 +63,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
|
||||||
let conn = await b.connect()
|
let conn = await b.connect()
|
||||||
|
|
||||||
if isNil(conn):
|
if isNil(conn):
|
||||||
trace "Unable to get send connection for peer message not sent", peer = b.id
|
warn "Unable to get send connection for peer message not sent", peer = b.id
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "Sending message to remote", peer = b.id
|
|
||||||
await conn.writeLp(protobufEncode(msg))
|
await conn.writeLp(protobufEncode(msg))
|
||||||
|
|
||||||
proc broadcast*(b: NetworkPeer, msg: Message) =
|
proc broadcast*(b: NetworkPeer, msg: Message) =
|
||||||
|
@ -75,7 +73,7 @@ proc broadcast*(b: NetworkPeer, msg: Message) =
|
||||||
try:
|
try:
|
||||||
await b.send(msg)
|
await b.send(msg)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
|
warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
|
||||||
|
|
||||||
asyncSpawn sendAwaiter()
|
asyncSpawn sendAwaiter()
|
||||||
|
|
||||||
|
|
|
@ -25,9 +25,6 @@ import ../../logutils
|
||||||
|
|
||||||
export payments, nitro
|
export payments, nitro
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "codex peercontext"
|
|
||||||
|
|
||||||
type
|
type
|
||||||
BlockExcPeerCtx* = ref object of RootObj
|
BlockExcPeerCtx* = ref object of RootObj
|
||||||
id*: PeerId
|
id*: PeerId
|
||||||
|
@ -66,5 +63,4 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
|
||||||
self.blocks.withValue(a, precense):
|
self.blocks.withValue(a, precense):
|
||||||
price += precense[].price
|
price += precense[].price
|
||||||
|
|
||||||
trace "Blocks price", price
|
|
||||||
price
|
price
|
||||||
|
|
|
@ -47,15 +47,12 @@ func contains*(self: PeerCtxStore, peerId: PeerId): bool =
|
||||||
peerId in self.peers
|
peerId in self.peers
|
||||||
|
|
||||||
func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) =
|
func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) =
|
||||||
trace "Adding peer to peer context store", peer = peer.id
|
|
||||||
self.peers[peer.id] = peer
|
self.peers[peer.id] = peer
|
||||||
|
|
||||||
func remove*(self: PeerCtxStore, peerId: PeerId) =
|
func remove*(self: PeerCtxStore, peerId: PeerId) =
|
||||||
trace "Removing peer from peer context store", peer = peerId
|
|
||||||
self.peers.del(peerId)
|
self.peers.del(peerId)
|
||||||
|
|
||||||
func get*(self: PeerCtxStore, peerId: PeerId): BlockExcPeerCtx =
|
func get*(self: PeerCtxStore, peerId: PeerId): BlockExcPeerCtx =
|
||||||
trace "Retrieving peer from peer context store", peer = peerId
|
|
||||||
self.peers.getOrDefault(peerId, nil)
|
self.peers.getOrDefault(peerId, nil)
|
||||||
|
|
||||||
func len*(self: PeerCtxStore): int =
|
func len*(self: PeerCtxStore): int =
|
||||||
|
|
|
@ -73,16 +73,14 @@ method find*(
|
||||||
cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
|
cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
|
||||||
## Find block providers
|
## Find block providers
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Finding providers for block", cid
|
|
||||||
without providers =?
|
without providers =?
|
||||||
(await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
|
(await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
|
||||||
trace "Error finding providers for block", cid, error = error.msg
|
warn "Error finding providers for block", cid, error = error.msg
|
||||||
|
|
||||||
return providers.filterIt( not (it.data.peerId == d.peerId) )
|
return providers.filterIt( not (it.data.peerId == d.peerId) )
|
||||||
|
|
||||||
method provide*(d: Discovery, cid: Cid) {.async, base.} =
|
method provide*(d: Discovery, cid: Cid) {.async, base.} =
|
||||||
## Provide a bock Cid
|
## Provide a block Cid
|
||||||
##
|
##
|
||||||
let
|
let
|
||||||
nodes = await d.protocol.addProvider(
|
nodes = await d.protocol.addProvider(
|
||||||
|
|
|
@ -95,15 +95,14 @@ proc retrieveCid(
|
||||||
break
|
break
|
||||||
|
|
||||||
bytes += buff.len
|
bytes += buff.len
|
||||||
trace "Sending chunk", size = buff.len
|
|
||||||
await resp.sendChunk(addr buff[0], buff.len)
|
await resp.sendChunk(addr buff[0], buff.len)
|
||||||
await resp.finish()
|
await resp.finish()
|
||||||
codex_api_downloads.inc()
|
codex_api_downloads.inc()
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Excepting streaming blocks", exc = exc.msg
|
warn "Excepting streaming blocks", exc = exc.msg
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
finally:
|
finally:
|
||||||
trace "Sent bytes", cid = cid, bytes
|
info "Sent bytes", cid = cid, bytes
|
||||||
if not stream.isNil:
|
if not stream.isNil:
|
||||||
await stream.close()
|
await stream.close()
|
||||||
|
|
||||||
|
|
|
@ -34,17 +34,13 @@ type
|
||||||
localStore*: BlockStore # local block store
|
localStore*: BlockStore # local block store
|
||||||
|
|
||||||
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
|
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
|
||||||
trace "Getting block from local store or network", address
|
|
||||||
|
|
||||||
without blk =? (await self.localStore.getBlock(address)), err:
|
without blk =? (await self.localStore.getBlock(address)), err:
|
||||||
if not (err of BlockNotFoundError):
|
if not (err of BlockNotFoundError):
|
||||||
trace "Error getting block from local store", address, err = err.msg
|
error "Error getting block from local store", address, err = err.msg
|
||||||
return failure err
|
return failure err
|
||||||
|
|
||||||
trace "Block not in local store", address, err = err.msg
|
|
||||||
|
|
||||||
without newBlock =? (await self.engine.requestBlock(address)), err:
|
without newBlock =? (await self.engine.requestBlock(address)), err:
|
||||||
trace "Unable to get block from exchange engine", address, err = err.msg
|
error "Unable to get block from exchange engine", address, err = err.msg
|
||||||
return failure err
|
return failure err
|
||||||
|
|
||||||
return success newBlock
|
return success newBlock
|
||||||
|
|
|
@ -139,10 +139,9 @@ method getCidAndProof*(
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
|
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
|
||||||
trace "Unable to decode cid and proof", err = err.msg
|
error "Unable to decode cid and proof", err = err.msg
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
trace "Got cid and proof for block", cid, proof = $proof
|
|
||||||
return success (cid, proof)
|
return success (cid, proof)
|
||||||
|
|
||||||
method getCid*(
|
method getCid*(
|
||||||
|
@ -154,10 +153,11 @@ method getCid*(
|
||||||
|
|
||||||
without value =? await self.metaDs.get(key), err:
|
without value =? await self.metaDs.get(key), err:
|
||||||
if err of DatastoreKeyNotFound:
|
if err of DatastoreKeyNotFound:
|
||||||
trace "Cid not found", treeCid, index
|
# This failure is expected to happen frequently:
|
||||||
|
# NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine.
|
||||||
return failure(newException(BlockNotFoundError, err.msg))
|
return failure(newException(BlockNotFoundError, err.msg))
|
||||||
else:
|
else:
|
||||||
trace "Error getting cid from datastore", err = err.msg, key
|
error "Error getting cid from datastore", err = err.msg, key
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
return (Cid, CodexProof).decodeCid(value)
|
return (Cid, CodexProof).decodeCid(value)
|
||||||
|
@ -170,21 +170,19 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
||||||
cid = cid
|
cid = cid
|
||||||
|
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
|
||||||
return cid.emptyBlock
|
return cid.emptyBlock
|
||||||
|
|
||||||
without key =? makePrefixKey(self.postFixLen, cid), err:
|
without key =? makePrefixKey(self.postFixLen, cid), err:
|
||||||
trace "Error getting key from provider", err = err.msg
|
error "Error getting key from provider", err = err.msg
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
without data =? await self.repoDs.get(key), err:
|
without data =? await self.repoDs.get(key), err:
|
||||||
if not (err of DatastoreKeyNotFound):
|
if not (err of DatastoreKeyNotFound):
|
||||||
trace "Error getting block from datastore", err = err.msg, key
|
error "Error getting block from datastore", err = err.msg, key
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
return failure(newException(BlockNotFoundError, err.msg))
|
return failure(newException(BlockNotFoundError, err.msg))
|
||||||
|
|
||||||
trace "Got block for cid", cid
|
|
||||||
return Block.new(cid, data, verify = true)
|
return Block.new(cid, data, verify = true)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -82,7 +82,6 @@ method readOnce*(
|
||||||
## Raise exception if we are already at EOF.
|
## Raise exception if we are already at EOF.
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
|
|
||||||
if self.atEof:
|
if self.atEof:
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
|
@ -104,7 +103,7 @@ method readOnce*(
|
||||||
without blk =? await self.store.getBlock(address), error:
|
without blk =? await self.store.getBlock(address), error:
|
||||||
raise newLPStreamReadError(error)
|
raise newLPStreamReadError(error)
|
||||||
|
|
||||||
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
trace "Reading bytes from store stream", manifestCid = self.manifest.cid.get(), numBlocks = self.manifest.blocksCount, blockNum, blkCid = blk.cid, bytes = readBytes, blockOffset
|
||||||
|
|
||||||
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
||||||
if blk.isEmpty:
|
if blk.isEmpty:
|
||||||
|
|
|
@ -34,12 +34,6 @@ proc new*(
|
||||||
proc slots*(validation: Validation): seq[SlotId] =
|
proc slots*(validation: Validation): seq[SlotId] =
|
||||||
validation.slots.toSeq
|
validation.slots.toSeq
|
||||||
|
|
||||||
proc iterateSlots(validation: Validation, action: proc(s: SlotId): Future[void] {.async.}) {.async.} =
|
|
||||||
# Copy of hashSet, for iteration.
|
|
||||||
let slots = validation.slots
|
|
||||||
for slotId in slots:
|
|
||||||
await action(slotId)
|
|
||||||
|
|
||||||
proc getCurrentPeriod(validation: Validation): UInt256 =
|
proc getCurrentPeriod(validation: Validation): UInt256 =
|
||||||
return validation.periodicity.periodOf(validation.clock.now().u256)
|
return validation.periodicity.periodOf(validation.clock.now().u256)
|
||||||
|
|
||||||
|
@ -61,12 +55,12 @@ proc subscribeSlotFilled(validation: Validation) {.async.} =
|
||||||
|
|
||||||
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
|
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
|
||||||
var ended: HashSet[SlotId]
|
var ended: HashSet[SlotId]
|
||||||
proc onSlot(slotId: SlotId) {.async.} =
|
let slots = validation.slots
|
||||||
|
for slotId in slots:
|
||||||
let state = await validation.market.slotState(slotId)
|
let state = await validation.market.slotState(slotId)
|
||||||
if state != SlotState.Filled:
|
if state != SlotState.Filled:
|
||||||
trace "Removing slot", slotId
|
trace "Removing slot", slotId
|
||||||
ended.incl(slotId)
|
ended.incl(slotId)
|
||||||
await validation.iterateSlots(onSlot)
|
|
||||||
validation.slots.excl(ended)
|
validation.slots.excl(ended)
|
||||||
|
|
||||||
proc markProofAsMissing(validation: Validation,
|
proc markProofAsMissing(validation: Validation,
|
||||||
|
@ -88,10 +82,10 @@ proc markProofAsMissing(validation: Validation,
|
||||||
error "Marking proof as missing failed", msg = e.msg
|
error "Marking proof as missing failed", msg = e.msg
|
||||||
|
|
||||||
proc markProofsAsMissing(validation: Validation) {.async.} =
|
proc markProofsAsMissing(validation: Validation) {.async.} =
|
||||||
proc onSlot(slotId: SlotId) {.async.} =
|
let slots = validation.slots
|
||||||
|
for slotId in slots:
|
||||||
let previousPeriod = validation.getCurrentPeriod() - 1
|
let previousPeriod = validation.getCurrentPeriod() - 1
|
||||||
await validation.markProofAsMissing(slotId, previousPeriod)
|
await validation.markProofAsMissing(slotId, previousPeriod)
|
||||||
await validation.iterateSlots(onSlot)
|
|
||||||
|
|
||||||
proc run(validation: Validation) {.async.} =
|
proc run(validation: Validation) {.async.} =
|
||||||
trace "Validation started"
|
trace "Validation started"
|
||||||
|
|
Loading…
Reference in New Issue