More robust PuSH subscription refreshes (#2799)
* Fix #2473 - Use sidekiq scheduler to refresh PuSH subscriptions instead of cron Fix an issue where / in domain would raise exception in TagManager#normalize_domain PuSH subscriptions refresh done in a round-robin way to avoid hammering a single server's hub in sequence. Correct handling of failures/retries through Sidekiq (see also #2613). Optimize Account#with_followers scope. Also, since subscriptions are now delegated to Sidekiq jobs, an uncaught exception will not stop the entire refreshing operation halfway through Fix #2702 - Correct user agent header on outgoing http requests * Add test for SubscribeService * Extract #expiring_accounts into method * Make mastodon:push:refresh no-op * Queues are now defined in sidekiq.yml * Queues are now in sidekiq.yml
This commit is contained in:
		
							
								
								
									
										3
									
								
								Gemfile
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								Gemfile
									
									
									
									
									
								
							@@ -35,7 +35,7 @@ gem 'link_header'
 | 
			
		||||
gem 'local_time'
 | 
			
		||||
gem 'nokogiri'
 | 
			
		||||
gem 'oj'
 | 
			
		||||
gem 'ostatus2', '~> 1.1'
 | 
			
		||||
gem 'ostatus2', '~> 2.0'
 | 
			
		||||
gem 'ox'
 | 
			
		||||
gem 'rabl'
 | 
			
		||||
gem 'rack-attack'
 | 
			
		||||
@@ -48,6 +48,7 @@ gem 'rqrcode'
 | 
			
		||||
gem 'ruby-oembed', require: 'oembed'
 | 
			
		||||
gem 'sanitize'
 | 
			
		||||
gem 'sidekiq'
 | 
			
		||||
gem 'sidekiq-scheduler'
 | 
			
		||||
gem 'sidekiq-unique-jobs'
 | 
			
		||||
gem 'simple-navigation'
 | 
			
		||||
gem 'simple_form'
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										14
									
								
								Gemfile.lock
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								Gemfile.lock
									
									
									
									
									
								
							@@ -143,6 +143,8 @@ GEM
 | 
			
		||||
      thread_safe
 | 
			
		||||
    encryptor (3.0.0)
 | 
			
		||||
    erubis (2.7.0)
 | 
			
		||||
    et-orbi (1.0.3)
 | 
			
		||||
      tzinfo
 | 
			
		||||
    execjs (2.7.0)
 | 
			
		||||
    fabrication (2.16.1)
 | 
			
		||||
    faker (1.7.3)
 | 
			
		||||
@@ -251,7 +253,7 @@ GEM
 | 
			
		||||
    oj (3.0.5)
 | 
			
		||||
    openssl (2.0.3)
 | 
			
		||||
    orm_adapter (0.5.0)
 | 
			
		||||
    ostatus2 (1.1.0)
 | 
			
		||||
    ostatus2 (2.0.0)
 | 
			
		||||
      addressable (~> 2.4)
 | 
			
		||||
      http (~> 2.0)
 | 
			
		||||
      nokogiri (~> 1.6)
 | 
			
		||||
@@ -386,6 +388,8 @@ GEM
 | 
			
		||||
      unicode-display_width (~> 1.0, >= 1.0.1)
 | 
			
		||||
    ruby-oembed (0.12.0)
 | 
			
		||||
    ruby-progressbar (1.8.1)
 | 
			
		||||
    rufus-scheduler (3.4.0)
 | 
			
		||||
      et-orbi (~> 1.0)
 | 
			
		||||
    safe_yaml (1.0.4)
 | 
			
		||||
    sanitize (4.4.0)
 | 
			
		||||
      crass (~> 1.0.2)
 | 
			
		||||
@@ -396,6 +400,11 @@ GEM
 | 
			
		||||
      connection_pool (~> 2.2, >= 2.2.0)
 | 
			
		||||
      rack-protection (>= 1.5.0)
 | 
			
		||||
      redis (~> 3.3, >= 3.3.3)
 | 
			
		||||
    sidekiq-scheduler (2.1.4)
 | 
			
		||||
      redis (~> 3)
 | 
			
		||||
      rufus-scheduler (~> 3.2)
 | 
			
		||||
      sidekiq (>= 3)
 | 
			
		||||
      tilt (>= 1.4.0)
 | 
			
		||||
    sidekiq-unique-jobs (5.0.7)
 | 
			
		||||
      sidekiq (>= 4.0, <= 6.0)
 | 
			
		||||
      thor (~> 0)
 | 
			
		||||
@@ -499,7 +508,7 @@ DEPENDENCIES
 | 
			
		||||
  microformats2
 | 
			
		||||
  nokogiri
 | 
			
		||||
  oj
 | 
			
		||||
  ostatus2 (~> 1.1)
 | 
			
		||||
  ostatus2 (~> 2.0)
 | 
			
		||||
  ox
 | 
			
		||||
  paperclip (~> 5.1)
 | 
			
		||||
  paperclip-av-transcoder
 | 
			
		||||
@@ -527,6 +536,7 @@ DEPENDENCIES
 | 
			
		||||
  ruby-oembed
 | 
			
		||||
  sanitize
 | 
			
		||||
  sidekiq
 | 
			
		||||
  sidekiq-scheduler
 | 
			
		||||
  sidekiq-unique-jobs
 | 
			
		||||
  simple-navigation
 | 
			
		||||
  simple_form
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								Procfile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Procfile
									
									
									
									
									
								
							@@ -1,2 +1,2 @@
 | 
			
		||||
web: bundle exec puma -C config/puma.rb
 | 
			
		||||
worker: bundle exec sidekiq -q default -q push -q pull -q mailers
 | 
			
		||||
worker: bundle exec sidekiq
 | 
			
		||||
 
 | 
			
		||||
@@ -1,3 +1,4 @@
 | 
			
		||||
web: PORT=3000 bundle exec puma -C config/puma.rb
 | 
			
		||||
sidekiq: bundle exec sidekiq
 | 
			
		||||
stream: PORT=4000 yarn run start
 | 
			
		||||
webpack: ./bin/webpack-dev-server --host 0.0.0.0
 | 
			
		||||
 
 | 
			
		||||
@@ -1,13 +1,17 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
module HttpHelper
 | 
			
		||||
  USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
 | 
			
		||||
 | 
			
		||||
  def http_client(options = {})
 | 
			
		||||
    timeout = { write: 10, connect: 10, read: 10 }.merge(options)
 | 
			
		||||
 | 
			
		||||
    HTTP.headers(user_agent: USER_AGENT)
 | 
			
		||||
    HTTP.headers(user_agent: user_agent)
 | 
			
		||||
        .timeout(:per_operation, timeout)
 | 
			
		||||
        .follow
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def user_agent
 | 
			
		||||
    @user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)"
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -65,8 +65,10 @@ class TagManager
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def normalize_domain(domain)
 | 
			
		||||
    return if domain.nil?
 | 
			
		||||
 | 
			
		||||
    uri = Addressable::URI.new
 | 
			
		||||
    uri.host = domain
 | 
			
		||||
    uri.host = domain.gsub(/[\/]/, '')
 | 
			
		||||
    uri.normalize.host
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -103,9 +103,10 @@ class Account < ApplicationRecord
 | 
			
		||||
 | 
			
		||||
  scope :remote, -> { where.not(domain: nil) }
 | 
			
		||||
  scope :local, -> { where(domain: nil) }
 | 
			
		||||
  scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') }
 | 
			
		||||
  scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') }
 | 
			
		||||
  scope :without_followers, -> { where(followers_count: 0) }
 | 
			
		||||
  scope :with_followers, -> { where('followers_count > 0') }
 | 
			
		||||
  scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers }
 | 
			
		||||
  scope :partitioned, -> { order('row_number() over (partition by domain)') }
 | 
			
		||||
  scope :silenced, -> { where(silenced: true) }
 | 
			
		||||
  scope :suspended, -> { where(suspended: true) }
 | 
			
		||||
  scope :recent, -> { reorder(id: :desc) }
 | 
			
		||||
 
 | 
			
		||||
