From 537e3bc27d97cdf1bf63a913b1572ea8c36519cf Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Tue, 11 Aug 2020 02:06:48 +0900 Subject: [PATCH] make a easy macro to wrap procs, add more keys --- libp2p/errors.nim | 31 ++++++++++++++++++++++- libp2p/multistream.nim | 5 ++-- libp2p/protocols/pubsub/gossipsub.nim | 36 +++++++++++---------------- libp2p/protocols/pubsub/pubsub.nim | 10 ++++---- libp2p/stream/lpstream.nim | 11 ++++---- 5 files changed, 59 insertions(+), 34 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 080a86b5f..158177579 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -96,6 +96,34 @@ when defined(profiler_optick): defer: popEvent(ev) + proc getName(node: NimNode): string {.compileTime.} = + case node.kind + of nnkSym: + return $node + of nnkPostfix: + return node[1].strVal + of nnkIdent: + return node.strVal + of nnkEmpty: + return "anonymous" + else: + error("Unknown name.") + + macro profiled*(p: untyped): untyped = + let name = p.name.getName() + var code = newStmtList() + var keySym = genSym(nskLet) + let prefix = quote do: + {.gcsafe.}: + const pos = instantiationInfo() + let event_desc {.global.} = createEvent(`name`.cstring, `name`.len.uint16, pos.filename.cstring, pos.filename.len.uint16, pos.line.uint32) + let `keySym` = pushEvent(event_desc) + defer: + popEvent(`keySym`) + # inject our code + p[6].insert(0, prefix) + p + proc load() = var candidates: seq[string] libCandidates("OptickCore", candidates) @@ -136,4 +164,5 @@ when defined(profiler_optick): asyncCheck frameTicker() # asyncCheck pollHook() else: - template profile*(name: string): untyped = discard \ No newline at end of file + macro profiled*(p: untyped): untyped = + p \ No newline at end of file diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index c74a81fd8..d6f1ed876 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -11,7 +11,8 @@ import strutils import chronos, chronicles, stew/byteutils import stream/connection, vbuffer, - protocols/protocol + protocols/protocol, + errors logScope: topics = "multistream" @@ -49,7 +50,7 @@ template validateSuffix(str: string): untyped = proc select*(m: MultistreamSelect, conn: Connection, proto: seq[string]): - Future[string] {.async.} = + Future[string] {.async, profiled.} = trace "initiating handshake", codec = m.codec ## select a remote protocol await conn.write(m.codec) # write handshake diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index db3f0bc84..864ee95ac 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -82,7 +82,7 @@ method init*(g: GossipSub) = g.handler = handler g.codec = GossipSubCodec -proc replenishFanout(g: GossipSub, topic: string) = +proc replenishFanout(g: GossipSub, topic: string) {.profiled.} = ## get fanout peers for a topic trace "about to replenish fanout" @@ -100,9 +100,7 @@ proc replenishFanout(g: GossipSub, topic: string) = trace "fanout replenished with peers", peers = g.fanout.peers(topic) -proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = - profile "rebalanceMesh" - +proc rebalanceMesh(g: GossipSub, topic: string) {.async, profiled.} = logScope: topic @@ -164,7 +162,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "mesh balanced, got peers", peers = g.mesh.peers(topic) -proc dropFanoutPeers(g: GossipSub) = +proc dropFanoutPeers(g: GossipSub) {.profiled.} = # drop peers that we haven't published to in # GossipSubFanoutTTL seconds let now = Moment.now() @@ -179,7 +177,7 @@ proc dropFanoutPeers(g: GossipSub) = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(topic).int64, labelValues = [topic]) -proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} = +proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe, profiled.} = ## gossip iHave messages to peers ## @@ -247,7 +245,7 @@ proc heartbeat(g: GossipSub) {.async.} = await sleepAsync(GossipSubHeartbeatInterval) -method unsubscribePeer*(g: GossipSub, peer: PeerID) = +method unsubscribePeer*(g: GossipSub, peer: PeerID) {.profiled.} = ## handle peer disconnects ## @@ -282,7 +280,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = + peerId: PeerID) {.gcsafe, async, profiled.} = await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) logScope: @@ -323,7 +321,7 @@ method subscribeTopic*(g: GossipSub, proc handleGraft(g: GossipSub, peer: PubSubPeer, - grafts: seq[ControlGraft]): seq[ControlPrune] = + grafts: seq[ControlGraft]): seq[ControlPrune] {.profiled.} = for graft in grafts: let topic = graft.topicID logScope: @@ -356,7 +354,7 @@ proc handleGraft(g: GossipSub, libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(topic).int64, labelValues = [topic]) -proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = +proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.profiled.} = for prune in prunes: trace "peer pruned topic", peer = peer.id, topic = prune.topicID @@ -367,7 +365,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handleIHave(g: GossipSub, peer: PubSubPeer, - ihaves: seq[ControlIHave]): ControlIWant = + ihaves: seq[ControlIHave]): ControlIWant {.profiled.} = for ihave in ihaves: trace "peer sent ihave", peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs @@ -379,7 +377,7 @@ proc handleIHave(g: GossipSub, proc handleIWant(g: GossipSub, peer: PubSubPeer, - iwants: seq[ControlIWant]): seq[Message] = + iwants: seq[ControlIWant]): seq[Message] {.profiled.} = for iwant in iwants: for mid in iwant.messageIDs: trace "peer sent iwant", peer = peer.id, messageID = mid @@ -389,9 +387,7 @@ proc handleIWant(g: GossipSub, method rpcHandler*(g: GossipSub, peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async.} = - profile "rpcHandler" - + rpcMsgs: seq[RPCMsg]) {.async, profiled.} = await procCall PubSub(g).rpcHandler(peer, rpcMsgs) for m in rpcMsgs: # for all RPC messages @@ -467,12 +463,12 @@ method rpcHandler*(g: GossipSub, method subscribe*(g: GossipSub, topic: string, - handler: TopicHandler) {.async.} = + handler: TopicHandler) {.async, profiled.} = await procCall PubSub(g).subscribe(topic, handler) await g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, - topics: seq[TopicPair]) {.async.} = + topics: seq[TopicPair]) {.async, profiled.} = await procCall PubSub(g).unsubscribe(topics) for (topic, handler) in topics: @@ -485,7 +481,7 @@ method unsubscribe*(g: GossipSub, let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) discard g.broadcast(peers, prune, DefaultSendTimeout) -method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = +method unsubscribeAll*(g: GossipSub, topic: string) {.async, profiled.} = await procCall PubSub(g).unsubscribeAll(topic) if topic in g.mesh: @@ -498,9 +494,7 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = method publish*(g: GossipSub, topic: string, data: seq[byte], - timeout: Duration = InfiniteDuration): Future[int] {.async.} = - profile "publish" - + timeout: Duration = InfiniteDuration): Future[int] {.async, profiled.} = # base returns always 0 discard await procCall PubSub(g).publish(topic, data, timeout) trace "publishing message on topic", topic, data = data.shortLog diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 475559537..e934c62d7 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -110,7 +110,7 @@ proc broadcast*(p: PubSub, proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], - subscribe: bool) {.async.} = + subscribe: bool) {.async, profiled.} = ## send subscriptions to remote peer discard await p.broadcast([peer], RPCMsg(subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), DefaultSendTimeout) @@ -137,7 +137,7 @@ method rpcHandler*(p: PubSub, proc getOrCreatePeer*( p: PubSub, peer: PeerID, - proto: string): PubSubPeer = + proto: string): PubSubPeer {.profiled.} = if peer in p.peers: return p.peers[peer] @@ -289,7 +289,7 @@ method stop*(p: PubSub) {.async, base.} = method addValidator*(p: PubSub, topic: varargs[string], - hook: ValidatorHandler) {.base.} = + hook: ValidatorHandler) {.base, profiled.} = for t in topic: if t notin p.validators: p.validators[t] = initHashSet[ValidatorHandler]() @@ -299,12 +299,12 @@ method addValidator*(p: PubSub, method removeValidator*(p: PubSub, topic: varargs[string], - hook: ValidatorHandler) {.base.} = + hook: ValidatorHandler) {.base, profiled.} = for t in topic: if t in p.validators: p.validators[t].excl(hook) -method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = +method validate*(p: PubSub, message: Message): Future[bool] {.async, base, profiled.} = var pending: seq[Future[bool]] trace "about to validate message" for topic in message.topicIDs: diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 1c4269160..bd929ca07 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -12,7 +12,8 @@ import chronicles, chronos, metrics import ../varint, ../vbuffer, ../peerinfo, - ../multiaddress + ../multiaddress, + ../errors declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"]) @@ -98,7 +99,7 @@ method readOnce*(s: LPStream, proc readExactly*(s: LPStream, pbytes: pointer, nbytes: int): - Future[void] {.async.} = + Future[void] {.async, profiled.} = if s.atEof: raise newLPStreamEOFError() @@ -146,7 +147,7 @@ proc readLine*(s: LPStream, if len(result) == lim: break -proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = +proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe, profiled.} = var varint: uint64 length: int @@ -162,7 +163,7 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = if true: # can't end with a raise apparently raise (ref InvalidVarintError)(msg: "Cannot parse varint") -proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} = +proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, profiled.} = ## read length prefixed msg, with the length encoded as a varint let length = await s.readVarint() @@ -191,7 +192,7 @@ method write*(s: LPStream, msg: seq[byte]) {.base, async.} = proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} = s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1))) -proc write*(s: LPStream, msg: string): Future[void] = +proc write*(s: LPStream, msg: string): Future[void] {.profiled.} = s.write(@(toOpenArrayByte(msg, 0, msg.high))) # TODO: split `close` into `close` and `dispose/destroy`