This commit is contained in:
Zahary Karadjov 2019-01-05 16:03:17 +02:00
parent 7c185b1731
commit bf1147291b
5 changed files with 46 additions and 48 deletions

View File

@ -24,7 +24,7 @@ requires "nim > 0.18.0",
"json_serialization"
proc runTest(name: string, defs = "", lang = "c") =
exec "nim " & lang & " " & defs & " -d:testing --experimental:ForLoopMacros -r tests/" & name
exec "nim " & lang & " " & defs & " -d:testing --threads:on --experimental:ForLoopMacros -r tests/" & name
task test, "Runs the test suite":
runTest "all_tests"

View File

@ -4,7 +4,7 @@ import
private/types, rlpx, ../eth_p2p
type
Action = proc (p: Peer, data: Rlp): Future[void]
Action = proc (p: Peer, data: Rlp): Future[void] {.gcsafe.}
ProtocolMessagePair = object
protocol: ProtocolInfo
@ -87,10 +87,10 @@ proc expectationViolationMsg(mock: MockConf,
result.add "\n"
proc addProtocol(mock: MockConf, p: ProtocolInfo): ProtocolInfo =
new result
result = create ProtocolInfoObj
deepCopy(result[], p[])
proc incomingMsgHandler(p: Peer, receivedMsgId: int, rlp: Rlp): Future[void] =
proc incomingMsgHandler(p: Peer, receivedMsgId: int, rlp: Rlp): Future[void] {.gcsafe.} =
let (receivedMsgProto, receivedMsgInfo) = p.getMsgMetadata(receivedMsgId)
let expectedMsgIdx = mock.receivedMsgsCount

View File

@ -58,8 +58,8 @@ type
observers*: Table[int, PeerObserver]
PeerObserver* = object
onPeerConnected*: proc(p: Peer)
onPeerDisconnected*: proc(p: Peer)
onPeerConnected*: proc(p: Peer) {.gcsafe.}
onPeerDisconnected*: proc(p: Peer) {.gcsafe.}
Capability* = object
name*: string
@ -80,7 +80,7 @@ type
## Quasy-private types. Use at your own risk.
##
ProtocolInfo* = ref object
ProtocolInfoObj* = object
name*: string
version*: int
messages*: seq[MessageInfo]
@ -93,6 +93,8 @@ type
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler
ProtocolInfo* = ptr ProtocolInfoObj
MessageInfo* = object
id*: int
name*: string
@ -105,8 +107,8 @@ type
Dispatcher* = ref object # private
# The dispatcher stores the mapping of negotiated message IDs between
# two connected peers. The dispatcher objects are shared between
# connections running with the same set of supported protocols.
# two connected peers. The dispatcher may be shared between connections
# running with the same set of supported protocols.
#
# `protocolOffsets` will hold one slot of each locally supported
# protocol. If the other peer also supports the protocol, the stored
@ -131,13 +133,13 @@ type
# Private types:
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
MessageContentPrinter* = proc(msg: pointer): string
RequestResolver* = proc(msg: pointer, future: FutureBase)
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase)
PeerStateInitializer* = proc(peer: Peer): RootRef
NetworkStateInitializer* = proc(network: EthereumNode): RootRef
HandshakeStep* = proc(peer: Peer): Future[void]
MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) {.gcsafe.}
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer,
reason: DisconnectionReason): Future[void] {.gcsafe.}

View File

