From 64d10c08726af33048e8eeb8df257628a3944870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Sat, 23 Feb 2019 10:47:33 +0100 Subject: [PATCH] swarm: mock store listings (#19157) * swarm/storage/mock: implement listings methods for mem and rpc stores * swarm/storage/mock/rpc: add comments and newTestStore helper function * swarm/storage/mock/mem: add missing comments * swarm/storage/mock: add comments to new types and constants * swarm/storage/mock/db: implement listings for mock/db global store * swarm/storage/mock/test: add comments for MockStoreListings * swarm/storage/mock/explorer: initial implementation * cmd/swarm/global-store: add chunk explorer * cmd/swarm/global-store: add chunk explorer tests * swarm/storage/mock/explorer: add tests * swarm/storage/mock/explorer: add swagger api definition * swarm/storage/mock/explorer: not-zero test values for invalid addr and key * swarm/storage/mock/explorer: test wildcard cors origin * swarm/storage/mock/db: renames based on Fabio's suggestions * swarm/storage/mock/explorer: add more comments to testHandler function * cmd/swarm/global-store: terminate subprocess with Kill in tests --- cmd/swarm/global-store/explorer.go | 66 +++ cmd/swarm/global-store/explorer_test.go | 254 ++++++++++ cmd/swarm/global-store/global_store.go | 24 +- cmd/swarm/global-store/global_store_test.go | 64 ++- cmd/swarm/global-store/main.go | 44 +- swarm/storage/mock/db/db.go | 295 ++++++++++-- swarm/storage/mock/db/db_test.go | 70 +-- swarm/storage/mock/explorer/explorer.go | 257 ++++++++++ swarm/storage/mock/explorer/explorer_test.go | 471 +++++++++++++++++++ swarm/storage/mock/explorer/headers_test.go | 163 +++++++ swarm/storage/mock/explorer/swagger.yaml | 176 +++++++ swarm/storage/mock/mem/mem.go | 270 +++++++++-- swarm/storage/mock/mem/mem_test.go | 6 + swarm/storage/mock/mock.go | 31 ++ swarm/storage/mock/rpc/rpc.go | 24 + swarm/storage/mock/rpc/rpc_test.go | 29 +- swarm/storage/mock/test/test.go | 118 +++++ 17 files changed, 2215 insertions(+), 147 deletions(-) create mode 100644 cmd/swarm/global-store/explorer.go create mode 100644 cmd/swarm/global-store/explorer_test.go create mode 100644 swarm/storage/mock/explorer/explorer.go create mode 100644 swarm/storage/mock/explorer/explorer_test.go create mode 100644 swarm/storage/mock/explorer/headers_test.go create mode 100644 swarm/storage/mock/explorer/swagger.yaml diff --git a/cmd/swarm/global-store/explorer.go b/cmd/swarm/global-store/explorer.go new file mode 100644 index 000000000..634ff1ebb --- /dev/null +++ b/cmd/swarm/global-store/explorer.go @@ -0,0 +1,66 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/ethereum/go-ethereum/swarm/storage/mock/explorer" + cli "gopkg.in/urfave/cli.v1" +) + +// serveChunkExplorer starts an http server in background with chunk explorer handler +// using the provided global store. Server is started if the returned shutdown function +// is not nil. +func serveChunkExplorer(ctx *cli.Context, globalStore mock.GlobalStorer) (shutdown func(), err error) { + if !ctx.IsSet("explorer-address") { + return nil, nil + } + + corsOrigins := ctx.StringSlice("explorer-cors-origin") + server := &http.Server{ + Handler: explorer.NewHandler(globalStore, corsOrigins), + IdleTimeout: 30 * time.Minute, + ReadTimeout: 2 * time.Minute, + WriteTimeout: 2 * time.Minute, + } + listener, err := net.Listen("tcp", ctx.String("explorer-address")) + if err != nil { + return nil, fmt.Errorf("explorer: %v", err) + } + log.Info("chunk explorer http", "address", listener.Addr().String(), "origins", corsOrigins) + + go func() { + if err := server.Serve(listener); err != nil { + log.Error("chunk explorer", "err", err) + } + }() + + return func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Error("chunk explorer: shutdown", "err", err) + } + }, nil +} diff --git a/cmd/swarm/global-store/explorer_test.go b/cmd/swarm/global-store/explorer_test.go new file mode 100644 index 000000000..2e4928c8f --- /dev/null +++ b/cmd/swarm/global-store/explorer_test.go @@ -0,0 +1,254 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "sort" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/storage/mock/explorer" + mockRPC "github.com/ethereum/go-ethereum/swarm/storage/mock/rpc" +) + +// TestExplorer validates basic chunk explorer functionality by storing +// a small set of chunk and making http requests on exposed endpoint. +// Full chunk explorer validation is done in mock/explorer package. +func TestExplorer(t *testing.T) { + addr := findFreeTCPAddress(t) + explorerAddr := findFreeTCPAddress(t) + testCmd := runGlobalStore(t, "ws", "--addr", addr, "--explorer-address", explorerAddr) + defer testCmd.Kill() + + client := websocketClient(t, addr) + + store := mockRPC.NewGlobalStore(client) + defer store.Close() + + nodeKeys := map[string][]string{ + "a1": {"b1", "b2", "b3"}, + "a2": {"b3", "b4", "b5"}, + } + + keyNodes := make(map[string][]string) + + for addr, keys := range nodeKeys { + for _, key := range keys { + keyNodes[key] = append(keyNodes[key], addr) + } + } + + invalidAddr := "c1" + invalidKey := "d1" + + for addr, keys := range nodeKeys { + for _, key := range keys { + err := store.Put(common.HexToAddress(addr), common.Hex2Bytes(key), []byte("data")) + if err != nil { + t.Fatal(err) + } + } + } + + endpoint := "http://" + explorerAddr + + t.Run("has key", func(t *testing.T) { + for addr, keys := range nodeKeys { + for _, key := range keys { + testStatusResponse(t, endpoint+"/api/has-key/"+addr+"/"+key, http.StatusOK) + testStatusResponse(t, endpoint+"/api/has-key/"+invalidAddr+"/"+key, http.StatusNotFound) + } + testStatusResponse(t, endpoint+"/api/has-key/"+addr+"/"+invalidKey, http.StatusNotFound) + } + testStatusResponse(t, endpoint+"/api/has-key/"+invalidAddr+"/"+invalidKey, http.StatusNotFound) + }) + + t.Run("keys", func(t *testing.T) { + var keys []string + for key := range keyNodes { + keys = append(keys, key) + } + sort.Strings(keys) + testKeysResponse(t, endpoint+"/api/keys", explorer.KeysResponse{ + Keys: keys, + }) + }) + + t.Run("nodes", func(t *testing.T) { + var nodes []string + for addr := range nodeKeys { + nodes = append(nodes, common.HexToAddress(addr).Hex()) + } + sort.Strings(nodes) + testNodesResponse(t, endpoint+"/api/nodes", explorer.NodesResponse{ + Nodes: nodes, + }) + }) + + t.Run("node keys", func(t *testing.T) { + for addr, keys := range nodeKeys { + testKeysResponse(t, endpoint+"/api/keys?node="+addr, explorer.KeysResponse{ + Keys: keys, + }) + } + testKeysResponse(t, endpoint+"/api/keys?node="+invalidAddr, explorer.KeysResponse{}) + }) + + t.Run("key nodes", func(t *testing.T) { + for key, addrs := range keyNodes { + var nodes []string + for _, addr := range addrs { + nodes = append(nodes, common.HexToAddress(addr).Hex()) + } + sort.Strings(nodes) + testNodesResponse(t, endpoint+"/api/nodes?key="+key, explorer.NodesResponse{ + Nodes: nodes, + }) + } + testNodesResponse(t, endpoint+"/api/nodes?key="+invalidKey, explorer.NodesResponse{}) + }) +} + +// TestExplorer_CORSOrigin validates if chunk explorer returns +// correct CORS origin header in GET and OPTIONS requests. +func TestExplorer_CORSOrigin(t *testing.T) { + origin := "http://localhost/" + addr := findFreeTCPAddress(t) + explorerAddr := findFreeTCPAddress(t) + testCmd := runGlobalStore(t, "ws", + "--addr", addr, + "--explorer-address", explorerAddr, + "--explorer-cors-origin", origin, + ) + defer testCmd.Kill() + + // wait until the server is started + waitHTTPEndpoint(t, explorerAddr) + + url := "http://" + explorerAddr + "/api/keys" + + t.Run("get", func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != origin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, origin) + } + }) + + t.Run("preflight", func(t *testing.T) { + req, err := http.NewRequest(http.MethodOptions, url, nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + req.Header.Set("Access-Control-Request-Method", "GET") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != origin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, origin) + } + }) +} + +// testStatusResponse makes an http request to provided url +// and validates if response is explorer.StatusResponse for +// the expected status code. +func testStatusResponse(t *testing.T, url string, code int) { + t.Helper() + + resp, err := http.Get(url) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != code { + t.Errorf("got status code %v, want %v", resp.StatusCode, code) + } + var r explorer.StatusResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if r.Code != code { + t.Errorf("got response code %v, want %v", r.Code, code) + } + if r.Message != http.StatusText(code) { + t.Errorf("got response message %q, want %q", r.Message, http.StatusText(code)) + } +} + +// testKeysResponse makes an http request to provided url +// and validates if response machhes expected explorer.KeysResponse. +func testKeysResponse(t *testing.T, url string, want explorer.KeysResponse) { + t.Helper() + + resp, err := http.Get(url) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + var r explorer.KeysResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Keys) != fmt.Sprint(want.Keys) { + t.Errorf("got keys %v, want %v", r.Keys, want.Keys) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} + +// testNodeResponse makes an http request to provided url +// and validates if response machhes expected explorer.NodeResponse. +func testNodesResponse(t *testing.T, url string, want explorer.NodesResponse) { + t.Helper() + + resp, err := http.Get(url) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + var r explorer.NodesResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Nodes) != fmt.Sprint(want.Nodes) { + t.Errorf("got nodes %v, want %v", r.Nodes, want.Nodes) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} diff --git a/cmd/swarm/global-store/global_store.go b/cmd/swarm/global-store/global_store.go index a55756e1c..f93b464db 100644 --- a/cmd/swarm/global-store/global_store.go +++ b/cmd/swarm/global-store/global_store.go @@ -17,6 +17,7 @@ package main import ( + "io" "net" "net/http" "os" @@ -66,7 +67,7 @@ func startWS(ctx *cli.Context) (err error) { return http.Serve(listener, server.WebsocketHandler(origins)) } -// newServer creates a global store and returns its RPC server. +// newServer creates a global store and starts a chunk explorer server if configured. // Returned cleanup function should be called only if err is nil. func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error) { log.PrintOrigins(true) @@ -81,7 +82,9 @@ func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error) return nil, nil, err } cleanup = func() { - dbStore.Close() + if err := dbStore.Close(); err != nil { + log.Error("global store: close", "err", err) + } } globalStore = dbStore log.Info("database global store", "dir", dir) @@ -96,5 +99,22 @@ func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error) return nil, nil, err } + shutdown, err := serveChunkExplorer(ctx, globalStore) + if err != nil { + cleanup() + return nil, nil, err + } + if shutdown != nil { + cleanup = func() { + shutdown() + + if c, ok := globalStore.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Error("global store: close", "err", err) + } + } + } + } + return server, cleanup, nil } diff --git a/cmd/swarm/global-store/global_store_test.go b/cmd/swarm/global-store/global_store_test.go index 63f1c5389..c437c9d45 100644 --- a/cmd/swarm/global-store/global_store_test.go +++ b/cmd/swarm/global-store/global_store_test.go @@ -69,16 +69,7 @@ func testHTTP(t *testing.T, put bool, args ...string) { // wait until global store process is started as // rpc.DialHTTP is actually not connecting - for i := 0; i < 1000; i++ { - _, err = http.DefaultClient.Get("http://" + addr) - if err == nil { - break - } - time.Sleep(10 * time.Millisecond) - } - if err != nil { - t.Fatal(err) - } + waitHTTPEndpoint(t, addr) store := mockRPC.NewGlobalStore(client) defer store.Close() @@ -137,19 +128,7 @@ func testWebsocket(t *testing.T, put bool, args ...string) { testCmd := runGlobalStore(t, append([]string{"ws", "--addr", addr}, args...)...) defer testCmd.Kill() - var client *rpc.Client - var err error - // wait until global store process is started - for i := 0; i < 1000; i++ { - client, err = rpc.DialWebsocket(context.Background(), "ws://"+addr, "") - if err == nil { - break - } - time.Sleep(10 * time.Millisecond) - } - if err != nil { - t.Fatal(err) - } + client := websocketClient(t, addr) store := mockRPC.NewGlobalStore(client) defer store.Close() @@ -160,7 +139,7 @@ func testWebsocket(t *testing.T, put bool, args ...string) { wantValue := "value" if put { - err = node.Put([]byte(wantKey), []byte(wantValue)) + err := node.Put([]byte(wantKey), []byte(wantValue)) if err != nil { t.Fatal(err) } @@ -189,3 +168,40 @@ func findFreeTCPAddress(t *testing.T) (addr string) { return listener.Addr().String() } + +// websocketClient waits until global store process is started +// and returns rpc client. +func websocketClient(t *testing.T, addr string) (client *rpc.Client) { + t.Helper() + + var err error + for i := 0; i < 1000; i++ { + client, err = rpc.DialWebsocket(context.Background(), "ws://"+addr, "") + if err == nil { + break + } + time.Sleep(10 * time.Millisecond) + } + if err != nil { + t.Fatal(err) + } + return client +} + +// waitHTTPEndpoint retries http requests to a provided +// address until the connection is established. +func waitHTTPEndpoint(t *testing.T, addr string) { + t.Helper() + + var err error + for i := 0; i < 1000; i++ { + _, err = http.Get("http://" + addr) + if err == nil { + break + } + time.Sleep(10 * time.Millisecond) + } + if err != nil { + t.Fatal(err) + } +} diff --git a/cmd/swarm/global-store/main.go b/cmd/swarm/global-store/main.go index 51df0099a..52fafc8f6 100644 --- a/cmd/swarm/global-store/main.go +++ b/cmd/swarm/global-store/main.go @@ -19,12 +19,14 @@ package main import ( "os" - "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/log" cli "gopkg.in/urfave/cli.v1" ) -var gitCommit string // Git SHA1 commit hash of the release (set via linker flags) +var ( + version = "0.1" + gitCommit string // Git SHA1 commit hash of the release (set via linker flags) +) func main() { err := newApp().Run(os.Args) @@ -37,16 +39,30 @@ func main() { // newApp construct a new instance of Swarm Global Store. // Method Run is called on it in the main function and in tests. func newApp() (app *cli.App) { - app = utils.NewApp(gitCommit, "Swarm Global Store") - + app = cli.NewApp() app.Name = "global-store" + app.Version = version + if len(gitCommit) >= 8 { + app.Version += "-" + gitCommit[:8] + } + app.Usage = "Swarm Global Store" // app flags (for all commands) app.Flags = []cli.Flag{ cli.IntFlag{ Name: "verbosity", Value: 3, - Usage: "verbosity level", + Usage: "Verbosity level.", + }, + cli.StringFlag{ + Name: "explorer-address", + Value: "", + Usage: "Chunk explorer HTTP listener address.", + }, + cli.StringSliceFlag{ + Name: "explorer-cors-origin", + Value: nil, + Usage: "Chunk explorer CORS origin (can be specified multiple times).", }, } @@ -54,7 +70,7 @@ func newApp() (app *cli.App) { { Name: "http", Aliases: []string{"h"}, - Usage: "start swarm global store with http server", + Usage: "Start swarm global store with HTTP server.", Action: startHTTP, // Flags only for "start" command. // Allow app flags to be specified after the @@ -63,19 +79,19 @@ func newApp() (app *cli.App) { cli.StringFlag{ Name: "dir", Value: "", - Usage: "data directory", + Usage: "Data directory.", }, cli.StringFlag{ Name: "addr", Value: "0.0.0.0:3033", - Usage: "address to listen for http connection", + Usage: "Address to listen for HTTP connections.", }, ), }, { Name: "websocket", Aliases: []string{"ws"}, - Usage: "start swarm global store with websocket server", + Usage: "Start swarm global store with WebSocket server.", Action: startWS, // Flags only for "start" command. // Allow app flags to be specified after the @@ -84,17 +100,17 @@ func newApp() (app *cli.App) { cli.StringFlag{ Name: "dir", Value: "", - Usage: "data directory", + Usage: "Data directory.", }, cli.StringFlag{ Name: "addr", Value: "0.0.0.0:3033", - Usage: "address to listen for websocket connection", + Usage: "Address to listen for WebSocket connections.", }, cli.StringSliceFlag{ - Name: "origins", - Value: &cli.StringSlice{"*"}, - Usage: "websocket origins", + Name: "origin", + Value: nil, + Usage: "WebSocket CORS origin (can be specified multiple times).", }, ), }, diff --git a/swarm/storage/mock/db/db.go b/swarm/storage/mock/db/db.go index 73ae199e8..313a61b43 100644 --- a/swarm/storage/mock/db/db.go +++ b/swarm/storage/mock/db/db.go @@ -21,8 +21,12 @@ import ( "archive/tar" "bytes" "encoding/json" + "errors" + "fmt" "io" "io/ioutil" + "sync" + "time" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -37,6 +41,10 @@ import ( // release resources used by the database. type GlobalStore struct { db *leveldb.DB + // protects nodes and keys indexes + // in Put and Delete methods + nodesLocks sync.Map + keysLocks sync.Map } // NewGlobalStore creates a new instance of GlobalStore. @@ -64,14 +72,14 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore { // Get returns chunk data if the chunk with key exists for node // on address addr. func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) { - has, err := s.db.Has(nodeDBKey(addr, key), nil) + has, err := s.db.Has(indexForHashesPerNode(addr, key), nil) if err != nil { return nil, mock.ErrNotFound } if !has { return nil, mock.ErrNotFound } - data, err = s.db.Get(dataDBKey(key), nil) + data, err = s.db.Get(indexDataKey(key), nil) if err == leveldb.ErrNotFound { err = mock.ErrNotFound } @@ -80,28 +88,165 @@ func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err err // Put saves the chunk data for node with address addr. func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { + unlock, err := s.lock(addr, key) + if err != nil { + return err + } + defer unlock() + batch := new(leveldb.Batch) - batch.Put(nodeDBKey(addr, key), nil) - batch.Put(dataDBKey(key), data) + batch.Put(indexForHashesPerNode(addr, key), nil) + batch.Put(indexForNodesWithHash(key, addr), nil) + batch.Put(indexForNodes(addr), nil) + batch.Put(indexForHashes(key), nil) + batch.Put(indexDataKey(key), data) return s.db.Write(batch, nil) } // Delete removes the chunk reference to node with address addr. func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + unlock, err := s.lock(addr, key) + if err != nil { + return err + } + defer unlock() + batch := new(leveldb.Batch) - batch.Delete(nodeDBKey(addr, key)) + batch.Delete(indexForHashesPerNode(addr, key)) + batch.Delete(indexForNodesWithHash(key, addr)) + + // check if this node contains any keys, and if not + // remove it from the + x := indexForHashesPerNodePrefix(addr) + if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) { + batch.Delete(indexForNodes(addr)) + } + + x = indexForNodesWithHashPrefix(key) + if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) { + batch.Delete(indexForHashes(key)) + } return s.db.Write(batch, nil) } // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { - has, err := s.db.Has(nodeDBKey(addr, key), nil) + has, err := s.db.Has(indexForHashesPerNode(addr, key), nil) if err != nil { has = false } return has } +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + return s.keys(nil, startKey, limit) +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + return s.nodes(nil, startAddr, limit) +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + return s.keys(&addr, startKey, limit) +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + return s.nodes(key, startAddr, limit) +} + +// keys returns a paginated list of keys. If addr is not nil, only keys on that +// node will be returned. +func (s *GlobalStore) keys(addr *common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + iter := s.db.NewIterator(nil, nil) + defer iter.Release() + + if limit <= 0 { + limit = mock.DefaultLimit + } + + prefix := []byte{indexForHashesPrefix} + if addr != nil { + prefix = indexForHashesPerNodePrefix(*addr) + } + if startKey != nil { + if addr != nil { + startKey = indexForHashesPerNode(*addr, startKey) + } else { + startKey = indexForHashes(startKey) + } + } else { + startKey = prefix + } + + ok := iter.Seek(startKey) + if !ok { + return keys, iter.Error() + } + for ; ok; ok = iter.Next() { + k := iter.Key() + if !bytes.HasPrefix(k, prefix) { + break + } + key := append([]byte(nil), bytes.TrimPrefix(k, prefix)...) + + if len(keys.Keys) >= limit { + keys.Next = key + break + } + + keys.Keys = append(keys.Keys, key) + } + return keys, iter.Error() +} + +// nodes returns a paginated list of node addresses. If key is not nil, +// only nodes that contain that key will be returned. +func (s *GlobalStore) nodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + iter := s.db.NewIterator(nil, nil) + defer iter.Release() + + if limit <= 0 { + limit = mock.DefaultLimit + } + + prefix := []byte{indexForNodesPrefix} + if key != nil { + prefix = indexForNodesWithHashPrefix(key) + } + startKey := prefix + if startAddr != nil { + if key != nil { + startKey = indexForNodesWithHash(key, *startAddr) + } else { + startKey = indexForNodes(*startAddr) + } + } + + ok := iter.Seek(startKey) + if !ok { + return nodes, iter.Error() + } + for ; ok; ok = iter.Next() { + k := iter.Key() + if !bytes.HasPrefix(k, prefix) { + break + } + addr := common.BytesToAddress(append([]byte(nil), bytes.TrimPrefix(k, prefix)...)) + + if len(nodes.Addrs) >= limit { + nodes.Next = &addr + break + } + + nodes.Addrs = append(nodes.Addrs, addr) + } + return nodes, iter.Error() +} + // Import reads tar archive from a reader that contains exported chunk data. // It returns the number of chunks imported and an error. func (s *GlobalStore) Import(r io.Reader) (n int, err error) { @@ -126,12 +271,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { return n, err } + key := common.Hex2Bytes(hdr.Name) + batch := new(leveldb.Batch) for _, addr := range c.Addrs { - batch.Put(nodeDBKeyHex(addr, hdr.Name), nil) + batch.Put(indexForHashesPerNode(addr, key), nil) + batch.Put(indexForNodesWithHash(key, addr), nil) + batch.Put(indexForNodes(addr), nil) } - batch.Put(dataDBKey(common.Hex2Bytes(hdr.Name)), c.Data) + batch.Put(indexForHashes(key), nil) + batch.Put(indexDataKey(key), c.Data) + if err = s.db.Write(batch, nil); err != nil { return n, err } @@ -150,18 +301,23 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { buf := bytes.NewBuffer(make([]byte, 0, 1024)) encoder := json.NewEncoder(buf) - iter := s.db.NewIterator(util.BytesPrefix(nodeKeyPrefix), nil) + snap, err := s.db.GetSnapshot() + if err != nil { + return 0, err + } + + iter := snap.NewIterator(util.BytesPrefix([]byte{indexForHashesByNodePrefix}), nil) defer iter.Release() var currentKey string var addrs []common.Address - saveChunk := func(hexKey string) error { - key := common.Hex2Bytes(hexKey) + saveChunk := func() error { + hexKey := currentKey - data, err := s.db.Get(dataDBKey(key), nil) + data, err := snap.Get(indexDataKey(common.Hex2Bytes(hexKey)), nil) if err != nil { - return err + return fmt.Errorf("get data %s: %v", hexKey, err) } buf.Reset() @@ -189,8 +345,8 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } for iter.Next() { - k := bytes.TrimPrefix(iter.Key(), nodeKeyPrefix) - i := bytes.Index(k, []byte("-")) + k := bytes.TrimPrefix(iter.Key(), []byte{indexForHashesByNodePrefix}) + i := bytes.Index(k, []byte{keyTermByte}) if i < 0 { continue } @@ -201,7 +357,7 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } if hexKey != currentKey { - if err = saveChunk(currentKey); err != nil { + if err = saveChunk(); err != nil { return n, err } @@ -209,35 +365,112 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } currentKey = hexKey - addrs = append(addrs, common.BytesToAddress(k[i:])) + addrs = append(addrs, common.BytesToAddress(k[i+1:])) } if len(addrs) > 0 { - if err = saveChunk(currentKey); err != nil { + if err = saveChunk(); err != nil { return n, err } } - return n, err + return n, iter.Error() } var ( - nodeKeyPrefix = []byte("node-") - dataKeyPrefix = []byte("data-") + // maximal time for lock to wait until it returns error + lockTimeout = 3 * time.Second + // duration between two lock checks. + lockCheckDelay = 30 * time.Microsecond + // error returned by lock method when lock timeout is reached + errLockTimeout = errors.New("lock timeout") ) -// nodeDBKey constructs a database key for key/node mappings. -func nodeDBKey(addr common.Address, key []byte) []byte { - return nodeDBKeyHex(addr, common.Bytes2Hex(key)) +// lock protects parallel writes in Put and Delete methods for both +// node with provided address and for data with provided key. +func (s *GlobalStore) lock(addr common.Address, key []byte) (unlock func(), err error) { + start := time.Now() + nodeLockKey := addr.Hex() + for { + _, loaded := s.nodesLocks.LoadOrStore(nodeLockKey, struct{}{}) + if !loaded { + break + } + time.Sleep(lockCheckDelay) + if time.Since(start) > lockTimeout { + return nil, errLockTimeout + } + } + start = time.Now() + keyLockKey := common.Bytes2Hex(key) + for { + _, loaded := s.keysLocks.LoadOrStore(keyLockKey, struct{}{}) + if !loaded { + break + } + time.Sleep(lockCheckDelay) + if time.Since(start) > lockTimeout { + return nil, errLockTimeout + } + } + return func() { + s.nodesLocks.Delete(nodeLockKey) + s.keysLocks.Delete(keyLockKey) + }, nil } -// nodeDBKeyHex constructs a database key for key/node mappings -// using the hexadecimal string representation of the key. -func nodeDBKeyHex(addr common.Address, hexKey string) []byte { - return append(append(nodeKeyPrefix, []byte(hexKey+"-")...), addr[:]...) +const ( + // prefixes for different indexes + indexDataPrefix = 0 + indexForNodesWithHashesPrefix = 1 + indexForHashesByNodePrefix = 2 + indexForNodesPrefix = 3 + indexForHashesPrefix = 4 + + // keyTermByte splits keys and node addresses + // in database keys + keyTermByte = 0xff +) + +// indexForHashesPerNode constructs a database key to store keys used in +// NodeKeys method. +func indexForHashesPerNode(addr common.Address, key []byte) []byte { + return append(indexForHashesPerNodePrefix(addr), key...) } -// dataDBkey constructs a database key for key/data storage. -func dataDBKey(key []byte) []byte { - return append(dataKeyPrefix, key...) +// indexForHashesPerNodePrefix returns a prefix containing a node address used in +// NodeKeys method. Node address is hex encoded to be able to use keyTermByte +// for splitting node address and key. +func indexForHashesPerNodePrefix(addr common.Address) []byte { + return append([]byte{indexForNodesWithHashesPrefix}, append([]byte(addr.Hex()), keyTermByte)...) +} + +// indexForNodesWithHash constructs a database key to store keys used in +// KeyNodes method. +func indexForNodesWithHash(key []byte, addr common.Address) []byte { + return append(indexForNodesWithHashPrefix(key), addr[:]...) +} + +// indexForNodesWithHashPrefix returns a prefix containing a key used in +// KeyNodes method. Key is hex encoded to be able to use keyTermByte +// for splitting key and node address. +func indexForNodesWithHashPrefix(key []byte) []byte { + return append([]byte{indexForHashesByNodePrefix}, append([]byte(common.Bytes2Hex(key)), keyTermByte)...) +} + +// indexForNodes constructs a database key to store keys used in +// Nodes method. +func indexForNodes(addr common.Address) []byte { + return append([]byte{indexForNodesPrefix}, addr[:]...) +} + +// indexForHashes constructs a database key to store keys used in +// Keys method. +func indexForHashes(key []byte) []byte { + return append([]byte{indexForHashesPrefix}, key...) +} + +// indexDataKey constructs a database key for key/data storage. +func indexDataKey(key []byte) []byte { + return append([]byte{indexDataPrefix}, key...) } diff --git a/swarm/storage/mock/db/db_test.go b/swarm/storage/mock/db/db_test.go index 782faaf35..efbf942f6 100644 --- a/swarm/storage/mock/db/db_test.go +++ b/swarm/storage/mock/db/db_test.go @@ -1,5 +1,3 @@ -// +build go1.8 -// // Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -29,47 +27,49 @@ import ( // TestDBStore is running a test.MockStore tests // using test.MockStore function. func TestDBStore(t *testing.T) { - dir, err := ioutil.TempDir("", "mock_"+t.Name()) - if err != nil { - panic(err) - } - defer os.RemoveAll(dir) - - store, err := NewGlobalStore(dir) - if err != nil { - t.Fatal(err) - } - defer store.Close() + store, cleanup := newTestStore(t) + defer cleanup() test.MockStore(t, store, 100) } +// TestDBStoreListings is running test.MockStoreListings tests. +func TestDBStoreListings(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStoreListings(t, store, 1000) +} + // TestImportExport is running a test.ImportExport tests // using test.MockStore function. func TestImportExport(t *testing.T) { - dir1, err := ioutil.TempDir("", "mock_"+t.Name()+"_exporter") - if err != nil { - panic(err) - } - defer os.RemoveAll(dir1) + store1, cleanup := newTestStore(t) + defer cleanup() - store1, err := NewGlobalStore(dir1) - if err != nil { - t.Fatal(err) - } - defer store1.Close() - - dir2, err := ioutil.TempDir("", "mock_"+t.Name()+"_importer") - if err != nil { - panic(err) - } - defer os.RemoveAll(dir2) - - store2, err := NewGlobalStore(dir2) - if err != nil { - t.Fatal(err) - } - defer store2.Close() + store2, cleanup := newTestStore(t) + defer cleanup() test.ImportExport(t, store1, store2, 100) } + +// newTestStore creates a temporary GlobalStore +// that will be closed and data deleted when +// calling returned cleanup function. +func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) { + dir, err := ioutil.TempDir("", "swarm-mock-db-") + if err != nil { + t.Fatal(err) + } + + s, err = NewGlobalStore(dir) + if err != nil { + os.RemoveAll(dir) + t.Fatal(err) + } + + return s, func() { + s.Close() + os.RemoveAll(dir) + } +} diff --git a/swarm/storage/mock/explorer/explorer.go b/swarm/storage/mock/explorer/explorer.go new file mode 100644 index 000000000..8fffff8fd --- /dev/null +++ b/swarm/storage/mock/explorer/explorer.go @@ -0,0 +1,257 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package explorer + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/rs/cors" +) + +const jsonContentType = "application/json; charset=utf-8" + +// NewHandler constructs an http.Handler with router +// that servers requests required by chunk explorer. +// +// /api/has-key/{node}/{key} +// /api/keys?start={key}&node={node}&limit={int[0..1000]} +// /api/nodes?start={node}&key={key}&limit={int[0..1000]} +// +// Data from global store will be served and appropriate +// CORS headers will be sent if allowed origins are provided. +func NewHandler(store mock.GlobalStorer, corsOrigins []string) (handler http.Handler) { + mux := http.NewServeMux() + mux.Handle("/api/has-key/", newHasKeyHandler(store)) + mux.Handle("/api/keys", newKeysHandler(store)) + mux.Handle("/api/nodes", newNodesHandler(store)) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + jsonStatusResponse(w, http.StatusNotFound) + }) + handler = noCacheHandler(mux) + if corsOrigins != nil { + handler = cors.New(cors.Options{ + AllowedOrigins: corsOrigins, + AllowedMethods: []string{"GET"}, + MaxAge: 600, + }).Handler(handler) + } + return handler +} + +// newHasKeyHandler returns a new handler that serves +// requests for HasKey global store method. +// Possible responses are StatusResponse with +// status codes 200 or 404 if the chunk is found or not. +func newHasKeyHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + addr, key, ok := parseHasKeyPath(r.URL.Path) + if !ok { + jsonStatusResponse(w, http.StatusNotFound) + return + } + found := store.HasKey(addr, key) + if !found { + jsonStatusResponse(w, http.StatusNotFound) + return + } + jsonStatusResponse(w, http.StatusOK) + } +} + +// KeysResponse is a JSON-encoded response for global store +// Keys and NodeKeys methods. +type KeysResponse struct { + Keys []string `json:"keys"` + Next string `json:"next,omitempty"` +} + +// newKeysHandler returns a new handler that serves +// requests for Key global store method. +// HTTP response body will be JSON-encoded KeysResponse. +func newKeysHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + node := q.Get("node") + start, limit := listingPage(q) + + var keys mock.Keys + if node == "" { + var err error + keys, err = store.Keys(common.Hex2Bytes(start), limit) + if err != nil { + log.Error("chunk explorer: keys handler: get keys", "start", start, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } else { + var err error + keys, err = store.NodeKeys(common.HexToAddress(node), common.Hex2Bytes(start), limit) + if err != nil { + log.Error("chunk explorer: keys handler: get node keys", "node", node, "start", start, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } + ks := make([]string, len(keys.Keys)) + for i, k := range keys.Keys { + ks[i] = common.Bytes2Hex(k) + } + data, err := json.Marshal(KeysResponse{ + Keys: ks, + Next: common.Bytes2Hex(keys.Next), + }) + if err != nil { + log.Error("chunk explorer: keys handler: json marshal", "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", jsonContentType) + _, err = io.Copy(w, bytes.NewReader(data)) + if err != nil { + log.Error("chunk explorer: keys handler: write response", "err", err) + } + } +} + +// NodesResponse is a JSON-encoded response for global store +// Nodes and KeyNodes methods. +type NodesResponse struct { + Nodes []string `json:"nodes"` + Next string `json:"next,omitempty"` +} + +// newNodesHandler returns a new handler that serves +// requests for Nodes global store method. +// HTTP response body will be JSON-encoded NodesResponse. +func newNodesHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + key := q.Get("key") + var start *common.Address + queryStart, limit := listingPage(q) + if queryStart != "" { + s := common.HexToAddress(queryStart) + start = &s + } + + var nodes mock.Nodes + if key == "" { + var err error + nodes, err = store.Nodes(start, limit) + if err != nil { + log.Error("chunk explorer: nodes handler: get nodes", "start", queryStart, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } else { + var err error + nodes, err = store.KeyNodes(common.Hex2Bytes(key), start, limit) + if err != nil { + log.Error("chunk explorer: nodes handler: get key nodes", "key", key, "start", queryStart, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } + ns := make([]string, len(nodes.Addrs)) + for i, n := range nodes.Addrs { + ns[i] = n.Hex() + } + var next string + if nodes.Next != nil { + next = nodes.Next.Hex() + } + data, err := json.Marshal(NodesResponse{ + Nodes: ns, + Next: next, + }) + if err != nil { + log.Error("chunk explorer: nodes handler", "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", jsonContentType) + _, err = io.Copy(w, bytes.NewReader(data)) + if err != nil { + log.Error("chunk explorer: nodes handler: write response", "err", err) + } + } +} + +// parseHasKeyPath extracts address and key from HTTP request +// path for HasKey route: /api/has-key/{node}/{key}. +// If ok is false, the provided path is not matched. +func parseHasKeyPath(p string) (addr common.Address, key []byte, ok bool) { + p = strings.TrimPrefix(p, "/api/has-key/") + parts := strings.SplitN(p, "/", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return addr, nil, false + } + addr = common.HexToAddress(parts[0]) + key = common.Hex2Bytes(parts[1]) + return addr, key, true +} + +// listingPage returns start value and listing limit +// from url query values. +func listingPage(q url.Values) (start string, limit int) { + // if limit is not a valid integer (or blank string), + // ignore the error and use the returned 0 value + limit, _ = strconv.Atoi(q.Get("limit")) + return q.Get("start"), limit +} + +// StatusResponse is a standardized JSON-encoded response +// that contains information about HTTP response code +// for easier status identification. +type StatusResponse struct { + Message string `json:"message"` + Code int `json:"code"` +} + +// jsonStatusResponse writes to the response writer +// JSON-encoded StatusResponse based on the provided status code. +func jsonStatusResponse(w http.ResponseWriter, code int) { + w.Header().Set("Content-Type", jsonContentType) + w.WriteHeader(code) + err := json.NewEncoder(w).Encode(StatusResponse{ + Message: http.StatusText(code), + Code: code, + }) + if err != nil { + log.Error("chunk explorer: json status response", "err", err) + } +} + +// noCacheHandler sets required HTTP headers to prevent +// response caching at the client side. +func noCacheHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("Pragma", "no-cache") + w.Header().Set("Expires", "0") + h.ServeHTTP(w, r) + }) +} diff --git a/swarm/storage/mock/explorer/explorer_test.go b/swarm/storage/mock/explorer/explorer_test.go new file mode 100644 index 000000000..be2668426 --- /dev/null +++ b/swarm/storage/mock/explorer/explorer_test.go @@ -0,0 +1,471 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package explorer + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "sort" + "strconv" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" +) + +// TestHandler_memGlobalStore runs a set of tests +// to validate handler with mem global store. +func TestHandler_memGlobalStore(t *testing.T) { + t.Parallel() + + globalStore := mem.NewGlobalStore() + + testHandler(t, globalStore) +} + +// TestHandler_dbGlobalStore runs a set of tests +// to validate handler with database global store. +func TestHandler_dbGlobalStore(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "swarm-mock-explorer-db-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + globalStore, err := db.NewGlobalStore(dir) + if err != nil { + t.Fatal(err) + } + defer globalStore.Close() + + testHandler(t, globalStore) +} + +// testHandler stores data distributed by node addresses +// and validates if this data is correctly retrievable +// by using the http.Handler returned by NewHandler function. +// This test covers all HTTP routes and various get parameters +// on them to check paginated results. +func testHandler(t *testing.T, globalStore mock.GlobalStorer) { + const ( + nodeCount = 350 + keyCount = 250 + keysOnNodeCount = 150 + ) + + // keys for every node + nodeKeys := make(map[string][]string) + + // a node address that is not present in global store + invalidAddr := "0x7b8b72938c254cf002c4e1e714d27e022be88d93" + + // a key that is not present in global store + invalidKey := "f9824192fb515cfb" + + for i := 1; i <= nodeCount; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + addr := common.BytesToAddress(b).Hex() + nodeKeys[addr] = make([]string, 0) + } + + for i := 1; i <= keyCount; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + + key := common.Bytes2Hex(b) + + var c int + for addr := range nodeKeys { + nodeKeys[addr] = append(nodeKeys[addr], key) + c++ + if c >= keysOnNodeCount { + break + } + } + } + + // sort keys for every node as they are expected to be + // sorted in HTTP responses + for _, keys := range nodeKeys { + sort.Strings(keys) + } + + // nodes for every key + keyNodes := make(map[string][]string) + + // construct a reverse mapping of nodes for every key + for addr, keys := range nodeKeys { + for _, key := range keys { + keyNodes[key] = append(keyNodes[key], addr) + } + } + + // sort node addresses with case insensitive sort, + // as hex letters in node addresses are in mixed caps + for _, addrs := range keyNodes { + sortCaseInsensitive(addrs) + } + + // find a key that is not stored at the address + var ( + unmatchedAddr string + unmatchedKey string + ) + for addr, keys := range nodeKeys { + for key := range keyNodes { + var found bool + for _, k := range keys { + if k == key { + found = true + break + } + } + if !found { + unmatchedAddr = addr + unmatchedKey = key + } + break + } + if unmatchedAddr != "" { + break + } + } + // check if unmatched key/address pair is found + if unmatchedAddr == "" || unmatchedKey == "" { + t.Fatalf("could not find a key that is not associated with a node") + } + + // store the data + for addr, keys := range nodeKeys { + for _, key := range keys { + err := globalStore.Put(common.HexToAddress(addr), common.Hex2Bytes(key), []byte("data")) + if err != nil { + t.Fatal(err) + } + } + } + + handler := NewHandler(globalStore, nil) + + // this subtest confirms that it has uploaded key and that it does not have invalid keys + t.Run("has key", func(t *testing.T) { + for addr, keys := range nodeKeys { + for _, key := range keys { + testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+key, http.StatusOK) + testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+key, http.StatusNotFound) + } + testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+invalidKey, http.StatusNotFound) + } + testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+invalidKey, http.StatusNotFound) + testStatusResponse(t, handler, "/api/has-key/"+unmatchedAddr+"/"+unmatchedKey, http.StatusNotFound) + }) + + // this subtest confirms that all keys are are listed in correct order with expected pagination + t.Run("keys", func(t *testing.T) { + var allKeys []string + for key := range keyNodes { + allKeys = append(allKeys, key) + } + sort.Strings(allKeys) + + t.Run("limit 0", testKeys(handler, allKeys, 0, "")) + t.Run("limit default", testKeys(handler, allKeys, mock.DefaultLimit, "")) + t.Run("limit 2x default", testKeys(handler, allKeys, 2*mock.DefaultLimit, "")) + t.Run("limit 0.5x default", testKeys(handler, allKeys, mock.DefaultLimit/2, "")) + t.Run("limit max", testKeys(handler, allKeys, mock.MaxLimit, "")) + t.Run("limit 2x max", testKeys(handler, allKeys, 2*mock.MaxLimit, "")) + t.Run("limit negative", testKeys(handler, allKeys, -10, "")) + }) + + // this subtest confirms that all keys are are listed for every node in correct order + // and that for one node different pagination options are correct + t.Run("node keys", func(t *testing.T) { + var limitCheckAddr string + + for addr, keys := range nodeKeys { + testKeys(handler, keys, 0, addr)(t) + if limitCheckAddr == "" { + limitCheckAddr = addr + } + } + testKeys(handler, nil, 0, invalidAddr)(t) + + limitCheckKeys := nodeKeys[limitCheckAddr] + t.Run("limit 0", testKeys(handler, limitCheckKeys, 0, limitCheckAddr)) + t.Run("limit default", testKeys(handler, limitCheckKeys, mock.DefaultLimit, limitCheckAddr)) + t.Run("limit 2x default", testKeys(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckAddr)) + t.Run("limit 0.5x default", testKeys(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckAddr)) + t.Run("limit max", testKeys(handler, limitCheckKeys, mock.MaxLimit, limitCheckAddr)) + t.Run("limit 2x max", testKeys(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckAddr)) + t.Run("limit negative", testKeys(handler, limitCheckKeys, -10, limitCheckAddr)) + }) + + // this subtest confirms that all nodes are are listed in correct order with expected pagination + t.Run("nodes", func(t *testing.T) { + var allNodes []string + for addr := range nodeKeys { + allNodes = append(allNodes, addr) + } + sortCaseInsensitive(allNodes) + + t.Run("limit 0", testNodes(handler, allNodes, 0, "")) + t.Run("limit default", testNodes(handler, allNodes, mock.DefaultLimit, "")) + t.Run("limit 2x default", testNodes(handler, allNodes, 2*mock.DefaultLimit, "")) + t.Run("limit 0.5x default", testNodes(handler, allNodes, mock.DefaultLimit/2, "")) + t.Run("limit max", testNodes(handler, allNodes, mock.MaxLimit, "")) + t.Run("limit 2x max", testNodes(handler, allNodes, 2*mock.MaxLimit, "")) + t.Run("limit negative", testNodes(handler, allNodes, -10, "")) + }) + + // this subtest confirms that all nodes are are listed that contain a a particular key in correct order + // and that for one key different node pagination options are correct + t.Run("key nodes", func(t *testing.T) { + var limitCheckKey string + + for key, addrs := range keyNodes { + testNodes(handler, addrs, 0, key)(t) + if limitCheckKey == "" { + limitCheckKey = key + } + } + testNodes(handler, nil, 0, invalidKey)(t) + + limitCheckKeys := keyNodes[limitCheckKey] + t.Run("limit 0", testNodes(handler, limitCheckKeys, 0, limitCheckKey)) + t.Run("limit default", testNodes(handler, limitCheckKeys, mock.DefaultLimit, limitCheckKey)) + t.Run("limit 2x default", testNodes(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckKey)) + t.Run("limit 0.5x default", testNodes(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckKey)) + t.Run("limit max", testNodes(handler, limitCheckKeys, mock.MaxLimit, limitCheckKey)) + t.Run("limit 2x max", testNodes(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckKey)) + t.Run("limit negative", testNodes(handler, limitCheckKeys, -10, limitCheckKey)) + }) +} + +// testsKeys returns a test function that validates wantKeys against a series of /api/keys +// HTTP responses with provided limit and node options. +func testKeys(handler http.Handler, wantKeys []string, limit int, node string) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + wantLimit := limit + if wantLimit <= 0 { + wantLimit = mock.DefaultLimit + } + if wantLimit > mock.MaxLimit { + wantLimit = mock.MaxLimit + } + wantKeysLen := len(wantKeys) + var i int + var startKey string + for { + var wantNext string + start := i * wantLimit + end := (i + 1) * wantLimit + if end < wantKeysLen { + wantNext = wantKeys[end] + } else { + end = wantKeysLen + } + testKeysResponse(t, handler, node, startKey, limit, KeysResponse{ + Keys: wantKeys[start:end], + Next: wantNext, + }) + if wantNext == "" { + break + } + startKey = wantNext + i++ + } + } +} + +// testNodes returns a test function that validates wantAddrs against a series of /api/nodes +// HTTP responses with provided limit and key options. +func testNodes(handler http.Handler, wantAddrs []string, limit int, key string) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + wantLimit := limit + if wantLimit <= 0 { + wantLimit = mock.DefaultLimit + } + if wantLimit > mock.MaxLimit { + wantLimit = mock.MaxLimit + } + wantAddrsLen := len(wantAddrs) + var i int + var startKey string + for { + var wantNext string + start := i * wantLimit + end := (i + 1) * wantLimit + if end < wantAddrsLen { + wantNext = wantAddrs[end] + } else { + end = wantAddrsLen + } + testNodesResponse(t, handler, key, startKey, limit, NodesResponse{ + Nodes: wantAddrs[start:end], + Next: wantNext, + }) + if wantNext == "" { + break + } + startKey = wantNext + i++ + } + } +} + +// testStatusResponse validates a response made on url if it matches +// the expected StatusResponse. +func testStatusResponse(t *testing.T, handler http.Handler, url string, code int) { + t.Helper() + + resp := httpGet(t, handler, url) + + if resp.StatusCode != code { + t.Errorf("got status code %v, want %v", resp.StatusCode, code) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r StatusResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if r.Code != code { + t.Errorf("got response code %v, want %v", r.Code, code) + } + if r.Message != http.StatusText(code) { + t.Errorf("got response message %q, want %q", r.Message, http.StatusText(code)) + } +} + +// testKeysResponse validates response returned from handler on /api/keys +// with node, start and limit options against KeysResponse. +func testKeysResponse(t *testing.T, handler http.Handler, node, start string, limit int, want KeysResponse) { + t.Helper() + + u, err := url.Parse("/api/keys") + if err != nil { + t.Fatal(err) + } + q := u.Query() + if node != "" { + q.Set("node", node) + } + if start != "" { + q.Set("start", start) + } + if limit != 0 { + q.Set("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() + + resp := httpGet(t, handler, u.String()) + + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r KeysResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Keys) != fmt.Sprint(want.Keys) { + t.Errorf("got keys %v, want %v", r.Keys, want.Keys) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} + +// testNodesResponse validates response returned from handler on /api/nodes +// with key, start and limit options against NodesResponse. +func testNodesResponse(t *testing.T, handler http.Handler, key, start string, limit int, want NodesResponse) { + t.Helper() + + u, err := url.Parse("/api/nodes") + if err != nil { + t.Fatal(err) + } + q := u.Query() + if key != "" { + q.Set("key", key) + } + if start != "" { + q.Set("start", start) + } + if limit != 0 { + q.Set("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() + + resp := httpGet(t, handler, u.String()) + + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r NodesResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Nodes) != fmt.Sprint(want.Nodes) { + t.Errorf("got nodes %v, want %v", r.Nodes, want.Nodes) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} + +// httpGet uses httptest recorder to provide a response on handler's url. +func httpGet(t *testing.T, handler http.Handler, url string) (r *http.Response) { + t.Helper() + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + t.Fatal(err) + } + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + return w.Result() +} + +// sortCaseInsensitive performs a case insensitive sort on a string slice. +func sortCaseInsensitive(s []string) { + sort.Slice(s, func(i, j int) bool { + return strings.ToLower(s[i]) < strings.ToLower(s[j]) + }) +} diff --git a/swarm/storage/mock/explorer/headers_test.go b/swarm/storage/mock/explorer/headers_test.go new file mode 100644 index 000000000..5b8e05ffd --- /dev/null +++ b/swarm/storage/mock/explorer/headers_test.go @@ -0,0 +1,163 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package explorer + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" +) + +// TestHandler_CORSOrigin validates that the correct Access-Control-Allow-Origin +// header is served with various allowed origin settings. +func TestHandler_CORSOrigin(t *testing.T) { + notAllowedOrigin := "http://not-allowed-origin.com/" + + for _, tc := range []struct { + name string + origins []string + }{ + { + name: "no origin", + origins: nil, + }, + { + name: "single origin", + origins: []string{"http://localhost/"}, + }, + { + name: "multiple origins", + origins: []string{"http://localhost/", "http://ethereum.org/"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), tc.origins) + + origins := tc.origins + if origins == nil { + // handle the "no origin" test case + origins = []string{""} + } + + for _, origin := range origins { + t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin)) + t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin)) + } + + t.Run(fmt.Sprintf("get %q", notAllowedOrigin), newTestCORSOrigin(handler, notAllowedOrigin, "")) + t.Run(fmt.Sprintf("preflight %q", notAllowedOrigin), newTestCORSPreflight(handler, notAllowedOrigin, "")) + }) + } + + t.Run("wildcard", func(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), []string{"*"}) + + for _, origin := range []string{ + "http://example.com/", + "http://ethereum.org", + "http://localhost", + } { + t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin)) + t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin)) + } + }) +} + +// newTestCORSOrigin returns a test function that validates if wantOrigin CORS header is +// served by the handler for a GET request. +func newTestCORSOrigin(handler http.Handler, origin, wantOrigin string) func(t *testing.T) { + return func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != wantOrigin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin) + } + } +} + +// newTestCORSPreflight returns a test function that validates if wantOrigin CORS header is +// served by the handler for an OPTIONS CORS preflight request. +func newTestCORSPreflight(handler http.Handler, origin, wantOrigin string) func(t *testing.T) { + return func(t *testing.T) { + req, err := http.NewRequest(http.MethodOptions, "/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + req.Header.Set("Access-Control-Request-Method", "GET") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != wantOrigin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin) + } + } +} + +// TestHandler_noCacheHeaders validates that no cache headers are server. +func TestHandler_noCacheHeaders(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), nil) + + for _, tc := range []struct { + url string + }{ + { + url: "/", + }, + { + url: "/api/nodes", + }, + { + url: "/api/keys", + }, + } { + req, err := http.NewRequest(http.MethodGet, tc.url, nil) + if err != nil { + t.Fatal(err) + } + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + for header, want := range map[string]string{ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + } { + got := resp.Header.Get(header) + if got != want { + t.Errorf("got %q header %q for url %q, want %q", header, tc.url, got, want) + } + } + } +} diff --git a/swarm/storage/mock/explorer/swagger.yaml b/swarm/storage/mock/explorer/swagger.yaml new file mode 100644 index 000000000..2c014e927 --- /dev/null +++ b/swarm/storage/mock/explorer/swagger.yaml @@ -0,0 +1,176 @@ +swagger: '2.0' +info: + title: Swarm Global Store API + version: 0.1.0 +tags: + - name: Has Key + description: Checks if a Key is stored on a Node + - name: Keys + description: Lists Keys + - name: Nodes + description: Lists Node addresses + +paths: + '/api/has-key/{node}/{key}': + get: + tags: + - Has Key + summary: Checks if a Key is stored on a Node + operationId: hasKey + produces: + - application/json + + parameters: + - name: node + in: path + required: true + type: string + format: hex-endoded + description: Node address. + + - name: key + in: path + required: true + type: string + format: hex-endoded + description: Key. + + responses: + '200': + description: Key is stored on Node + schema: + $ref: '#/definitions/Status' + '404': + description: Key is not stored on Node + schema: + $ref: '#/definitions/Status' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + + '/api/keys': + get: + tags: + - Keys + summary: Lists Keys + operationId: keys + produces: + - application/json + + parameters: + - name: start + in: query + required: false + type: string + format: hex-encoded Key + description: A Key as the starting point for the returned list. It is usually a value from the returned "next" field in the Keys repsonse. + + - name: limit + in: query + required: false + type: integer + default: 100 + minimum: 1 + maximum: 1000 + description: Limits the number of Keys returned in on response. + + - name: node + in: query + required: false + type: string + format: hex-encoded Node address + description: If this parameter is provided, only Keys that are stored on this Node be returned in the response. If not, all known Keys will be returned. + + responses: + '200': + description: List of Keys + schema: + $ref: '#/definitions/Keys' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + + '/api/nodes': + get: + tags: + - Nodes + summary: Lists Node addresses + operationId: nodes + produces: + - application/json + + parameters: + - name: start + in: query + required: false + type: string + format: hex-encoded Node address + description: A Node address as the starting point for the returned list. It is usually a value from the returned "next" field in the Nodes repsonse. + + - name: limit + in: query + required: false + type: integer + default: 100 + minimum: 1 + maximum: 1000 + description: Limits the number of Node addresses returned in on response. + + - name: key + in: query + required: false + type: string + format: hex-encoded Key + description: If this parameter is provided, only addresses of Nodes that store this Key will be returned in the response. If not, all known Node addresses will be returned. + + responses: + '200': + description: List of Node addresses + schema: + $ref: '#/definitions/Nodes' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + +definitions: + + Status: + type: object + properties: + message: + type: string + description: HTTP Status Code name. + code: + type: integer + description: HTTP Status Code. + + Keys: + type: object + properties: + keys: + type: array + description: A list of Keys. + items: + type: string + format: hex-encoded Key + next: + type: string + format: hex-encoded Key + description: If present, the next Key in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached. + + Nodes: + type: object + properties: + nodes: + type: array + description: A list of Node addresses. + items: + type: string + format: hex-encoded Node address + next: + type: string + format: hex-encoded Node address + description: If present, the next Node address in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached. diff --git a/swarm/storage/mock/mem/mem.go b/swarm/storage/mock/mem/mem.go index 3a0a2beb8..38bf098df 100644 --- a/swarm/storage/mock/mem/mem.go +++ b/swarm/storage/mock/mem/mem.go @@ -25,6 +25,7 @@ import ( "encoding/json" "io" "io/ioutil" + "sort" "sync" "github.com/ethereum/go-ethereum/common" @@ -34,16 +35,27 @@ import ( // GlobalStore stores all chunk data and also keys and node addresses relations. // It implements mock.GlobalStore interface. type GlobalStore struct { - nodes map[string]map[common.Address]struct{} - data map[string][]byte - mu sync.Mutex + // holds a slice of keys per node + nodeKeys map[common.Address][][]byte + // holds which key is stored on which nodes + keyNodes map[string][]common.Address + // all node addresses + nodes []common.Address + // all keys + keys [][]byte + // all keys data + data map[string][]byte + mu sync.RWMutex } // NewGlobalStore creates a new instance of GlobalStore. func NewGlobalStore() *GlobalStore { return &GlobalStore{ - nodes: make(map[string]map[common.Address]struct{}), - data: make(map[string][]byte), + nodeKeys: make(map[common.Address][][]byte), + keyNodes: make(map[string][]common.Address), + nodes: make([]common.Address, 0), + keys: make([][]byte, 0), + data: make(map[string][]byte), } } @@ -56,10 +68,10 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore { // Get returns chunk data if the chunk with key exists for node // on address addr. func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() - if _, ok := s.nodes[string(key)][addr]; !ok { + if _, has := s.nodeKeyIndex(addr, key); !has { return nil, mock.ErrNotFound } @@ -75,11 +87,33 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { s.mu.Lock() defer s.mu.Unlock() - if _, ok := s.nodes[string(key)]; !ok { - s.nodes[string(key)] = make(map[common.Address]struct{}) + if i, found := s.nodeKeyIndex(addr, key); !found { + s.nodeKeys[addr] = append(s.nodeKeys[addr], nil) + copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:]) + s.nodeKeys[addr][i] = key } - s.nodes[string(key)][addr] = struct{}{} + + if i, found := s.keyNodeIndex(key, addr); !found { + k := string(key) + s.keyNodes[k] = append(s.keyNodes[k], addr) + copy(s.keyNodes[k][i+1:], s.keyNodes[k][i:]) + s.keyNodes[k][i] = addr + } + + if i, found := s.nodeIndex(addr); !found { + s.nodes = append(s.nodes, addr) + copy(s.nodes[i+1:], s.nodes[i:]) + s.nodes[i] = addr + } + + if i, found := s.keyIndex(key); !found { + s.keys = append(s.keys, nil) + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = key + } + s.data[string(key)] = data + return nil } @@ -88,24 +122,177 @@ func (s *GlobalStore) Delete(addr common.Address, key []byte) error { s.mu.Lock() defer s.mu.Unlock() - var count int - if _, ok := s.nodes[string(key)]; ok { - delete(s.nodes[string(key)], addr) - count = len(s.nodes[string(key)]) + if i, has := s.nodeKeyIndex(addr, key); has { + s.nodeKeys[addr] = append(s.nodeKeys[addr][:i], s.nodeKeys[addr][i+1:]...) } - if count == 0 { - delete(s.data, string(key)) + + k := string(key) + if i, on := s.keyNodeIndex(key, addr); on { + s.keyNodes[k] = append(s.keyNodes[k][:i], s.keyNodes[k][i+1:]...) + } + + if len(s.nodeKeys[addr]) == 0 { + if i, found := s.nodeIndex(addr); found { + s.nodes = append(s.nodes[:i], s.nodes[i+1:]...) + } + } + + if len(s.keyNodes[k]) == 0 { + if i, found := s.keyIndex(key); found { + s.keys = append(s.keys[:i], s.keys[i+1:]...) + } } return nil } // HasKey returns whether a node with addr contains the key. -func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { - s.mu.Lock() - defer s.mu.Unlock() +func (s *GlobalStore) HasKey(addr common.Address, key []byte) (yes bool) { + s.mu.RLock() + defer s.mu.RUnlock() - _, ok := s.nodes[string(key)][addr] - return ok + _, yes = s.nodeKeyIndex(addr, key) + return yes +} + +// keyIndex returns the index of a key in keys slice. +func (s *GlobalStore) keyIndex(key []byte) (index int, found bool) { + l := len(s.keys) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.keys[i], key) >= 0 + }) + found = index < l && bytes.Equal(s.keys[index], key) + return index, found +} + +// nodeIndex returns the index of a node address in nodes slice. +func (s *GlobalStore) nodeIndex(addr common.Address) (index int, found bool) { + l := len(s.nodes) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.nodes[i][:], addr[:]) >= 0 + }) + found = index < l && bytes.Equal(s.nodes[index][:], addr[:]) + return index, found +} + +// nodeKeyIndex returns the index of a key in nodeKeys slice. +func (s *GlobalStore) nodeKeyIndex(addr common.Address, key []byte) (index int, found bool) { + l := len(s.nodeKeys[addr]) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.nodeKeys[addr][i], key) >= 0 + }) + found = index < l && bytes.Equal(s.nodeKeys[addr][index], key) + return index, found +} + +// keyNodeIndex returns the index of a node address in keyNodes slice. +func (s *GlobalStore) keyNodeIndex(key []byte, addr common.Address) (index int, found bool) { + k := string(key) + l := len(s.keyNodes[k]) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.keyNodes[k][i][:], addr[:]) >= 0 + }) + found = index < l && s.keyNodes[k][index] == addr + return index, found +} + +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startKey != nil { + i, _ = s.keyIndex(startKey) + } + total := len(s.keys) + max := maxIndex(i, limit, total) + keys.Keys = make([][]byte, 0, max-i) + for ; i < max; i++ { + keys.Keys = append(keys.Keys, append([]byte(nil), s.keys[i]...)) + } + if total > max { + keys.Next = s.keys[max] + } + return keys, nil +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startAddr != nil { + i, _ = s.nodeIndex(*startAddr) + } + total := len(s.nodes) + max := maxIndex(i, limit, total) + nodes.Addrs = make([]common.Address, 0, max-i) + for ; i < max; i++ { + nodes.Addrs = append(nodes.Addrs, s.nodes[i]) + } + if total > max { + nodes.Next = &s.nodes[max] + } + return nodes, nil +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startKey != nil { + i, _ = s.nodeKeyIndex(addr, startKey) + } + total := len(s.nodeKeys[addr]) + max := maxIndex(i, limit, total) + keys.Keys = make([][]byte, 0, max-i) + for ; i < max; i++ { + keys.Keys = append(keys.Keys, append([]byte(nil), s.nodeKeys[addr][i]...)) + } + if total > max { + keys.Next = s.nodeKeys[addr][max] + } + return keys, nil +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startAddr != nil { + i, _ = s.keyNodeIndex(key, *startAddr) + } + total := len(s.keyNodes[string(key)]) + max := maxIndex(i, limit, total) + nodes.Addrs = make([]common.Address, 0, max-i) + for ; i < max; i++ { + nodes.Addrs = append(nodes.Addrs, s.keyNodes[string(key)][i]) + } + if total > max { + nodes.Next = &s.keyNodes[string(key)][max] + } + return nodes, nil +} + +// maxIndex returns the end index for one page listing +// based on the start index, limit and total number of elements. +func maxIndex(start, limit, total int) (max int) { + if limit <= 0 { + limit = mock.DefaultLimit + } + if limit > mock.MaxLimit { + limit = mock.MaxLimit + } + max = total + if start+limit < max { + max = start + limit + } + return max } // Import reads tar archive from a reader that contains exported chunk data. @@ -135,14 +322,26 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { return n, err } - addrs := make(map[common.Address]struct{}) - for _, a := range c.Addrs { - addrs[a] = struct{}{} + key := common.Hex2Bytes(hdr.Name) + s.keyNodes[string(key)] = c.Addrs + for _, addr := range c.Addrs { + if i, has := s.nodeKeyIndex(addr, key); !has { + s.nodeKeys[addr] = append(s.nodeKeys[addr], nil) + copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:]) + s.nodeKeys[addr][i] = key + } + if i, found := s.nodeIndex(addr); !found { + s.nodes = append(s.nodes, addr) + copy(s.nodes[i+1:], s.nodes[i:]) + s.nodes[i] = addr + } } - - key := string(common.Hex2Bytes(hdr.Name)) - s.nodes[key] = addrs - s.data[key] = c.Data + if i, found := s.keyIndex(key); !found { + s.keys = append(s.keys, nil) + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = key + } + s.data[string(key)] = c.Data n++ } return n, err @@ -151,23 +350,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { // Export writes to a writer a tar archive with all chunk data from // the store. It returns the number of chunks exported and an error. func (s *GlobalStore) Export(w io.Writer) (n int, err error) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() tw := tar.NewWriter(w) defer tw.Close() buf := bytes.NewBuffer(make([]byte, 0, 1024)) encoder := json.NewEncoder(buf) - for key, addrs := range s.nodes { - al := make([]common.Address, 0, len(addrs)) - for a := range addrs { - al = append(al, a) - } - + for key, addrs := range s.keyNodes { buf.Reset() if err = encoder.Encode(mock.ExportedChunk{ - Addrs: al, + Addrs: addrs, Data: s.data[key], }); err != nil { return n, err diff --git a/swarm/storage/mock/mem/mem_test.go b/swarm/storage/mock/mem/mem_test.go index adcefaabb..d39aaef45 100644 --- a/swarm/storage/mock/mem/mem_test.go +++ b/swarm/storage/mock/mem/mem_test.go @@ -28,6 +28,12 @@ func TestGlobalStore(t *testing.T) { test.MockStore(t, NewGlobalStore(), 100) } +// TestGlobalStoreListings is running test for a GlobalStore +// using test.MockStoreListings function. +func TestGlobalStoreListings(t *testing.T) { + test.MockStoreListings(t, NewGlobalStore(), 1000) +} + // TestImportExport is running tests for importing and // exporting data between two GlobalStores // using test.ImportExport function. diff --git a/swarm/storage/mock/mock.go b/swarm/storage/mock/mock.go index 626ba3fe1..586112a98 100644 --- a/swarm/storage/mock/mock.go +++ b/swarm/storage/mock/mock.go @@ -39,6 +39,17 @@ import ( "github.com/ethereum/go-ethereum/common" ) +const ( + // DefaultLimit should be used as default limit for + // Keys, Nodes, NodeKeys and KeyNodes GlobarStorer + // methids implementations. + DefaultLimit = 100 + // MaxLimit should be used as the maximal returned number + // of items for Keys, Nodes, NodeKeys and KeyNodes GlobarStorer + // methids implementations, regardless of provided limit. + MaxLimit = 1000 +) + // ErrNotFound indicates that the chunk is not found. var ErrNotFound = errors.New("not found") @@ -76,6 +87,10 @@ func (n *NodeStore) Delete(key []byte) error { return n.store.Delete(n.addr, key) } +func (n *NodeStore) Keys(startKey []byte, limit int) (keys Keys, err error) { + return n.store.NodeKeys(n.addr, startKey, limit) +} + // GlobalStorer defines methods for mock db store // that stores chunk data for all swarm nodes. // It is used in tests to construct mock NodeStores @@ -85,12 +100,28 @@ type GlobalStorer interface { Put(addr common.Address, key []byte, data []byte) error Delete(addr common.Address, key []byte) error HasKey(addr common.Address, key []byte) bool + Keys(startKey []byte, limit int) (keys Keys, err error) + Nodes(startAddr *common.Address, limit int) (nodes Nodes, err error) + NodeKeys(addr common.Address, startKey []byte, limit int) (keys Keys, err error) + KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes Nodes, err error) // NewNodeStore creates an instance of NodeStore // to be used by a single swarm node with // address addr. NewNodeStore(addr common.Address) *NodeStore } +// Keys are returned results by Keys and NodeKeys GlobalStorer methods. +type Keys struct { + Keys [][]byte + Next []byte +} + +// Nodes are returned results by Nodes and KeyNodes GlobalStorer methods. +type Nodes struct { + Addrs []common.Address + Next *common.Address +} + // Importer defines method for importing mock store data // from an exported tar archive. type Importer interface { diff --git a/swarm/storage/mock/rpc/rpc.go b/swarm/storage/mock/rpc/rpc.go index 8cd6c83a7..8150ccff1 100644 --- a/swarm/storage/mock/rpc/rpc.go +++ b/swarm/storage/mock/rpc/rpc.go @@ -88,3 +88,27 @@ func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { } return has } + +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + err = s.client.Call(&keys, "mockStore_keys", startKey, limit) + return keys, err +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + err = s.client.Call(&nodes, "mockStore_nodes", startAddr, limit) + return nodes, err +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + err = s.client.Call(&keys, "mockStore_nodeKeys", addr, startKey, limit) + return keys, err +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + err = s.client.Call(&nodes, "mockStore_keyNodes", key, startAddr, limit) + return nodes, err +} diff --git a/swarm/storage/mock/rpc/rpc_test.go b/swarm/storage/mock/rpc/rpc_test.go index f62340ede..6c4652355 100644 --- a/swarm/storage/mock/rpc/rpc_test.go +++ b/swarm/storage/mock/rpc/rpc_test.go @@ -27,6 +27,27 @@ import ( // TestDBStore is running test for a GlobalStore // using test.MockStore function. func TestRPCStore(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStore(t, store, 30) +} + +// TestRPCStoreListings is running test for a GlobalStore +// using test.MockStoreListings function. +func TestRPCStoreListings(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStoreListings(t, store, 1000) +} + +// newTestStore creates a temporary GlobalStore +// that will be closed when returned cleanup function +// is called. +func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) { + t.Helper() + serverStore := mem.NewGlobalStore() server := rpc.NewServer() @@ -35,7 +56,9 @@ func TestRPCStore(t *testing.T) { } store := NewGlobalStore(rpc.DialInProc(server)) - defer store.Close() - - test.MockStore(t, store, 30) + return store, func() { + if err := store.Close(); err != nil { + t.Error(err) + } + } } diff --git a/swarm/storage/mock/test/test.go b/swarm/storage/mock/test/test.go index 69828b144..cc837f0b7 100644 --- a/swarm/storage/mock/test/test.go +++ b/swarm/storage/mock/test/test.go @@ -20,6 +20,7 @@ package test import ( "bytes" + "encoding/binary" "fmt" "io" "strconv" @@ -170,6 +171,123 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { }) } +// MockStoreListings tests global store methods Keys, Nodes, NodeKeys and KeyNodes. +// It uses a provided globalstore to put chunks for n number of node addresses +// and to validate that methods are returning the right responses. +func MockStoreListings(t *testing.T, globalStore mock.GlobalStorer, n int) { + addrs := make([]common.Address, n) + for i := 0; i < n; i++ { + addrs[i] = common.HexToAddress(strconv.FormatInt(int64(i)+1, 16)) + } + type chunk struct { + key []byte + data []byte + } + const chunksPerNode = 5 + keys := make([][]byte, n*chunksPerNode) + for i := 0; i < n*chunksPerNode; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + keys[i] = b + } + + // keep track of keys on every node + nodeKeys := make(map[common.Address][][]byte) + // keep track of nodes that store particular key + keyNodes := make(map[string][]common.Address) + for i := 0; i < chunksPerNode; i++ { + // put chunks for every address + for j := 0; j < n; j++ { + addr := addrs[j] + key := keys[(i*n)+j] + err := globalStore.Put(addr, key, []byte("data")) + if err != nil { + t.Fatal(err) + } + nodeKeys[addr] = append(nodeKeys[addr], key) + keyNodes[string(key)] = append(keyNodes[string(key)], addr) + } + + // test Keys method + var startKey []byte + var gotKeys [][]byte + for { + keys, err := globalStore.Keys(startKey, 0) + if err != nil { + t.Fatal(err) + } + gotKeys = append(gotKeys, keys.Keys...) + if keys.Next == nil { + break + } + startKey = keys.Next + } + wantKeys := keys[:(i+1)*n] + if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) { + t.Fatalf("got #%v keys %v, want %v", i+1, gotKeys, wantKeys) + } + + // test Nodes method + var startNode *common.Address + var gotNodes []common.Address + for { + nodes, err := globalStore.Nodes(startNode, 0) + if err != nil { + t.Fatal(err) + } + gotNodes = append(gotNodes, nodes.Addrs...) + if nodes.Next == nil { + break + } + startNode = nodes.Next + } + wantNodes := addrs + if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) { + t.Fatalf("got #%v nodes %v, want %v", i+1, gotNodes, wantNodes) + } + + // test NodeKeys method + for addr, wantKeys := range nodeKeys { + var startKey []byte + var gotKeys [][]byte + for { + keys, err := globalStore.NodeKeys(addr, startKey, 0) + if err != nil { + t.Fatal(err) + } + gotKeys = append(gotKeys, keys.Keys...) + if keys.Next == nil { + break + } + startKey = keys.Next + } + if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) { + t.Fatalf("got #%v %s node keys %v, want %v", i+1, addr.Hex(), gotKeys, wantKeys) + } + } + + // test KeyNodes method + for key, wantNodes := range keyNodes { + var startNode *common.Address + var gotNodes []common.Address + for { + nodes, err := globalStore.KeyNodes([]byte(key), startNode, 0) + if err != nil { + t.Fatal(err) + } + gotNodes = append(gotNodes, nodes.Addrs...) + if nodes.Next == nil { + break + } + startNode = nodes.Next + } + if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) { + t.Fatalf("got #%v %x key nodes %v, want %v", i+1, []byte(key), gotNodes, wantNodes) + } + } + } +} + // ImportExport saves chunks to the outStore, exports them to the tar archive, // imports tar archive to the inStore and checks if all chunks are imported correctly. func ImportExport(t *testing.T, outStore, inStore mock.GlobalStorer, n int) {