Add proper concurrent connections.

Add a SeenTable to avoid continuously re-dialing dead peers. Refactor onSecond. Block backward sync while forward sync is in progress. SyncManager now checks responses against their corresponding requests (with tests), and watches for a local_head_slot that stops progressing, resetting the SyncQueue when it stalls.

parent 3a6a9f8135 · commit 12e28a1fa9
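The dead-peer avoidance introduced here is a time-stamped table: a failed or timed-out dial records the peer, and isSeen keeps the dialer away from that peer until seenThreshold expires. A condensed standalone sketch of the idea (string-keyed for brevity; the real code below keys a Table[PeerID, SeenItem]):

import tables
import chronos

type SeenItem = object
  stamp: Moment

var seenTable = initTable[string, SeenItem]()
let seenThreshold = 10.minutes  # same default the diff installs in init()

proc addSeen(peerId: string) =
  seenTable[peerId] = SeenItem(stamp: Moment.now())

proc isSeen(peerId: string): bool =
  if peerId notin seenTable:
    return false
  if Moment.now() - seenTable[peerId].stamp >= seenThreshold:
    # Entry expired, so the peer may be dialed again.
    seenTable.del(peerId)
    return false
  true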
@@ -436,19 +436,31 @@ proc handleMissingBlocks(node: BeaconNode) =
   # discard setTimer(Moment.now()) do (p: pointer):
   #   handleMissingBlocks(node)
 
-proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
-  node.handleMissingBlocks()
-
-  let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
-  discard setTimer(nextSecond) do (p: pointer):
-    asyncCheck node.onSecond(nextSecond)
+proc onSecond(node: BeaconNode) {.async.} =
+  ## This procedure will be called once per second.
+  if not(node.syncManager.inProgress):
+    node.handleMissingBlocks()
+
+proc runOnSecondLoop(node: BeaconNode) {.async.} =
+  var sleepTime = chronos.seconds(1)
+  while true:
+    await chronos.sleepAsync(sleepTime)
+    let start = chronos.now(chronos.Moment)
+    await node.onSecond()
+    let finish = chronos.now(chronos.Moment)
+    debug "onSecond task completed", elapsed = $(finish - start)
+    if finish - start > chronos.seconds(1):
+      sleepTime = chronos.seconds(0)
+    else:
+      sleepTime = chronos.seconds(1) - (finish - start)
 
-proc runSyncLoop(node: BeaconNode) {.async.} =
+proc runForwardSyncLoop(node: BeaconNode) {.async.} =
   proc getLocalHeadSlot(): Slot =
     result = node.blockPool.head.blck.slot
 
   proc getLocalWallSlot(): Slot {.gcsafe.} =
-    let epoch = node.beaconClock.now().toSlot().slot.compute_epoch_at_slot() + 1'u64
+    let epoch = node.beaconClock.now().toSlot().slot.compute_epoch_at_slot() +
+      1'u64
     result = epoch.compute_start_slot_at_epoch()
 
   proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): Result[void, BlockError] =
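Note that runOnSecondLoop compensates for the handler's own running time: if onSecond takes 300 ms, the next sleep is only 700 ms, so ticks stay on a one-second cadence instead of drifting. The same pattern as a reusable helper (a sketch assuming a chronos event loop; runAtInterval is an illustrative name, not part of the diff):

import chronos

proc runAtInterval(period: Duration,
                   handler: proc(): Future[void] {.gcsafe.}) {.async.} =
  var sleepTime = period
  while true:
    await sleepAsync(sleepTime)
    let start = Moment.now()
    await handler()
    let elapsed = Moment.now() - start
    # If the handler overran the period, run again immediately;
    # otherwise sleep only for the remainder of the period.
    sleepTime = if elapsed > period: ZeroDuration else: period - elapsed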
@@ -490,7 +502,7 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
 
   node.network.peerPool.setScoreCheck(scoreCheck)
 
-  var syncman = newSyncManager[Peer, PeerID](
+  node.syncManager = newSyncManager[Peer, PeerID](
     node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
     updateLocalBlocks,
     # 4 blocks per chunk is the optimal value right now, because our current
@@ -501,7 +513,7 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
     chunkSize = 4
   )
 
-  await syncman.sync()
+  await node.syncManager.sync()
 
 proc currentSlot(node: BeaconNode): Slot =
   node.beaconClock.now.slotOrZero
@@ -695,11 +707,8 @@ proc run*(node: BeaconNode) =
     addTimer(fromNow) do (p: pointer):
       asyncCheck node.onSlotStart(curSlot, nextSlot)
 
-  let second = Moment.now() + chronos.seconds(1)
-  discard setTimer(second) do (p: pointer):
-    asyncCheck node.onSecond(second)
-
-  node.syncLoop = runSyncLoop(node)
+  node.onSecondLoop = runOnSecondLoop(node)
+  node.forwardSyncLoop = runForwardSyncLoop(node)
 
   # main event loop
   while status == BeaconNodeStatus.Running:
@@ -19,7 +19,8 @@ import
   spec/[datatypes, crypto, digest, helpers],
   conf, time, beacon_chain_db, sszdump,
   attestation_pool, block_pool, eth2_network,
-  beacon_node_types, mainchain_monitor, request_manager
+  beacon_node_types, mainchain_monitor, request_manager,
+  sync_manager
 
 # This removes an invalid Nim warning that the digest module is unused here
 # It's currently used for `shortLog(head.blck.root)`

@@ -42,9 +43,11 @@ type
     beaconClock*: BeaconClock
     rpcServer*: RpcServer
     forkDigest*: ForkDigest
+    syncManager*: SyncManager[Peer, PeerID]
     topicBeaconBlocks*: string
     topicAggregateAndProofs*: string
-    syncLoop*: Future[void]
+    forwardSyncLoop*: Future[void]
+    onSecondLoop*: Future[void]
 
 const
   MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
@@ -42,6 +42,10 @@ type
   # warning about unused import (rpc/messages).
   GossipMsg = messages.Message
 
+  SeenItem* = object
+    pinfo*: PeerInfo
+    stamp*: chronos.Moment
+
   # TODO Is this really needed?
   Eth2Node* = ref object of RootObj
     switch*: Switch

@@ -52,6 +56,11 @@ type
     libp2pTransportLoops*: seq[Future[void]]
     discoveryLoop: Future[void]
     metadata*: Eth2Metadata
+    connectTimeout*: chronos.Duration
+    seenThreshold*: chronos.Duration
+    connQueue: AsyncQueue[PeerInfo]
+    seenTable: Table[PeerID, SeenItem]
+    connWorkers: seq[Future[void]]
 
   EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers

@@ -189,6 +198,9 @@ const
   PeerScoreHighLimit* = 1000
     ## Max value of peer's score
 
+  ConcurrentConnections* = 10
+    ## Maximum number of active concurrent connection requests.
+
 template neterr(kindParam: Eth2NetworkingErrorKind): auto =
   err(type(result), Eth2NetworkingError(kind: kindParam))

@@ -202,6 +214,12 @@ declareCounter gossip_messages_received,
 declarePublicGauge libp2p_successful_dials,
   "Number of successfully dialed peers"
 
+declarePublicGauge libp2p_failed_dials,
+  "Number of dialing attempts that failed"
+
+declarePublicGauge libp2p_timeout_dials,
+  "Number of dialing attempts that exceeded timeout"
+
 declarePublicGauge libp2p_peers,
   "Number of active libp2p peers"
@@ -648,26 +666,70 @@ proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
   if r.isSome:
     return r.get.toPeerInfo
 
+proc isSeen*(network: Eth2Node, pinfo: PeerInfo): bool =
+  let currentTime = now(chronos.Moment)
+  let item = network.seenTable.getOrDefault(pinfo.peerId)
+  if isNil(item.pinfo):
+    # Peer is not in SeenTable.
+    return false
+  if currentTime - item.stamp >= network.seenThreshold:
+    # Peer is in SeenTable, but the threshold has expired.
+    network.seenTable.del(pinfo.peerId)
+    return false
+  return true
+
+proc addSeen*(network: Eth2Node, pinfo: PeerInfo) =
+  let item = SeenItem(pinfo: pinfo, stamp: now(chronos.Moment))
+  network.seenTable[pinfo.peerId] = item
+
 proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
   logScope: peer = $peerInfo
 
   debug "Connecting to peer"
-  if await withTimeout(node.switch.connect(peerInfo), 10.seconds):
-    var peer = node.getPeer(peerInfo)
-    peer.wasDialed = true
+  await node.switch.connect(peerInfo)
+  var peer = node.getPeer(peerInfo)
+  peer.wasDialed = true
 
-    #let msDial = newMultistream()
-    #let conn = node.switch.connections.getOrDefault(peerInfo.id)
-    #let ls = await msDial.list(conn)
-    #debug "Supported protocols", ls
+  #let msDial = newMultistream()
+  #let conn = node.switch.connections.getOrDefault(peerInfo.id)
+  #let ls = await msDial.list(conn)
+  #debug "Supported protocols", ls
 
-    debug "Initializing connection"
-    await initializeConnection(peer)
+  debug "Initializing connection"
+  await initializeConnection(peer)
 
-    inc libp2p_successful_dials
-    debug "Network handshakes completed"
-  else:
-    debug "Connection timed out"
+  inc libp2p_successful_dials
+  debug "Network handshakes completed"
+
+proc connectWorker(network: Eth2Node) {.async.} =
+  debug "Connection worker started"
+  while true:
+    let pi = await network.connQueue.popFirst()
+    let r1 = network.peerPool.hasPeer(pi.peerId)
+    let r2 = network.isSeen(pi)
+
+    if not(r1) and not(r2):
+      # We only try to connect to peers which are present neither in our
+      # PeerPool nor in our SeenTable.
+      var fut = network.dialPeer(pi)
+      # We discard the boolean result here because we check the future's
+      # state below, to avoid a race between the connection succeeding and
+      # the timeout firing.
+      discard await withTimeout(fut, network.connectTimeout)
+      # We handle only timeouts and errors here, because successful
+      # connections are stored in the PeerPool.
+      if fut.finished():
+        if fut.failed() and not(fut.cancelled()):
+          debug "Unable to establish connection with peer", peer = $pi,
+                errMsg = fut.readError().msg
+          inc libp2p_failed_dials
+          network.addSeen(pi)
+        continue
+      debug "Connection to remote peer timed out", peer = $pi
+      inc libp2p_timeout_dials
+      network.addSeen(pi)
+    else:
+      debug "Peer is already connected or already seen", peer = $pi,
+            peer_pool_has_peer = $r1, seen_table_has_peer = $r2,
+            seen_table_size = len(network.seenTable)
 
 proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
   debug "Starting discovery loop"
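The connQueue/connectWorker pair above is a bounded worker pool: discovery only enqueues candidates, while a fixed number of workers dial them with a timeout, so at most ConcurrentConnections dials are in flight at once. A minimal standalone sketch of the pattern (assuming chronos; the job payload and timings are illustrative):

import chronos

const WorkerCount = 10  # mirrors ConcurrentConnections

proc worker(jobs: AsyncQueue[string], id: int) {.async.} =
  while true:
    let job = await jobs.popFirst()
    # Bound each job with a timeout so one stuck job cannot stall a worker.
    let fut = sleepAsync(10.milliseconds)   # stand-in for real async work
    let finished = await withTimeout(fut, 5.seconds)
    if finished:
      echo "worker ", id, " done: ", job
    else:
      echo "worker ", id, " timed out: ", job

proc main() {.async.} =
  var jobs = newAsyncQueue[string](WorkerCount)
  for i in 0 ..< WorkerCount:
    asyncCheck worker(jobs, i)
  for job in ["a", "b", "c"]:
    await jobs.addLast(job)
  await sleepAsync(100.milliseconds)  # let the workers drain the queue

when isMainModule:
  waitFor main()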
@@ -686,8 +748,7 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
         if peerInfo != nil:
           if peerInfo.id notin node.switch.connections:
             debug "Discovered new peer", peer = $peer
-            # TODO do this in parallel
-            await node.dialPeer(peerInfo)
+            await node.connQueue.addLast(peerInfo)
           else:
             peerInfo.close()
       except CatchableError as err:

@@ -715,6 +776,10 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
   result.switch = switch
   result.wantedPeers = conf.maxPeers
   result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
+  result.connectTimeout = 10.seconds
+  result.seenThreshold = 10.minutes
+  result.seenTable = initTable[PeerID, SeenItem]()
+  result.connQueue = newAsyncQueue[PeerInfo](ConcurrentConnections)
   result.metadata = getPersistentNetMetadata(conf)
   result.discovery = Eth2DiscoveryProtocol.new(
     conf, ip, tcpPort, udpPort, privKey.toRaw,

@@ -729,6 +794,9 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
     if msg.protocolMounter != nil:
       msg.protocolMounter result
 
+  for i in 0 ..< ConcurrentConnections:
+    result.connWorkers.add(connectWorker(result))
+
 template publicKey*(node: Eth2Node): keys.PublicKey =
   node.discovery.privKey.toPublicKey.tryGet()
@@ -1,5 +1,5 @@
 import chronicles
-import options, deques, heapqueue, tables, strutils, sequtils, math
+import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
 import stew/[bitseqs, results], chronos, chronicles
 import spec/datatypes, spec/digest, peer_pool, eth2_network
 import eth/async_utils

@@ -20,11 +20,13 @@ const
   PeerScoreNoBlocks* = -100
     ## Peer did not respond in time on `blocksByRange` request.
   PeerScoreGoodBlocks* = 100
-    ## Peer' `blocksByRange` answer is fine.
+    ## Peer's `blocksByRange` answer is fine.
   PeerScoreBadBlocks* = -1000
-    ## Peer response contains incorrect blocks
+    ## Peer's response contains incorrect blocks.
+  PeerScoreBadResponse* = -1000
+    ## Peer's response is not in requested range.
   PeerScoreJokeBlocks* = -200
-    ## Peer response contains too many empty blocks
+    ## Peer response contains too many empty blocks.
 
 type
   SyncFailureKind* = enum

@@ -32,7 +34,8 @@ type
     StatusDownload,
    StatusStale,
     EmptyProblem,
-    BlockDownload
+    BlockDownload,
+    BadResponse
 
   GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}

@@ -56,6 +59,10 @@ type
     request*: SyncRequest[T]
     data*: seq[SignedBeaconBlock]
 
+  SyncWaiter*[T] = object
+    future: Future[bool]
+    request: SyncRequest[T]
+
   SyncQueue*[T] = ref object
     inpSlot*: Slot
     outSlot*: Slot

@@ -66,9 +73,9 @@ type
     queueSize*: int
 
     counter*: uint64
-    pending*: Table[uint64, Slot]
+    pending*: Table[uint64, SyncRequest[T]]
 
-    waiters: seq[Future[bool]]
+    waiters: seq[SyncWaiter[T]]
     syncUpdate*: SyncUpdateCallback[T]
 
     debtsQueue: HeapQueue[SyncRequest[T]]

@@ -92,6 +99,7 @@ type
     chunkSize: uint64
     queue: SyncQueue[A]
     failures: seq[SyncFailure[A]]
+    inProgress*: bool
 
   SyncMoment* = object
     stamp*: chronos.Moment
@@ -126,6 +134,43 @@ proc getShortMap*[T](req: SyncRequest[T],
     slider = slider + req.step
   result = res
 
+proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
+  slot >= req.slot and slot < req.slot + req.count * req.step and
+    ((slot - req.slot) mod req.step == 0)
+
+proc cmp*[T](a, b: SyncRequest[T]): int =
+  result = cmp(uint64(a.slot), uint64(b.slot))
+
+proc checkResponse*[T](req: SyncRequest[T],
+                       data: openarray[SignedBeaconBlock]): bool =
+  if len(data) == 0:
+    # Impossible to verify an empty response.
+    return true
+
+  if uint64(len(data)) > req.count:
+    # The number of blocks in a response must be less than or equal to the
+    # number of requested blocks.
+    return false
+
+  var slot = req.slot
+  var rindex = 0'u64
+  var dindex = 0
+
+  while (rindex < req.count) and (dindex < len(data)):
+    if slot < data[dindex].message.slot:
+      # The current requested slot is empty in the response; move on.
+      discard
+    elif slot == data[dindex].message.slot:
+      inc(dindex)
+    else:
+      # The block's slot is behind the requested slot: out of range or
+      # out of order.
+      return false
+    slot = slot + req.step
+    rindex = rindex + 1'u64
+
+  if dindex == len(data):
+    return true
+  else:
+    return false
+
 proc getFullMap*[T](req: SyncRequest[T],
                     data: openarray[SignedBeaconBlock]): string =
   # Returns all slot numbers in ``data`` as a comma-delimited string.
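To make the new contains/checkResponse semantics concrete: for a request with slot = 10, count = 3, step = 2 the valid slots are exactly 10, 12 and 14, and a response must be an ordered subset of blocks at those slots. A sketch in the spirit of the tests at the end of this diff (SomeTPeer is the test-only peer type; mkBlock is an illustrative helper):

proc mkBlock(s: Slot): SignedBeaconBlock =
  # Hypothetical helper: a block whose message.slot is the given slot.
  result.message.slot = s

let req = SyncRequest[SomeTPeer](slot: Slot(10), count: 3'u64, step: 2'u64)

doAssert req.contains(Slot(12))         # on-grid slot inside the range
doAssert not req.contains(Slot(11))     # off-grid slot is rejected
doAssert checkResponse(req, @[])        # empty responses are unverifiable
doAssert checkResponse(req, @[mkBlock(Slot(10)), mkBlock(Slot(14))])     # ordered subset
doAssert not checkResponse(req, @[mkBlock(Slot(11))])                    # off-grid slot
doAssert not checkResponse(req, @[mkBlock(Slot(14)), mkBlock(Slot(10))]) # wrong order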
@@ -251,9 +296,9 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
     chunkSize: chunkSize,
     queueSize: queueSize,
     syncUpdate: updateCb,
-    waiters: newSeq[Future[bool]](),
+    waiters: newSeq[SyncWaiter[T]](),
     counter: 1'u64,
-    pending: initTable[uint64, Slot](),
+    pending: initTable[uint64, SyncRequest[T]](),
     debtsQueue: initHeapQueue[SyncRequest[T]](),
     inpSlot: start,
     outSlot: start

@@ -276,7 +321,7 @@ proc lastSlot*[T](req: SyncRequest[T]): Slot {.inline.} =
 proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
   req.index = sq.counter
   sq.counter = sq.counter + 1'u64
-  sq.pending[req.index] = req.slot
+  sq.pending[req.index] = req
 
 proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
   ## Update last slot stored in queue ``sq`` with value ``last``.
@@ -287,23 +332,25 @@ proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
 
 proc wakeupWaiters[T](sq: SyncQueue[T], flag = true) {.inline.} =
   ## Wake up one or all blocked waiters.
-  for waiter in sq.waiters:
-    if not(waiter.finished()):
-      waiter.complete(flag)
+  for item in sq.waiters:
+    if not(item.future.finished()):
+      item.future.complete(flag)
 
-proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
+proc waitForChanges[T](sq: SyncQueue[T],
+                       req: SyncRequest[T]): Future[bool] {.async.} =
   ## Create a new waiter and wait for completion from `wakeupWaiters()`.
-  var waiter = newFuture[bool]("SyncQueue.waitForChanges")
-  sq.waiters.add(waiter)
+  var waitfut = newFuture[bool]("SyncQueue.waitForChanges")
+  let waititem = SyncWaiter[T](future: waitfut, request: req)
+  sq.waiters.add(waititem)
   try:
-    result = await waiter
+    result = await waitfut
   finally:
-    sq.waiters.delete(sq.waiters.find(waiter))
+    sq.waiters.delete(sq.waiters.find(waititem))
 
 proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} =
   ## Perform wakeupWaiters(false) and block until the last waiter has been
   ## awakened.
-  var waitChanges = sq.waitForChanges()
+  var waitChanges = sq.waitForChanges(SyncRequest.empty(T))
   sq.wakeupWaiters(false)
   discard await waitChanges
@@ -390,7 +437,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
   while true:
     if (sq.queueSize > 0) and
        (sr.slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize):
-      let res = await sq.waitForChanges()
+      let res = await sq.waitForChanges(sr)
      if res:
        continue
      else:
@@ -719,8 +766,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
             local_head_slot = headSlot, peer = peer,
             tolerance_value = man.toleranceValue,
             peer_score = peer.getScore(), topics = "syncman"
-      # let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
-      # man.failures.add(failure)
+      let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
+      man.failures.add(failure)
       break
 
     if peerAge >= man.maxStatusAge:

@@ -734,8 +781,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
       debug "Failed to get remote peer's status, exiting", peer = peer,
             peer_score = peer.getScore(), peer_head_slot = peerSlot,
             topics = "syncman"
-      # let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
-      # man.failures.add(failure)
+      let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
+      man.failures.add(failure)
       break
 
     let newPeerSlot = peer.getHeadSlot()

@@ -746,8 +793,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
             local_head_slot = headSlot,
             remote_new_head_slot = newPeerSlot,
             peer = peer, peer_score = peer.getScore(), topics = "syncman"
-      # let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
-      # man.failures.add(failure)
+      let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
+      man.failures.add(failure)
       break
 
     debug "Peer's status information updated", wall_clock_slot = wallSlot,

@@ -776,8 +823,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
       # are available in PeerPool. We are going to wait for RESP_TIMEOUT time,
      # so all pending requests should be finished at this moment.
       await sleepAsync(RESP_TIMEOUT)
-      # let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
-      # man.failures.add(failure)
+      let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
+      man.failures.add(failure)
       break
 
     debug "Creating new request for peer", wall_clock_slot = wallSlot,
@@ -794,6 +841,18 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
           blocks_map = smap, request_slot = req.slot,
           request_count = req.count, request_step = req.step,
           peer = peer, peer_score = peer.getScore(), topics = "syncman"
 
+    if not(checkResponse(req, data)):
+      peer.updateScore(PeerScoreBadResponse)
+      warn "Received blocks sequence is not in requested range",
+           blocks_count = len(data), blocks_map = smap,
+           request_slot = req.slot, request_count = req.count,
+           request_step = req.step, peer = peer,
+           peer_score = peer.getScore(), topics = "syncman"
+      let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
+      man.failures.add(failure)
+      break
+
     # Scoring will happen in `syncUpdate`.
     await man.queue.push(req, data)
     # Cleaning up failures.
@@ -805,8 +864,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
            request_slot = req.slot, request_count = req.count,
            request_step = req.step, peer = peer,
            peer_score = peer.getScore(), topics = "syncman"
-      # let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
-      # man.failures.add(failure)
+      let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
+      man.failures.add(failure)
       break
 
   result = peer
@@ -830,7 +889,7 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
   template workersCount(): int =
     if isNil(acquireFut): len(pending) else: (len(pending) - 1)
 
-  proc speedometerTask() {.async.} =
+  proc watchTask() {.async.} =
     while true:
       let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
       await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
@@ -838,11 +897,18 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
       if workersCount() == 0:
         syncSpeed = 0.0
       else:
-        syncSpeed = speed(lsm1, lsm2)
+        if (lsm2.slot - lsm1.slot == 0'u64) and (workersCount() > 1):
+          debug "Syncing process is not progressing, reset the queue",
+                workers_count = workersCount(),
+                to_slot = man.queue.outSlot,
+                local_head_slot = lsm1.slot
+          await man.queue.resetWait(none[Slot]())
+        else:
+          syncSpeed = speed(lsm1, lsm2)
 
   debug "Synchronization loop started", topics = "syncman"
 
-  traceAsyncErrors speedometerTask()
+  traceAsyncErrors watchTask()
 
   while true:
     wallSlot = man.getLocalWallSlot()
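Renaming speedometerTask to watchTask reflects its second duty: besides measuring sync speed, it now acts as a stall watchdog, sampling the local head slot once per slot interval and resetting the SyncQueue when the head stops advancing even though several workers are busy (which points to stuck requests rather than a lack of peers). The check, condensed into a standalone sketch (headSlot, workerCount and resetQueue are illustrative stand-ins, not the diff's API):

import chronos

proc watchLoop(headSlot: proc(): uint64 {.gcsafe.},
               workerCount: proc(): int {.gcsafe.},
               resetQueue: proc(): Future[void] {.gcsafe.},
               interval: Duration) {.async.} =
  while true:
    let before = headSlot()
    await sleepAsync(interval)
    # No progress while more than one worker is active suggests stuck
    # requests, so reset the queue instead of waiting another interval.
    if headSlot() == before and workerCount() > 1:
      await resetQueue()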
@@ -866,6 +932,7 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
             difference = (wallSlot - headSlot),
             max_head_age = man.maxHeadAge, topics = "syncman"
       if len(pending) == 0:
+        man.inProgress = false
         await sleepAsync(man.sleepTime)
       else:
         var peerFut = one(pending)
@@ -873,6 +940,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
         # later.
         discard await withTimeout(peerFut, man.sleepTime)
     else:
+      man.inProgress = true
+
       if isNil(acquireFut):
         acquireFut = man.pool.acquire()
         pending.add(acquireFut)
@@ -949,10 +1018,11 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
 
     if len(man.failures) > man.maxRecurringFailures and (workersCount() > 1):
       debug "Number of recurring failures exceeds limit, resetting queue",
-            workers_count = $workersCount(), rec_failures = $len(man.failures)
+            workers_count = workersCount(), rec_failures = len(man.failures)
       await man.queue.resetWait(none[Slot]())
 
     debug "Synchronization loop end tick", wall_head_slot = wallSlot,
           local_head_slot = headSlot, workers_count = workersCount(),
           waiting_for_new_peer = $not(isNil(acquireFut)),
-          sync_speed = syncSpeed, topics = "syncman"
+          sync_speed = syncSpeed, queue_slot = man.queue.outSlot,
+          topics = "syncman"
@@ -379,10 +379,103 @@ suite "SyncManager test suite":
 
     for counter in countdown(32'u64, 2'u64):
       let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter,
-                                      step: 1'u64)
+                                       step: 1'u64)
       let sr = SyncResult[SomeTPeer](request: req, data: chain1)
       check sr.getLastNonEmptySlot() == Slot(10)
 
     let req = SyncRequest[SomeTPeer](slot: Slot(100), count: 1'u64, step: 1'u64)
     let sr = SyncResult[SomeTPeer](request: req, data: chain2)
     check sr.getLastNonEmptySlot() == Slot(100)
 
+  test "[SyncQueue] contains() test":
+    proc checkRange[T](req: SyncRequest[T]): bool =
+      var slot = req.slot
+      var counter = 0'u64
+      while counter < req.count:
+        if not(req.contains(slot)):
+          return false
+        slot = slot + req.step
+        counter = counter + 1'u64
+      return true
+
+    var req1 = SyncRequest[SomeTPeer](slot: Slot(5), count: 10'u64, step: 1'u64)
+    var req2 = SyncRequest[SomeTPeer](slot: Slot(1), count: 10'u64, step: 2'u64)
+    var req3 = SyncRequest[SomeTPeer](slot: Slot(2), count: 10'u64, step: 3'u64)
+    var req4 = SyncRequest[SomeTPeer](slot: Slot(3), count: 10'u64, step: 4'u64)
+    var req5 = SyncRequest[SomeTPeer](slot: Slot(4), count: 10'u64, step: 5'u64)
+
+    check:
+      req1.checkRange() == true
+      req2.checkRange() == true
+      req3.checkRange() == true
+      req4.checkRange() == true
+      req5.checkRange() == true
+
+      req1.contains(Slot(4)) == false
+      req1.contains(Slot(15)) == false
+
+      req2.contains(Slot(0)) == false
+      req2.contains(Slot(21)) == false
+      req2.contains(Slot(20)) == false
+
+      req3.contains(Slot(0)) == false
+      req3.contains(Slot(1)) == false
+      req3.contains(Slot(32)) == false
+      req3.contains(Slot(31)) == false
+      req3.contains(Slot(30)) == false
+
+      req4.contains(Slot(0)) == false
+      req4.contains(Slot(1)) == false
+      req4.contains(Slot(2)) == false
+      req4.contains(Slot(43)) == false
+      req4.contains(Slot(42)) == false
+      req4.contains(Slot(41)) == false
+      req4.contains(Slot(40)) == false
+
+      req5.contains(Slot(0)) == false
+      req5.contains(Slot(1)) == false
+      req5.contains(Slot(2)) == false
+      req5.contains(Slot(3)) == false
+      req5.contains(Slot(54)) == false
+      req5.contains(Slot(53)) == false
+      req5.contains(Slot(52)) == false
+      req5.contains(Slot(51)) == false
+      req5.contains(Slot(50)) == false
+
+  test "[SyncQueue] checkResponse() test":
+    let chain = createChain(Slot(10), Slot(20))
+    let r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64, step: 1'u64)
+    let r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64, step: 1'u64)
+    let r22 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64, step: 2'u64)
+
+    check:
+      checkResponse(r1, @[chain[1]]) == true
+      checkResponse(r1, @[]) == true
+      checkResponse(r1, @[chain[1], chain[1]]) == false
+      checkResponse(r1, @[chain[0]]) == false
+      checkResponse(r1, @[chain[2]]) == false
+
+      checkResponse(r21, @[chain[1]]) == true
+      checkResponse(r21, @[]) == true
+      checkResponse(r21, @[chain[1], chain[2]]) == true
+      checkResponse(r21, @[chain[2]]) == true
+      checkResponse(r21, @[chain[1], chain[2], chain[3]]) == false
+      checkResponse(r21, @[chain[0], chain[1]]) == false
+      checkResponse(r21, @[chain[0]]) == false
+      checkResponse(r21, @[chain[2], chain[1]]) == false
+      checkResponse(r21, @[chain[2], chain[3]]) == false
+      checkResponse(r21, @[chain[3]]) == false
+
+      checkResponse(r22, @[chain[1]]) == true
+      checkResponse(r22, @[]) == true
+      checkResponse(r22, @[chain[1], chain[3]]) == true
+      checkResponse(r22, @[chain[3]]) == true
+      checkResponse(r22, @[chain[1], chain[3], chain[5]]) == false
+      checkResponse(r22, @[chain[0], chain[1]]) == false
+      checkResponse(r22, @[chain[1], chain[2]]) == false
+      checkResponse(r22, @[chain[2], chain[3]]) == false
+      checkResponse(r22, @[chain[3], chain[4]]) == false
+      checkResponse(r22, @[chain[4], chain[5]]) == false
+      checkResponse(r22, @[chain[4]]) == false
+      checkResponse(r22, @[chain[3], chain[1]]) == false