@ -29,7 +29,6 @@ when tracingEnabled:
var
gProtocols: seq[ProtocolInfo]
gDispatchers = initSet[Dispatcher]()
gDevp2pInfo: ProtocolInfo
# The variables above are immutable RTTI information. We need to tell
@ -43,7 +42,7 @@ proc newFuture[T](location: var Future[T]) =
proc `$`*(p: Peer): string {.inline.} =
$p.remote
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.}
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.gcsafe, async.}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
@ -81,7 +80,7 @@ proc getDispatcher(node: EthereumNode,
# https://github.com/nim-lang/Nim/issues/7457
# We should be able to find an existing dispatcher without allocating a new one
new(result)
new result
newSeq(result.protocolOffsets, allProtocols.len)
result.protocolOffsets.fill -1
@ -97,24 +96,19 @@ proc getDispatcher(node: EthereumNode,
nextUserMsgId += localProtocol.messages.len
break findMatchingProtocol
if result in gDispatchers:
return gDispatchers[result]
else:
template copyTo(src, dest; index: int) =
for i in 0 ..< src.len:
dest[index + i] = addr src[i]
template copyTo(src, dest; index: int) =
for i in 0 ..< src.len:
dest[index + i] = addr src[i]
result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
devp2pInfo.messages.copyTo(result.messages, 0)
result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
devp2pInfo.messages.copyTo(result.messages, 0)
for localProtocol in node.protocols:
let idx = localProtocol.index
if result.protocolOffsets[idx] != -1:
result.activeProtocols.add localProtocol
localProtocol.messages.copyTo(result.messages,
result.protocolOffsets[idx])
gDispatchers.incl result
for localProtocol in node.protocols:
let idx = localProtocol.index
if result.protocolOffsets[idx] != -1:
result.activeProtocols.add localProtocol
localProtocol.messages.copyTo(result.messages,
result.protocolOffsets[idx])
proc getMsgName*(peer: Peer, msgId: int): string =
if not peer.dispatcher.isNil and
@ -147,7 +141,7 @@ proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, ptr MessageInfo) =
proc newProtocol(name: string, version: int,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfo =
new result
result = create ProtocolInfoObj
result.name = name
result.version = version
result.messages = @[]
@ -211,7 +205,7 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
proc registerMsg(protocol: var ProtocolInfo,
proc registerMsg(protocol: ProtocolInfo,
id: int, name: string,
thunk: MessageHandler,
printer: MessageContentPrinter,
@ -285,7 +279,7 @@ template compressMsg(peer: Peer, data: Bytes): Bytes =
else:
data
proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} =
var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState)
try:
@ -379,11 +373,12 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
template req: auto = outstandingReqs()[idx]
if req.future.finished:
assert req.timeoutAt < fastEpochTime()
assert req.timeoutAt <= fastEpochTime()
# Here we'll remove the expired request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
continue
else:
outstandingReqs.shrink(fromLast = 1)
# This was the last item, so we don't have any
@ -573,7 +568,7 @@ template networkState*(connection: Peer, Protocol: type): untyped =
## particular connection.
protocolState(connection.network, Protocol)
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) = discard
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
@ -581,7 +576,7 @@ proc createPeerState[ProtocolState](peer: Peer): RootRef =
initProtocolState(res, peer)
return cast[RootRef](res)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef =
proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} =
var res = new NetworkState
mixin initProtocolState
initProtocolState(res, network)
@ -688,6 +683,7 @@ macro p2pProtocolImpl(name: static[string],
of rlpxResponse: userHandlerProc.applyDecorator incomingResponseDecorator
else: discard
userHandlerProc.addPragma ident"gcsafe"
userHandlerProc.addPragma ident"async"
# We allow the user handler to use `openarray` params, but we turn
@ -882,7 +878,7 @@ macro p2pProtocolImpl(name: static[string],
let thunkName = ident(msgName & "_thunk")
var thunkProc = quote do:
proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) =
proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) {.gcsafe.} =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecord`
`readParamsPrelude`
@ -917,6 +913,7 @@ macro p2pProtocolImpl(name: static[string],
var msgSendProc = n
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
msgSendProc.addPragma ident"gcsafe"
# Add a timeout parameter for all request procs
case msgKind
@ -1089,7 +1086,6 @@ macro p2pProtocolImpl(name: static[string],
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
result.add newCall(bindSym("registerProtocol"), protocol)
when isMainModule: echo repr(result)
when defined(debugRlpxProtocol) or defined(debugMacros):
echo repr(result)

View File

@ -100,7 +100,7 @@ type
## XXX: really big messages can cause excessive mem usage when using msg \
## count
FilterMsgHandler* = proc(msg: ReceivedMessage) {.closure.}
FilterMsgHandler* = proc(msg: ReceivedMessage) {.gcsafe, closure.}
Filter* = object
src: Option[PublicKey]
@ -581,7 +581,7 @@ proc subscribeFilter*(filters: var Filters, filter: Filter,
debug "Filter added", filter = id
return id
proc notify*(filters: var Filters, msg: Message) =
proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
var decoded: Option[DecodedPayload]
var keyHash: Hash
@ -667,10 +667,10 @@ type
filters*: Filters
config*: WhisperConfig
proc run(peer: Peer) {.async.}
proc run(node: EthereumNode, network: WhisperNetwork) {.async.}
proc run(peer: Peer) {.gcsafe, async.}
proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.}
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) =
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
network.queue = initQueue(defaultQueueCapacity)
network.filters = initTable[string, Filter]()
network.config.bloom = fullBloom()