diff --git a/p2pclient/dht.go b/p2pclient/dht.go index 3383c06..d77723b 100644 --- a/p2pclient/dht.go +++ b/p2pclient/dht.go @@ -21,6 +21,10 @@ type PeerInfo struct { } func convertPbPeerInfo(pbi *pb.PeerInfo) (*PeerInfo, error) { + if pbi == nil { + return nil, errors.New("null peerinfo") + } + id, err := peer.IDFromBytes(pbi.GetId()) if err != nil { return nil, err @@ -87,7 +91,7 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) { return nil, err } - out := make(chan PeerInfo, 10) + out := make(chan *PeerInfo, 10) defer close(out) defer control.Close() @@ -124,3 +128,44 @@ func (c *Client) FindPeer(peer peer.ID) (*PeerInfo, error) { return info, nil } + +// FindPeersConnectedToPeer queries the DHT for peers that have an active +// connection to a given peer. +func (c *Client) FindPeersConnectedToPeer(ctx context.Context, peer peer.ID) (<-chan *PeerInfo, error) { + control, err := c.newControlConn() + if err != nil { + return nil, err + } + + out := make(chan *PeerInfo, 10) + w := ggio.NewDelimitedWriter(control) + req := newDHTReq(&pb.DHTRequest{ + Type: pb.DHTRequest_FIND_PEERS_CONNECTED_TO_PEER.Enum(), + Peer: []byte(peer), + }) + + if err := w.WriteMsg(req); err != nil { + return nil, err + } + + go func() { + defer close(out) + respc := readDhtResponseStream(ctx, control) + + for resp := range respc { + if resp.GetType() != pb.Response_OK { + log.Errorf("error from daemon in findpeersconnectedtopeer: %s", resp.GetError().GetMsg()) + return + } + + info, err := convertPbPeerInfo(resp.Dht.GetPeer()) + if err != nil { + continue + } + + out <- info + } + }() + + return out, nil +}