After upgrade to 1.0.0, que runs, but doesn't process jobs
LukeClancy opened this issue · 1 comments
Hello, I have been using que and recently upgraded. Thanks for an amazing tool! It's awesome how I can set a time for a job and know it will be done. I will be going back to 0.14 on my main branch until this is fixed.
Behavior:
After upgrading to 1.0.0, que is running but not processing any jobs. I tested this by creating a simple TestJob class that writes to a file. Que is also not generating any errors.
The other symptom I noticed was that my computer fan went wild. Watching the top -i
command-line utility, the behavior looks like an infinite loop: %CPU for ruby2.7 goes up to 100%. Maybe this is just how workers operate?
Steps taken
- Made sure I ran migrate!(version: 4), set config.active_record.schema_format = :sql, and confirmed all jobs are in the default queue, as specified in the README
- Made sure it found application.rb by adding puts statements
- Created a TestJob class that writes to a file and then throws an exception
- Enqueued 100 test jobs
- Ran que and checked for changes to the file and the que_jobs table
- Noticed que-web relied on a different version of que (1.0.0.beta3); removed it
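The enqueue step above was just a loop from the rails console. A minimal sketch of it is below; note that Que::Job is stubbed here so the sketch runs standalone (the real gem's enqueue inserts a row into the que_jobs table instead):

```ruby
# Sketch of the "enqueued 100 test jobs" step. In a real rails console you
# would simply run `100.times { TestJob.enqueue }`. The stub below stands in
# for the que gem so this file runs without a database.
module Que
  class Job
    def self.enqueue(*args)
      # The real gem INSERTs into que_jobs; here we just record the call.
      @enqueued ||= []
      @enqueued << args
    end

    def self.enqueued_count
      (@enqueued || []).length
    end
  end
end

class TestJob < Que::Job; end

100.times { TestJob.enqueue }
puts TestJob.enqueued_count # prints 100
```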
Specs
que version 0.14.3 -> 1.0.0
postgres 13
rails 6.0.4.4
Job Code
class TestJob < Que::Job
  def run
    `echo "ran" >> test.txt` # <- shows whether run got here
    destr # <- undefined method, should raise here
  end

  # Below is for use from the rails console only.
  def self.get_que_info
    puts [
      "job classes and count: [#{QueJob.group(:job_class).pluck(:job_class, Arel.sql("COUNT(que_jobs.id)")).to_a.join(" count - ")}]",
      "error count: #{QueJob.pluck(:error_count).to_a.sum}",
      "finished count: #{QueJob.where("finished_at IS NOT NULL").count}",
      "past run_at count: #{QueJob.where("run_at < ?", Time.now).count}",
      "test.txt: '#{`cat test.txt`}'"
    ]
  end

  def self.ping_que_info
    loop do
      get_que_info
      sleep 1
    end
  end
end
Que output
with the debug log setting
>> bundle exec que -l debug
application.rb running #checking paths
development.rb ran #checking paths
#<ActiveSupport::Logger:0x000055cd665e7ae8> #checking logger
Que waiting for jobs...
D, [2022-01-26T15:35:04.191200 #42104] DEBUG -- : {"lib":"que","hostname":"LClimb","pid":42104,"thread":24120,"event":"locker_start","queues":["default"]}
with the --log-internals setting
bundle exec que --log-internals --worker-count 1
application.rb running
development.rb ran
#<ActiveSupport::Logger:0x000055c883ff40a0>
I, [2022-01-26T16:04:14.745592 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":8740,"internal_event":"locker_instantiate","object_id":23960,"t":"2022-01-27T00:04:14.745548Z","queues":["default"],"listen":true,"poll":true,"poll_interval":5,"wait_period":50,"maximum_buffer_size":8,"minimum_buffer_size":2,"worker_priorities":[null]}
I, [2022-01-26T16:04:14.745732 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":8740,"internal_event":"worker_instantiate","object_id":24020,"t":"2022-01-27T00:04:14.745640Z","priority":null,"job_buffer":24040,"result_queue":24060}
Que waiting for jobs...
I, [2022-01-26T16:04:14.782244 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.782197Z","backend_pid":45376,"command":"SELECT set_config('application_name', $1, false)","params":["Que Locker: 45376"],"ntuples":1}
I, [2022-01-26T16:04:14.791739 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.791660Z","backend_pid":45376,"command":" -- Temporary composite type we need for our queries to work.\n CREATE TYPE pg_temp.que_query_result AS (\n locked boolean,\n remaining_priorities jsonb\n );\n\n CREATE FUNCTION pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs)\n RETURNS pg_temp.que_query_result\n AS $$\n WITH\n -- Take the lock in a CTE because we want to use the result\n -- multiple times while only taking the lock once.\n lock_taken AS (\n SELECT pg_try_advisory_lock((job).id) AS taken\n ),\n relevant AS (\n SELECT priority, count\n FROM (\n SELECT\n key::smallint AS priority,\n value::text::integer AS count\n FROM jsonb_each(priorities)\n ) t1\n WHERE priority >= (job).priority\n ORDER BY priority ASC\n LIMIT 1\n )\n SELECT\n (SELECT taken FROM lock_taken), -- R\n CASE (SELECT taken FROM lock_taken)\n WHEN false THEN\n -- Simple case - we couldn't lock the job, so don't update the\n -- priorities hash.\n priorities\n WHEN true THEN\n CASE count\n WHEN 1 THEN\n -- Remove the priority from the JSONB doc entirely, rather\n -- than leaving a zero entry in it.\n priorities - priority::text\n ELSE\n -- Decrement the value in the JSONB doc.\n jsonb_set(\n priorities,\n ARRAY[priority::text],\n to_jsonb(count - 1)\n )\n END\n END\n FROM relevant\n $$\n STABLE\n LANGUAGE SQL;\n\n CREATE FUNCTION pg_temp.que_highest_remaining_priority(priorities jsonb) RETURNS smallint AS $$\n SELECT max(key::smallint) FROM jsonb_each(priorities)\n $$\n STABLE\n LANGUAGE SQL;\n","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.791835 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_instantiate","object_id":24280,"t":"2022-01-27T00:04:14.791792Z","backend_pid":45376}
I, [2022-01-26T16:04:14.791878 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"poller_instantiate","object_id":24300,"t":"2022-01-27T00:04:14.791861Z","backend_pid":45376,"queue":"default","poll_interval":5}
I, [2022-01-26T16:04:14.791966 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_start","object_id":23960,"t":"2022-01-27T00:04:14.791933Z","backend_pid":45376,"worker_priorities":[null],"pollers":[["default",5]]}
I, [2022-01-26T16:04:14.792258 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.792234Z","backend_pid":45376,"command":"LISTEN que_listener_45376","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.793403 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.793379Z","backend_pid":45376,"command":"clean_lockers","params":[],"ntuples":0}
I, [2022-01-26T16:04:14.794867 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"connection_execute","object_id":24240,"t":"2022-01-27T00:04:14.794839Z","backend_pid":45376,"command":"register_locker","params":[1,"{NULL}",45365,"LClimb",true,"{\"default\"}"],"ntuples":0}
I, [2022-01-26T16:04:14.794917 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.794899Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.796724 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"poller_polled","object_id":24300,"t":"2022-01-27T00:04:14.796697Z","queue":"default","locked":0,"priorities":{"32767":9},"held_locks":[],"newly_locked":[]}
I, [2022-01-26T16:04:14.796769 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.796750Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.846889 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.846823Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.846961 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.846939Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.897052 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.897006Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.897111 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.897090Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.947232 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.947188Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.947287 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.947267Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:14.997373 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:14.997330Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:14.997432 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:14.997411Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.047514 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.047472Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:15.047570 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:15.047550Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.097659 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.097616Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
I, [2022-01-26T16:04:15.097714 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"listener_waiting","object_id":24280,"t":"2022-01-27T00:04:15.097694Z","backend_pid":45376,"channel":"que_listener_45376","timeout":0.05}
I, [2022-01-26T16:04:15.147792 #45365] INFO -- : {"lib":"que","hostname":"LClimb","pid":45365,"thread":24220,"internal_event":"locker_polling","object_id":23960,"t":"2022-01-27T00:04:15.147753Z","priorities":{"32767":9},"held_locks":[],"queue":"default"}
It continues repeating the last two events (locker_polling and listener_waiting) after that.
While downgrading to 0.14.3, I hit an issue with the migration saying that the que_job_notify function did not exist. Guessing that my migration had failed at some point, I decided to purge everything que-related from the database, uninstall que, and reinstall it.
- ran this:
-- Obviously this is not a good solution for live applications.
DROP TABLE IF EXISTS que_jobs;
DROP TABLE IF EXISTS que_lockers;
DROP TABLE IF EXISTS que_values;
DROP FUNCTION IF EXISTS que_job_notify;
DROP FUNCTION IF EXISTS que_state_notify;
DROP FUNCTION IF EXISTS que_validate_tags;
- uninstalled and reinstalled que
- Que.migrate!(version: 4)
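The purge above can also be driven from the rails console as one call. A sketch under the assumption that an ActiveRecord connection is available; the connection is stubbed here so the sketch runs standalone and only records what it would execute:

```ruby
# Sketch of the purge steps above. In a rails console, pass
# ActiveRecord::Base.connection instead of the stub used here.
PURGE_STATEMENTS = [
  "DROP TABLE IF EXISTS que_jobs;",
  "DROP TABLE IF EXISTS que_lockers;",
  "DROP TABLE IF EXISTS que_values;",
  "DROP FUNCTION IF EXISTS que_job_notify;",
  "DROP FUNCTION IF EXISTS que_state_notify;",
  "DROP FUNCTION IF EXISTS que_validate_tags;",
].freeze

def purge_que!(conn)
  # Obviously not safe for live applications: this drops all que data.
  PURGE_STATEMENTS.each { |sql| conn.execute(sql) }
end

# Stub connection so the sketch runs without a database.
class RecordingConnection
  attr_reader :executed

  def initialize
    @executed = []
  end

  def execute(sql)
    @executed << sql
  end
end

conn = RecordingConnection.new
purge_que!(conn)
puts conn.executed.length # prints 6
```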
It worked, and my TestJob class is being processed as well! Therefore I am closing this issue.