que-rb/que

ActiveSupport::CurrentAttributes gets reset in console

LukeClancy opened this issue · 5 comments

This may be unexpected, but it is not necessarily a bug.

What is CurrentAttributes:

CurrentAttributes is a class that is often used to keep track of data over a request's lifecycle, such as the signed-in user. It's treated somewhat like a global variable.

Problem Statement:

I have been using CurrentAttributes to impersonate users while testing in the console. Recently, I noticed that TestJob.enqueue seems to reset CurrentAttributes, wiping that data. This does not happen during actual requests, which go through the controller.

  1. CurrentAttributes is made for requests, so this could be considered out of scope. In that case, one could use a plain class in the console instead:
class C
    class << self
        attr_accessor :test
    end
end
  2. Alternatively, regular SQL queries do not reset CurrentAttributes, so one would expect que not to reset it either. Fixing this would be more user-friendly.
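The first option can be tried in plain Ruby. This is only a minimal sketch: the name ConsoleState is hypothetical, and the point is that a class-level accessor is ordinary object state that no framework callback resets. The trade-off is that the value is not isolated per thread, which CurrentAttributes normally provides.

```ruby
# Hypothetical console-only holder; the name ConsoleState is illustrative.
class ConsoleState
  class << self
    # Defines ConsoleState.test and ConsoleState.test= on the class itself.
    attr_accessor :test
  end
end

ConsoleState.test = 5
puts ConsoleState.test.inspect

# Unlike CurrentAttributes, the value is shared by every thread in the process.
seen_in_thread = Thread.new { ConsoleState.test }.value
puts seen_in_thread.inspect
```

Because the value lives on the class, it stays set until it is explicitly cleared, so it is probably best kept to console experiments.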

Example of issue

class C < ActiveSupport::CurrentAttributes
	attribute :test
end
class ApplicationJob < Que::Job; end
class TestJob < ApplicationJob
	def run
		100.times{ Rails.logger.info "job" }
	end
end
def i_test_que
	C.test = 5
	Rails.logger.info C.test.inspect
	u = User.first
	Rails.logger.info C.test.inspect
	TestJob.enqueue
	Rails.logger.info C.test.inspect
end

# Output when running i_test_que in the console: 5, 5, nil

This does seem quite strange.

Is it definitely the same behaviour if you use the plain class C rather than CurrentAttributes? I don't see how that could be reset.
Is it only when using the Rails console, not within Rails app code?

I just tried to reproduce this in a minimal way and was unable to.

Gemfile:

# frozen_string_literal: true

source "https://rubygems.org"

gem 'que'
gem 'rails'
gem 'pg'

test.rb:

#!/usr/bin/env ruby

# Run with:
# bundle
# docker run -e POSTGRES_PASSWORD=que -e POSTGRES_USER=que -e POSTGRES_DB=que -p 5432:5432 -d postgres:14.7
# DATABASE_URL=postgres://que:que@localhost:5432/que bundle exec ./test.rb

require 'active_support'
require 'active_record'
require 'que'

ActiveRecord::Base.establish_connection(ENV['DATABASE_URL'])

Que.connection = ActiveRecord
Que.migrate!(version: Que::Migrations::CURRENT_VERSION)

class C < ActiveSupport::CurrentAttributes
  attribute :test
end

class ApplicationJob < Que::Job; end

class TestJob < ApplicationJob
  def run
    puts 'Running job'
  end
end

class QueJob < ActiveRecord::Base; end

C.test = 5
puts C.test.inspect

j = QueJob.first
puts C.test.inspect

TestJob.enqueue
puts C.test.inspect

Output:

5
5
5

Ok, I've reproduced it in the console and a Rake task of a new Rails app. As I suspected, it only occurs with CurrentAttributes, not the plain class C.

docker run -e POSTGRES_PASSWORD=que -e POSTGRES_USER=que -e POSTGRES_DB=que -p 5432:5432 -d postgres:14.7
gem install rails
rails new my_app
cd my_app
rails g model user
bundle add que
bundle add pg

config/database.yml:

development:
  adapter: postgresql
  host: 127.0.0.1
  database: que
  username: que
  password: que
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
  timeout: 5000

Add to Rakefile:

task :derp do
  require 'que'
  Que.connection = ActiveRecord
  require './config/environment'
  Rails.logger = Logger.new(STDOUT)

  class C < ActiveSupport::CurrentAttributes
    attribute :test
  end
  class QueApplicationJob < Que::Job; end
  class TestJob < QueApplicationJob
    def run
      puts 'Running job'
    end
  end

  C.test = 5
  puts C.test.inspect

  u = User.first
  puts C.test.inspect

  TestJob.enqueue
  puts C.test.inspect
end

Then run:

bundle exec rails db:migrate
bundle exec rails derp

Output:

➜ bundle exec rails derp
5
5
nil

This line is the culprit:

::Rails.application.executor.wrap(&block)

in:

# Use Rails' executor (if present) to make sure that the connection
# we're using isn't taken from us while the block runs. See
# https://github.com/que-rb/que/issues/166#issuecomment-274218910
def wrap_in_rails_executor(&block)
  if defined?(::Rails.application.executor)
    ::Rails.application.executor.wrap(&block)
  else
    yield
  end
end

If the method body is replaced with just yield, C.test is not reset.
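The effect can be modelled in plain Ruby. The sketch below is a simplified stand-in, not ActiveSupport's code: ModelCurrent, ModelExecutor and model_enqueue are hypothetical names. It assumes that the executor's run and complete hooks reset every CurrentAttributes instance (as the railtie frame in the reset backtrace in this thread suggests) and that wrap yields directly when an execution is already active. Under those assumptions, code that is already wrapped, as a request would be, keeps its value because the nested wrap does nothing extra, while a console session has no outer execution and loses it.

```ruby
# Simplified stand-in for a CurrentAttributes subclass: one value store
# that can be wiped with `reset`. (Hypothetical; not ActiveSupport code.)
class ModelCurrent
  @store = {}

  class << self
    def test
      @store[:test]
    end

    def test=(value)
      @store[:test] = value
    end

    def reset
      @store.clear
    end
  end
end

# Simplified stand-in for Rails.application.executor.
class ModelExecutor
  @active = false

  class << self
    def active?
      @active
    end

    # Yield directly when already inside an execution; otherwise run the
    # start and finish hooks (modelled here as a reset) around the block.
    def wrap
      return yield if active?

      ModelCurrent.reset # models the hook that runs when an execution starts
      @active = true
      begin
        yield
      ensure
        @active = false
        ModelCurrent.reset # models the hook that runs when an execution completes
      end
    end
  end
end

# Stand-in for TestJob.enqueue, which checks out its connection inside the executor.
def model_enqueue
  ModelExecutor.wrap { :enqueued }
end

# Console or Rake task: no execution is active, so enqueueing resets the value.
ModelCurrent.test = 5
model_enqueue
console_value = ModelCurrent.test

# Request-like context: the whole unit of work is already wrapped.
request_value = ModelExecutor.wrap do
  ModelCurrent.test = 5
  model_enqueue
  ModelCurrent.test
end

puts console_value.inspect
puts request_value.inspect
```

If the model holds for the real executor, wrapping the whole console snippet in a single outer execution should have the same preserving effect, at the cost of the value being reset once that outer block finishes.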

Edit: Backtrace:

[1] pry(Que::ActiveRecord::Connection)> backtrace
--> #0  #<Class:Que::ActiveRecord::Connection>.wrap_in_rails_executor(&block#Proc) at /home/brendan/Projects/que/lib/que/active_record/connection.rb:23
    #1  #<Class:Que::ActiveRecord::Connection>.checkout at /home/brendan/Projects/que/lib/que/active_record/connection.rb:11
    #2  Que::ConnectionPool.checkout at /home/brendan/Projects/que/lib/que/connection_pool.rb:18
    #3  Que::ConnectionPool.execute(*args#Array) at /home/brendan/Projects/que/lib/que/connection_pool.rb:57
    #4  #<Class:Que>.execute(*args#Array, **#NilClass, &block#NilClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/3.2.0/forwardable.rb:240
    #5  #<Class:Que::Job>.enqueue(*args#Array, **#NilClass) at /home/brendan/Projects/que/lib/que/job.rb:131
    #6  block in block in <main> at /home/brendan/Projects/que-issue-415/my_app/Rakefile:26
[...]

Backtrace of when it's doing the reset:

From: /home/brendan/Projects/que-issue-415/my_app/Rakefile:14 :

     9: 
    10:   class C < ActiveSupport::CurrentAttributes
    11:     attribute :test
    12:     before_reset do
    13:       require 'pry'; binding.pry
 => 14:       puts 'RESET'
    15:     end
    16:   end
    17:   class QueApplicationJob < Que::Job; end
    18:   class TestJob < QueApplicationJob
    19:     def run

[1] pry(#<C>)> backtrace
--> #0  block in block in <class:C> at /home/brendan/Projects/que-issue-415/my_app/Rakefile:14
    ͱ-- #1  BasicObject.instance_exec(*args) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:448
    #2  block in ActiveSupport::Callbacks::CallTemplate::InstanceExec0.block in make_lambda at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:448
    #3  block (2 levels) in #<Class:ActiveSupport::Callbacks::Filters::Before>.block (2 levels) in halting(callback_sequence#ActiveSupport::Callbacks::CallbackSequence, user_callback#Proc, halted_lambda#Proc, filter#Proc, name#Symbol) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:202
    #4  block (2 levels) in ActiveSupport::Callbacks::CallbackChain.block (2 levels) in default_terminator at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:707
    ͱ-- #5  Kernel.catch(*args) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:706
    #6  block in ActiveSupport::Callbacks::CallbackChain.block in default_terminator at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:706
    #7  block in #<Class:ActiveSupport::Callbacks::Filters::Before>.block in halting(callback_sequence#ActiveSupport::Callbacks::CallbackSequence, user_callback#Proc, halted_lambda#Proc, filter#Proc, name#Symbol) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:203
    #8  block in ActiveSupport::Callbacks::CallbackSequence.block in invoke_before(arg#ActiveSupport::Callbacks::Filters::Environment) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    ͱ-- #9  Array.each at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    #10 ActiveSupport::Callbacks::CallbackSequence.invoke_before(arg#ActiveSupport::Callbacks::Filters::Environment) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    #11 ActiveSupport::Callbacks.run_callbacks(kind#Symbol, type#NilClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:109
    #12 ActiveSupport::CurrentAttributes.reset at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/current_attributes.rb:221
    ͱ-- #13 Hash.each_value at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/current_attributes.rb:159
    #14 #<Class:ActiveSupport::CurrentAttributes>.reset_all at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/current_attributes.rb:159
    #15 block (2 levels) in block (2 levels) in <class:Railtie> at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/railtie.rb:49
    ͱ-- #16 BasicObject.instance_exec(*args) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:448
    #17 block in ActiveSupport::Callbacks::CallTemplate::InstanceExec0.block in make_lambda at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:448
    #18 block (2 levels) in #<Class:ActiveSupport::Callbacks::Filters::Before>.block (2 levels) in halting(callback_sequence#ActiveSupport::Callbacks::CallbackSequence, user_callback#Proc, halted_lambda#Proc, filter#Proc, name#Symbol) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:202
    #19 block (2 levels) in ActiveSupport::Callbacks::CallbackChain.block (2 levels) in default_terminator at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:707
    ͱ-- #20 Kernel.catch(*args) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:706
    #21 block in ActiveSupport::Callbacks::CallbackChain.block in default_terminator at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:706
    #22 block in #<Class:ActiveSupport::Callbacks::Filters::Before>.block in halting(callback_sequence#ActiveSupport::Callbacks::CallbackSequence, user_callback#Proc, halted_lambda#Proc, filter#Proc, name#Symbol) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:203
    #23 block in ActiveSupport::Callbacks::CallbackSequence.block in invoke_before(arg#ActiveSupport::Callbacks::Filters::Environment) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    ͱ-- #24 Array.each at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    #25 ActiveSupport::Callbacks::CallbackSequence.invoke_before(arg#ActiveSupport::Callbacks::Filters::Environment) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:598
    #26 ActiveSupport::Callbacks.run_callbacks(kind#Symbol, type#NilClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/callbacks.rb:109
    #27 ActiveSupport::ExecutionWrapper.run at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/execution_wrapper.rb:129
    #28 ActiveSupport::ExecutionWrapper.run! at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/execution_wrapper.rb:125
    #29 block in #<Class:ActiveSupport::ExecutionWrapper>.block in run!(reset#FalseClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/execution_wrapper.rb:78
    #30 Kernel.tap at /home/brendan/Projects/que-issue-415/my_app/<internal:kernel>:90
    #31 #<Class:ActiveSupport::ExecutionWrapper>.run!(reset#FalseClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/execution_wrapper.rb:75
    #32 #<Class:ActiveSupport::ExecutionWrapper>.wrap(source#String) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/activesupport-7.1.3/lib/active_support/execution_wrapper.rb:90
    #33 #<Class:Que::ActiveRecord::Connection>.wrap_in_rails_executor(&block#Proc) at /home/brendan/Projects/que/lib/que/active_record/connection.rb:24
    #34 #<Class:Que::ActiveRecord::Connection>.checkout at /home/brendan/Projects/que/lib/que/active_record/connection.rb:11
    #35 Que::ConnectionPool.checkout at /home/brendan/Projects/que/lib/que/connection_pool.rb:18
    #36 Que::ConnectionPool.execute(*args#Array) at /home/brendan/Projects/que/lib/que/connection_pool.rb:57
    #37 #<Class:Que>.execute(*args#Array, **#NilClass, &block#NilClass) at /home/brendan/.rbenv/versions/3.2.2/lib/ruby/3.2.0/forwardable.rb:240
    #38 #<Class:Que::Job>.enqueue(*args#Array, **#NilClass) at /home/brendan/Projects/que/lib/que/job.rb:131
    #39 block in block in <main> at /home/brendan/Projects/que-issue-415/my_app/Rakefile:30
[...]

Wow! Thanks for the prompt response!

Based on what you found, I have created a monkey patch that solves the issue. Perhaps a better approach would be one that checks for multi-threaded usage, or for whatever is resetting CurrentAttributes. Trying to track errors through callbacks is the worst.

The reason I did not replace the method body with just yield was the warning comment above the original method. warn_source_change is my own method (I probably have too many monkey patches).

Exception.warn_source_change(Que::ActiveRecord::Connection, 'e4867be7916e315359158dae159fc6e4',
	respond_with: :error, message: "Check compatibility / Remove patch")

module Que::ActiveRecord::Connection
	def self.wrap_in_rails_executor(&block)
		#https://stackoverflow.com/questions/13506690/how-to-determine-if-rails-is-running-from-cli-console-or-as-server
		in_console = Rails.const_defined? 'Console'
		if defined?(::Rails.application.executor) and not in_console
			::Rails.application.executor.wrap(&block)
		else
			yield
		end
	end
end