diff --git a/peerconn.go b/peerconn.go index a2652da9..8933dc77 100644 --- a/peerconn.go +++ b/peerconn.go @@ -81,7 +81,7 @@ type PeerConn struct { // Chunks that we might reasonably expect to receive from the peer. Due to // latency, buffering, and implementation differences, we may receive // chunks that are no longer in the set of requests actually want. - validReceiveChunks map[request]struct{} + validReceiveChunks map[request]int // Indexed by metadata piece, set to true if posted and pending a // response. metadataRequests []bool @@ -501,9 +501,9 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool { } cn.requests[r] = struct{}{} if cn.validReceiveChunks == nil { - cn.validReceiveChunks = make(map[request]struct{}) + cn.validReceiveChunks = make(map[request]int) } - cn.validReceiveChunks[r] = struct{}{} + cn.validReceiveChunks[r]++ cn.t.pendingRequests[r]++ cn.t.requestStrategy.hooks().sentRequest(r) cn.updateExpectingChunks() @@ -987,6 +987,14 @@ func (c *PeerConn) onReadRequest(r request) error { return nil } +func runSafeExtraneous(f func()) { + if true { + go f() + } else { + f() + } +} + // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and // exit. Returning will end the connection. func (c *PeerConn) mainReadLoop() (err error) { @@ -1026,6 +1034,7 @@ func (c *PeerConn) mainReadLoop() (err error) { } messageTypesReceived.Add(msg.Type.String(), 1) if msg.Type.FastExtension() && !c.fastEnabled() { + runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) }) return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type) } switch msg.Type { @@ -1090,7 +1099,7 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentHaveNone() case pp.Reject: c.deleteRequest(newRequestFromMessage(&msg)) - delete(c.validReceiveChunks, newRequestFromMessage(&msg)) + c.decExpectedChunkReceive(newRequestFromMessage(&msg)) case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) @@ -1107,6 +1116,17 @@ func (c *PeerConn) mainReadLoop() (err error) { } } +func (c *PeerConn) decExpectedChunkReceive(r request) { + count := c.validReceiveChunks[r] + if count == 1 { + delete(c.validReceiveChunks, r) + } else if count > 1 { + c.validReceiveChunks[r] = count - 1 + } else { + panic(r) + } +} + func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) { defer func() { // TODO: Should we still do this? @@ -1196,11 +1216,11 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error { torrent.Add("chunks received while choking", 1) } - if _, ok := c.validReceiveChunks[req]; !ok { + if c.validReceiveChunks[req] <= 0 { torrent.Add("chunks received unexpected", 1) return errors.New("received unexpected chunk") } - delete(c.validReceiveChunks, req) + c.decExpectedChunkReceive(req) if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) { torrent.Add("chunks received due to allowed fast", 1) diff --git a/peerconn_test.go b/peerconn_test.go index 439f533a..61d74054 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -132,7 +132,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. t.pieces[0]._dirtyChunks.Clear() - cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}} + cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1} cl.unlock() n, err := w.Write(wb) require.NoError(b, err)