diff --git a/p2pclient/dht.go b/p2pclient/dht.go index 146b133..d6aad09 100644 --- a/p2pclient/dht.go +++ b/p2pclient/dht.go @@ -7,6 +7,7 @@ import ( "net" ggio "github.com/gogo/protobuf/io" + cid "github.com/ipfs/go-cid" pb "github.com/libp2p/go-libp2p-daemon/pb" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" @@ -140,9 +141,20 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) { return info, nil } -// FindPeersConnectedToPeer queries the DHT for peers that have an active -// connection to a given peer. -func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-chan *PeerInfo, error) { +func convertResponseToPeerInfo(out chan<- *PeerInfo, respc <-chan *pb.DHTResponse) { + defer close(out) + + for resp := range respc { + info, err := convertPbPeerInfo(resp.GetPeer()) + if err != nil { + continue + } + + out <- info + } +} + +func (c *Client) peerStreamRequest(ctx context.Context, req *pb.Request) (<-chan *PeerInfo, error) { control, err := c.newControlConn() if err != nil { return nil, err @@ -150,10 +162,6 @@ func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<- out := make(chan *PeerInfo, 10) w := ggio.NewDelimitedWriter(control) - req := newDHTReq(&pb.DHTRequest{ - Type: pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER.Enum(), - Peer: []byte(peer), - }) if err = w.WriteMsg(req); err != nil { return nil, err @@ -164,18 +172,29 @@ func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<- return nil, err } - go func() { - defer close(out) - - for resp := range respc { - info, err := convertPbPeerInfo(resp.GetPeer()) - if err != nil { - continue - } - - out <- info - } - }() + go convertResponseToPeerInfo(out, respc) return out, nil } + +// FindPeersConnectedToPeer queries the DHT for peers that have an active +// connection to a given peer. +func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-chan *PeerInfo, error) { + req := newDHTReq(&pb.DHTRequest{ + Type: pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER.Enum(), + Peer: []byte(peer), + }) + + return c.peerStreamRequest(ctx, req) +} + +// FindProviders queries the DHT for peers that provide a piece of content +// identified by a CID. +func (c *Client) FindProviders(ctx context.Context, cid cid.Cid) (<-chan *PeerInfo, error) { + req := newDHTReq(&pb.DHTRequest{ + Type: pb.DHTRequest_FIND_PROVIDERS.Enum(), + Cid: cid.Bytes(), + }) + + return c.peerStreamRequest(ctx, req) +}