diff --git a/util/eventlog/loggables/loggables.go b/util/eventlog/loggables/loggables.go new file mode 100644 index 00000000..40d49ea3 --- /dev/null +++ b/util/eventlog/loggables/loggables.go @@ -0,0 +1,71 @@ +// Package loggables includes a bunch of transaltor functions for commonplace/stdlib +// objects. This is boilerplate code that shouldn't change much, and not sprinkled +// all over the place (i.e. gather it here). +// +// Note: it may make sense to put all stdlib Loggable functions in the eventlog +// package. Putting it here for now in case we don't want to polute it. +package loggables + +import ( + "net" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" + + peer "github.com/ipfs/go-libp2p/p2p/peer" +) + +// NetConn returns an eventlog.Metadata with the conn addresses +func NetConn(c net.Conn) logging.Loggable { + return logging.Metadata{ + "localAddr": c.LocalAddr(), + "remoteAddr": c.RemoteAddr(), + } +} + +// Error returns an eventlog.Metadata with an error +func Error(e error) logging.Loggable { + return logging.Metadata{ + "error": e.Error(), + } +} + +// Dial metadata is metadata for dial events +func Dial(sys string, lid, rid peer.ID, laddr, raddr ma.Multiaddr) DeferredMap { + m := DeferredMap{} + m["subsystem"] = sys + if lid != "" { + m["localPeer"] = func() interface{} { return lid.Pretty() } + } + if laddr != nil { + m["localAddr"] = func() interface{} { return laddr.String() } + } + if rid != "" { + m["remotePeer"] = func() interface{} { return rid.Pretty() } + } + if raddr != nil { + m["remoteAddr"] = func() interface{} { return raddr.String() } + } + return m +} + +// DeferredMap is a Loggable which may contained deffered values. +type DeferredMap map[string]interface{} + +// Loggable describes objects that can be marshalled into Metadata for logging +func (m DeferredMap) Loggable() map[string]interface{} { + m2 := map[string]interface{}{} + for k, v := range m { + + if vf, ok := v.(func() interface{}); ok { + // if it's a DeferredVal, call it. + m2[k] = vf() + + } else { + // else use the value as is. + m2[k] = v + } + } + return m2 +} diff --git a/util/metrics/bw_stats.go b/util/metrics/bw_stats.go new file mode 100644 index 00000000..c624fe6a --- /dev/null +++ b/util/metrics/bw_stats.go @@ -0,0 +1,90 @@ +package metrics + +import ( + "sync" + + gm "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/go-metrics" + + peer "github.com/ipfs/go-libp2p/p2p/peer" + protocol "github.com/ipfs/go-libp2p/p2p/protocol" +) + +type Stats struct { + TotalIn int64 + TotalOut int64 + RateIn float64 + RateOut float64 +} + +type BandwidthCounter struct { + lock sync.Mutex + totalIn gm.Meter + totalOut gm.Meter + reg gm.Registry +} + +func NewBandwidthCounter() *BandwidthCounter { + reg := gm.NewRegistry() + return &BandwidthCounter{ + totalIn: gm.GetOrRegisterMeter("totalIn", reg), + totalOut: gm.GetOrRegisterMeter("totalOut", reg), + reg: reg, + } +} + +func (bwc *BandwidthCounter) LogSentMessage(size int64) { + bwc.totalOut.Mark(size) +} + +func (bwc *BandwidthCounter) LogRecvMessage(size int64) { + bwc.totalIn.Mark(size) +} + +func (bwc *BandwidthCounter) LogSentMessageStream(size int64, proto protocol.ID, p peer.ID) { + meter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg) + meter.Mark(size) + + pmeter := gm.GetOrRegisterMeter("/proto/out/"+string(proto), bwc.reg) + pmeter.Mark(size) +} + +func (bwc *BandwidthCounter) LogRecvMessageStream(size int64, proto protocol.ID, p peer.ID) { + meter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg) + meter.Mark(size) + + pmeter := gm.GetOrRegisterMeter("/proto/in/"+string(proto), bwc.reg) + pmeter.Mark(size) +} + +func (bwc *BandwidthCounter) GetBandwidthForPeer(p peer.ID) (out Stats) { + inMeter := gm.GetOrRegisterMeter("/peer/in/"+string(p), bwc.reg).Snapshot() + outMeter := gm.GetOrRegisterMeter("/peer/out/"+string(p), bwc.reg).Snapshot() + + return Stats{ + TotalIn: inMeter.Count(), + TotalOut: outMeter.Count(), + RateIn: inMeter.RateFine(), + RateOut: outMeter.RateFine(), + } +} + +func (bwc *BandwidthCounter) GetBandwidthForProtocol(proto protocol.ID) (out Stats) { + inMeter := gm.GetOrRegisterMeter(string("/proto/in/"+proto), bwc.reg).Snapshot() + outMeter := gm.GetOrRegisterMeter(string("/proto/out/"+proto), bwc.reg).Snapshot() + + return Stats{ + TotalIn: inMeter.Count(), + TotalOut: outMeter.Count(), + RateIn: inMeter.RateFine(), + RateOut: outMeter.RateFine(), + } +} + +func (bwc *BandwidthCounter) GetBandwidthTotals() (out Stats) { + return Stats{ + TotalIn: bwc.totalIn.Count(), + TotalOut: bwc.totalOut.Count(), + RateIn: bwc.totalIn.RateFine(), + RateOut: bwc.totalOut.RateFine(), + } +} diff --git a/util/metrics/conn/conn.go b/util/metrics/conn/conn.go new file mode 100644 index 00000000..238b174b --- /dev/null +++ b/util/metrics/conn/conn.go @@ -0,0 +1,39 @@ +package meterconn + +import ( + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + metrics "github.com/ipfs/go-libp2p/util/metrics" +) + +type MeteredConn struct { + mesRecv metrics.MeterCallback + mesSent metrics.MeterCallback + + manet.Conn +} + +func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn { + return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage) +} + +func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn { + return &MeteredConn{ + Conn: base, + mesRecv: rcb, + mesSent: scb, + } +} + +func (mc *MeteredConn) Read(b []byte) (int, error) { + n, err := mc.Conn.Read(b) + + mc.mesRecv(int64(n)) + return n, err +} + +func (mc *MeteredConn) Write(b []byte) (int, error) { + n, err := mc.Conn.Write(b) + + mc.mesSent(int64(n)) + return n, err +} diff --git a/util/metrics/interface.go b/util/metrics/interface.go new file mode 100644 index 00000000..be038fc8 --- /dev/null +++ b/util/metrics/interface.go @@ -0,0 +1,19 @@ +package metrics + +import ( + peer "github.com/ipfs/go-libp2p/p2p/peer" + protocol "github.com/ipfs/go-libp2p/p2p/protocol" +) + +type StreamMeterCallback func(int64, protocol.ID, peer.ID) +type MeterCallback func(int64) + +type Reporter interface { + LogSentMessage(int64) + LogRecvMessage(int64) + LogSentMessageStream(int64, protocol.ID, peer.ID) + LogRecvMessageStream(int64, protocol.ID, peer.ID) + GetBandwidthForPeer(peer.ID) Stats + GetBandwidthForProtocol(protocol.ID) Stats + GetBandwidthTotals() Stats +} diff --git a/util/metrics/stream/metered.go b/util/metrics/stream/metered.go new file mode 100644 index 00000000..3c4e44c1 --- /dev/null +++ b/util/metrics/stream/metered.go @@ -0,0 +1,52 @@ +package meterstream + +import ( + inet "github.com/ipfs/go-libp2p/p2p/net" + peer "github.com/ipfs/go-libp2p/p2p/peer" + protocol "github.com/ipfs/go-libp2p/p2p/protocol" + metrics "github.com/ipfs/go-libp2p/util/metrics" +) + +type meteredStream struct { + // keys for accessing metrics data + protoKey protocol.ID + peerKey peer.ID + + inet.Stream + + // callbacks for reporting bandwidth usage + mesSent metrics.StreamMeterCallback + mesRecv metrics.StreamMeterCallback +} + +func newMeteredStream(base inet.Stream, pid protocol.ID, p peer.ID, recvCB, sentCB metrics.StreamMeterCallback) inet.Stream { + return &meteredStream{ + Stream: base, + mesSent: sentCB, + mesRecv: recvCB, + protoKey: pid, + peerKey: p, + } +} + +func WrapStream(base inet.Stream, pid protocol.ID, bwc metrics.Reporter) inet.Stream { + return newMeteredStream(base, pid, base.Conn().RemotePeer(), bwc.LogRecvMessageStream, bwc.LogSentMessageStream) +} + +func (s *meteredStream) Read(b []byte) (int, error) { + n, err := s.Stream.Read(b) + + // Log bytes read + s.mesRecv(int64(n), s.protoKey, s.peerKey) + + return n, err +} + +func (s *meteredStream) Write(b []byte) (int, error) { + n, err := s.Stream.Write(b) + + // Log bytes written + s.mesSent(int64(n), s.protoKey, s.peerKey) + + return n, err +} diff --git a/util/metrics/stream/metered_test.go b/util/metrics/stream/metered_test.go new file mode 100644 index 00000000..f91abbd1 --- /dev/null +++ b/util/metrics/stream/metered_test.go @@ -0,0 +1,74 @@ +package meterstream + +import ( + "io" + "io/ioutil" + "testing" + + u "github.com/ipfs/go-ipfs/util" + inet "github.com/ipfs/go-libp2p/p2p/net" + peer "github.com/ipfs/go-libp2p/p2p/peer" + protocol "github.com/ipfs/go-libp2p/p2p/protocol" +) + +type FakeStream struct { + ReadBuf io.Reader + inet.Stream +} + +func (fs *FakeStream) Read(b []byte) (int, error) { + return fs.ReadBuf.Read(b) +} + +func (fs *FakeStream) Write(b []byte) (int, error) { + return len(b), nil +} + +func TestCallbacksWork(t *testing.T) { + fake := new(FakeStream) + + var sent int64 + var recv int64 + + sentCB := func(n int64, proto protocol.ID, p peer.ID) { + sent += n + } + + recvCB := func(n int64, proto protocol.ID, p peer.ID) { + recv += n + } + + ms := newMeteredStream(fake, protocol.ID("TEST"), peer.ID("PEER"), recvCB, sentCB) + + toWrite := int64(100000) + toRead := int64(100000) + + fake.ReadBuf = io.LimitReader(u.NewTimeSeededRand(), toRead) + writeData := io.LimitReader(u.NewTimeSeededRand(), toWrite) + + n, err := io.Copy(ms, writeData) + if err != nil { + t.Fatal(err) + } + + if n != toWrite { + t.Fatal("incorrect write amount") + } + + if toWrite != sent { + t.Fatal("incorrectly reported writes", toWrite, sent) + } + + n, err = io.Copy(ioutil.Discard, ms) + if err != nil { + t.Fatal(err) + } + + if n != toRead { + t.Fatal("incorrect read amount") + } + + if toRead != recv { + t.Fatal("incorrectly reported reads") + } +}