Read peer request data without Client lock
This commit is contained in:
parent
b020b8c2b6
commit
0d40c4bac2
|
@ -959,8 +959,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
|
|||
M: map[pp.ExtensionName]pp.ExtensionNumber{
|
||||
pp.ExtensionNameMetadata: metadataExtendedId,
|
||||
},
|
||||
V: cl.config.ExtendedHandshakeClientVersion,
|
||||
Reqq: 64, // TODO: Really?
|
||||
V: cl.config.ExtendedHandshakeClientVersion,
|
||||
// If peer requests are buffered on read, this instructs the amount of memory
|
||||
// that might be used to cache pending writes. Assuming 512KiB cached for
|
||||
// sending, for 16KiB chunks.
|
||||
Reqq: 1 << 5,
|
||||
YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
|
||||
Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
|
||||
Port: cl.incomingPeerPort(),
|
||||
|
|
140
peerconn.go
140
peerconn.go
|
@ -38,6 +38,10 @@ const (
|
|||
PeerSourceDirect = "M"
|
||||
)
|
||||
|
||||
type peerRequestState struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
type peer struct {
|
||||
// First to ensure 64-bit alignment for atomics. See #262.
|
||||
_stats ConnStats
|
||||
|
@ -88,7 +92,7 @@ type peer struct {
|
|||
// Stuff controlled by the remote peer.
|
||||
peerInterested bool
|
||||
peerChoking bool
|
||||
peerRequests map[request]struct{}
|
||||
peerRequests map[request]*peerRequestState
|
||||
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
|
||||
PeerListenPort int
|
||||
// The pieces the peer has claimed to have.
|
||||
|
@ -365,17 +369,23 @@ func (cn *peer) peerHasPiece(piece pieceIndex) bool {
|
|||
return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
|
||||
}
|
||||
|
||||
// Writes a message into the write buffer.
|
||||
func (cn *PeerConn) post(msg pp.Message) {
|
||||
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
|
||||
// https://github.com/pion/datachannel/issues/59 is fixed.
|
||||
const writeBufferHighWaterLen = 1 << 15
|
||||
|
||||
// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is
|
||||
// done asynchronously, so it may be that we're not able to honour backpressure from this method. It
|
||||
// might be possible to merge this with PeerConn.write down the track? They seem to be very similar.
|
||||
func (cn *PeerConn) post(msg pp.Message) bool {
|
||||
torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
|
||||
// We don't need to track bytes here because a connection.w Writer wrapper
|
||||
// takes care of that (although there's some delay between us recording
|
||||
// the message, and the connection writer flushing it out.).
|
||||
// We don't need to track bytes here because a connection.w Writer wrapper takes care of that
|
||||
// (although there's some delay between us recording the message, and the connection writer
|
||||
// flushing it out.).
|
||||
cn.writeBuffer.Write(msg.MustMarshalBinary())
|
||||
// Last I checked only Piece messages affect stats, and we don't post
|
||||
// those.
|
||||
// Last I checked only Piece messages affect stats, and we don't post those.
|
||||
cn.wroteMsg(&msg)
|
||||
cn.tickleWriter()
|
||||
return cn.writeBuffer.Len() < writeBufferHighWaterLen
|
||||
}
|
||||
|
||||
// Returns true if there's room to write more.
|
||||
|
@ -383,9 +393,7 @@ func (cn *PeerConn) write(msg pp.Message) bool {
|
|||
cn.wroteMsg(&msg)
|
||||
cn.writeBuffer.Write(msg.MustMarshalBinary())
|
||||
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
|
||||
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
|
||||
// when https://github.com/pion/datachannel/issues/59 is fixed.
|
||||
return cn.writeBuffer.Len() < 1<<15
|
||||
return cn.writeBuffer.Len() < writeBufferHighWaterLen
|
||||
}
|
||||
|
||||
func (cn *PeerConn) requestMetadataPiece(index int) {
|
||||
|
@ -1032,13 +1040,68 @@ func (c *PeerConn) onReadRequest(r request) error {
|
|||
return errors.New("bad request")
|
||||
}
|
||||
if c.peerRequests == nil {
|
||||
c.peerRequests = make(map[request]struct{}, maxRequests)
|
||||
c.peerRequests = make(map[request]*peerRequestState, maxRequests)
|
||||
}
|
||||
c.peerRequests[r] = struct{}{}
|
||||
c.tickleWriter()
|
||||
value := &peerRequestState{}
|
||||
c.peerRequests[r] = value
|
||||
go c.peerRequestDataReader(r, value)
|
||||
//c.tickleWriter()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
|
||||
b, err := readPeerRequestData(r, c)
|
||||
c.locker().Lock()
|
||||
defer c.locker().Unlock()
|
||||
if err != nil {
|
||||
c.peerRequestDataReadFailed(err, r)
|
||||
} else {
|
||||
if b == nil {
|
||||
panic("data must be non-nil to trigger send")
|
||||
}
|
||||
prs.data = b
|
||||
c.tickleWriter()
|
||||
}
|
||||
}
|
||||
|
||||
// If this is maintained correctly, we might be able to support optional synchronous reading for
|
||||
// chunk sending, the way it used to work.
|
||||
func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
|
||||
c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
|
||||
i := pieceIndex(r.Index)
|
||||
if c.t.pieceComplete(i) {
|
||||
// There used to be more code here that just duplicated the following break. Piece
|
||||
// completions are currently cached, so I'm not sure how helpful this update is, except to
|
||||
// pull any completion changes pushed to the storage backend in failed reads that got us
|
||||
// here.
|
||||
c.t.updatePieceCompletion(i)
|
||||
}
|
||||
// If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've
|
||||
// probably dropped a piece from storage, but there's no way to communicate this to the peer. If
|
||||
// they ask for it again, we'll kick them to allow us to send them an updated bitfield on the
|
||||
// next connect. TODO: Support rejecting here too.
|
||||
if c.choking {
|
||||
c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
|
||||
}
|
||||
c.choke(c.post)
|
||||
}
|
||||
|
||||
func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
|
||||
b := make([]byte, r.Length)
|
||||
p := c.t.info.Piece(int(r.Index))
|
||||
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
|
||||
if n == len(b) {
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
} else {
|
||||
if err == nil {
|
||||
panic("expected error")
|
||||
}
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
func runSafeExtraneous(f func()) {
|
||||
if true {
|
||||
go f()
|
||||
|
@ -1422,7 +1485,10 @@ another:
|
|||
if !c.unchoke(msg) {
|
||||
return false
|
||||
}
|
||||
for r := range c.peerRequests {
|
||||
for r, state := range c.peerRequests {
|
||||
if state.data == nil {
|
||||
continue
|
||||
}
|
||||
res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
|
||||
if !res.OK() {
|
||||
panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
|
||||
|
@ -1434,27 +1500,7 @@ another:
|
|||
// Hard to say what to return here.
|
||||
return true
|
||||
}
|
||||
more, err := c.sendChunk(r, msg)
|
||||
if err != nil {
|
||||
c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err)
|
||||
i := pieceIndex(r.Index)
|
||||
if c.t.pieceComplete(i) {
|
||||
// There used to be more code here that just duplicated the following break.
|
||||
// Piece completions are currently cached, so I'm not sure how helpful this
|
||||
// update is, except to pull any completion changes pushed to the storage
|
||||
// backend in failed reads that got us here.
|
||||
c.t.updatePieceCompletion(i)
|
||||
}
|
||||
// If we failed to send a chunk, choke the peer by breaking out of the loop here to
|
||||
// ensure they flush all their requests. We've probably dropped a piece from
|
||||
// storage, but there's no way to communicate this to the peer. If they ask for it
|
||||
// again, we'll kick them to allow us to send them an updated bitfield on the next
|
||||
// connect.
|
||||
if c.choking {
|
||||
c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
|
||||
}
|
||||
break another
|
||||
}
|
||||
more := c.sendChunk(r, msg, state)
|
||||
delete(c.peerRequests, r)
|
||||
if !more {
|
||||
return false
|
||||
|
@ -1533,6 +1579,8 @@ func (c *peer) deleteAllRequests() {
|
|||
// }
|
||||
}
|
||||
|
||||
// This is called when something has changed that should wake the writer, such as putting stuff into
|
||||
// the writeBuffer, or changing some state that the writer can act on.
|
||||
func (c *PeerConn) tickleWriter() {
|
||||
c.writerCond.Broadcast()
|
||||
}
|
||||
|
@ -1549,26 +1597,14 @@ func (c *PeerConn) _postCancel(r request) {
|
|||
c.post(makeCancelMessage(r))
|
||||
}
|
||||
|
||||
func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
|
||||
b := make([]byte, r.Length)
|
||||
p := c.t.info.Piece(int(r.Index))
|
||||
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
|
||||
if n != len(b) {
|
||||
if err == nil {
|
||||
panic("expected error")
|
||||
}
|
||||
return
|
||||
} else if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
more = msg(pp.Message{
|
||||
func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
|
||||
c.lastChunkSent = time.Now()
|
||||
return msg(pp.Message{
|
||||
Type: pp.Piece,
|
||||
Index: r.Index,
|
||||
Begin: r.Begin,
|
||||
Piece: b,
|
||||
Piece: state.data,
|
||||
})
|
||||
c.lastChunkSent = time.Now()
|
||||
return
|
||||
}
|
||||
|
||||
func (c *PeerConn) setTorrent(t *Torrent) {
|
||||
|
|
Loading…
Reference in New Issue