@@ -40,7 +40,7 @@ class FollowService < BaseService
 | 
			
		||||
    if target_account.local?
 | 
			
		||||
      NotifyService.new.call(target_account, follow)
 | 
			
		||||
    else
 | 
			
		||||
      SubscribeService.new.call(target_account) unless target_account.subscribed?
 | 
			
		||||
      Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed?
 | 
			
		||||
      NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id)
 | 
			
		||||
      AfterRemoteFollowWorker.perform_async(follow.id)
 | 
			
		||||
    end
 | 
			
		||||
 
 | 
			
		||||
@@ -77,7 +77,7 @@ class ProcessInteractionService < BaseService
 | 
			
		||||
  def authorize_follow_request!(account, target_account)
 | 
			
		||||
    follow_request = FollowRequest.find_by(account: target_account, target_account: account)
 | 
			
		||||
    follow_request&.authorize!
 | 
			
		||||
    SubscribeService.new.call(account) unless account.subscribed?
 | 
			
		||||
    Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def reject_follow_request!(account, target_account)
 | 
			
		||||
 
 | 
			
		||||
@@ -5,15 +5,31 @@ class SubscribeService < BaseService
 | 
			
		||||
    account.secret = SecureRandom.hex
 | 
			
		||||
 | 
			
		||||
    subscription = account.subscription(api_subscription_url(account.id))
 | 
			
		||||
    response = subscription.subscribe
 | 
			
		||||
    response     = subscription.subscribe
 | 
			
		||||
 | 
			
		||||
    unless response.successful?
 | 
			
		||||
    if response_failed_permanently?(response)
 | 
			
		||||
      # An error in the 4xx range (except for 429, which is rate limiting)
 | 
			
		||||
      # means we're not allowed to subscribe. Fail and move on
 | 
			
		||||
      account.secret = ''
 | 
			
		||||
      Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}"
 | 
			
		||||
      account.save!
 | 
			
		||||
    elsif response_successful?(response)
 | 
			
		||||
      # Anything in the 2xx range means the subscription will be confirmed
 | 
			
		||||
      # asynchronously, we've done what we needed to do
 | 
			
		||||
      account.save!
 | 
			
		||||
    else
 | 
			
		||||
      # What's left is the 5xx range and 429, which means we need to retry
 | 
			
		||||
      # at a later time. Fail loudly!
 | 
			
		||||
      raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}"
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
    account.save!
 | 
			
		||||
  rescue HTTP::Error, OpenSSL::SSL::SSLError
 | 
			
		||||
    Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error"
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def response_failed_permanently?(response)
 | 
			
		||||
    response.code > 299 && response.code < 500 && response.code != 429
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def response_successful?(response)
 | 
			
		||||
    response.code > 199 && response.code < 300
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService
 | 
			
		||||
 | 
			
		||||
    account.save_with_optional_avatar!
 | 
			
		||||
 | 
			
		||||
    SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url)
 | 
			
		||||
    Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 
 | 
			
		||||
@@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker
 | 
			
		||||
                   .headers(headers)
 | 
			
		||||
                   .post(subscription.callback_url, body: payload)
 | 
			
		||||
 | 
			
		||||
    return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling)
 | 
			
		||||
    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
 | 
			
		||||
    return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling)
 | 
			
		||||
    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response)
 | 
			
		||||
 | 
			
		||||
    subscription.touch(:last_successful_delivery_at)
 | 
			
		||||
  end
 | 
			
		||||
@@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker
 | 
			
		||||
    hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
 | 
			
		||||
    "sha1=#{hmac}"
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def response_failed_permanently?(response)
 | 
			
		||||
    response.code > 299 && response.code < 500 && response.code != 429
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def response_successful?(response)
 | 
			
		||||
    response.code > 199 && response.code < 300
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										13
									
								
								app/workers/pubsubhubbub/subscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								app/workers/pubsubhubbub/subscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,13 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::SubscribeWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push'
 | 
			
		||||
 | 
			
		||||
  def perform(account_id)
 | 
			
		||||
    account = Account.find(account_id)
 | 
			
		||||
    Rails.logger.debug "PuSH re-subscribing to #{account.acct}"
 | 
			
		||||
    ::SubscribeService.new.call(account)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
							
								
								
									
										20
									
								
								app/workers/scheduler/subscriptions_scheduler.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								app/workers/scheduler/subscriptions_scheduler.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,20 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
require 'sidekiq-scheduler'
 | 
			
		||||
 | 
			
		||||
