Initial commit.

This commit is contained in:
cheatfate 2020-07-25 09:12:23 +03:00 committed by zah
parent 20a2525390
commit 99dcb81e77
3 changed files with 95 additions and 38 deletions

View File

@ -1,6 +1,6 @@
import
# Std lib
typetraits, strutils, os, random, algorithm, sequtils,
typetraits, strutils, os, random, algorithm, sequtils, math,
options as stdOptions,
# Status libs
@ -78,6 +78,10 @@ type
next_fork_version*: Version
next_fork_epoch*: Epoch
AverageThroughput* = object
count*: uint64
average*: float
Peer* = ref object
network*: Eth2Node
info*: PeerInfo
@ -86,6 +90,7 @@ type
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
netThroughput: AverageThroughput
score*: int
lacksSnappy: bool
@ -302,10 +307,8 @@ proc getKey*(peer: Peer): PeerID {.inline.} =
proc getFuture*(peer: Peer): Future[void] {.inline.} =
result = peer.info.lifeFuture()
proc `<`*(a, b: Peer): bool =
result = `<`(a.score, b.score)
proc getScore*(a: Peer): int =
## Returns current score value for peer ``peer``.
result = a.score
proc updateScore*(peer: Peer, score: int) {.inline.} =
@ -314,6 +317,44 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit
proc calcThroughput(dur: Duration, value: uint64): float {.inline.} =
let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur):
0.0
else:
float(value) * (secs / float(dur.nanoseconds))
proc updateNetThroughput*(peer: Peer, dur: Duration,
bytesCount: uint64) {.inline.} =
## Update peer's ``peer`` network throughput.
let bytesPerSecond = calcThroughput(dur, bytesCount)
let a = peer.netThroughput.average
let n = peer.netThroughput.count
peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1)
inc(peer.netThroughput.count)
proc netBps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Bps for peer ``peer``.
round((peer.netThroughput.average * 10_000) / 10_000)
proc netKbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Kbps for peer ``peer``.
round(((peer.netThroughput.average / 1024) * 10_000) / 10_000)
proc netMbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Mbps for peer ``peer``.
round(((peer.netThroughput.average / (1024 * 1024)) * 10_000) / 10_000)
proc `<`*(a, b: Peer): bool =
## Comparison function, which first checks peer's scores, and if the peers'
## score is equal it compares peers' network throughput.
if a.score < b.score:
true
elif a.score == b.score:
(a.netThroughput.average < b.netThroughput.average)
else:
false
proc isSeen*(network: ETh2Node, pinfo: PeerInfo): bool =
let currentTime = now(chronos.Moment)
let item = network.seenTable.getOrDefault(pinfo.peerId)
@ -473,10 +514,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Read the response
return awaitWithTimeout(
readResponse(when useNativeSnappy: libp2pInput(stream)
else: stream,
peer.lacksSnappy,
ResponseMsg),
readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
peer.lacksSnappy, peer, ResponseMsg),
deadline, neterr(ReadResponseTimeout))
finally:
await safeClose(stream)
@ -593,7 +632,7 @@ proc handleIncomingStream(network: Eth2Node,
let msg = if sizeof(MsgRec) > 0:
try:
awaitWithTimeout(readChunkPayload(s, noSnappy, MsgRec), deadline):
awaitWithTimeout(readChunkPayload(s, noSnappy, peer, MsgRec), deadline):
returnInvalidRequest(errorMsgLit "Request full data not sent in time")
except SerializationError as err:

View File

@ -90,9 +90,9 @@ proc uncompressFramedStream*(conn: Connection,
return ok output
proc readChunkPayload(conn: Connection,
noSnappy: bool,
proc readChunkPayload(conn: Connection, noSnappy: bool, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
let sm = now(chronos.Moment)
let size =
try: await conn.readVarint()
except LPStreamEOFError: #, LPStreamIncompleteError, InvalidVarintError
@ -112,17 +112,23 @@ proc readChunkPayload(conn: Connection,
if noSnappy:
var bytes = newSeq[byte](size.int)
await conn.readExactly(addr bytes[0], bytes.len)
# `10` is the maximum size of variable integer on wire, so error could
# not be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm, uint64(10 + len(bytes)))
return ok SSZ.decode(bytes, MsgType)
else:
let data = await conn.uncompressFramedStream(size.int)
if data.isOk:
# `10` is the maximum size of variable integer on wire, so error could
# not be significant.
peer.updateNetThroughput(now(chronos.Moment) - sm,
uint64(10 + size))
return ok SSZ.decode(data.get(), MsgType)
else:
debug "Snappy decompression/read failed", msg = $data.error, conn = $conn
return neterr InvalidSnappyBytes
proc readResponseChunk(conn: Connection,
noSnappy: bool,
proc readResponseChunk(conn: Connection, noSnappy: bool, peer: Peer,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
try:
var responseCodeByte: byte
@ -138,7 +144,7 @@ proc readResponseChunk(conn: Connection,
let responseCode = ResponseCode responseCodeByte
case responseCode:
of InvalidRequest, ServerError:
let errorMsgChunk = await readChunkPayload(conn, noSnappy, ErrorMsg)
let errorMsgChunk = await readChunkPayload(conn, noSnappy, peer, ErrorMsg)
let errorMsg = if errorMsgChunk.isOk: errorMsgChunk.value
else: return err(errorMsgChunk.error)
return err Eth2NetworkingError(kind: ReceivedErrorResponse,
@ -147,19 +153,18 @@ proc readResponseChunk(conn: Connection,
of Success:
discard
return await readChunkPayload(conn, noSnappy, MsgType)
return await readChunkPayload(conn, noSnappy, peer, MsgType)
except LPStreamEOFError, LPStreamIncompleteError:
return neterr UnexpectedEOF
proc readResponse(conn: Connection,
noSnappy: bool,
proc readResponse(conn: Connection, noSnappy: bool, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.gcsafe, async.} =
when MsgType is seq:
type E = ElemType(MsgType)
var results: MsgType
while true:
let nextRes = await conn.readResponseChunk(noSnappy, E)
let nextRes = await conn.readResponseChunk(noSnappy, peer, E)
if nextRes.isErr:
if nextRes.error.kind == PotentiallyExpectedEOF:
return ok results
@ -167,4 +172,4 @@ proc readResponse(conn: Connection,
else:
results.add nextRes.value
else:
return await conn.readResponseChunk(noSnappy, MsgType)
return await conn.readResponseChunk(noSnappy, peer, MsgType)

View File

@ -613,18 +613,21 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blocks from peer", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_score = peer.getScore(), topics = "syncman"
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step)
if workFut.failed():
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errMsg = workFut.readError().msg, topics = "syncman"
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
topics = "syncman"
else:
let res = workFut.read()
if res.isErr:
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, topics = "syncman"
step = req.step, peer_speed = peer.netKbps(),
topics = "syncman"
result = res
template headAge(): uint64 =
@ -663,6 +666,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Starting syncing with peer", peer = peer,
peer_score = peer.getScore(),
peer_speed = peer.netKbps(),
topics = "syncman"
try:
while true:
@ -674,14 +678,15 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), peer = peer, topics = "syncman"
peer_score = peer.getScore(), peer = peer,
peer_speed = peer.netKbps(), topics = "syncman"
if peerSlot > wallSlot + man.toleranceValue:
# Our wall timer is broken, or peer's status information is invalid.
debug "Local timer is broken or peer's status information is invalid",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer,
tolerance_value = man.toleranceValue,
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
man.failures.add(failure)
@ -691,7 +696,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
# Peer's status information is very old, we going to update it.
debug "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
checkPeerScore peer:
let res = await peer.updateStatus()
@ -700,7 +706,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
topics = "syncman"
peer_speed = peer.netKbps(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
man.failures.add(failure)
break
@ -710,9 +716,9 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
peer.updateScore(PeerScoreStaleStatus)
debug "Peer's status information is stale, exiting",
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusStale, peer)
man.failures.add(failure)
break
@ -720,14 +726,16 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
break
let req = man.queue.pop(peerSlot, peer)
@ -737,12 +745,12 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
queue_input_slot = man.queue.inpSlot,
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.lastSlot,
peer_score = peer.getScore(), topics = "syncman"
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
topics = "syncman"
# Sometimes when syncing is almost done but last requests are still
# pending, this can fall into endless cycle, when low number of peers
# are available in PeerPool. We going to wait for RESP_TIMEOUT time,
# so all pending requests should be finished at this moment.
checkPeerScore peer:
await sleepAsync(RESP_TIMEOUT)
@ -753,7 +761,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Creating new request for peer", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
checkPeerScore peer:
@ -765,7 +773,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Received blocks on request", blocks_count = len(data),
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse)
@ -773,7 +782,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
blocks_count = len(data), blocks_map = smap,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
man.failures.add(failure)
break
@ -789,7 +799,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Failed to receive blocks on request",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
man.failures.add(failure)
break
@ -884,7 +895,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
debug "Synchronization loop starting new worker", peer = peer,
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), topics = "syncman"
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
temp.add(syncWorker(man, peer))
# We will create new `acquireFut` later.
@ -900,6 +912,7 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
debug "Synchronization loop got worker finished",
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(),
topics = "syncman"
else:
if fut == acquireFut: