util metrics
This commit is contained in:
parent
2392434b6d
commit
339bbdaab9
|
@ -0,0 +1,71 @@
|
||||||
|
// Package loggables includes a bunch of transaltor functions for commonplace/stdlib
|
||||||
|
// objects. This is boilerplate code that shouldn't change much, and not sprinkled
|
||||||
|
// all over the place (i.e. gather it here).
|
||||||
|
//
|
||||||
|
// Note: it may make sense to put all stdlib Loggable functions in the eventlog
|
||||||
|
// package. Putting it here for now in case we don't want to polute it.
|
||||||
|
package loggables
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
|
||||||
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
|
||||||
|
|
||||||
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NetConn returns an eventlog.Metadata with the conn addresses
|
||||||
|
func NetConn(c net.Conn) logging.Loggable {
|
||||||
|
return logging.Metadata{
|
||||||
|
"localAddr": c.LocalAddr(),
|
||||||
|
"remoteAddr": c.RemoteAddr(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns an eventlog.Metadata with an error
|
||||||
|
func Error(e error) logging.Loggable {
|
||||||
|
return logging.Metadata{
|
||||||
|
"error": e.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial metadata is metadata for dial events
|
||||||
|
func Dial(sys string, lid, rid peer.ID, laddr, raddr ma.Multiaddr) DeferredMap {
|
||||||
|
m := DeferredMap{}
|
||||||
|
m["subsystem"] = sys
|
||||||
|
if lid != "" {
|
||||||
|
m["localPeer"] = func() interface{} { return lid.Pretty() }
|
||||||
|
}
|
||||||
|
if laddr != nil {
|
||||||
|
m["localAddr"] = func() interface{} { return laddr.String() }
|
||||||
|
}
|
||||||
|
if rid != "" {
|
||||||
|
m["remotePeer"] = func() interface{} { return rid.Pretty() }
|
||||||
|
}
|
||||||
|
if raddr != nil {
|
||||||
|
m["remoteAddr"] = func() interface{} { return raddr.String() }
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeferredMap is a Loggable which may contained deffered values.
|
||||||
|
type DeferredMap map[string]interface{}
|
||||||
|
|
||||||
|
// Loggable describes objects that can be marshalled into Metadata for logging
|
||||||
|
func (m DeferredMap) Loggable() map[string]interface{} {
|
||||||
|
m2 := map[string]interface{}{}
|
||||||
|
for k, v := range m {
|
||||||
|
|
||||||
|
if vf, ok := v.(func() interface{}); ok {
|
||||||
|
// if it's a DeferredVal, call it.
|
||||||
|
m2[k] = vf()
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// else use the value as is.
|
||||||
|
m2[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m2
|
||||||
|
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
gm "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/go-metrics"
|
||||||
|
|
||||||
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Stats struct {
|
||||||
|
TotalIn int64
|
||||||
|
TotalOut int64
|
||||||
|
RateIn float64
|
||||||
|
RateOut float64
|
||||||
|
}
|
||||||
|
|
||||||
|
type BandwidthCounter struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
totalIn gm.Meter
|
||||||
|
totalOut gm.Meter
|
||||||
|
reg gm.Registry
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBandwidthCounter() *BandwidthCounter {
|
||||||
|
reg := gm.NewRegistry()
|
||||||
|
return &BandwidthCounter{
|
||||||
|
totalIn: gm.GetOrRegisterMeter("totalIn", reg),
|
||||||
|
totalOut: gm.GetOrRegisterMeter("totalOut", reg),
|
||||||
|
reg: reg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) LogSentMessage(size int64) {
|
||||||
|
bwc.totalOut.Mark(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) LogRecvMessage(size int64) {
|
||||||
|
bwc.totalIn.Mark(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) LogSentMessageStream(size int64, proto protocol.ID, p peer.ID) {
|
||||||
|
meter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg)
|
||||||
|
meter.Mark(size)
|
||||||
|
|
||||||
|
pmeter := gm.GetOrRegisterMeter("/proto/out/"+string(proto), bwc.reg)
|
||||||
|
pmeter.Mark(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) LogRecvMessageStream(size int64, proto protocol.ID, p peer.ID) {
|
||||||
|
meter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg)
|
||||||
|
meter.Mark(size)
|
||||||
|
|
||||||
|
pmeter := gm.GetOrRegisterMeter("/proto/in/"+string(proto), bwc.reg)
|
||||||
|
pmeter.Mark(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) GetBandwidthForPeer(p peer.ID) (out Stats) {
|
||||||
|
inMeter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg).Snapshot()
|
||||||
|
outMeter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg).Snapshot()
|
||||||
|
|
||||||
|
return Stats{
|
||||||
|
TotalIn: inMeter.Count(),
|
||||||
|
TotalOut: outMeter.Count(),
|
||||||
|
RateIn: inMeter.RateFine(),
|
||||||
|
RateOut: outMeter.RateFine(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) GetBandwidthForProtocol(proto protocol.ID) (out Stats) {
|
||||||
|
inMeter := gm.GetOrRegisterMeter(string("/proto/in/"+proto), bwc.reg).Snapshot()
|
||||||
|
outMeter := gm.GetOrRegisterMeter(string("/proto/out/"+proto), bwc.reg).Snapshot()
|
||||||
|
|
||||||
|
return Stats{
|
||||||
|
TotalIn: inMeter.Count(),
|
||||||
|
TotalOut: outMeter.Count(),
|
||||||
|
RateIn: inMeter.RateFine(),
|
||||||
|
RateOut: outMeter.RateFine(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bwc *BandwidthCounter) GetBandwidthTotals() (out Stats) {
|
||||||
|
return Stats{
|
||||||
|
TotalIn: bwc.totalIn.Count(),
|
||||||
|
TotalOut: bwc.totalOut.Count(),
|
||||||
|
RateIn: bwc.totalIn.RateFine(),
|
||||||
|
RateOut: bwc.totalOut.RateFine(),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package meterconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
|
metrics "github.com/ipfs/go-libp2p/util/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MeteredConn struct {
|
||||||
|
mesRecv metrics.MeterCallback
|
||||||
|
mesSent metrics.MeterCallback
|
||||||
|
|
||||||
|
manet.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn {
|
||||||
|
return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn {
|
||||||
|
return &MeteredConn{
|
||||||
|
Conn: base,
|
||||||
|
mesRecv: rcb,
|
||||||
|
mesSent: scb,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MeteredConn) Read(b []byte) (int, error) {
|
||||||
|
n, err := mc.Conn.Read(b)
|
||||||
|
|
||||||
|
mc.mesRecv(int64(n))
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MeteredConn) Write(b []byte) (int, error) {
|
||||||
|
n, err := mc.Conn.Write(b)
|
||||||
|
|
||||||
|
mc.mesSent(int64(n))
|
||||||
|
return n, err
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StreamMeterCallback func(int64, protocol.ID, peer.ID)
|
||||||
|
type MeterCallback func(int64)
|
||||||
|
|
||||||
|
type Reporter interface {
|
||||||
|
LogSentMessage(int64)
|
||||||
|
LogRecvMessage(int64)
|
||||||
|
LogSentMessageStream(int64, protocol.ID, peer.ID)
|
||||||
|
LogRecvMessageStream(int64, protocol.ID, peer.ID)
|
||||||
|
GetBandwidthForPeer(peer.ID) Stats
|
||||||
|
GetBandwidthForProtocol(protocol.ID) Stats
|
||||||
|
GetBandwidthTotals() Stats
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
package meterstream
|
||||||
|
|
||||||
|
import (
|
||||||
|
inet "github.com/ipfs/go-libp2p/p2p/net"
|
||||||
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
|
metrics "github.com/ipfs/go-libp2p/util/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
type meteredStream struct {
|
||||||
|
// keys for accessing metrics data
|
||||||
|
protoKey protocol.ID
|
||||||
|
peerKey peer.ID
|
||||||
|
|
||||||
|
inet.Stream
|
||||||
|
|
||||||
|
// callbacks for reporting bandwidth usage
|
||||||
|
mesSent metrics.StreamMeterCallback
|
||||||
|
mesRecv metrics.StreamMeterCallback
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMeteredStream(base inet.Stream, pid protocol.ID, p peer.ID, recvCB, sentCB metrics.StreamMeterCallback) inet.Stream {
|
||||||
|
return &meteredStream{
|
||||||
|
Stream: base,
|
||||||
|
mesSent: sentCB,
|
||||||
|
mesRecv: recvCB,
|
||||||
|
protoKey: pid,
|
||||||
|
peerKey: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WrapStream(base inet.Stream, pid protocol.ID, bwc metrics.Reporter) inet.Stream {
|
||||||
|
return newMeteredStream(base, pid, base.Conn().RemotePeer(), bwc.LogRecvMessageStream, bwc.LogSentMessageStream)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *meteredStream) Read(b []byte) (int, error) {
|
||||||
|
n, err := s.Stream.Read(b)
|
||||||
|
|
||||||
|
// Log bytes read
|
||||||
|
s.mesRecv(int64(n), s.protoKey, s.peerKey)
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *meteredStream) Write(b []byte) (int, error) {
|
||||||
|
n, err := s.Stream.Write(b)
|
||||||
|
|
||||||
|
// Log bytes written
|
||||||
|
s.mesSent(int64(n), s.protoKey, s.peerKey)
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
package meterstream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
u "github.com/ipfs/go-ipfs/util"
|
||||||
|
inet "github.com/ipfs/go-libp2p/p2p/net"
|
||||||
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FakeStream struct {
|
||||||
|
ReadBuf io.Reader
|
||||||
|
inet.Stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FakeStream) Read(b []byte) (int, error) {
|
||||||
|
return fs.ReadBuf.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FakeStream) Write(b []byte) (int, error) {
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCallbacksWork(t *testing.T) {
|
||||||
|
fake := new(FakeStream)
|
||||||
|
|
||||||
|
var sent int64
|
||||||
|
var recv int64
|
||||||
|
|
||||||
|
sentCB := func(n int64, proto protocol.ID, p peer.ID) {
|
||||||
|
sent += n
|
||||||
|
}
|
||||||
|
|
||||||
|
recvCB := func(n int64, proto protocol.ID, p peer.ID) {
|
||||||
|
recv += n
|
||||||
|
}
|
||||||
|
|
||||||
|
ms := newMeteredStream(fake, protocol.ID("TEST"), peer.ID("PEER"), recvCB, sentCB)
|
||||||
|
|
||||||
|
toWrite := int64(100000)
|
||||||
|
toRead := int64(100000)
|
||||||
|
|
||||||
|
fake.ReadBuf = io.LimitReader(u.NewTimeSeededRand(), toRead)
|
||||||
|
writeData := io.LimitReader(u.NewTimeSeededRand(), toWrite)
|
||||||
|
|
||||||
|
n, err := io.Copy(ms, writeData)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != toWrite {
|
||||||
|
t.Fatal("incorrect write amount")
|
||||||
|
}
|
||||||
|
|
||||||
|
if toWrite != sent {
|
||||||
|
t.Fatal("incorrectly reported writes", toWrite, sent)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = io.Copy(ioutil.Discard, ms)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != toRead {
|
||||||
|
t.Fatal("incorrect read amount")
|
||||||
|
}
|
||||||
|
|
||||||
|
if toRead != recv {
|
||||||
|
t.Fatal("incorrectly reported reads")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue