2
0
mirror of synced 2025-02-23 22:28:11 +00:00

Misc debug status, pex conn tracking improvements

This commit is contained in:
Matt Joiner 2023-05-01 10:19:24 +10:00
parent 7e65e55c35
commit 1e13625c73
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
12 changed files with 175 additions and 31 deletions

View File

@ -686,7 +686,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
} }
} }
// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
// for valid reasons. // for valid reasons.
func (cl *Client) initiateProtocolHandshakes( func (cl *Client) initiateProtocolHandshakes(
ctx context.Context, ctx context.Context,
@ -730,7 +730,9 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
return nil, errors.New("dial failed") return nil, errors.New("dial failed")
} }
addrIpPort, _ := tryIpPortFromNetAddr(addr) addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ c, err := cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
newConnectionOpts{
outgoing: true, outgoing: true,
remoteAddr: addr, remoteAddr: addr,
// It would be possible to retrieve a public IP from the dialer used here? // It would be possible to retrieve a public IP from the dialer used here?
@ -1510,13 +1512,17 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
} }
} }
c.peerImpl = c c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) c.logger = cl.logger.WithDefaultLevel(log.Warning)
c.setRW(connStatsReadWriter{nc, c}) c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{ c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter, l: cl.config.DownloadRateLimiter,
r: c.r, r: c.r,
} }
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing) c.logger.Levelf(
log.Debug,
"new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
c, cl, opts.remoteAddr, opts.network, opts.outgoing,
)
for _, f := range cl.config.Callbacks.NewPeer { for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer) f(&c.Peer)
} }

2
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
github.com/anacrolix/envpprof v1.2.1 github.com/anacrolix/envpprof v1.2.1
github.com/anacrolix/fuse v0.2.0 github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68
github.com/anacrolix/go-libutp v1.2.0 github.com/anacrolix/go-libutp v1.2.0
github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3 github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3
github.com/anacrolix/missinggo v1.3.0 github.com/anacrolix/missinggo v1.3.0

2
go.sum
View File

@ -78,6 +78,8 @@ github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0= github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0=
github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50= github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50=
github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=

44
netip-addrport.go Normal file
View File

@ -0,0 +1,44 @@
package torrent
import (
"fmt"
"net"
"net/netip"
"github.com/anacrolix/dht/v2/krpc"
)
func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
ip4 := na.IP.To4()
if ip4 == nil {
err = fmt.Errorf("not an ipv4 address: %v", na.IP)
return
}
addr := netip.AddrFrom4([4]byte(ip4))
addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
return addrPort, nil
}
func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
ip6 := na.IP.To16()
if ip6 == nil {
err = fmt.Errorf("not an ipv4 address: %v", na.IP)
return
}
addr := netip.AddrFrom16([16]byte(ip6))
addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
return addrPort, nil
}
func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
switch v := pra.(type) {
case *net.TCPAddr:
return v.AddrPort(), nil
case *net.UDPAddr:
return v.AddrPort(), nil
case netip.AddrPort:
return v, nil
default:
return netip.ParseAddrPort(pra.String())
}
}

View File

@ -275,7 +275,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
next(None[pieceIndex]()) next(None[pieceIndex]())
} }
func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { func (cn *Peer) writeStatus(w io.Writer) {
// \t isn't preserved in <pre> blocks? // \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() { if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ") fmt.Fprint(w, "CLOSED: ")

View File

@ -5,7 +5,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/bits"
"strconv" "strconv"
"strings"
"unsafe"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
) )
@ -33,8 +36,31 @@ type (
PeerExtensionBits [8]byte PeerExtensionBits [8]byte
) )
var bitTags = []struct {
bit ExtensionBit
tag string
}{
// Ordered by their base protocol type values (PORT, fast.., EXTENDED)
{ExtensionBitDHT, "dht"},
{ExtensionBitFast, "fast"},
{ExtensionBitExtended, "ext"},
}
func (pex PeerExtensionBits) String() string { func (pex PeerExtensionBits) String() string {
return hex.EncodeToString(pex[:]) pexHex := hex.EncodeToString(pex[:])
tags := make([]string, 0, len(bitTags)+1)
for _, bitTag := range bitTags {
if pex.GetBit(bitTag.bit) {
tags = append(tags, bitTag.tag)
pex.SetBit(bitTag.bit, false)
}
}
unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:])))))
if unknownCount != 0 {
tags = append(tags, fmt.Sprintf("%v unknown", unknownCount))
}
return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", "))
} }
func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) { func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) {

View File

@ -28,6 +28,7 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
} }
} }
// Unmarshals and returns a PEX message.
func LoadPexMsg(b []byte) (ret PexMsg, err error) { func LoadPexMsg(b []byte) (ret PexMsg, err error) {
err = bencode.Unmarshal(b, &ret) err = bencode.Unmarshal(b, &ret)
return return

View File

@ -62,7 +62,19 @@ type PeerConn struct {
} }
func (cn *PeerConn) peerImplStatusLines() []string { func (cn *PeerConn) peerImplStatusLines() []string {
return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)} lines := make([]string, 0, 2)
lines = append(
lines,
fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
if cn.supportsExtension(pp.ExtensionNamePex) {
lines = append(
lines,
fmt.Sprintf(
"pex: %v conns, %v unsent events",
cn.pex.remoteLiveConns,
cn.pex.numPending()))
}
return lines
} }
// Returns true if the connection is over IPv6. // Returns true if the connection is over IPv6.
@ -848,6 +860,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
c.requestPendingMetadata() c.requestPendingMetadata()
if !t.cl.config.DisablePEX { if !t.cl.config.DisablePEX {
t.pex.Add(c) // we learnt enough now t.pex.Add(c) // we learnt enough now
// This checks the extension is supported internally.
c.pex.Init(c) c.pex.Init(c)
} }
return nil return nil

21
pex.go
View File

@ -145,8 +145,8 @@ func (me *pexMsgFactory) append(event pexEvent) {
} }
} }
func (me *pexMsgFactory) PexMsg() pp.PexMsg { func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
return me.msg return &me.msg
} }
// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
@ -225,7 +225,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
if start == nil { if start == nil {
return s.msg0.PexMsg(), s.tail return *s.msg0.PexMsg(), s.tail
} }
var msg pexMsgFactory var msg pexMsgFactory
last := start last := start
@ -236,5 +236,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
msg.append(*e) msg.append(*e)
last = e last = e
} }
return msg.PexMsg(), last return *msg.PexMsg(), last
}
// The same as Genmsg but just counts up the distinct events that haven't been sent.
func (s *pexState) numPending(start *pexEvent) (num int) {
s.RLock()
defer s.RUnlock()
if start == nil {
return s.msg0.PexMsg().Len()
}
for e := start.next; e != nil; e = e.next {
num++
}
return
} }

View File

@ -2,8 +2,11 @@ package torrent
import ( import (
"fmt" "fmt"
"net/netip"
"time" "time"
g "github.com/anacrolix/generics"
"github.com/anacrolix/log" "github.com/anacrolix/log"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
@ -26,6 +29,8 @@ type pexConnState struct {
Listed bool Listed bool
info log.Logger info log.Logger
dbg log.Logger dbg log.Logger
// Running record of live connections the remote end of the connection purports to have.
remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
} }
func (s *pexConnState) IsEnabled() bool { func (s *pexConnState) IsEnabled() bool {
@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
return &tx return &tx
} }
func (s *pexConnState) numPending() int {
if s.torrent == nil {
return 0
}
return s.torrent.pex.numPending(s.last)
}
// Share is called from the writer goroutine if when it is woken up with the write buffers empty // Share is called from the writer goroutine if when it is woken up with the write buffers empty
// Returns whether there's more room on the send buffer to write to. // Returns whether there's more room on the send buffer to write to.
func (s *pexConnState) Share(postfn messageWriter) bool { func (s *pexConnState) Share(postfn messageWriter) bool {
@ -86,25 +98,51 @@ func (s *pexConnState) Share(postfn messageWriter) bool {
return true return true
} }
func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
for _, dropped := range rx.Dropped {
addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
delete(s.remoteLiveConns, addrPort)
}
for _, dropped := range rx.Dropped6 {
addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
delete(s.remoteLiveConns, addrPort)
}
for i, added := range rx.Added {
addr := netip.AddrFrom4([4]byte(added.IP.To4()))
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
flags := g.SliceGet(rx.AddedFlags, i)
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
}
for i, added := range rx.Added6 {
addr := netip.AddrFrom16([16]byte(added.IP.To16()))
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
flags := g.SliceGet(rx.Added6Flags, i)
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
}
return
}
// Recv is called from the reader goroutine // Recv is called from the reader goroutine
func (s *pexConnState) Recv(payload []byte) error { func (s *pexConnState) Recv(payload []byte) error {
rx, err := pp.LoadPexMsg(payload)
if err != nil {
return fmt.Errorf("unmarshalling pex message: %w", err)
}
s.dbg.Printf("received pex message: %v", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
s.updateRemoteLiveConns(rx)
if !s.torrent.wantPeers() { if !s.torrent.wantPeers() {
s.dbg.Printf("peer reserve ok, incoming PEX discarded") s.dbg.Printf("peer reserve ok, incoming PEX discarded")
return nil return nil
} }
// TODO: This should be per conn, not for the whole Torrent.
if time.Now().Before(s.torrent.pex.rest) { if time.Now().Before(s.torrent.pex.rest) {
s.dbg.Printf("in cooldown period, incoming PEX discarded") s.dbg.Printf("in cooldown period, incoming PEX discarded")
return nil return nil
} }
rx, err := pp.LoadPexMsg(payload)
if err != nil {
return fmt.Errorf("error unmarshalling PEX message: %s", err)
}
s.dbg.Print("incoming PEX message: ", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
var peers peerInfos var peers peerInfos
peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags) peers.AppendFromPex(rx.Added, rx.AddedFlags)

View File

@ -13,7 +13,7 @@ import (
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"github.com/frankban/quicktest" qt "github.com/frankban/quicktest"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -188,7 +188,7 @@ func TestSeedAfterDownloading(t *testing.T) {
defer wg.Done() defer wg.Done()
r := llg.NewReader() r := llg.NewReader()
defer r.Close() defer r.Close()
quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
}() }()
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)

View File

@ -108,7 +108,7 @@ type Torrent struct {
// them. That encourages us to reconnect to peers that are well known in // them. That encourages us to reconnect to peers that are well known in
// the swarm. // the swarm.
peers prioritizedPeers peers prioritizedPeers
// Whether we want to know to know more peers. // Whether we want to know more peers.
wantPeersEvent missinggo.Event wantPeersEvent missinggo.Event
// An announcer for each tracker URL. // An announcer for each tracker URL.
trackerAnnouncers map[string]torrentTrackerAnnouncer trackerAnnouncers map[string]torrentTrackerAnnouncer
@ -774,7 +774,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
for i, c := range peers { for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1) fmt.Fprintf(w, "%2d. ", i+1)
buf.Reset() buf.Reset()
c.writeStatus(&buf, t) c.writeStatus(&buf)
w.Write(bytes.TrimRight( w.Write(bytes.TrimRight(
bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
" ")) " "))
@ -1983,6 +1983,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
panic(len(t.conns)) panic(len(t.conns))
} }
t.conns[c] = struct{}{} t.conns[c] = struct{}{}
t.cl.event.Broadcast()
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
t.pex.Add(c) // as no further extended handshake expected t.pex.Add(c) // as no further extended handshake expected
} }