merge go-libp2p-circuit here

This commit is contained in:
Marten Seemann 2022-07-01 17:38:26 +02:00
commit 48d4857b21
11 changed files with 2464 additions and 0 deletions

View File

@ -0,0 +1,124 @@
package relay
import (
"fmt"
"net"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5
type Conn struct {
stream network.Stream
remote peer.AddrInfo
host host.Host
relay *Relay
}
type NetAddr struct {
Relay string
Remote string
}
func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}
func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}
func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}
func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}
func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}
func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}
func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}
// Increment the underlying relay connection tag by 1, thus increasing its protection from
// connection pruning. This ensures that connections to relays are not accidentally closed,
// by the connection manager, taking with them all the relayed connections (that may themselves
// be protected).
func (c *Conn) tagHop() {
c.relay.mx.Lock()
defer c.relay.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.relay.hopCount[p]++
if c.relay.hopCount[p] == 1 {
c.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}
// Decrement the underlying relay connection tag by 1; this is performed when we close the
// relayed connection.
func (c *Conn) untagHop() {
c.relay.mx.Lock()
defer c.relay.mx.Unlock()
p := c.stream.Conn().RemotePeer()
c.relay.hopCount[p]--
if c.relay.hopCount[p] == 0 {
c.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.relay.hopCount, p)
}
}
// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}
func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}

View File

@ -0,0 +1,57 @@
package relay
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
c, err := d.Relay().Dial(ctx, a, p)
if err != nil {
return nil, err
}
c.tagHop()
scope, _ := network.NullResourceManager.OpenConnection(network.DirOutbound, false)
return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope)
}
func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) {
// split /a/p2p-circuit/b into (/a, /p2p-circuit/b)
relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})
// If the address contained no /p2p-circuit part, the second part is nil.
if destaddr == nil {
return nil, fmt.Errorf("%s is not a relay address", a)
}
if relayaddr == nil {
return nil, fmt.Errorf(
"can't dial a p2p-circuit without specifying a relay: %s",
a,
)
}
// Strip the /p2p-circuit prefix from the destaddr.
_, destaddr = ma.SplitFirst(destaddr)
dinfo := &peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{}}
if destaddr != nil {
dinfo.Addrs = append(dinfo.Addrs, destaddr)
}
var rinfo *peer.AddrInfo
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
if err != nil {
return nil, fmt.Errorf("error parsing multiaddr '%s': %s", relayaddr.String(), err)
}
return r.DialPeer(ctx, *rinfo, *dinfo)
}

View File

