Discourse: split Mirror from MirrorRepository (#1432)

This commit is contained in:
Robin van Boven 2019-11-15 13:52:01 +01:00 committed by GitHub
parent 28737cd4d2
commit d6fb58bf2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 507 additions and 444 deletions

View File

@ -16,7 +16,7 @@ import {
type Topic,
type LikeAction,
} from "./fetch";
import {type DiscourseData} from "./mirror";
import {type ReadRepository} from "./mirrorRepository";
import {
authorsPostEdgeType,
authorsTopicEdgeType,
@ -143,7 +143,7 @@ export function likesEdge(serverUrl: string, like: LikeAction): Edge {
};
}
export function createGraph(serverUrl: string, data: DiscourseData): Graph {
export function createGraph(serverUrl: string, data: ReadRepository): Graph {
const gc = new _GraphCreator(serverUrl, data);
return gc.graph;
}
@ -151,10 +151,10 @@ export function createGraph(serverUrl: string, data: DiscourseData): Graph {
class _GraphCreator {
graph: Graph;
serverUrl: string;
data: DiscourseData;
data: ReadRepository;
topicIdToTitle: Map<TopicId, string>;
constructor(serverUrl: string, data: DiscourseData) {
constructor(serverUrl: string, data: ReadRepository) {
if (serverUrl.endsWith("/")) {
throw new Error(`by convention, serverUrl should not end with /`);
}

View File

@ -1,7 +1,7 @@
// @flow
import sortBy from "lodash.sortby";
import type {DiscourseData} from "./mirror";
import type {ReadRepository} from "./mirrorRepository";
import type {Topic, Post, PostId, TopicId, LikeAction} from "./fetch";
import {NodeAddress, EdgeAddress, type Node, type Edge} from "../../core/graph";
import {
@ -34,7 +34,7 @@ import {
import type {EdgeType, NodeType} from "../../analysis/types";
describe("plugins/discourse/createGraph", () => {
class MockData implements DiscourseData {
class MockData implements ReadRepository {
_topics: $ReadOnlyArray<Topic>;
_posts: $ReadOnlyArray<Post>;
_likes: $ReadOnlyArray<LikeAction>;
@ -69,6 +69,12 @@ describe("plugins/discourse/createGraph", () => {
)[0];
return post ? post.id : null;
}
maxIds() {
return {
maxPostId: this._posts.reduce((max, p) => Math.max(p.id, max), 0),
maxTopicId: this._topics.reduce((max, t) => Math.max(t.id, max), 0),
};
}
}
function example() {

View File

@ -3,6 +3,7 @@
import Database from "better-sqlite3";
import base64url from "base64url";
import {Fetcher, type DiscourseFetchOptions} from "./fetch";
import {SqliteMirrorRepository, type ReadRepository} from "./mirrorRepository";
import {Mirror} from "./mirror";
import {createGraph} from "./createGraph";
import {TaskReporter} from "../../util/taskReporter";
@ -22,9 +23,13 @@ export async function loadDiscourse(
): Promise<Graph> {
const filename = base64url.encode(options.fetchOptions.serverUrl) + ".db";
const db = new Database(path.join(options.cacheDirectory, filename));
const repo = new SqliteMirrorRepository(db, options.fetchOptions.serverUrl);
const fetcher = new Fetcher(options.fetchOptions);
const mirror = new Mirror(db, fetcher, options.fetchOptions.serverUrl);
const mirror = new Mirror(repo, fetcher, options.fetchOptions.serverUrl);
await mirror.update(reporter);
const graph = createGraph(options.fetchOptions.serverUrl, mirror);
const graph = createGraph(
options.fetchOptions.serverUrl,
(repo: ReadRepository)
);
return graph;
}

View File

@ -1,65 +1,8 @@
// @flow
import type {Database} from "better-sqlite3";
import stringify from "json-stable-stringify";
import dedent from "../../util/dedent";
import type {TaskReporter} from "../../util/taskReporter";
import {
type Discourse,
type TopicId,
type PostId,
type Topic,
type Post,
type LikeAction,
} from "./fetch";
// The version should be bumped any time the database schema is changed,
// so that the cache will be properly invalidated.
const VERSION = "discourse_mirror_v4";
/**
* An interface for retrieving all of the Discourse data at once.
*
* Also has some convenience methods for interpeting the data (e.g. getting
* a post by its index in a topic).
*
* The mirror implements this; it's factored out as an interface for
* ease of testing.
*/
export interface DiscourseData {
/**
* Retrieve every Topic available.
*
* The order is unspecified.
*/
topics(): $ReadOnlyArray<Topic>;
/**
* Retrieve every Post available.
*
* The order is unspecified.
*/
posts(): $ReadOnlyArray<Post>;
/**
* Given a TopicId and a post number, find that numbered post within the topic.
*
* Returns undefined if no such post is available.
*/
findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId;
/**
* Get usernames for all users.
*
* The order is unspecified.
*/
users(): $ReadOnlyArray<string>;
/**
* Gets all of the like actions in the history.
*/
likes(): $ReadOnlyArray<LikeAction>;
}
import {type Discourse} from "./fetch";
import {MirrorRepository} from "./mirrorRepository";
/**
* Mirrors data from the Discourse API into a local sqlite db.
@ -78,8 +21,8 @@ export interface DiscourseData {
* Each Mirror instance is tied to a particular server. Trying to use a mirror
* for multiple Discourse servers is not permitted; use separate Mirrors.
*/
export class Mirror implements DiscourseData {
+_db: Database;
export class Mirror {
+_repo: MirrorRepository;
+_fetcher: Discourse;
+_serverUrl: string;
@ -92,271 +35,49 @@ export class Mirror implements DiscourseData {
* A serverUrl is required so that we can ensure that this Mirror is only storing
* data from a particular Discourse server.
*/
constructor(db: Database, fetcher: Discourse, serverUrl: string) {
if (db == null) throw new Error("db: " + String(db));
this._db = db;
constructor(repo: MirrorRepository, fetcher: Discourse, serverUrl: string) {
this._repo = repo;
this._fetcher = fetcher;
this._serverUrl = serverUrl;
if (db.inTransaction) {
throw new Error("already in transaction");
}
try {
db.prepare("BEGIN").run();
this._initialize();
if (db.inTransaction) {
db.prepare("COMMIT").run();
}
} finally {
if (db.inTransaction) {
db.prepare("ROLLBACK").run();
}
}
}
_initialize() {
const db = this._db;
// We store the config in a singleton table `meta`, whose unique row
// has primary key `0`. Only the first ever insert will succeed; we
// are locked into the first config.
db.prepare(
dedent`\
CREATE TABLE IF NOT EXISTS meta (
zero INTEGER PRIMARY KEY,
config TEXT NOT NULL
)
`
).run();
const config = stringify({
version: VERSION,
serverUrl: this._serverUrl,
});
const existingConfig: string | void = db
.prepare("SELECT config FROM meta")
.pluck()
.get();
if (existingConfig === config) {
// Already set up; nothing to do.
return;
} else if (existingConfig !== undefined) {
throw new Error(
"Database already populated with incompatible server or version"
);
}
db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config);
const tables = [
"CREATE TABLE users (username TEXT PRIMARY KEY)",
dedent`\
CREATE TABLE topics (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
timestamp_ms INTEGER NOT NULL,
author_username TEXT NOT NULL,
FOREIGN KEY(author_username) REFERENCES users(username)
)
`,
dedent`\
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
timestamp_ms INTEGER NOT NULL,
author_username TEXT NOT NULL,
topic_id INTEGER NOT NULL,
index_within_topic INTEGER NOT NULL,
reply_to_post_index INTEGER,
cooked TEXT NOT NULL,
FOREIGN KEY(topic_id) REFERENCES topics(id),
FOREIGN KEY(author_username) REFERENCES users(username)
)
`,
dedent`\
CREATE TABLE likes (
username TEXT NOT NULL,
post_id INTEGER NOT NULL,
timestamp_ms INTEGER NOT NULL,
CONSTRAINT username_post PRIMARY KEY (username, post_id),
FOREIGN KEY(post_id) REFERENCES posts(id),
FOREIGN KEY(username) REFERENCES users(username)
)`,
];
for (const sql of tables) {
db.prepare(sql).run();
}
}
topics(): $ReadOnlyArray<Topic> {
return this._db
.prepare(
dedent`\
SELECT
id,
timestamp_ms,
author_username,
title
FROM topics`
)
.all()
.map((x) => ({
id: x.id,
timestampMs: x.timestamp_ms,
authorUsername: x.author_username,
title: x.title,
}));
}
posts(): $ReadOnlyArray<Post> {
return this._db
.prepare(
dedent`\
SELECT
id,
timestamp_ms,
author_username,
topic_id,
index_within_topic,
reply_to_post_index,
cooked
FROM posts`
)
.all()
.map((x) => ({
id: x.id,
timestampMs: x.timestamp_ms,
authorUsername: x.author_username,
topicId: x.topic_id,
indexWithinTopic: x.index_within_topic,
replyToPostIndex: x.reply_to_post_index,
cooked: x.cooked,
}));
}
users(): $ReadOnlyArray<string> {
return this._db
.prepare("SELECT username FROM users")
.pluck()
.all();
}
likes(): $ReadOnlyArray<LikeAction> {
return this._db
.prepare("SELECT post_id, username, timestamp_ms FROM likes")
.all()
.map((x) => ({
postId: x.post_id,
timestampMs: x.timestamp_ms,
username: x.username,
}));
}
findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId {
return this._db
.prepare(
dedent`\
SELECT id
FROM posts
WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic
`
)
.pluck()
.get({topic_id: topicId, index_within_topic: indexWithinTopic});
}
async update(reporter: TaskReporter) {
reporter.start("discourse");
const db = this._db;
const {max_post: lastLocalPostId, max_topic: lastLocalTopicId} = db
.prepare(
dedent`\
SELECT
(SELECT IFNULL(MAX(id), 0) FROM posts) AS max_post,
(SELECT IFNULL(MAX(id), 0) FROM topics) AS max_topic
`
)
.get();
// Local functions add the warning and tracking semantics we want from them.
const encounteredPostIds = new Set();
const addPost: (Post) => void = (() => {
const query = db.prepare(
dedent`\
REPLACE INTO posts (
id,
timestamp_ms,
author_username,
topic_id,
index_within_topic,
reply_to_post_index,
cooked
) VALUES (
:id,
:timestamp_ms,
:author_username,
:topic_id,
:index_within_topic,
:reply_to_post_index,
:cooked
)
`
);
const serverUrl = this._serverUrl;
return function addPost(post: Post) {
try {
addUser(post.authorUsername);
encounteredPostIds.add(post.id);
query.run({
id: post.id,
timestamp_ms: post.timestampMs,
reply_to_post_index: post.replyToPostIndex,
index_within_topic: post.indexWithinTopic,
topic_id: post.topicId,
author_username: post.authorUsername,
cooked: post.cooked,
});
} catch (e) {
const url = `${serverUrl}/t/${post.topicId}/${post.indexWithinTopic}`;
console.warn(
`Warning: Encountered error '${e.message}' while adding post ${url}.`
);
}
};
})();
const addPost = (post) => {
try {
encounteredPostIds.add(post.id);
return this._repo.addPost(post);
} catch (e) {
const url = `${this._serverUrl}/t/${post.topicId}/${post.indexWithinTopic}`;
console.warn(
`Warning: Encountered error '${e.message}' while adding post ${url}.`
);
return {changes: 0, lastInsertRowid: -1};
}
};
const addUser: (username: string) => void = (() => {
const query = db.prepare(
"INSERT OR IGNORE INTO users (username) VALUES (?)"
);
return function addUser(username: string) {
query.run(username);
};
})();
const addLike = (like) => {
try {
const res = this._repo.addLike(like);
return {doneWithUser: res.changes === 0};
} catch (e) {
console.warn(
`Warning: Encountered error '${e.message}' ` +
`on a like by ${like.username} ` +
`on post id ${like.postId}.`
);
return {doneWithUser: false};
}
};
const addTopic: (Topic) => void = (() => {
const query = this._db.prepare(
dedent`\
REPLACE INTO topics (
id,
title,
timestamp_ms,
author_username
) VALUES (
:id,
:title,
:timestamp_ms,
:author_username
)
`
);
return function addTopic(topic: Topic) {
addUser(topic.authorUsername);
query.run({
id: topic.id,
title: topic.title,
timestamp_ms: topic.timestampMs,
author_username: topic.authorUsername,
});
};
})();
reporter.start("discourse");
const {
maxPostId: lastLocalPostId,
maxTopicId: lastLocalTopicId,
} = this._repo.maxIds();
reporter.start("discourse/topics");
const latestTopicId = await this._fetcher.latestTopicId();
@ -368,7 +89,7 @@ export class Mirror implements DiscourseData {
const topicWithPosts = await this._fetcher.topicWithPosts(topicId);
if (topicWithPosts != null) {
const {topic, posts} = topicWithPosts;
addTopic(topic);
this._repo.addTopic(topic);
for (const post of posts) {
addPost(post);
}
@ -414,49 +135,8 @@ export class Mirror implements DiscourseData {
// since our last scan. This would likely improve the performance of this
// section of the update significantly.
/**
* Add a like action to the database. The user of the like is
* assumed to already exist in the database; if this is not known to
* be the case, run `addUser(like.username)` first.
*
* Returns a status indicating whether we are done processing this user
* (e.g. we have already seen all of their likes).
*/
const addLike: (like: LikeAction) => {|+doneWithUser: boolean|} = (() => {
const query = db.prepare(
dedent`\
INSERT OR IGNORE INTO likes (
post_id,
timestamp_ms,
username
) VALUES (
:post_id,
:timestamp_ms,
:username
)
`
);
return function addLike(like: LikeAction) {
try {
const runResult = query.run({
post_id: like.postId,
timestamp_ms: like.timestampMs,
username: like.username,
});
return {doneWithUser: runResult.changes === 0};
} catch (e) {
console.warn(
`Warning: Encountered error '${e.message}' ` +
`on a like by ${like.username} ` +
`on post id ${like.postId}.`
);
return {doneWithUser: false};
}
};
})();
reporter.start("discourse/likes");
for (const user of this.users()) {
for (const user of this._repo.users()) {
let offset = 0;
let upToDate = false;
while (!upToDate) {

View File

@ -2,9 +2,8 @@
import sortBy from "lodash.sortby";
import Database from "better-sqlite3";
import fs from "fs";
import tmp from "tmp";
import {Mirror} from "./mirror";
import {SqliteMirrorRepository} from "./mirrorRepository";
import {
type Discourse,
type TopicId,
@ -170,60 +169,41 @@ describe("plugins/discourse/mirror", () => {
const fetcher = new MockFetcher();
const db = new Database(":memory:");
const url = "http://example.com";
const mirror = new Mirror(db, fetcher, url);
const repo = new SqliteMirrorRepository(db, url);
const mirror = new Mirror(repo, fetcher, url);
const reporter = new TestTaskReporter();
return {fetcher, mirror, reporter, url};
return {fetcher, mirror, reporter, url, repo};
};
it("rejects a different server url without changing the database", () => {
// We use an on-disk database file here so that we can dump the
// contents to ensure that the database is physically unchanged.
const filename = tmp.fileSync().name;
const db = new Database(filename);
const fetcher = new MockFetcher();
const url1 = "https://foo.bar";
const url2 = "https://foo.zod";
expect(() => new Mirror(db, fetcher, url1)).not.toThrow();
const data = fs.readFileSync(filename).toJSON();
expect(() => new Mirror(db, fetcher, url2)).toThrow(
"incompatible server or version"
);
expect(fs.readFileSync(filename).toJSON()).toEqual(data);
expect(() => new Mirror(db, fetcher, url1)).not.toThrow();
expect(fs.readFileSync(filename).toJSON()).toEqual(data);
});
it("mirrors topics from the fetcher", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(2, null);
fetcher.addPost(3, null);
const topic2 = fetcher._topic(2);
const topic3 = fetcher._topic(3);
await mirror.update(reporter);
expect(mirror.topics()).toEqual([topic2, topic3]);
expect(repo.topics()).toEqual([topic2, topic3]);
});
it("mirrors posts from the fetcher", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const p1 = fetcher.addPost(2, null);
const p2 = fetcher.addPost(3, null);
const p3 = fetcher.addPost(3, 1);
await mirror.update(reporter);
const posts = [fetcher._post(p1), fetcher._post(p2), fetcher._post(p3)];
expect(mirror.posts()).toEqual(posts);
expect(repo.posts()).toEqual(posts);
});
it("provides usernames for all active users", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(2, null, "alpha");
fetcher.addPost(3, null, "beta");
fetcher.addPost(3, 1, "alpha");
await mirror.update(reporter);
// credbot appears because it is the nominal author of all topics
expect(
mirror
repo
.users()
.slice()
.sort()
@ -241,7 +221,7 @@ describe("plugins/discourse/mirror", () => {
}
it("provides all the likes by users that have posted", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null, "alpha");
fetcher.addPost(2, null, "alpha");
fetcher.addPost(3, null, "beta");
@ -250,26 +230,26 @@ describe("plugins/discourse/mirror", () => {
const l3 = fetcher.addLike("beta", 3, 7);
const l4 = fetcher.addLike("alpha", 1, 8);
await mirror.update(reporter);
expectLikesSorted(mirror.likes(), [l1, l2, l3, l4]);
expectLikesSorted(repo.likes(), [l1, l2, l3, l4]);
const l5 = fetcher.addLike("alpha", 2, 9);
fetcher.addPost(4, null, "credbot");
const l6 = fetcher.addLike("credbot", 2, 10);
const l7 = fetcher.addLike("beta", 4, 11);
await mirror.update(reporter);
expectLikesSorted(mirror.likes(), [l1, l2, l3, l4, l5, l6, l7]);
expectLikesSorted(repo.likes(), [l1, l2, l3, l4, l5, l6, l7]);
});
it("doesn't find likes of users that never posted", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
fetcher.addLike("nope", 1, 1);
await mirror.update(reporter);
expect(mirror.likes()).toEqual([]);
expect(repo.likes()).toEqual([]);
});
describe("update semantics", () => {
it("only fetches new topics on `update`", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
fetcher.addPost(2, null);
await mirror.update(reporter);
@ -278,11 +258,11 @@ describe("plugins/discourse/mirror", () => {
await mirror.update(reporter);
expect(fetchTopicWithPosts).toHaveBeenCalledTimes(1);
expect(fetchTopicWithPosts).toHaveBeenCalledWith(3);
expect(mirror.topics().map((x) => x.id)).toEqual([1, 2, 3]);
expect(repo.topics().map((x) => x.id)).toEqual([1, 2, 3]);
});
it("gets new posts on old topics on update", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
fetcher.addPost(2, null);
await mirror.update(reporter);
@ -292,30 +272,30 @@ describe("plugins/discourse/mirror", () => {
const latestPosts = await fetcher.latestPosts();
// The post added to the old topic wasn't retrieved by latest post
expect(latestPosts.map((x) => x.id)).not.toContain(id);
const allPostIds = mirror.posts().map((x) => x.id);
const allPostIds = repo.posts().map((x) => x.id);
// The post was still included, meaning the mirror scanned for new posts by id
expect(allPostIds).toContain(id);
});
it("skips null/missing topics", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
fetcher.addPost(3, null);
await mirror.update(reporter);
expect(mirror.topics().map((x) => x.id)).toEqual([1, 3]);
expect(repo.topics().map((x) => x.id)).toEqual([1, 3]);
});
it("skips null/missing posts", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const p1 = fetcher.addPost(1, null);
fetcher._latestPostId += 2;
const p2 = fetcher.addPost(3, null);
await mirror.update(reporter);
expect(mirror.posts().map((x) => x.id)).toEqual([p1, p2]);
expect(repo.posts().map((x) => x.id)).toEqual([p1, p2]);
});
it("queries explicitly for posts that are not present in topicWithPosts.posts", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const p1 = fetcher.addPost(1, null);
const p2 = fetcher.addPost(1, 1);
const p3 = fetcher.addPost(1, 1);
@ -333,7 +313,7 @@ describe("plugins/discourse/mirror", () => {
expect(fetchPost).toHaveBeenCalledTimes(1);
expect(fetchPost).toHaveBeenCalledWith(p2);
expect(mirror.posts().map(getId)).toEqual([p1, p2, p3]);
expect(repo.posts().map(getId)).toEqual([p1, p2, p3]);
});
it("does not explicitly query for posts that were in topicWithPosts.posts", async () => {
@ -345,14 +325,14 @@ describe("plugins/discourse/mirror", () => {
});
it("does not explicitly query for posts that were provided in latest posts", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
await mirror.update(reporter);
const id = fetcher.addPost(1, 1);
const fetchPost = jest.spyOn(fetcher, "post");
await mirror.update(reporter);
expect(fetchPost).not.toHaveBeenCalled();
expect(mirror.posts().map((x) => x.id)).toContain(id);
expect(repo.posts().map((x) => x.id)).toContain(id);
});
it("does not query for topics at all if there were no new topics", async () => {
@ -405,7 +385,7 @@ describe("plugins/discourse/mirror", () => {
});
it("warns if one of the latest posts has no topic", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const pid1 = fetcher.addPost(1, null, "credbot");
const pid2 = fetcher.addPost(2, null, "credbot");
// Verify that the problem post is one of the latest posts
@ -415,9 +395,9 @@ describe("plugins/discourse/mirror", () => {
fetcher._latestTopicId--;
await mirror.update(reporter);
const topics = [fetcher._topic(1)];
expect(mirror.topics()).toEqual(topics);
expect(repo.topics()).toEqual(topics);
const posts = [fetcher._post(pid1)];
expect(mirror.posts()).toEqual(posts);
expect(repo.posts()).toEqual(posts);
expect(console.warn).toHaveBeenCalledWith(
"Warning: Encountered error 'FOREIGN KEY constraint failed' " +
"while adding post http://example.com/t/2/1."
@ -427,7 +407,7 @@ describe("plugins/discourse/mirror", () => {
});
it("warns if it finds a (non-latest) post with no topic", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const pid1 = fetcher.addPost(1, null, "credbot");
const pid2 = fetcher.addPost(2, null, "credbot");
const pid3 = fetcher.addPost(1, null, "credbot");
@ -438,9 +418,9 @@ describe("plugins/discourse/mirror", () => {
fetcher._latestTopicId--;
await mirror.update(reporter);
const topics = [fetcher._topic(1)];
expect(mirror.topics()).toEqual(topics);
expect(repo.topics()).toEqual(topics);
const posts = [pid1, pid3].map((x) => fetcher._post(x));
expect(mirror.posts()).toEqual(posts);
expect(repo.posts()).toEqual(posts);
expect(console.warn).toHaveBeenCalledWith(
"Warning: Encountered error 'FOREIGN KEY constraint failed' " +
"while adding post http://example.com/t/2/1."
@ -450,14 +430,14 @@ describe("plugins/discourse/mirror", () => {
});
it("warns if it gets a like that doesn't correspond to any post", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const pid = fetcher.addPost(1, null, "credbot");
const badLike = {username: "credbot", postId: 37, timestampMs: 0};
fetcher._likes.push(badLike);
await mirror.update(reporter);
expect(mirror.topics()).toEqual([fetcher._topic(1)]);
expect(mirror.posts()).toEqual([fetcher._post(pid)]);
expect(mirror.likes()).toEqual([]);
expect(repo.topics()).toEqual([fetcher._topic(1)]);
expect(repo.posts()).toEqual([fetcher._post(pid)]);
expect(repo.likes()).toEqual([]);
expect(console.warn).toHaveBeenCalledWith(
"Warning: Encountered error 'FOREIGN KEY constraint failed' " +
"on a like by credbot on post id 37."
@ -467,18 +447,18 @@ describe("plugins/discourse/mirror", () => {
});
});
it("ignores if a user's likes are missing", async () => {
const {mirror, fetcher, reporter} = example();
it("warns if a user's likes are missing", async () => {
const {mirror, fetcher, reporter, repo} = example();
const pid = fetcher.addPost(1, null, "credbot");
(fetcher: any).likesByUser = async () => null;
await mirror.update(reporter);
expect(mirror.topics()).toEqual([fetcher._topic(1)]);
expect(mirror.posts()).toEqual([fetcher._post(pid)]);
expect(mirror.likes()).toEqual([]);
expect(repo.topics()).toEqual([fetcher._topic(1)]);
expect(repo.posts()).toEqual([fetcher._post(pid)]);
expect(repo.likes()).toEqual([]);
});
it("inserts other likes if one user's likes are missing", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const p1 = fetcher.addPost(1, null, "credbot");
const p2 = fetcher.addPost(1, 1, "otheruser");
const l1 = fetcher.addLike("otheruser", 1, 123);
@ -491,9 +471,9 @@ describe("plugins/discourse/mirror", () => {
return await _likesByUser(targetUsername, offset);
};
await mirror.update(reporter);
expect(mirror.topics()).toEqual([fetcher._topic(1)]);
expect(mirror.posts()).toEqual([fetcher._post(p1), fetcher._post(p2)]);
expect(mirror.likes()).toEqual([l1]);
expect(repo.topics()).toEqual([fetcher._topic(1)]);
expect(repo.posts()).toEqual([fetcher._post(p1), fetcher._post(p2)]);
expect(repo.likes()).toEqual([l1]);
});
it("sends the right tasks to the TaskReporter", async () => {
@ -515,42 +495,42 @@ describe("plugins/discourse/mirror", () => {
describe("findPostInTopic", () => {
it("works for the first post in a topic", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
const id = fetcher.addPost(5, null);
const post = NullUtil.get(fetcher._post(id));
expect(post.topicId).toEqual(5);
expect(post.indexWithinTopic).toEqual(1);
await mirror.update(reporter);
expect(mirror.findPostInTopic(5, 1)).toEqual(id);
expect(repo.findPostInTopic(5, 1)).toEqual(id);
});
it("works for the second post in a topic", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
const id = fetcher.addPost(1, 1);
const post = NullUtil.get(fetcher._post(id));
expect(post.indexWithinTopic).toEqual(2);
await mirror.update(reporter);
expect(mirror.findPostInTopic(1, 2)).toEqual(id);
expect(repo.findPostInTopic(1, 2)).toEqual(id);
});
it("returns undefined for a post with too high an index", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
await mirror.update(reporter);
expect(mirror.findPostInTopic(1, 2)).toBe(undefined);
expect(repo.findPostInTopic(1, 2)).toBe(undefined);
});
it("returns undefined for topic that doesnt exist", async () => {
const {mirror, fetcher, reporter} = example();
const {mirror, fetcher, reporter, repo} = example();
fetcher.addPost(1, null);
await mirror.update(reporter);
expect(mirror.findPostInTopic(2, 1)).toBe(undefined);
expect(repo.findPostInTopic(2, 1)).toBe(undefined);
});
it("returns undefined for a mirror that never updated", async () => {
const {mirror} = example();
expect(mirror.findPostInTopic(1, 1)).toBe(undefined);
const {repo} = example();
expect(repo.findPostInTopic(1, 1)).toBe(undefined);
});
});
});

View File

@ -0,0 +1,365 @@
// @flow
import type {Database} from "better-sqlite3";
import stringify from "json-stable-stringify";
import dedent from "../../util/dedent";
import {
type TopicId,
type PostId,
type Topic,
type Post,
type LikeAction,
} from "./fetch";
// The version should be bumped any time the database schema is changed,
// so that the cache will be properly invalidated.
const VERSION = "discourse_mirror_v4";
/**
* An interface for reading the local Discourse data.
*/
export interface ReadRepository {
/**
* Retrieve every Topic available.
*
* The order is unspecified.
*/
topics(): $ReadOnlyArray<Topic>;
/**
* Retrieve every Post available.
*
* The order is unspecified.
*/
posts(): $ReadOnlyArray<Post>;
/**
* Given a TopicId and a post number, find that numbered post within the topic.
*
* Returns undefined if no such post is available.
*/
findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId;
/**
* Get usernames for all users.
*
* The order is unspecified.
*/
users(): $ReadOnlyArray<string>;
/**
* Gets all of the like actions in the history.
*/
likes(): $ReadOnlyArray<LikeAction>;
}
export type MaxIds = {|
+maxPostId: number,
+maxTopicId: number,
|};
export type AddResult = {|
+changes: number,
+lastInsertRowid: number,
|};
// Read-write interface the mirror uses internally.
export interface MirrorRepository extends ReadRepository {
maxIds(): MaxIds;
addTopic(topic: Topic): AddResult;
addPost(post: Post): AddResult;
addLike(like: LikeAction): AddResult;
}
function toAddResult({
changes,
lastInsertRowid,
}: {
changes: number,
lastInsertRowid: number,
}): AddResult {
return {changes, lastInsertRowid};
}
export class SqliteMirrorRepository
implements ReadRepository, MirrorRepository {
+_db: Database;
constructor(db: Database, serverUrl: string) {
if (db == null) throw new Error("db: " + String(db));
this._db = db;
if (db.inTransaction) {
throw new Error("already in transaction");
}
try {
db.prepare("BEGIN").run();
this._initialize(serverUrl);
if (db.inTransaction) {
db.prepare("COMMIT").run();
}
} finally {
if (db.inTransaction) {
db.prepare("ROLLBACK").run();
}
}
}
_initialize(serverUrl: string) {
const db = this._db;
// We store the config in a singleton table `meta`, whose unique row
// has primary key `0`. Only the first ever insert will succeed; we
// are locked into the first config.
db.prepare(
dedent`\
CREATE TABLE IF NOT EXISTS meta (
zero INTEGER PRIMARY KEY,
config TEXT NOT NULL
)
`
).run();
const config = stringify({
version: VERSION,
serverUrl: serverUrl,
});
const existingConfig: string | void = db
.prepare("SELECT config FROM meta")
.pluck()
.get();
if (existingConfig === config) {
// Already set up; nothing to do.
return;
} else if (existingConfig !== undefined) {
throw new Error(
"Database already populated with incompatible server or version"
);
}
db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config);
const tables = [
"CREATE TABLE users (username TEXT PRIMARY KEY)",
dedent`\
CREATE TABLE topics (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
timestamp_ms INTEGER NOT NULL,
author_username TEXT NOT NULL,
FOREIGN KEY(author_username) REFERENCES users(username)
)
`,
dedent`\
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
timestamp_ms INTEGER NOT NULL,
author_username TEXT NOT NULL,
topic_id INTEGER NOT NULL,
index_within_topic INTEGER NOT NULL,
reply_to_post_index INTEGER,
cooked TEXT NOT NULL,
FOREIGN KEY(topic_id) REFERENCES topics(id),
FOREIGN KEY(author_username) REFERENCES users(username)
)
`,
dedent`\
CREATE TABLE likes (
username TEXT NOT NULL,
post_id INTEGER NOT NULL,
timestamp_ms INTEGER NOT NULL,
CONSTRAINT username_post PRIMARY KEY (username, post_id),
FOREIGN KEY(post_id) REFERENCES posts(id),
FOREIGN KEY(username) REFERENCES users(username)
)`,
];
for (const sql of tables) {
db.prepare(sql).run();
}
}
maxIds(): MaxIds {
const res = this._db
.prepare(
dedent`\
SELECT
(SELECT IFNULL(MAX(id), 0) FROM posts) AS max_post,
(SELECT IFNULL(MAX(id), 0) FROM topics) AS max_topic
`
)
.get();
return {
maxPostId: res.max_post,
maxTopicId: res.max_topic,
};
}
topics(): $ReadOnlyArray<Topic> {
return this._db
.prepare(
dedent`\
SELECT
id,
timestamp_ms,
author_username,
title
FROM topics`
)
.all()
.map((x) => ({
id: x.id,
timestampMs: x.timestamp_ms,
authorUsername: x.author_username,
title: x.title,
}));
}
posts(): $ReadOnlyArray<Post> {
return this._db
.prepare(
dedent`\
SELECT
id,
timestamp_ms,
author_username,
topic_id,
index_within_topic,
reply_to_post_index,
cooked
FROM posts`
)
.all()
.map((x) => ({
id: x.id,
timestampMs: x.timestamp_ms,
authorUsername: x.author_username,
topicId: x.topic_id,
indexWithinTopic: x.index_within_topic,
replyToPostIndex: x.reply_to_post_index,
cooked: x.cooked,
}));
}
users(): $ReadOnlyArray<string> {
return this._db
.prepare("SELECT username FROM users")
.pluck()
.all();
}
likes(): $ReadOnlyArray<LikeAction> {
return this._db
.prepare("SELECT post_id, username, timestamp_ms FROM likes")
.all()
.map((x) => ({
postId: x.post_id,
timestampMs: x.timestamp_ms,
username: x.username,
}));
}
findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId {
return this._db
.prepare(
dedent`\
SELECT id
FROM posts
WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic
`
)
.pluck()
.get({topic_id: topicId, index_within_topic: indexWithinTopic});
}
addLike(like: LikeAction): AddResult {
this.addUser(like.username);
const res = this._db
.prepare(
dedent`\
INSERT OR IGNORE INTO likes (
post_id,
timestamp_ms,
username
) VALUES (
:post_id,
:timestamp_ms,
:username
)
`
)
.run({
post_id: like.postId,
timestamp_ms: like.timestampMs,
username: like.username,
});
return toAddResult(res);
}
addPost(post: Post): AddResult {
this.addUser(post.authorUsername);
const res = this._db
.prepare(
dedent`\
REPLACE INTO posts (
id,
timestamp_ms,
author_username,
topic_id,
index_within_topic,
reply_to_post_index,
cooked
) VALUES (
:id,
:timestamp_ms,
:author_username,
:topic_id,
:index_within_topic,
:reply_to_post_index,
:cooked
)
`
)
.run({
id: post.id,
timestamp_ms: post.timestampMs,
reply_to_post_index: post.replyToPostIndex,
index_within_topic: post.indexWithinTopic,
topic_id: post.topicId,
author_username: post.authorUsername,
cooked: post.cooked,
});
return toAddResult(res);
}
addTopic(topic: Topic): AddResult {
this.addUser(topic.authorUsername);
const res = this._db
.prepare(
dedent`\
REPLACE INTO topics (
id,
title,
timestamp_ms,
author_username
) VALUES (
:id,
:title,
:timestamp_ms,
:author_username
)
`
)
.run({
id: topic.id,
title: topic.title,
timestamp_ms: topic.timestampMs,
author_username: topic.authorUsername,
});
return toAddResult(res);
}
addUser(username: string): AddResult {
const res = this._db
.prepare("INSERT OR IGNORE INTO users (username) VALUES (?)")
.run(username);
return toAddResult(res);
}
}

View File

@ -0,0 +1,27 @@
// @flow
import Database from "better-sqlite3";
import fs from "fs";
import tmp from "tmp";
import {SqliteMirrorRepository} from "./mirrorRepository";
describe("plugins/discourse/mirrorRepository", () => {
it("rejects a different server url without changing the database", () => {
// We use an on-disk database file here so that we can dump the
// contents to ensure that the database is physically unchanged.
const filename = tmp.fileSync().name;
const db = new Database(filename);
const url1 = "https://foo.bar";
const url2 = "https://foo.zod";
expect(() => new SqliteMirrorRepository(db, url1)).not.toThrow();
const data = fs.readFileSync(filename).toJSON();
expect(() => new SqliteMirrorRepository(db, url2)).toThrow(
"incompatible server or version"
);
expect(fs.readFileSync(filename).toJSON()).toEqual(data);
expect(() => new SqliteMirrorRepository(db, url1)).not.toThrow();
expect(fs.readFileSync(filename).toJSON()).toEqual(data);
});
});