Forward ActivityPub creates that reply to local statuses (#4709)
* Forward ActivityPub creates that reply to local statuses * Fix test * Fix wrong signers
This commit is contained in:
		@@ -17,6 +17,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
 | 
			
		||||
 | 
			
		||||
    resolve_thread(status)
 | 
			
		||||
    distribute(status)
 | 
			
		||||
    forward_for_reply if status.public_visibility? || status.unlisted_visibility?
 | 
			
		||||
 | 
			
		||||
    status
 | 
			
		||||
  end
 | 
			
		||||
@@ -162,4 +163,13 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
 | 
			
		||||
    return @skip_download if defined?(@skip_download)
 | 
			
		||||
    @skip_download ||= DomainBlock.find_by(domain: @account.domain)&.reject_media?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def reply_to_local?
 | 
			
		||||
    !replied_to_status.nil? && replied_to_status.account.local?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def forward_for_reply
 | 
			
		||||
    return unless @json['signature'].present? && reply_to_local?
 | 
			
		||||
    ActivityPub::RawDistributionWorker.perform_async(Oj.dump(@json), replied_to_status.account_id)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -16,6 +16,8 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def forward_for_reblogs(status)
 | 
			
		||||
    return if @json['signature'].blank?
 | 
			
		||||
 | 
			
		||||
    ActivityPub::RawDistributionWorker.push_bulk(status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)) do |account_id|
 | 
			
		||||
      [payload, account_id]
 | 
			
		||||
    end
 | 
			
		||||
 
 | 
			
		||||
@@ -40,6 +40,7 @@ class PostStatusService < BaseService
 | 
			
		||||
    DistributionWorker.perform_async(status.id)
 | 
			
		||||
    Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
 | 
			
		||||
    ActivityPub::DistributionWorker.perform_async(status.id)
 | 
			
		||||
    ActivityPub::ReplyDistributionWorker.perform_async(status.id) if status.reply? && status.thread.account.local?
 | 
			
		||||
 | 
			
		||||
    if options[:idempotency].present?
 | 
			
		||||
      redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										42
									
								
								app/workers/activitypub/reply_distribution_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								app/workers/activitypub/reply_distribution_worker.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,42 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class ActivityPub::ReplyDistributionWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push'
 | 
			
		||||
 | 
			
		||||
  def perform(status_id)
 | 
			
		||||
    @status  = Status.find(status_id)
 | 
			
		||||
    @account = @status.thread.account
 | 
			
		||||
 | 
			
		||||
    return if skip_distribution?
 | 
			
		||||
 | 
			
		||||
    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
 | 
			
		||||
      [signed_payload, @status.account_id, inbox_url]
 | 
			
		||||
    end
 | 
			
		||||
  rescue ActiveRecord::RecordNotFound
 | 
			
		||||
    true
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def skip_distribution?
 | 
			
		||||
    @status.private_visibility? || @status.direct_visibility?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def inboxes
 | 
			
		||||
    @inboxes ||= @account.followers.inboxes
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def signed_payload
 | 
			
		||||
    @signed_payload ||= Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(@status.account))
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def payload
 | 
			
		||||
    @payload ||= ActiveModelSerializers::SerializableResource.new(
 | 
			
		||||
      @status,
 | 
			
		||||
      serializer: ActivityPub::ActivitySerializer,
 | 
			
		||||
      adapter: ActivityPub::Adapter
 | 
			
		||||
    ).as_json
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
@@ -11,6 +11,7 @@ RSpec.describe ActivityPub::Activity::Delete do
 | 
			
		||||
      type: 'Delete',
 | 
			
		||||
      actor: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
      object: ActivityPub::TagManager.instance.uri_for(status),
 | 
			
		||||
      signature: 'foo',
 | 
			
		||||
    }.with_indifferent_access
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user