diff --git a/connection.go b/connection.go index 85aaabdc..a019ff20 100644 --- a/connection.go +++ b/connection.go @@ -76,6 +76,10 @@ type connection struct { Choked bool requests map[request]struct{} requestsLowWater int + // Chunks that we might reasonably expect to receive from the peer. Due to + // latency, buffering, and implementation differences, we may receive + // chunks that are no longer in the set of requests actually want. + validReceiveChunks map[request]struct{} // Indexed by metadata piece, set to true if posted and pending a // response. metadataRequests []bool @@ -475,9 +479,6 @@ type messageWriter func(pp.Message) bool // Proxies the messageWriter's response. func (cn *connection) request(r request, mw messageWriter) bool { - if cn.requests == nil { - cn.requests = make(map[request]struct{}, cn.nominalMaxRequests()) - } if _, ok := cn.requests[r]; ok { panic("chunk already requested") } @@ -497,7 +498,14 @@ func (cn *connection) request(r request, mw messageWriter) bool { panic("requesting while choked and not allowed fast") } } + if cn.requests == nil { + cn.requests = make(map[request]struct{}) + } cn.requests[r] = struct{}{} + if cn.validReceiveChunks == nil { + cn.validReceiveChunks = make(map[request]struct{}) + } + cn.validReceiveChunks[r] = struct{}{} cn.t.pendingRequests[r]++ cn.t.lastRequested[r] = time.Now() cn.updateExpectingChunks() @@ -1099,6 +1107,7 @@ func (c *connection) mainReadLoop() (err error) { c.updateExpectingChunks() case pp.Reject: c.deleteRequest(newRequestFromMessage(&msg)) + delete(c.validReceiveChunks, newRequestFromMessage(&msg)) case pp.Unchoke: c.PeerChoked = false c.tickleWriter() @@ -1126,7 +1135,7 @@ func (c *connection) mainReadLoop() (err error) { case pp.HaveNone: err = c.peerSentHaveNone() case pp.Piece: - c.receiveChunk(&msg) + err = c.receiveChunk(&msg) if len(msg.Piece) == int(t.chunkSize) { t.chunkPool.Put(&msg.Piece) } @@ -1271,13 +1280,18 @@ func (cn *connection) rw() io.ReadWriter { } // Handle a received chunk from a peer. -func (c *connection) receiveChunk(msg *pp.Message) { +func (c *connection) receiveChunk(msg *pp.Message) error { t := c.t cl := t.cl torrent.Add("chunks received", 1) req := newRequestFromMessage(msg) + if _, ok := c.validReceiveChunks[req]; !ok { + return errors.New("received unexpected chunk") + } + delete(c.validReceiveChunks, req) + // Request has been satisfied. if c.deleteRequest(req) { if c.expectingChunks() { @@ -1299,7 +1313,7 @@ func (c *connection) receiveChunk(msg *pp.Message) { if t.haveChunk(req) { torrent.Add("chunks received unwanted", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted })) - return + return nil } index := int(req.Index) @@ -1343,7 +1357,7 @@ func (c *connection) receiveChunk(msg *pp.Message) { log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err) t.pendRequest(req) t.updatePieceCompletion(int(msg.Index)) - return + return nil } // It's important that the piece is potentially queued before we check if @@ -1357,6 +1371,8 @@ func (c *connection) receiveChunk(msg *pp.Message) { cl.event.Broadcast() t.publishPieceChange(int(req.Index)) + + return nil } func (c *connection) onDirtiedPiece(piece int) {