diff --git a/src/plugins/discourse/createGraph.js b/src/plugins/discourse/createGraph.js index fd4fffb..517482b 100644 --- a/src/plugins/discourse/createGraph.js +++ b/src/plugins/discourse/createGraph.js @@ -16,7 +16,7 @@ import { type Topic, type LikeAction, } from "./fetch"; -import {type DiscourseData} from "./mirror"; +import {type ReadRepository} from "./mirrorRepository"; import { authorsPostEdgeType, authorsTopicEdgeType, @@ -143,7 +143,7 @@ export function likesEdge(serverUrl: string, like: LikeAction): Edge { }; } -export function createGraph(serverUrl: string, data: DiscourseData): Graph { +export function createGraph(serverUrl: string, data: ReadRepository): Graph { const gc = new _GraphCreator(serverUrl, data); return gc.graph; } @@ -151,10 +151,10 @@ export function createGraph(serverUrl: string, data: DiscourseData): Graph { class _GraphCreator { graph: Graph; serverUrl: string; - data: DiscourseData; + data: ReadRepository; topicIdToTitle: Map; - constructor(serverUrl: string, data: DiscourseData) { + constructor(serverUrl: string, data: ReadRepository) { if (serverUrl.endsWith("/")) { throw new Error(`by convention, serverUrl should not end with /`); } diff --git a/src/plugins/discourse/createGraph.test.js b/src/plugins/discourse/createGraph.test.js index dd380f1..213b777 100644 --- a/src/plugins/discourse/createGraph.test.js +++ b/src/plugins/discourse/createGraph.test.js @@ -1,7 +1,7 @@ // @flow import sortBy from "lodash.sortby"; -import type {DiscourseData} from "./mirror"; +import type {ReadRepository} from "./mirrorRepository"; import type {Topic, Post, PostId, TopicId, LikeAction} from "./fetch"; import {NodeAddress, EdgeAddress, type Node, type Edge} from "../../core/graph"; import { @@ -34,7 +34,7 @@ import { import type {EdgeType, NodeType} from "../../analysis/types"; describe("plugins/discourse/createGraph", () => { - class MockData implements DiscourseData { + class MockData implements ReadRepository { _topics: $ReadOnlyArray; _posts: 
$ReadOnlyArray; _likes: $ReadOnlyArray; @@ -69,6 +69,12 @@ describe("plugins/discourse/createGraph", () => { )[0]; return post ? post.id : null; } + maxIds() { + return { + maxPostId: this._posts.reduce((max, p) => Math.max(p.id, max), 0), + maxTopicId: this._topics.reduce((max, t) => Math.max(t.id, max), 0), + }; + } } function example() { diff --git a/src/plugins/discourse/loadDiscourse.js b/src/plugins/discourse/loadDiscourse.js index e3edd9c..74f91e0 100644 --- a/src/plugins/discourse/loadDiscourse.js +++ b/src/plugins/discourse/loadDiscourse.js @@ -3,6 +3,7 @@ import Database from "better-sqlite3"; import base64url from "base64url"; import {Fetcher, type DiscourseFetchOptions} from "./fetch"; +import {SqliteMirrorRepository, type ReadRepository} from "./mirrorRepository"; import {Mirror} from "./mirror"; import {createGraph} from "./createGraph"; import {TaskReporter} from "../../util/taskReporter"; @@ -22,9 +23,13 @@ export async function loadDiscourse( ): Promise { const filename = base64url.encode(options.fetchOptions.serverUrl) + ".db"; const db = new Database(path.join(options.cacheDirectory, filename)); + const repo = new SqliteMirrorRepository(db, options.fetchOptions.serverUrl); const fetcher = new Fetcher(options.fetchOptions); - const mirror = new Mirror(db, fetcher, options.fetchOptions.serverUrl); + const mirror = new Mirror(repo, fetcher, options.fetchOptions.serverUrl); await mirror.update(reporter); - const graph = createGraph(options.fetchOptions.serverUrl, mirror); + const graph = createGraph( + options.fetchOptions.serverUrl, + (repo: ReadRepository) + ); return graph; } diff --git a/src/plugins/discourse/mirror.js b/src/plugins/discourse/mirror.js index 83da3db..4395185 100644 --- a/src/plugins/discourse/mirror.js +++ b/src/plugins/discourse/mirror.js @@ -1,65 +1,8 @@ // @flow -import type {Database} from "better-sqlite3"; -import stringify from "json-stable-stringify"; -import dedent from "../../util/dedent"; import type {TaskReporter} from 
"../../util/taskReporter"; -import { - type Discourse, - type TopicId, - type PostId, - type Topic, - type Post, - type LikeAction, -} from "./fetch"; - -// The version should be bumped any time the database schema is changed, -// so that the cache will be properly invalidated. -const VERSION = "discourse_mirror_v4"; - -/** - * An interface for retrieving all of the Discourse data at once. - * - * Also has some convenience methods for interpeting the data (e.g. getting - * a post by its index in a topic). - * - * The mirror implements this; it's factored out as an interface for - * ease of testing. - */ -export interface DiscourseData { - /** - * Retrieve every Topic available. - * - * The order is unspecified. - */ - topics(): $ReadOnlyArray; - - /** - * Retrieve every Post available. - * - * The order is unspecified. - */ - posts(): $ReadOnlyArray; - - /** - * Given a TopicId and a post number, find that numbered post within the topic. - * - * Returns undefined if no such post is available. - */ - findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId; - - /** - * Get usernames for all users. - * - * The order is unspecified. - */ - users(): $ReadOnlyArray; - - /** - * Gets all of the like actions in the history. - */ - likes(): $ReadOnlyArray; -} +import {type Discourse} from "./fetch"; +import {MirrorRepository} from "./mirrorRepository"; /** * Mirrors data from the Discourse API into a local sqlite db. @@ -78,8 +21,8 @@ export interface DiscourseData { * Each Mirror instance is tied to a particular server. Trying to use a mirror * for multiple Discourse servers is not permitted; use separate Mirrors. 
*/ -export class Mirror implements DiscourseData { - +_db: Database; +export class Mirror { + +_repo: MirrorRepository; +_fetcher: Discourse; +_serverUrl: string; @@ -92,271 +35,49 @@ export class Mirror implements DiscourseData { * A serverUrl is required so that we can ensure that this Mirror is only storing * data from a particular Discourse server. */ - constructor(db: Database, fetcher: Discourse, serverUrl: string) { - if (db == null) throw new Error("db: " + String(db)); - this._db = db; + constructor(repo: MirrorRepository, fetcher: Discourse, serverUrl: string) { + this._repo = repo; this._fetcher = fetcher; this._serverUrl = serverUrl; - if (db.inTransaction) { - throw new Error("already in transaction"); - } - try { - db.prepare("BEGIN").run(); - this._initialize(); - if (db.inTransaction) { - db.prepare("COMMIT").run(); - } - } finally { - if (db.inTransaction) { - db.prepare("ROLLBACK").run(); - } - } - } - - _initialize() { - const db = this._db; - // We store the config in a singleton table `meta`, whose unique row - // has primary key `0`. Only the first ever insert will succeed; we - // are locked into the first config. - db.prepare( - dedent`\ - CREATE TABLE IF NOT EXISTS meta ( - zero INTEGER PRIMARY KEY, - config TEXT NOT NULL - ) - ` - ).run(); - - const config = stringify({ - version: VERSION, - serverUrl: this._serverUrl, - }); - - const existingConfig: string | void = db - .prepare("SELECT config FROM meta") - .pluck() - .get(); - if (existingConfig === config) { - // Already set up; nothing to do. 
- return; - } else if (existingConfig !== undefined) { - throw new Error( - "Database already populated with incompatible server or version" - ); - } - db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config); - - const tables = [ - "CREATE TABLE users (username TEXT PRIMARY KEY)", - dedent`\ - CREATE TABLE topics ( - id INTEGER PRIMARY KEY, - title TEXT NOT NULL, - timestamp_ms INTEGER NOT NULL, - author_username TEXT NOT NULL, - FOREIGN KEY(author_username) REFERENCES users(username) - ) - `, - dedent`\ - CREATE TABLE posts ( - id INTEGER PRIMARY KEY, - timestamp_ms INTEGER NOT NULL, - author_username TEXT NOT NULL, - topic_id INTEGER NOT NULL, - index_within_topic INTEGER NOT NULL, - reply_to_post_index INTEGER, - cooked TEXT NOT NULL, - FOREIGN KEY(topic_id) REFERENCES topics(id), - FOREIGN KEY(author_username) REFERENCES users(username) - ) - `, - dedent`\ - CREATE TABLE likes ( - username TEXT NOT NULL, - post_id INTEGER NOT NULL, - timestamp_ms INTEGER NOT NULL, - CONSTRAINT username_post PRIMARY KEY (username, post_id), - FOREIGN KEY(post_id) REFERENCES posts(id), - FOREIGN KEY(username) REFERENCES users(username) - )`, - ]; - for (const sql of tables) { - db.prepare(sql).run(); - } - } - - topics(): $ReadOnlyArray { - return this._db - .prepare( - dedent`\ - SELECT - id, - timestamp_ms, - author_username, - title - FROM topics` - ) - .all() - .map((x) => ({ - id: x.id, - timestampMs: x.timestamp_ms, - authorUsername: x.author_username, - title: x.title, - })); - } - - posts(): $ReadOnlyArray { - return this._db - .prepare( - dedent`\ - SELECT - id, - timestamp_ms, - author_username, - topic_id, - index_within_topic, - reply_to_post_index, - cooked - FROM posts` - ) - .all() - .map((x) => ({ - id: x.id, - timestampMs: x.timestamp_ms, - authorUsername: x.author_username, - topicId: x.topic_id, - indexWithinTopic: x.index_within_topic, - replyToPostIndex: x.reply_to_post_index, - cooked: x.cooked, - })); - } - - users(): $ReadOnlyArray { - 
return this._db - .prepare("SELECT username FROM users") - .pluck() - .all(); - } - - likes(): $ReadOnlyArray { - return this._db - .prepare("SELECT post_id, username, timestamp_ms FROM likes") - .all() - .map((x) => ({ - postId: x.post_id, - timestampMs: x.timestamp_ms, - username: x.username, - })); - } - - findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId { - return this._db - .prepare( - dedent`\ - SELECT id - FROM posts - WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic - ` - ) - .pluck() - .get({topic_id: topicId, index_within_topic: indexWithinTopic}); } async update(reporter: TaskReporter) { - reporter.start("discourse"); - const db = this._db; - const {max_post: lastLocalPostId, max_topic: lastLocalTopicId} = db - .prepare( - dedent`\ - SELECT - (SELECT IFNULL(MAX(id), 0) FROM posts) AS max_post, - (SELECT IFNULL(MAX(id), 0) FROM topics) AS max_topic - ` - ) - .get(); - + // Local functions add the warning and tracking semantics we want from them. 
// Ids of every post that has been handed to `addPost` in this update.
// NOTE(review): presumably consulted further down in `update` to decide
// which posts still need an explicit fetch; confirm against the full method.
const encounteredPostIds = new Set();

// Stores one post through the repository. A failure is downgraded to a
// console warning that names the post's URL, and a sentinel "nothing
// written" result is returned so the update as a whole keeps going.
const addPost = (post) => {
  try {
    // The id is recorded before the insert is attempted, so a post whose
    // insert fails still counts as encountered.
    encounteredPostIds.add(post.id);
    const stored = this._repo.addPost(post);
    return stored;
  } catch (e) {
    const base = this._serverUrl;
    const url = `${base}/t/${post.topicId}/${post.indexWithinTopic}`;
    const message = `Warning: Encountered error '${e.message}' while adding post ${url}.`;
    console.warn(message);
    return {changes: 0, lastInsertRowid: -1};
  }
};

// Stores one like through the repository and reports whether the caller is
// finished with this user: `doneWithUser` is true exactly when the insert
// changed no rows. A failure is logged as a warning and reported as
// "not done".
const addLike = (like) => {
  let doneWithUser = false;
  try {
    const {changes} = this._repo.addLike(like);
    doneWithUser = changes === 0;
  } catch (e) {
    const detail = `on a like by ${like.username} on post id ${like.postId}.`;
    console.warn(`Warning: Encountered error '${e.message}' ${detail}`);
  }
  return {doneWithUser};
};
- title, - timestamp_ms, - author_username - ) VALUES ( - :id, - :title, - :timestamp_ms, - :author_username - ) - ` - ); - return function addTopic(topic: Topic) { - addUser(topic.authorUsername); - query.run({ - id: topic.id, - title: topic.title, - timestamp_ms: topic.timestampMs, - author_username: topic.authorUsername, - }); - }; - })(); + reporter.start("discourse"); + + const { + maxPostId: lastLocalPostId, + maxTopicId: lastLocalTopicId, + } = this._repo.maxIds(); reporter.start("discourse/topics"); const latestTopicId = await this._fetcher.latestTopicId(); @@ -368,7 +89,7 @@ export class Mirror implements DiscourseData { const topicWithPosts = await this._fetcher.topicWithPosts(topicId); if (topicWithPosts != null) { const {topic, posts} = topicWithPosts; - addTopic(topic); + this._repo.addTopic(topic); for (const post of posts) { addPost(post); } @@ -414,49 +135,8 @@ export class Mirror implements DiscourseData { // since our last scan. This would likely improve the performance of this // section of the update significantly. - /** - * Add a like action to the database. The user of the like is - * assumed to already exist in the database; if this is not known to - * be the case, run `addUser(like.username)` first. - * - * Returns a status indicating whether we are done processing this user - * (e.g. we have already seen all of their likes). 
- */ - const addLike: (like: LikeAction) => {|+doneWithUser: boolean|} = (() => { - const query = db.prepare( - dedent`\ - INSERT OR IGNORE INTO likes ( - post_id, - timestamp_ms, - username - ) VALUES ( - :post_id, - :timestamp_ms, - :username - ) - ` - ); - return function addLike(like: LikeAction) { - try { - const runResult = query.run({ - post_id: like.postId, - timestamp_ms: like.timestampMs, - username: like.username, - }); - return {doneWithUser: runResult.changes === 0}; - } catch (e) { - console.warn( - `Warning: Encountered error '${e.message}' ` + - `on a like by ${like.username} ` + - `on post id ${like.postId}.` - ); - return {doneWithUser: false}; - } - }; - })(); - reporter.start("discourse/likes"); - for (const user of this.users()) { + for (const user of this._repo.users()) { let offset = 0; let upToDate = false; while (!upToDate) { diff --git a/src/plugins/discourse/mirror.test.js b/src/plugins/discourse/mirror.test.js index 2483b8a..afe2600 100644 --- a/src/plugins/discourse/mirror.test.js +++ b/src/plugins/discourse/mirror.test.js @@ -2,9 +2,8 @@ import sortBy from "lodash.sortby"; import Database from "better-sqlite3"; -import fs from "fs"; -import tmp from "tmp"; import {Mirror} from "./mirror"; +import {SqliteMirrorRepository} from "./mirrorRepository"; import { type Discourse, type TopicId, @@ -170,60 +169,41 @@ describe("plugins/discourse/mirror", () => { const fetcher = new MockFetcher(); const db = new Database(":memory:"); const url = "http://example.com"; - const mirror = new Mirror(db, fetcher, url); + const repo = new SqliteMirrorRepository(db, url); + const mirror = new Mirror(repo, fetcher, url); const reporter = new TestTaskReporter(); - return {fetcher, mirror, reporter, url}; + return {fetcher, mirror, reporter, url, repo}; }; - it("rejects a different server url without changing the database", () => { - // We use an on-disk database file here so that we can dump the - // contents to ensure that the database is physically 
unchanged. - const filename = tmp.fileSync().name; - const db = new Database(filename); - const fetcher = new MockFetcher(); - const url1 = "https://foo.bar"; - const url2 = "https://foo.zod"; - expect(() => new Mirror(db, fetcher, url1)).not.toThrow(); - const data = fs.readFileSync(filename).toJSON(); - - expect(() => new Mirror(db, fetcher, url2)).toThrow( - "incompatible server or version" - ); - expect(fs.readFileSync(filename).toJSON()).toEqual(data); - - expect(() => new Mirror(db, fetcher, url1)).not.toThrow(); - expect(fs.readFileSync(filename).toJSON()).toEqual(data); - }); - it("mirrors topics from the fetcher", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(2, null); fetcher.addPost(3, null); const topic2 = fetcher._topic(2); const topic3 = fetcher._topic(3); await mirror.update(reporter); - expect(mirror.topics()).toEqual([topic2, topic3]); + expect(repo.topics()).toEqual([topic2, topic3]); }); it("mirrors posts from the fetcher", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const p1 = fetcher.addPost(2, null); const p2 = fetcher.addPost(3, null); const p3 = fetcher.addPost(3, 1); await mirror.update(reporter); const posts = [fetcher._post(p1), fetcher._post(p2), fetcher._post(p3)]; - expect(mirror.posts()).toEqual(posts); + expect(repo.posts()).toEqual(posts); }); it("provides usernames for all active users", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(2, null, "alpha"); fetcher.addPost(3, null, "beta"); fetcher.addPost(3, 1, "alpha"); await mirror.update(reporter); // credbot appears because it is the nominal author of all topics expect( - mirror + repo .users() .slice() .sort() @@ -241,7 +221,7 @@ describe("plugins/discourse/mirror", () => { } it("provides all the likes by users that have posted", async 
() => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null, "alpha"); fetcher.addPost(2, null, "alpha"); fetcher.addPost(3, null, "beta"); @@ -250,26 +230,26 @@ describe("plugins/discourse/mirror", () => { const l3 = fetcher.addLike("beta", 3, 7); const l4 = fetcher.addLike("alpha", 1, 8); await mirror.update(reporter); - expectLikesSorted(mirror.likes(), [l1, l2, l3, l4]); + expectLikesSorted(repo.likes(), [l1, l2, l3, l4]); const l5 = fetcher.addLike("alpha", 2, 9); fetcher.addPost(4, null, "credbot"); const l6 = fetcher.addLike("credbot", 2, 10); const l7 = fetcher.addLike("beta", 4, 11); await mirror.update(reporter); - expectLikesSorted(mirror.likes(), [l1, l2, l3, l4, l5, l6, l7]); + expectLikesSorted(repo.likes(), [l1, l2, l3, l4, l5, l6, l7]); }); it("doesn't find likes of users that never posted", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); fetcher.addLike("nope", 1, 1); await mirror.update(reporter); - expect(mirror.likes()).toEqual([]); + expect(repo.likes()).toEqual([]); }); describe("update semantics", () => { it("only fetches new topics on `update`", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); fetcher.addPost(2, null); await mirror.update(reporter); @@ -278,11 +258,11 @@ describe("plugins/discourse/mirror", () => { await mirror.update(reporter); expect(fetchTopicWithPosts).toHaveBeenCalledTimes(1); expect(fetchTopicWithPosts).toHaveBeenCalledWith(3); - expect(mirror.topics().map((x) => x.id)).toEqual([1, 2, 3]); + expect(repo.topics().map((x) => x.id)).toEqual([1, 2, 3]); }); it("gets new posts on old topics on update", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); fetcher.addPost(2, 
null); await mirror.update(reporter); @@ -292,30 +272,30 @@ describe("plugins/discourse/mirror", () => { const latestPosts = await fetcher.latestPosts(); // The post added to the old topic wasn't retrieved by latest post expect(latestPosts.map((x) => x.id)).not.toContain(id); - const allPostIds = mirror.posts().map((x) => x.id); + const allPostIds = repo.posts().map((x) => x.id); // The post was still included, meaning the mirror scanned for new posts by id expect(allPostIds).toContain(id); }); it("skips null/missing topics", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); fetcher.addPost(3, null); await mirror.update(reporter); - expect(mirror.topics().map((x) => x.id)).toEqual([1, 3]); + expect(repo.topics().map((x) => x.id)).toEqual([1, 3]); }); it("skips null/missing posts", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const p1 = fetcher.addPost(1, null); fetcher._latestPostId += 2; const p2 = fetcher.addPost(3, null); await mirror.update(reporter); - expect(mirror.posts().map((x) => x.id)).toEqual([p1, p2]); + expect(repo.posts().map((x) => x.id)).toEqual([p1, p2]); }); it("queries explicitly for posts that are not present in topicWithPosts.posts", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const p1 = fetcher.addPost(1, null); const p2 = fetcher.addPost(1, 1); const p3 = fetcher.addPost(1, 1); @@ -333,7 +313,7 @@ describe("plugins/discourse/mirror", () => { expect(fetchPost).toHaveBeenCalledTimes(1); expect(fetchPost).toHaveBeenCalledWith(p2); - expect(mirror.posts().map(getId)).toEqual([p1, p2, p3]); + expect(repo.posts().map(getId)).toEqual([p1, p2, p3]); }); it("does not explicitly query for posts that were in topicWithPosts.posts", async () => { @@ -345,14 +325,14 @@ describe("plugins/discourse/mirror", () => { }); 
it("does not explicitly query for posts that were provided in latest posts", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); await mirror.update(reporter); const id = fetcher.addPost(1, 1); const fetchPost = jest.spyOn(fetcher, "post"); await mirror.update(reporter); expect(fetchPost).not.toHaveBeenCalled(); - expect(mirror.posts().map((x) => x.id)).toContain(id); + expect(repo.posts().map((x) => x.id)).toContain(id); }); it("does not query for topics at all if there were no new topics", async () => { @@ -405,7 +385,7 @@ describe("plugins/discourse/mirror", () => { }); it("warns if one of the latest posts has no topic", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const pid1 = fetcher.addPost(1, null, "credbot"); const pid2 = fetcher.addPost(2, null, "credbot"); // Verify that the problem post is one of the latest posts @@ -415,9 +395,9 @@ describe("plugins/discourse/mirror", () => { fetcher._latestTopicId--; await mirror.update(reporter); const topics = [fetcher._topic(1)]; - expect(mirror.topics()).toEqual(topics); + expect(repo.topics()).toEqual(topics); const posts = [fetcher._post(pid1)]; - expect(mirror.posts()).toEqual(posts); + expect(repo.posts()).toEqual(posts); expect(console.warn).toHaveBeenCalledWith( "Warning: Encountered error 'FOREIGN KEY constraint failed' " + "while adding post http://example.com/t/2/1." 
@@ -427,7 +407,7 @@ describe("plugins/discourse/mirror", () => { }); it("warns if it finds a (non-latest) post with no topic", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const pid1 = fetcher.addPost(1, null, "credbot"); const pid2 = fetcher.addPost(2, null, "credbot"); const pid3 = fetcher.addPost(1, null, "credbot"); @@ -438,9 +418,9 @@ describe("plugins/discourse/mirror", () => { fetcher._latestTopicId--; await mirror.update(reporter); const topics = [fetcher._topic(1)]; - expect(mirror.topics()).toEqual(topics); + expect(repo.topics()).toEqual(topics); const posts = [pid1, pid3].map((x) => fetcher._post(x)); - expect(mirror.posts()).toEqual(posts); + expect(repo.posts()).toEqual(posts); expect(console.warn).toHaveBeenCalledWith( "Warning: Encountered error 'FOREIGN KEY constraint failed' " + "while adding post http://example.com/t/2/1." @@ -450,14 +430,14 @@ describe("plugins/discourse/mirror", () => { }); it("warns if it gets a like that doesn't correspond to any post", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const pid = fetcher.addPost(1, null, "credbot"); const badLike = {username: "credbot", postId: 37, timestampMs: 0}; fetcher._likes.push(badLike); await mirror.update(reporter); - expect(mirror.topics()).toEqual([fetcher._topic(1)]); - expect(mirror.posts()).toEqual([fetcher._post(pid)]); - expect(mirror.likes()).toEqual([]); + expect(repo.topics()).toEqual([fetcher._topic(1)]); + expect(repo.posts()).toEqual([fetcher._post(pid)]); + expect(repo.likes()).toEqual([]); expect(console.warn).toHaveBeenCalledWith( "Warning: Encountered error 'FOREIGN KEY constraint failed' " + "on a like by credbot on post id 37." 
@@ -467,18 +447,18 @@ describe("plugins/discourse/mirror", () => { }); }); - it("ignores if a user's likes are missing", async () => { - const {mirror, fetcher, reporter} = example(); + it("warns if a user's likes are missing", async () => { + const {mirror, fetcher, reporter, repo} = example(); const pid = fetcher.addPost(1, null, "credbot"); (fetcher: any).likesByUser = async () => null; await mirror.update(reporter); - expect(mirror.topics()).toEqual([fetcher._topic(1)]); - expect(mirror.posts()).toEqual([fetcher._post(pid)]); - expect(mirror.likes()).toEqual([]); + expect(repo.topics()).toEqual([fetcher._topic(1)]); + expect(repo.posts()).toEqual([fetcher._post(pid)]); + expect(repo.likes()).toEqual([]); }); it("inserts other likes if one user's likes are missing", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const p1 = fetcher.addPost(1, null, "credbot"); const p2 = fetcher.addPost(1, 1, "otheruser"); const l1 = fetcher.addLike("otheruser", 1, 123); @@ -491,9 +471,9 @@ describe("plugins/discourse/mirror", () => { return await _likesByUser(targetUsername, offset); }; await mirror.update(reporter); - expect(mirror.topics()).toEqual([fetcher._topic(1)]); - expect(mirror.posts()).toEqual([fetcher._post(p1), fetcher._post(p2)]); - expect(mirror.likes()).toEqual([l1]); + expect(repo.topics()).toEqual([fetcher._topic(1)]); + expect(repo.posts()).toEqual([fetcher._post(p1), fetcher._post(p2)]); + expect(repo.likes()).toEqual([l1]); }); it("sends the right tasks to the TaskReporter", async () => { @@ -515,42 +495,42 @@ describe("plugins/discourse/mirror", () => { describe("findPostInTopic", () => { it("works for the first post in a topic", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); const id = fetcher.addPost(5, null); const post = NullUtil.get(fetcher._post(id)); expect(post.topicId).toEqual(5); 
expect(post.indexWithinTopic).toEqual(1); await mirror.update(reporter); - expect(mirror.findPostInTopic(5, 1)).toEqual(id); + expect(repo.findPostInTopic(5, 1)).toEqual(id); }); it("works for the second post in a topic", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); const id = fetcher.addPost(1, 1); const post = NullUtil.get(fetcher._post(id)); expect(post.indexWithinTopic).toEqual(2); await mirror.update(reporter); - expect(mirror.findPostInTopic(1, 2)).toEqual(id); + expect(repo.findPostInTopic(1, 2)).toEqual(id); }); it("returns undefined for a post with too high an index", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); await mirror.update(reporter); - expect(mirror.findPostInTopic(1, 2)).toBe(undefined); + expect(repo.findPostInTopic(1, 2)).toBe(undefined); }); it("returns undefined for topic that doesnt exist", async () => { - const {mirror, fetcher, reporter} = example(); + const {mirror, fetcher, reporter, repo} = example(); fetcher.addPost(1, null); await mirror.update(reporter); - expect(mirror.findPostInTopic(2, 1)).toBe(undefined); + expect(repo.findPostInTopic(2, 1)).toBe(undefined); }); it("returns undefined for a mirror that never updated", async () => { - const {mirror} = example(); - expect(mirror.findPostInTopic(1, 1)).toBe(undefined); + const {repo} = example(); + expect(repo.findPostInTopic(1, 1)).toBe(undefined); }); }); }); diff --git a/src/plugins/discourse/mirrorRepository.js b/src/plugins/discourse/mirrorRepository.js new file mode 100644 index 0000000..bc02c54 --- /dev/null +++ b/src/plugins/discourse/mirrorRepository.js @@ -0,0 +1,365 @@ +// @flow + +import type {Database} from "better-sqlite3"; +import stringify from "json-stable-stringify"; +import dedent from "../../util/dedent"; +import { + type TopicId, + type PostId, + type Topic, + type 
Post, + type LikeAction, +} from "./fetch"; + +// The version should be bumped any time the database schema is changed, +// so that the cache will be properly invalidated. +const VERSION = "discourse_mirror_v4"; + +/** + * An interface for reading the local Discourse data. + */ +export interface ReadRepository { + /** + * Retrieve every Topic available. + * + * The order is unspecified. + */ + topics(): $ReadOnlyArray; + + /** + * Retrieve every Post available. + * + * The order is unspecified. + */ + posts(): $ReadOnlyArray; + + /** + * Given a TopicId and a post number, find that numbered post within the topic. + * + * Returns undefined if no such post is available. + */ + findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId; + + /** + * Get usernames for all users. + * + * The order is unspecified. + */ + users(): $ReadOnlyArray; + + /** + * Gets all of the like actions in the history. + */ + likes(): $ReadOnlyArray; +} + +export type MaxIds = {| + +maxPostId: number, + +maxTopicId: number, +|}; + +export type AddResult = {| + +changes: number, + +lastInsertRowid: number, +|}; + +// Read-write interface the mirror uses internally. 
+export interface MirrorRepository extends ReadRepository { + maxIds(): MaxIds; + addTopic(topic: Topic): AddResult; + addPost(post: Post): AddResult; + addLike(like: LikeAction): AddResult; +} + +function toAddResult({ + changes, + lastInsertRowid, +}: { + changes: number, + lastInsertRowid: number, +}): AddResult { + return {changes, lastInsertRowid}; +} + +export class SqliteMirrorRepository + implements ReadRepository, MirrorRepository { + +_db: Database; + + constructor(db: Database, serverUrl: string) { + if (db == null) throw new Error("db: " + String(db)); + this._db = db; + if (db.inTransaction) { + throw new Error("already in transaction"); + } + try { + db.prepare("BEGIN").run(); + this._initialize(serverUrl); + if (db.inTransaction) { + db.prepare("COMMIT").run(); + } + } finally { + if (db.inTransaction) { + db.prepare("ROLLBACK").run(); + } + } + } + + _initialize(serverUrl: string) { + const db = this._db; + // We store the config in a singleton table `meta`, whose unique row + // has primary key `0`. Only the first ever insert will succeed; we + // are locked into the first config. + db.prepare( + dedent`\ + CREATE TABLE IF NOT EXISTS meta ( + zero INTEGER PRIMARY KEY, + config TEXT NOT NULL + ) + ` + ).run(); + + const config = stringify({ + version: VERSION, + serverUrl: serverUrl, + }); + + const existingConfig: string | void = db + .prepare("SELECT config FROM meta") + .pluck() + .get(); + if (existingConfig === config) { + // Already set up; nothing to do. 
+ return; + } else if (existingConfig !== undefined) { + throw new Error( + "Database already populated with incompatible server or version" + ); + } + /* existingConfig is undefined here (the branches above return or throw), so store the config in meta and then create the tables. */ db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config); + + /* DDL statements; each table is listed after the tables that its FOREIGN KEY clauses reference (users, then topics, then posts, then likes). */ const tables = [ + "CREATE TABLE users (username TEXT PRIMARY KEY)", + dedent`\ + CREATE TABLE topics ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + timestamp_ms INTEGER NOT NULL, + author_username TEXT NOT NULL, + FOREIGN KEY(author_username) REFERENCES users(username) + ) + `, + dedent`\ + CREATE TABLE posts ( + id INTEGER PRIMARY KEY, + timestamp_ms INTEGER NOT NULL, + author_username TEXT NOT NULL, + topic_id INTEGER NOT NULL, + index_within_topic INTEGER NOT NULL, + reply_to_post_index INTEGER, + cooked TEXT NOT NULL, + FOREIGN KEY(topic_id) REFERENCES topics(id), + FOREIGN KEY(author_username) REFERENCES users(username) + ) + `, + dedent`\ + CREATE TABLE likes ( + username TEXT NOT NULL, + post_id INTEGER NOT NULL, + timestamp_ms INTEGER NOT NULL, + CONSTRAINT username_post PRIMARY KEY (username, post_id), + FOREIGN KEY(post_id) REFERENCES posts(id), + FOREIGN KEY(username) REFERENCES users(username) + )`, + ]; + for (const sql of tables) { + db.prepare(sql).run(); + } + } + + /** Returns the largest post id and the largest topic id currently stored; IFNULL makes either value 0 when its table has no rows. */ maxIds(): MaxIds { + const res = this._db + .prepare( + dedent`\ + SELECT + (SELECT IFNULL(MAX(id), 0) FROM posts) AS max_post, + (SELECT IFNULL(MAX(id), 0) FROM topics) AS max_topic + ` + ) + .get(); + return { + maxPostId: res.max_post, + maxTopicId: res.max_topic, + }; + } + + /** Reads every row of the topics table and renames the snake_case columns to the camelCase fields of the returned objects. */ topics(): $ReadOnlyArray { + return this._db + .prepare( + dedent`\ + SELECT + id, + timestamp_ms, + author_username, + title + FROM topics` + ) + .all() + .map((x) => ({ + id: x.id, + timestampMs: x.timestamp_ms, + authorUsername: x.author_username, + title: x.title, + })); + } + + /** Reads every row of the posts table and renames the snake_case columns to camelCase fields; replyToPostIndex may be null because reply_to_post_index is declared without NOT NULL. */ posts(): $ReadOnlyArray { + return this._db + .prepare( + dedent`\ + SELECT + id, + timestamp_ms, + author_username, + topic_id, + index_within_topic, + reply_to_post_index, + cooked + FROM posts` + ) 
+ .all() + .map((x) => ({ + id: x.id, + timestampMs: x.timestamp_ms, + authorUsername: x.author_username, + topicId: x.topic_id, + indexWithinTopic: x.index_within_topic, + replyToPostIndex: x.reply_to_post_index, + cooked: x.cooked, + })); + } + + /** Lists every username in the users table. NOTE(review): pluck() is expected to yield the bare column value rather than a row object (better-sqlite3) - confirm. */ users(): $ReadOnlyArray { + return this._db + .prepare("SELECT username FROM users") + .pluck() + .all(); + } + + /** Reads every row of the likes table and maps it to an object with postId, timestampMs and username. */ likes(): $ReadOnlyArray { + return this._db + .prepare("SELECT post_id, username, timestamp_ms FROM likes") + .all() + .map((x) => ({ + postId: x.post_id, + timestampMs: x.timestamp_ms, + username: x.username, + })); + } + + /** Looks up the id of the post stored with the given topic_id and index_within_topic. NOTE(review): presumably yields undefined when no row matches, in line with the ?PostId return type - confirm against better-sqlite3. */ findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId { + return this._db + .prepare( + dedent`\ + SELECT id + FROM posts + WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic + ` + ) + .pluck() + .get({topic_id: topicId, index_within_topic: indexWithinTopic}); + } + + /** Records a like. The liking user is added to users first (likes.username has a FOREIGN KEY to users); INSERT OR IGNORE skips the insert when it would conflict, e.g. with the (username, post_id) primary key. */ addLike(like: LikeAction): AddResult { + this.addUser(like.username); + const res = this._db + .prepare( + dedent`\ + INSERT OR IGNORE INTO likes ( + post_id, + timestamp_ms, + username + ) VALUES ( + :post_id, + :timestamp_ms, + :username + ) + ` + ) + .run({ + post_id: like.postId, + timestamp_ms: like.timestampMs, + username: like.username, + }); + return toAddResult(res); + } + + /** Stores a post after adding its author to users. REPLACE INTO means a row that already has the same id is replaced rather than causing an error. */ addPost(post: Post): AddResult { + this.addUser(post.authorUsername); + const res = this._db + .prepare( + dedent`\ + REPLACE INTO posts ( + id, + timestamp_ms, + author_username, + topic_id, + index_within_topic, + reply_to_post_index, + cooked + ) VALUES ( + :id, + :timestamp_ms, + :author_username, + :topic_id, + :index_within_topic, + :reply_to_post_index, + :cooked + ) + ` + ) + .run({ + id: post.id, + timestamp_ms: post.timestampMs, + reply_to_post_index: post.replyToPostIndex, + index_within_topic: post.indexWithinTopic, + topic_id: post.topicId, + author_username: post.authorUsername, + cooked: post.cooked, + }); + return toAddResult(res); + } + + /** Stores a topic after adding its author to users. REPLACE INTO means a row that already has the same id is replaced rather than causing an error. */ addTopic(topic: Topic): AddResult { + 
this.addUser(topic.authorUsername); + const res = this._db + .prepare( + dedent`\ + REPLACE INTO topics ( + id, + title, + timestamp_ms, + author_username + ) VALUES ( + :id, + :title, + :timestamp_ms, + :author_username + ) + ` + ) + .run({ + id: topic.id, + title: topic.title, + timestamp_ms: topic.timestampMs, + author_username: topic.authorUsername, + }); + return toAddResult(res); + } + + /** Adds a username to the users table; INSERT OR IGNORE leaves the table unchanged when that username (the primary key) is already present. */ addUser(username: string): AddResult { + const res = this._db + .prepare("INSERT OR IGNORE INTO users (username) VALUES (?)") + .run(username); + return toAddResult(res); + } +} diff --git a/src/plugins/discourse/mirrorRepository.test.js b/src/plugins/discourse/mirrorRepository.test.js new file mode 100644 index 0000000..f96eba5 --- /dev/null +++ b/src/plugins/discourse/mirrorRepository.test.js @@ -0,0 +1,27 @@ +// @flow + +import Database from "better-sqlite3"; +import fs from "fs"; +import tmp from "tmp"; +import {SqliteMirrorRepository} from "./mirrorRepository"; + +describe("plugins/discourse/mirrorRepository", () => { + it("rejects a different server url without changing the database", () => { + // We use an on-disk database file here so that we can dump the + // contents to ensure that the database is physically unchanged. + const filename = tmp.fileSync().name; + const db = new Database(filename); + const url1 = "https://foo.bar"; + const url2 = "https://foo.zod"; + expect(() => new SqliteMirrorRepository(db, url1)).not.toThrow(); + const data = fs.readFileSync(filename).toJSON(); + + expect(() => new SqliteMirrorRepository(db, url2)).toThrow( + "incompatible server or version" + ); + expect(fs.readFileSync(filename).toJSON()).toEqual(data); + + expect(() => new SqliteMirrorRepository(db, url1)).not.toThrow(); + expect(fs.readFileSync(filename).toJSON()).toEqual(data); + }); +});