From 936c8e19ff3db376fdd1ed53772c51c817a72112 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 23 Apr 2015 14:51:41 +0200 Subject: [PATCH 01/14] p2p/discover: store nodes in leveldb --- p2p/discover/node.go | 83 ++++++++++++++++++++++++++++++++++++------- p2p/discover/table.go | 3 +- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 6662a6cb7..d8a5cc351 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -1,8 +1,10 @@ package discover import ( + "bytes" "crypto/ecdsa" "crypto/elliptic" + "encoding/binary" "encoding/hex" "errors" "fmt" @@ -11,13 +13,16 @@ import ( "math/rand" "net" "net/url" + "os" "strconv" "strings" - "sync" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" ) const nodeIDBits = 512 @@ -308,23 +313,77 @@ func randomID(a NodeID, n int) (b NodeID) { // nodeDB stores all nodes we know about. type nodeDB struct { - mu sync.RWMutex - byID map[NodeID]*Node + ldb *leveldb.DB +} + +var dbVersionKey = []byte("pv") + +// Opens the backing LevelDB. If path is "", we use an in-memory database. +func newNodeDB(path string, version int64) (db *nodeDB, err error) { + db = new(nodeDB) + opts := new(opt.Options) + if path == "" { + db.ldb, err = leveldb.Open(storage.NewMemStorage(), opts) + } else { + db.ldb, err = openLDB(path, opts, version) + } + return db, err +} + +func openLDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) { + ldb, err := leveldb.OpenFile(path, opts) + if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { + ldb, err = leveldb.RecoverFile(path, opts) + } + if err != nil { + return nil, err + } + // The nodes contained in the database correspond to a certain + // protocol version. Flush all nodes if the DB version doesn't match. + // There is no need to do this for memory databases because they + // won't ever be used with a different protocol version. + shouldVal := make([]byte, binary.MaxVarintLen64) + shouldVal = shouldVal[:binary.PutVarint(shouldVal, version)] + val, err := ldb.Get(dbVersionKey, nil) + if err == leveldb.ErrNotFound { + err = ldb.Put(dbVersionKey, shouldVal, nil) + } else if err == nil && !bytes.Equal(val, shouldVal) { + // Delete and start over. + ldb.Close() + if err = os.RemoveAll(path); err != nil { + return nil, err + } + return openLDB(path, opts, version) + } + if err != nil { + ldb.Close() + ldb = nil + } + return ldb, err } func (db *nodeDB) get(id NodeID) *Node { - db.mu.RLock() - defer db.mu.RUnlock() - return db.byID[id] + v, err := db.ldb.Get(id[:], nil) + if err != nil { + return nil + } + n := new(Node) + if err := rlp.DecodeBytes(v, n); err != nil { + return nil + } + return n +} + +func (db *nodeDB) update(n *Node) error { + v, err := rlp.EncodeToBytes(n) + if err != nil { + return err + } + return db.ldb.Put(n.ID[:], v, nil) } func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { - db.mu.Lock() - defer db.mu.Unlock() - if db.byID == nil { - db.byID = make(map[NodeID]*Node) - } n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)} - db.byID[n.ID] = n + db.update(n) return n } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index e2e846456..ba2f9b8ec 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -59,9 +59,10 @@ type bucket struct { } func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr) *Table { + db, _ := newNodeDB("", Version) tab := &Table{ net: t, - db: new(nodeDB), + db: db, self: newNode(ourID, ourAddr), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), From 5f735d6fce10b4552b0a6d3eb6503c5a302f4f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 23 Apr 2015 18:47:24 +0300 Subject: [PATCH 02/14] cmd, eth, p2p, p2p/discover: init and clean up the seed cache --- cmd/bootnode/main.go | 2 +- eth/backend.go | 2 ++ p2p/discover/node.go | 4 ++++ p2p/discover/table.go | 16 +++++++++++++--- p2p/discover/udp.go | 8 ++++---- p2p/server.go | 6 +++++- 6 files changed, 29 insertions(+), 9 deletions(-) diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index dda9f34d4..26912525d 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -71,7 +71,7 @@ func main() { } } - if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm); err != nil { + if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil { log.Fatal(err) } select {} diff --git a/eth/backend.go b/eth/backend.go index 356e7fd1a..382cfc832 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -179,6 +179,7 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } + seedDbPath := path.Join(config.DataDir, "seeds") // Perform database sanity checks d, _ := blockDb.Get([]byte("ProtocolVersion")) @@ -243,6 +244,7 @@ func New(config *Config) (*Ethereum, error) { NAT: config.NAT, NoDial: !config.Dial, BootstrapNodes: config.parseBootNodes(), + SeedCache: seedDbPath, } if len(config.Port) > 0 { eth.net.ListenAddr = ":" + config.Port diff --git a/p2p/discover/node.go b/p2p/discover/node.go index d8a5cc351..3e79ddb53 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -387,3 +387,7 @@ func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { db.update(n) return n } + +func (db *nodeDB) close() { + db.ldb.Close() +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index ba2f9b8ec..3702a2114 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -11,6 +11,9 @@ import ( "sort" "sync" "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) const ( @@ -58,8 +61,14 @@ type bucket struct { entries []*Node } -func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr) *Table { - db, _ := newNodeDB("", Version) +func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seedCache string) *Table { + // Load the bootstrap seed cache (use in memory db upon failure) + db, err := newNodeDB(seedCache, Version) + if err != nil { + glog.V(logger.Warn).Infoln("Failed to open bootstrap seed cache:", err) + db, _ = newNodeDB("", Version) + } + // Create the bootstrap table tab := &Table{ net: t, db: db, @@ -81,9 +90,10 @@ func (tab *Table) Self() *Node { return tab.self } -// Close terminates the network listener. +// Close terminates the network listener and flushes the seed cache. func (tab *Table) Close() { tab.net.close() + tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 07a1a739c..c26703f19 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -144,7 +144,7 @@ type reply struct { } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table, error) { +func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCache string) (*Table, error) { addr, err := net.ResolveUDPAddr("udp", laddr) if err != nil { return nil, err @@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table if err != nil { return nil, err } - tab, _ := newUDP(priv, conn, natm) + tab, _ := newUDP(priv, conn, natm, seedCache) glog.V(logger.Info).Infoln("Listening,", tab.self) return tab, nil } -func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface) (*Table, *udp) { +func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string) (*Table, *udp) { udp := &udp{ conn: c, priv: priv, @@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface) (*Table, *udp) { realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } - udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr) + udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seedCache) go udp.loop() go udp.readLoop() return udp.Table, udp diff --git a/p2p/server.go b/p2p/server.go index ecf418d13..39b0b8b6e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -59,6 +59,10 @@ type Server struct { // with the rest of the network. BootstrapNodes []*discover.Node + // SeedCache is the path to the database containing the previously seen live + // nodes in the network to use as potential bootstrap seeds. + SeedCache string + // Protocols should contain the protocols supported // by the server. Matching protocols are launched for // each peer. @@ -197,7 +201,7 @@ func (srv *Server) Start() (err error) { } // node table - ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) + ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.SeedCache) if err != nil { return err } From af923c965ff7a4e652d5c9edabc50f7a03e1135d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 23 Apr 2015 19:24:48 +0300 Subject: [PATCH 03/14] p2p/discovery: use the seed table for finding nodes, auto drop stale ones --- p2p/discover/node.go | 33 ++++++++++++++++++++++++++++++--- p2p/discover/table.go | 10 ++++++++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 3e79ddb53..0ec9630d3 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -325,12 +325,13 @@ func newNodeDB(path string, version int64) (db *nodeDB, err error) { if path == "" { db.ldb, err = leveldb.Open(storage.NewMemStorage(), opts) } else { - db.ldb, err = openLDB(path, opts, version) + db.ldb, err = openNodeDB(path, opts, version) } return db, err } -func openLDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) { +// openNodeDB opens a persistent seed cache, flushing old versions. +func openNodeDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) { ldb, err := leveldb.OpenFile(path, opts) if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { ldb, err = leveldb.RecoverFile(path, opts) @@ -353,7 +354,7 @@ func openLDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) if err = os.RemoveAll(path); err != nil { return nil, err } - return openLDB(path, opts, version) + return openNodeDB(path, opts, version) } if err != nil { ldb.Close() @@ -362,6 +363,7 @@ func openLDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) return ldb, err } +// get retrieves a node with a given id from the seed da func (db *nodeDB) get(id NodeID) *Node { v, err := db.ldb.Get(id[:], nil) if err != nil { @@ -374,6 +376,24 @@ func (db *nodeDB) get(id NodeID) *Node { return n } +// list retrieves a batch of nodes from the database. +func (db *nodeDB) list(n int) []*Node { + it := db.ldb.NewIterator(nil, nil) + defer it.Release() + + nodes := make([]*Node, 0, n) + for i := 0; i < n && it.Next(); i++ { + var id NodeID + copy(id[:], it.Key()) + + if node := db.get(id); node != nil { + nodes = append(nodes, node) + } + } + return nodes +} + +// update inserts - potentially overwriting - a node in the seed database. func (db *nodeDB) update(n *Node) error { v, err := rlp.EncodeToBytes(n) if err != nil { @@ -382,12 +402,19 @@ func (db *nodeDB) update(n *Node) error { return db.ldb.Put(n.ID[:], v, nil) } +// add inserts a new node into the seed database. func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)} db.update(n) return n } +// delete removes a node from the database. +func (db *nodeDB) delete(id NodeID) error { + return db.ldb.Delete(id[:], nil) +} + +// close flushes and closes the database files. func (db *nodeDB) close() { db.ldb.Close() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 3702a2114..fa791c9f3 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -177,8 +177,14 @@ func (tab *Table) refresh() { result := tab.Lookup(randomID(tab.self.ID, ld)) if len(result) == 0 { - // bootstrap the table with a self lookup - all := tab.bondall(tab.nursery) + // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live) + seeds := tab.db.list(10) + for _, seed := range seeds { + glog.V(logger.Debug).Infoln("Seeding network with:", seed) + tab.db.delete(seed.ID) + } + // Bootstrap the table with a self lookup + all := tab.bondall(append(tab.nursery, seeds...)) tab.mutex.Lock() tab.add(all) tab.mutex.Unlock() From 971702e7a1a5e698721fa6147c444abad9c20141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 23 Apr 2015 19:40:35 +0300 Subject: [PATCH 04/14] p2p/discovery: fix broken tests due to API update --- p2p/discover/table_test.go | 6 +++--- p2p/discover/udp_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index a98376bca..e2bd3c8ad 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -15,7 +15,7 @@ import ( func TestTable_pingReplace(t *testing.T) { doit := func(newNodeIsResponding, lastInBucketIsResponding bool) { transport := newPingRecorder() - tab := newTable(transport, NodeID{}, &net.UDPAddr{}) + tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "") last := fillBucket(tab, 200) pingSender := randomID(tab.self.ID, 200) @@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N - tab := newTable(nil, test.Self, &net.UDPAddr{}) + tab := newTable(nil, test.Self, &net.UDPAddr{}, "") tab.add(test.All) // check that doClosest(Target, N) returns nodes @@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) { self := gen(NodeID{}, quickrand).(NodeID) target := randomID(self, 200) transport := findnodeOracle{t, target} - tab := newTable(transport, self, &net.UDPAddr{}) + tab := newTable(transport, self, &net.UDPAddr{}, "") // lookup on empty table returns no nodes if results := tab.Lookup(target); len(results) > 0 { diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index c6c4d78e3..782895e46 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest { remotekey: newkey(), remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303}, } - test.table, test.udp = newUDP(test.localkey, test.pipe, nil) + test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "") return test } From 6def110c37d4d43402c4b658ce6b291400f840e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 24 Apr 2015 11:19:33 +0300 Subject: [PATCH 05/14] cmd/bootnode, eth, p2p, p2p/discover: clean up the seeder and mesh into eth. --- cmd/bootnode/main.go | 2 +- eth/backend.go | 13 +++- p2p/discover/cache.go | 134 +++++++++++++++++++++++++++++++++++++ p2p/discover/node.go | 114 ------------------------------- p2p/discover/table.go | 23 +++---- p2p/discover/table_test.go | 6 +- p2p/discover/udp.go | 10 +-- p2p/discover/udp_test.go | 4 +- p2p/server.go | 6 +- 9 files changed, 168 insertions(+), 144 deletions(-) create mode 100644 p2p/discover/cache.go diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 26912525d..826604cdc 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -71,7 +71,7 @@ func main() { } } - if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil { + if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, nil); err != nil { log.Fatal(err) } select {} diff --git a/eth/backend.go b/eth/backend.go index 382cfc832..039f730f1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -125,6 +125,8 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) + seedDb *discover.Cache // Peer database seeding the bootstrap + // Closed when databases are flushed and closed databasesClosed chan bool @@ -179,7 +181,10 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - seedDbPath := path.Join(config.DataDir, "seeds") + seedDb, err := discover.NewPersistentCache(path.Join(config.DataDir, "seeds")) + if err != nil { + return nil, err + } // Perform database sanity checks d, _ := blockDb.Get([]byte("ProtocolVersion")) @@ -207,6 +212,7 @@ func New(config *Config) (*Ethereum, error) { blockDb: blockDb, stateDb: stateDb, extraDb: extraDb, + seedDb: seedDb, eventMux: &event.TypeMux{}, accountManager: config.AccountManager, DataDir: config.DataDir, @@ -244,7 +250,7 @@ func New(config *Config) (*Ethereum, error) { NAT: config.NAT, NoDial: !config.Dial, BootstrapNodes: config.parseBootNodes(), - SeedCache: seedDbPath, + SeedCache: seedDb, } if len(config.Port) > 0 { eth.net.ListenAddr = ":" + config.Port @@ -423,6 +429,7 @@ done: } } + s.seedDb.Close() s.blockDb.Close() s.stateDb.Close() s.extraDb.Close() @@ -450,7 +457,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - s.txSub.Unsubscribe() // quits txBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop s.protocolManager.Stop() s.txPool.Stop() diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go new file mode 100644 index 000000000..f6bab4591 --- /dev/null +++ b/p2p/discover/cache.go @@ -0,0 +1,134 @@ +// Contains the discovery cache, storing previously seen nodes to act as seed +// servers during bootstrapping the network. + +package discover + +import ( + "bytes" + "encoding/binary" + "net" + "os" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +// Cache stores all nodes we know about. +type Cache struct { + db *leveldb.DB +} + +// Cache version to allow dumping old data if it changes. +var cacheVersionKey = []byte("pv") + +// NewMemoryCache creates a new in-memory peer cache without a persistent backend. +func NewMemoryCache() (*Cache, error) { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return nil, err + } + return &Cache{db: db}, nil +} + +// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also +// flushing its contents in case of a version mismatch. +func NewPersistentCache(path string) (*Cache, error) { + // Try to open the cache, recovering any corruption + db, err := leveldb.OpenFile(path, nil) + if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { + db, err = leveldb.RecoverFile(path, nil) + } + if err != nil { + return nil, err + } + // The nodes contained in the cache correspond to a certain protocol version. + // Flush all nodes if the version doesn't match. + currentVer := make([]byte, binary.MaxVarintLen64) + currentVer = currentVer[:binary.PutVarint(currentVer, Version)] + + blob, err := db.Get(cacheVersionKey, nil) + switch err { + case leveldb.ErrNotFound: + // Version not found (i.e. empty cache), insert it + err = db.Put(cacheVersionKey, currentVer, nil) + + case nil: + // Version present, flush if different + if !bytes.Equal(blob, currentVer) { + db.Close() + if err = os.RemoveAll(path); err != nil { + return nil, err + } + return NewPersistentCache(path) + } + } + // Clean up in case of an error + if err != nil { + db.Close() + return nil, err + } + return &Cache{db: db}, nil +} + +// get retrieves a node with a given id from the seed da +func (c *Cache) get(id NodeID) *Node { + blob, err := c.db.Get(id[:], nil) + if err != nil { + return nil + } + node := new(Node) + if err := rlp.DecodeBytes(blob, node); err != nil { + return nil + } + return node +} + +// list retrieves a batch of nodes from the database. +func (c *Cache) list(n int) []*Node { + it := c.db.NewIterator(nil, nil) + defer it.Release() + + nodes := make([]*Node, 0, n) + for i := 0; i < n && it.Next(); i++ { + var id NodeID + copy(id[:], it.Key()) + + if node := c.get(id); node != nil { + nodes = append(nodes, node) + } + } + return nodes +} + +// update inserts - potentially overwriting - a node in the seed database. +func (c *Cache) update(node *Node) error { + blob, err := rlp.EncodeToBytes(node) + if err != nil { + return err + } + return c.db.Put(node.ID[:], blob, nil) +} + +// add inserts a new node into the seed database. +func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { + node := &Node{ + ID: id, + IP: addr.IP, + DiscPort: addr.Port, + TCPPort: int(tcpPort), + } + c.update(node) + + return node +} + +// delete removes a node from the database. +func (c *Cache) delete(id NodeID) error { + return c.db.Delete(id[:], nil) +} + +// Close flushes and closes the database files. +func (c *Cache) Close() { + c.db.Close() +} diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 0ec9630d3..e66ca37a4 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -1,10 +1,8 @@ package discover import ( - "bytes" "crypto/ecdsa" "crypto/elliptic" - "encoding/binary" "encoding/hex" "errors" "fmt" @@ -13,16 +11,12 @@ import ( "math/rand" "net" "net/url" - "os" "strconv" "strings" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/rlp" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/storage" ) const nodeIDBits = 512 @@ -310,111 +304,3 @@ func randomID(a NodeID, n int) (b NodeID) { } return b } - -// nodeDB stores all nodes we know about. -type nodeDB struct { - ldb *leveldb.DB -} - -var dbVersionKey = []byte("pv") - -// Opens the backing LevelDB. If path is "", we use an in-memory database. -func newNodeDB(path string, version int64) (db *nodeDB, err error) { - db = new(nodeDB) - opts := new(opt.Options) - if path == "" { - db.ldb, err = leveldb.Open(storage.NewMemStorage(), opts) - } else { - db.ldb, err = openNodeDB(path, opts, version) - } - return db, err -} - -// openNodeDB opens a persistent seed cache, flushing old versions. -func openNodeDB(path string, opts *opt.Options, version int64) (*leveldb.DB, error) { - ldb, err := leveldb.OpenFile(path, opts) - if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { - ldb, err = leveldb.RecoverFile(path, opts) - } - if err != nil { - return nil, err - } - // The nodes contained in the database correspond to a certain - // protocol version. Flush all nodes if the DB version doesn't match. - // There is no need to do this for memory databases because they - // won't ever be used with a different protocol version. - shouldVal := make([]byte, binary.MaxVarintLen64) - shouldVal = shouldVal[:binary.PutVarint(shouldVal, version)] - val, err := ldb.Get(dbVersionKey, nil) - if err == leveldb.ErrNotFound { - err = ldb.Put(dbVersionKey, shouldVal, nil) - } else if err == nil && !bytes.Equal(val, shouldVal) { - // Delete and start over. - ldb.Close() - if err = os.RemoveAll(path); err != nil { - return nil, err - } - return openNodeDB(path, opts, version) - } - if err != nil { - ldb.Close() - ldb = nil - } - return ldb, err -} - -// get retrieves a node with a given id from the seed da -func (db *nodeDB) get(id NodeID) *Node { - v, err := db.ldb.Get(id[:], nil) - if err != nil { - return nil - } - n := new(Node) - if err := rlp.DecodeBytes(v, n); err != nil { - return nil - } - return n -} - -// list retrieves a batch of nodes from the database. -func (db *nodeDB) list(n int) []*Node { - it := db.ldb.NewIterator(nil, nil) - defer it.Release() - - nodes := make([]*Node, 0, n) - for i := 0; i < n && it.Next(); i++ { - var id NodeID - copy(id[:], it.Key()) - - if node := db.get(id); node != nil { - nodes = append(nodes, node) - } - } - return nodes -} - -// update inserts - potentially overwriting - a node in the seed database. -func (db *nodeDB) update(n *Node) error { - v, err := rlp.EncodeToBytes(n) - if err != nil { - return err - } - return db.ldb.Put(n.ID[:], v, nil) -} - -// add inserts a new node into the seed database. -func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { - n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)} - db.update(n) - return n -} - -// delete removes a node from the database. -func (db *nodeDB) delete(id NodeID) error { - return db.ldb.Delete(id[:], nil) -} - -// close flushes and closes the database files. -func (db *nodeDB) close() { - db.ldb.Close() -} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index fa791c9f3..98371d6f9 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -27,6 +27,7 @@ type Table struct { mutex sync.Mutex // protects buckets, their content, and nursery buckets [nBuckets]*bucket // index of known nodes by distance nursery []*Node // bootstrap nodes + cache *Cache // cache of known nodes bondmu sync.Mutex bonding map[NodeID]*bondproc @@ -34,7 +35,6 @@ type Table struct { net transport self *Node // metadata of the local node - db *nodeDB } type bondproc struct { @@ -61,17 +61,15 @@ type bucket struct { entries []*Node } -func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seedCache string) *Table { - // Load the bootstrap seed cache (use in memory db upon failure) - db, err := newNodeDB(seedCache, Version) - if err != nil { - glog.V(logger.Warn).Infoln("Failed to open bootstrap seed cache:", err) - db, _ = newNodeDB("", Version) +func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table { + // If no seed cache was given, use an in-memory one + if seeder == nil { + seeder, _ = NewMemoryCache() } // Create the bootstrap table tab := &Table{ net: t, - db: db, + cache: seeder, self: newNode(ourID, ourAddr), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), @@ -93,7 +91,6 @@ func (tab *Table) Self() *Node { // Close terminates the network listener and flushes the seed cache. func (tab *Table) Close() { tab.net.close() - tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -178,10 +175,10 @@ func (tab *Table) refresh() { result := tab.Lookup(randomID(tab.self.ID, ld)) if len(result) == 0 { // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live) - seeds := tab.db.list(10) + seeds := tab.cache.list(10) for _, seed := range seeds { glog.V(logger.Debug).Infoln("Seeding network with:", seed) - tab.db.delete(seed.ID) + tab.cache.delete(seed.ID) } // Bootstrap the table with a self lookup all := tab.bondall(append(tab.nursery, seeds...)) @@ -252,7 +249,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { // of the process can be skipped. func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { var n *Node - if n = tab.db.get(id); n == nil { + if n = tab.cache.get(id); n == nil { tab.bondmu.Lock() w := tab.bonding[id] if w != nil { @@ -297,7 +294,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // waitping will simply time out. tab.net.waitping(id) } - w.n = tab.db.add(id, addr, tcpPort) + w.n = tab.cache.add(id, addr, tcpPort) close(w.done) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index e2bd3c8ad..8274731e3 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -15,7 +15,7 @@ import ( func TestTable_pingReplace(t *testing.T) { doit := func(newNodeIsResponding, lastInBucketIsResponding bool) { transport := newPingRecorder() - tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "") + tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil) last := fillBucket(tab, 200) pingSender := randomID(tab.self.ID, 200) @@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N - tab := newTable(nil, test.Self, &net.UDPAddr{}, "") + tab := newTable(nil, test.Self, &net.UDPAddr{}, nil) tab.add(test.All) // check that doClosest(Target, N) returns nodes @@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) { self := gen(NodeID{}, quickrand).(NodeID) target := randomID(self, 200) transport := findnodeOracle{t, target} - tab := newTable(transport, self, &net.UDPAddr{}, "") + tab := newTable(transport, self, &net.UDPAddr{}, nil) // lookup on empty table returns no nodes if results := tab.Lookup(target); len(results) > 0 { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index c26703f19..6805fb686 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -144,7 +144,7 @@ type reply struct { } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCache string) (*Table, error) { +func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) { addr, err := net.ResolveUDPAddr("udp", laddr) if err != nil { return nil, err @@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seedCac if err != nil { return nil, err } - tab, _ := newUDP(priv, conn, natm, seedCache) + tab, _ := newUDP(priv, conn, natm, seeder) glog.V(logger.Info).Infoln("Listening,", tab.self) return tab, nil } -func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string) (*Table, *udp) { +func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) { udp := &udp{ conn: c, priv: priv, @@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seedCache string realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } - udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seedCache) + udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder) go udp.loop() go udp.readLoop() return udp.Table, udp @@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte if expired(req.Expiration) { return errExpired } - if t.db.get(fromID) == nil { + if t.cache.get(fromID) == nil { // No bond exists, we don't process the packet. This prevents // an attack vector where the discovery protocol could be used // to amplify traffic in a DDOS attack. A malicious actor diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index 782895e46..299f94543 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest { remotekey: newkey(), remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303}, } - test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "") + test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil) return test } @@ -157,7 +157,7 @@ func TestUDP_findnode(t *testing.T) { // ensure there's a bond with the test node, // findnode won't be accepted otherwise. - test.table.db.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99) + test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99) // check that closest neighbors are returned. test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp}) diff --git a/p2p/server.go b/p2p/server.go index 39b0b8b6e..5f1b80f51 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -59,9 +59,9 @@ type Server struct { // with the rest of the network. BootstrapNodes []*discover.Node - // SeedCache is the path to the database containing the previously seen live - // nodes in the network to use as potential bootstrap seeds. - SeedCache string + // SeedCache is the database containing the previously seen live nodes in + // the network to use as potential bootstrap seeds. + SeedCache *discover.Cache // Protocols should contain the protocols supported // by the server. Matching protocols are launched for From 8646365b42ddae30e596835b4512792ca11196a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 24 Apr 2015 18:04:41 +0300 Subject: [PATCH 06/14] cmd/bootnode, eth, p2p, p2p/discover: use a fancier db design --- cmd/bootnode/main.go | 2 +- eth/backend.go | 10 +- p2p/discover/cache.go | 134 --------------------- p2p/discover/database.go | 233 +++++++++++++++++++++++++++++++++++++ p2p/discover/table.go | 36 ++++-- p2p/discover/table_test.go | 6 +- p2p/discover/udp.go | 10 +- p2p/discover/udp_test.go | 10 +- p2p/server.go | 8 +- 9 files changed, 280 insertions(+), 169 deletions(-) delete mode 100644 p2p/discover/cache.go create mode 100644 p2p/discover/database.go diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 826604cdc..26912525d 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -71,7 +71,7 @@ func main() { } } - if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, nil); err != nil { + if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil { log.Fatal(err) } select {} diff --git a/eth/backend.go b/eth/backend.go index 039f730f1..28640b63d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -125,7 +125,6 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) - seedDb *discover.Cache // Peer database seeding the bootstrap // Closed when databases are flushed and closed databasesClosed chan bool @@ -181,10 +180,7 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - seedDb, err := discover.NewPersistentCache(path.Join(config.DataDir, "seeds")) - if err != nil { - return nil, err - } + nodeDb := path.Join(config.DataDir, "nodes") // Perform database sanity checks d, _ := blockDb.Get([]byte("ProtocolVersion")) @@ -212,7 +208,6 @@ func New(config *Config) (*Ethereum, error) { blockDb: blockDb, stateDb: stateDb, extraDb: extraDb, - seedDb: seedDb, eventMux: &event.TypeMux{}, accountManager: config.AccountManager, DataDir: config.DataDir, @@ -250,7 +245,7 @@ func New(config *Config) (*Ethereum, error) { NAT: config.NAT, NoDial: !config.Dial, BootstrapNodes: config.parseBootNodes(), - SeedCache: seedDb, + NodeDatabase: nodeDb, } if len(config.Port) > 0 { eth.net.ListenAddr = ":" + config.Port @@ -429,7 +424,6 @@ done: } } - s.seedDb.Close() s.blockDb.Close() s.stateDb.Close() s.extraDb.Close() diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go deleted file mode 100644 index f6bab4591..000000000 --- a/p2p/discover/cache.go +++ /dev/null @@ -1,134 +0,0 @@ -// Contains the discovery cache, storing previously seen nodes to act as seed -// servers during bootstrapping the network. - -package discover - -import ( - "bytes" - "encoding/binary" - "net" - "os" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/storage" -) - -// Cache stores all nodes we know about. -type Cache struct { - db *leveldb.DB -} - -// Cache version to allow dumping old data if it changes. -var cacheVersionKey = []byte("pv") - -// NewMemoryCache creates a new in-memory peer cache without a persistent backend. -func NewMemoryCache() (*Cache, error) { - db, err := leveldb.Open(storage.NewMemStorage(), nil) - if err != nil { - return nil, err - } - return &Cache{db: db}, nil -} - -// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also -// flushing its contents in case of a version mismatch. -func NewPersistentCache(path string) (*Cache, error) { - // Try to open the cache, recovering any corruption - db, err := leveldb.OpenFile(path, nil) - if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { - db, err = leveldb.RecoverFile(path, nil) - } - if err != nil { - return nil, err - } - // The nodes contained in the cache correspond to a certain protocol version. - // Flush all nodes if the version doesn't match. - currentVer := make([]byte, binary.MaxVarintLen64) - currentVer = currentVer[:binary.PutVarint(currentVer, Version)] - - blob, err := db.Get(cacheVersionKey, nil) - switch err { - case leveldb.ErrNotFound: - // Version not found (i.e. empty cache), insert it - err = db.Put(cacheVersionKey, currentVer, nil) - - case nil: - // Version present, flush if different - if !bytes.Equal(blob, currentVer) { - db.Close() - if err = os.RemoveAll(path); err != nil { - return nil, err - } - return NewPersistentCache(path) - } - } - // Clean up in case of an error - if err != nil { - db.Close() - return nil, err - } - return &Cache{db: db}, nil -} - -// get retrieves a node with a given id from the seed da -func (c *Cache) get(id NodeID) *Node { - blob, err := c.db.Get(id[:], nil) - if err != nil { - return nil - } - node := new(Node) - if err := rlp.DecodeBytes(blob, node); err != nil { - return nil - } - return node -} - -// list retrieves a batch of nodes from the database. -func (c *Cache) list(n int) []*Node { - it := c.db.NewIterator(nil, nil) - defer it.Release() - - nodes := make([]*Node, 0, n) - for i := 0; i < n && it.Next(); i++ { - var id NodeID - copy(id[:], it.Key()) - - if node := c.get(id); node != nil { - nodes = append(nodes, node) - } - } - return nodes -} - -// update inserts - potentially overwriting - a node in the seed database. -func (c *Cache) update(node *Node) error { - blob, err := rlp.EncodeToBytes(node) - if err != nil { - return err - } - return c.db.Put(node.ID[:], blob, nil) -} - -// add inserts a new node into the seed database. -func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node { - node := &Node{ - ID: id, - IP: addr.IP, - DiscPort: addr.Port, - TCPPort: int(tcpPort), - } - c.update(node) - - return node -} - -// delete removes a node from the database. -func (c *Cache) delete(id NodeID) error { - return c.db.Delete(id[:], nil) -} - -// Close flushes and closes the database files. -func (c *Cache) Close() { - c.db.Close() -} diff --git a/p2p/discover/database.go b/p2p/discover/database.go new file mode 100644 index 000000000..93a2ded24 --- /dev/null +++ b/p2p/discover/database.go @@ -0,0 +1,233 @@ +// Contains the node database, storing previously seen nodes and any collected +// metadata about them for QoS purposes. + +package discover + +import ( + "bytes" + "encoding/binary" + "os" + "time" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +// nodeDB stores all nodes we know about. +type nodeDB struct { + lvl *leveldb.DB +} + +// Schema layout for the node database +var ( + nodeDBVersionKey = []byte("version") // Version of the database to flush if changes + nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection) + nodeDBItemPrefix = []byte("n:") // Identifier to prefix node entries with + + nodeDBDiscoverRoot = ":discover" + nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping" + nodeDBDiscoverBond = nodeDBDiscoverRoot + ":lastbond" +) + +// newNodeDB creates a new node database for storing and retrieving infos about +// known peers in the network. If no path is given, an in-memory, temporary +// database is constructed. +func newNodeDB(path string) (*nodeDB, error) { + if path == "" { + return newMemoryNodeDB() + } + return newPersistentNodeDB(path) +} + +// newMemoryNodeDB creates a new in-memory node database without a persistent +// backend. +func newMemoryNodeDB() (*nodeDB, error) { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return nil, err + } + return &nodeDB{lvl: db}, nil +} + +// newPersistentNodeDB creates/opens a leveldb backed persistent node database, +// also flushing its contents in case of a version mismatch. +func newPersistentNodeDB(path string) (*nodeDB, error) { + // Try to open the cache, recovering any corruption + db, err := leveldb.OpenFile(path, nil) + if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { + db, err = leveldb.RecoverFile(path, nil) + } + if err != nil { + return nil, err + } + // The nodes contained in the cache correspond to a certain protocol version. + // Flush all nodes if the version doesn't match. + currentVer := make([]byte, binary.MaxVarintLen64) + currentVer = currentVer[:binary.PutVarint(currentVer, Version)] + + blob, err := db.Get(nodeDBVersionKey, nil) + switch err { + case leveldb.ErrNotFound: + // Version not found (i.e. empty cache), insert it + err = db.Put(nodeDBVersionKey, currentVer, nil) + + case nil: + // Version present, flush if different + if !bytes.Equal(blob, currentVer) { + db.Close() + if err = os.RemoveAll(path); err != nil { + return nil, err + } + return newPersistentNodeDB(path) + } + } + // Clean up in case of an error + if err != nil { + db.Close() + return nil, err + } + return &nodeDB{lvl: db}, nil +} + +// key generates the leveldb key-blob from a node id and its particular field of +// interest. +func (db *nodeDB) key(id NodeID, field string) []byte { + return append(nodeDBItemPrefix, append(id[:], field...)...) +} + +// splitKey tries to split a database key into a node id and a field part. +func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) { + // If the key is not of a node, return it plainly + if !bytes.HasPrefix(key, nodeDBItemPrefix) { + return NodeID{}, string(key) + } + // Otherwise split the id and field + item := key[len(nodeDBItemPrefix):] + copy(id[:], item[:len(id)]) + field = string(item[len(id):]) + + return id, field +} + +// fetchTime retrieves a time instance (encoded as a unix timestamp) associated +// with a particular database key. +func (db *nodeDB) fetchTime(key []byte) time.Time { + blob, err := db.lvl.Get(key, nil) + if err != nil { + return time.Time{} + } + var unix int64 + if err := rlp.DecodeBytes(blob, &unix); err != nil { + return time.Time{} + } + return time.Unix(unix, 0) +} + +// storeTime update a specific database entry to the current time instance as a +// unix timestamp. +func (db *nodeDB) storeTime(key []byte, instance time.Time) error { + blob, err := rlp.EncodeToBytes(instance.Unix()) + if err != nil { + return err + } + return db.lvl.Put(key, blob, nil) +} + +// startup retrieves the time instance when the bootstrapping last begun. Its +// purpose is to prevent contacting potential seed nodes multiple times in the +// same boot cycle. +func (db *nodeDB) startup() time.Time { + return db.fetchTime(nodeDBStartupKey) +} + +// updateStartup updates the bootstrap initiation time to the one specified. +func (db *nodeDB) updateStartup(instance time.Time) error { + return db.storeTime(nodeDBStartupKey, instance) +} + +// node retrieves a node with a given id from the database. +func (db *nodeDB) node(id NodeID) *Node { + blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil) + if err != nil { + return nil + } + node := new(Node) + if err := rlp.DecodeBytes(blob, node); err != nil { + return nil + } + return node +} + +// updateNode inserts - potentially overwriting - a node into the peer database. +func (db *nodeDB) updateNode(node *Node) error { + blob, err := rlp.EncodeToBytes(node) + if err != nil { + return err + } + return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil) +} + +// lastPing retrieves the time of the last ping packet send to a remote node, +// requesting binding. +func (db *nodeDB) lastPing(id NodeID) time.Time { + return db.fetchTime(db.key(id, nodeDBDiscoverPing)) +} + +// updateLastPing updates the last time we tried contacting a remote node. +func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error { + return db.storeTime(db.key(id, nodeDBDiscoverPing), instance) +} + +// lastBond retrieves the time of the last successful bonding with a remote node. +func (db *nodeDB) lastBond(id NodeID) time.Time { + return db.fetchTime(db.key(id, nodeDBDiscoverBond)) +} + +// updateLastBond updates the last time we successfully bound to a remote node. +func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error { + return db.storeTime(db.key(id, nodeDBDiscoverBond), instance) +} + +// querySeeds retrieves a batch of nodes to be used as potential seed servers +// during bootstrapping the node into the network. +// +// Ideal seeds are the most recently seen nodes (highest probability to be still +// alive), but yet untried. However, since leveldb only supports dumb iteration +// we will instead start pulling in potential seeds that haven't been yet pinged +// since the start of the boot procedure. +// +// If the database runs out of potential seeds, we restart the startup counter +// and start iterating over the peers again. +func (db *nodeDB) querySeeds(n int) []*Node { + startup := db.startup() + + it := db.lvl.NewIterator(nil, nil) + defer it.Release() + + nodes := make([]*Node, 0, n) + for len(nodes) < n && it.Next() { + // Iterate until a discovery node is found + id, field := db.splitKey(it.Key()) + if field != nodeDBDiscoverRoot { + continue + } + // Retrieve the last ping time, and if older than startup, query + lastPing := db.lastPing(id) + if lastPing.Before(startup) { + if node := db.node(id); node != nil { + nodes = append(nodes, node) + } + } + } + // Reset the startup time if no seeds were found + if len(nodes) == 0 { + db.updateStartup(time.Now()) + } + return nodes +} + +// close flushes and closes the database files. +func (db *nodeDB) close() { + db.lvl.Close() +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 98371d6f9..891bbfd05 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -27,7 +27,7 @@ type Table struct { mutex sync.Mutex // protects buckets, their content, and nursery buckets [nBuckets]*bucket // index of known nodes by distance nursery []*Node // bootstrap nodes - cache *Cache // cache of known nodes + db *nodeDB // database of known nodes bondmu sync.Mutex bonding map[NodeID]*bondproc @@ -61,15 +61,17 @@ type bucket struct { entries []*Node } -func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table { +func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table { // If no seed cache was given, use an in-memory one - if seeder == nil { - seeder, _ = NewMemoryCache() + db, err := newNodeDB(nodeDBPath) + if err != nil { + glog.V(logger.Warn).Infoln("Failed to open node database:", err) + db, _ = newNodeDB("") } // Create the bootstrap table tab := &Table{ net: t, - cache: seeder, + db: db, self: newNode(ourID, ourAddr), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), @@ -91,6 +93,7 @@ func (tab *Table) Self() *Node { // Close terminates the network listener and flushes the seed cache. func (tab *Table) Close() { tab.net.close() + tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -174,11 +177,10 @@ func (tab *Table) refresh() { result := tab.Lookup(randomID(tab.self.ID, ld)) if len(result) == 0 { - // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live) - seeds := tab.cache.list(10) + // Pick a batch of previously know seeds to lookup with + seeds := tab.db.querySeeds(10) for _, seed := range seeds { - glog.V(logger.Debug).Infoln("Seeding network with:", seed) - tab.cache.delete(seed.ID) + glog.V(logger.Debug).Infoln("Seeding network with", seed) } // Bootstrap the table with a self lookup all := tab.bondall(append(tab.nursery, seeds...)) @@ -249,7 +251,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { // of the process can be skipped. func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { var n *Node - if n = tab.cache.get(id); n == nil { + if n = tab.db.node(id); n == nil { tab.bondmu.Lock() w := tab.bonding[id] if w != nil { @@ -282,8 +284,12 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 } func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) { + // Request a bonding slot to limit network usage <-tab.bondslots defer func() { tab.bondslots <- struct{}{} }() + + // Ping the remote side and wait for a pong + tab.db.updateLastPing(id, time.Now()) if w.err = tab.net.ping(id, addr); w.err != nil { close(w.done) return @@ -294,7 +300,15 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // waitping will simply time out. tab.net.waitping(id) } - w.n = tab.cache.add(id, addr, tcpPort) + // Bonding succeeded, update the node database + w.n = &Node{ + ID: id, + IP: addr.IP, + DiscPort: addr.Port, + TCPPort: int(tcpPort), + } + tab.db.updateNode(w.n) + tab.db.updateLastBond(id, time.Now()) close(w.done) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 8274731e3..e2bd3c8ad 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -15,7 +15,7 @@ import ( func TestTable_pingReplace(t *testing.T) { doit := func(newNodeIsResponding, lastInBucketIsResponding bool) { transport := newPingRecorder() - tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil) + tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "") last := fillBucket(tab, 200) pingSender := randomID(tab.self.ID, 200) @@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N - tab := newTable(nil, test.Self, &net.UDPAddr{}, nil) + tab := newTable(nil, test.Self, &net.UDPAddr{}, "") tab.add(test.All) // check that doClosest(Target, N) returns nodes @@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) { self := gen(NodeID{}, quickrand).(NodeID) target := randomID(self, 200) transport := findnodeOracle{t, target} - tab := newTable(transport, self, &net.UDPAddr{}, nil) + tab := newTable(transport, self, &net.UDPAddr{}, "") // lookup on empty table returns no nodes if results := tab.Lookup(target); len(results) > 0 { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 6805fb686..65741b5f5 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -144,7 +144,7 @@ type reply struct { } // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) { +func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) { addr, err := net.ResolveUDPAddr("udp", laddr) if err != nil { return nil, err @@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder if err != nil { return nil, err } - tab, _ := newUDP(priv, conn, natm, seeder) + tab, _ := newUDP(priv, conn, natm, nodeDBPath) glog.V(logger.Info).Infoln("Listening,", tab.self) return tab, nil } -func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) { +func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) { udp := &udp{ conn: c, priv: priv, @@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) ( realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } - udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder) + udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath) go udp.loop() go udp.readLoop() return udp.Table, udp @@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte if expired(req.Expiration) { return errExpired } - if t.cache.get(fromID) == nil { + if t.db.node(fromID) == nil { // No bond exists, we don't process the packet. This prevents // an attack vector where the discovery protocol could be used // to amplify traffic in a DDOS attack. A malicious actor diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index 299f94543..47e04b85a 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest { remotekey: newkey(), remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303}, } - test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil) + test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "") return test } @@ -157,8 +157,12 @@ func TestUDP_findnode(t *testing.T) { // ensure there's a bond with the test node, // findnode won't be accepted otherwise. - test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99) - + test.table.db.updateNode(&Node{ + ID: PubkeyID(&test.remotekey.PublicKey), + IP: test.remoteaddr.IP, + DiscPort: test.remoteaddr.Port, + TCPPort: 99, + }) // check that closest neighbors are returned. test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp}) test.waitPacketOut(func(p *neighbors) { diff --git a/p2p/server.go b/p2p/server.go index 5f1b80f51..5c5883ae8 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -59,9 +59,9 @@ type Server struct { // with the rest of the network. BootstrapNodes []*discover.Node - // SeedCache is the database containing the previously seen live nodes in - // the network to use as potential bootstrap seeds. - SeedCache *discover.Cache + // NodeDatabase is the path to the database containing the previously seen + // live nodes in the network. + NodeDatabase string // Protocols should contain the protocols supported // by the server. Matching protocols are launched for @@ -201,7 +201,7 @@ func (srv *Server) Start() (err error) { } // node table - ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.SeedCache) + ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase) if err != nil { return err } From 0201c04b95df3224e5fe1a3a591dba95ab8030dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 10:19:16 +0300 Subject: [PATCH 07/14] p2p/discovery: fix issues raised in the nodeDb PR --- p2p/discover/database.go | 63 +++++++++++++++++++--------------------- p2p/discover/table.go | 5 ++-- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index 93a2ded24..ece0618ff 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -70,7 +70,10 @@ func newPersistentNodeDB(path string) (*nodeDB, error) { switch err { case leveldb.ErrNotFound: // Version not found (i.e. empty cache), insert it - err = db.Put(nodeDBVersionKey, currentVer, nil) + if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil { + db.Close() + return nil, err + } case nil: // Version present, flush if different @@ -82,22 +85,17 @@ func newPersistentNodeDB(path string) (*nodeDB, error) { return newPersistentNodeDB(path) } } - // Clean up in case of an error - if err != nil { - db.Close() - return nil, err - } return &nodeDB{lvl: db}, nil } -// key generates the leveldb key-blob from a node id and its particular field of -// interest. -func (db *nodeDB) key(id NodeID, field string) []byte { +// makeKey generates the leveldb key-blob from a node id and its particular +// field of interest. +func makeKey(id NodeID, field string) []byte { return append(nodeDBItemPrefix, append(id[:], field...)...) } // splitKey tries to split a database key into a node id and a field part. -func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) { +func splitKey(key []byte) (id NodeID, field string) { // If the key is not of a node, return it plainly if !bytes.HasPrefix(key, nodeDBItemPrefix) { return NodeID{}, string(key) @@ -110,27 +108,26 @@ func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) { return id, field } -// fetchTime retrieves a time instance (encoded as a unix timestamp) associated -// with a particular database key. -func (db *nodeDB) fetchTime(key []byte) time.Time { +// fetchInt64 retrieves an integer instance associated with a particular +// database key. +func (db *nodeDB) fetchInt64(key []byte) int64 { blob, err := db.lvl.Get(key, nil) if err != nil { - return time.Time{} + return 0 } - var unix int64 - if err := rlp.DecodeBytes(blob, &unix); err != nil { - return time.Time{} + val, read := binary.Varint(blob) + if read <= 0 { + return 0 } - return time.Unix(unix, 0) + return val } -// storeTime update a specific database entry to the current time instance as a +// storeInt64 update a specific database entry to the current time instance as a // unix timestamp. -func (db *nodeDB) storeTime(key []byte, instance time.Time) error { - blob, err := rlp.EncodeToBytes(instance.Unix()) - if err != nil { - return err - } +func (db *nodeDB) storeInt64(key []byte, n int64) error { + blob := make([]byte, binary.MaxVarintLen64) + blob = blob[:binary.PutVarint(blob, n)] + return db.lvl.Put(key, blob, nil) } @@ -138,17 +135,17 @@ func (db *nodeDB) storeTime(key []byte, instance time.Time) error { // purpose is to prevent contacting potential seed nodes multiple times in the // same boot cycle. func (db *nodeDB) startup() time.Time { - return db.fetchTime(nodeDBStartupKey) + return time.Unix(db.fetchInt64(nodeDBStartupKey), 0) } // updateStartup updates the bootstrap initiation time to the one specified. func (db *nodeDB) updateStartup(instance time.Time) error { - return db.storeTime(nodeDBStartupKey, instance) + return db.storeInt64(nodeDBStartupKey, instance.Unix()) } // node retrieves a node with a given id from the database. func (db *nodeDB) node(id NodeID) *Node { - blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil) + blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) if err != nil { return nil } @@ -165,28 +162,28 @@ func (db *nodeDB) updateNode(node *Node) error { if err != nil { return err } - return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil) + return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil) } // lastPing retrieves the time of the last ping packet send to a remote node, // requesting binding. func (db *nodeDB) lastPing(id NodeID) time.Time { - return db.fetchTime(db.key(id, nodeDBDiscoverPing)) + return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0) } // updateLastPing updates the last time we tried contacting a remote node. func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error { - return db.storeTime(db.key(id, nodeDBDiscoverPing), instance) + return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix()) } // lastBond retrieves the time of the last successful bonding with a remote node. func (db *nodeDB) lastBond(id NodeID) time.Time { - return db.fetchTime(db.key(id, nodeDBDiscoverBond)) + return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverBond)), 0) } // updateLastBond updates the last time we successfully bound to a remote node. func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error { - return db.storeTime(db.key(id, nodeDBDiscoverBond), instance) + return db.storeInt64(makeKey(id, nodeDBDiscoverBond), instance.Unix()) } // querySeeds retrieves a batch of nodes to be used as potential seed servers @@ -208,7 +205,7 @@ func (db *nodeDB) querySeeds(n int) []*Node { nodes := make([]*Node, 0, n) for len(nodes) < n && it.Next() { // Iterate until a discovery node is found - id, field := db.splitKey(it.Key()) + id, field := splitKey(it.Key()) if field != nodeDBDiscoverRoot { continue } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 891bbfd05..e21d9f2f4 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -62,13 +62,12 @@ type bucket struct { } func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table { - // If no seed cache was given, use an in-memory one + // If no node database was given, use an in-memory one db, err := newNodeDB(nodeDBPath) if err != nil { glog.V(logger.Warn).Infoln("Failed to open node database:", err) db, _ = newNodeDB("") } - // Create the bootstrap table tab := &Table{ net: t, db: db, @@ -90,7 +89,7 @@ func (tab *Table) Self() *Node { return tab.self } -// Close terminates the network listener and flushes the seed cache. +// Close terminates the network listener and flushes the node database. func (tab *Table) Close() { tab.net.close() tab.db.close() From 8de8f61d369b5830f818eddf446e8368ecf11f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 12:33:06 +0300 Subject: [PATCH 08/14] p2p/discover: write the basic tests, catch RLP bug --- p2p/discover/database.go | 20 +++-- p2p/discover/database_test.go | 136 ++++++++++++++++++++++++++++++++++ p2p/discover/table.go | 2 +- 3 files changed, 150 insertions(+), 8 deletions(-) create mode 100644 p2p/discover/database_test.go diff --git a/p2p/discover/database.go b/p2p/discover/database.go index ece0618ff..c6f70972e 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -14,6 +14,9 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) +// Special node ID to use as a nil element. +var nodeDBNilNodeID = NodeID{} + // nodeDB stores all nodes we know about. type nodeDB struct { lvl *leveldb.DB @@ -27,7 +30,7 @@ var ( nodeDBDiscoverRoot = ":discover" nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping" - nodeDBDiscoverBond = nodeDBDiscoverRoot + ":lastbond" + nodeDBDiscoverPong = nodeDBDiscoverRoot + ":lastpong" ) // newNodeDB creates a new node database for storing and retrieving infos about @@ -91,6 +94,9 @@ func newPersistentNodeDB(path string) (*nodeDB, error) { // makeKey generates the leveldb key-blob from a node id and its particular // field of interest. func makeKey(id NodeID, field string) []byte { + if bytes.Equal(id[:], nodeDBNilNodeID[:]) { + return []byte(field) + } return append(nodeDBItemPrefix, append(id[:], field...)...) } @@ -176,14 +182,14 @@ func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error { return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix()) } -// lastBond retrieves the time of the last successful bonding with a remote node. -func (db *nodeDB) lastBond(id NodeID) time.Time { - return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverBond)), 0) +// lastPong retrieves the time of the last successful contact from remote node. +func (db *nodeDB) lastPong(id NodeID) time.Time { + return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPong)), 0) } -// updateLastBond updates the last time we successfully bound to a remote node. -func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error { - return db.storeInt64(makeKey(id, nodeDBDiscoverBond), instance.Unix()) +// updateLastPong updates the last time a remote node successfully contacted. +func (db *nodeDB) updateLastPong(id NodeID, instance time.Time) error { + return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix()) } // querySeeds retrieves a batch of nodes to be used as potential seed servers diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go new file mode 100644 index 000000000..580b13d41 --- /dev/null +++ b/p2p/discover/database_test.go @@ -0,0 +1,136 @@ +package discover + +import ( + "bytes" + "net" + "testing" + "time" +) + +var nodeDBKeyTests = []struct { + id NodeID + field string + key []byte +}{ + { + id: NodeID{}, + field: "version", + key: []byte{0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e}, // field + }, + { + id: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + field: ":discover", + key: []byte{0x6e, 0x3a, // prefix + 0x1d, 0xd9, 0xd6, 0x5c, 0x45, 0x52, 0xb5, 0xeb, // node id + 0x43, 0xd5, 0xad, 0x55, 0xa2, 0xee, 0x3f, 0x56, // + 0xc6, 0xcb, 0xc1, 0xc6, 0x4a, 0x5c, 0x8d, 0x65, // + 0x9f, 0x51, 0xfc, 0xd5, 0x1b, 0xac, 0xe2, 0x43, // + 0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // + 0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, // + 0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, // + 0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, // + 0x3a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, // field + }, + }, +} + +func TestNodeDBKeys(t *testing.T) { + for i, tt := range nodeDBKeyTests { + if key := makeKey(tt.id, tt.field); !bytes.Equal(key, tt.key) { + t.Errorf("make test %d: key mismatch: have 0x%x, want 0x%x", i, key, tt.key) + } + id, field := splitKey(tt.key) + if !bytes.Equal(id[:], tt.id[:]) { + t.Errorf("split test %d: id mismatch: have 0x%x, want 0x%x", i, id, tt.id) + } + if field != tt.field { + t.Errorf("split test %d: field mismatch: have 0x%x, want 0x%x", i, field, tt.field) + } + } +} + +var nodeDBInt64Tests = []struct { + key []byte + value int64 +}{ + {key: []byte{0x01}, value: 1}, + {key: []byte{0x02}, value: 2}, + {key: []byte{0x03}, value: 3}, +} + +func TestNodeDBInt64(t *testing.T) { + db, _ := newNodeDB("") + + tests := nodeDBInt64Tests + for i := 0; i < len(tests); i++ { + // Insert the next value + if err := db.storeInt64(tests[i].key, tests[i].value); err != nil { + t.Errorf("test %d: failed to store value: %v", i, err) + } + // Check all existing and non existing values + for j := 0; j < len(tests); j++ { + num := db.fetchInt64(tests[j].key) + switch { + case j <= i && num != tests[j].value: + t.Errorf("test %d, item %d: value mismatch: have %v, want %v", i, j, num, tests[j].value) + case j > i && num != 0: + t.Errorf("test %d, item %d: value mismatch: have %v, want %v", i, j, num, 0) + } + } + } +} + +func TestNodeDBFetchStore(t *testing.T) { + node := &Node{ + ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: net.IP([]byte{192, 168, 0, 1}), + DiscPort: 31313, + TCPPort: 30303, + } + inst := time.Now() + db, _ := newNodeDB("") + + // Check fetch/store operations on the startup object + if stored := db.startup(); stored.Unix() != 0 { + t.Errorf("startup: non-existing object: %v", stored) + } + if err := db.updateStartup(inst); err != nil { + t.Errorf("startup: failed to update: %v", err) + } + if stored := db.startup(); stored.Unix() != inst.Unix() { + t.Errorf("startup: value mismatch: have %v, want %v", stored, inst) + } + // Check fetch/store operations on a node ping object + if stored := db.lastPing(node.ID); stored.Unix() != 0 { + t.Errorf("ping: non-existing object: %v", stored) + } + if err := db.updateLastPing(node.ID, inst); err != nil { + t.Errorf("ping: failed to update: %v", err) + } + if stored := db.lastPing(node.ID); stored.Unix() != inst.Unix() { + t.Errorf("ping: value mismatch: have %v, want %v", stored, inst) + } + // Check fetch/store operations on a node pong object + if stored := db.lastPong(node.ID); stored.Unix() != 0 { + t.Errorf("pong: non-existing object: %v", stored) + } + if err := db.updateLastPong(node.ID, inst); err != nil { + t.Errorf("pong: failed to update: %v", err) + } + if stored := db.lastPong(node.ID); stored.Unix() != inst.Unix() { + t.Errorf("pong: value mismatch: have %v, want %v", stored, inst) + } + // Check fetch/store operations on an actual node object + if stored := db.node(node.ID); stored != nil { + t.Errorf("node: non-existing object: %v", stored) + } + if err := db.updateNode(node); err != nil { + t.Errorf("node: failed to update: %v", err) + } + if stored := db.node(node.ID); stored == nil { + t.Errorf("node: not found") + } else if !bytes.Equal(stored.ID[:], node.ID[:]) || !bytes.Equal(stored.IP, node.IP) || + stored.DiscPort != node.DiscPort || stored.TCPPort != node.TCPPort { + t.Errorf("node: data mismatch: have %v, want %v", stored, node) + } +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index e21d9f2f4..4058c7bb7 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -307,7 +307,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd TCPPort: int(tcpPort), } tab.db.updateNode(w.n) - tab.db.updateLastBond(id, time.Now()) + tab.db.updateLastPong(id, time.Now()) close(w.done) } From 85b4b44235aac41b95c34dbc4de7de91a597ec3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 14:45:35 +0300 Subject: [PATCH 09/14] p2p/discover: use iterator based seeding, drop old protocol test --- p2p/discover/database.go | 52 +++++++--------- p2p/discover/database_test.go | 111 +++++++++++++++++++++++++++++----- 2 files changed, 118 insertions(+), 45 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index c6f70972e..d03bf0ab5 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -9,8 +9,11 @@ import ( "os" "time" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rlp" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/storage" ) @@ -19,13 +22,13 @@ var nodeDBNilNodeID = NodeID{} // nodeDB stores all nodes we know about. type nodeDB struct { - lvl *leveldb.DB + lvl *leveldb.DB // Interface to the database itself + seeder iterator.Iterator // Iterator for fetching possible seed nodes } // Schema layout for the node database var ( nodeDBVersionKey = []byte("version") // Version of the database to flush if changes - nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection) nodeDBItemPrefix = []byte("n:") // Identifier to prefix node entries with nodeDBDiscoverRoot = ":discover" @@ -137,26 +140,16 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error { return db.lvl.Put(key, blob, nil) } -// startup retrieves the time instance when the bootstrapping last begun. Its -// purpose is to prevent contacting potential seed nodes multiple times in the -// same boot cycle. -func (db *nodeDB) startup() time.Time { - return time.Unix(db.fetchInt64(nodeDBStartupKey), 0) -} - -// updateStartup updates the bootstrap initiation time to the one specified. -func (db *nodeDB) updateStartup(instance time.Time) error { - return db.storeInt64(nodeDBStartupKey, instance.Unix()) -} - // node retrieves a node with a given id from the database. func (db *nodeDB) node(id NodeID) *Node { blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) if err != nil { + glog.V(logger.Warn).Infof("failed to retrieve node: %v", err) return nil } node := new(Node) if err := rlp.DecodeBytes(blob, node); err != nil { + glog.V(logger.Warn).Infof("failed to decode node RLP: %v", err) return nil } return node @@ -203,34 +196,35 @@ func (db *nodeDB) updateLastPong(id NodeID, instance time.Time) error { // If the database runs out of potential seeds, we restart the startup counter // and start iterating over the peers again. func (db *nodeDB) querySeeds(n int) []*Node { - startup := db.startup() - - it := db.lvl.NewIterator(nil, nil) - defer it.Release() - + // Create a new seed iterator if none exists + if db.seeder == nil { + db.seeder = db.lvl.NewIterator(nil, nil) + } + // Iterate over the nodes and find suitable seeds nodes := make([]*Node, 0, n) - for len(nodes) < n && it.Next() { + for len(nodes) < n && db.seeder.Next() { // Iterate until a discovery node is found - id, field := splitKey(it.Key()) + id, field := splitKey(db.seeder.Key()) if field != nodeDBDiscoverRoot { continue } - // Retrieve the last ping time, and if older than startup, query - lastPing := db.lastPing(id) - if lastPing.Before(startup) { - if node := db.node(id); node != nil { - nodes = append(nodes, node) - } + // Load it as a potential seed + if node := db.node(id); node != nil { + nodes = append(nodes, node) } } - // Reset the startup time if no seeds were found + // Release the iterator if we reached the end if len(nodes) == 0 { - db.updateStartup(time.Now()) + db.seeder.Release() + db.seeder = nil } return nodes } // close flushes and closes the database files. func (db *nodeDB) close() { + if db.seeder != nil { + db.seeder.Release() + } db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 580b13d41..b067d458d 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -82,24 +82,13 @@ func TestNodeDBInt64(t *testing.T) { func TestNodeDBFetchStore(t *testing.T) { node := &Node{ - ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), - IP: net.IP([]byte{192, 168, 0, 1}), - DiscPort: 31313, - TCPPort: 30303, + ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: net.IP([]byte{192, 168, 0, 1}), + TCPPort: 30303, } inst := time.Now() db, _ := newNodeDB("") - // Check fetch/store operations on the startup object - if stored := db.startup(); stored.Unix() != 0 { - t.Errorf("startup: non-existing object: %v", stored) - } - if err := db.updateStartup(inst); err != nil { - t.Errorf("startup: failed to update: %v", err) - } - if stored := db.startup(); stored.Unix() != inst.Unix() { - t.Errorf("startup: value mismatch: have %v, want %v", stored, inst) - } // Check fetch/store operations on a node ping object if stored := db.lastPing(node.ID); stored.Unix() != 0 { t.Errorf("ping: non-existing object: %v", stored) @@ -129,8 +118,98 @@ func TestNodeDBFetchStore(t *testing.T) { } if stored := db.node(node.ID); stored == nil { t.Errorf("node: not found") - } else if !bytes.Equal(stored.ID[:], node.ID[:]) || !bytes.Equal(stored.IP, node.IP) || - stored.DiscPort != node.DiscPort || stored.TCPPort != node.TCPPort { + } else if !bytes.Equal(stored.ID[:], node.ID[:]) || !stored.IP.Equal(node.IP) || stored.TCPPort != node.TCPPort { t.Errorf("node: data mismatch: have %v, want %v", stored, node) } } + +var nodeDBSeedQueryNodes = []struct { + node Node + pong time.Time +}{ + { + node: Node{ + ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 1}, + }, + pong: time.Now().Add(-2 * time.Second), + }, + { + node: Node{ + ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 2}, + }, + pong: time.Now().Add(-3 * time.Second), + }, + { + node: Node{ + ID: MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 3}, + }, + pong: time.Now().Add(-1 * time.Second), + }, +} + +func TestNodeDBSeedQuery(t *testing.T) { + db, _ := newNodeDB("") + + // Insert a batch of nodes for querying + for i, seed := range nodeDBSeedQueryNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + } + // Retrieve the entire batch and check for duplicates + seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) + if len(seeds) != len(nodeDBSeedQueryNodes) { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)) + } + have := make(map[NodeID]struct{}) + for _, seed := range seeds { + have[seed.ID] = struct{}{} + } + want := make(map[NodeID]struct{}) + for _, seed := range nodeDBSeedQueryNodes { + want[seed.node.ID] = struct{}{} + } + for id, _ := range have { + if _, ok := want[id]; !ok { + t.Errorf("extra seed: %v", id) + } + } + for id, _ := range want { + if _, ok := have[id]; !ok { + t.Errorf("missing seed: %v", id) + } + } + // Make sure the next batch is empty (seed EOF) + seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes)) + if len(seeds) != 0 { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0) + } +} + +func TestNodeDBSeedQueryContinuation(t *testing.T) { + db, _ := newNodeDB("") + + // Insert a batch of nodes for querying + for i, seed := range nodeDBSeedQueryNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + } + // Iteratively retrieve the batch, checking for an empty batch on reset + for i := 0; i < len(nodeDBSeedQueryNodes); i++ { + if seeds := db.querySeeds(1); len(seeds) != 1 { + t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) + } + } + if seeds := db.querySeeds(1); len(seeds) != 0 { + t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0) + } + for i := 0; i < len(nodeDBSeedQueryNodes); i++ { + if seeds := db.querySeeds(1); len(seeds) != 1 { + t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) + } + } +} From 706da56f751a6ec98c9f2f29bebc66dffe8d1e2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 14:56:42 +0300 Subject: [PATCH 10/14] p2p/discover: wrap the pinger to update the node db too --- p2p/discover/table.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 4058c7bb7..ecfb8d672 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -288,8 +288,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd defer func() { tab.bondslots <- struct{}{} }() // Ping the remote side and wait for a pong - tab.db.updateLastPing(id, time.Now()) - if w.err = tab.net.ping(id, addr); w.err != nil { + if w.err = tab.ping(id, addr); w.err != nil { close(w.done) return } @@ -307,14 +306,13 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd TCPPort: int(tcpPort), } tab.db.updateNode(w.n) - tab.db.updateLastPong(id, time.Now()) close(w.done) } func (tab *Table) pingreplace(new *Node, b *bucket) { if len(b.entries) == bucketSize { oldest := b.entries[bucketSize-1] - if err := tab.net.ping(oldest.ID, oldest.addr()); err == nil { + if err := tab.ping(oldest.ID, oldest.addr()); err == nil { // The node responded, we don't need to replace it. return } @@ -327,6 +325,19 @@ func (tab *Table) pingreplace(new *Node, b *bucket) { b.entries[0] = new } +// ping a remote endpoint and wait for a reply, also updating the node database +// accordingly. +func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { + // Update the last ping and send the message + tab.db.updateLastPing(id, time.Now()) + if err := tab.net.ping(id, addr); err != nil { + return err + } + // Pong received, update the database and return + tab.db.updateLastPong(id, time.Now()) + return nil +} + // add puts the entries into the table if their corresponding // bucket is not full. The caller must hold tab.mutex. func (tab *Table) add(entries []*Node) { From 75fd738dea37387e465e8fc05b163adfb5ab34ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 15:06:31 +0300 Subject: [PATCH 11/14] p2p/discover: drop a superfluous warning --- p2p/discover/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index d03bf0ab5..74c9c6af3 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -144,7 +144,7 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error { func (db *nodeDB) node(id NodeID) *Node { blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) if err != nil { - glog.V(logger.Warn).Infof("failed to retrieve node: %v", err) + glog.V(logger.Detail).Infof("failed to retrieve node %v: %v", id, err) return nil } node := new(Node) From a136e2bb222ed0eaa9b5e5a31a07fcc664de8eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 15:28:17 +0300 Subject: [PATCH 12/14] p2p/discover: parametrize nodedb version, add persistency tests --- p2p/discover/database.go | 10 +++--- p2p/discover/database_test.go | 59 ++++++++++++++++++++++++++++++++--- p2p/discover/table.go | 4 +-- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index 74c9c6af3..b7c0c0498 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -39,11 +39,11 @@ var ( // newNodeDB creates a new node database for storing and retrieving infos about // known peers in the network. If no path is given, an in-memory, temporary // database is constructed. -func newNodeDB(path string) (*nodeDB, error) { +func newNodeDB(path string, version int) (*nodeDB, error) { if path == "" { return newMemoryNodeDB() } - return newPersistentNodeDB(path) + return newPersistentNodeDB(path, version) } // newMemoryNodeDB creates a new in-memory node database without a persistent @@ -58,7 +58,7 @@ func newMemoryNodeDB() (*nodeDB, error) { // newPersistentNodeDB creates/opens a leveldb backed persistent node database, // also flushing its contents in case of a version mismatch. -func newPersistentNodeDB(path string) (*nodeDB, error) { +func newPersistentNodeDB(path string, version int) (*nodeDB, error) { // Try to open the cache, recovering any corruption db, err := leveldb.OpenFile(path, nil) if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted { @@ -70,7 +70,7 @@ func newPersistentNodeDB(path string) (*nodeDB, error) { // The nodes contained in the cache correspond to a certain protocol version. // Flush all nodes if the version doesn't match. currentVer := make([]byte, binary.MaxVarintLen64) - currentVer = currentVer[:binary.PutVarint(currentVer, Version)] + currentVer = currentVer[:binary.PutVarint(currentVer, int64(version))] blob, err := db.Get(nodeDBVersionKey, nil) switch err { @@ -88,7 +88,7 @@ func newPersistentNodeDB(path string) (*nodeDB, error) { if err = os.RemoveAll(path); err != nil { return nil, err } - return newPersistentNodeDB(path) + return newPersistentNodeDB(path, version) } } return &nodeDB{lvl: db}, nil diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index b067d458d..0412a4770 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -2,7 +2,10 @@ package discover import ( "bytes" + "io/ioutil" "net" + "os" + "path/filepath" "testing" "time" ) @@ -59,7 +62,8 @@ var nodeDBInt64Tests = []struct { } func TestNodeDBInt64(t *testing.T) { - db, _ := newNodeDB("") + db, _ := newNodeDB("", Version) + defer db.close() tests := nodeDBInt64Tests for i := 0; i < len(tests); i++ { @@ -87,7 +91,9 @@ func TestNodeDBFetchStore(t *testing.T) { TCPPort: 30303, } inst := time.Now() - db, _ := newNodeDB("") + + db, _ := newNodeDB("", Version) + defer db.close() // Check fetch/store operations on a node ping object if stored := db.lastPing(node.ID); stored.Unix() != 0 { @@ -151,7 +157,8 @@ var nodeDBSeedQueryNodes = []struct { } func TestNodeDBSeedQuery(t *testing.T) { - db, _ := newNodeDB("") + db, _ := newNodeDB("", Version) + defer db.close() // Insert a batch of nodes for querying for i, seed := range nodeDBSeedQueryNodes { @@ -190,7 +197,8 @@ func TestNodeDBSeedQuery(t *testing.T) { } func TestNodeDBSeedQueryContinuation(t *testing.T) { - db, _ := newNodeDB("") + db, _ := newNodeDB("", Version) + defer db.close() // Insert a batch of nodes for querying for i, seed := range nodeDBSeedQueryNodes { @@ -213,3 +221,46 @@ func TestNodeDBSeedQueryContinuation(t *testing.T) { } } } + +func TestNodeDBPersistency(t *testing.T) { + root, err := ioutil.TempDir("", "nodedb-") + if err != nil { + t.Fatalf("failed to create temporary data folder: %v", err) + } + defer os.RemoveAll(root) + + var ( + testKey = []byte("somekey") + testInt = int64(314) + ) + + // Create a persistent database and store some values + db, err := newNodeDB(filepath.Join("root", "database"), Version) + if err != nil { + t.Fatalf("failed to create persistent database: %v", err) + } + if err := db.storeInt64(testKey, testInt); err != nil { + t.Fatalf("failed to store value: %v.", err) + } + db.close() + + // Reopen the database and check the value + db, err = newNodeDB(filepath.Join("root", "database"), Version) + if err != nil { + t.Fatalf("failed to open persistent database: %v", err) + } + if val := db.fetchInt64(testKey); val != testInt { + t.Fatalf("value mismatch: have %v, want %v", val, testInt) + } + db.close() + + // Change the database version and check flush + db, err = newNodeDB(filepath.Join("root", "database"), Version+1) + if err != nil { + t.Fatalf("failed to open persistent database: %v", err) + } + if val := db.fetchInt64(testKey); val != 0 { + t.Fatalf("value mismatch: have %v, want %v", val, 0) + } + db.close() +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index ecfb8d672..11bdff198 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -63,10 +63,10 @@ type bucket struct { func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table { // If no node database was given, use an in-memory one - db, err := newNodeDB(nodeDBPath) + db, err := newNodeDB(nodeDBPath, Version) if err != nil { glog.V(logger.Warn).Infoln("Failed to open node database:", err) - db, _ = newNodeDB("") + db, _ = newNodeDB("", Version) } tab := &Table{ net: t, From 437cf4b3acf1b5a4efde64aacaacdf14289010d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 27 Apr 2015 17:38:28 +0300 Subject: [PATCH 13/14] p2p/discover: add node expirer and related tests --- p2p/discover/database.go | 71 +++++++++++++++++++++++++++++++++-- p2p/discover/database_test.go | 47 +++++++++++++++++++++++ p2p/discover/table.go | 2 + 3 files changed, 116 insertions(+), 4 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index b7c0c0498..48539a6c9 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/binary" "os" + "sync" "time" "github.com/ethereum/go-ethereum/logger" @@ -17,13 +18,19 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) -// Special node ID to use as a nil element. -var nodeDBNilNodeID = NodeID{} +var ( + nodeDBNilNodeID = NodeID{} // Special node ID to use as a nil element. + nodeDBNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped. + nodeDBCleanupCycle = time.Hour // Time period for running the expiration task. +) // nodeDB stores all nodes we know about. type nodeDB struct { lvl *leveldb.DB // Interface to the database itself seeder iterator.Iterator // Iterator for fetching possible seed nodes + + runner sync.Once // Ensures we can start at most one expirer + quit chan struct{} // Channel to signal the expiring thread to stop } // Schema layout for the node database @@ -53,7 +60,10 @@ func newMemoryNodeDB() (*nodeDB, error) { if err != nil { return nil, err } - return &nodeDB{lvl: db}, nil + return &nodeDB{ + lvl: db, + quit: make(chan struct{}), + }, nil } // newPersistentNodeDB creates/opens a leveldb backed persistent node database, @@ -91,7 +101,10 @@ func newPersistentNodeDB(path string, version int) (*nodeDB, error) { return newPersistentNodeDB(path, version) } } - return &nodeDB{lvl: db}, nil + return &nodeDB{ + lvl: db, + quit: make(chan struct{}), + }, nil } // makeKey generates the leveldb key-blob from a node id and its particular @@ -164,6 +177,55 @@ func (db *nodeDB) updateNode(node *Node) error { return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil) } +// expirer should be started in a go routine, and is responsible for looping ad +// infinitum and dropping stale data from the database. +func (db *nodeDB) expirer() { + db.runner.Do(func() { + tick := time.Tick(nodeDBCleanupCycle) + for { + select { + case <-tick: + if err := db.expireNodes(); err != nil { + glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err) + } + + case <-db.quit: + return + } + } + }) +} + +// expireNodes iterates over the database and deletes all nodes that have not +// been seen (i.e. received a pong from) for some alloted time. +func (db *nodeDB) expireNodes() error { + threshold := time.Now().Add(-nodeDBNodeExpiration) + + // Find discovered nodes that are older than the allowance + it := db.lvl.NewIterator(nil, nil) + defer it.Release() + + for it.Next() { + // Skip the item if not a discovery node + id, field := splitKey(it.Key()) + if field != nodeDBDiscoverRoot { + continue + } + // Skip the node if not expired yet + if seen := db.lastPong(id); seen.After(threshold) { + continue + } + // Otherwise delete all associated information + prefix := makeKey(id, "") + for ok := it.Seek(prefix); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() { + if err := db.lvl.Delete(it.Key(), nil); err != nil { + return err + } + } + } + return nil +} + // lastPing retrieves the time of the last ping packet send to a remote node, // requesting binding. func (db *nodeDB) lastPing(id NodeID) time.Time { @@ -226,5 +288,6 @@ func (db *nodeDB) close() { if db.seeder != nil { db.seeder.Release() } + close(db.quit) db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 0412a4770..f327cf73b 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -264,3 +264,50 @@ func TestNodeDBPersistency(t *testing.T) { } db.close() } + +var nodeDBExpirationNodes = []struct { + node Node + pong time.Time + exp bool +}{ + { + node: Node{ + ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 1}, + }, + pong: time.Now().Add(-nodeDBNodeExpiration + time.Minute), + exp: false, + }, { + node: Node{ + ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + IP: []byte{127, 0, 0, 2}, + }, + pong: time.Now().Add(-nodeDBNodeExpiration - time.Minute), + exp: true, + }, +} + +func TestNodeDBExpiration(t *testing.T) { + db, _ := newNodeDB("", Version) + defer db.close() + + // Add all the test nodes and set their last pong time + for i, seed := range nodeDBExpirationNodes { + if err := db.updateNode(&seed.node); err != nil { + t.Fatalf("node %d: failed to insert: %v", i, err) + } + if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil { + t.Fatalf("node %d: failed to update pong: %v", i, err) + } + } + // Expire some of them, and check the rest + if err := db.expireNodes(); err != nil { + t.Fatalf("failed to expire nodes: %v", err) + } + for i, seed := range nodeDBExpirationNodes { + node := db.node(seed.node.ID) + if (node == nil && !seed.exp) || (node != nil && seed.exp) { + t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp) + } + } +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 11bdff198..060aa7c09 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -335,6 +335,8 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { } // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) + go tab.db.expirer() + return nil } From 4992765032b4318f3f5b4940a553b4e552c55963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 28 Apr 2015 10:28:04 +0300 Subject: [PATCH 14/14] p2p/discover: fix goroutine leak due to blocking on sync.Once --- p2p/discover/database.go | 35 +++++++++++++++++++++++------------ p2p/discover/table.go | 2 +- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index 48539a6c9..d966a6ac1 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -177,23 +177,34 @@ func (db *nodeDB) updateNode(node *Node) error { return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil) } +// ensureExpirer is a small helper method ensuring that the data expiration +// mechanism is running. If the expiration goroutine is already running, this +// method simply returns. +// +// The goal is to start the data evacuation only after the network successfully +// bootstrapped itself (to prevent dumping potentially useful seed nodes). Since +// it would require significant overhead to exactly trace the first successful +// convergence, it's simpler to "ensure" the correct state when an appropriate +// condition occurs (i.e. a successful bonding), and discard further events. +func (db *nodeDB) ensureExpirer() { + db.runner.Do(func() { go db.expirer() }) +} + // expirer should be started in a go routine, and is responsible for looping ad // infinitum and dropping stale data from the database. func (db *nodeDB) expirer() { - db.runner.Do(func() { - tick := time.Tick(nodeDBCleanupCycle) - for { - select { - case <-tick: - if err := db.expireNodes(); err != nil { - glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err) - } - - case <-db.quit: - return + tick := time.Tick(nodeDBCleanupCycle) + for { + select { + case <-tick: + if err := db.expireNodes(); err != nil { + glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err) } + + case <-db.quit: + return } - }) + } } // expireNodes iterates over the database and deletes all nodes that have not diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 060aa7c09..d3fe373f4 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -335,7 +335,7 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { } // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) - go tab.db.expirer() + tab.db.ensureExpirer() return nil }