Abstract streaming responses

This commit is contained in:
Cole Brown 2018-10-30 15:48:22 -04:00
parent d8f14167cb
commit ca4dcdf33e
1 changed files with 38 additions and 19 deletions

View File

@ -7,6 +7,7 @@ import (
"net"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
pb "github.com/libp2p/go-libp2p-daemon/pb"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -140,31 +141,7 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
return info, nil
}
// FindPeersConnectedToPeer queries the DHT for peers that have an active
// connection to a given peer.
func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-chan *PeerInfo, error) {
control, err := c.newControlConn()
if err != nil {
return nil, err
}
out := make(chan *PeerInfo, 10)
w := ggio.NewDelimitedWriter(control)
req := newDHTReq(&pb.DHTRequest{
Type: pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER.Enum(),
Peer: []byte(peer),
})
if err = w.WriteMsg(req); err != nil {
return nil, err
}
respc, err := readDhtResponseStream(ctx, control)
if err != nil {
return nil, err
}
go func() {
func convertResponseToPeerInfo(out chan<- *PeerInfo, respc <-chan *pb.DHTResponse) {
defer close(out)
for resp := range respc {
@ -175,7 +152,49 @@ func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-
out <- info
}
}()
}
func (c *Client) peerStreamRequest(ctx context.Context, req *pb.Request) (<-chan *PeerInfo, error) {
control, err := c.newControlConn()
if err != nil {
return nil, err
}
out := make(chan *PeerInfo, 10)
w := ggio.NewDelimitedWriter(control)
if err = w.WriteMsg(req); err != nil {
return nil, err
}
respc, err := readDhtResponseStream(ctx, control)
if err != nil {
return nil, err
}
go convertResponseToPeerInfo(out, respc)
return out, nil
}
// FindPeersConnectedToPeer queries the DHT for peers that have an active
// connection to a given peer.
func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-chan *PeerInfo, error) {
req := newDHTReq(&pb.DHTRequest{
Type: pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER.Enum(),
Peer: []byte(peer),
})
return c.peerStreamRequest(ctx, req)
}
// FindProviders queries the DHT for peers that provide a piece of content
// identified by a CID.
func (c *Client) FindProviders(ctx context.Context, cid cid.Cid) (<-chan *PeerInfo, error) {
req := newDHTReq(&pb.DHTRequest{
Type: pb.DHTRequest_FIND_PROVIDERS.Enum(),
Cid: cid.Bytes(),
})
return c.peerStreamRequest(ctx, req)
}