que-rb/que

After upgrade to 1.0.0, que runs, but doesn't process jobs

LukeClancy opened this issue · 1 comment

Hello, I have been using que and recently upgraded. Thanks for an amazing tool! It's awesome that I can set a time for a job and know it will be done. I will be going back to 0.14 on my main branch until this is fixed.

Behavior:

After upgrading to 1.0.0, que is running but not processing any jobs. I tested this by creating a simple TestJob class that writes to a file. Que is also not generating any errors.

The other symptom I noticed was that my computer fan went wild. Watching the top -i command-line utility, the behavior looks like an infinite loop: %CPU for ruby2.7 climbs to 100%. Maybe this is just how workers operate?

Steps taken

  1. Made sure I ran migrate!(version: 4), set config.active_record.schema_format = :sql, and confirmed all jobs were in the default queue as specified in the README
  2. Made sure it found application.rb by adding puts statements
  3. Created a TestJob class that should write to a file and then raise an exception
  4. Enqueued 100 test jobs
  5. Ran que and checked for changes to the file and to the que_jobs table
  6. Noticed que-web relied on a different version of que (1.0.0.beta3); removed it

Specs

que version 0.14.3 -> 1.0.0
postgres 13
rails 6.0.4.4

Job Code

class TestJob < Que::Job
  def run
    `echo "ran" >> test.txt` # <- proves the job ran
    destr                    # <- undefined method, should raise here
  end

  # The helpers below are for use from a Rails console only.
  def self.get_que_info
    puts [
      "job classes and count: [#{QueJob.group(:job_class).pluck(:job_class, Arel.sql('COUNT(que_jobs.id)')).to_a.join(' count - ')}]",
      "error count: #{QueJob.pluck(:error_count).to_a.sum}",
      "finished count: #{QueJob.where('finished_at IS NOT NULL').count}",
      "past run_at count: #{QueJob.where('run_at < ?', Time.now).count}",
      "test.txt: '#{`cat test.txt`}'"
    ]
  end

  def self.ping_que_info
    loop do
      get_que_info
      sleep 1
    end
  end
end
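For completeness, step 4 ("Enqueued 100 test jobs") was just a console loop over TestJob.enqueue. A minimal sketch of it is below; the Que::Job stand-in is purely illustrative so the loop can run outside Rails. In the real app, TestJob inherits from the gem's Que::Job and enqueue inserts a row into que_jobs.

```ruby
# Stand-in for the real Que::Job, for illustration only: it records
# enqueued jobs in memory instead of inserting rows into que_jobs.
module Que
  class Job
    @@enqueued = []

    def self.enqueue(*args)
      @@enqueued << { job_class: name, args: args }
    end

    def self.enqueued
      @@enqueued
    end
  end
end

class TestJob < Que::Job; end

# The console loop used to produce the 100 pending jobs:
100.times { TestJob.enqueue }

puts TestJob.enqueued.size # => 100
```

With the real gem, each of those 100 rows should then show up in que_jobs with run_at in the past, which is what get_que_info above checks for.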

Que output

With the debug log setting:

>> bundle exec que -l debug
application.rb running                 #checking paths
development.rb ran                    #checking paths
#<ActiveSupport::Logger:0x000055cd665e7ae8> #checking logger
Que waiting for jobs...
D, [2022-01-26T15:35:04.191200 #42104] DEBUG -- : {"lib":"que","hostname":"LClimb","pid":42104,"thread":24120,"event":"locker_start","queues":["default"]}

With the --log-internals setting:

bundle exec que --log-internals --worker-count 1
application.rb running
development.rb ran
#<ActiveSupport::Logger:0x000055c883ff40a0>
I, [2022-01-26T16:04:14.745592 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":8740,"internal_event":"locker_instantiate","object_id":23960,"t":"2022-01-27T00:04:14.745548Z","queues":["default"],"listen":true,"poll":true,"poll_interval":5,"wait_period":50,"maximum_buffer_size":8,"minimum_buffer_size":2,"worker_priorities":[null]}
I, [2022-01-26T16:04:14.745732 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":8740,"internal_event":"worker_instantiate","object_id":24020,"t":"2022-01-27T00:04:14.745640Z","priority":null,"job_buffer":24040,"result_queue":24060}
Que waiting for jobs...
I, [2022-01-26T16:04:14.782244 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.782197Z","backend_pid":45376,"command":"SELECT set_config('application_name', $1, false)","params":["Que Locker: 45376"],"ntuples":1}
I, [2022-01-26T16:04:14.791739 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.791660Z","backend_pid":45376,"command":"          -- Temporary composite type we need for our queries to work.\n          CREATE TYPE pg_temp.que_query_result AS (\n            locked boolean,\n            remaining_priorities jsonb\n          );\n\n          CREATE FUNCTION pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs)\n          RETURNS pg_temp.que_query_result\n          AS $$\n            WITH\n              -- Take the lock in a CTE because we want to use the result\n              -- multiple times while only taking the lock once.\n              lock_taken AS (\n                SELECT pg_try_advisory_lock((job).id) AS taken\n              ),\n              relevant AS (\n                SELECT priority, count\n                FROM (\n                  SELECT\n                    key::smallint AS priority,\n                    value::text::integer AS count\n                  FROM jsonb_each(priorities)\n                  ) t1\n                WHERE priority >= (job).priority\n                ORDER BY priority ASC\n                LIMIT 1\n              )\n            SELECT\n              (SELECT taken FROM lock_taken), -- R\n              CASE (SELECT taken FROM lock_taken)\n              WHEN false THEN\n                -- Simple case - we couldn't lock the job, so don't update the\n                -- priorities hash.\n                priorities\n              WHEN true THEN\n                CASE count\n                WHEN 1 THEN\n                  -- Remove the priority from the JSONB doc entirely, rather\n                  -- than leaving a zero entry in it.\n                  priorities - priority::text\n                ELSE\n                  -- Decrement the value in the JSONB doc.\n                  jsonb_set(\n                    priorities,\n 
                   ARRAY[priority::text],\n                    to_jsonb(count - 1)\n                  )\n                END\n              END\n            FROM relevant\n          $$\n          STABLE\n          LANGUAGE SQL;\n\n          CREATE FUNCTION pg_temp.que_highest_remaining_priority(priorities jsonb) RETURNS smallint AS $$\n            SELECT max(key::smallint) FROM jsonb_each(priorities)\n          $$\n          STABLE\n          LANGUAGE SQL;\n","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.791835 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_instantiate","object_id":24280,"t":"2022-01-27T00:04:14.791792Z","backend_pid":45376}
I, [2022-01-26T16:04:14.791878 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"poller_instantiate","object_id":24300,"t":"2022-01-27T00:04:14.791861Z","backend_pid":45376,"queue":"default","poll_interval":5}
I, [2022-01-26T16:04:14.791966 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_start","object_id":23960,"t":"2022-01-27T00:04:14.791933Z","backend_pid":45376,"worker_priorities":[null],"pollers":[["default",5]]}
I, [2022-01-26T16:04:14.792258 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.792234Z","backend_pid":45376,"command":"LISTEN que_listener_45376","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.793403 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.793379Z","backend_pid":45376,"command":"clean_lockers","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.794867 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.794839Z","backend_pid":45376,"command":"register_locker","params":[1,"{NULL}",45365,"LClimb",true,"{\"default\"}"],"ntuples":0}
I, [2022-01-26T16:04:14.794917 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.794899Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.796724 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"poller_polled","object_id":24300,"t":"2022-01-27T00:04:14.796697Z","queue":"default","locked":0,"priorities":{"32767":9},"held_locks":[],"newly_locked":[]}
I, [2022-01-26T16:04:14.796769 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.796750Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.846889 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.846823Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.846961 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.846939Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.897052 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.897006Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.897111 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.897090Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.947232 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.947188Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.947287 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.947267Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.997373 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.997330Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.997432 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.997411Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.047514 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.047472Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:15.047570 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:15.047550Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.097659 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.097616Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:15.097714 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:15.097694Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.147792 #45365]  INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.147753Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}

It keeps repeating the last two events (locker_polling and listener_waiting) after that.

While downgrading to 0.14.3, I hit a migration error saying the que_job_notify function did not exist. Guessing that my earlier migration had failed at some point, I decided to purge everything que-related from the database, uninstall que, and reinstall it.

  1. Ran this (obviously not a good solution for live applications):

DROP TABLE IF EXISTS que_jobs;
DROP TABLE IF EXISTS que_lockers;
DROP TABLE IF EXISTS que_values;
DROP FUNCTION IF EXISTS que_job_notify;
DROP FUNCTION IF EXISTS que_state_notify;
DROP FUNCTION IF EXISTS que_validate_tags;

  2. Uninstalled and reinstalled que
  3. Ran Que.migrate!(version: 4)
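For anyone scripting the same reset, the purge statements can be generated rather than typed out. This is a sketch: the object names are the ones Que's migrations create, and in a Rails app each statement would be fed to something like ActiveRecord::Base.connection.execute.

```ruby
# Build the DROP statements for Que's tables and trigger functions.
# These names come from Que's own schema; IF EXISTS makes the
# purge safe to re-run.
QUE_TABLES    = %w[que_jobs que_lockers que_values]
QUE_FUNCTIONS = %w[que_job_notify que_state_notify que_validate_tags]

purge_sql =
  QUE_TABLES.map    { |t| "DROP TABLE IF EXISTS #{t};" } +
  QUE_FUNCTIONS.map { |f| "DROP FUNCTION IF EXISTS #{f};" }

puts purge_sql
```

After running these (e.g. via psql or the Rails console) the schema is clean, and Que.migrate!(version: 4) recreates everything from scratch.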

It worked, and my TestJob class is now being processed as well! Therefore I am closing this issue.