2
0
mirror of synced 2025-02-23 14:18:13 +00:00

Drop connections that send chunks we shouldn't receive

This commit is contained in:
Matt Joiner 2018-06-24 20:35:46 +10:00
parent 195695042d
commit 0330b87288

View File

@ -76,6 +76,10 @@ type connection struct {
Choked bool
requests map[request]struct{}
requestsLowWater int
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
validReceiveChunks map[request]struct{}
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
@ -475,9 +479,6 @@ type messageWriter func(pp.Message) bool
// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
if cn.requests == nil {
cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
}
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
@ -497,7 +498,14 @@ func (cn *connection) request(r request, mw messageWriter) bool {
panic("requesting while choked and not allowed fast")
}
}
if cn.requests == nil {
cn.requests = make(map[request]struct{})
}
cn.requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[request]struct{})
}
cn.validReceiveChunks[r] = struct{}{}
cn.t.pendingRequests[r]++
cn.t.lastRequested[r] = time.Now()
cn.updateExpectingChunks()
@ -1099,6 +1107,7 @@ func (c *connection) mainReadLoop() (err error) {
c.updateExpectingChunks()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
delete(c.validReceiveChunks, newRequestFromMessage(&msg))
case pp.Unchoke:
c.PeerChoked = false
c.tickleWriter()
@ -1126,7 +1135,7 @@ func (c *connection) mainReadLoop() (err error) {
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Piece:
c.receiveChunk(&msg)
err = c.receiveChunk(&msg)
if len(msg.Piece) == int(t.chunkSize) {
t.chunkPool.Put(&msg.Piece)
}
@ -1271,13 +1280,18 @@ func (cn *connection) rw() io.ReadWriter {
}
// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
func (c *connection) receiveChunk(msg *pp.Message) error {
t := c.t
cl := t.cl
torrent.Add("chunks received", 1)
req := newRequestFromMessage(msg)
if _, ok := c.validReceiveChunks[req]; !ok {
return errors.New("received unexpected chunk")
}
delete(c.validReceiveChunks, req)
// Request has been satisfied.
if c.deleteRequest(req) {
if c.expectingChunks() {
@ -1299,7 +1313,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
if t.haveChunk(req) {
torrent.Add("chunks received unwanted", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
return
return nil
}
index := int(req.Index)
@ -1343,7 +1357,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
t.pendRequest(req)
t.updatePieceCompletion(int(msg.Index))
return
return nil
}
// It's important that the piece is potentially queued before we check if
@ -1357,6 +1371,8 @@ func (c *connection) receiveChunk(msg *pp.Message) {
cl.event.Broadcast()
t.publishPieceChange(int(req.Index))
return nil
}
func (c *connection) onDirtiedPiece(piece int) {