@ -0,0 +1,61 @@
package relay
import (
"net"
pb "github.com/libp2p/go-libp2p-circuit/pb"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var _ manet.Listener = (*RelayListener)(nil)
type RelayListener Relay
func (l *RelayListener) Relay() *Relay {
return (*Relay)(l)
}
func (r *Relay) Listener() *RelayListener {
// TODO: Only allow one!
return (*RelayListener)(r)
}
func (l *RelayListener) Accept() (manet.Conn, error) {
for {
select {
case c := <-l.incoming:
err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
c.stream.Reset()
continue
}
// TODO: Pretty print.
log.Infof("accepted relay connection: %q", c)
c.tagHop()
return c, nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}
}
func (l *RelayListener) Addr() net.Addr {
return &NetAddr{
Relay: "any",
Remote: "any",
}
}
func (l *RelayListener) Multiaddr() ma.Multiaddr {
return circuitAddr
}
func (l *RelayListener) Close() error {
// TODO: noop?
return nil
}

View File

@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
clean:
rm -f *.pb.go
rm -f *.go

View File

@ -0,0 +1,868 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: relay.proto
package relay_pb
import (
fmt "fmt"
github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type CircuitRelay_Status int32
const (
CircuitRelay_SUCCESS CircuitRelay_Status = 100
CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220
CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221
CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250
CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251
CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260
CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261
CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262
CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270
CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280
CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320
CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321
CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350
CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351
CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390
CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400
)
var CircuitRelay_Status_name = map[int32]string{
100: "SUCCESS",
220: "HOP_SRC_ADDR_TOO_LONG",
221: "HOP_DST_ADDR_TOO_LONG",
250: "HOP_SRC_MULTIADDR_INVALID",
251: "HOP_DST_MULTIADDR_INVALID",
260: "HOP_NO_CONN_TO_DST",
261: "HOP_CANT_DIAL_DST",
262: "HOP_CANT_OPEN_DST_STREAM",
270: "HOP_CANT_SPEAK_RELAY",
280: "HOP_CANT_RELAY_TO_SELF",
320: "STOP_SRC_ADDR_TOO_LONG",
321: "STOP_DST_ADDR_TOO_LONG",
350: "STOP_SRC_MULTIADDR_INVALID",
351: "STOP_DST_MULTIADDR_INVALID",
390: "STOP_RELAY_REFUSED",
400: "MALFORMED_MESSAGE",
}
var CircuitRelay_Status_value = map[string]int32{
"SUCCESS": 100,
"HOP_SRC_ADDR_TOO_LONG": 220,
"HOP_DST_ADDR_TOO_LONG": 221,
"HOP_SRC_MULTIADDR_INVALID": 250,
"HOP_DST_MULTIADDR_INVALID": 251,
"HOP_NO_CONN_TO_DST": 260,
"HOP_CANT_DIAL_DST": 261,
"HOP_CANT_OPEN_DST_STREAM": 262,
"HOP_CANT_SPEAK_RELAY": 270,
"HOP_CANT_RELAY_TO_SELF": 280,
"STOP_SRC_ADDR_TOO_LONG": 320,
"STOP_DST_ADDR_TOO_LONG": 321,
"STOP_SRC_MULTIADDR_INVALID": 350,
"STOP_DST_MULTIADDR_INVALID": 351,
"STOP_RELAY_REFUSED": 390,
"MALFORMED_MESSAGE": 400,
}
func (x CircuitRelay_Status) Enum() *CircuitRelay_Status {
p := new(CircuitRelay_Status)
*p = x
return p
}
func (x CircuitRelay_Status) String() string {
return proto.EnumName(CircuitRelay_Status_name, int32(x))
}
func (x *CircuitRelay_Status) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(CircuitRelay_Status_value, data, "CircuitRelay_Status")
if err != nil {
return err
}
*x = CircuitRelay_Status(value)
return nil
}
func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_9f69a7d5a802d584, []int{0, 0}
}
type CircuitRelay_Type int32
const (
CircuitRelay_HOP CircuitRelay_Type = 1
CircuitRelay_STOP CircuitRelay_Type = 2
CircuitRelay_STATUS CircuitRelay_Type = 3
CircuitRelay_CAN_HOP CircuitRelay_Type = 4
)
var CircuitRelay_Type_name = map[int32]string{
1: "HOP",
2: "STOP",
3: "STATUS",
4: "CAN_HOP",
}
var CircuitRelay_Type_value = map[string]int32{
"HOP": 1,
"STOP": 2,
"STATUS": 3,
"CAN_HOP": 4,
}
func (x CircuitRelay_Type) Enum() *CircuitRelay_Type {
p := new(CircuitRelay_Type)
*p = x
return p
}
func (x CircuitRelay_Type) String() string {
return proto.EnumName(CircuitRelay_Type_name, int32(x))
}
func (x *CircuitRelay_Type) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(CircuitRelay_Type_value, data, "CircuitRelay_Type")
if err != nil {
return err
}
*x = CircuitRelay_Type(value)
return nil
}
func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_9f69a7d5a802d584, []int{0, 1}
}
type CircuitRelay struct {
Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=relay.pb.CircuitRelay_Type" json:"type,omitempty"`
SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"`
DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"`
Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=relay.pb.CircuitRelay_Status" json:"code,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CircuitRelay) Reset() { *m = CircuitRelay{} }
func (m *CircuitRelay) String() string { return proto.CompactTextString(m) }
func (*CircuitRelay) ProtoMessage() {}
func (*CircuitRelay) Descriptor() ([]byte, []int) {
return fileDescriptor_9f69a7d5a802d584, []int{0}
}
func (m *CircuitRelay) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *CircuitRelay) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_CircuitRelay.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *CircuitRelay) XXX_Merge(src proto.Message) {
xxx_messageInfo_CircuitRelay.Merge(m, src)
}
func (m *CircuitRelay) XXX_Size() int {
return m.Size()
}
func (m *CircuitRelay) XXX_DiscardUnknown() {
xxx_messageInfo_CircuitRelay.DiscardUnknown(m)
}
var xxx_messageInfo_CircuitRelay proto.InternalMessageInfo
func (m *CircuitRelay) GetType() CircuitRelay_Type {
if m != nil && m.Type != nil {
return *m.Type
}
return CircuitRelay_HOP
}
func (m *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer {
if m != nil {
return m.SrcPeer
}
return nil
}
func (m *CircuitRelay) GetDstPeer() *CircuitRelay_Peer {
if m != nil {
return m.DstPeer
}
return nil
}
func (m *CircuitRelay) GetCode() CircuitRelay_Status {
if m != nil && m.Code != nil {
return *m.Code
}
return CircuitRelay_SUCCESS
}
type CircuitRelay_Peer struct {
Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"`
Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CircuitRelay_Peer) Reset() { *m = CircuitRelay_Peer{} }
func (m *CircuitRelay_Peer) String() string { return proto.CompactTextString(m) }
func (*CircuitRelay_Peer) ProtoMessage() {}
func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_9f69a7d5a802d584, []int{0, 0}
}
func (m *CircuitRelay_Peer) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *CircuitRelay_Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_CircuitRelay_Peer.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *CircuitRelay_Peer) XXX_Merge(src proto.Message) {
xxx_messageInfo_CircuitRelay_Peer.Merge(m, src)
}
func (m *CircuitRelay_Peer) XXX_Size() int {
return m.Size()
}
func (m *CircuitRelay_Peer) XXX_DiscardUnknown() {
xxx_messageInfo_CircuitRelay_Peer.DiscardUnknown(m)
}
var xxx_messageInfo_CircuitRelay_Peer proto.InternalMessageInfo
func (m *CircuitRelay_Peer) GetId() []byte {
if m != nil {
return m.Id
}
return nil
}
func (m *CircuitRelay_Peer) GetAddrs() [][]byte {
if m != nil {
return m.Addrs
}
return nil
}
func init() {
proto.RegisterEnum("relay.pb.CircuitRelay_Status", CircuitRelay_Status_name, CircuitRelay_Status_value)
proto.RegisterEnum("relay.pb.CircuitRelay_Type", CircuitRelay_Type_name, CircuitRelay_Type_value)
proto.RegisterType((*CircuitRelay)(nil), "relay.pb.CircuitRelay")
proto.RegisterType((*CircuitRelay_Peer)(nil), "relay.pb.CircuitRelay.Peer")
}
func init() { proto.RegisterFile("relay.proto", fileDescriptor_9f69a7d5a802d584) }
var fileDescriptor_9f69a7d5a802d584 = []byte{
// 473 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4f, 0x6f, 0xd3, 0x3e,
0x18, 0xc7, 0x65, 0x27, 0xbf, 0x76, 0x7a, 0x5a, 0x4d, 0xfe, 0x59, 0x63, 0x64, 0x9d, 0x56, 0xaa,
0x9e, 0x7a, 0x40, 0x45, 0x4c, 0xe2, 0x05, 0x98, 0xc4, 0xdd, 0x2a, 0xd2, 0x38, 0xb2, 0x5d, 0x24,
0x4e, 0x56, 0x69, 0x72, 0xa8, 0x84, 0xd4, 0x2a, 0xcd, 0x0e, 0xbd, 0xc3, 0xb8, 0x21, 0x8e, 0xbc,
0x1c, 0xe0, 0xc4, 0x91, 0x17, 0xc0, 0x3f, 0xf5, 0x65, 0xc0, 0x05, 0xd9, 0x5d, 0x33, 0x44, 0x37,
0x89, 0xa3, 0x9f, 0xef, 0xe7, 0xe3, 0x3c, 0xf9, 0x26, 0xd0, 0x28, 0xf2, 0x17, 0x93, 0x55, 0x7f,
0x51, 0xcc, 0xcb, 0x39, 0xdd, 0xbb, 0x3a, 0x3c, 0xef, 0xbe, 0xae, 0x41, 0x33, 0x9c, 0x15, 0xd3,
0x8b, 0x59, 0x29, 0xed, 0x8c, 0x3e, 0x00, 0xbf, 0x5c, 0x2d, 0xf2, 0x00, 0x75, 0x50, 0x6f, 0xff,
0xf4, 0xb8, 0xbf, 0x25, 0xfb, 0x7f, 0x52, 0x7d, 0xbd, 0x5a, 0xe4, 0xd2, 0x81, 0xf4, 0x11, 0xd4,
0x97, 0xc5, 0x34, 0xcd, 0xf3, 0x22, 0xc0, 0x1d, 0xd4, 0x6b, 0xdc, 0xea, 0x58, 0x44, 0x6e, 0x59,
0xab, 0x65, 0xcb, 0xd2, 0x69, 0xde, 0x3f, 0x68, 0x57, 0x2c, 0x7d, 0x08, 0xfe, 0x74, 0x9e, 0xe5,
0x81, 0xef, 0xd6, 0x3b, 0xb9, 0xc5, 0x51, 0xe5, 0xa4, 0xbc, 0x58, 0x4a, 0x87, 0xb6, 0xee, 0x83,
0xef, 0xd4, 0x7d, 0xc0, 0xb3, 0x2c, 0x40, 0x1d, 0xdc, 0x6b, 0x4a, 0x3c, 0xcb, 0xe8, 0x01, 0xfc,
0x37, 0xc9, 0xb2, 0x62, 0x19, 0xe0, 0x8e, 0xd7, 0x6b, 0xca, 0xcd, 0xa1, 0xfb, 0xd1, 0x83, 0xda,
0x46, 0xa7, 0x0d, 0xa8, 0xab, 0x71, 0x18, 0x72, 0xa5, 0x48, 0x46, 0x5b, 0x70, 0xe7, 0x5c, 0xa4,
0x46, 0xc9, 0xd0, 0xb0, 0x28, 0x92, 0x46, 0x0b, 0x61, 0x62, 0x91, 0x9c, 0x91, 0x2f, 0x68, 0x9b,
0x45, 0x4a, 0xff, 0x95, 0x7d, 0x45, 0xb4, 0x0d, 0x47, 0x5b, 0x6f, 0x34, 0x8e, 0xf5, 0xd0, 0x01,
0xc3, 0xe4, 0x29, 0x8b, 0x87, 0x11, 0xf9, 0x59, 0xe5, 0xd6, 0xdd, 0xcd, 0x7f, 0x21, 0x7a, 0x17,
0xa8, 0xcd, 0x13, 0x61, 0x42, 0x91, 0x24, 0x46, 0x0b, 0x8b, 0x92, 0x97, 0x98, 0x1e, 0xc2, 0xff,
0x36, 0x08, 0x59, 0xa2, 0x4d, 0x34, 0x64, 0xb1, 0x9b, 0xbf, 0xc2, 0xf4, 0x04, 0x82, 0x6a, 0x2e,
0x52, 0x9e, 0xb8, 0xab, 0x95, 0x96, 0x9c, 0x8d, 0xc8, 0x25, 0xa6, 0x47, 0x70, 0x50, 0xc5, 0x2a,
0xe5, 0xec, 0x89, 0x91, 0x3c, 0x66, 0xcf, 0xc8, 0x1b, 0x4c, 0x8f, 0xe1, 0xb0, 0x8a, 0xdc, 0xd0,
0x3e, 0x4d, 0xf1, 0x78, 0x40, 0xde, 0xb9, 0x50, 0xe9, 0x1b, 0x0b, 0x78, 0x7f, 0x1d, 0xee, 0x36,
0xf0, 0x01, 0xd3, 0x7b, 0xd0, 0xaa, 0xcc, 0xdd, 0x57, 0xfc, 0x76, 0x0d, 0xdc, 0xdc, 0xc1, 0x77,
0x6c, 0x3b, 0x70, 0xc0, 0x66, 0x29, 0xc9, 0x07, 0x63, 0xc5, 0x23, 0x72, 0xe9, 0xd9, 0x0e, 0x46,
0x2c, 0x1e, 0x08, 0x39, 0xe2, 0x91, 0x19, 0x71, 0xa5, 0xd8, 0x19, 0x27, 0x6f, 0xbd, 0xee, 0x29,
0xf8, 0xf6, 0x0f, 0xa5, 0x75, 0xf0, 0xce, 0x45, 0x4a, 0x10, 0xdd, 0x03, 0xdf, 0xde, 0x40, 0x30,
0x05, 0xa8, 0x29, 0xcd, 0xf4, 0x58, 0x11, 0xcf, 0x7e, 0xe0, 0x90, 0x25, 0xc6, 0x22, 0xfe, 0xe3,
0xe6, 0xa7, 0x75, 0x1b, 0x7d, 0x5e, 0xb7, 0xd1, 0x8f, 0x75, 0x1b, 0xfd, 0x0e, 0x00, 0x00, 0xff,
0xff, 0x6b, 0x22, 0x33, 0xbb, 0x2f, 0x03, 0x00, 0x00,
}
func (m *CircuitRelay) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *CircuitRelay) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *CircuitRelay) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Code != nil {
i = encodeVarintRelay(dAtA, i, uint64(*m.Code))
i--
dAtA[i] = 0x20
}
if m.DstPeer != nil {
{
size, err := m.DstPeer.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRelay(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
if m.SrcPeer != nil {
{
size, err := m.SrcPeer.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRelay(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
if m.Type != nil {
i = encodeVarintRelay(dAtA, i, uint64(*m.Type))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *CircuitRelay_Peer) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *CircuitRelay_Peer) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *CircuitRelay_Peer) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Addrs) > 0 {
for iNdEx := len(m.Addrs) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.Addrs[iNdEx])
copy(dAtA[i:], m.Addrs[iNdEx])
i = encodeVarintRelay(dAtA, i, uint64(len(m.Addrs[iNdEx])))
i--
dAtA[i] = 0x12
}
}
if m.Id == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("id")
} else {
i -= len(m.Id)
copy(dAtA[i:], m.Id)
i = encodeVarintRelay(dAtA, i, uint64(len(m.Id)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintRelay(dAtA []byte, offset int, v uint64) int {
offset -= sovRelay(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *CircuitRelay) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Type != nil {
n += 1 + sovRelay(uint64(*m.Type))
}
if m.SrcPeer != nil {
l = m.SrcPeer.Size()
n += 1 + l + sovRelay(uint64(l))
}
if m.DstPeer != nil {
l = m.DstPeer.Size()
n += 1 + l + sovRelay(uint64(l))
}
if m.Code != nil {
n += 1 + sovRelay(uint64(*m.Code))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *CircuitRelay_Peer) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Id != nil {
l = len(m.Id)
n += 1 + l + sovRelay(uint64(l))
}
if len(m.Addrs) > 0 {
for _, b := range m.Addrs {
l = len(b)
n += 1 + l + sovRelay(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovRelay(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozRelay(x uint64) (n int) {
return sovRelay(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *CircuitRelay) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: CircuitRelay: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: CircuitRelay: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
var v CircuitRelay_Type
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= CircuitRelay_Type(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Type = &v
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SrcPeer", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRelay
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRelay
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.SrcPeer == nil {
m.SrcPeer = &CircuitRelay_Peer{}
}
if err := m.SrcPeer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field DstPeer", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRelay
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRelay
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.DstPeer == nil {
m.DstPeer = &CircuitRelay_Peer{}
}
if err := m.DstPeer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType)
}
var v CircuitRelay_Status
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= CircuitRelay_Status(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Code = &v
default:
iNdEx = preIndex
skippy, err := skipRelay(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRelay
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthRelay
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *CircuitRelay_Peer) Unmarshal(dAtA []byte) error {
var hasFields [1]uint64
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Peer: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Peer: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRelay
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRelay
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...)
if m.Id == nil {
m.Id = []byte{}
}
iNdEx = postIndex
hasFields[0] |= uint64(0x00000001)
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRelay
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthRelay
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthRelay
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Addrs = append(m.Addrs, make([]byte, postIndex-iNdEx))
copy(m.Addrs[len(m.Addrs)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRelay(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRelay
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthRelay
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if hasFields[0]&uint64(0x00000001) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("id")
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipRelay(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRelay
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRelay
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRelay
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthRelay
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthRelay
}
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupRelay
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthRelay = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowRelay = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupRelay = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,44 @@
syntax = "proto2";
package relay.pb;
message CircuitRelay {
enum Status {
SUCCESS = 100;
HOP_SRC_ADDR_TOO_LONG = 220;
HOP_DST_ADDR_TOO_LONG = 221;
HOP_SRC_MULTIADDR_INVALID = 250;
HOP_DST_MULTIADDR_INVALID = 251;
HOP_NO_CONN_TO_DST = 260;
HOP_CANT_DIAL_DST = 261;
HOP_CANT_OPEN_DST_STREAM = 262;
HOP_CANT_SPEAK_RELAY = 270;
HOP_CANT_RELAY_TO_SELF = 280;
STOP_SRC_ADDR_TOO_LONG = 320;
STOP_DST_ADDR_TOO_LONG = 321;
STOP_SRC_MULTIADDR_INVALID = 350;
STOP_DST_MULTIADDR_INVALID = 351;
STOP_RELAY_REFUSED = 390;
MALFORMED_MESSAGE = 400;
}
enum Type { // RPC identifier, either HOP, STOP or STATUS
HOP = 1;
STOP = 2;
STATUS = 3;
CAN_HOP = 4;
}
message Peer {
required bytes id = 1; // peer id
repeated bytes addrs = 2; // peer's known addresses
}
optional Type type = 1; // Type of the message
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP
optional Peer dstPeer = 3;
optional Status code = 4; // Status code, used when Type is STATUS
}

View File

@ -0,0 +1,507 @@
package relay
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p-circuit/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/transport"
pool "github.com/libp2p/go-buffer-pool"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("relay")
const ProtoID = "/libp2p/circuit/relay/0.1.0"
const maxMessageSize = 4096
var (
RelayAcceptTimeout = 10 * time.Second
HopConnectTimeout = 30 * time.Second
StopHandshakeTimeout = 1 * time.Minute
HopStreamBufferSize = 4096
HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines
streamTimeout = 1 * time.Minute
)
// Relay is the relay transport and service.
type Relay struct {
host host.Host
upgrader transport.Upgrader
ctx context.Context
ctxCancel context.CancelFunc
self peer.ID
active bool
hop bool
incoming chan *Conn
// atomic counters
streamCount int32
liveHopCount int32
// per peer hop counters
mx sync.Mutex
hopCount map[peer.ID]int
}
// RelayOpts are options for configuring the relay transport.
type RelayOpt int
var (
// OptActive configures the relay transport to actively establish
// outbound connections on behalf of clients. You probably don't want to
// enable this unless you know what you're doing.
OptActive = RelayOpt(0)
// OptHop configures the relay transport to accept requests to relay
// traffic on behalf of third-parties. Unless OptActive is specified,
// this will only relay traffic between peers already connected to this
// node.
OptHop = RelayOpt(1)
// OptDiscovery is a no-op. It was introduced as a way to probe new
// peers to see if they were willing to act as a relays. However, in
// practice, it's useless. While it does test to see if these peers are
// relays, it doesn't (and can't), check to see if these peers are
// _active_ relays (i.e., will actively dial the target peer).
//
// This option may be re-enabled in the future but for now you shouldn't
// use it.
OptDiscovery = RelayOpt(2)
)
type RelayError struct {
Code pb.CircuitRelay_Status
}
func (e RelayError) Error() string {
return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code)
}
// NewRelay constructs a new relay.
func NewRelay(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) (*Relay, error) {
r := &Relay{
upgrader: upgrader,
host: h,
self: h.ID(),
incoming: make(chan *Conn),
hopCount: make(map[peer.ID]int),
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
for _, opt := range opts {
switch opt {
case OptActive:
r.active = true
case OptHop:
r.hop = true
case OptDiscovery:
log.Errorf(
"circuit.OptDiscovery is now a no-op: %s",
"dialing peers with a random relay is no longer supported",
)
default:
return nil, fmt.Errorf("unrecognized option: %d", opt)
}
}
h.SetStreamHandler(ProtoID, r.handleNewStream)
return r, nil
}
// Increment the live hop count and increment the connection manager tags by 1 for the two
// sides of the hop stream. This ensures that connections with many hop streams will be protected
// from pruning, thus minimizing disruption from connection trimming in a relay node.
func (r *Relay) addLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, 1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", incrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", incrementTag)
}
// Decrement the live hpo count and decrement the connection manager tags for the two sides
// of the hop stream.
func (r *Relay) rmLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, -1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", decrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", decrementTag)
}
func (r *Relay) GetActiveHops() int32 {
return atomic.LoadInt32(&r.liveHopCount)
}
func (r *Relay) DialPeer(ctx context.Context, relay peer.AddrInfo, dest peer.AddrInfo) (*Conn, error) {
log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)
if len(relay.Addrs) > 0 {
r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
}
s, err := r.host.NewStream(ctx, relay.ID, ProtoID)
if err != nil {
return nil, err
}
rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)
defer rd.Close()
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_HOP.Enum()
msg.SrcPeer = peerInfoToPeer(r.host.Peerstore().PeerInfo(r.self))
msg.DstPeer = peerInfoToPeer(dest)
err = wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
s.Reset()
return nil, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
s.Reset()
return nil, RelayError{msg.GetCode()}
}
return &Conn{stream: s, remote: dest, host: r.host, relay: r}, nil
}
func (r *Relay) Matches(addr ma.Multiaddr) bool {
// TODO: Look at the prefix transport as well.
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}
// Queries a peer for support of hop relay
func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) {
s, err := host.NewStream(ctx, id, ProtoID)
if err != nil {
return false, err
}
defer s.Close()
rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)
defer rd.Close()
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_CAN_HOP.Enum()
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return false, err
}
msg.Reset()
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return false, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
}
return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil
}
func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
return CanHop(ctx, r.host, id)
}
func (r *Relay) handleNewStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(streamTimeout))
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())
rd := newDelimitedReader(s, maxMessageSize)
defer rd.Close()
var msg pb.CircuitRelay
err := rd.ReadMsg(&msg)
if err != nil {
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
return
}
// reset stream deadline as message has been read
s.SetReadDeadline(time.Time{})
switch msg.GetType() {
case pb.CircuitRelay_HOP:
r.handleHopStream(s, &msg)
case pb.CircuitRelay_STOP:
r.handleStopStream(s, &msg)
case pb.CircuitRelay_CAN_HOP:
r.handleCanHop(s, &msg)
default:
log.Warnf("unexpected relay handshake: %d", msg.GetType())
r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE)
}
}
func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if !r.hop {
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
}
streamCount := atomic.AddInt32(&r.streamCount, 1)
liveHopCount := atomic.LoadInt32(&r.liveHopCount)
defer atomic.AddInt32(&r.streamCount, -1)
if (streamCount + liveHopCount) > int32(HopStreamLimit) {
log.Warn("hop stream limit exceeded; resetting stream")
s.Reset()
return
}
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
if src.ID != s.Conn().RemotePeer() {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID)
return
}
if dst.ID == r.self {
r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF)
return
}
// open stream
ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout)
defer cancel()
if !r.active {
ctx = network.WithNoDial(ctx, "relay hop")
} else if len(dst.Addrs) > 0 {
r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, peerstore.TempAddrTTL)
}
bs, err := r.host.NewStream(ctx, dst.ID, ProtoID)
if err != nil {
log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error())
if err == network.ErrNoConn {
r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST)
} else {
r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST)
}
return
}
// stop handshake
rd := newDelimitedReader(bs, maxMessageSize)
wr := newDelimitedWriter(bs)
defer rd.Close()
// set handshake deadline
bs.SetDeadline(time.Now().Add(StopHandshakeTimeout))
msg.Type = pb.CircuitRelay_STOP.Enum()
err = wr.WriteMsg(msg)
if err != nil {
log.Debugf("error writing stop handshake: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
msg.Reset()
err = rd.ReadMsg(msg)
if err != nil {
log.Debugf("error reading stop response: %s", err.Error())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetType() != pb.CircuitRelay_STATUS {
log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType())
bs.Reset()
r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM)
return
}
if msg.GetCode() != pb.CircuitRelay_SUCCESS {
log.Debugf("relay stop failure: %d", msg.GetCode())
bs.Reset()
r.handleError(s, msg.GetCode())
return
}
err = r.writeResponse(s, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
bs.Reset()
s.Reset()
return
}
// relay connection
log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty())
// reset deadline
bs.SetDeadline(time.Time{})
r.addLiveHop(src.ID, dst.ID)
goroutines := new(int32)
*goroutines = 2
done := func() {
if atomic.AddInt32(goroutines, -1) == 0 {
s.Close()
bs.Close()
r.rmLiveHop(src.ID, dst.ID)
}
}
// Don't reset streams after finishing or the other side will get an
// error, not an EOF.
go func() {
defer done()
buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(s, bs, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
s.Reset()
bs.Reset()
} else {
// propagate the close
s.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty())
}()
go func() {
defer done()
buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
count, err := io.CopyBuffer(bs, s, buf)
if err != nil {
log.Debugf("relay copy error: %s", err)
// Reset both.
bs.Reset()
s.Reset()
} else {
// propagate the close
bs.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty())
}()
}
func (r *Relay) handleStopStream(s network.Stream, msg *pb.CircuitRelay) {
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
return
}
dst, err := peerToPeerInfo(msg.GetDstPeer())
if err != nil || dst.ID != r.self {
r.handleError(s, pb.CircuitRelay_STOP_DST_MULTIADDR_INVALID)
return
}
log.Infof("relay connection from: %s", src.ID)
if len(src.Addrs) > 0 {
r.host.Peerstore().AddAddrs(src.ID, src.Addrs, peerstore.TempAddrTTL)
}
select {
case r.incoming <- &Conn{stream: s, remote: src, host: r.host, relay: r}:
case <-time.After(RelayAcceptTimeout):
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
}
}
func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) {
var err error
if r.hop {
err = r.writeResponse(s, pb.CircuitRelay_SUCCESS)
} else {
err = r.writeResponse(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
}
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) {
log.Warnf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code)
err := r.writeResponse(s, code)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
}
}
func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error {
wr := newDelimitedWriter(s)
var msg pb.CircuitRelay
msg.Type = pb.CircuitRelay_STATUS.Enum()
msg.Code = code.Enum()
return wr.WriteMsg(&msg)
}

View File

@ -0,0 +1,451 @@
package relay_test
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net"
"testing"
"time"
. "github.com/libp2p/go-libp2p-circuit"
pb "github.com/libp2p/go-libp2p-circuit/pb"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/host"
swarm "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
/* TODO: add tests
- simple A -[R]-> B
- A tries to relay through R, R doesnt support relay
- A tries to relay through R to B, B doesnt support relay
- A sends too long multiaddr
- R drops stream mid-message
- A relays through R, R has no connection to B
*/
func getNetHosts(t *testing.T, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := swarmt.GenSwarm(t)
h := bhost.NewBlankHost(netw)
out = append(out, h)
}
return out
}
func newTestRelay(t *testing.T, host host.Host, opts ...RelayOpt) *Relay {
r, err := NewRelay(host, swarmt.GenUpgrader(t, host.Network().(*swarm.Swarm)), opts...)
if err != nil {
t.Fatal(err)
}
return r
}
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func TestBasicRelay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
var (
conn1, conn2 net.Conn
done = make(chan struct{})
)
defer func() {
<-done
if conn1 != nil {
conn1.Close()
}
if conn2 != nil {
conn2.Close()
}
}()
msg := []byte("relay works!")
go func() {
defer close(done)
list := r3.Listener()
var err error
conn1, err = list.Accept()
if err != nil {
t.Error(err)
return
}
_, err = conn1.Write(msg)
if err != nil {
t.Error(err)
return
}
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
var err error
conn2, err = r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
result := make([]byte, len(msg))
_, err = io.ReadFull(conn2, result)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(result, msg) {
t.Fatal("message was incorrect:", string(result))
}
}
func TestRelayReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
ready := make(chan struct{})
msg := []byte("relay works!")
go func() {
list := r3.Listener()
con, err := list.Accept()
if err != nil {
t.Error(err)
return
}
<-ready
_, err = con.Write(msg)
if err != nil {
t.Error(err)
return
}
hosts[2].Network().ClosePeer(hosts[1].ID())
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
con, err := r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
close(ready)
_, err = ioutil.ReadAll(con)
if err == nil {
t.Fatal("expected error for reset relayed connection")
}
}
func TestBasicRelayDial(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
_ = newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
var (
conn1, conn2 net.Conn
done = make(chan struct{})
)
defer func() {
cancel()
<-done
if conn1 != nil {
conn1.Close()
}
if conn2 != nil {
conn2.Close()
}
}()
msg := []byte("relay works!")
go func() {
defer close(done)
list := r3.Listener()
var err error
conn1, err = list.Accept()
if err != nil {
t.Error(err)
return
}
_, err = conn1.Write(msg)
if err != nil {
t.Error(err)
return
}
}()
addr := ma.StringCast(fmt.Sprintf("/ipfs/%s/p2p-circuit", hosts[1].ID().Pretty()))
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
var err error
conn2, err = r1.Dial(rctx, addr, hosts[2].ID())
if err != nil {
t.Fatal(err)
}
data := make([]byte, len(msg))
_, err = io.ReadFull(conn2, data)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestUnspecificRelayDialFails(t *testing.T) {
hosts := getNetHosts(t, 3)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
r3 := newTestRelay(t, hosts[2])
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(100 * time.Millisecond)
go func() {
if _, err := r3.Listener().Accept(); err == nil {
t.Error("should not have received relay connection")
}
}()
addr := ma.StringCast("/p2p-circuit")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := r1.Dial(ctx, addr, hosts[2].ID()); err == nil {
t.Fatal("expected dial with unspecified relay address to fail, even if we're connected to a relay")
}
}
func TestRelayThroughNonHop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1])
newTestRelay(t, hosts[2])
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
_, err := r1.DialPeer(rctx, rinfo, dinfo)
if err == nil {
t.Fatal("expected error")
}
rerr, ok := err.(RelayError)
if !ok {
t.Fatalf("expected RelayError: %#v", err)
}
if rerr.Code != pb.CircuitRelay_HOP_CANT_SPEAK_RELAY {
t.Fatal("expected 'HOP_CANT_SPEAK_RELAY' error")
}
}
func TestRelayNoDestConnection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
_, err := r1.DialPeer(rctx, rinfo, dinfo)
if err == nil {
t.Fatal("expected error")
}
rerr, ok := err.(RelayError)
if !ok {
t.Fatalf("expected RelayError: %#v", err)
}
if rerr.Code != pb.CircuitRelay_HOP_NO_CONN_TO_DST {
t.Fatal("expected 'HOP_NO_CONN_TO_DST' error")
}
}
func TestActiveRelay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 3)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop, OptActive)
r3 := newTestRelay(t, hosts[2])
connChan := make(chan manet.Conn)
msg := []byte("relay works!")
go func() {
defer close(connChan)
list := r3.Listener()
conn1, err := list.Accept()
if err != nil {
t.Error(err)
return
}
if _, err := conn1.Write(msg); err != nil {
t.Error(err)
return
}
connChan <- conn1
}()
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID())
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
conn2, err := r1.DialPeer(rctx, rinfo, dinfo)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()
data := make([]byte, len(msg))
_, err = io.ReadFull(conn2, data)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
conn1, ok := <-connChan
if !ok {
t.Fatal("listener didn't accept a connection")
}
conn1.Close()
}
func TestRelayCanHop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, 2)
connect(t, hosts[0], hosts[1])
time.Sleep(10 * time.Millisecond)
r1 := newTestRelay(t, hosts[0])
newTestRelay(t, hosts[1], OptHop)
canhop, err := r1.CanHop(ctx, hosts[1].ID())
if err != nil {
t.Fatal(err)
}
if !canhop {
t.Fatal("Relay can't hop")
}
}

