diff --git a/dbutils/insert.go b/dbutils/insert.go new file mode 100644 index 00000000..8963c952 --- /dev/null +++ b/dbutils/insert.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "time" + + _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver + + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +func genRandomBytes(size int) (blk []byte, err error) { + blk = make([]byte, size) + _, err = rand.Read(blk) + return +} + +func genRandomTimestamp(now int64, last30d int64) int64 { + return rand.Int63n(last30d) + now +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + +func genRandomContentTopic(n int) string { + topics := []string{"topic1", "topic2", "topic3", "topic4", "topic5"} + i := n % 5 + return protocol.NewContentTopic("test", 1, topics[i], "plaintext").String() +} + +func newdb(path string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil, err + } + return db, nil +} + +func createTable(db *sql.DB) error { + sqlStmt := `CREATE TABLE IF NOT EXISTS message ( + id BLOB, + receiverTimestamp INTEGER NOT NULL, + senderTimestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL DEFAULT 0, + CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) + ) WITHOUT ROWID; + + CREATE INDEX IF NOT EXISTS message_senderTimestamp ON message(senderTimestamp); + CREATE INDEX IF NOT EXISTS message_receiverTimestamp ON message(receiverTimestamp); + ` + _, err := db.Exec(sqlStmt) + if err != nil { + return err + } + return nil +} + +func main() { + + Ns := []int{10_000, 100_000, 1_000_000} + + for _, N := range Ns { + dbName := fmt.Sprintf("store%d.db", N) + fmt.Println("Inserting ", N, " records in ", dbName) + + db, err := newdb(dbName) + query := "INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)" + + err = createTable(db) + if err != nil { + panic(err) + } + + trx, err := db.BeginTx(context.Background(), nil) + if err != nil { + panic(err) + } + + stmt, err := trx.Prepare(query) + if err != nil { + panic(err) + } + + last30d := time.Now().UnixNano() - time.Now().Add(-30*time.Hour*24).UnixNano() + now := time.Now().Add(-1 * time.Minute).UnixNano() + pubsubTopic := protocol.DefaultPubsubTopic().String() + for i := 1; i <= N; i++ { + + if i%1000 == 0 && i > 1 && i < N { + err := trx.Commit() + if err != nil { + panic(err) + } + + trx, err = db.BeginTx(context.Background(), nil) + if err != nil { + panic(err) + } + + stmt, err = trx.Prepare(query) + if err != nil { + panic(err) + } + } + + if i%(N/10) == 0 && i > 1 { + fmt.Println(i, "...") + } + + randPayload, err := genRandomBytes(100) + if err != nil { + panic(err) + } + + msg := pb.WakuMessage{ + Version: 0, + ContentTopic: genRandomContentTopic(i), + Timestamp: genRandomTimestamp(now, last30d), + Payload: randPayload, + } + + hash, err := msg.Hash() + if err != nil { + panic(err) + } + + _, err = stmt.Exec(hash, msg.Timestamp, msg.Timestamp, msg.ContentTopic, pubsubTopic, msg.Payload, msg.Version) + if err != nil { + panic(err) + } + } + + err = trx.Commit() + if err != nil { + panic(err) + } + } +} diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 975ba3e9..c9090129 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -2,6 +2,7 @@ package persistence import ( "database/sql" + "fmt" "time" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -100,7 +101,10 @@ func (d *DBStore) createTable() error { payload BLOB, version INTEGER NOT NULL DEFAULT 0, CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) - ) WITHOUT ROWID;` + ) WITHOUT ROWID; + + CREATE INDEX IF NOT EXISTS message_senderTimestamp ON message(senderTimestamp); + CREATE INDEX IF NOT EXISTS message_receiverTimestamp ON message(receiverTimestamp);` _, err := d.db.Exec(sqlStmt) if err != nil { return err @@ -120,7 +124,7 @@ func (d *DBStore) cleanOlderRecords() error { // Limit number of records to a max N if d.maxMessages > 0 { - sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)` + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET ?)` _, err := d.db.Exec(sqlStmt, d.maxMessages) if err != nil { return err @@ -157,6 +161,12 @@ func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMess // Returns all the stored WakuMessages func (d *DBStore) GetAll() ([]StoredMessage, error) { + start := time.Now() + defer func() { + elapsed := time.Since(start) + d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed)) + }() + rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") if err != nil { return nil, err @@ -196,6 +206,8 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { result = append(result, record) } + d.log.Info(fmt.Sprintf("DB returned %d records", len(result))) + err = rows.Err() if err != nil { return nil, err diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 468113e1..044a6653 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -305,6 +305,12 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { return } + start := time.Now() + defer func() { + elapsed := time.Since(start) + store.log.Info(fmt.Sprintf("Store initialization took %s", elapsed)) + }() + storedMessages, err := (store.msgProvider).GetAll() if err != nil { store.log.Error("could not load DBProvider messages", err) @@ -319,9 +325,11 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { } _ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message) - - metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) } + + metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) + + store.log.Info(fmt.Sprintf("%d messages available in waku store", store.messageQueue.Length())) } func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) error {