class Scheduler::SubscriptionsScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    Rails.logger.debug 'Queueing PuSH re-subscriptions'
 | 
			
		||||
 | 
			
		||||
    expiring_accounts.pluck(:id) do |id|
 | 
			
		||||
      Pubsubhubbub::SubscribeWorker.perform_async(id)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def expiring_accounts
 | 
			
		||||
    Account.expiring(1.day.from_now).partitioned
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
@@ -69,7 +69,4 @@ Rails.application.configure do
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 | 
			
		||||
require 'sidekiq/testing'
 | 
			
		||||
Sidekiq::Testing.inline!
 | 
			
		||||
 | 
			
		||||
ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false }
 | 
			
		||||
 
 | 
			
		||||
@@ -1,2 +1,11 @@
 | 
			
		||||
---
 | 
			
		||||
:concurrency: 5
 | 
			
		||||
:queues:
 | 
			
		||||
  - default
 | 
			
		||||
  - push
 | 
			
		||||
  - pull
 | 
			
		||||
  - mailers
 | 
			
		||||
:schedule:
 | 
			
		||||
  subscriptions_scheduler:
 | 
			
		||||
    cron: '0 5 * * *'
 | 
			
		||||
    class: Scheduler::SubscriptionsScheduler
 | 
			
		||||
 
 | 
			
		||||
@@ -77,10 +77,8 @@ namespace :mastodon do
 | 
			
		||||
 | 
			
		||||
    desc 'Re-subscribes to soon expiring PuSH subscriptions'
 | 
			
		||||
    task refresh: :environment do
 | 
			
		||||
      Account.expiring(1.day.from_now).find_each do |a|
 | 
			
		||||
        Rails.logger.debug "PuSH re-subscribing to #{a.acct}"
 | 
			
		||||
        SubscribeService.new.call(a)
 | 
			
		||||
      end
 | 
			
		||||
      # No-op
 | 
			
		||||
      # This task is now executed via sidekiq-scheduler
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -53,10 +53,11 @@ RSpec.describe FollowService do
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    describe 'unlocked account' do
 | 
			
		||||
      let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com')).account }
 | 
			
		||||
      let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com', hub_url: 'http://hub.example.com')).account }
 | 
			
		||||
 | 
			
		||||
      before do
 | 
			
		||||
        stub_request(:post, "http://salmon.example.com/").to_return(:status => 200, :body => "", :headers => {})
 | 
			
		||||
        stub_request(:post, "http://hub.example.com/").to_return(status: 202)
 | 
			
		||||
        subject.call(sender, bob.acct)
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
@@ -70,6 +71,10 @@ RSpec.describe FollowService do
 | 
			
		||||
          xml.match(TagManager::VERBS[:follow])
 | 
			
		||||
        }).to have_been_made.once
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'subscribes to PuSH' do
 | 
			
		||||
        expect(a_request(:post, "http://hub.example.com/")).to have_been_made.once
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										38
									
								
								spec/services/subscribe_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								spec/services/subscribe_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,38 @@
 | 
			
		||||
require 'rails_helper'
 | 
			
		||||
 | 
			
		||||
RSpec.describe SubscribeService do
 | 
			
		||||
  let(:account) { Fabricate(:account, username: 'bob', domain: 'example.com', hub_url: 'http://hub.example.com') }
 | 
			
		||||
  subject { SubscribeService.new }
 | 
			
		||||
 | 
			
		||||
  it 'sends subscription request to PuSH hub' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
 | 
			
		||||
    subject.call(account)
 | 
			
		||||
    expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  it 'generates and keeps PuSH secret on successful call' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_return(status: 202)
 | 
			
		||||
    subject.call(account)
 | 
			
		||||
    expect(account.secret).to_not be_blank
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  it 'fails silently if PuSH hub forbids subscription' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_return(status: 403)
 | 
			
		||||
    subject.call(account)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  it 'fails silently if PuSH hub is not found' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_return(status: 404)
 | 
			
		||||
    subject.call(account)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  it 'fails loudly if there is a network error' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_raise(HTTP::Error)
 | 
			
		||||
    expect { subject.call(account) }.to raise_error HTTP::Error
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  it 'fails loudly if PuSH hub is unavailable' do
 | 
			
		||||
    stub_request(:post, 'http://hub.example.com/').to_return(status: 503)
 | 
			
		||||
    expect { subject.call(account) }.to raise_error
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
		Reference in New Issue
	
	Block a user