diff --git a/p2p/protocol/internal/circuitv1-deprecated/conn.go b/p2p/protocol/internal/circuitv1-deprecated/conn.go new file mode 100644 index 00000000..82b86d15 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/conn.go @@ -0,0 +1,124 @@ +package relay + +import ( + "fmt" + "net" + "time" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// HopTagWeight is the connection manager weight for connections carrying relay hop streams +var HopTagWeight = 5 + +type Conn struct { + stream network.Stream + remote peer.AddrInfo + host host.Host + relay *Relay +} + +type NetAddr struct { + Relay string + Remote string +} + +func (n *NetAddr) Network() string { + return "libp2p-circuit-relay" +} + +func (n *NetAddr) String() string { + return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay) +} + +func (c *Conn) Close() error { + c.untagHop() + return c.stream.Reset() +} + +func (c *Conn) Read(buf []byte) (int, error) { + return c.stream.Read(buf) +} + +func (c *Conn) Write(buf []byte) (int, error) { + return c.stream.Write(buf) +} + +func (c *Conn) SetDeadline(t time.Time) error { + return c.stream.SetDeadline(t) +} + +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.stream.SetReadDeadline(t) +} + +func (c *Conn) SetWriteDeadline(t time.Time) error { + return c.stream.SetWriteDeadline(t) +} + +func (c *Conn) RemoteAddr() net.Addr { + return &NetAddr{ + Relay: c.stream.Conn().RemotePeer().Pretty(), + Remote: c.remote.ID.Pretty(), + } +} + +// Increment the underlying relay connection tag by 1, thus increasing its protection from +// connection pruning. This ensures that connections to relays are not accidentally closed, +// by the connection manager, taking with them all the relayed connections (that may themselves +// be protected). +func (c *Conn) tagHop() { + c.relay.mx.Lock() + defer c.relay.mx.Unlock() + + p := c.stream.Conn().RemotePeer() + c.relay.hopCount[p]++ + if c.relay.hopCount[p] == 1 { + c.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight) + } +} + +// Decrement the underlying relay connection tag by 1; this is performed when we close the +// relayed connection. +func (c *Conn) untagHop() { + c.relay.mx.Lock() + defer c.relay.mx.Unlock() + + p := c.stream.Conn().RemotePeer() + c.relay.hopCount[p]-- + if c.relay.hopCount[p] == 0 { + c.host.ConnManager().UntagPeer(p, "relay-hop-stream") + delete(c.relay.hopCount, p) + } +} + +// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input" +func (c *Conn) RemoteMultiaddr() ma.Multiaddr { + // TODO: We should be able to do this directly without converting to/from a string. + relayAddr, err := ma.NewComponent( + ma.ProtocolWithCode(ma.P_P2P).Name, + c.stream.Conn().RemotePeer().Pretty(), + ) + if err != nil { + panic(err) + } + return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr) +} + +func (c *Conn) LocalMultiaddr() ma.Multiaddr { + return c.stream.Conn().LocalMultiaddr() +} + +func (c *Conn) LocalAddr() net.Addr { + na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr()) + if err != nil { + log.Error("failed to convert local multiaddr to net addr:", err) + return nil + } + return na +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/dial.go b/p2p/protocol/internal/circuitv1-deprecated/dial.go new file mode 100644 index 00000000..703521ef --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/dial.go @@ -0,0 +1,57 @@ +package relay + +import ( + "context" + "fmt" + + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" + ma "github.com/multiformats/go-multiaddr" +) + +func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { + c, err := d.Relay().Dial(ctx, a, p) + if err != nil { + return nil, err + } + c.tagHop() + scope, _ := network.NullResourceManager.OpenConnection(network.DirOutbound, false) + return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope) +} + +func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) { + // split /a/p2p-circuit/b into (/a, /p2p-circuit/b) + relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool { + return c.Protocol().Code == ma.P_CIRCUIT + }) + + // If the address contained no /p2p-circuit part, the second part is nil. + if destaddr == nil { + return nil, fmt.Errorf("%s is not a relay address", a) + } + + if relayaddr == nil { + return nil, fmt.Errorf( + "can't dial a p2p-circuit without specifying a relay: %s", + a, + ) + } + + // Strip the /p2p-circuit prefix from the destaddr. + _, destaddr = ma.SplitFirst(destaddr) + + dinfo := &peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{}} + if destaddr != nil { + dinfo.Addrs = append(dinfo.Addrs, destaddr) + } + + var rinfo *peer.AddrInfo + rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr) + if err != nil { + return nil, fmt.Errorf("error parsing multiaddr '%s': %s", relayaddr.String(), err) + } + + return r.DialPeer(ctx, *rinfo, *dinfo) +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/listen.go b/p2p/protocol/internal/circuitv1-deprecated/listen.go new file mode 100644 index 00000000..3b64bbed --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/listen.go @@ -0,0 +1,61 @@ +package relay + +import ( + "net" + + pb "github.com/libp2p/go-libp2p-circuit/pb" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var _ manet.Listener = (*RelayListener)(nil) + +type RelayListener Relay + +func (l *RelayListener) Relay() *Relay { + return (*Relay)(l) +} + +func (r *Relay) Listener() *RelayListener { + // TODO: Only allow one! + return (*RelayListener)(r) +} + +func (l *RelayListener) Accept() (manet.Conn, error) { + for { + select { + case c := <-l.incoming: + err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS) + if err != nil { + log.Debugf("error writing relay response: %s", err.Error()) + c.stream.Reset() + continue + } + + // TODO: Pretty print. + log.Infof("accepted relay connection: %q", c) + + c.tagHop() + return c, nil + case <-l.ctx.Done(): + return nil, l.ctx.Err() + } + } +} + +func (l *RelayListener) Addr() net.Addr { + return &NetAddr{ + Relay: "any", + Remote: "any", + } +} + +func (l *RelayListener) Multiaddr() ma.Multiaddr { + return circuitAddr +} + +func (l *RelayListener) Close() error { + // TODO: noop? + return nil +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile b/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile new file mode 100644 index 00000000..eb14b576 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/Makefile @@ -0,0 +1,11 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $< + +clean: + rm -f *.pb.go + rm -f *.go diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go new file mode 100644 index 00000000..66703f15 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.pb.go @@ -0,0 +1,868 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: relay.proto + +package relay_pb + +import ( + fmt "fmt" + github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type CircuitRelay_Status int32 + +const ( + CircuitRelay_SUCCESS CircuitRelay_Status = 100 + CircuitRelay_HOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 220 + CircuitRelay_HOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 221 + CircuitRelay_HOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 250 + CircuitRelay_HOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 251 + CircuitRelay_HOP_NO_CONN_TO_DST CircuitRelay_Status = 260 + CircuitRelay_HOP_CANT_DIAL_DST CircuitRelay_Status = 261 + CircuitRelay_HOP_CANT_OPEN_DST_STREAM CircuitRelay_Status = 262 + CircuitRelay_HOP_CANT_SPEAK_RELAY CircuitRelay_Status = 270 + CircuitRelay_HOP_CANT_RELAY_TO_SELF CircuitRelay_Status = 280 + CircuitRelay_STOP_SRC_ADDR_TOO_LONG CircuitRelay_Status = 320 + CircuitRelay_STOP_DST_ADDR_TOO_LONG CircuitRelay_Status = 321 + CircuitRelay_STOP_SRC_MULTIADDR_INVALID CircuitRelay_Status = 350 + CircuitRelay_STOP_DST_MULTIADDR_INVALID CircuitRelay_Status = 351 + CircuitRelay_STOP_RELAY_REFUSED CircuitRelay_Status = 390 + CircuitRelay_MALFORMED_MESSAGE CircuitRelay_Status = 400 +) + +var CircuitRelay_Status_name = map[int32]string{ + 100: "SUCCESS", + 220: "HOP_SRC_ADDR_TOO_LONG", + 221: "HOP_DST_ADDR_TOO_LONG", + 250: "HOP_SRC_MULTIADDR_INVALID", + 251: "HOP_DST_MULTIADDR_INVALID", + 260: "HOP_NO_CONN_TO_DST", + 261: "HOP_CANT_DIAL_DST", + 262: "HOP_CANT_OPEN_DST_STREAM", + 270: "HOP_CANT_SPEAK_RELAY", + 280: "HOP_CANT_RELAY_TO_SELF", + 320: "STOP_SRC_ADDR_TOO_LONG", + 321: "STOP_DST_ADDR_TOO_LONG", + 350: "STOP_SRC_MULTIADDR_INVALID", + 351: "STOP_DST_MULTIADDR_INVALID", + 390: "STOP_RELAY_REFUSED", + 400: "MALFORMED_MESSAGE", +} + +var CircuitRelay_Status_value = map[string]int32{ + "SUCCESS": 100, + "HOP_SRC_ADDR_TOO_LONG": 220, + "HOP_DST_ADDR_TOO_LONG": 221, + "HOP_SRC_MULTIADDR_INVALID": 250, + "HOP_DST_MULTIADDR_INVALID": 251, + "HOP_NO_CONN_TO_DST": 260, + "HOP_CANT_DIAL_DST": 261, + "HOP_CANT_OPEN_DST_STREAM": 262, + "HOP_CANT_SPEAK_RELAY": 270, + "HOP_CANT_RELAY_TO_SELF": 280, + "STOP_SRC_ADDR_TOO_LONG": 320, + "STOP_DST_ADDR_TOO_LONG": 321, + "STOP_SRC_MULTIADDR_INVALID": 350, + "STOP_DST_MULTIADDR_INVALID": 351, + "STOP_RELAY_REFUSED": 390, + "MALFORMED_MESSAGE": 400, +} + +func (x CircuitRelay_Status) Enum() *CircuitRelay_Status { + p := new(CircuitRelay_Status) + *p = x + return p +} + +func (x CircuitRelay_Status) String() string { + return proto.EnumName(CircuitRelay_Status_name, int32(x)) +} + +func (x *CircuitRelay_Status) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(CircuitRelay_Status_value, data, "CircuitRelay_Status") + if err != nil { + return err + } + *x = CircuitRelay_Status(value) + return nil +} + +func (CircuitRelay_Status) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_9f69a7d5a802d584, []int{0, 0} +} + +type CircuitRelay_Type int32 + +const ( + CircuitRelay_HOP CircuitRelay_Type = 1 + CircuitRelay_STOP CircuitRelay_Type = 2 + CircuitRelay_STATUS CircuitRelay_Type = 3 + CircuitRelay_CAN_HOP CircuitRelay_Type = 4 +) + +var CircuitRelay_Type_name = map[int32]string{ + 1: "HOP", + 2: "STOP", + 3: "STATUS", + 4: "CAN_HOP", +} + +var CircuitRelay_Type_value = map[string]int32{ + "HOP": 1, + "STOP": 2, + "STATUS": 3, + "CAN_HOP": 4, +} + +func (x CircuitRelay_Type) Enum() *CircuitRelay_Type { + p := new(CircuitRelay_Type) + *p = x + return p +} + +func (x CircuitRelay_Type) String() string { + return proto.EnumName(CircuitRelay_Type_name, int32(x)) +} + +func (x *CircuitRelay_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(CircuitRelay_Type_value, data, "CircuitRelay_Type") + if err != nil { + return err + } + *x = CircuitRelay_Type(value) + return nil +} + +func (CircuitRelay_Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_9f69a7d5a802d584, []int{0, 1} +} + +type CircuitRelay struct { + Type *CircuitRelay_Type `protobuf:"varint,1,opt,name=type,enum=relay.pb.CircuitRelay_Type" json:"type,omitempty"` + SrcPeer *CircuitRelay_Peer `protobuf:"bytes,2,opt,name=srcPeer" json:"srcPeer,omitempty"` + DstPeer *CircuitRelay_Peer `protobuf:"bytes,3,opt,name=dstPeer" json:"dstPeer,omitempty"` + Code *CircuitRelay_Status `protobuf:"varint,4,opt,name=code,enum=relay.pb.CircuitRelay_Status" json:"code,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CircuitRelay) Reset() { *m = CircuitRelay{} } +func (m *CircuitRelay) String() string { return proto.CompactTextString(m) } +func (*CircuitRelay) ProtoMessage() {} +func (*CircuitRelay) Descriptor() ([]byte, []int) { + return fileDescriptor_9f69a7d5a802d584, []int{0} +} +func (m *CircuitRelay) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CircuitRelay) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CircuitRelay.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CircuitRelay) XXX_Merge(src proto.Message) { + xxx_messageInfo_CircuitRelay.Merge(m, src) +} +func (m *CircuitRelay) XXX_Size() int { + return m.Size() +} +func (m *CircuitRelay) XXX_DiscardUnknown() { + xxx_messageInfo_CircuitRelay.DiscardUnknown(m) +} + +var xxx_messageInfo_CircuitRelay proto.InternalMessageInfo + +func (m *CircuitRelay) GetType() CircuitRelay_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return CircuitRelay_HOP +} + +func (m *CircuitRelay) GetSrcPeer() *CircuitRelay_Peer { + if m != nil { + return m.SrcPeer + } + return nil +} + +func (m *CircuitRelay) GetDstPeer() *CircuitRelay_Peer { + if m != nil { + return m.DstPeer + } + return nil +} + +func (m *CircuitRelay) GetCode() CircuitRelay_Status { + if m != nil && m.Code != nil { + return *m.Code + } + return CircuitRelay_SUCCESS +} + +type CircuitRelay_Peer struct { + Id []byte `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` + Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CircuitRelay_Peer) Reset() { *m = CircuitRelay_Peer{} } +func (m *CircuitRelay_Peer) String() string { return proto.CompactTextString(m) } +func (*CircuitRelay_Peer) ProtoMessage() {} +func (*CircuitRelay_Peer) Descriptor() ([]byte, []int) { + return fileDescriptor_9f69a7d5a802d584, []int{0, 0} +} +func (m *CircuitRelay_Peer) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CircuitRelay_Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CircuitRelay_Peer.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CircuitRelay_Peer) XXX_Merge(src proto.Message) { + xxx_messageInfo_CircuitRelay_Peer.Merge(m, src) +} +func (m *CircuitRelay_Peer) XXX_Size() int { + return m.Size() +} +func (m *CircuitRelay_Peer) XXX_DiscardUnknown() { + xxx_messageInfo_CircuitRelay_Peer.DiscardUnknown(m) +} + +var xxx_messageInfo_CircuitRelay_Peer proto.InternalMessageInfo + +func (m *CircuitRelay_Peer) GetId() []byte { + if m != nil { + return m.Id + } + return nil +} + +func (m *CircuitRelay_Peer) GetAddrs() [][]byte { + if m != nil { + return m.Addrs + } + return nil +} + +func init() { + proto.RegisterEnum("relay.pb.CircuitRelay_Status", CircuitRelay_Status_name, CircuitRelay_Status_value) + proto.RegisterEnum("relay.pb.CircuitRelay_Type", CircuitRelay_Type_name, CircuitRelay_Type_value) + proto.RegisterType((*CircuitRelay)(nil), "relay.pb.CircuitRelay") + proto.RegisterType((*CircuitRelay_Peer)(nil), "relay.pb.CircuitRelay.Peer") +} + +func init() { proto.RegisterFile("relay.proto", fileDescriptor_9f69a7d5a802d584) } + +var fileDescriptor_9f69a7d5a802d584 = []byte{ + // 473 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4f, 0x6f, 0xd3, 0x3e, + 0x18, 0xc7, 0x65, 0x27, 0xbf, 0x76, 0x7a, 0x5a, 0x4d, 0xfe, 0x59, 0x63, 0x64, 0x9d, 0x56, 0xaa, + 0x9e, 0x7a, 0x40, 0x45, 0x4c, 0xe2, 0x05, 0x98, 0xc4, 0xdd, 0x2a, 0xd2, 0x38, 0xb2, 0x5d, 0x24, + 0x4e, 0x56, 0x69, 0x72, 0xa8, 0x84, 0xd4, 0x2a, 0xcd, 0x0e, 0xbd, 0xc3, 0xb8, 0x21, 0x8e, 0xbc, + 0x1c, 0xe0, 0xc4, 0x91, 0x17, 0xc0, 0x3f, 0xf5, 0x65, 0xc0, 0x05, 0xd9, 0x5d, 0x33, 0x44, 0x37, + 0x89, 0xa3, 0x9f, 0xef, 0xe7, 0xe3, 0x3c, 0xf9, 0x26, 0xd0, 0x28, 0xf2, 0x17, 0x93, 0x55, 0x7f, + 0x51, 0xcc, 0xcb, 0x39, 0xdd, 0xbb, 0x3a, 0x3c, 0xef, 0xbe, 0xae, 0x41, 0x33, 0x9c, 0x15, 0xd3, + 0x8b, 0x59, 0x29, 0xed, 0x8c, 0x3e, 0x00, 0xbf, 0x5c, 0x2d, 0xf2, 0x00, 0x75, 0x50, 0x6f, 0xff, + 0xf4, 0xb8, 0xbf, 0x25, 0xfb, 0x7f, 0x52, 0x7d, 0xbd, 0x5a, 0xe4, 0xd2, 0x81, 0xf4, 0x11, 0xd4, + 0x97, 0xc5, 0x34, 0xcd, 0xf3, 0x22, 0xc0, 0x1d, 0xd4, 0x6b, 0xdc, 0xea, 0x58, 0x44, 0x6e, 0x59, + 0xab, 0x65, 0xcb, 0xd2, 0x69, 0xde, 0x3f, 0x68, 0x57, 0x2c, 0x7d, 0x08, 0xfe, 0x74, 0x9e, 0xe5, + 0x81, 0xef, 0xd6, 0x3b, 0xb9, 0xc5, 0x51, 0xe5, 0xa4, 0xbc, 0x58, 0x4a, 0x87, 0xb6, 0xee, 0x83, + 0xef, 0xd4, 0x7d, 0xc0, 0xb3, 0x2c, 0x40, 0x1d, 0xdc, 0x6b, 0x4a, 0x3c, 0xcb, 0xe8, 0x01, 0xfc, + 0x37, 0xc9, 0xb2, 0x62, 0x19, 0xe0, 0x8e, 0xd7, 0x6b, 0xca, 0xcd, 0xa1, 0xfb, 0xd1, 0x83, 0xda, + 0x46, 0xa7, 0x0d, 0xa8, 0xab, 0x71, 0x18, 0x72, 0xa5, 0x48, 0x46, 0x5b, 0x70, 0xe7, 0x5c, 0xa4, + 0x46, 0xc9, 0xd0, 0xb0, 0x28, 0x92, 0x46, 0x0b, 0x61, 0x62, 0x91, 0x9c, 0x91, 0x2f, 0x68, 0x9b, + 0x45, 0x4a, 0xff, 0x95, 0x7d, 0x45, 0xb4, 0x0d, 0x47, 0x5b, 0x6f, 0x34, 0x8e, 0xf5, 0xd0, 0x01, + 0xc3, 0xe4, 0x29, 0x8b, 0x87, 0x11, 0xf9, 0x59, 0xe5, 0xd6, 0xdd, 0xcd, 0x7f, 0x21, 0x7a, 0x17, + 0xa8, 0xcd, 0x13, 0x61, 0x42, 0x91, 0x24, 0x46, 0x0b, 0x8b, 0x92, 0x97, 0x98, 0x1e, 0xc2, 0xff, + 0x36, 0x08, 0x59, 0xa2, 0x4d, 0x34, 0x64, 0xb1, 0x9b, 0xbf, 0xc2, 0xf4, 0x04, 0x82, 0x6a, 0x2e, + 0x52, 0x9e, 0xb8, 0xab, 0x95, 0x96, 0x9c, 0x8d, 0xc8, 0x25, 0xa6, 0x47, 0x70, 0x50, 0xc5, 0x2a, + 0xe5, 0xec, 0x89, 0x91, 0x3c, 0x66, 0xcf, 0xc8, 0x1b, 0x4c, 0x8f, 0xe1, 0xb0, 0x8a, 0xdc, 0xd0, + 0x3e, 0x4d, 0xf1, 0x78, 0x40, 0xde, 0xb9, 0x50, 0xe9, 0x1b, 0x0b, 0x78, 0x7f, 0x1d, 0xee, 0x36, + 0xf0, 0x01, 0xd3, 0x7b, 0xd0, 0xaa, 0xcc, 0xdd, 0x57, 0xfc, 0x76, 0x0d, 0xdc, 0xdc, 0xc1, 0x77, + 0x6c, 0x3b, 0x70, 0xc0, 0x66, 0x29, 0xc9, 0x07, 0x63, 0xc5, 0x23, 0x72, 0xe9, 0xd9, 0x0e, 0x46, + 0x2c, 0x1e, 0x08, 0x39, 0xe2, 0x91, 0x19, 0x71, 0xa5, 0xd8, 0x19, 0x27, 0x6f, 0xbd, 0xee, 0x29, + 0xf8, 0xf6, 0x0f, 0xa5, 0x75, 0xf0, 0xce, 0x45, 0x4a, 0x10, 0xdd, 0x03, 0xdf, 0xde, 0x40, 0x30, + 0x05, 0xa8, 0x29, 0xcd, 0xf4, 0x58, 0x11, 0xcf, 0x7e, 0xe0, 0x90, 0x25, 0xc6, 0x22, 0xfe, 0xe3, + 0xe6, 0xa7, 0x75, 0x1b, 0x7d, 0x5e, 0xb7, 0xd1, 0x8f, 0x75, 0x1b, 0xfd, 0x0e, 0x00, 0x00, 0xff, + 0xff, 0x6b, 0x22, 0x33, 0xbb, 0x2f, 0x03, 0x00, 0x00, +} + +func (m *CircuitRelay) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CircuitRelay) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CircuitRelay) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Code != nil { + i = encodeVarintRelay(dAtA, i, uint64(*m.Code)) + i-- + dAtA[i] = 0x20 + } + if m.DstPeer != nil { + { + size, err := m.DstPeer.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRelay(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.SrcPeer != nil { + { + size, err := m.SrcPeer.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRelay(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Type != nil { + i = encodeVarintRelay(dAtA, i, uint64(*m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *CircuitRelay_Peer) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CircuitRelay_Peer) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CircuitRelay_Peer) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Addrs) > 0 { + for iNdEx := len(m.Addrs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Addrs[iNdEx]) + copy(dAtA[i:], m.Addrs[iNdEx]) + i = encodeVarintRelay(dAtA, i, uint64(len(m.Addrs[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if m.Id == nil { + return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("id") + } else { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintRelay(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintRelay(dAtA []byte, offset int, v uint64) int { + offset -= sovRelay(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *CircuitRelay) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != nil { + n += 1 + sovRelay(uint64(*m.Type)) + } + if m.SrcPeer != nil { + l = m.SrcPeer.Size() + n += 1 + l + sovRelay(uint64(l)) + } + if m.DstPeer != nil { + l = m.DstPeer.Size() + n += 1 + l + sovRelay(uint64(l)) + } + if m.Code != nil { + n += 1 + sovRelay(uint64(*m.Code)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *CircuitRelay_Peer) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Id != nil { + l = len(m.Id) + n += 1 + l + sovRelay(uint64(l)) + } + if len(m.Addrs) > 0 { + for _, b := range m.Addrs { + l = len(b) + n += 1 + l + sovRelay(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovRelay(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozRelay(x uint64) (n int) { + return sovRelay(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *CircuitRelay) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CircuitRelay: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CircuitRelay: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var v CircuitRelay_Type + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= CircuitRelay_Type(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Type = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SrcPeer", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRelay + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRelay + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SrcPeer == nil { + m.SrcPeer = &CircuitRelay_Peer{} + } + if err := m.SrcPeer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DstPeer", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRelay + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRelay + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DstPeer == nil { + m.DstPeer = &CircuitRelay_Peer{} + } + if err := m.DstPeer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + var v CircuitRelay_Status + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= CircuitRelay_Status(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Code = &v + default: + iNdEx = preIndex + skippy, err := skipRelay(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRelay + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRelay + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CircuitRelay_Peer) Unmarshal(dAtA []byte) error { + var hasFields [1]uint64 + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Peer: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Peer: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRelay + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthRelay + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...) + if m.Id == nil { + m.Id = []byte{} + } + iNdEx = postIndex + hasFields[0] |= uint64(0x00000001) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRelay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRelay + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthRelay + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addrs = append(m.Addrs, make([]byte, postIndex-iNdEx)) + copy(m.Addrs[len(m.Addrs)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRelay(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRelay + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRelay + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + if hasFields[0]&uint64(0x00000001) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("id") + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRelay(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRelay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRelay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRelay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthRelay + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthRelay + } + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupRelay + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthRelay = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRelay = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupRelay = fmt.Errorf("proto: unexpected end of group") +) diff --git a/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto new file mode 100644 index 00000000..de3e637b --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/pb/relay.proto @@ -0,0 +1,44 @@ +syntax = "proto2"; + +package relay.pb; + +message CircuitRelay { + + enum Status { + SUCCESS = 100; + HOP_SRC_ADDR_TOO_LONG = 220; + HOP_DST_ADDR_TOO_LONG = 221; + HOP_SRC_MULTIADDR_INVALID = 250; + HOP_DST_MULTIADDR_INVALID = 251; + HOP_NO_CONN_TO_DST = 260; + HOP_CANT_DIAL_DST = 261; + HOP_CANT_OPEN_DST_STREAM = 262; + HOP_CANT_SPEAK_RELAY = 270; + HOP_CANT_RELAY_TO_SELF = 280; + STOP_SRC_ADDR_TOO_LONG = 320; + STOP_DST_ADDR_TOO_LONG = 321; + STOP_SRC_MULTIADDR_INVALID = 350; + STOP_DST_MULTIADDR_INVALID = 351; + STOP_RELAY_REFUSED = 390; + MALFORMED_MESSAGE = 400; + } + + enum Type { // RPC identifier, either HOP, STOP or STATUS + HOP = 1; + STOP = 2; + STATUS = 3; + CAN_HOP = 4; + } + + message Peer { + required bytes id = 1; // peer id + repeated bytes addrs = 2; // peer's known addresses + } + + optional Type type = 1; // Type of the message + + optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP + optional Peer dstPeer = 3; + + optional Status code = 4; // Status code, used when Type is STATUS +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go new file mode 100644 index 00000000..284f3ef4 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -0,0 +1,507 @@ +package relay + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + pb "github.com/libp2p/go-libp2p-circuit/pb" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/transport" + + pool "github.com/libp2p/go-buffer-pool" + + logging "github.com/ipfs/go-log/v2" + + ma "github.com/multiformats/go-multiaddr" +) + +var log = logging.Logger("relay") + +const ProtoID = "/libp2p/circuit/relay/0.1.0" + +const maxMessageSize = 4096 + +var ( + RelayAcceptTimeout = 10 * time.Second + HopConnectTimeout = 30 * time.Second + StopHandshakeTimeout = 1 * time.Minute + + HopStreamBufferSize = 4096 + HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines + + streamTimeout = 1 * time.Minute +) + +// Relay is the relay transport and service. +type Relay struct { + host host.Host + upgrader transport.Upgrader + ctx context.Context + ctxCancel context.CancelFunc + self peer.ID + + active bool + hop bool + + incoming chan *Conn + + // atomic counters + streamCount int32 + liveHopCount int32 + + // per peer hop counters + mx sync.Mutex + hopCount map[peer.ID]int +} + +// RelayOpts are options for configuring the relay transport. +type RelayOpt int + +var ( + // OptActive configures the relay transport to actively establish + // outbound connections on behalf of clients. You probably don't want to + // enable this unless you know what you're doing. + OptActive = RelayOpt(0) + // OptHop configures the relay transport to accept requests to relay + // traffic on behalf of third-parties. Unless OptActive is specified, + // this will only relay traffic between peers already connected to this + // node. + OptHop = RelayOpt(1) + // OptDiscovery is a no-op. It was introduced as a way to probe new + // peers to see if they were willing to act as a relays. However, in + // practice, it's useless. While it does test to see if these peers are + // relays, it doesn't (and can't), check to see if these peers are + // _active_ relays (i.e., will actively dial the target peer). + // + // This option may be re-enabled in the future but for now you shouldn't + // use it. + OptDiscovery = RelayOpt(2) +) + +type RelayError struct { + Code pb.CircuitRelay_Status +} + +func (e RelayError) Error() string { + return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code) +} + +// NewRelay constructs a new relay. +func NewRelay(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) (*Relay, error) { + r := &Relay{ + upgrader: upgrader, + host: h, + self: h.ID(), + incoming: make(chan *Conn), + hopCount: make(map[peer.ID]int), + } + r.ctx, r.ctxCancel = context.WithCancel(context.Background()) + + for _, opt := range opts { + switch opt { + case OptActive: + r.active = true + case OptHop: + r.hop = true + case OptDiscovery: + log.Errorf( + "circuit.OptDiscovery is now a no-op: %s", + "dialing peers with a random relay is no longer supported", + ) + default: + return nil, fmt.Errorf("unrecognized option: %d", opt) + } + } + + h.SetStreamHandler(ProtoID, r.handleNewStream) + + return r, nil +} + +// Increment the live hop count and increment the connection manager tags by 1 for the two +// sides of the hop stream. This ensures that connections with many hop streams will be protected +// from pruning, thus minimizing disruption from connection trimming in a relay node. +func (r *Relay) addLiveHop(from, to peer.ID) { + atomic.AddInt32(&r.liveHopCount, 1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", incrementTag) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", incrementTag) +} + +// Decrement the live hpo count and decrement the connection manager tags for the two sides +// of the hop stream. +func (r *Relay) rmLiveHop(from, to peer.ID) { + atomic.AddInt32(&r.liveHopCount, -1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", decrementTag) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", decrementTag) + +} + +func (r *Relay) GetActiveHops() int32 { + return atomic.LoadInt32(&r.liveHopCount) +} + +func (r *Relay) DialPeer(ctx context.Context, relay peer.AddrInfo, dest peer.AddrInfo) (*Conn, error) { + + log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID) + + if len(relay.Addrs) > 0 { + r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL) + } + + s, err := r.host.NewStream(ctx, relay.ID, ProtoID) + if err != nil { + return nil, err + } + + rd := newDelimitedReader(s, maxMessageSize) + wr := newDelimitedWriter(s) + defer rd.Close() + + var msg pb.CircuitRelay + + msg.Type = pb.CircuitRelay_HOP.Enum() + msg.SrcPeer = peerInfoToPeer(r.host.Peerstore().PeerInfo(r.self)) + msg.DstPeer = peerInfoToPeer(dest) + + err = wr.WriteMsg(&msg) + if err != nil { + s.Reset() + return nil, err + } + + msg.Reset() + + err = rd.ReadMsg(&msg) + if err != nil { + s.Reset() + return nil, err + } + + if msg.GetType() != pb.CircuitRelay_STATUS { + s.Reset() + return nil, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) + } + + if msg.GetCode() != pb.CircuitRelay_SUCCESS { + s.Reset() + return nil, RelayError{msg.GetCode()} + } + + return &Conn{stream: s, remote: dest, host: r.host, relay: r}, nil +} + +func (r *Relay) Matches(addr ma.Multiaddr) bool { + // TODO: Look at the prefix transport as well. + _, err := addr.ValueForProtocol(ma.P_CIRCUIT) + return err == nil +} + +// Queries a peer for support of hop relay +func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) { + s, err := host.NewStream(ctx, id, ProtoID) + if err != nil { + return false, err + } + defer s.Close() + + rd := newDelimitedReader(s, maxMessageSize) + wr := newDelimitedWriter(s) + defer rd.Close() + + var msg pb.CircuitRelay + + msg.Type = pb.CircuitRelay_CAN_HOP.Enum() + + if err := wr.WriteMsg(&msg); err != nil { + s.Reset() + return false, err + } + + msg.Reset() + + if err := rd.ReadMsg(&msg); err != nil { + s.Reset() + return false, err + } + + if msg.GetType() != pb.CircuitRelay_STATUS { + return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType()) + } + + return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil +} + +func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) { + return CanHop(ctx, r.host, id) +} + +func (r *Relay) handleNewStream(s network.Stream) { + s.SetReadDeadline(time.Now().Add(streamTimeout)) + + log.Infof("new relay stream from: %s", s.Conn().RemotePeer()) + + rd := newDelimitedReader(s, maxMessageSize) + defer rd.Close() + + var msg pb.CircuitRelay + + err := rd.ReadMsg(&msg) + if err != nil { + r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) + return + } + // reset stream deadline as message has been read + s.SetReadDeadline(time.Time{}) + + switch msg.GetType() { + case pb.CircuitRelay_HOP: + r.handleHopStream(s, &msg) + case pb.CircuitRelay_STOP: + r.handleStopStream(s, &msg) + case pb.CircuitRelay_CAN_HOP: + r.handleCanHop(s, &msg) + default: + log.Warnf("unexpected relay handshake: %d", msg.GetType()) + r.handleError(s, pb.CircuitRelay_MALFORMED_MESSAGE) + } +} + +func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) { + if !r.hop { + r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) + return + } + + streamCount := atomic.AddInt32(&r.streamCount, 1) + liveHopCount := atomic.LoadInt32(&r.liveHopCount) + defer atomic.AddInt32(&r.streamCount, -1) + + if (streamCount + liveHopCount) > int32(HopStreamLimit) { + log.Warn("hop stream limit exceeded; resetting stream") + s.Reset() + return + } + + src, err := peerToPeerInfo(msg.GetSrcPeer()) + if err != nil { + r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) + return + } + + if src.ID != s.Conn().RemotePeer() { + r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) + return + } + + dst, err := peerToPeerInfo(msg.GetDstPeer()) + if err != nil { + r.handleError(s, pb.CircuitRelay_HOP_DST_MULTIADDR_INVALID) + return + } + + if dst.ID == r.self { + r.handleError(s, pb.CircuitRelay_HOP_CANT_RELAY_TO_SELF) + return + } + + // open stream + ctx, cancel := context.WithTimeout(r.ctx, HopConnectTimeout) + defer cancel() + + if !r.active { + ctx = network.WithNoDial(ctx, "relay hop") + } else if len(dst.Addrs) > 0 { + r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, peerstore.TempAddrTTL) + } + + bs, err := r.host.NewStream(ctx, dst.ID, ProtoID) + if err != nil { + log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error()) + if err == network.ErrNoConn { + r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST) + } else { + r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST) + } + return + } + + // stop handshake + rd := newDelimitedReader(bs, maxMessageSize) + wr := newDelimitedWriter(bs) + defer rd.Close() + + // set handshake deadline + bs.SetDeadline(time.Now().Add(StopHandshakeTimeout)) + + msg.Type = pb.CircuitRelay_STOP.Enum() + + err = wr.WriteMsg(msg) + if err != nil { + log.Debugf("error writing stop handshake: %s", err.Error()) + bs.Reset() + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) + return + } + + msg.Reset() + + err = rd.ReadMsg(msg) + if err != nil { + log.Debugf("error reading stop response: %s", err.Error()) + bs.Reset() + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) + return + } + + if msg.GetType() != pb.CircuitRelay_STATUS { + log.Debugf("unexpected relay stop response: not a status message (%d)", msg.GetType()) + bs.Reset() + r.handleError(s, pb.CircuitRelay_HOP_CANT_OPEN_DST_STREAM) + return + } + + if msg.GetCode() != pb.CircuitRelay_SUCCESS { + log.Debugf("relay stop failure: %d", msg.GetCode()) + bs.Reset() + r.handleError(s, msg.GetCode()) + return + } + + err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) + if err != nil { + log.Debugf("error writing relay response: %s", err.Error()) + bs.Reset() + s.Reset() + return + } + + // relay connection + log.Infof("relaying connection between %s and %s", src.ID.Pretty(), dst.ID.Pretty()) + + // reset deadline + bs.SetDeadline(time.Time{}) + + r.addLiveHop(src.ID, dst.ID) + + goroutines := new(int32) + *goroutines = 2 + done := func() { + if atomic.AddInt32(goroutines, -1) == 0 { + s.Close() + bs.Close() + r.rmLiveHop(src.ID, dst.ID) + } + } + + // Don't reset streams after finishing or the other side will get an + // error, not an EOF. + go func() { + defer done() + + buf := pool.Get(HopStreamBufferSize) + defer pool.Put(buf) + + count, err := io.CopyBuffer(s, bs, buf) + if err != nil { + log.Debugf("relay copy error: %s", err) + // Reset both. + s.Reset() + bs.Reset() + } else { + // propagate the close + s.CloseWrite() + } + log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty()) + }() + + go func() { + defer done() + + buf := pool.Get(HopStreamBufferSize) + defer pool.Put(buf) + + count, err := io.CopyBuffer(bs, s, buf) + if err != nil { + log.Debugf("relay copy error: %s", err) + // Reset both. + bs.Reset() + s.Reset() + } else { + // propagate the close + bs.CloseWrite() + } + log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) + }() +} + +func (r *Relay) handleStopStream(s network.Stream, msg *pb.CircuitRelay) { + src, err := peerToPeerInfo(msg.GetSrcPeer()) + if err != nil { + r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID) + return + } + + dst, err := peerToPeerInfo(msg.GetDstPeer()) + if err != nil || dst.ID != r.self { + r.handleError(s, pb.CircuitRelay_STOP_DST_MULTIADDR_INVALID) + return + } + + log.Infof("relay connection from: %s", src.ID) + + if len(src.Addrs) > 0 { + r.host.Peerstore().AddAddrs(src.ID, src.Addrs, peerstore.TempAddrTTL) + } + + select { + case r.incoming <- &Conn{stream: s, remote: src, host: r.host, relay: r}: + case <-time.After(RelayAcceptTimeout): + r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) + } +} + +func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) { + var err error + + if r.hop { + err = r.writeResponse(s, pb.CircuitRelay_SUCCESS) + } else { + err = r.writeResponse(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY) + } + + if err != nil { + s.Reset() + log.Debugf("error writing relay response: %s", err.Error()) + } else { + s.Close() + } +} + +func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) { + log.Warnf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code) + err := r.writeResponse(s, code) + if err != nil { + s.Reset() + log.Debugf("error writing relay response: %s", err.Error()) + } else { + s.Close() + } +} + +func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error { + wr := newDelimitedWriter(s) + + var msg pb.CircuitRelay + msg.Type = pb.CircuitRelay_STATUS.Enum() + msg.Code = code.Enum() + + return wr.WriteMsg(&msg) +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go new file mode 100644 index 00000000..5753c37e --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -0,0 +1,451 @@ +package relay_test + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net" + "testing" + "time" + + . "github.com/libp2p/go-libp2p-circuit" + pb "github.com/libp2p/go-libp2p-circuit/pb" + + bhost "github.com/libp2p/go-libp2p-blankhost" + "github.com/libp2p/go-libp2p-core/host" + + swarm "github.com/libp2p/go-libp2p-swarm" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +/* TODO: add tests +- simple A -[R]-> B +- A tries to relay through R, R doesnt support relay +- A tries to relay through R to B, B doesnt support relay +- A sends too long multiaddr +- R drops stream mid-message +- A relays through R, R has no connection to B +*/ + +func getNetHosts(t *testing.T, n int) []host.Host { + var out []host.Host + + for i := 0; i < n; i++ { + netw := swarmt.GenSwarm(t) + h := bhost.NewBlankHost(netw) + out = append(out, h) + } + + return out +} + +func newTestRelay(t *testing.T, host host.Host, opts ...RelayOpt) *Relay { + r, err := NewRelay(host, swarmt.GenUpgrader(t, host.Network().(*swarm.Swarm)), opts...) + if err != nil { + t.Fatal(err) + } + return r +} + +func connect(t *testing.T, a, b host.Host) { + pinfo := a.Peerstore().PeerInfo(a.ID()) + err := b.Connect(context.Background(), pinfo) + if err != nil { + t.Fatal(err) + } +} + +func TestBasicRelay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + newTestRelay(t, hosts[1], OptHop) + + r3 := newTestRelay(t, hosts[2]) + + var ( + conn1, conn2 net.Conn + done = make(chan struct{}) + ) + + defer func() { + <-done + if conn1 != nil { + conn1.Close() + } + if conn2 != nil { + conn2.Close() + } + }() + + msg := []byte("relay works!") + go func() { + defer close(done) + list := r3.Listener() + + var err error + conn1, err = list.Accept() + if err != nil { + t.Error(err) + return + } + + _, err = conn1.Write(msg) + if err != nil { + t.Error(err) + return + } + }() + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + var err error + conn2, err = r1.DialPeer(rctx, rinfo, dinfo) + if err != nil { + t.Fatal(err) + } + + result := make([]byte, len(msg)) + _, err = io.ReadFull(conn2, result) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(result, msg) { + t.Fatal("message was incorrect:", string(result)) + } +} + +func TestRelayReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + newTestRelay(t, hosts[1], OptHop) + + r3 := newTestRelay(t, hosts[2]) + + ready := make(chan struct{}) + + msg := []byte("relay works!") + go func() { + list := r3.Listener() + + con, err := list.Accept() + if err != nil { + t.Error(err) + return + } + + <-ready + + _, err = con.Write(msg) + if err != nil { + t.Error(err) + return + } + + hosts[2].Network().ClosePeer(hosts[1].ID()) + }() + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + con, err := r1.DialPeer(rctx, rinfo, dinfo) + if err != nil { + t.Fatal(err) + } + + close(ready) + + _, err = ioutil.ReadAll(con) + if err == nil { + t.Fatal("expected error for reset relayed connection") + } +} + +func TestBasicRelayDial(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + _ = newTestRelay(t, hosts[1], OptHop) + r3 := newTestRelay(t, hosts[2]) + + var ( + conn1, conn2 net.Conn + done = make(chan struct{}) + ) + + defer func() { + cancel() + <-done + if conn1 != nil { + conn1.Close() + } + if conn2 != nil { + conn2.Close() + } + }() + + msg := []byte("relay works!") + go func() { + defer close(done) + list := r3.Listener() + + var err error + conn1, err = list.Accept() + if err != nil { + t.Error(err) + return + } + + _, err = conn1.Write(msg) + if err != nil { + t.Error(err) + return + } + }() + + addr := ma.StringCast(fmt.Sprintf("/ipfs/%s/p2p-circuit", hosts[1].ID().Pretty())) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + var err error + conn2, err = r1.Dial(rctx, addr, hosts[2].ID()) + if err != nil { + t.Fatal(err) + } + + data := make([]byte, len(msg)) + _, err = io.ReadFull(conn2, data) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) + } +} + +func TestUnspecificRelayDialFails(t *testing.T) { + hosts := getNetHosts(t, 3) + + r1 := newTestRelay(t, hosts[0]) + newTestRelay(t, hosts[1], OptHop) + r3 := newTestRelay(t, hosts[2]) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(100 * time.Millisecond) + + go func() { + if _, err := r3.Listener().Accept(); err == nil { + t.Error("should not have received relay connection") + } + }() + + addr := ma.StringCast("/p2p-circuit") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if _, err := r1.Dial(ctx, addr, hosts[2].ID()); err == nil { + t.Fatal("expected dial with unspecified relay address to fail, even if we're connected to a relay") + } +} + +func TestRelayThroughNonHop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + newTestRelay(t, hosts[1]) + + newTestRelay(t, hosts[2]) + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + _, err := r1.DialPeer(rctx, rinfo, dinfo) + if err == nil { + t.Fatal("expected error") + } + + rerr, ok := err.(RelayError) + if !ok { + t.Fatalf("expected RelayError: %#v", err) + } + + if rerr.Code != pb.CircuitRelay_HOP_CANT_SPEAK_RELAY { + t.Fatal("expected 'HOP_CANT_SPEAK_RELAY' error") + } +} + +func TestRelayNoDestConnection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + newTestRelay(t, hosts[1], OptHop) + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + _, err := r1.DialPeer(rctx, rinfo, dinfo) + if err == nil { + t.Fatal("expected error") + } + + rerr, ok := err.(RelayError) + if !ok { + t.Fatalf("expected RelayError: %#v", err) + } + + if rerr.Code != pb.CircuitRelay_HOP_NO_CONN_TO_DST { + t.Fatal("expected 'HOP_NO_CONN_TO_DST' error") + } +} + +func TestActiveRelay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 3) + + connect(t, hosts[0], hosts[1]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + newTestRelay(t, hosts[1], OptHop, OptActive) + r3 := newTestRelay(t, hosts[2]) + + connChan := make(chan manet.Conn) + + msg := []byte("relay works!") + go func() { + defer close(connChan) + list := r3.Listener() + + conn1, err := list.Accept() + if err != nil { + t.Error(err) + return + } + + if _, err := conn1.Write(msg); err != nil { + t.Error(err) + return + } + connChan <- conn1 + }() + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + conn2, err := r1.DialPeer(rctx, rinfo, dinfo) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + data := make([]byte, len(msg)) + _, err = io.ReadFull(conn2, data) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) + } + conn1, ok := <-connChan + if !ok { + t.Fatal("listener didn't accept a connection") + } + conn1.Close() +} + +func TestRelayCanHop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, 2) + + connect(t, hosts[0], hosts[1]) + + time.Sleep(10 * time.Millisecond) + + r1 := newTestRelay(t, hosts[0]) + + newTestRelay(t, hosts[1], OptHop) + + canhop, err := r1.CanHop(ctx, hosts[1].ID()) + if err != nil { + t.Fatal(err) + } + + if !canhop { + t.Fatal("Relay can't hop") + } +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport.go b/p2p/protocol/internal/circuitv1-deprecated/transport.go new file mode 100644 index 00000000..39515876 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/transport.go @@ -0,0 +1,74 @@ +package relay + +import ( + "fmt" + "io" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/transport" + ma "github.com/multiformats/go-multiaddr" +) + +var circuitAddr = ma.Cast(ma.ProtocolWithCode(ma.P_CIRCUIT).VCode) + +var _ transport.Transport = (*RelayTransport)(nil) +var _ io.Closer = (*RelayTransport)(nil) + +type RelayTransport Relay + +func (t *RelayTransport) Relay() *Relay { + return (*Relay)(t) +} + +func (r *Relay) Transport() *RelayTransport { + return (*RelayTransport)(r) +} + +func (t *RelayTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) { + // TODO: Ensure we have a connection to the relay, if specified. Also, + // make sure the multiaddr makes sense. + if !t.Relay().Matches(laddr) { + return nil, fmt.Errorf("%s is not a relay address", laddr) + } + return t.upgrader.UpgradeListener(t, t.Relay().Listener()), nil +} + +func (t *RelayTransport) CanDial(raddr ma.Multiaddr) bool { + return t.Relay().Matches(raddr) +} + +func (t *RelayTransport) Proxy() bool { + return true +} + +func (t *RelayTransport) Protocols() []int { + return []int{ma.P_CIRCUIT} +} + +func (r *RelayTransport) Close() error { + r.ctxCancel() + return nil +} + +// AddRelayTransport constructs a relay and adds it as a transport to the host network. +func AddRelayTransport(h host.Host, upgrader transport.Upgrader, opts ...RelayOpt) error { + n, ok := h.Network().(transport.TransportNetwork) + if !ok { + return fmt.Errorf("%v is not a transport network", h.Network()) + } + + r, err := NewRelay(h, upgrader, opts...) + if err != nil { + return err + } + + // There's no nice way to handle these errors as we have no way to tear + // down the relay. + // TODO + if err := n.AddTransport(r.Transport()); err != nil { + log.Error("failed to add relay transport:", err) + } else if err := n.Listen(r.Listener().Multiaddr()); err != nil { + log.Error("failed to listen on relay transport:", err) + } + return nil +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/transport_test.go b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go new file mode 100644 index 00000000..cc6c8a59 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/transport_test.go @@ -0,0 +1,149 @@ +package relay_test + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "testing" + "time" + + . "github.com/libp2p/go-libp2p-circuit" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peerstore" + + swarm "github.com/libp2p/go-libp2p-swarm" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + ma "github.com/multiformats/go-multiaddr" +) + +const TestProto = "test/relay-transport" + +var msg = []byte("relay works!") + +func testSetupRelay(t *testing.T) []host.Host { + hosts := getNetHosts(t, 3) + + err := AddRelayTransport(hosts[0], swarmt.GenUpgrader(t, hosts[0].Network().(*swarm.Swarm))) + if err != nil { + t.Fatal(err) + } + + err = AddRelayTransport(hosts[1], swarmt.GenUpgrader(t, hosts[1].Network().(*swarm.Swarm)), OptHop) + if err != nil { + t.Fatal(err) + } + + err = AddRelayTransport(hosts[2], swarmt.GenUpgrader(t, hosts[2].Network().(*swarm.Swarm))) + if err != nil { + t.Fatal(err) + } + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(100 * time.Millisecond) + + handler := func(s network.Stream) { + _, err := s.Write(msg) + if err != nil { + t.Error(err) + } + s.Close() + } + + hosts[2].SetStreamHandler(TestProto, handler) + + return hosts +} + +func TestFullAddressTransportDial(t *testing.T) { + hosts := testSetupRelay(t) + + var relayAddr ma.Multiaddr + for _, addr := range hosts[1].Addrs() { + // skip relay addrs. + if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil { + relayAddr = addr + } + } + + addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s/p2p-circuit/p2p/%s", relayAddr.String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) + if err != nil { + t.Fatal(err) + } + + hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + s, err := hosts[0].NewStream(ctx, hosts[2].ID(), TestProto) + if err != nil { + t.Fatal(err) + } + + data, err := ioutil.ReadAll(s) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) + } +} + +func TestSpecificRelayTransportDial(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := testSetupRelay(t) + + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].ID().Pretty(), hosts[2].ID().Pretty())) + if err != nil { + t.Fatal(err) + } + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) + + s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) + if err != nil { + t.Fatal(err) + } + + data, err := ioutil.ReadAll(s) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, msg) { + t.Fatal("message was incorrect:", string(data)) + } +} + +func TestUnspecificRelayTransportDialFails(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := testSetupRelay(t) + + addr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p-circuit/ipfs/%s", hosts[2].ID().Pretty())) + if err != nil { + t.Fatal(err) + } + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL) + + _, err = hosts[0].NewStream(rctx, hosts[2].ID(), TestProto) + if err == nil { + t.Fatal("dial to unspecified address should have failed") + } + +} diff --git a/p2p/protocol/internal/circuitv1-deprecated/util.go b/p2p/protocol/internal/circuitv1-deprecated/util.go new file mode 100644 index 00000000..4435e0b5 --- /dev/null +++ b/p2p/protocol/internal/circuitv1-deprecated/util.go @@ -0,0 +1,118 @@ +package relay + +import ( + "errors" + "io" + + pb "github.com/libp2p/go-libp2p-circuit/pb" + + "github.com/libp2p/go-libp2p-core/peer" + + pool "github.com/libp2p/go-buffer-pool" + "github.com/libp2p/go-msgio/protoio" + + "github.com/gogo/protobuf/proto" + ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-varint" +) + +func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) { + if p == nil { + return peer.AddrInfo{}, errors.New("nil peer") + } + + id, err := peer.IDFromBytes(p.Id) + if err != nil { + return peer.AddrInfo{}, err + } + + addrs := make([]ma.Multiaddr, 0, len(p.Addrs)) + for _, addrBytes := range p.Addrs { + a, err := ma.NewMultiaddrBytes(addrBytes) + if err == nil { + addrs = append(addrs, a) + } + } + + return peer.AddrInfo{ID: id, Addrs: addrs}, nil +} + +func peerInfoToPeer(pi peer.AddrInfo) *pb.CircuitRelay_Peer { + addrs := make([][]byte, len(pi.Addrs)) + for i, addr := range pi.Addrs { + addrs[i] = addr.Bytes() + } + + p := new(pb.CircuitRelay_Peer) + p.Id = []byte(pi.ID) + p.Addrs = addrs + + return p +} + +func incrementTag(v int) int { + return v + 1 +} + +func decrementTag(v int) int { + if v > 0 { + return v - 1 + } else { + return v + } +} + +type delimitedReader struct { + r io.Reader + buf []byte +} + +// The gogo protobuf NewDelimitedReader is buffered, which may eat up stream data. +// So we need to implement a compatible delimited reader that reads unbuffered. +// There is a slowdown from unbuffered reading: when reading the message +// it can take multiple single byte Reads to read the length and another Read +// to read the message payload. +// However, this is not critical performance degradation as +// - the reader is utilized to read one (dialer, stop) or two messages (hop) during +// the handshake, so it's a drop in the water for the connection lifetime. +// - messages are small (max 4k) and the length fits in a couple of bytes, +// so overall we have at most three reads per message. +func newDelimitedReader(r io.Reader, maxSize int) *delimitedReader { + return &delimitedReader{r: r, buf: pool.Get(maxSize)} +} + +func (d *delimitedReader) Close() { + if d.buf != nil { + pool.Put(d.buf) + d.buf = nil + } +} + +func (d *delimitedReader) ReadByte() (byte, error) { + buf := d.buf[:1] + _, err := d.r.Read(buf) + return buf[0], err +} + +func (d *delimitedReader) ReadMsg(msg proto.Message) error { + mlen, err := varint.ReadUvarint(d) + if err != nil { + return err + } + + if uint64(len(d.buf)) < mlen { + return errors.New("message too large") + } + + buf := d.buf[:mlen] + _, err = io.ReadFull(d.r, buf) + if err != nil { + return err + } + + return proto.Unmarshal(buf, msg) +} + +func newDelimitedWriter(w io.Writer) protoio.WriteCloser { + return protoio.NewDelimitedWriter(w) +}