Add streaming server side filtering for live/topic feed settings (#36585)
This commit is contained in:
@@ -19,6 +19,7 @@ import * as Redis from './redis.js';
|
||||
import { isTruthy, normalizeHashtag, firstParam } from './utils.js';
|
||||
|
||||
const environment = process.env.NODE_ENV || 'development';
|
||||
const PERMISSION_VIEW_FEEDS = 0x0000000000100000;
|
||||
|
||||
// Correctly detect and load .env or .env.production file based on environment:
|
||||
const dotenvFile = environment === 'production' ? '.env.production' : '.env';
|
||||
@@ -44,6 +45,7 @@ initializeLogLevel(process.env, environment);
|
||||
* @property {string[]} scopes
|
||||
* @property {string} accountId
|
||||
* @property {string[]} chosenLanguages
|
||||
* @property {number} permissions
|
||||
*/
|
||||
|
||||
|
||||
@@ -351,7 +353,7 @@ const startServer = async () => {
|
||||
* @returns {Promise<ResolvedAccount>}
|
||||
*/
|
||||
const accountFromToken = async (token, req) => {
|
||||
const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]);
|
||||
const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, COALESCE(user_roles.permissions, 0) AS permissions FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id LEFT OUTER JOIN user_roles ON user_roles.id = users.role_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
throw new AuthenticationError('Invalid access token');
|
||||
@@ -367,6 +369,7 @@ const startServer = async () => {
|
||||
scopes: result.rows[0].scopes.split(' '),
|
||||
accountId: result.rows[0].account_id,
|
||||
chosenLanguages: result.rows[0].chosen_languages,
|
||||
permissions: result.rows[0].permissions,
|
||||
};
|
||||
};
|
||||
|
||||
@@ -583,6 +586,41 @@ const startServer = async () => {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {string} kind
|
||||
* @param {ResolvedAccount} account
|
||||
* @returns {Promise.<{ localAccess: boolean, remoteAccess: boolean }>}
|
||||
*/
|
||||
const getFeedAccessSettings = async (kind, account) => {
|
||||
const access = { localAccess: true, remoteAccess: true };
|
||||
|
||||
if (account.permissions & PERMISSION_VIEW_FEEDS) {
|
||||
return access;
|
||||
}
|
||||
|
||||
let localAccessVar, remoteAccessVar;
|
||||
|
||||
if (kind === 'hashtag') {
|
||||
localAccessVar = 'local_topic_feed_access';
|
||||
remoteAccessVar = 'remote_topic_feed_access';
|
||||
} else {
|
||||
localAccessVar = 'local_live_feed_access';
|
||||
remoteAccessVar = 'remote_live_feed_access';
|
||||
}
|
||||
|
||||
const result = await pgPool.query('SELECT var, value FROM settings WHERE var IN ($1, $2)', [localAccessVar, remoteAccessVar]);
|
||||
|
||||
result.rows.forEach((row) => {
|
||||
if (row.var === localAccessVar) {
|
||||
access.localAccess = row.value !== "--- disabled\n";
|
||||
} else {
|
||||
access.remoteAccess = row.value !== "--- disabled\n";
|
||||
}
|
||||
});
|
||||
|
||||
return access;
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {string[]} channelIds
|
||||
* @param {http.IncomingMessage & ResolvedAccount} req
|
||||
@@ -590,10 +628,13 @@ const startServer = async () => {
|
||||
* @param {function(string, string): void} output
|
||||
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
|
||||
* @param {'websocket' | 'eventsource'} destinationType
|
||||
* @param {boolean=} needsFiltering
|
||||
* @param {Object} options
|
||||
* @param {boolean} options.needsFiltering
|
||||
* @param {boolean=} options.filterLocal
|
||||
* @param {boolean=} options.filterRemote
|
||||
* @returns {SubscriptionListener}
|
||||
*/
|
||||
const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, needsFiltering = false) => {
|
||||
const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, { needsFiltering, filterLocal, filterRemote } = { needsFiltering: false, filterLocal: false, filterRemote: false }) => {
|
||||
log.info({ channelIds }, `Starting stream`);
|
||||
|
||||
/**
|
||||
@@ -641,6 +682,12 @@ const startServer = async () => {
|
||||
// The rest of the logic from here on in this function is to handle
|
||||
// filtering of statuses:
|
||||
|
||||
const localPayload = payload.account.username === payload.account.acct;
|
||||
if (localPayload ? filterLocal : filterRemote) {
|
||||
log.debug(`Message ${payload.id} filtered by feed settings`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Filter based on language:
|
||||
if (Array.isArray(req.chosenLanguages) && req.chosenLanguages.indexOf(payload.language) === -1) {
|
||||
log.debug(`Message ${payload.id} filtered by language (${payload.language})`);
|
||||
@@ -946,7 +993,7 @@ const startServer = async () => {
|
||||
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
|
||||
|
||||
// @ts-ignore
|
||||
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering);
|
||||
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options);
|
||||
}).catch(err => {
|
||||
const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
|
||||
|
||||
@@ -982,9 +1029,25 @@ const startServer = async () => {
|
||||
* @param {any} req
|
||||
* @param {string} name
|
||||
* @param {StreamParams} params
|
||||
* @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
|
||||
* @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, filterLocal?: boolean, filterRemote?: boolean } }>}
|
||||
*/
|
||||
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
|
||||
/**
|
||||
* @param {string} feedKind
|
||||
* @param {string} channelId
|
||||
* @param {{ needsFiltering: boolean }} options
|
||||
*/
|
||||
const resolveFeed = (feedKind, channelId, options) => {
|
||||
getFeedAccessSettings(feedKind, req).then(({ localAccess, remoteAccess }) => {
|
||||
resolve({
|
||||
channelIds: [channelId],
|
||||
options: { ...options, filterLocal: !localAccess, filterRemote: !remoteAccess },
|
||||
});
|
||||
}).catch(() => {
|
||||
reject(new Error('Error getting feed access settings'));
|
||||
});
|
||||
};
|
||||
|
||||
switch (name) {
|
||||
case 'user':
|
||||
resolve({
|
||||
@@ -1001,46 +1064,22 @@ const startServer = async () => {
|
||||
|
||||
break;
|
||||
case 'public':
|
||||
resolve({
|
||||
channelIds: ['timeline:public'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public', { needsFiltering: true });
|
||||
break;
|
||||
case 'public:local':
|
||||
resolve({
|
||||
channelIds: ['timeline:public:local'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public:local', { needsFiltering: true });
|
||||
break;
|
||||
case 'public:remote':
|
||||
resolve({
|
||||
channelIds: ['timeline:public:remote'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public:remote', { needsFiltering: true });
|
||||
break;
|
||||
case 'public:media':
|
||||
resolve({
|
||||
channelIds: ['timeline:public:media'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public:media', { needsFiltering: true });
|
||||
break;
|
||||
case 'public:local:media':
|
||||
resolve({
|
||||
channelIds: ['timeline:public:local:media'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public:local:media', { needsFiltering: true });
|
||||
break;
|
||||
case 'public:remote:media':
|
||||
resolve({
|
||||
channelIds: ['timeline:public:remote:media'],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
|
||||
resolveFeed('public', 'timeline:public:remote:media', { needsFiltering: true });
|
||||
break;
|
||||
case 'direct':
|
||||
resolve({
|
||||
@@ -1052,24 +1091,20 @@ const startServer = async () => {
|
||||
case 'hashtag':
|
||||
if (!params.tag) {
|
||||
reject(new RequestError('Missing tag name parameter'));
|
||||
} else {
|
||||
resolve({
|
||||
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}`, { needsFiltering: true });
|
||||
|
||||
break;
|
||||
case 'hashtag:local':
|
||||
if (!params.tag) {
|
||||
reject(new RequestError('Missing tag name parameter'));
|
||||
} else {
|
||||
resolve({
|
||||
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`],
|
||||
options: { needsFiltering: true },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}:local`, { needsFiltering: true });
|
||||
|
||||
break;
|
||||
case 'list':
|
||||
if (!params.list) {
|
||||
@@ -1132,7 +1167,7 @@ const startServer = async () => {
|
||||
|
||||
const onSend = streamToWs(request, websocket, streamNameFromChannelName(channelName, params));
|
||||
const stopHeartbeat = subscriptionHeartbeat(channelIds);
|
||||
const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering);
|
||||
const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options);
|
||||
|
||||
metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user