swarm/network: Keep span across roundtrip (#19140)

* swarm/newtork: WIP Span request span until delivery and put

* swarm/storage: Introduce new trace across single fetcher lifespan

* swarm/network: Put span ids for sendpriority in context value

* swarm: Add global span store in tracing

* swarm/tracing: Add context key constants

* swarm/tracing: Add comments

* swarm/storage: Remove redundant fix for filestore

* swarm/tracing: Elaborate constants comments

* swarm/network, swarm/storage, swarm:tracing: Minor cleanup
This commit is contained in:
lash 2019-02-20 14:50:37 +01:00 committed by Viktor Trón
parent 460d206f30
commit d36e974ba3
7 changed files with 127 additions and 45 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
) )
@ -143,7 +144,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
var osp opentracing.Span var osp opentracing.Span
ctx, osp = spancontext.StartSpan( ctx, osp = spancontext.StartSpan(
ctx, ctx,
"retrieve.request") "stream.handle.retrieve")
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil { if err != nil {
@ -207,17 +208,19 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
//defines a chunk delivery for syncing (without accounting) //defines a chunk delivery for syncing (without accounting)
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
// TODO: Fix context SNAFU // chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
"chunk.delivery")
processReceivedChunksCount.Inc(1) processReceivedChunksCount.Inc(1)
// retrieve the span for the originating retrieverequest
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span := tracing.ShiftSpanByKey(spanId)
go func() { go func() {
defer osp.Finish() if span != nil {
defer span.(opentracing.Span).Finish()
}
req.peer = sp req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
@ -233,7 +236,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
return nil return nil
} }
// RequestFromPeers sends a chunk retrieve request to // RequestFromPeers sends a chunk retrieve request to a peer
// The most eligible peer that hasn't already been sent to is chosen
// TODO: define "eligible"
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
requestFromPeersCount.Inc(1) requestFromPeersCount.Inc(1)
var sp *Peer var sp *Peer
@ -268,11 +273,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
} }
} }
// setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
err := sp.SendPriority(ctx, &RetrieveRequestMsg{ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr, Addr: req.Addr,
SkipCheck: req.SkipCheck, SkipCheck: req.SkipCheck,
HopCount: req.HopCount, HopCount: req.HopCount,
}, Top, "request.from.peers") }, Top)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return return
} }
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority, "") err := p.SendPriority(ctx, msg, c.priority)
if err != nil { if err != nil {
log.Warn("SendPriority error", "err", err) log.Warn("SendPriority error", "err", err)
} }

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
) )
@ -65,7 +66,6 @@ type Peer struct {
// on creating a new client in offered hashes handler. // on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams clientParams map[Stream]*clientParams
quit chan struct{} quit chan struct{}
spans sync.Map
} }
type WrappedPriorityMsg struct { type WrappedPriorityMsg struct {
@ -83,16 +83,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client), clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams), clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}), quit: make(chan struct{}),
spans: sync.Map{},
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) { go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg) wmsg := i.(WrappedPriorityMsg)
defer p.spans.Delete(wmsg.Context)
sp, ok := p.spans.Load(wmsg.Context)
if ok {
defer sp.(opentracing.Span).Finish()
}
err := p.Send(wmsg.Context, wmsg.Msg) err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil { if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@ -129,6 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
go func() { go func() {
<-p.quit <-p.quit
cancel() cancel()
}() }()
return p return p
@ -158,21 +153,15 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
spanName += ".retrieval" spanName += ".retrieval"
} }
return p.SendPriority(ctx, msg, priority, spanName) ctx = context.WithValue(ctx, "stream_send_tag", nil)
return p.SendPriority(ctx, msg, priority)
} }
// SendPriority sends message to the peer using the outgoing priority queue // SendPriority sends message to the peer using the outgoing priority queue
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error { func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
tracing.StartSaveSpan(ctx)
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
if traceId != "" {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceId,
)
p.spans.Store(ctx, sp)
}
wmsg := WrappedPriorityMsg{ wmsg := WrappedPriorityMsg{
Context: ctx, Context: ctx,
Msg: msg, Msg: msg,
@ -190,7 +179,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
var sp opentracing.Span var sp opentracing.Span
ctx, sp := spancontext.StartSpan( ctx, sp := spancontext.StartSpan(
context.TODO(), context.TODO(),
"send.offered.hashes") "send.offered.hashes",
)
defer sp.Finish() defer sp.Finish()
hashes, from, to, proof, err := s.setNextBatch(f, t) hashes, from, to, proof, err := s.setNextBatch(f, t)
@ -215,7 +205,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream, Stream: s.stream,
} }
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes") ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
return p.SendPriority(ctx, msg, s.priority)
} }
func (p *Peer) getServer(s Stream) (*server, error) { func (p *Peer) getServer(s Stream) (*server, error) {

View File

@ -381,7 +381,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
} }
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
return peer.SendPriority(context.TODO(), msg, priority, "") return peer.SendPriority(context.TODO(), msg, priority)
} }
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@ -757,7 +757,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
return err return err
} }
if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil { if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
return err return err
} }
if c.to > 0 && tp.Takeover.End >= c.to { if c.to > 0 && tp.Takeover.End >= c.to {

View File

@ -26,6 +26,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/opentracing/opentracing-go"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
) )
@ -208,7 +211,11 @@ func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher
// the peers which requested the chunk should not be requested to deliver it. // the peers which requested the chunk should not be requested to deliver it.
peers := &sync.Map{} peers := &sync.Map{}
fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) cctx, sp := spancontext.StartSpan(
cctx,
"netstore.fetcher",
)
fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
n.fetchers.Add(key, fetcher) n.fetchers.Add(key, fetcher)
return fetcher return fetcher
@ -242,6 +249,7 @@ type fetcher struct {
peers *sync.Map // the peers which asked for the chunk peers *sync.Map // the peers which asked for the chunk
requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
deliverOnce *sync.Once // guarantees that we only close deliveredC once deliverOnce *sync.Once // guarantees that we only close deliveredC once
span opentracing.Span // measure retrieve time per chunk
} }
// newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually // newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually
@ -250,7 +258,7 @@ type fetcher struct {
// 1. when the chunk has been fetched all peers have been either notified or their context has been done // 1. when the chunk has been fetched all peers have been either notified or their context has been done
// 2. the chunk has not been fetched but all context from all the requests has been done // 2. the chunk has not been fetched but all context from all the requests has been done
// The peers map stores all the peers which have requested chunk. // The peers map stores all the peers which have requested chunk.
func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
cancelOnce := &sync.Once{} // cancel should only be called once cancelOnce := &sync.Once{} // cancel should only be called once
return &fetcher{ return &fetcher{
addr: addr, addr: addr,
@ -264,6 +272,7 @@ func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, clo
}) })
}, },
peers: peers, peers: peers,
span: span,
} }
} }
@ -276,6 +285,7 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
if atomic.AddInt32(&f.requestCnt, -1) == 0 { if atomic.AddInt32(&f.requestCnt, -1) == 0 {
f.cancel() f.cancel()
} }
f.span.Finish()
}() }()
// The peer asking for the chunk. Store in the shared peers map, but delete after the request // The peer asking for the chunk. Store in the shared peers map, but delete after the request

