From 533a8e781d49c6dee61c1adc0c52eddb477e8337 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 30 Sep 2018 14:32:01 +0300 Subject: [PATCH] implement dht GetValue SearchValue PutValue --- dht.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/dht.go b/dht.go index a43526a..b8df8b7 100644 --- a/dht.go +++ b/dht.go @@ -173,7 +173,6 @@ func (d *Daemon) doDHTGetClosestPeers(req *pb.DHTRequest) (*pb.Response, <-chan }() return dhtOkResponse(dhtResponseBegin()), rch, cancel - } func (d *Daemon) doDHTGetPublicKey(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) { @@ -202,15 +201,68 @@ func (d *Daemon) doDHTGetPublicKey(req *pb.DHTRequest) (*pb.Response, <-chan *pb } func (d *Daemon) doDHTGetValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) { - return errorResponseString("XXX Implement me!"), nil, nil + if req.Key == nil { + return errorResponseString("Malformed request; missing key parameter"), nil, nil + } + + ctx, cancel := d.dhtRequestContext(req) + defer cancel() + + val, err := d.dht.GetValue(ctx, *req.Key) + if err != nil { + return errorResponse(err), nil, nil + } + + return dhtOkResponse(dhtResponseValue(val)), nil, nil } func (d *Daemon) doDHTSearchValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) { - return errorResponseString("XXX Implement me!"), nil, nil + if req.Key == nil { + return errorResponseString("Malformed request; missing key parameter"), nil, nil + } + + ctx, cancel := d.dhtRequestContext(req) + + ch, err := d.dht.SearchValue(ctx, *req.Key) + if err != nil { + cancel() + return errorResponse(err), nil, nil + } + + rch := make(chan *pb.DHTResponse) + go func() { + defer cancel() + defer close(rch) + for val := range ch { + select { + case rch <- dhtResponseValue(val): + case <-ctx.Done(): + return + } + } + }() + + return dhtOkResponse(dhtResponseBegin()), rch, cancel } func (d *Daemon) doDHTPutValue(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) { - return errorResponseString("XXX Implement me!"), nil, nil + if req.Key == nil { + return errorResponseString("Malformed request; missing key parameter"), nil, nil + } + + if req.Value == nil { + return errorResponseString("Malformed request; missing value parameter"), nil, nil + } + + ctx, cancel := d.dhtRequestContext(req) + defer cancel() + + err := d.dht.PutValue(ctx, *req.Key, req.Value) + if err != nil { + return errorResponse(err), nil, nil + } + + return okResponse(), nil, nil } func (d *Daemon) doDHTProvide(req *pb.DHTRequest) (*pb.Response, <-chan *pb.DHTResponse, func()) { @@ -246,10 +298,7 @@ func dhtResponsePeerInfo(pi pstore.PeerInfo) *pb.DHTResponse { } func dhtResponsePeerID(p peer.ID) *pb.DHTResponse { - return &pb.DHTResponse{ - Type: pb.DHTResponse_VALUE.Enum(), - Value: []byte(p), - } + return dhtResponseValue([]byte(p)) } func dhtResponsePublicKey(key crypto.PubKey) (*pb.DHTResponse, error) { @@ -257,10 +306,14 @@ func dhtResponsePublicKey(key crypto.PubKey) (*pb.DHTResponse, error) { if err != nil { return nil, err } + return dhtResponseValue(bytes), nil +} + +func dhtResponseValue(val []byte) *pb.DHTResponse { return &pb.DHTResponse{ Type: pb.DHTResponse_VALUE.Enum(), - Value: bytes, - }, nil + Value: val, + } } func dhtOkResponse(r *pb.DHTResponse) *pb.Response {