diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index fae6994f0..02c5f222c 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/tracing" opentracing "github.com/opentracing/opentracing-go" ) @@ -143,7 +144,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * var osp opentracing.Span ctx, osp = spancontext.StartSpan( ctx, - "retrieve.request") + "stream.handle.retrieve") s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) if err != nil { @@ -207,17 +208,19 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg //defines a chunk delivery for syncing (without accounting) type ChunkDeliveryMsgSyncing ChunkDeliveryMsg -// TODO: Fix context SNAFU +// chunk delivery msg is response to retrieverequest msg func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "chunk.delivery") processReceivedChunksCount.Inc(1) + // retrieve the span for the originating retrieverequest + spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr) + span := tracing.ShiftSpanByKey(spanId) + go func() { - defer osp.Finish() + if span != nil { + defer span.(opentracing.Span).Finish() + } req.peer = sp err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) @@ -233,7 +236,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch return nil } -// RequestFromPeers sends a chunk retrieve request to +// RequestFromPeers sends a chunk retrieve request to a peer +// The most eligible peer that hasn't already been sent to is chosen +// TODO: define "eligible" func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { requestFromPeersCount.Inc(1) var sp *Peer @@ -268,11 +273,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( } } + // setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip + // this span will finish only when delivery is handled (or times out) + ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request") + ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr)) err := sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: req.Addr, SkipCheck: req.SkipCheck, HopCount: req.HopCount, - }, Top, "request.from.peers") + }, Top) if err != nil { return nil, nil, err } diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index de4e8a3bb..b293724cc 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(ctx, msg, c.priority, "") + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority error", "err", err) } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 68da8f44a..c59799e08 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/tracing" opentracing "github.com/opentracing/opentracing-go" ) @@ -65,7 +66,6 @@ type Peer struct { // on creating a new client in offered hashes handler. clientParams map[Stream]*clientParams quit chan struct{} - spans sync.Map } type WrappedPriorityMsg struct { @@ -83,16 +83,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { clients: make(map[Stream]*client), clientParams: make(map[Stream]*clientParams), quit: make(chan struct{}), - spans: sync.Map{}, } ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - defer p.spans.Delete(wmsg.Context) - sp, ok := p.spans.Load(wmsg.Context) - if ok { - defer sp.(opentracing.Span).Finish() - } err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) @@ -129,6 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { go func() { <-p.quit + cancel() }() return p @@ -158,21 +153,15 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, spanName += ".retrieval" } - return p.SendPriority(ctx, msg, priority, spanName) + ctx = context.WithValue(ctx, "stream_send_tag", nil) + return p.SendPriority(ctx, msg, priority) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) + tracing.StartSaveSpan(ctx) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - if traceId != "" { - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - traceId, - ) - p.spans.Store(ctx, sp) - } wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, @@ -190,7 +179,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { var sp opentracing.Span ctx, sp := spancontext.StartSpan( context.TODO(), - "send.offered.hashes") + "send.offered.hashes", + ) defer sp.Finish() hashes, from, to, proof, err := s.setNextBatch(f, t) @@ -215,7 +205,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes") + ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes") + return p.SendPriority(ctx, msg, s.priority) } func (p *Peer) getServer(s Stream) (*server, error) { diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 622b46e4c..8e2a5f31a 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -381,7 +381,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority, "") + return peer.SendPriority(context.TODO(), msg, priority) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -757,7 +757,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil { + if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 202af2bf5..8a44f51a8 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -26,6 +26,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/opentracing/opentracing-go" + lru "github.com/hashicorp/golang-lru" ) @@ -208,7 +211,11 @@ func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher // the peers which requested the chunk should not be requested to deliver it. peers := &sync.Map{} - fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) + cctx, sp := spancontext.StartSpan( + cctx, + "netstore.fetcher", + ) + fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) n.fetchers.Add(key, fetcher) return fetcher @@ -233,15 +240,16 @@ func (n *NetStore) RequestsCacheLen() int { // One fetcher object is responsible to fetch one chunk for one address, and keep track of all the // peers who have requested it and did not receive it yet. type fetcher struct { - addr Address // address of chunk - chunk Chunk // fetcher can set the chunk on the fetcher - deliveredC chan struct{} // chan signalling chunk delivery to requests - cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) - netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context - cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called - peers *sync.Map // the peers which asked for the chunk - requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called - deliverOnce *sync.Once // guarantees that we only close deliveredC once + addr Address // address of chunk + chunk Chunk // fetcher can set the chunk on the fetcher + deliveredC chan struct{} // chan signalling chunk delivery to requests + cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) + netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context + cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called + peers *sync.Map // the peers which asked for the chunk + requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called + deliverOnce *sync.Once // guarantees that we only close deliveredC once + span opentracing.Span // measure retrieve time per chunk } // newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually @@ -250,7 +258,7 @@ type fetcher struct { // 1. when the chunk has been fetched all peers have been either notified or their context has been done // 2. the chunk has not been fetched but all context from all the requests has been done // The peers map stores all the peers which have requested chunk. -func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { +func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { cancelOnce := &sync.Once{} // cancel should only be called once return &fetcher{ addr: addr, @@ -264,6 +272,7 @@ func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, clo }) }, peers: peers, + span: span, } } @@ -276,6 +285,7 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { if atomic.AddInt32(&f.requestCnt, -1) == 0 { f.cancel() } + f.span.Finish() }() // The peer asking for the chunk. Store in the shared peers map, but delete after the request diff --git a/swarm/swarm.go b/swarm/swarm.go index 3ab98b3ab..651ad97c7 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -426,6 +426,7 @@ func (s *Swarm) Start(srv *p2p.Server) error { func (s *Swarm) Stop() error { if s.tracerClose != nil { err := s.tracerClose.Close() + tracing.FinishSpans() if err != nil { return err } diff --git a/swarm/tracing/tracing.go b/swarm/tracing/tracing.go index f95fa41b8..55875464b 100644 --- a/swarm/tracing/tracing.go +++ b/swarm/tracing/tracing.go @@ -1,21 +1,39 @@ package tracing import ( + "context" "io" "os" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + + opentracing "github.com/opentracing/opentracing-go" jaeger "github.com/uber/jaeger-client-go" jaegercfg "github.com/uber/jaeger-client-go/config" cli "gopkg.in/urfave/cli.v1" ) -var Enabled bool = false +var ( + // Enabled turns tracing on for the current swarm instance + Enabled bool = false + store = spanStore{} +) -// TracingEnabledFlag is the CLI flag name to use to enable trace collections. -const TracingEnabledFlag = "tracing" +const ( + // TracingEnabledFlag is the CLI flag name to use to enable trace collections. + TracingEnabledFlag = "tracing" + + // StoreLabelId is the context value key of the name of the span to be saved + StoreLabelId = "span_save_id" + + // StoreLabelMeta is the context value key that together with StoreLabelId constitutes the retrieval key for saved spans in the span store + // StartSaveSpan and ShiftSpanByKey + StoreLabelMeta = "span_save_meta" +) var ( Closer io.Closer @@ -100,3 +118,56 @@ func initTracer(endpoint, svc string) (closer io.Closer) { return closer } + +// spanStore holds saved spans +type spanStore struct { + spans sync.Map +} + +// StartSaveSpan stores the span specified in the passed context for later retrieval +// The span object but be context value on the key StoreLabelId. +// It will be stored under the the following string key context.Value(StoreLabelId)|.|context.Value(StoreLabelMeta) +func StartSaveSpan(ctx context.Context) context.Context { + if !Enabled { + return ctx + } + traceId := ctx.Value(StoreLabelId) + + if traceId != nil { + traceStr := traceId.(string) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + traceStr, + ) + traceMeta := ctx.Value(StoreLabelMeta) + if traceMeta != nil { + traceStr = traceStr + "." + traceMeta.(string) + } + store.spans.Store(traceStr, sp) + } + return ctx +} + +// ShiftSpanByKey retrieves the span stored under the key of the string given as argument +// The span is then deleted from the store +func ShiftSpanByKey(k string) opentracing.Span { + if !Enabled { + return nil + } + span, spanOk := store.spans.Load(k) + if !spanOk { + return nil + } + store.spans.Delete(k) + return span.(opentracing.Span) +} + +// FinishSpans calls `Finish()` on all stored spans +// It should be called on instance shutdown +func FinishSpans() { + store.spans.Range(func(_, v interface{}) bool { + v.(opentracing.Span).Finish() + return true + }) +}