View File

@ -426,6 +426,7 @@ func (s *Swarm) Start(srv *p2p.Server) error {
func (s *Swarm) Stop() error { func (s *Swarm) Stop() error {
if s.tracerClose != nil { if s.tracerClose != nil {
err := s.tracerClose.Close() err := s.tracerClose.Close()
tracing.FinishSpans()
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,21 +1,39 @@
package tracing package tracing
import ( import (
"context"
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
opentracing "github.com/opentracing/opentracing-go"
jaeger "github.com/uber/jaeger-client-go" jaeger "github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config" jaegercfg "github.com/uber/jaeger-client-go/config"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
var Enabled bool = false var (
// Enabled turns tracing on for the current swarm instance
Enabled bool = false
store = spanStore{}
)
const (
// TracingEnabledFlag is the CLI flag name to use to enable trace collections. // TracingEnabledFlag is the CLI flag name to use to enable trace collections.
const TracingEnabledFlag = "tracing" TracingEnabledFlag = "tracing"
// StoreLabelId is the context value key of the name of the span to be saved
StoreLabelId = "span_save_id"
// StoreLabelMeta is the context value key that together with StoreLabelId constitutes the retrieval key for saved spans in the span store
// StartSaveSpan and ShiftSpanByKey
StoreLabelMeta = "span_save_meta"
)
var ( var (
Closer io.Closer Closer io.Closer
@ -100,3 +118,56 @@ func initTracer(endpoint, svc string) (closer io.Closer) {
return closer return closer
} }
// spanStore holds saved spans
type spanStore struct {
spans sync.Map
}
// StartSaveSpan stores the span specified in the passed context for later retrieval
// The span object but be context value on the key StoreLabelId.
// It will be stored under the the following string key context.Value(StoreLabelId)|.|context.Value(StoreLabelMeta)
func StartSaveSpan(ctx context.Context) context.Context {
if !Enabled {
return ctx
}
traceId := ctx.Value(StoreLabelId)
if traceId != nil {
traceStr := traceId.(string)
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceStr,
)
traceMeta := ctx.Value(StoreLabelMeta)
if traceMeta != nil {
traceStr = traceStr + "." + traceMeta.(string)
}
store.spans.Store(traceStr, sp)
}
return ctx
}
// ShiftSpanByKey retrieves the span stored under the key of the string given as argument
// The span is then deleted from the store
func ShiftSpanByKey(k string) opentracing.Span {
if !Enabled {
return nil
}
span, spanOk := store.spans.Load(k)
if !spanOk {
return nil
}
store.spans.Delete(k)
return span.(opentracing.Span)
}
// FinishSpans calls `Finish()` on all stored spans
// It should be called on instance shutdown
func FinishSpans() {
store.spans.Range(func(_, v interface{}) bool {
v.(opentracing.Span).Finish()
return true
})
}