make an easy macro to wrap procs, add more keys

Giovanni Petrantoni 2020-08-11 02:06:48 +09:00
parent 4ead4bd219
commit 537e3bc27d
5 changed files with 59 additions and 34 deletions
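For context, the `profiled` pragma macro added in the first file below is meant to be attached to procs, replacing calls to the old `profile` template inside their bodies. A minimal usage sketch (the `doWork` proc is hypothetical and used only for illustration):

# Hypothetical usage: with -d:profiler_optick defined, `profiled` wraps the
# proc body in an Optick pushEvent/popEvent pair; otherwise it is a no-op
# pass-through (see the `else:` branch in the first file).
proc doWork(x: int): int {.profiled.} =
  result = x * 2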

View File

@ -96,6 +96,34 @@ when defined(profiler_optick):
      defer:
        popEvent(ev)

  proc getName(node: NimNode): string {.compileTime.} =
    case node.kind
    of nnkSym:
      return $node
    of nnkPostfix:
      return node[1].strVal
    of nnkIdent:
      return node.strVal
    of nnkEmpty:
      return "anonymous"
    else:
      error("Unknown name.")

  macro profiled*(p: untyped): untyped =
    let name = p.name.getName()
    var code = newStmtList()
    var keySym = genSym(nskLet)
    let prefix = quote do:
      {.gcsafe.}:
        const pos = instantiationInfo()
        let event_desc {.global.} = createEvent(`name`.cstring, `name`.len.uint16, pos.filename.cstring, pos.filename.len.uint16, pos.line.uint32)
        let `keySym` = pushEvent(event_desc)
        defer:
          popEvent(`keySym`)
    # inject our code
    p[6].insert(0, prefix)
    p

  proc load() =
    var candidates: seq[string]
    libCandidates("OptickCore", candidates)
@ -136,4 +164,5 @@ when defined(profiler_optick):
  asyncCheck frameTicker()
  # asyncCheck pollHook()
else:
  template profile*(name: string): untyped = discard
  macro profiled*(p: untyped): untyped =
    p
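Roughly, under -d:profiler_optick the macro rewrites an annotated proc along the following lines. This is only an approximate expansion sketch: `doWork` is a hypothetical proc, the real event key is a gensym'd symbol, and the exact layout follows the quote do: block above.

# Approximate expansion of `proc doWork(x: int): int {.profiled.}` (sketch only)
proc doWork(x: int): int =
  {.gcsafe.}:
    const pos = instantiationInfo()
    let event_desc {.global.} = createEvent("doWork".cstring, "doWork".len.uint16,
                                            pos.filename.cstring, pos.filename.len.uint16,
                                            pos.line.uint32)
    let key = pushEvent(event_desc)
    defer:
      popEvent(key)
  # original body follows the injected prefix
  result = x * 2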

View File

@ -11,7 +11,8 @@ import strutils
import chronos, chronicles, stew/byteutils
import stream/connection,
       vbuffer,
       protocols/protocol
       protocols/protocol,
       errors

logScope:
  topics = "multistream"
@ -49,7 +50,7 @@ template validateSuffix(str: string): untyped =
proc select*(m: MultistreamSelect,
             conn: Connection,
             proto: seq[string]):
             Future[string] {.async.} =
             Future[string] {.async, profiled.} =
  trace "initiating handshake", codec = m.codec
  ## select a remote protocol
  await conn.write(m.codec) # write handshake

View File

@ -82,7 +82,7 @@ method init*(g: GossipSub) =
  g.handler = handler
  g.codec = GossipSubCodec

proc replenishFanout(g: GossipSub, topic: string) =
proc replenishFanout(g: GossipSub, topic: string) {.profiled.} =
  ## get fanout peers for a topic
  trace "about to replenish fanout"
@ -100,9 +100,7 @@ proc replenishFanout(g: GossipSub, topic: string) =
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
profile "rebalanceMesh"
proc rebalanceMesh(g: GossipSub, topic: string) {.async, profiled.} =
logScope:
topic
@ -164,7 +162,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
proc dropFanoutPeers(g: GossipSub) =
proc dropFanoutPeers(g: GossipSub) {.profiled.} =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
let now = Moment.now()
@ -179,7 +177,7 @@ proc dropFanoutPeers(g: GossipSub) =
    libp2p_gossipsub_peers_per_topic_fanout
      .set(g.fanout.peers(topic).int64, labelValues = [topic])

proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe, profiled.} =
  ## gossip iHave messages to peers
  ##
@ -247,7 +245,7 @@ proc heartbeat(g: GossipSub) {.async.} =
    await sleepAsync(GossipSubHeartbeatInterval)

method unsubscribePeer*(g: GossipSub, peer: PeerID) =
method unsubscribePeer*(g: GossipSub, peer: PeerID) {.profiled.} =
  ## handle peer disconnects
  ##
@ -282,7 +280,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
method subscribeTopic*(g: GossipSub,
                       topic: string,
                       subscribe: bool,
                       peerId: PeerID) {.gcsafe, async.} =
                       peerId: PeerID) {.gcsafe, async, profiled.} =
  await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)

  logScope:
@ -323,7 +321,7 @@ method subscribeTopic*(g: GossipSub,
proc handleGraft(g: GossipSub,
                 peer: PubSubPeer,
                 grafts: seq[ControlGraft]): seq[ControlPrune] =
                 grafts: seq[ControlGraft]): seq[ControlPrune] {.profiled.} =
  for graft in grafts:
    let topic = graft.topicID
    logScope:
@ -356,7 +354,7 @@ proc handleGraft(g: GossipSub,
    libp2p_gossipsub_peers_per_topic_fanout
      .set(g.fanout.peers(topic).int64, labelValues = [topic])

proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.profiled.} =
  for prune in prunes:
    trace "peer pruned topic", peer = peer.id, topic = prune.topicID
@ -367,7 +365,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handleIHave(g: GossipSub,
                 peer: PubSubPeer,
                 ihaves: seq[ControlIHave]): ControlIWant =
                 ihaves: seq[ControlIHave]): ControlIWant {.profiled.} =
  for ihave in ihaves:
    trace "peer sent ihave",
      peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
@ -379,7 +377,7 @@ proc handleIHave(g: GossipSub,
proc handleIWant(g: GossipSub,
                 peer: PubSubPeer,
                 iwants: seq[ControlIWant]): seq[Message] =
                 iwants: seq[ControlIWant]): seq[Message] {.profiled.} =
  for iwant in iwants:
    for mid in iwant.messageIDs:
      trace "peer sent iwant", peer = peer.id, messageID = mid
@ -389,9 +387,7 @@ proc handleIWant(g: GossipSub,
method rpcHandler*(g: GossipSub,
                   peer: PubSubPeer,
                   rpcMsgs: seq[RPCMsg]) {.async.} =
  profile "rpcHandler"
                   rpcMsgs: seq[RPCMsg]) {.async, profiled.} =
  await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
  for m in rpcMsgs: # for all RPC messages
@ -467,12 +463,12 @@ method rpcHandler*(g: GossipSub,
method subscribe*(g: GossipSub,
                  topic: string,
                  handler: TopicHandler) {.async.} =
                  handler: TopicHandler) {.async, profiled.} =
  await procCall PubSub(g).subscribe(topic, handler)
  await g.rebalanceMesh(topic)

method unsubscribe*(g: GossipSub,
                    topics: seq[TopicPair]) {.async.} =
                    topics: seq[TopicPair]) {.async, profiled.} =
  await procCall PubSub(g).unsubscribe(topics)
  for (topic, handler) in topics:
@ -485,7 +481,7 @@ method unsubscribe*(g: GossipSub,
    let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
    discard g.broadcast(peers, prune, DefaultSendTimeout)

method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
method unsubscribeAll*(g: GossipSub, topic: string) {.async, profiled.} =
  await procCall PubSub(g).unsubscribeAll(topic)
  if topic in g.mesh:
@ -498,9 +494,7 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
method publish*(g: GossipSub,
                topic: string,
                data: seq[byte],
                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
  profile "publish"
                timeout: Duration = InfiniteDuration): Future[int] {.async, profiled.} =
  # base returns always 0
  discard await procCall PubSub(g).publish(topic, data, timeout)
  trace "publishing message on topic", topic, data = data.shortLog

View File

@ -110,7 +110,7 @@ proc broadcast*(p: PubSub,
proc sendSubs*(p: PubSub,
               peer: PubSubPeer,
               topics: seq[string],
               subscribe: bool) {.async.} =
               subscribe: bool) {.async, profiled.} =
  ## send subscriptions to remote peer
  discard await p.broadcast([peer], RPCMsg(subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), DefaultSendTimeout)
@ -137,7 +137,7 @@ method rpcHandler*(p: PubSub,
proc getOrCreatePeer*(
  p: PubSub,
  peer: PeerID,
  proto: string): PubSubPeer =
  proto: string): PubSubPeer {.profiled.} =
  if peer in p.peers:
    return p.peers[peer]
@ -289,7 +289,7 @@ method stop*(p: PubSub) {.async, base.} =
method addValidator*(p: PubSub,
                     topic: varargs[string],
                     hook: ValidatorHandler) {.base.} =
                     hook: ValidatorHandler) {.base, profiled.} =
  for t in topic:
    if t notin p.validators:
      p.validators[t] = initHashSet[ValidatorHandler]()
@ -299,12 +299,12 @@ method addValidator*(p: PubSub,
method removeValidator*(p: PubSub,
                        topic: varargs[string],
                        hook: ValidatorHandler) {.base.} =
                        hook: ValidatorHandler) {.base, profiled.} =
  for t in topic:
    if t in p.validators:
      p.validators[t].excl(hook)

method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
method validate*(p: PubSub, message: Message): Future[bool] {.async, base, profiled.} =
  var pending: seq[Future[bool]]
  trace "about to validate message"
  for topic in message.topicIDs:

View File

@ -12,7 +12,8 @@ import chronicles, chronos, metrics
import ../varint,
       ../vbuffer,
       ../peerinfo,
       ../multiaddress
       ../multiaddress,
       ../errors

declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
@ -98,7 +99,7 @@ method readOnce*(s: LPStream,
proc readExactly*(s: LPStream,
                  pbytes: pointer,
                  nbytes: int):
                  Future[void] {.async.} =
                  Future[void] {.async, profiled.} =
  if s.atEof:
    raise newLPStreamEOFError()
@ -146,7 +147,7 @@ proc readLine*(s: LPStream,
    if len(result) == lim:
      break

proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe, profiled.} =
  var
    varint: uint64
    length: int
@ -162,7 +163,7 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
  if true: # can't end with a raise apparently
    raise (ref InvalidVarintError)(msg: "Cannot parse varint")

proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, profiled.} =
  ## read length prefixed msg, with the length encoded as a varint
  let
    length = await s.readVarint()
@ -191,7 +192,7 @@ method write*(s: LPStream, msg: seq[byte]) {.base, async.} =
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
  s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))

proc write*(s: LPStream, msg: string): Future[void] =
proc write*(s: LPStream, msg: string): Future[void] {.profiled.} =
  s.write(@(toOpenArrayByte(msg, 0, msg.high)))

# TODO: split `close` into `close` and `dispose/destroy`