Add join(Peer) procedure.

This commit is contained in:
cheatfate 2019-06-05 11:15:49 +03:00
parent d2d6d7fc16
commit 6d9f3662ed
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 13 additions and 2 deletions

View File

@ -38,6 +38,7 @@ type
protocolStates*: seq[RootRef]
outstandingRequests*: seq[Deque[OutstandingRequest]]
awaitedMessages*: seq[FutureBase]
messageLoop*: Future[void]
when useSnappy:
snappyEnabled*: bool

View File

@ -255,6 +255,16 @@ proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
proc join*(peer: Peer): Future[void] =
## Wait until `peer` is alive.
var retFuture = newFuture[void]("rlpx.peer.join")
proc continuation(udata: pointer) = retFuture.complete()
if not peer.messageLoop.finished:
peer.messageLoop.addCallback(continuation)
else:
retFuture.complete()
return retFuture
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
@ -1277,9 +1287,9 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} =
# the other peer may arrrive too early and be processed before
# the handshake code got a change to wait for them.
#
var messageProcessingLoop = peer.dispatchMessages()
peer.messageLoop = peer.dispatchMessages()
messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
peer.messageLoop.callback = proc(p: pointer) {.gcsafe.} =
if messageProcessingLoop.failed:
debug "Ending dispatchMessages loop", peer,
err = messageProcessingLoop.error.msg