docs. pt-1

This commit is contained in:
Richard Ramos 2021-04-22 14:49:52 -04:00
parent f978071043
commit 51e0fecb76
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
5 changed files with 25 additions and 3 deletions

View File

@ -192,7 +192,6 @@ func init() {
rootCmd.Flags().Bool("use-db", true, "Store messages and peers in a DB, (default: true, use false for in-memory only)")
rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file")
rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol")
}
func initConfig() {

View File

@ -8,7 +8,7 @@ import (
"github.com/status-im/go-waku/waku/persistence"
)
// Queries are the postgres queries for a given table.
// Queries are the sqlite queries for a given table.
type Queries struct {
deleteQuery string
existsQuery string
@ -85,10 +85,12 @@ func (q Queries) GetSize() string {
return q.getSizeQuery
}
// WithDB is a DBOption that lets you use a sqlite3 DBStore.
func WithDB(path string) persistence.DBOption {
return persistence.WithDriver("sqlite3", path)
}
// NewDB creates a sqlite3 DB in the specified path
func NewDB(path string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
@ -97,6 +99,7 @@ func NewDB(path string) (*sql.DB, error) {
return db, nil
}
// CreateTable creates the table that will persist the peers
func CreateTable(db *sql.DB, tableName string) error {
sqlStmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (key TEXT NOT NULL UNIQUE, data BYTEA);", tableName)
_, err := db.Exec(sqlStmt)

View File

@ -8,6 +8,7 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/store"
)
// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
store.MessageProvider
db *sql.DB
@ -15,6 +16,7 @@ type DBStore struct {
type DBOption func(*DBStore) error
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.
func WithDB(db *sql.DB) DBOption {
return func(d *DBStore) error {
d.db = db
@ -22,6 +24,7 @@ func WithDB(db *sql.DB) DBOption {
}
}
// WithDriver is a DBOption that will open a *sql.DB connection
func WithDriver(driverName string, datasourceName string) DBOption {
return func(d *DBStore) error {
db, err := sql.Open(driverName, datasourceName)
@ -33,6 +36,8 @@ func WithDriver(driverName string, datasourceName string) DBOption {
}
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist
func NewDBStore(opt DBOption) (*DBStore, error) {
result := new(DBStore)
@ -64,10 +69,12 @@ func (d *DBStore) createTable() error {
return nil
}
// Closes a DB connection
func (d *DBStore) Stop() {
d.db.Close()
}
// Inserts a WakuMessage into the DB
func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error {
stmt, err := d.db.Prepare("INSERT INTO messages (id, timestamp, contentTopic, payload, version) VALUES (?, ?, ?, ?, ?)")
if err != nil {
@ -81,6 +88,7 @@ func (d *DBStore) Put(cursor *pb.Index, message *pb.WakuMessage) error {
return nil
}
// Returns all the stored WakuMessages
func (d *DBStore) GetAll() ([]*pb.WakuMessage, error) {
rows, err := d.db.Query("SELECT timestamp, contentTopic, payload, version FROM messages ORDER BY timestamp ASC")
if err != nil {

View File

@ -50,6 +50,9 @@ func (b *broadcaster) run() {
}
}
// NewBroadcaster creates a Broadcaster with an specified length
// It's used to register subscriptors that will need to receive
// an Envelope containing a WakuMessage
func NewBroadcaster(buflen int) Broadcaster {
b := &broadcaster{
input: make(chan *protocol.Envelope, buflen),
@ -63,19 +66,23 @@ func NewBroadcaster(buflen int) Broadcaster {
return b
}
// Register a subscriptor channel
func (b *broadcaster) Register(newch chan<- *protocol.Envelope) {
b.reg <- newch
}
// Unregister a subscriptor channel
func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
b.unreg <- newch
}
// Closes the broadcaster. Used to stop receiving new subscribers
func (b *broadcaster) Close() error {
close(b.reg)
return nil
}
// Submits an Envelope to be broadcasted among all registered subscriber channels
func (b *broadcaster) Submit(m *protocol.Envelope) {
if b != nil {
b.input <- m

View File

@ -6,19 +6,24 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol"
)
// Subscription to a pubsub topic
type Subscription struct {
// Channel for receiving messages
C chan *protocol.Envelope
closed bool
mutex sync.Mutex
quit chan struct{}
}
// Unsubscribe from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() {
if !subs.closed {
close(subs.quit)
}
}
// Determine whether a Subscription is open or not
func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock()
defer subs.mutex.Unlock()