View File

@ -0,0 +1,74 @@
package relay
import (
"fmt"
"io"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
var circuitAddr = ma.Cast(ma.ProtocolWithCode(ma.P_CIRCUIT).VCode)
var _ transport.Transport = (*RelayTransport)(nil)
var _ io.Closer = (*RelayTransport)(nil)
type RelayTransport Relay
func (t *RelayTransport) Relay() *Relay {
return (*Relay)(t)
}
func (r *Relay) Transport() *RelayTransport {
return (*RelayTransport)(r)
}
func (t *RelayTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
// TODO: Ensure we have a connection to the relay, if specified. Also,
// make sure the multiaddr makes sense.
if !t.Relay().Matches(laddr) {
return nil, fmt.Errorf("%s is not a relay address", laddr)
}
return t.upgrader.UpgradeListener(t, t.Relay().Listener()), nil
}
func (t *RelayTransport) CanDial(raddr ma.Multiaddr) bool {
return t.Relay().Matches(raddr)
}
func (t *RelayTransport) Proxy() bool {
return true
}
func (t *RelayTransport) Protocols() []int {
return []int{ma.P_CIRCUIT}
}
func (r *RelayTransport) Close() error {
r.ctxCancel()
return nil
}
// AddRelayTransport constructs a relay and adds it as a transport to the host network.
func AddRelayTransport(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) error {
n, ok := h.Network().(transport.TransportNetwork)
if !ok {
return fmt.Errorf("%v is not a transport network", h.Network())
}
r, err := NewRelay(h, upgrader, opts...)
if err != nil {
return err
}
// There's no nice way to handle these errors as we have no way to tear
// down the relay.
// TODO
if err := n.AddTransport(r.Transport()); err != nil {
log.Error("failed to add relay transport:", err)
} else if err := n.Listen(r.Listener().Multiaddr()); err != nil {
log.Error("failed to listen on relay transport:", err)
}
return nil
}

View File

@ -0,0 +1,149 @@
package relay_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"testing"
"time"
. "github.com/libp2p/go-libp2p-circuit"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
)
const TestProto = "test/relay-transport"
var msg = []byte("relay works!")
func testSetupRelay(t *testing.T) []host.Host {
hosts := getNetHosts(t, 3)
err := AddRelayTransport(hosts[0], swarmt.GenUpgrader(t, hosts[0].Network().(*swarm.Swarm)))
if err != nil {
t.Fatal(err)
}
err = AddRelayTransport(hosts[1], swarmt.GenUpgrader(t, hosts[1].Network().(*swarm.Swarm)), OptHop)
if err != nil {
t.Fatal(err)
}
err = AddRelayTransport(hosts[2], swarmt.GenUpgrader(t, hosts[2].Network().(*swarm.Swarm)))
if err != nil {
t.Fatal(err)
}
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
time.Sleep(100 * time.Millisecond)
handler := func(s network.Stream) {
_, err := s.Write(msg)
if err != nil {
t.Error(err)
}
s.Close()
}
hosts[2].SetStreamHandler(TestProto, handler)
return hosts
}
func TestFullAddressTransportDial(t *testing.T) {
hosts := testSetupRelay(t)
var relayAddr ma.Multiaddr
for _, addr := range hosts[1].Addrs() {
// skip relay addrs.
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
relayAddr = addr
}
}
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s/p2p-circuit/p2p/%s", relayAddr.String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
s, err := hosts[0].NewStream(ctx, hosts[2].ID(), TestProto)
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestSpecificRelayTransportDial(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := testSetupRelay(t)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, msg) {
t.Fatal("message was incorrect:", string(data))
}
}
func TestUnspecificRelayTransportDialFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := testSetupRelay(t)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)
_, err = hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err == nil {
t.Fatal("dial to unspecified address should have failed")
}
}

