diff --git a/eth_p2p.nimble b/eth_p2p.nimble index 5e49bae..a028367 100644 --- a/eth_p2p.nimble +++ b/eth_p2p.nimble @@ -24,7 +24,7 @@ requires "nim > 0.18.0", "json_serialization" proc runTest(name: string, defs = "", lang = "c") = - exec "nim " & lang & " " & defs & " -d:testing --experimental:ForLoopMacros -r tests/" & name + exec "nim " & lang & " " & defs & " -d:testing --threads:on --experimental:ForLoopMacros -r tests/" & name task test, "Runs the test suite": runTest "all_tests" diff --git a/eth_p2p/mock_peers.nim b/eth_p2p/mock_peers.nim index 2a41c00..a76534a 100644 --- a/eth_p2p/mock_peers.nim +++ b/eth_p2p/mock_peers.nim @@ -4,7 +4,7 @@ import private/types, rlpx, ../eth_p2p type - Action = proc (p: Peer, data: Rlp): Future[void] + Action = proc (p: Peer, data: Rlp): Future[void] {.gcsafe.} ProtocolMessagePair = object protocol: ProtocolInfo @@ -87,10 +87,10 @@ proc expectationViolationMsg(mock: MockConf, result.add "\n" proc addProtocol(mock: MockConf, p: ProtocolInfo): ProtocolInfo = - new result + result = create ProtocolInfoObj deepCopy(result[], p[]) - proc incomingMsgHandler(p: Peer, receivedMsgId: int, rlp: Rlp): Future[void] = + proc incomingMsgHandler(p: Peer, receivedMsgId: int, rlp: Rlp): Future[void] {.gcsafe.} = let (receivedMsgProto, receivedMsgInfo) = p.getMsgMetadata(receivedMsgId) let expectedMsgIdx = mock.receivedMsgsCount diff --git a/eth_p2p/private/types.nim b/eth_p2p/private/types.nim index 6569293..6861ce3 100644 --- a/eth_p2p/private/types.nim +++ b/eth_p2p/private/types.nim @@ -58,8 +58,8 @@ type observers*: Table[int, PeerObserver] PeerObserver* = object - onPeerConnected*: proc(p: Peer) - onPeerDisconnected*: proc(p: Peer) + onPeerConnected*: proc(p: Peer) {.gcsafe.} + onPeerDisconnected*: proc(p: Peer) {.gcsafe.} Capability* = object name*: string @@ -80,7 +80,7 @@ type ## Quasy-private types. Use at your own risk. ## - ProtocolInfo* = ref object + ProtocolInfoObj* = object name*: string version*: int messages*: seq[MessageInfo] @@ -93,6 +93,8 @@ type handshake*: HandshakeStep disconnectHandler*: DisconnectionHandler + ProtocolInfo* = ptr ProtocolInfoObj + MessageInfo* = object id*: int name*: string @@ -105,8 +107,8 @@ type Dispatcher* = ref object # private # The dispatcher stores the mapping of negotiated message IDs between - # two connected peers. The dispatcher objects are shared between - # connections running with the same set of supported protocols. + # two connected peers. The dispatcher may be shared between connections + # running with the same set of supported protocols. # # `protocolOffsets` will hold one slot of each locally supported # protocol. If the other peer also supports the protocol, the stored @@ -131,13 +133,13 @@ type # Private types: MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode - MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] - MessageContentPrinter* = proc(msg: pointer): string - RequestResolver* = proc(msg: pointer, future: FutureBase) - NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) - PeerStateInitializer* = proc(peer: Peer): RootRef - NetworkStateInitializer* = proc(network: EthereumNode): RootRef - HandshakeStep* = proc(peer: Peer): Future[void] + MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.} + MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} + RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} + NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) {.gcsafe.} + PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} + NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.} + HandshakeStep* = proc(peer: Peer): Future[void] {.gcsafe.} DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason): Future[void] {.gcsafe.} diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index 5bee23b..ee0329b 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -29,7 +29,6 @@ when tracingEnabled: var gProtocols: seq[ProtocolInfo] - gDispatchers = initSet[Dispatcher]() gDevp2pInfo: ProtocolInfo # The variables above are immutable RTTI information. We need to tell @@ -43,7 +42,7 @@ proc newFuture[T](location: var Future[T]) = proc `$`*(p: Peer): string {.inline.} = $p.remote -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.gcsafe, async.} template raisePeerDisconnected(msg: string, r: DisconnectionReason) = var e = newException(PeerDisconnected, msg) @@ -81,7 +80,7 @@ proc getDispatcher(node: EthereumNode, # https://github.com/nim-lang/Nim/issues/7457 # We should be able to find an existing dispatcher without allocating a new one - new(result) + new result newSeq(result.protocolOffsets, allProtocols.len) result.protocolOffsets.fill -1 @@ -97,24 +96,19 @@ proc getDispatcher(node: EthereumNode, nextUserMsgId += localProtocol.messages.len break findMatchingProtocol - if result in gDispatchers: - return gDispatchers[result] - else: - template copyTo(src, dest; index: int) = - for i in 0 ..< src.len: - dest[index + i] = addr src[i] + template copyTo(src, dest; index: int) = + for i in 0 ..< src.len: + dest[index + i] = addr src[i] - result.messages = newSeq[ptr MessageInfo](nextUserMsgId) - devp2pInfo.messages.copyTo(result.messages, 0) + result.messages = newSeq[ptr MessageInfo](nextUserMsgId) + devp2pInfo.messages.copyTo(result.messages, 0) - for localProtocol in node.protocols: - let idx = localProtocol.index - if result.protocolOffsets[idx] != -1: - result.activeProtocols.add localProtocol - localProtocol.messages.copyTo(result.messages, - result.protocolOffsets[idx]) - - gDispatchers.incl result + for localProtocol in node.protocols: + let idx = localProtocol.index + if result.protocolOffsets[idx] != -1: + result.activeProtocols.add localProtocol + localProtocol.messages.copyTo(result.messages, + result.protocolOffsets[idx]) proc getMsgName*(peer: Peer, msgId: int): string = if not peer.dispatcher.isNil and @@ -147,7 +141,7 @@ proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, ptr MessageInfo) = proc newProtocol(name: string, version: int, peerInit: PeerStateInitializer, networkInit: NetworkStateInitializer): ProtocolInfo = - new result + result = create ProtocolInfoObj result.name = name result.version = version result.messages = @[] @@ -211,7 +205,7 @@ proc requestResolver[MsgType](msg: pointer, future: FutureBase) = exc = getCurrentException().name, err = getCurrentExceptionMsg() -proc registerMsg(protocol: var ProtocolInfo, +proc registerMsg(protocol: ProtocolInfo, id: int, name: string, thunk: MessageHandler, printer: MessageContentPrinter, @@ -285,7 +279,7 @@ template compressMsg(peer: Peer, data: Bytes): Bytes = else: data -proc sendMsg*(peer: Peer, data: Bytes) {.async.} = +proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState) try: @@ -379,11 +373,12 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) = template req: auto = outstandingReqs()[idx] if req.future.finished: - assert req.timeoutAt < fastEpochTime() + assert req.timeoutAt <= fastEpochTime() # Here we'll remove the expired request by swapping # it with the last one in the deque (if necessary): if idx != outstandingReqs.len - 1: req = outstandingReqs.popLast + continue else: outstandingReqs.shrink(fromLast = 1) # This was the last item, so we don't have any @@ -573,7 +568,7 @@ template networkState*(connection: Peer, Protocol: type): untyped = ## particular connection. protocolState(connection.network, Protocol) -proc initProtocolState*[T](state: T, x: Peer|EthereumNode) = discard +proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard proc createPeerState[ProtocolState](peer: Peer): RootRef = var res = new ProtocolState @@ -581,7 +576,7 @@ proc createPeerState[ProtocolState](peer: Peer): RootRef = initProtocolState(res, peer) return cast[RootRef](res) -proc createNetworkState[NetworkState](network: EthereumNode): RootRef = +proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} = var res = new NetworkState mixin initProtocolState initProtocolState(res, network) @@ -688,6 +683,7 @@ macro p2pProtocolImpl(name: static[string], of rlpxResponse: userHandlerProc.applyDecorator incomingResponseDecorator else: discard + userHandlerProc.addPragma ident"gcsafe" userHandlerProc.addPragma ident"async" # We allow the user handler to use `openarray` params, but we turn @@ -882,7 +878,7 @@ macro p2pProtocolImpl(name: static[string], let thunkName = ident(msgName & "_thunk") var thunkProc = quote do: - proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) = + proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) {.gcsafe.} = var `receivedRlp` = data var `receivedMsg` {.noinit.}: `msgRecord` `readParamsPrelude` @@ -917,6 +913,7 @@ macro p2pProtocolImpl(name: static[string], var msgSendProc = n # TODO: check that the first param has the correct type msgSendProc.params[1][0] = msgRecipient + msgSendProc.addPragma ident"gcsafe" # Add a timeout parameter for all request procs case msgKind @@ -1089,7 +1086,6 @@ macro p2pProtocolImpl(name: static[string], setEventHandlers(`protocol`, `handshake`, `disconnectHandler`) result.add newCall(bindSym("registerProtocol"), protocol) - when isMainModule: echo repr(result) when defined(debugRlpxProtocol) or defined(debugMacros): echo repr(result) diff --git a/eth_p2p/rlpx_protocols/whisper_protocol.nim b/eth_p2p/rlpx_protocols/whisper_protocol.nim index a114593..46ae403 100644 --- a/eth_p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth_p2p/rlpx_protocols/whisper_protocol.nim @@ -100,7 +100,7 @@ type ## XXX: really big messages can cause excessive mem usage when using msg \ ## count - FilterMsgHandler* = proc(msg: ReceivedMessage) {.closure.} + FilterMsgHandler* = proc(msg: ReceivedMessage) {.gcsafe, closure.} Filter* = object src: Option[PublicKey] @@ -581,7 +581,7 @@ proc subscribeFilter*(filters: var Filters, filter: Filter, debug "Filter added", filter = id return id -proc notify*(filters: var Filters, msg: Message) = +proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = var decoded: Option[DecodedPayload] var keyHash: Hash @@ -667,10 +667,10 @@ type filters*: Filters config*: WhisperConfig -proc run(peer: Peer) {.async.} -proc run(node: EthereumNode, network: WhisperNetwork) {.async.} +proc run(peer: Peer) {.gcsafe, async.} +proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.} -proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) = +proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} = network.queue = initQueue(defaultQueueCapacity) network.filters = initTable[string, Filter]() network.config.bloom = fullBloom()