diff --git a/p2p/protocol/circuitv1/relay/relay.go b/p2p/protocol/circuitv1/relay/relay.go index 4f29a582..71973399 100644 --- a/p2p/protocol/circuitv1/relay/relay.go +++ b/p2p/protocol/circuitv1/relay/relay.go @@ -29,6 +29,9 @@ const ( ConnectTimeout = 30 * time.Second HandshakeTimeout = time.Minute + RelayHopTag = "relay-hop" + MaxRelayHopTag = 5 + maxMessageSize = 4096 ) @@ -158,15 +161,15 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) { } r.active++ - r.conns[src.ID]++ - r.conns[dest.ID]++ + r.addConn(src.ID) + r.addConn(src.ID) r.mx.Unlock() cleanup := func() { r.mx.Lock() r.active-- - r.conns[src.ID]-- - r.conns[dest.ID]-- + r.rmConn(src.ID) + r.rmConn(dest.ID) r.mx.Unlock() } @@ -262,6 +265,29 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) { go r.relayConn(bs, s, dest.ID, src.ID, done) } +func (r *Relay) addConn(p peer.ID) { + conns := r.conns[p] + conns++ + r.conns[p] = conns + if conns < MaxRelayHopTag { + r.host.ConnManager().UpsertTag(p, RelayHopTag, func(v int) int { return v + 1 }) + } +} + +func (r *Relay) rmConn(p peer.ID) { + conns := r.conns[p] + conns-- + if conns > 0 { + r.conns[p] = conns + } else { + delete(r.conns, p) + } + if conns < MaxRelayHopTag { + r.host.ConnManager().UpsertTag(p, RelayHopTag, func(v int) int { return v - 1 }) + } + +} + func (r *Relay) relayConn(src, dest network.Stream, srcID, destID peer.ID, done func()) { defer done() diff --git a/p2p/protocol/circuitv2/relay/relay.go b/p2p/protocol/circuitv2/relay/relay.go index ecd6bfc5..4a087285 100644 --- a/p2p/protocol/circuitv2/relay/relay.go +++ b/p2p/protocol/circuitv2/relay/relay.go @@ -30,6 +30,9 @@ const ( ConnectTimeout = 30 * time.Second HandshakeTimeout = time.Minute + RelayHopTag = "relay-hop" + MaxRelayHopTag = 5 + maxMessageSize = 4096 ) @@ -215,23 +218,23 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) { r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED) return } - r.conns[src]++ destConns := r.conns[dest.ID] if destConns >= r.rc.MaxCircuits { - r.conns[src]-- r.mx.Unlock() log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src, dest.ID, dest.ID) r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED) return } - r.conns[dest.ID]++ + + r.addConn(src) + r.addConn(dest.ID) r.mx.Unlock() cleanup := func() { r.mx.Lock() - r.conns[src]-- - r.conns[dest.ID]-- + r.rmConn(src) + r.rmConn(dest.ID) r.mx.Unlock() } @@ -339,6 +342,28 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) { } } +func (r *Relay) addConn(p peer.ID) { + conns := r.conns[p] + conns++ + r.conns[p] = conns + if conns < MaxRelayHopTag { + r.host.ConnManager().UpsertTag(p, RelayHopTag, func(v int) int { return v + 1 }) + } +} + +func (r *Relay) rmConn(p peer.ID) { + conns := r.conns[p] + conns-- + if conns > 0 { + r.conns[p] = conns + } else { + delete(r.conns, p) + } + if conns < MaxRelayHopTag { + r.host.ConnManager().UpsertTag(p, RelayHopTag, func(v int) int { return v - 1 }) + } +} + func (r *Relay) relayLimited(src, dest network.Stream, srcID, destID peer.ID, limit int64, done func()) { defer done()