brainz-social-old/lib/queue_worker.js

106 lines
5.3 KiB
JavaScript

'use strict';
var db = require('./db');
const http_agent = require('./http_agent');
const jsonld = require('jsonld');
var crypto = require('crypto');
module.exports = {
do_one_queue: async () => {
var result = await db('queue').orderBy('id', 'asc').limit(1);
var data = JSON.parse(result[0]["data"]);
if ( result[0] ) {
await db('queue').del().where('id', '=', result[0]["id"]);
if ( result[0]["type"] === "verify_inbox" ) {
var sig_header = data.sig_header;
var signature_split = sig_header.split(/,/);
var signature_elements = {};
signature_split.forEach((obj) => {
signature_elements[obj.split('"')[1].split(/=/)[0]] = obj.split('"')[1].split(/=/)[1];
});
var keyUrl = new URL(signature_elements["keyId"]);
var secondKeyUrl = new URL(keyUrl);
secondKeyUrl.hash = undefined;
var actor = (await db("object_box").where("origin", secondKeyUrl.toString()).andWhere("expires", ">=", (new Date()).getTime()))[0]["object"];
if ( !actor ) {
actor = http_agent.request(secondKeyUrl, {actor: data["actor"]})["data"];
await db("object_box").insert({
origin: secondKeyUrl.toString(),
to: null,
cc: null,
object: actor,
signedby: "fetch",
// TODO: We should respect the caching instructions provided by origin.
expires: (new Date()).getTime() + 604800000 // 7 days.
});
}
// TODO: Should we standardize on a primary schema at some point?
var actor_parsed = await jsonld.compact(JSON.parse(actor), ["https://www.w3.org/ns/activitystreams", { security: "https://w3id.org/security#"}]);
var publicKey = "";
if ( Array.isArray(actor_parsed["security:publicKey"]) ) {
actor_parsed["security:publicKey"].forEach((value) => {
if ( value["id"] === keyUrl.toString() ) {
publicKey = value["security:publicKeyPem"];
}
});
} else {
publicKey = actor_parsed["security:publicKey"]["security:publicKeyPem"];
}
var valid = crypto.verify("RSA-SHA256", data["body"], publicKey, signature_elements["signature"]);
var parsedBody = await jsonld.compact(JSON.parse(data["body"]), ["https://www.w3.org/ns/activitystreams", { security: "https://w3id.org/security#"}]);
var to = parsedBody.to;
if ( typeof to === "string" ) {
to = [to];
}
var cc = parsedBody.cc;
if ( typeof cc === "string" ) {
cc = [cc];
}
if ( valid ) {
// TODO: We should care that the key hostname and the object hostname match.
await db("object_box").insert({
origin: parsedBody.id,
to: `[${to.join(',')}]`,
cc: `[${cc.join(',')}]`,
object: parsedBody,
signedby: keyUrl.toString(),
expires: (new Date()).getTime() + 604800000 // 7 days.
});
} else {
await db("queue").insert({task: "fetch_object", data: JSON.stringify({object: parsedBody.id, actor: data["actor"]})});
}
} else if ( result[0] === "fetch_object" ) {
await db('queue').del().where('id', '=', result[0]["id"]);
var exists = await db("object_box").where("origin", "=", data["object"]).andWhere("expires", ">=", (new Date()).getTime());
if ( exists.length !== 0 ) {
return;
}
let parsedBody = await jsonld.compact(JSON.parse(http_agent.request(secondKeyUrl, {actor: data["actor"]})["data"]), ["https://www.w3.org/ns/activitystreams", { security: "https://w3id.org/security#"}]);
let to = parsedBody.to;
if ( typeof to === "string" ) {
to = [to];
}
let cc = parsedBody.cc;
if ( typeof cc === "string" ) {
cc = [cc];
}
await db("object_box").insert({
origin: parsedBody.id,
to: `[${to.join(',')}]`,
cc: `[${cc.join(',')}]`,
object: parsedBody,
signedby: "fetch",
// TODO: We should respect the caching instructions provided by origin.
expires: (new Date()).getTime() + 604800000 // 7 days.
});
} else {
// TODO: Surface this one as an error.
await db('queue').del().where('id', '=', result[0]["id"]);
await db('queue').insert({
task: result[0]["task"],
data: result[0]["data"]
});
}
}
}
};