No jobs are running
Closed this issue ยท 8 comments
Just started running Que with rails and activejob, and it seems that no jobs are actually running. I installed que-web, where I see that all my jobs are scheduled, and with the right time, but they never run. I don't know where to start debugging this.
My setup: que version 1.0.0.beta3, rails 6 with active_job, psql version 12.1. I tried several environments, all give the same result.
This is an example of the arguments of a scheduled job:
{:job_id=>"b0dd0192-f8ec-4bbb-98a8-b2e42693a95c", :locale=>"he", :priority=>nil, :timezone=>"Jerusalem", :arguments=>["Teaching::IssueMailer", "replied", "deliver_now", {:args=>[{:_aj_globalid=>"gid://bgumath/Issue/2ae15a86-73f7-4e21-bcce-a54d99c06071"}, {:_aj_symbol_keys=>[]}], :_aj_symbol_keys=>["args"]}], :job_class=>"ActionMailer::MailDeliveryJob", :executions=>0, :queue_name=>"mailers", :enqueued_at=>"2019-11-19T12:11:00Z", :provider_job_id=>nil, :exception_executions=>{}}
Can you reproduce this behaviour in an example application for me to debug?
I tried to reproduce this using a standard rails new --database=postgresql
and setting everything up.
Did you set all of the settings in your config/application.rb
?
Did you create a AR migration?
I created and ran the following migration:
class CreateQueSchema < ActiveRecord::Migration[6.0]
def up
# Whenever you use Que in a migration, always specify the version you're
# migrating to. If you're unsure what the current version is, check the
# changelog.
Que.migrate!(version: 4)
end
def down
# Migrate to version 0 to remove Que entirely.
Que.migrate!(version: 0)
end
end
I'm not sure which settings I should set in config/application.rb
, here is the full file:
require_relative 'boot'
require 'rails/all'
module Gollum
GIT_ADAPTER = 'rugged'
end
Bundler.require(*Rails.groups)
module Bgumath
class Application < Rails::Application
config.load_defaults '6.0'
config.time_zone = 'Jerusalem'
config.beginning_of_week = :sunday
config.i18n.available_locales = [:en, :he]
I18n.config.missing_interpolation_argument_handler = ->(key, args, str) {
if args.key?(:interpolation_default)
I18n.interpolate(str, {key => args[:interpolation_default]}.merge(args))
else
raise I18n::MissingInterpolationArgument.new(key, args, str)
end
}
config.action_view.form_with_generates_remote_forms = true
config.action_view.prefix_partial_path_with_controller_namespace = false
config.eager_load_paths += Dir[Rails.root.join(*%w[spec mailers previews])]
config.add_autoload_paths_to_load_path = false
config.generators do |g|
g.test_framework :rspec,
:fixtures => true,
:view_specs => false,
:helper_specs => false,
:routing_specs => true,
:controller_specs => true,
:request_specs => true
g.fixture_replacement :factory_bot, :dir => "spec/factories"
g.template_engine :erb
g.helper false
g.jbuilder false
g.stylesheets false
g.javascripts false
g.assets false
end
config.middleware.insert 0, Rack::UTF8Sanitizer
config.active_storage.variant_processor = :vips
config.active_storage.replace_on_assign_to_many = true
config.middleware.insert_before 0, Rack::Cors do
allow do
origins '*'
resource '*', headers: :any, methods: [:get, :post, :options]
end
end
config.rails_semantic_logger.quiet_assets = true
ActiveRecord::SessionStore::Session.serializer = :null
config.middleware.use(Rack::Attack)
I18n.backend.class.send(:include, I18n::Backend::Cascade)
end
end
module ActiveModel
class Railtie < Rails::Railtie
generators do |app|
Rails::Generators.configure! app.config.generators
require_relative '../lib/rails/generators/extensions'
end
end
end
Will try to create a minimal example later. Thanks!
I also verified that changes by the migration are reflected in the actual db. Here is a relevant part from db/structure.sql
:
--
-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: -
--
CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean
LANGUAGE sql
AS $$
SELECT bool_and(
jsonb_typeof(value) = 'string'
AND
char_length(value::text) <= 100
)
FROM jsonb_array_elements(tags_array)
$$;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.que_jobs (
priority smallint DEFAULT 100 NOT NULL,
run_at timestamp with time zone DEFAULT now() NOT NULL,
id bigint NOT NULL,
job_class text NOT NULL,
error_count integer DEFAULT 0 NOT NULL,
last_error_message text,
queue text DEFAULT 'default'::text NOT NULL,
last_error_backtrace text,
finished_at timestamp with time zone,
expired_at timestamp with time zone,
args jsonb DEFAULT '[]'::jsonb NOT NULL,
data jsonb DEFAULT '{}'::jsonb NOT NULL,
CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))),
CONSTRAINT job_class_length CHECK ((char_length(
CASE job_class
WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text)
ELSE job_class
END) <= 200)),
CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)),
CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)),
CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text))))))
)
WITH (fillfactor='90');
--
-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
--
COMMENT ON TABLE public.que_jobs IS '4';
--
-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: -
--
CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text
LANGUAGE sql
AS $$
SELECT
CASE
WHEN job.expired_at IS NOT NULL THEN 'expired'
WHEN job.finished_at IS NOT NULL THEN 'finished'
WHEN job.error_count > 0 THEN 'errored'
WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled'
ELSE 'ready'
END
$$;
--
-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: -
--
CREATE FUNCTION public.que_job_notify() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
locker_pid integer;
sort_key json;
BEGIN
-- Don't do anything if the job is scheduled for a future time.
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
RETURN null;
END IF;
-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the md5-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
FROM (
SELECT *, last_value(row_number) OVER () + 1 AS count
FROM (
SELECT *, row_number() OVER () - 1 AS row_number
FROM (
SELECT *
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
WHERE listening AND queues @> ARRAY[NEW.queue]
ORDER BY md5(pid::text || id::text)
) t1
) t2
) t3
WHERE NEW.id % count = row_number;
IF locker_pid IS NOT NULL THEN
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
-- rather than throw errors when someone enqueues a big job, just
-- broadcast the most pertinent information, and let the locker query for
-- the record after it's taken the lock. The worker will have to hit the
-- DB in order to make sure the job is still visible anyway.
SELECT row_to_json(t)
INTO sort_key
FROM (
SELECT
'job_available' AS message_type,
NEW.queue AS queue,
NEW.priority AS priority,
NEW.id AS id,
-- Make sure we output timestamps as UTC ISO 8601
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
) t;
PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
END IF;
RETURN null;
END
$$;
--
-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: -
--
CREATE FUNCTION public.que_state_notify() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
row record;
message json;
previous_state text;
current_state text;
BEGIN
IF TG_OP = 'INSERT' THEN
previous_state := 'nonexistent';
current_state := public.que_determine_job_state(NEW);
row := NEW;
ELSIF TG_OP = 'DELETE' THEN
previous_state := public.que_determine_job_state(OLD);
current_state := 'nonexistent';
row := OLD;
ELSIF TG_OP = 'UPDATE' THEN
previous_state := public.que_determine_job_state(OLD);
current_state := public.que_determine_job_state(NEW);
-- If the state didn't change, short-circuit.
IF previous_state = current_state THEN
RETURN null;
END IF;
row := NEW;
ELSE
RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
END IF;
SELECT row_to_json(t)
INTO message
FROM (
SELECT
'job_change' AS message_type,
row.id AS id,
row.queue AS queue,
coalesce(row.data->'tags', '[]'::jsonb) AS tags,
to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at,
to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time,
CASE row.job_class
WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
coalesce(
row.args->0->>'job_class',
'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
)
ELSE
row.job_class
END AS job_class,
previous_state AS previous_state,
current_state AS current_state
) t;
PERFORM pg_notify('que_state', message::text);
RETURN null;
END
$$;
You need to config.active_job.queue_adapter = :que
๐ Or did you do that in one of the config/environments/
files?
You need to
config.active_job.queue_adapter = :que
smile Or did you do that in one of theconfig/environments/
files?
Sure, in all of them, actually. A basic thing I don't understand: What part on my computer is supposed to actually run the jobs? As far as I understand, it is not part of my web app, right?
No, you need to run bundle exec que
which will spawn a process which will run the jobs ๐
Come and join our discord server where I can help you out, because this isn't a bug: https://discord.gg/B3EW32H