diff --git a/src/plugins/discourse/mirror.js b/src/plugins/discourse/mirror.js
new file mode 100644
index 0000000..12ab0f8
--- /dev/null
+++ b/src/plugins/discourse/mirror.js
@@ -0,0 +1,321 @@
+// @flow
+
+import type Database from "better-sqlite3";
+import stringify from "json-stable-stringify";
+import dedent from "../../util/dedent";
+import {
+  type Discourse,
+  type TopicId,
+  type PostId,
+  type Topic,
+  type Post,
+} from "./fetch";
+
+// The version should be bumped any time the database schema is changed,
+// so that the cache will be properly invalidated.
+const VERSION = "discourse_mirror_v1";
+
+/**
+ * Mirrors data from the Discourse API into a local sqlite db.
+ *
+ * This class allows us to persist a local copy of data from a Discourse
+ * instance. We have it for reasons similar to why we have a GraphQL mirror for
+ * GitHub; it allows us to avoid re-doing expensive IO every time we re-load
+ * SourceCred. It also gives us robustness in the face of network failures (we
+ * can keep however much we downloaded until the fault).
+ *
+ * As implemented, the Mirror will never update already-downloaded content,
+ * meaning it will not catch edits or deletions. As such, it's advisable to
+ * replace the cache periodically (perhaps once a week or month). We may
+ * implement automatic cache invalidation in the future.
+ *
+ * Each Mirror instance is tied to a particular server. Trying to use a mirror
+ * for multiple Discourse servers is not permitted; use separate Mirrors.
+ */
+export class Mirror {
+  +_db: Database;
+  +_fetcher: Discourse;
+
+  /**
+   * Construct a new Mirror instance.
+   *
+   * Takes a Database, which may be a pre-existing Mirror database. The
+   * provided Discourse fetcher will be used to retrieve new data from
+   * Discourse.
+   *
+   * A serverUrl is required so that we can ensure that this Mirror is only
+   * storing data from a particular Discourse server.
+   *
+   * Throws if db is null, if db is already inside a transaction, or if db was
+   * previously initialized for a different server or schema version.
+   */
+  constructor(db: Database, fetcher: Discourse, serverUrl: string) {
+    if (db == null) throw new Error("db: " + String(db));
+    this._db = db;
+    this._fetcher = fetcher;
+    if (db.inTransaction) {
+      throw new Error("already in transaction");
+    }
+    // Initialization is all-or-nothing: commit on success, and roll back
+    // whatever is still open if _initialize threw.
+    try {
+      db.prepare("BEGIN").run();
+      this._initialize(serverUrl);
+      if (db.inTransaction) {
+        db.prepare("COMMIT").run();
+      }
+    } finally {
+      if (db.inTransaction) {
+        db.prepare("ROLLBACK").run();
+      }
+    }
+  }
+
+  /**
+   * Create the schema on first use, or check that an existing database was
+   * created for the same serverUrl and VERSION.
+   *
+   * Throws if the stored config differs from the expected one.
+   */
+  _initialize(serverUrl: string) {
+    const db = this._db;
+    // We store the config in a singleton table `meta`, whose unique row
+    // has primary key `0`. Only the first ever insert will succeed; we
+    // are locked into the first config.
+    db.prepare(
+      dedent`\
+        CREATE TABLE IF NOT EXISTS meta (
+            zero INTEGER PRIMARY KEY,
+            config TEXT NOT NULL
+        )
+      `
+    ).run();
+
+    const config = stringify({
+      version: VERSION,
+      serverUrl: serverUrl,
+    });
+
+    const existingConfig: string | void = db
+      .prepare("SELECT config FROM meta")
+      .pluck()
+      .get();
+    if (existingConfig === config) {
+      // Already set up; nothing to do.
+      return;
+    } else if (existingConfig !== undefined) {
+      throw new Error(
+        "Database already populated with incompatible server or version"
+      );
+    }
+    db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config);
+
+    const tables = [
+      dedent`\
+        CREATE TABLE topics (
+            id INTEGER PRIMARY KEY,
+            title TEXT NOT NULL,
+            timestamp_ms INTEGER NOT NULL,
+            author_username TEXT NOT NULL
+        )
+      `,
+      dedent`\
+        CREATE TABLE posts (
+            id INTEGER PRIMARY KEY,
+            timestamp_ms INTEGER NOT NULL,
+            author_username TEXT NOT NULL,
+            topic_id INTEGER NOT NULL,
+            index_within_topic INTEGER NOT NULL,
+            reply_to_post_index INTEGER,
+            FOREIGN KEY(topic_id) REFERENCES topics(id)
+        )
+      `,
+    ];
+    for (const sql of tables) {
+      db.prepare(sql).run();
+    }
+  }
+
+  /** Every topic currently stored in the mirror. */
+  topics(): $ReadOnlyArray<Topic> {
+    return this._db
+      .prepare(
+        dedent`\
+          SELECT
+            id,
+            timestamp_ms,
+            author_username,
+            title
+          FROM topics`
+      )
+      .all()
+      .map((x) => ({
+        id: x.id,
+        timestampMs: x.timestamp_ms,
+        authorUsername: x.author_username,
+        title: x.title,
+      }));
+  }
+
+  /** Every post currently stored in the mirror. */
+  posts(): $ReadOnlyArray<Post> {
+    return this._db
+      .prepare(
+        dedent`\
+          SELECT
+            id,
+            timestamp_ms,
+            author_username,
+            topic_id,
+            index_within_topic,
+            reply_to_post_index
+          FROM posts`
+      )
+      .all()
+      .map((x) => ({
+        id: x.id,
+        timestampMs: x.timestamp_ms,
+        authorUsername: x.author_username,
+        topicId: x.topic_id,
+        indexWithinTopic: x.index_within_topic,
+        replyToPostIndex: x.reply_to_post_index,
+      }));
+  }
+
+  /**
+   * Given a TopicId and a post number, find that numbered post within the topic.
+   *
+   * Returns undefined if no such post exists.
+   */
+  findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId {
+    return this._db
+      .prepare(
+        dedent`\
+          SELECT id
+          FROM posts
+          WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic
+        `
+      )
+      .pluck()
+      .get({topic_id: topicId, index_within_topic: indexWithinTopic});
+  }
+
+  /**
+   * Download topics and posts that are newer than anything stored locally.
+   *
+   * Topics with ids above the highest stored topic id are fetched along with
+   * the posts the fetcher returns for them. Posts are then filled in from the
+   * fetcher's latest posts, and any remaining ids between the highest stored
+   * post id and the newest reported post id are requested one at a time.
+   * Topics and posts for which the fetcher returns null are skipped.
+   */
+  async update() {
+    const db = this._db;
+    const latestTopicId = await this._fetcher.latestTopicId();
+    // MAX(id) is NULL for an empty table; `|| 0` maps that to "nothing yet".
+    const lastLocalPostId =
+      db
+        .prepare("SELECT MAX(id) FROM posts")
+        .pluck()
+        .get() || 0;
+
+    const lastLocalTopicId =
+      db
+        .prepare("SELECT MAX(id) FROM topics")
+        .pluck()
+        .get() || 0;
+
+    const encounteredPostIds = new Set();
+
+    // Compile each write statement once and reuse it for every row, instead
+    // of re-preparing identical SQL per post and per topic.
+    const insertPost = db.prepare(
+      dedent`\
+        REPLACE INTO posts (
+            id,
+            timestamp_ms,
+            author_username,
+            topic_id,
+            index_within_topic,
+            reply_to_post_index
+        ) VALUES (
+            :id,
+            :timestamp_ms,
+            :author_username,
+            :topic_id,
+            :index_within_topic,
+            :reply_to_post_index
+        )
+      `
+    );
+    const insertTopic = db.prepare(
+      dedent`\
+        REPLACE INTO topics (
+            id,
+            title,
+            timestamp_ms,
+            author_username
+        ) VALUES (
+            :id,
+            :title,
+            :timestamp_ms,
+            :author_username
+        )`
+    );
+
+    function addPost(post: Post) {
+      insertPost.run({
+        id: post.id,
+        timestamp_ms: post.timestampMs,
+        reply_to_post_index: post.replyToPostIndex,
+        index_within_topic: post.indexWithinTopic,
+        topic_id: post.topicId,
+        author_username: post.authorUsername,
+      });
+      encounteredPostIds.add(post.id);
+    }
+
+    for (
+      let topicId = lastLocalTopicId + 1;
+      topicId <= latestTopicId;
+      topicId++
+    ) {
+      const topicWithPosts = await this._fetcher.topicWithPosts(topicId);
+      if (topicWithPosts != null) {
+        const {topic, posts} = topicWithPosts;
+        insertTopic.run({
+          id: topic.id,
+          title: topic.title,
+          timestamp_ms: topic.timestampMs,
+          author_username: topic.authorUsername,
+        });
+        for (const post of posts) {
+          addPost(post);
+        }
+      }
+    }
+
+    const latestPosts = await this._fetcher.latestPosts();
+    for (const post of latestPosts) {
+      if (!encounteredPostIds.has(post.id) && post.id > lastLocalPostId) {
+        addPost(post);
+      }
+    }
+
+    // Take the largest id that was reported rather than trusting that the
+    // newest post comes first in the list; an empty list yields 0.
+    const latestPostId = latestPosts.reduce(
+      (maxId, post) => Math.max(maxId, post.id),
+      0
+    );
+    for (let postId = lastLocalPostId + 1; postId <= latestPostId; postId++) {
+      if (encounteredPostIds.has(postId)) {
+        continue;
+      }
+      const post = await this._fetcher.post(postId);
+      if (post != null) {
+        addPost(post);
+      }
+    }
+  }
+}
diff --git a/src/plugins/discourse/mirror.test.js b/src/plugins/discourse/mirror.test.js
new file mode 100644
index 0000000..c4ed010
--- /dev/null
+++ b/src/plugins/discourse/mirror.test.js
@@ -0,0 +1,303 @@
+// @flow
+
+import Database from "better-sqlite3";
+import fs from "fs";
+import tmp from "tmp";
+import {Mirror} from "./mirror";
+import {
+  type Discourse,
+  type TopicId,
+  type PostId,
+  type Topic,
+  type Post,
+  type TopicWithPosts,
+} from "./fetch";
+import * as MapUtil from "../../util/map";
+import * as NullUtil from "../../util/null";
+
+type PostInfo = {|
+  +indexWithinTopic: number,
+  +replyToPostIndex: number | null,
+  +topicId: number,
+|};
+/**
+ * In-memory stand-in for the Discourse fetcher. It deliberately returns only
+ * the first post of a topic and only the single newest post as "latest", so
+ * that the Mirror is forced to fetch the remaining posts by id.
+ */
+class MockFetcher implements Discourse {
+  _latestTopicId: number;
+  _latestPostId: number;
+  _topicToPostIds: Map<TopicId, PostId[]>;
+  _posts: Map<PostId, PostInfo>;
+
+  constructor() {
+    this._latestTopicId = 1;
+    this._latestPostId = 1;
+    this._topicToPostIds = new Map();
+    this._posts = new Map();
+  }
+  async latestTopicId(): Promise<TopicId> {
+    return this._latestTopicId;
+  }
+
+  async latestPosts(): Promise<Post[]> {
+    const latestPost = this._post(this._latestPostId - 1);
+    if (latestPost == null) {
+      return [];
+    }
+    return [latestPost];
+  }
+
+  async topicWithPosts(id: TopicId): Promise<TopicWithPosts | null> {
+    const postIds = this._topicToPostIds.get(id);
+    if (postIds == null || postIds.length === 0) {
+      return null;
+    }
+    const firstPost = this._post(postIds[0]);
+    if (firstPost == null) {
+      throw new Error("invalid firstPost");
+    }
+    // Only return the first post in the posts array, to ensure that we have to
+    // test the functionality where we manually grab posts by ID
+    const posts = [firstPost];
+    return {topic: this._topic(id), posts};
+  }
+
+  _topic(id: TopicId): Topic {
+    return {
+      id,
+      title: `topic ${id}`,
+      timestampMs: 1000,
+      authorUsername: "credbot",
+    };
+  }
+
+  async post(id: PostId): Promise<Post | null> {
+    return this._post(id);
+  }
+
+  _post(id: PostId): Post | null {
+    const postInfo = this._posts.get(id);
+    if (postInfo == null) {
+      return null;
+    }
+    const {replyToPostIndex, topicId, indexWithinTopic} = postInfo;
+    return {
+      id,
+      timestampMs: 2003,
+      replyToPostIndex,
+      topicId,
+      indexWithinTopic,
+      authorUsername: "credbot",
+    };
+  }
+
+  addPost(topicId: TopicId, replyToNumber: number | null): PostId {
+    const postId = this._latestPostId++;
+    this._latestTopicId = Math.max(topicId, this._latestTopicId);
+    const postsOnTopic = MapUtil.pushValue(
+      this._topicToPostIds,
+      topicId,
+      postId
+    );
+    if (replyToNumber != null && replyToNumber >= postsOnTopic.length) {
+      throw new Error("invalid replyToNumber");
+    }
+    const postInfo: PostInfo = {
+      indexWithinTopic: postsOnTopic.length,
+      replyToPostIndex: replyToNumber,
+      topicId: topicId,
+    };
+    this._posts.set(postId, postInfo);
+    return postId;
+  }
+}
+
+describe("plugins/discourse/mirror", () => {
+  const example = () => {
+    const fetcher = new MockFetcher();
+    const db = new Database(":memory:");
+    const mirror = new Mirror(db, fetcher, "https://some-url.io");
+    return {fetcher, mirror};
+  };
+
+  it("rejects a different server url without changing the database", () => {
+    // We use an on-disk database file here so that we can dump the
+    // contents to ensure that the database is physically unchanged.
+    const filename = tmp.fileSync().name;
+    const db = new Database(filename);
+    const fetcher = new MockFetcher();
+    const url1 = "https://foo.bar";
+    const url2 = "https://foo.zod";
+    expect(() => new Mirror(db, fetcher, url1)).not.toThrow();
+    const data = fs.readFileSync(filename).toJSON();
+
+    expect(() => new Mirror(db, fetcher, url2)).toThrow(
+      "incompatible server or version"
+    );
+    expect(fs.readFileSync(filename).toJSON()).toEqual(data);
+
+    expect(() => new Mirror(db, fetcher, url1)).not.toThrow();
+    expect(fs.readFileSync(filename).toJSON()).toEqual(data);
+  });
+
+  it("mirrors topics from the fetcher", async () => {
+    const {mirror, fetcher} = example();
+    fetcher.addPost(2, null);
+    fetcher.addPost(3, null);
+    const topic2 = fetcher._topic(2);
+    const topic3 = fetcher._topic(3);
+    await mirror.update();
+    expect(mirror.topics()).toEqual([topic2, topic3]);
+  });
+
+  it("mirrors posts from the fetcher", async () => {
+    const {mirror, fetcher} = example();
+    const p1 = fetcher.addPost(2, null);
+    const p2 = fetcher.addPost(3, null);
+    const p3 = fetcher.addPost(3, 1);
+    await mirror.update();
+    const posts = [fetcher._post(p1), fetcher._post(p2), fetcher._post(p3)];
+    expect(mirror.posts()).toEqual(posts);
+  });
+
+  describe("update semantics", () => {
+    it("only fetches new topics on `update`", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      fetcher.addPost(2, null);
+      await mirror.update();
+      fetcher.addPost(3, null);
+      const fetchTopicWithPosts = jest.spyOn(fetcher, "topicWithPosts");
+      await mirror.update();
+      expect(fetchTopicWithPosts).toHaveBeenCalledTimes(1);
+      expect(fetchTopicWithPosts).toHaveBeenCalledWith(3);
+      expect(mirror.topics().map((x) => x.id)).toEqual([1, 2, 3]);
+    });
+
+    it("gets new posts on old topics on update", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      fetcher.addPost(2, null);
+      await mirror.update();
+      const id = fetcher.addPost(1, 1);
+      fetcher.addPost(3, null);
+      await mirror.update();
+      const latestPosts = await fetcher.latestPosts();
+      // The post added to the old topic wasn't retrieved by latest post
+      expect(latestPosts.map((x) => x.id)).not.toContain(id);
+      const allPostIds = mirror.posts().map((x) => x.id);
+      // The post was still included, meaning the mirror scanned for new posts by id
+      expect(allPostIds).toContain(id);
+    });
+
+    it("skips null/missing topics", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      fetcher.addPost(3, null);
+      await mirror.update();
+      expect(mirror.topics().map((x) => x.id)).toEqual([1, 3]);
+    });
+
+    it("skips null/missing posts", async () => {
+      const {mirror, fetcher} = example();
+      const p1 = fetcher.addPost(1, null);
+      fetcher._latestPostId += 2;
+      const p2 = fetcher.addPost(3, null);
+      await mirror.update();
+      expect(mirror.posts().map((x) => x.id)).toEqual([p1, p2]);
+    });
+
+    it("queries explicitly for posts that are not present in topicWithPosts.posts", async () => {
+      const {mirror, fetcher} = example();
+      const p1 = fetcher.addPost(1, null);
+      const p2 = fetcher.addPost(1, 1);
+      const p3 = fetcher.addPost(1, 1);
+      const fetchPost = jest.spyOn(fetcher, "post");
+      await mirror.update();
+      const getId = (x) => x.id;
+
+      const postsFromTopic = NullUtil.get(await fetcher.topicWithPosts(1))
+        .posts;
+      expect(postsFromTopic.map(getId)).toEqual([p1]);
+
+      const postsFromLatest = await fetcher.latestPosts();
+      expect(postsFromLatest.map(getId)).toEqual([p3]);
+
+      expect(fetchPost).toHaveBeenCalledTimes(1);
+      expect(fetchPost).toHaveBeenCalledWith(p2);
+
+      expect(mirror.posts().map(getId)).toEqual([p1, p2, p3]);
+    });
+
+    it("does not explicitly query for posts that were in topicWithPosts.posts", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      const fetchPost = jest.spyOn(fetcher, "post");
+      await mirror.update();
+      expect(fetchPost).not.toHaveBeenCalled();
+    });
+
+    it("does not explicitly query for posts that were provided in latest posts", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      await mirror.update();
+      const id = fetcher.addPost(1, 1);
+      const fetchPost = jest.spyOn(fetcher, "post");
+      await mirror.update();
+      expect(fetchPost).not.toHaveBeenCalled();
+      expect(mirror.posts().map((x) => x.id)).toContain(id);
+    });
+
+    it("does not query for topics at all if there were no new topics", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      await mirror.update();
+      const fetchTopic = jest.spyOn(fetcher, "topicWithPosts");
+      await mirror.update();
+      expect(fetchTopic).not.toHaveBeenCalled();
+    });
+  });
+
+  describe("findPostInTopic", () => {
+    it("works for the first post in a topic", async () => {
+      const {mirror, fetcher} = example();
+      const id = fetcher.addPost(5, null);
+      const post = NullUtil.get(fetcher._post(id));
+      expect(post.topicId).toEqual(5);
+      expect(post.indexWithinTopic).toEqual(1);
+      await mirror.update();
+      expect(mirror.findPostInTopic(5, 1)).toEqual(id);
+    });
+
+    it("works for the second post in a topic", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      const id = fetcher.addPost(1, 1);
+      const post = NullUtil.get(fetcher._post(id));
+      expect(post.indexWithinTopic).toEqual(2);
+      await mirror.update();
+      expect(mirror.findPostInTopic(1, 2)).toEqual(id);
+    });
+
+    it("returns undefined for a post with too high an index", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      await mirror.update();
+      expect(mirror.findPostInTopic(1, 2)).toBe(undefined);
+    });
+
+    it("returns undefined for topic that doesnt exist", async () => {
+      const {mirror, fetcher} = example();
+      fetcher.addPost(1, null);
+      await mirror.update();
+      expect(mirror.findPostInTopic(2, 1)).toBe(undefined);
+    });
+
+    it("returns undefined for a mirror that never updated", async () => {
+      const {mirror} = example();
+      expect(mirror.findPostInTopic(1, 1)).toBe(undefined);
+    });
+  });
+});