diff --git a/waku/node.go b/waku/node.go index ce6c702..5e16216 100644 --- a/waku/node.go +++ b/waku/node.go @@ -192,7 +192,6 @@ func init() { rootCmd.Flags().Bool("use-db", true, "Store messages and peers in a DB, (default: true, use false for in-memory only)") rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file") rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol") - } func initConfig() { diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index e9ac51e..f2c3139 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -8,7 +8,7 @@ import ( "github.com/status-im/go-waku/waku/persistence" ) -// Queries are the postgres queries for a given table. +// Queries are the sqlite queries for a given table. type Queries struct { deleteQuery string existsQuery string @@ -85,10 +85,12 @@ func (q Queries) GetSize() string { return q.getSizeQuery } +// WithDB is a DBOption that lets you use a sqlite3 DBStore. func WithDB(path string) persistence.DBOption { return persistence.WithDriver("sqlite3", path) } +// NewDB creates a sqlite3 DB in the specified path func NewDB(path string) (*sql.DB, error) { db, err := sql.Open("sqlite3", path) if err != nil { @@ -97,6 +99,7 @@ func NewDB(path string) (*sql.DB, error) { return db, nil } +// CreateTable creates the table that will persist the peers func CreateTable(db *sql.DB, tableName string) error { sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName) _, err := db.Exec(sqlStmt) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 9b72216..172583c 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -8,6 +8,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/store" ) +// DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { store.MessageProvider db *sql.DB @@ -15,6 +16,7 @@ type DBStore struct { type DBOption func(*DBStore) error +// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore. func WithDB(db *sql.DB) DBOption { return func(d *DBStore) error { d.db = db @@ -22,6 +24,7 @@ func WithDB(db *sql.DB) DBOption { } } +// WithDriver is a DBOption that will open a *sql.DB connection func WithDriver(driverName string, datasourceName string) DBOption { return func(d *DBStore) error { db, err := sql.Open(driverName, datasourceName) @@ -33,6 +36,8 @@ func WithDriver(driverName string, datasourceName string) DBOption { } } +// Creates a new DB store using the db specified via options. +// It will create a messages table if it does not exist func NewDBStore(opt DBOption) (*DBStore, error) { result := new(DBStore) @@ -64,10 +69,12 @@ func (d *DBStore) createTable() error { return nil } +// Closes a DB connection func (d *DBStore) Stop() { d.db.Close() } +// Inserts a WakuMessage into the DB func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error { stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)") if err != nil { @@ -81,6 +88,7 @@ func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error { return nil } +// Returns all the stored WakuMessages func (d *DBStore) GetAll() ([]*pb.WakuMessage, error) { rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC") if err != nil { diff --git a/waku/v2/node/broadcast.go b/waku/v2/node/broadcast.go index 18a441d..27ef1f7 100644 --- a/waku/v2/node/broadcast.go +++ b/waku/v2/node/broadcast.go @@ -50,6 +50,9 @@ func (b *broadcaster) run() { } } +// NewBroadcaster creates a Broadcaster with an specified length +// It's used to register subscriptors that will need to receive +// an Envelope containing a WakuMessage func NewBroadcaster(buflen int) Broadcaster { b := &broadcaster{ input: make(chan *protocol.Envelope, buflen), @@ -63,19 +66,23 @@ func NewBroadcaster(buflen int) Broadcaster { return b } +// Register a subscriptor channel func (b *broadcaster) Register(newch chan<- *protocol.Envelope) { b.reg <- newch } +// Unregister a subscriptor channel func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { b.unreg <- newch } +// Closes the broadcaster. Used to stop receiving new subscribers func (b *broadcaster) Close() error { close(b.reg) return nil } +// Submits an Envelope to be broadcasted among all registered subscriber channels func (b *broadcaster) Submit(m *protocol.Envelope) { if b != nil { b.input <- m diff --git a/waku/v2/node/subscription.go b/waku/v2/node/subscription.go index 1c144e0..27adcf9 100644 --- a/waku/v2/node/subscription.go +++ b/waku/v2/node/subscription.go @@ -6,19 +6,24 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" ) +// Subscription to a pubsub topic type Subscription struct { - C chan *protocol.Envelope + // Channel for receiving messages + C chan *protocol.Envelope + closed bool mutex sync.Mutex quit chan struct{} } +// Unsubscribe from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { if !subs.closed { close(subs.quit) } } +// Determine whether a Subscription is open or not func (subs *Subscription) IsClosed() bool { subs.mutex.Lock() defer subs.mutex.Unlock()