Add button to load new replies in web UI (#35210)
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
class Api::V1::StatusesController < Api::BaseController
|
class Api::V1::StatusesController < Api::BaseController
|
||||||
include Authorization
|
include Authorization
|
||||||
|
include AsyncRefreshesConcern
|
||||||
|
|
||||||
before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy]
|
before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy]
|
||||||
before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy]
|
before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy]
|
||||||
@@ -57,9 +58,17 @@ class Api::V1::StatusesController < Api::BaseController
|
|||||||
@context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants)
|
@context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants)
|
||||||
statuses = [@status] + @context.ancestors + @context.descendants
|
statuses = [@status] + @context.ancestors + @context.descendants
|
||||||
|
|
||||||
render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)
|
refresh_key = "context:#{@status.id}:refresh"
|
||||||
|
async_refresh = AsyncRefresh.new(refresh_key)
|
||||||
|
|
||||||
ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) if !current_account.nil? && @status.should_fetch_replies?
|
if async_refresh.running?
|
||||||
|
add_async_refresh_header(async_refresh)
|
||||||
|
elsif !current_account.nil? && @status.should_fetch_replies?
|
||||||
|
add_async_refresh_header(AsyncRefresh.create(refresh_key))
|
||||||
|
ActivityPub::FetchAllRepliesWorker.perform_async(@status.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
def create
|
def create
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import { createAction } from '@reduxjs/toolkit';
|
||||||
|
|
||||||
import { apiGetContext } from 'mastodon/api/statuses';
|
import { apiGetContext } from 'mastodon/api/statuses';
|
||||||
import { createDataLoadingThunk } from 'mastodon/store/typed_functions';
|
import { createDataLoadingThunk } from 'mastodon/store/typed_functions';
|
||||||
|
|
||||||
@@ -6,13 +8,18 @@ import { importFetchedStatuses } from './importer';
|
|||||||
export const fetchContext = createDataLoadingThunk(
|
export const fetchContext = createDataLoadingThunk(
|
||||||
'status/context',
|
'status/context',
|
||||||
({ statusId }: { statusId: string }) => apiGetContext(statusId),
|
({ statusId }: { statusId: string }) => apiGetContext(statusId),
|
||||||
(context, { dispatch }) => {
|
({ context, refresh }, { dispatch }) => {
|
||||||
const statuses = context.ancestors.concat(context.descendants);
|
const statuses = context.ancestors.concat(context.descendants);
|
||||||
|
|
||||||
dispatch(importFetchedStatuses(statuses));
|
dispatch(importFetchedStatuses(statuses));
|
||||||
|
|
||||||
return {
|
return {
|
||||||
context,
|
context,
|
||||||
|
refresh,
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
export const completeContextRefresh = createAction<{ statusId: string }>(
|
||||||
|
'status/context/complete',
|
||||||
|
);
|
||||||
|
|||||||
@@ -20,6 +20,50 @@ export const getLinks = (response: AxiosResponse) => {
|
|||||||
return LinkHeader.parse(value);
|
return LinkHeader.parse(value);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export interface AsyncRefreshHeader {
|
||||||
|
id: string;
|
||||||
|
retry: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
const isAsyncRefreshHeader = (obj: object): obj is AsyncRefreshHeader =>
|
||||||
|
'id' in obj && 'retry' in obj;
|
||||||
|
|
||||||
|
export const getAsyncRefreshHeader = (
|
||||||
|
response: AxiosResponse,
|
||||||
|
): AsyncRefreshHeader | null => {
|
||||||
|
const value = response.headers['mastodon-async-refresh'] as
|
||||||
|
| string
|
||||||
|
| undefined;
|
||||||
|
|
||||||
|
if (!value) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const asyncRefreshHeader: Record<string, unknown> = {};
|
||||||
|
|
||||||
|
value.split(/,\s*/).forEach((pair) => {
|
||||||
|
const [key, val] = pair.split('=', 2);
|
||||||
|
|
||||||
|
let typedValue: string | number;
|
||||||
|
|
||||||
|
if (key && ['id', 'retry'].includes(key) && val) {
|
||||||
|
if (val.startsWith('"')) {
|
||||||
|
typedValue = val.slice(1, -1);
|
||||||
|
} else {
|
||||||
|
typedValue = parseInt(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
asyncRefreshHeader[key] = typedValue;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (isAsyncRefreshHeader(asyncRefreshHeader)) {
|
||||||
|
return asyncRefreshHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
const csrfHeader: RawAxiosRequestHeaders = {};
|
const csrfHeader: RawAxiosRequestHeaders = {};
|
||||||
|
|
||||||
const setCSRFHeader = () => {
|
const setCSRFHeader = () => {
|
||||||
@@ -83,7 +127,7 @@ export default function api(withAuthorization = true) {
|
|||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
type ApiUrl = `v${1 | 2}/${string}`;
|
type ApiUrl = `v${1 | '1_alpha' | 2}/${string}`;
|
||||||
type RequestParamsOrData = Record<string, unknown>;
|
type RequestParamsOrData = Record<string, unknown>;
|
||||||
|
|
||||||
export async function apiRequest<ApiResponse = unknown>(
|
export async function apiRequest<ApiResponse = unknown>(
|
||||||
|
|||||||
5
app/javascript/mastodon/api/async_refreshes.ts
Normal file
5
app/javascript/mastodon/api/async_refreshes.ts
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
import { apiRequestGet } from 'mastodon/api';
|
||||||
|
import type { ApiAsyncRefreshJSON } from 'mastodon/api_types/async_refreshes';
|
||||||
|
|
||||||
|
export const apiGetAsyncRefresh = (id: string) =>
|
||||||
|
apiRequestGet<ApiAsyncRefreshJSON>(`v1_alpha/async_refreshes/${id}`);
|
||||||
@@ -1,5 +1,14 @@
|
|||||||
import { apiRequestGet } from 'mastodon/api';
|
import api, { getAsyncRefreshHeader } from 'mastodon/api';
|
||||||
import type { ApiContextJSON } from 'mastodon/api_types/statuses';
|
import type { ApiContextJSON } from 'mastodon/api_types/statuses';
|
||||||
|
|
||||||
export const apiGetContext = (statusId: string) =>
|
export const apiGetContext = async (statusId: string) => {
|
||||||
apiRequestGet<ApiContextJSON>(`v1/statuses/${statusId}/context`);
|
const response = await api().request<ApiContextJSON>({
|
||||||
|
method: 'GET',
|
||||||
|
url: `/api/v1/statuses/${statusId}/context`,
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
context: response.data,
|
||||||
|
refresh: getAsyncRefreshHeader(response),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|||||||
7
app/javascript/mastodon/api_types/async_refreshes.ts
Normal file
7
app/javascript/mastodon/api_types/async_refreshes.ts
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
export interface ApiAsyncRefreshJSON {
|
||||||
|
async_refresh: {
|
||||||
|
id: string;
|
||||||
|
status: 'running' | 'finished';
|
||||||
|
result_count: number;
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
import { useEffect, useState, useCallback } from 'react';
|
||||||
|
|
||||||
|
import { useIntl, defineMessages, FormattedMessage } from 'react-intl';
|
||||||
|
|
||||||
|
import classNames from 'classnames';
|
||||||
|
|
||||||
|
import {
|
||||||
|
fetchContext,
|
||||||
|
completeContextRefresh,
|
||||||
|
} from 'mastodon/actions/statuses';
|
||||||
|
import type { AsyncRefreshHeader } from 'mastodon/api';
|
||||||
|
import { apiGetAsyncRefresh } from 'mastodon/api/async_refreshes';
|
||||||
|
import { LoadingIndicator } from 'mastodon/components/loading_indicator';
|
||||||
|
import { useAppSelector, useAppDispatch } from 'mastodon/store';
|
||||||
|
|
||||||
|
const messages = defineMessages({
|
||||||
|
loading: {
|
||||||
|
id: 'status.context.loading',
|
||||||
|
defaultMessage: 'Checking for more replies',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
export const RefreshController: React.FC<{
|
||||||
|
statusId: string;
|
||||||
|
withBorder?: boolean;
|
||||||
|
}> = ({ statusId, withBorder }) => {
|
||||||
|
const refresh = useAppSelector(
|
||||||
|
(state) => state.contexts.refreshing[statusId],
|
||||||
|
);
|
||||||
|
const dispatch = useAppDispatch();
|
||||||
|
const intl = useIntl();
|
||||||
|
const [ready, setReady] = useState(false);
|
||||||
|
const [loading, setLoading] = useState(false);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
let timeoutId: ReturnType<typeof setTimeout>;
|
||||||
|
|
||||||
|
const scheduleRefresh = (refresh: AsyncRefreshHeader) => {
|
||||||
|
timeoutId = setTimeout(() => {
|
||||||
|
void apiGetAsyncRefresh(refresh.id).then((result) => {
|
||||||
|
if (result.async_refresh.status === 'finished') {
|
||||||
|
dispatch(completeContextRefresh({ statusId }));
|
||||||
|
|
||||||
|
if (result.async_refresh.result_count > 0) {
|
||||||
|
setReady(true);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
scheduleRefresh(refresh);
|
||||||
|
}
|
||||||
|
|
||||||
|
return '';
|
||||||
|
});
|
||||||
|
}, refresh.retry * 1000);
|
||||||
|
};
|
||||||
|
|
||||||
|
if (refresh) {
|
||||||
|
scheduleRefresh(refresh);
|
||||||
|
}
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
clearTimeout(timeoutId);
|
||||||
|
};
|
||||||
|
}, [dispatch, setReady, statusId, refresh]);
|
||||||
|
|
||||||
|
const handleClick = useCallback(() => {
|
||||||
|
setLoading(true);
|
||||||
|
setReady(false);
|
||||||
|
|
||||||
|
dispatch(fetchContext({ statusId }))
|
||||||
|
.then(() => {
|
||||||
|
setLoading(false);
|
||||||
|
return '';
|
||||||
|
})
|
||||||
|
.catch(() => {
|
||||||
|
setLoading(false);
|
||||||
|
});
|
||||||
|
}, [dispatch, setReady, statusId]);
|
||||||
|
|
||||||
|
if (ready && !loading) {
|
||||||
|
return (
|
||||||
|
<button
|
||||||
|
className={classNames('load-more load-gap', {
|
||||||
|
'timeline-hint--with-descendants': withBorder,
|
||||||
|
})}
|
||||||
|
onClick={handleClick}
|
||||||
|
>
|
||||||
|
<FormattedMessage
|
||||||
|
id='status.context.load_new_replies'
|
||||||
|
defaultMessage='New replies available'
|
||||||
|
/>
|
||||||
|
</button>
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!refresh && !loading) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (
|
||||||
|
<div
|
||||||
|
className={classNames('load-more load-gap', {
|
||||||
|
'timeline-hint--with-descendants': withBorder,
|
||||||
|
})}
|
||||||
|
aria-busy
|
||||||
|
aria-live='polite'
|
||||||
|
aria-label={intl.formatMessage(messages.loading)}
|
||||||
|
>
|
||||||
|
<LoadingIndicator />
|
||||||
|
</div>
|
||||||
|
);
|
||||||
|
};
|
||||||
@@ -68,7 +68,7 @@ import { attachFullscreenListener, detachFullscreenListener, isFullscreen } from
|
|||||||
|
|
||||||
import ActionBar from './components/action_bar';
|
import ActionBar from './components/action_bar';
|
||||||
import { DetailedStatus } from './components/detailed_status';
|
import { DetailedStatus } from './components/detailed_status';
|
||||||
|
import { RefreshController } from './components/refresh_controller';
|
||||||
|
|
||||||
const messages = defineMessages({
|
const messages = defineMessages({
|
||||||
revealAll: { id: 'status.show_more_all', defaultMessage: 'Show more for all' },
|
revealAll: { id: 'status.show_more_all', defaultMessage: 'Show more for all' },
|
||||||
@@ -548,7 +548,7 @@ class Status extends ImmutablePureComponent {
|
|||||||
|
|
||||||
render () {
|
render () {
|
||||||
let ancestors, descendants, remoteHint;
|
let ancestors, descendants, remoteHint;
|
||||||
const { isLoading, status, ancestorsIds, descendantsIds, intl, domain, multiColumn, pictureInPicture } = this.props;
|
const { isLoading, status, ancestorsIds, descendantsIds, refresh, intl, domain, multiColumn, pictureInPicture } = this.props;
|
||||||
const { fullscreen } = this.state;
|
const { fullscreen } = this.state;
|
||||||
|
|
||||||
if (isLoading) {
|
if (isLoading) {
|
||||||
@@ -578,11 +578,9 @@ class Status extends ImmutablePureComponent {
|
|||||||
|
|
||||||
if (!isLocal) {
|
if (!isLocal) {
|
||||||
remoteHint = (
|
remoteHint = (
|
||||||
<TimelineHint
|
<RefreshController
|
||||||
className={classNames(!!descendants && 'timeline-hint--with-descendants')}
|
statusId={status.get('id')}
|
||||||
url={status.get('url')}
|
withBorder={!!descendants}
|
||||||
message={<FormattedMessage id='hints.threads.replies_may_be_missing' defaultMessage='Replies from other servers may be missing.' />}
|
|
||||||
label={<FormattedMessage id='hints.threads.see_more' defaultMessage='See more replies on {domain}' values={{ domain: <strong>{status.getIn(['account', 'acct']).split('@')[1]}</strong> }} />}
|
|
||||||
/>
|
/>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -424,8 +424,6 @@
|
|||||||
"hints.profiles.see_more_followers": "See more followers on {domain}",
|
"hints.profiles.see_more_followers": "See more followers on {domain}",
|
||||||
"hints.profiles.see_more_follows": "See more follows on {domain}",
|
"hints.profiles.see_more_follows": "See more follows on {domain}",
|
||||||
"hints.profiles.see_more_posts": "See more posts on {domain}",
|
"hints.profiles.see_more_posts": "See more posts on {domain}",
|
||||||
"hints.threads.replies_may_be_missing": "Replies from other servers may be missing.",
|
|
||||||
"hints.threads.see_more": "See more replies on {domain}",
|
|
||||||
"home.column_settings.show_quotes": "Show quotes",
|
"home.column_settings.show_quotes": "Show quotes",
|
||||||
"home.column_settings.show_reblogs": "Show boosts",
|
"home.column_settings.show_reblogs": "Show boosts",
|
||||||
"home.column_settings.show_replies": "Show replies",
|
"home.column_settings.show_replies": "Show replies",
|
||||||
@@ -847,6 +845,8 @@
|
|||||||
"status.bookmark": "Bookmark",
|
"status.bookmark": "Bookmark",
|
||||||
"status.cancel_reblog_private": "Unboost",
|
"status.cancel_reblog_private": "Unboost",
|
||||||
"status.cannot_reblog": "This post cannot be boosted",
|
"status.cannot_reblog": "This post cannot be boosted",
|
||||||
|
"status.context.load_new_replies": "New replies available",
|
||||||
|
"status.context.loading": "Checking for more replies",
|
||||||
"status.continued_thread": "Continued thread",
|
"status.continued_thread": "Continued thread",
|
||||||
"status.copy": "Copy link to post",
|
"status.copy": "Copy link to post",
|
||||||
"status.delete": "Delete",
|
"status.delete": "Delete",
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import type { Draft, UnknownAction } from '@reduxjs/toolkit';
|
|||||||
import type { List as ImmutableList } from 'immutable';
|
import type { List as ImmutableList } from 'immutable';
|
||||||
|
|
||||||
import { timelineDelete } from 'mastodon/actions/timelines_typed';
|
import { timelineDelete } from 'mastodon/actions/timelines_typed';
|
||||||
|
import type { AsyncRefreshHeader } from 'mastodon/api';
|
||||||
import type { ApiRelationshipJSON } from 'mastodon/api_types/relationships';
|
import type { ApiRelationshipJSON } from 'mastodon/api_types/relationships';
|
||||||
import type {
|
import type {
|
||||||
ApiStatusJSON,
|
ApiStatusJSON,
|
||||||
@@ -12,7 +13,7 @@ import type {
|
|||||||
import type { Status } from 'mastodon/models/status';
|
import type { Status } from 'mastodon/models/status';
|
||||||
|
|
||||||
import { blockAccountSuccess, muteAccountSuccess } from '../actions/accounts';
|
import { blockAccountSuccess, muteAccountSuccess } from '../actions/accounts';
|
||||||
import { fetchContext } from '../actions/statuses';
|
import { fetchContext, completeContextRefresh } from '../actions/statuses';
|
||||||
import { TIMELINE_UPDATE } from '../actions/timelines';
|
import { TIMELINE_UPDATE } from '../actions/timelines';
|
||||||
import { compareId } from '../compare_id';
|
import { compareId } from '../compare_id';
|
||||||
|
|
||||||
@@ -25,11 +26,13 @@ interface TimelineUpdateAction extends UnknownAction {
|
|||||||
interface State {
|
interface State {
|
||||||
inReplyTos: Record<string, string>;
|
inReplyTos: Record<string, string>;
|
||||||
replies: Record<string, string[]>;
|
replies: Record<string, string[]>;
|
||||||
|
refreshing: Record<string, AsyncRefreshHeader>;
|
||||||
}
|
}
|
||||||
|
|
||||||
const initialState: State = {
|
const initialState: State = {
|
||||||
inReplyTos: {},
|
inReplyTos: {},
|
||||||
replies: {},
|
replies: {},
|
||||||
|
refreshing: {},
|
||||||
};
|
};
|
||||||
|
|
||||||
const normalizeContext = (
|
const normalizeContext = (
|
||||||
@@ -127,6 +130,13 @@ export const contextsReducer = createReducer(initialState, (builder) => {
|
|||||||
builder
|
builder
|
||||||
.addCase(fetchContext.fulfilled, (state, action) => {
|
.addCase(fetchContext.fulfilled, (state, action) => {
|
||||||
normalizeContext(state, action.meta.arg.statusId, action.payload.context);
|
normalizeContext(state, action.meta.arg.statusId, action.payload.context);
|
||||||
|
|
||||||
|
if (action.payload.refresh) {
|
||||||
|
state.refreshing[action.meta.arg.statusId] = action.payload.refresh;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.addCase(completeContextRefresh, (state, action) => {
|
||||||
|
delete state.refreshing[action.payload.statusId];
|
||||||
})
|
})
|
||||||
.addCase(blockAccountSuccess, (state, action) => {
|
.addCase(blockAccountSuccess, (state, action) => {
|
||||||
filterContexts(
|
filterContexts(
|
||||||
|
|||||||
@@ -3,9 +3,6 @@
|
|||||||
module Status::FetchRepliesConcern
|
module Status::FetchRepliesConcern
|
||||||
extend ActiveSupport::Concern
|
extend ActiveSupport::Concern
|
||||||
|
|
||||||
# enable/disable fetching all replies
|
|
||||||
FETCH_REPLIES_ENABLED = ENV['FETCH_REPLIES_ENABLED'] == 'true'
|
|
||||||
|
|
||||||
# debounce fetching all replies to minimize DoS
|
# debounce fetching all replies to minimize DoS
|
||||||
FETCH_REPLIES_COOLDOWN_MINUTES = (ENV['FETCH_REPLIES_COOLDOWN_MINUTES'] || 15).to_i.minutes
|
FETCH_REPLIES_COOLDOWN_MINUTES = (ENV['FETCH_REPLIES_COOLDOWN_MINUTES'] || 15).to_i.minutes
|
||||||
FETCH_REPLIES_INITIAL_WAIT_MINUTES = (ENV['FETCH_REPLIES_INITIAL_WAIT_MINUTES'] || 5).to_i.minutes
|
FETCH_REPLIES_INITIAL_WAIT_MINUTES = (ENV['FETCH_REPLIES_INITIAL_WAIT_MINUTES'] || 5).to_i.minutes
|
||||||
@@ -36,7 +33,7 @@ module Status::FetchRepliesConcern
|
|||||||
|
|
||||||
def should_fetch_replies?
|
def should_fetch_replies?
|
||||||
# we aren't brand new, and we haven't fetched replies since the debounce window
|
# we aren't brand new, and we haven't fetched replies since the debounce window
|
||||||
FETCH_REPLIES_ENABLED && !local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && (
|
!local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && (
|
||||||
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago
|
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|||||||
79
app/models/worker_batch.rb
Normal file
79
app/models/worker_batch.rb
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class WorkerBatch
|
||||||
|
include Redisable
|
||||||
|
|
||||||
|
TTL = 3600
|
||||||
|
|
||||||
|
def initialize(id = nil)
|
||||||
|
@id = id || SecureRandom.hex(12)
|
||||||
|
end
|
||||||
|
|
||||||
|
attr_reader :id
|
||||||
|
|
||||||
|
# Connect the batch with an async refresh. When the number of processed jobs
|
||||||
|
# passes the given threshold, the async refresh will be marked as finished.
|
||||||
|
# @param [String] async_refresh_key
|
||||||
|
# @param [Float] threshold
|
||||||
|
def connect(async_refresh_key, threshold: 1.0)
|
||||||
|
redis.hset(key, { 'async_refresh_key' => async_refresh_key, 'threshold' => threshold })
|
||||||
|
end
|
||||||
|
|
||||||
|
# Add jobs to the batch. Usually when the batch is created.
|
||||||
|
# @param [Array<String>] jids
|
||||||
|
def add_jobs(jids)
|
||||||
|
if jids.blank?
|
||||||
|
async_refresh_key = redis.hget(key, 'async_refresh_key')
|
||||||
|
|
||||||
|
if async_refresh_key.present?
|
||||||
|
async_refresh = AsyncRefresh.new(async_refresh_key)
|
||||||
|
async_refresh.finish!
|
||||||
|
end
|
||||||
|
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.multi do |pipeline|
|
||||||
|
pipeline.sadd(key('jobs'), jids)
|
||||||
|
pipeline.expire(key('jobs'), TTL)
|
||||||
|
pipeline.hincrby(key, 'pending', jids.size)
|
||||||
|
pipeline.expire(key, TTL)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Remove a job from the batch, such as when it's been processed or it has failed.
|
||||||
|
# @param [String] jid
|
||||||
|
def remove_job(jid)
|
||||||
|
_, pending, processed, async_refresh_key, threshold = redis.multi do |pipeline|
|
||||||
|
pipeline.srem(key('jobs'), jid)
|
||||||
|
pipeline.hincrby(key, 'pending', -1)
|
||||||
|
pipeline.hincrby(key, 'processed', 1)
|
||||||
|
pipeline.hget(key, 'async_refresh_key')
|
||||||
|
pipeline.hget(key, 'threshold')
|
||||||
|
end
|
||||||
|
|
||||||
|
if async_refresh_key.present?
|
||||||
|
async_refresh = AsyncRefresh.new(async_refresh_key)
|
||||||
|
async_refresh.increment_result_count(by: 1)
|
||||||
|
async_refresh.finish! if pending.zero? || processed >= threshold.to_f * (processed + pending)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Get pending jobs.
|
||||||
|
# @returns [Array<String>]
|
||||||
|
def jobs
|
||||||
|
redis.smembers(key('jobs'))
|
||||||
|
end
|
||||||
|
|
||||||
|
# Inspect the batch.
|
||||||
|
# @returns [Hash]
|
||||||
|
def info
|
||||||
|
redis.hgetall(key)
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def key(suffix = nil)
|
||||||
|
"worker_batch:#{@id}#{":#{suffix}" if suffix}"
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -6,7 +6,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
|
|||||||
# Limit of replies to fetch per status
|
# Limit of replies to fetch per status
|
||||||
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i
|
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i
|
||||||
|
|
||||||
def call(status_uri, collection_or_uri, max_pages: 1, request_id: nil)
|
def call(status_uri, collection_or_uri, max_pages: 1, async_refresh_key: nil, request_id: nil)
|
||||||
@status_uri = status_uri
|
@status_uri = status_uri
|
||||||
|
|
||||||
super
|
super
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ class ActivityPub::FetchRepliesService < BaseService
|
|||||||
# Limit of fetched replies
|
# Limit of fetched replies
|
||||||
MAX_REPLIES = 5
|
MAX_REPLIES = 5
|
||||||
|
|
||||||
def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, request_id: nil)
|
def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, async_refresh_key: nil, request_id: nil)
|
||||||
@reference_uri = reference_uri
|
@reference_uri = reference_uri
|
||||||
@allow_synchronous_requests = allow_synchronous_requests
|
@allow_synchronous_requests = allow_synchronous_requests
|
||||||
|
|
||||||
@@ -14,7 +14,10 @@ class ActivityPub::FetchRepliesService < BaseService
|
|||||||
return if @items.nil?
|
return if @items.nil?
|
||||||
|
|
||||||
@items = filter_replies(@items)
|
@items = filter_replies(@items)
|
||||||
FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }
|
|
||||||
|
batch = WorkerBatch.new
|
||||||
|
batch.connect(async_refresh_key) if async_refresh_key.present?
|
||||||
|
batch.add_jobs(FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }] })
|
||||||
|
|
||||||
[@items, n_pages]
|
[@items, n_pages]
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class ActivityPub::FetchAllRepliesWorker
|
|||||||
replies_collection_or_uri = get_replies_uri(status)
|
replies_collection_or_uri = get_replies_uri(status)
|
||||||
return if replies_collection_or_uri.nil?
|
return if replies_collection_or_uri.nil?
|
||||||
|
|
||||||
ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys)
|
ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, async_refresh_key: "context:#{@root_status.id}:refresh", **options.deep_symbolize_keys)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Get the URI of the replies collection of a status
|
# Get the URI of the replies collection of a status
|
||||||
|
|||||||
@@ -7,6 +7,9 @@ class FetchReplyWorker
|
|||||||
sidekiq_options queue: 'pull', retry: 3
|
sidekiq_options queue: 'pull', retry: 3
|
||||||
|
|
||||||
def perform(child_url, options = {})
|
def perform(child_url, options = {})
|
||||||
|
batch = WorkerBatch.new(options.delete('batch_id')) if options['batch_id']
|
||||||
FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys)
|
FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys)
|
||||||
|
ensure
|
||||||
|
batch&.remove_job(jid)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
104
spec/models/worker_batch_spec.rb
Normal file
104
spec/models/worker_batch_spec.rb
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe WorkerBatch do
|
||||||
|
subject { described_class.new }
|
||||||
|
|
||||||
|
let(:async_refresh_key) { 'test_refresh' }
|
||||||
|
let(:async_refresh) { nil }
|
||||||
|
|
||||||
|
describe '#id' do
|
||||||
|
it 'returns a string' do
|
||||||
|
expect(subject.id).to be_a String
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#connect' do
|
||||||
|
before do
|
||||||
|
subject.connect(async_refresh_key, threshold: 0.75)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'persists the async refresh key' do
|
||||||
|
expect(subject.info['async_refresh_key']).to eq async_refresh_key
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'persists the threshold' do
|
||||||
|
expect(subject.info['threshold']).to eq '0.75'
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#add_jobs' do
|
||||||
|
before do
|
||||||
|
subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present?
|
||||||
|
subject.add_jobs([])
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when called with empty array' do
|
||||||
|
it 'does not persist the number of pending jobs' do
|
||||||
|
expect(subject.info).to be_empty
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'does not persist the job IDs' do
|
||||||
|
expect(subject.jobs).to eq []
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when async refresh is connected' do
|
||||||
|
let(:async_refresh) { AsyncRefresh.new(async_refresh_key) }
|
||||||
|
|
||||||
|
it 'immediately marks the async refresh as finished' do
|
||||||
|
expect(async_refresh.reload.finished?).to be true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when called with an array of job IDs' do
|
||||||
|
before do
|
||||||
|
subject.add_jobs(%w(foo bar))
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'persists the number of pending jobs' do
|
||||||
|
expect(subject.info['pending']).to eq '2'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'persists the job IDs' do
|
||||||
|
expect(subject.jobs).to eq %w(foo bar)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#remove_job' do
|
||||||
|
before do
|
||||||
|
subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present?
|
||||||
|
subject.add_jobs(%w(foo bar baz))
|
||||||
|
subject.remove_job('foo')
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'removes the job from pending jobs' do
|
||||||
|
expect(subject.jobs).to eq %w(bar baz)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'decrements the number of pending jobs' do
|
||||||
|
expect(subject.info['pending']).to eq '2'
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when async refresh is connected' do
|
||||||
|
let(:async_refresh) { AsyncRefresh.new(async_refresh_key) }
|
||||||
|
|
||||||
|
it 'increments async refresh progress' do
|
||||||
|
expect(async_refresh.reload.result_count).to eq 1
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'marks the async refresh as finished when the threshold is reached' do
|
||||||
|
subject.remove_job('bar')
|
||||||
|
expect(async_refresh.reload.finished?).to be true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#info' do
|
||||||
|
it 'returns a hash' do
|
||||||
|
expect(subject.info).to be_a Hash
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -123,7 +123,6 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do
|
|||||||
end
|
end
|
||||||
|
|
||||||
before do
|
before do
|
||||||
stub_const('Status::FetchRepliesConcern::FETCH_REPLIES_ENABLED', true)
|
|
||||||
all_items.each do |item|
|
all_items.each do |item|
|
||||||
next if [top_note_uri, reply_note_uri].include? item
|
next if [top_note_uri, reply_note_uri].include? item
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user