Add support for paginating partial collections in SynchronizeFollowersService (#34277)
				
					
				
			This commit is contained in:
		@@ -153,7 +153,7 @@ class Account < ApplicationRecord
 | 
			
		||||
  scope :not_excluded_by_account, ->(account) { where.not(id: account.excluded_from_timeline_account_ids) }
 | 
			
		||||
  scope :not_domain_blocked_by_account, ->(account) { where(arel_table[:domain].eq(nil).or(arel_table[:domain].not_in(account.excluded_from_timeline_domains))) }
 | 
			
		||||
  scope :dormant, -> { joins(:account_stat).merge(AccountStat.without_recent_activity) }
 | 
			
		||||
  scope :with_username, ->(value) { where arel_table[:username].lower.eq(value.to_s.downcase) }
 | 
			
		||||
  scope :with_username, ->(value) { value.is_a?(Array) ? where(arel_table[:username].lower.in(value.map { |x| x.to_s.downcase })) : where(arel_table[:username].lower.eq(value.to_s.downcase)) }
 | 
			
		||||
  scope :with_domain, ->(value) { where arel_table[:domain].lower.eq(value&.to_s&.downcase) }
 | 
			
		||||
  scope :without_memorial, -> { where(memorial: false) }
 | 
			
		||||
  scope :duplicate_uris, -> { select(:uri, Arel.star.count).group(:uri).having(Arel.star.count.gt(1)) }
 | 
			
		||||
 
 | 
			
		||||
@@ -4,32 +4,46 @@ class ActivityPub::SynchronizeFollowersService < BaseService
 | 
			
		||||
  include JsonLdHelper
 | 
			
		||||
  include Payloadable
 | 
			
		||||
 | 
			
		||||
  MAX_COLLECTION_PAGES = 10
 | 
			
		||||
 | 
			
		||||
  def call(account, partial_collection_url)
 | 
			
		||||
    @account = account
 | 
			
		||||
    @expected_followers_ids = []
 | 
			
		||||
 | 
			
		||||
    items = collection_items(partial_collection_url)
 | 
			
		||||
    return if items.nil?
 | 
			
		||||
 | 
			
		||||
    # There could be unresolved accounts (hence the call to .compact) but this
 | 
			
		||||
    # should never happen in practice, since in almost all cases we keep an
 | 
			
		||||
    # Account record, and should we not do that, we should have sent a Delete.
 | 
			
		||||
    # In any case there is not much we can do if that occurs.
 | 
			
		||||
    @expected_followers = items.filter_map { |uri| ActivityPub::TagManager.instance.uri_to_resource(uri, Account) }
 | 
			
		||||
    return unless process_collection!(partial_collection_url)
 | 
			
		||||
 | 
			
		||||
    remove_unexpected_local_followers!
 | 
			
		||||
    handle_unexpected_outgoing_follows!
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  # Processes a single page of the remote followers collection.
  #
  # Resolves the page's items to local accounts, records their IDs in
  # @expected_followers_ids, and then hands those accounts to
  # handle_unexpected_outgoing_follows!.
  #
  # @param items [Array] the entries of one collection page
  def process_page!(items)
    local_followers = extract_local_followers(items)
    follower_ids = local_followers.pluck(:id)
    @expected_followers_ids.concat(follower_ids)

    handle_unexpected_outgoing_follows!(local_followers)
  end
 | 
			
		||||
 | 
			
		||||
  # Maps the given collection items to the local accounts they refer to.
  #
  # Items that do not resolve to a local username are dropped (hence
  # filter_map). This should never happen in practice, since in almost all
  # cases we keep an Account record, and should we not do that, we should
  # have sent a Delete. In any case there is not much we can do if that occurs.
  #
  # TODO: this will need changes when switching to numeric IDs
  #
  # @param items [Array] URIs taken from a followers collection page
  # @return the result of Account.local.with_username for the lowercased usernames
  def extract_local_followers(items)
    local_usernames = items.filter_map do |uri|
      username = ActivityPub::TagManager.instance.uri_to_local_id(uri, :username)
      username&.downcase
    end

    Account.local.with_username(local_usernames)
  end
 | 
			
		||||
 | 
			
		||||
  def remove_unexpected_local_followers!
 | 
			
		||||
    @account.followers.local.where.not(id: @expected_followers.map(&:id)).reorder(nil).find_each do |unexpected_follower|
 | 
			
		||||
    @account.followers.local.where.not(id: @expected_followers_ids).reorder(nil).find_each do |unexpected_follower|
 | 
			
		||||
      UnfollowService.new.call(unexpected_follower, @account)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_unexpected_outgoing_follows!
 | 
			
		||||
    @expected_followers.each do |expected_follower|
 | 
			
		||||
  def handle_unexpected_outgoing_follows!(expected_followers)
 | 
			
		||||
    expected_followers.each do |expected_follower|
 | 
			
		||||
      next if expected_follower.following?(@account)
 | 
			
		||||
 | 
			
		||||
      if expected_follower.requested?(@account)
 | 
			
		||||
@@ -50,21 +64,33 @@ class ActivityPub::SynchronizeFollowersService < BaseService
 | 
			
		||||
    Oj.dump(serialize_payload(follow, ActivityPub::UndoFollowSerializer))
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def collection_items(collection_or_uri)
 | 
			
		||||
    collection = fetch_collection(collection_or_uri)
 | 
			
		||||
    return unless collection.is_a?(Hash)
 | 
			
		||||
  # Only returns true if the whole collection has been processed
 | 
			
		||||
  def process_collection!(collection_uri, max_pages: MAX_COLLECTION_PAGES)
 | 
			
		||||
    collection = fetch_collection(collection_uri)
 | 
			
		||||
    return false unless collection.is_a?(Hash)
 | 
			
		||||
 | 
			
		||||
    collection = fetch_collection(collection['first']) if collection['first'].present?
 | 
			
		||||
    return unless collection.is_a?(Hash)
 | 
			
		||||
 | 
			
		||||
    # Abort if we'd have to paginate through more than one page of followers
 | 
			
		||||
    return if collection['next'].present?
 | 
			
		||||
    while collection.is_a?(Hash)
 | 
			
		||||
      process_page!(as_array(collection_page_items(collection)))
 | 
			
		||||
 | 
			
		||||
      max_pages -= 1
 | 
			
		||||
 | 
			
		||||
      return true if collection['next'].blank? # We reached the end of the collection
 | 
			
		||||
      return false if max_pages <= 0 # We reached our pages limit
 | 
			
		||||
 | 
			
		||||
      collection = fetch_collection(collection['next'])
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    false
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # Returns the member list of a collection (or collection page) document.
  #
  # Unordered types ('Collection', 'CollectionPage') keep their members under
  # 'items'; ordered types ('OrderedCollection', 'OrderedCollectionPage') keep
  # them under 'orderedItems'. Any other type yields nil.
  #
  # The value is returned exactly as found in the document (it may be an
  # array, a single value or nil); no array coercion is done here, so callers
  # that need an array must normalise the result themselves.
  #
  # @param collection [Hash] parsed collection document with string keys
  # @return [Object, nil] the raw 'items' / 'orderedItems' value, or nil
  def collection_page_items(collection)
    case collection['type']
    when 'Collection', 'CollectionPage'
      collection['items']
    when 'OrderedCollection', 'OrderedCollectionPage'
      collection['orderedItems']
    end
  end
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -10,7 +10,7 @@ RSpec.describe ActivityPub::SynchronizeFollowersService do
 | 
			
		||||
  let(:bob)            { Fabricate(:account, username: 'bob') }
 | 
			
		||||
  let(:eve)            { Fabricate(:account, username: 'eve') }
 | 
			
		||||
  let(:mallory)        { Fabricate(:account, username: 'mallory') }
 | 
			
		||||
  let(:collection_uri) { 'http://example.com/partial-followers' }
 | 
			
		||||
  let(:collection_uri) { 'https://example.com/partial-followers' }
 | 
			
		||||
 | 
			
		||||
  let(:items) do
 | 
			
		||||
    [alice, eve, mallory].map do |account|
 | 
			
		||||
@@ -97,7 +97,76 @@ RSpec.describe ActivityPub::SynchronizeFollowersService do
 | 
			
		||||
      it_behaves_like 'synchronizes followers'
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'when the endpoint is a paginated Collection of actor URIs with a next page' do
 | 
			
		||||
    context 'when the endpoint is a paginated Collection of actor URIs split across multiple pages' do
 | 
			
		||||
      before do
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers')
 | 
			
		||||
          .to_return(status: 200, headers: { 'Content-Type': 'application/activity+json' }, body: Oj.dump({
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            type: 'Collection',
 | 
			
		||||
            id: 'https://example.com/partial-followers',
 | 
			
		||||
            first: 'https://example.com/partial-followers/1',
 | 
			
		||||
          }))
 | 
			
		||||
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers/1')
 | 
			
		||||
          .to_return(status: 200, headers: { 'Content-Type': 'application/activity+json' }, body: Oj.dump({
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            type: 'CollectionPage',
 | 
			
		||||
            id: 'https://example.com/partial-followers/1',
 | 
			
		||||
            partOf: 'https://example.com/partial-followers',
 | 
			
		||||
            next: 'https://example.com/partial-followers/2',
 | 
			
		||||
            items: [alice, eve].map { |account| ActivityPub::TagManager.instance.uri_for(account) },
 | 
			
		||||
          }))
 | 
			
		||||
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers/2')
 | 
			
		||||
          .to_return(status: 200, headers: { 'Content-Type': 'application/activity+json' }, body: Oj.dump({
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            type: 'CollectionPage',
 | 
			
		||||
            id: 'https://example.com/partial-followers/2',
 | 
			
		||||
            partOf: 'https://example.com/partial-followers',
 | 
			
		||||
            items: ActivityPub::TagManager.instance.uri_for(mallory),
 | 
			
		||||
          }))
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it_behaves_like 'synchronizes followers'
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'when the endpoint is a paginated Collection of actor URIs split across, but one page errors out' do
 | 
			
		||||
      before do
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers')
 | 
			
		||||
          .to_return(status: 200, headers: { 'Content-Type': 'application/activity+json' }, body: Oj.dump({
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            type: 'Collection',
 | 
			
		||||
            id: 'https://example.com/partial-followers',
 | 
			
		||||
            first: 'https://example.com/partial-followers/1',
 | 
			
		||||
          }))
 | 
			
		||||
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers/1')
 | 
			
		||||
          .to_return(status: 200, headers: { 'Content-Type': 'application/activity+json' }, body: Oj.dump({
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            type: 'CollectionPage',
 | 
			
		||||
            id: 'https://example.com/partial-followers/1',
 | 
			
		||||
            partOf: 'https://example.com/partial-followers',
 | 
			
		||||
            next: 'https://example.com/partial-followers/2',
 | 
			
		||||
            items: [mallory].map { |account| ActivityPub::TagManager.instance.uri_for(account) },
 | 
			
		||||
          }))
 | 
			
		||||
 | 
			
		||||
        stub_request(:get, 'https://example.com/partial-followers/2')
 | 
			
		||||
          .to_return(status: 404)
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'confirms pending follow request but does not remove extra followers' do
 | 
			
		||||
        previous_follower_ids = actor.followers.pluck(:id)
 | 
			
		||||
 | 
			
		||||
        subject.call(actor, collection_uri)
 | 
			
		||||
 | 
			
		||||
        expect(previous_follower_ids - actor.followers.reload.pluck(:id))
 | 
			
		||||
          .to be_empty
 | 
			
		||||
        expect(mallory)
 | 
			
		||||
          .to be_following(actor)
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'when the endpoint is a paginated Collection of actor URIs with more pages than we allow' do
 | 
			
		||||
      let(:payload) do
 | 
			
		||||
        {
 | 
			
		||||
          '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
@@ -113,12 +182,19 @@ RSpec.describe ActivityPub::SynchronizeFollowersService do
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      before do
 | 
			
		||||
        stub_const('ActivityPub::SynchronizeFollowersService::MAX_COLLECTION_PAGES', 1)
 | 
			
		||||
        stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload), headers: { 'Content-Type': 'application/activity+json' })
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'does not change followers' do
 | 
			
		||||
        expect { subject.call(actor, collection_uri) }
 | 
			
		||||
          .to_not(change { actor.followers.reload.reorder(id: :asc).pluck(:id) })
 | 
			
		||||
      it 'confirms pending follow request but does not remove extra followers' do
 | 
			
		||||
        previous_follower_ids = actor.followers.pluck(:id)
 | 
			
		||||
 | 
			
		||||
        subject.call(actor, collection_uri)
 | 
			
		||||
 | 
			
		||||
        expect(previous_follower_ids - actor.followers.reload.pluck(:id))
 | 
			
		||||
          .to be_empty
 | 
			
		||||
        expect(mallory)
 | 
			
		||||
          .to be_following(actor)
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user