implement dht GetValue SearchValue PutValue

This commit is contained in:
vyzo 2018-09-30 14:32:01 +03:00
parent f5e74ac8ab
commit 533a8e781d
1 changed files with 63 additions and 10 deletions

73
dht.go
View File

@ -173,7 +173,6 @@ func (d *Daemon) doDHTGetClosestPeers(req *pb.DHTRequest) (*pb.Response, <-chan
}()
return dhtOkResponse(dhtResponseBegin()), rch, cancel
}
func (d *Daemon) doDHTGetPublicKey(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) {
@ -202,15 +201,68 @@ func (d *Daemon) doDHTGetPublicKey(req *pb.DHTRequest) (*pb.Response, <-chan *pb
}
func (d *Daemon) doDHTGetValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) {
return errorResponseString("XXX Implement me!"), nil, nil
if req.Key == nil {
return errorResponseString("Malformed request; missing key parameter"), nil, nil
}
ctx, cancel := d.dhtRequestContext(req)
defer cancel()
val, err := d.dht.GetValue(ctx, *req.Key)
if err != nil {
return errorResponse(err), nil, nil
}
return dhtOkResponse(dhtResponseValue(val)), nil, nil
}
func (d *Daemon) doDHTSearchValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) {
return errorResponseString("XXX Implement me!"), nil, nil
if req.Key == nil {
return errorResponseString("Malformed request; missing key parameter"), nil, nil
}
ctx, cancel := d.dhtRequestContext(req)
ch, err := d.dht.SearchValue(ctx, *req.Key)
if err != nil {
cancel()
return errorResponse(err), nil, nil
}
rch := make(chan *pb.DHTResponse)
go func() {
defer cancel()
defer close(rch)
for val := range ch {
select {
case rch <- dhtResponseValue(val):
case <-ctx.Done():
return
}
}
}()
return dhtOkResponse(dhtResponseBegin()), rch, cancel
}
func (d *Daemon) doDHTPutValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) {
return errorResponseString("XXX Implement me!"), nil, nil
if req.Key == nil {
return errorResponseString("Malformed request; missing key parameter"), nil, nil
}
if req.Value == nil {
return errorResponseString("Malformed request; missing value parameter"), nil, nil
}
ctx, cancel := d.dhtRequestContext(req)
defer cancel()
err := d.dht.PutValue(ctx, *req.Key, req.Value)
if err != nil {
return errorResponse(err), nil, nil
}
return okResponse(), nil, nil
}
func (d *Daemon) doDHTProvide(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) {
@ -246,10 +298,7 @@ func dhtResponsePeerInfo(pi pstore.PeerInfo) *pb.DHTResponse {
}
func dhtResponsePeerID(p peer.ID) *pb.DHTResponse {
return &pb.DHTResponse{
Type: pb.DHTResponse_VALUE.Enum(),
Value: []byte(p),
}
return dhtResponseValue([]byte(p))
}
func dhtResponsePublicKey(key crypto.PubKey) (*pb.DHTResponse, error) {
@ -257,10 +306,14 @@ func dhtResponsePublicKey(key crypto.PubKey) (*pb.DHTResponse, error) {
if err != nil {
return nil, err
}
return dhtResponseValue(bytes), nil
}
func dhtResponseValue(val []byte) *pb.DHTResponse {
return &pb.DHTResponse{
Type: pb.DHTResponse_VALUE.Enum(),
Value: bytes,
}, nil
Value: val,
}
}
func dhtOkResponse(r *pb.DHTResponse) *pb.Response {