View File

@ -0,0 +1,118 @@
package relay
import (
"errors"
"io"
pb "github.com/libp2p/go-libp2p-circuit/pb"
"github.com/libp2p/go-libp2p-core/peer"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-msgio/protoio"
"github.com/gogo/protobuf/proto"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-varint"
)
func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("nil peer")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, addrBytes := range p.Addrs {
a, err := ma.NewMultiaddrBytes(addrBytes)
if err == nil {
addrs = append(addrs, a)
}
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func peerInfoToPeer(pi peer.AddrInfo) *pb.CircuitRelay_Peer {
addrs := make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
addrs[i] = addr.Bytes()
}
p := new(pb.CircuitRelay_Peer)
p.Id = []byte(pi.ID)
p.Addrs = addrs
return p
}
func incrementTag(v int) int {
return v + 1
}
func decrementTag(v int) int {
if v > 0 {
return v - 1
} else {
return v
}
}
type delimitedReader struct {
r io.Reader
buf []byte
}
// The gogo protobuf NewDelimitedReader is buffered, which may eat up stream data.
// So we need to implement a compatible delimited reader that reads unbuffered.
// There is a slowdown from unbuffered reading: when reading the message
// it can take multiple single byte Reads to read the length and another Read
// to read the message payload.
// However, this is not critical performance degradation as
// - the reader is utilized to read one (dialer, stop) or two messages (hop) during
// the handshake, so it's a drop in the water for the connection lifetime.
// - messages are small (max 4k) and the length fits in a couple of bytes,
// so overall we have at most three reads per message.
func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader {
return &delimitedReader{r: r, buf: pool.Get(maxSize)}
}
func (d *delimitedReader) Close() {
if d.buf != nil {
pool.Put(d.buf)
d.buf = nil
}
}
func (d *delimitedReader) ReadByte() (byte, error) {
buf := d.buf[:1]
_, err := d.r.Read(buf)
return buf[0], err
}
func (d *delimitedReader) ReadMsg(msg proto.Message) error {
mlen, err := varint.ReadUvarint(d)
if err != nil {
return err
}
if uint64(len(d.buf)) < mlen {
return errors.New("message too large")
}
buf := d.buf[:mlen]
_, err = io.ReadFull(d.r, buf)
if err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func newDelimitedWriter(w io.Writer) protoio.WriteCloser {
return protoio.NewDelimitedWriter(w)
}