Send pluralized wantBlock messages (#1016)
* don't unroll wantCids when sending wantBlock message in blockPresenceHandler * workaround logging * Fixes logformatting upraises for sequences. * Applies upraises rule for setProperty of textmode for sequences. * Replaces upraises with raises * Removes redundant log in sendWantHave
This commit is contained in:
parent
921159f87f
commit
8e29939cf8
|
@ -129,26 +129,26 @@ proc stop*(b: BlockExcEngine) {.async.} =
|
||||||
|
|
||||||
proc sendWantHave(
|
proc sendWantHave(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
address: BlockAddress, # pluralize this entire call chain, please
|
addresses: seq[BlockAddress],
|
||||||
excluded: seq[BlockExcPeerCtx],
|
excluded: seq[BlockExcPeerCtx],
|
||||||
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
|
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
|
||||||
trace "Sending wantHave request to peers", address
|
|
||||||
for p in peers:
|
for p in peers:
|
||||||
if p notin excluded:
|
if p notin excluded:
|
||||||
if address notin p.peerHave:
|
let toAsk = addresses.filterIt(it notin p.peerHave)
|
||||||
|
trace "Sending wantHave request", toAsk, peer = p.id
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
p.id,
|
p.id,
|
||||||
@[address],
|
toAsk,
|
||||||
wantType = WantType.WantHave) # we only want to know if the peer has the block
|
wantType = WantType.WantHave)
|
||||||
|
|
||||||
proc sendWantBlock(
|
proc sendWantBlock(
|
||||||
b: BlockExcEngine,
|
b: BlockExcEngine,
|
||||||
address: BlockAddress, # pluralize this entire call chain, please
|
addresses: seq[BlockAddress],
|
||||||
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
||||||
trace "Sending wantBlock request to", peer = blockPeer.id, address
|
trace "Sending wantBlock request to", addresses, peer = blockPeer.id
|
||||||
await b.network.request.sendWantList(
|
await b.network.request.sendWantList(
|
||||||
blockPeer.id,
|
blockPeer.id,
|
||||||
@[address],
|
addresses,
|
||||||
wantType = WantType.WantBlock) # we want this remote to send us a block
|
wantType = WantType.WantBlock) # we want this remote to send us a block
|
||||||
|
|
||||||
proc monitorBlockHandle(
|
proc monitorBlockHandle(
|
||||||
|
@ -197,9 +197,10 @@ proc requestBlock*(
|
||||||
if peer =? maybePeer:
|
if peer =? maybePeer:
|
||||||
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
||||||
b.pendingBlocks.setInFlight(address)
|
b.pendingBlocks.setInFlight(address)
|
||||||
await b.sendWantBlock(address, peer)
|
# TODO: Send more block addresses if at all sensible.
|
||||||
|
await b.sendWantBlock(@[address], peer)
|
||||||
codex_block_exchange_want_block_lists_sent.inc()
|
codex_block_exchange_want_block_lists_sent.inc()
|
||||||
await b.sendWantHave(address, @[peer], toSeq(b.peers))
|
await b.sendWantHave(@[address], @[peer], toSeq(b.peers))
|
||||||
codex_block_exchange_want_have_lists_sent.inc()
|
codex_block_exchange_want_have_lists_sent.inc()
|
||||||
|
|
||||||
# Don't let timeouts bubble up. We can't be too broad here or we break
|
# Don't let timeouts bubble up. We can't be too broad here or we break
|
||||||
|
@ -246,8 +247,7 @@ proc blockPresenceHandler*(
|
||||||
|
|
||||||
if wantCids.len > 0:
|
if wantCids.len > 0:
|
||||||
trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len
|
trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len
|
||||||
discard await allFinished(
|
await b.sendWantBlock(wantCids, peerCtx)
|
||||||
wantCids.mapIt(b.sendWantBlock(it, peerCtx)))
|
|
||||||
|
|
||||||
# if none of the connected peers report our wants in their have list,
|
# if none of the connected peers report our wants in their have list,
|
||||||
# fire up discovery
|
# fire up discovery
|
||||||
|
|
|
@ -98,7 +98,6 @@ import pkg/questionable/results
|
||||||
import ./utils/json except formatIt # TODO: remove exception?
|
import ./utils/json except formatIt # TODO: remove exception?
|
||||||
import pkg/stew/byteutils
|
import pkg/stew/byteutils
|
||||||
import pkg/stint
|
import pkg/stint
|
||||||
import pkg/upraises
|
|
||||||
|
|
||||||
export byteutils
|
export byteutils
|
||||||
export chronicles except toJson, formatIt, `%`
|
export chronicles except toJson, formatIt, `%`
|
||||||
|
@ -107,7 +106,6 @@ export sequtils
|
||||||
export json except formatIt
|
export json except formatIt
|
||||||
export strutils
|
export strutils
|
||||||
export sugar
|
export sugar
|
||||||
export upraises
|
|
||||||
export results
|
export results
|
||||||
|
|
||||||
func shortLog*(long: string, ellipses = "*", start = 3, stop = 6): string =
|
func shortLog*(long: string, ellipses = "*", start = 3, stop = 6): string =
|
||||||
|
@ -184,12 +182,12 @@ template formatIt*(format: LogFormat, T: typedesc, body: untyped) =
|
||||||
let v = opts.map(opt => opt.formatJsonOption)
|
let v = opts.map(opt => opt.formatJsonOption)
|
||||||
setProperty(r, key, json.`%`(v))
|
setProperty(r, key, json.`%`(v))
|
||||||
|
|
||||||
proc setProperty*(r: var JsonRecord, key: string, val: seq[T]) =
|
proc setProperty*(r: var JsonRecord, key: string, val: seq[T]) {.raises:[ValueError, IOError].} =
|
||||||
var it {.inject, used.}: T
|
var it {.inject, used.}: T
|
||||||
let v = val.map(it => body)
|
let v = val.map(it => body)
|
||||||
setProperty(r, key, json.`%`(v))
|
setProperty(r, key, json.`%`(v))
|
||||||
|
|
||||||
proc setProperty*(r: var JsonRecord, key: string, val: T) {.upraises:[ValueError, IOError].} =
|
proc setProperty*(r: var JsonRecord, key: string, val: T) {.raises:[ValueError, IOError].} =
|
||||||
var it {.inject, used.}: T = val
|
var it {.inject, used.}: T = val
|
||||||
let v = body
|
let v = body
|
||||||
setProperty(r, key, json.`%`(v))
|
setProperty(r, key, json.`%`(v))
|
||||||
|
@ -220,12 +218,12 @@ template formatIt*(format: LogFormat, T: typedesc, body: untyped) =
|
||||||
let v = opts.map(opt => opt.formatTextLineOption)
|
let v = opts.map(opt => opt.formatTextLineOption)
|
||||||
setProperty(r, key, v.formatTextLineSeq)
|
setProperty(r, key, v.formatTextLineSeq)
|
||||||
|
|
||||||
proc setProperty*(r: var TextLineRecord, key: string, val: seq[T]) =
|
proc setProperty*(r: var TextLineRecord, key: string, val: seq[T]) {.raises:[ValueError, IOError].} =
|
||||||
var it {.inject, used.}: T
|
var it {.inject, used.}: T
|
||||||
let v = val.map(it => body)
|
let v = val.map(it => body)
|
||||||
setProperty(r, key, v.formatTextLineSeq)
|
setProperty(r, key, v.formatTextLineSeq)
|
||||||
|
|
||||||
proc setProperty*(r: var TextLineRecord, key: string, val: T) {.upraises:[ValueError, IOError].} =
|
proc setProperty*(r: var TextLineRecord, key: string, val: T) {.raises:[ValueError, IOError].} =
|
||||||
var it {.inject, used.}: T = val
|
var it {.inject, used.}: T = val
|
||||||
let v = body
|
let v = body
|
||||||
setProperty(r, key, v)
|
setProperty(r, key, v)
|
||||||
|
|
Loading…
Reference in New Issue