Abstract out request-response flow, finish api
This commit is contained in:
parent
ca4dcdf33e
commit
6b51a250df
206
p2pclient/dht.go
206
p2pclient/dht.go
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
ggio "github.com/gogo/protobuf/io"
|
ggio "github.com/gogo/protobuf/io"
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
|
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||||
pb "github.com/libp2p/go-libp2p-daemon/pb"
|
pb "github.com/libp2p/go-libp2p-daemon/pb"
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
@ -96,25 +97,15 @@ func readDhtResponseStream(ctx context.Context, control net.Conn) (<-chan *pb.DH
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindPeer queries the daemon for a peer's address.
|
func (c *Client) request(req *pb.Request) (*pb.DHTResponse, error) {
|
||||||
func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
|
|
||||||
control, err := c.newControlConn()
|
control, err := c.newControlConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan *PeerInfo, 10)
|
|
||||||
defer close(out)
|
|
||||||
defer control.Close()
|
defer control.Close()
|
||||||
|
|
||||||
req := newDHTReq(&pb.DHTRequest{
|
|
||||||
Type: pb.DHTRequest_FIND_PEER.Enum(),
|
|
||||||
Peer: []byte(peer),
|
|
||||||
})
|
|
||||||
|
|
||||||
w := ggio.NewDelimitedWriter(control)
|
w := ggio.NewDelimitedWriter(control)
|
||||||
if err = w.WriteMsg(req); err != nil {
|
if err = w.WriteMsg(req); err != nil {
|
||||||
control.Close()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,8 +114,10 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
|
||||||
if err = r.ReadMsg(msg); err != nil {
|
if err = r.ReadMsg(msg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.GetType() == pb.Response_ERROR {
|
if msg.GetType() == pb.Response_ERROR {
|
||||||
err = fmt.Errorf("error from daemon in findpeer: %s", msg.GetError().GetMsg())
|
err := fmt.Errorf("error from daemon in %s response: %s", req.GetType().String(), msg.GetError())
|
||||||
|
log.Errorf(err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,7 +126,22 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
|
||||||
return nil, errors.New("dht response was not populated in findpeer")
|
return nil, errors.New("dht response was not populated in findpeer")
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := convertPbPeerInfo(dht.GetPeer())
|
return dht, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindPeer queries the daemon for a peer's address.
|
||||||
|
func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_FIND_PEER.Enum(),
|
||||||
|
Peer: []byte(peer),
|
||||||
|
})
|
||||||
|
|
||||||
|
msg, err := c.request(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := convertPbPeerInfo(msg.GetPeer())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -141,39 +149,157 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) {
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertResponseToPeerInfo(out chan<- *PeerInfo, respc <-chan *pb.DHTResponse) {
|
// GetPublicKey queries the daemon for a peer's address.
|
||||||
defer close(out)
|
func (c *Client) GetPublicKey(peer peer.ID) (crypto.PubKey, error) {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_GET_PUBLIC_KEY.Enum(),
|
||||||
|
Peer: []byte(peer),
|
||||||
|
})
|
||||||
|
|
||||||
for resp := range respc {
|
msg, err := c.request(req)
|
||||||
info, err := convertPbPeerInfo(resp.GetPeer())
|
if err != nil {
|
||||||
if err != nil {
|
return nil, err
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
out <- info
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
key, err := crypto.UnmarshalPublicKey(msg.GetValue())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return key, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) peerStreamRequest(ctx context.Context, req *pb.Request) (<-chan *PeerInfo, error) {
|
// GetValue queries the daemon for a value stored at a key.
|
||||||
|
func (c *Client) GetValue(key string) ([]byte, error) {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_FIND_PEER.Enum(),
|
||||||
|
Key: &key,
|
||||||
|
})
|
||||||
|
|
||||||
|
msg, err := c.request(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg.GetValue(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutValue sets the value stored at a given key in the DHT to a given value.
|
||||||
|
func (c *Client) PutValue(key string, value []byte) error {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_PUT_VALUE.Enum(),
|
||||||
|
Key: &key,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := c.request(req)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide announces that our peer provides content described by a CID.
|
||||||
|
func (c *Client) Provide(id cid.Cid) error {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_PUT_VALUE.Enum(),
|
||||||
|
Cid: id.Bytes(),
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := c.request(req)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertResponseToPeerInfo(respc <-chan *pb.DHTResponse) <-chan *PeerInfo {
|
||||||
|
out := make(chan *PeerInfo, 10)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
|
||||||
|
for resp := range respc {
|
||||||
|
info, err := convertPbPeerInfo(resp.GetPeer())
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
out <- info
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertResponseToPeerID(respc <-chan *pb.DHTResponse) <-chan peer.ID {
|
||||||
|
out := make(chan peer.ID, 10)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
|
||||||
|
for resp := range respc {
|
||||||
|
id, err := peer.IDFromBytes(resp.GetValue())
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
out <- id
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertResponseToValue(respc <-chan *pb.DHTResponse) <-chan []byte {
|
||||||
|
out := make(chan []byte, 10)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
|
||||||
|
for resp := range respc {
|
||||||
|
out <- resp.GetValue()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) streamRequest(ctx context.Context, req *pb.Request) (<-chan *pb.DHTResponse, error) {
|
||||||
control, err := c.newControlConn()
|
control, err := c.newControlConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan *PeerInfo, 10)
|
|
||||||
w := ggio.NewDelimitedWriter(control)
|
w := ggio.NewDelimitedWriter(control)
|
||||||
|
|
||||||
if err = w.WriteMsg(req); err != nil {
|
if err = w.WriteMsg(req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
respc, err := readDhtResponseStream(ctx, control)
|
return readDhtResponseStream(ctx, control)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) streamRequestPeerInfo(ctx context.Context, req *pb.Request) (<-chan *PeerInfo, error) {
|
||||||
|
respc, err := c.streamRequest(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go convertResponseToPeerInfo(out, respc)
|
out := convertResponseToPeerInfo(respc)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) streamRequestPeerID(ctx context.Context, req *pb.Request) (<-chan peer.ID, error) {
|
||||||
|
respc, err := c.streamRequest(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := convertResponseToPeerID(respc)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) streamRequestValue(ctx context.Context, req *pb.Request) (<-chan []byte, error) {
|
||||||
|
respc, err := c.streamRequest(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := convertResponseToValue(respc)
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +311,7 @@ func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-
|
||||||
Peer: []byte(peer),
|
Peer: []byte(peer),
|
||||||
})
|
})
|
||||||
|
|
||||||
return c.peerStreamRequest(ctx, req)
|
return c.streamRequestPeerInfo(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindProviders queries the DHT for peers that provide a piece of content
|
// FindProviders queries the DHT for peers that provide a piece of content
|
||||||
|
@ -196,5 +322,27 @@ func (c *Client) FindProviders(ctx context.Context, cid cid.Cid) (<-chan *PeerIn
|
||||||
Cid: cid.Bytes(),
|
Cid: cid.Bytes(),
|
||||||
})
|
})
|
||||||
|
|
||||||
return c.peerStreamRequest(ctx, req)
|
return c.streamRequestPeerInfo(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClosestPeers queries the DHT for peers that provide a piece of content
|
||||||
|
// identified by a CID.
|
||||||
|
func (c *Client) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_GET_CLOSEST_PEERS.Enum(),
|
||||||
|
Key: &key,
|
||||||
|
})
|
||||||
|
|
||||||
|
return c.streamRequestPeerID(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SearchValue queries the DHT for the best/most valid value stored at a key.
|
||||||
|
// Later responses are better.
|
||||||
|
func (c *Client) SearchValue(ctx context.Context, key string) (<-chan []byte, error) {
|
||||||
|
req := newDHTReq(&pb.DHTRequest{
|
||||||
|
Type: pb.DHTRequest_SEARCH_VALUE.Enum(),
|
||||||
|
Key: &key,
|
||||||
|
})
|
||||||
|
|
||||||
|
return c.streamRequestValue(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue