Blocks ordering

This commit is contained in:
Yuriy Glukhov 2018-10-02 14:40:08 +03:00
parent 7f14c435d6
commit e64ab468ae
3 changed files with 67 additions and 14 deletions

View File

@ -258,15 +258,23 @@ proc open*(d: DiscoveryProtocol) =
let ta = initTAddress(d.address.ip, d.address.udpPort)
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} =
d.kademlia.lookupRandom()
proc run(d: DiscoveryProtocol) {.async.} =
while true:
discard await d.lookupRandom()
await sleepAsync(3000)
echo "Discovered nodes: ", d.kademlia.nodesDiscovered
proc bootstrap*(d: DiscoveryProtocol) {.async.} =
await d.kademlia.bootstrap(d.bootstrapNodes)
discard d.run()
proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] =
d.kademlia.resolve(n)
proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} =
d.kademlia.lookupRandom()
proc randomNodes*(d: DiscoveryProtocol, count: int): seq[Node] {.inline.} =
d.kademlia.randomNodes(count)

View File

@ -67,6 +67,7 @@ proc newNode*(enode: ENode): Node =
proc distanceTo(n: Node, id: NodeId): UInt256 = n.id xor id
proc `$`*(n: Node): string =
# "Node[" & $n.node & "]"
"Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.data)
@ -479,6 +480,8 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] =
result.add(node)
seen.incl(node)
proc nodesDiscovered*(k: KademliaProtocol): int {.inline.} = k.routing.len
when isMainModule:
proc randomNode(): Node =
newNode("enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303")

View File

@ -162,7 +162,8 @@ type
WantedBlocksState = enum
Initial,
Requested,
Received
Received,
Persisted
WantedBlocks = object
startIndex: BlockNumber
@ -178,6 +179,7 @@ type
chain: AbstractChainDB
peerPool: PeerPool
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
@ -185,12 +187,13 @@ proc endIndex(b: WantedBlocks): BlockNumber =
proc availableWorkItem(ctx: SyncContext): int =
var maxPendingBlock = ctx.finalizedBlock
echo "queue len: ", ctx.workQueue.len
result = -1
for i in 0 .. ctx.workQueue.high:
case ctx.workQueue[i].state
of Initial:
return i
of Received:
of Persisted:
result = i
else:
discard
@ -211,23 +214,58 @@ proc availableWorkItem(ctx: SyncContext): int =
numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks) =
ctx.chain.persistBlocks(wi.headers, wi.bodies)
wi.headers.setLen(0)
wi.bodies.setLen(0)
ctx.finalizedBlock = wi.endIndex
wi.state = Persisted
proc persistPendingWorkItems(ctx: SyncContext) =
var nextStartIndex = ctx.finalizedBlock + 1
var keepRunning = true
var hasOutOfOrderBlocks = false
debug "Looking for out of order blocks"
while keepRunning:
keepRunning = false
hasOutOfOrderBlocks = false
for i in 0 ..< ctx.workQueue.len:
let start = ctx.workQueue[i].startIndex
if ctx.workQueue[i].state == Received:
if start == nextStartIndex:
debug "Persisting pending work item", start
ctx.persistWorkItem(ctx.workQueue[i])
nextStartIndex = ctx.finalizedBlock + 1
keepRunning = true
break
else:
hasOutOfOrderBlocks = true
ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
proc returnWorkItem(ctx: SyncContext, workItem: int) =
let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int
let receivedBlocks = wi.headers.len
let start = wi.startIndex
if askedBlocks == receivedBlocks:
debug "Work item complete", startBlock = wi.startIndex,
debug "Work item complete", start,
askedBlocks,
receivedBlocks
else:
warn "Work item complete", startBlock = wi.startIndex,
warn "Work item complete", start,
askedBlocks,
receivedBlocks
ctx.chain.persistBlocks(wi.headers, wi.bodies)
wi.headers.setLen(0)
wi.bodies.setLen(0)
if wi.startIndex != ctx.finalizedBlock + 1:
info "Blocks out of order", start, final = ctx.finalizedBlock
ctx.hasOutOfOrderBlocks = true
else:
info "Persisting blocks", start
ctx.persistWorkItem(wi[])
if ctx.hasOutOfOrderBlocks:
ctx.persistPendingWorkItems()
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
new result
@ -276,7 +314,6 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
try:
let results = await peer.getBlockHeaders(request)
if results.isSome:
workItem.state = Received
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeq[BlockBody]()
@ -292,8 +329,11 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
let b = await peer.getBlockBodies(hashes)
bodies.add(b.get.blocks)
shallowCopy(workItem.bodies, bodies)
dataReceived = true
if bodies.len == workItem.headers.len:
shallowCopy(workItem.bodies, bodies)
dataReceived = true
else:
warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
except:
# the success case uses `continue`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
@ -301,8 +341,10 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
discard
if dataReceived:
workItem.state = Received
syncCtx.returnWorkItem workItemIdx
else:
workItem.state = Initial
try:
await peer.disconnect(SubprotocolReason)
except:
@ -310,7 +352,7 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
syncCtx.handleLostPeer()
break
debug "Nothing to sync"
debug "Fininshed otaining blocks", peer
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
# Returns true if one of the peers acknowledges existense of the best block