que-rb/que

No jobs are running

Closed this issue ยท 8 comments

Just started running Que with rails and activejob, and it seems that no jobs are actually running. I installed que-web, where I see that all my jobs are scheduled, and with the right time, but they never run. I don't know where to start debugging this.

My setup: que version 1.0.0.beta3, rails 6 with active_job, psql version 12.1. I tried several environments, all give the same result.

This is an example of the arguments of a scheduled job:

{:job_id=>"b0dd0192-f8ec-4bbb-98a8-b2e42693a95c", :locale=>"he", :priority=>nil, :timezone=>"Jerusalem", :arguments=>["Teaching::IssueMailer", "replied", "deliver_now", {:args=>[{:_aj_globalid=>"gid://bgumath/Issue/2ae15a86-73f7-4e21-bcce-a54d99c06071"}, {:_aj_symbol_keys=>[]}], :_aj_symbol_keys=>["args"]}], :job_class=>"ActionMailer::MailDeliveryJob", :executions=>0, :queue_name=>"mailers", :enqueued_at=>"2019-11-19T12:11:00Z", :provider_job_id=>nil, :exception_executions=>{}}

Can you reproduce this behaviour in an example application for me to debug?

I tried to reproduce this using a standard rails new --database=postgresql and setting everything up.

Did you set all of the settings in your config/application.rb?
Did you create a AR migration?

I created and ran the following migration:

class CreateQueSchema < ActiveRecord::Migration[6.0]
  def up
    # Whenever you use Que in a migration, always specify the version you're
    # migrating to. If you're unsure what the current version is, check the
    # changelog.
    Que.migrate!(version: 4)
  end

  def down
    # Migrate to version 0 to remove Que entirely.
    Que.migrate!(version: 0)
  end
end

I'm not sure which settings I should set in config/application.rb, here is the full file:

require_relative 'boot'

require 'rails/all'

module Gollum
  GIT_ADAPTER = 'rugged'
end

Bundler.require(*Rails.groups)

module Bgumath
  class Application < Rails::Application
    config.load_defaults '6.0'

    config.time_zone = 'Jerusalem'
    config.beginning_of_week = :sunday

    config.i18n.available_locales = [:en, :he]
    I18n.config.missing_interpolation_argument_handler = ->(key, args, str) {
      if args.key?(:interpolation_default)
        I18n.interpolate(str, {key => args[:interpolation_default]}.merge(args))
      else
        raise I18n::MissingInterpolationArgument.new(key, args, str)
      end
    }

    config.action_view.form_with_generates_remote_forms = true
    config.action_view.prefix_partial_path_with_controller_namespace = false
    config.eager_load_paths += Dir[Rails.root.join(*%w[spec mailers previews])]
    config.add_autoload_paths_to_load_path = false

    config.generators do |g|
      g.test_framework :rspec,
        :fixtures => true,
        :view_specs => false,
        :helper_specs => false,
        :routing_specs => true,
        :controller_specs => true,
        :request_specs => true
      g.fixture_replacement :factory_bot, :dir => "spec/factories"
      g.template_engine :erb
      g.helper false
      g.jbuilder false
      g.stylesheets false
      g.javascripts false
      g.assets false
    end

    config.middleware.insert 0, Rack::UTF8Sanitizer

    config.active_storage.variant_processor = :vips
    config.active_storage.replace_on_assign_to_many = true

    config.middleware.insert_before 0, Rack::Cors do
      allow do
        origins '*'
        resource '*', headers: :any, methods: [:get, :post, :options]
      end
    end

    config.rails_semantic_logger.quiet_assets = true
    

    ActiveRecord::SessionStore::Session.serializer = :null

    config.middleware.use(Rack::Attack)

    I18n.backend.class.send(:include, I18n::Backend::Cascade)

  end
end

module ActiveModel
  class Railtie < Rails::Railtie
    generators do |app|
      Rails::Generators.configure! app.config.generators
      require_relative '../lib/rails/generators/extensions'
    end
  end
end

Will try to create a minimal example later. Thanks!

I also verified that changes by the migration are reflected in the actual db. Here is a relevant part from db/structure.sql:

--
-- Name: que_validate_tags(jsonb); Type: FUNCTION; Schema: public; Owner: -
--

CREATE FUNCTION public.que_validate_tags(tags_array jsonb) RETURNS boolean
    LANGUAGE sql
    AS $$
  SELECT bool_and(
    jsonb_typeof(value) = 'string'
    AND
    char_length(value::text) <= 100
  )
  FROM jsonb_array_elements(tags_array)
$$;


SET default_tablespace = '';

SET default_table_access_method = heap;

--
-- Name: que_jobs; Type: TABLE; Schema: public; Owner: -
--

CREATE TABLE public.que_jobs (
    priority smallint DEFAULT 100 NOT NULL,
    run_at timestamp with time zone DEFAULT now() NOT NULL,
    id bigint NOT NULL,
    job_class text NOT NULL,
    error_count integer DEFAULT 0 NOT NULL,
    last_error_message text,
    queue text DEFAULT 'default'::text NOT NULL,
    last_error_backtrace text,
    finished_at timestamp with time zone,
    expired_at timestamp with time zone,
    args jsonb DEFAULT '[]'::jsonb NOT NULL,
    data jsonb DEFAULT '{}'::jsonb NOT NULL,
    CONSTRAINT error_length CHECK (((char_length(last_error_message) <= 500) AND (char_length(last_error_backtrace) <= 10000))),
    CONSTRAINT job_class_length CHECK ((char_length(
CASE job_class
    WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'::text THEN ((args -> 0) ->> 'job_class'::text)
    ELSE job_class
END) <= 200)),
    CONSTRAINT queue_length CHECK ((char_length(queue) <= 100)),
    CONSTRAINT valid_args CHECK ((jsonb_typeof(args) = 'array'::text)),
    CONSTRAINT valid_data CHECK (((jsonb_typeof(data) = 'object'::text) AND ((NOT (data ? 'tags'::text)) OR ((jsonb_typeof((data -> 'tags'::text)) = 'array'::text) AND (jsonb_array_length((data -> 'tags'::text)) <= 5) AND public.que_validate_tags((data -> 'tags'::text))))))
)
WITH (fillfactor='90');


--
-- Name: TABLE que_jobs; Type: COMMENT; Schema: public; Owner: -
--

COMMENT ON TABLE public.que_jobs IS '4';


--
-- Name: que_determine_job_state(public.que_jobs); Type: FUNCTION; Schema: public; Owner: -
--

CREATE FUNCTION public.que_determine_job_state(job public.que_jobs) RETURNS text
    LANGUAGE sql
    AS $$
  SELECT
    CASE
    WHEN job.expired_at  IS NOT NULL    THEN 'expired'
    WHEN job.finished_at IS NOT NULL    THEN 'finished'
    WHEN job.error_count > 0            THEN 'errored'
    WHEN job.run_at > CURRENT_TIMESTAMP THEN 'scheduled'
    ELSE                                     'ready'
    END
$$;


--
-- Name: que_job_notify(); Type: FUNCTION; Schema: public; Owner: -
--

CREATE FUNCTION public.que_job_notify() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  DECLARE
    locker_pid integer;
    sort_key json;
  BEGIN
    -- Don't do anything if the job is scheduled for a future time.
    IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
      RETURN null;
    END IF;

    -- Pick a locker to notify of the job's insertion, weighted by their number
    -- of workers. Should bounce pseudorandomly between lockers on each
    -- invocation, hence the md5-ordering, but still touch each one equally,
    -- hence the modulo using the job_id.
    SELECT pid
    INTO locker_pid
    FROM (
      SELECT *, last_value(row_number) OVER () + 1 AS count
      FROM (
        SELECT *, row_number() OVER () - 1 AS row_number
        FROM (
          SELECT *
          FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
          WHERE listening AND queues @> ARRAY[NEW.queue]
          ORDER BY md5(pid::text || id::text)
        ) t1
      ) t2
    ) t3
    WHERE NEW.id % count = row_number;

    IF locker_pid IS NOT NULL THEN
      -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
      -- rather than throw errors when someone enqueues a big job, just
      -- broadcast the most pertinent information, and let the locker query for
      -- the record after it's taken the lock. The worker will have to hit the
      -- DB in order to make sure the job is still visible anyway.
      SELECT row_to_json(t)
      INTO sort_key
      FROM (
        SELECT
          'job_available' AS message_type,
          NEW.queue       AS queue,
          NEW.priority    AS priority,
          NEW.id          AS id,
          -- Make sure we output timestamps as UTC ISO 8601
          to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
      ) t;

      PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
    END IF;

    RETURN null;
  END
$$;


--
-- Name: que_state_notify(); Type: FUNCTION; Schema: public; Owner: -
--

CREATE FUNCTION public.que_state_notify() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  DECLARE
    row record;
    message json;
    previous_state text;
    current_state text;
  BEGIN
    IF TG_OP = 'INSERT' THEN
      previous_state := 'nonexistent';
      current_state  := public.que_determine_job_state(NEW);
      row            := NEW;
    ELSIF TG_OP = 'DELETE' THEN
      previous_state := public.que_determine_job_state(OLD);
      current_state  := 'nonexistent';
      row            := OLD;
    ELSIF TG_OP = 'UPDATE' THEN
      previous_state := public.que_determine_job_state(OLD);
      current_state  := public.que_determine_job_state(NEW);

      -- If the state didn't change, short-circuit.
      IF previous_state = current_state THEN
        RETURN null;
      END IF;

      row := NEW;
    ELSE
      RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
    END IF;

    SELECT row_to_json(t)
    INTO message
    FROM (
      SELECT
        'job_change' AS message_type,
        row.id       AS id,
        row.queue    AS queue,

        coalesce(row.data->'tags', '[]'::jsonb) AS tags,

        to_char(row.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at,
        to_char(now()      AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS time,

        CASE row.job_class
        WHEN 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' THEN
          coalesce(
            row.args->0->>'job_class',
            'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
          )
        ELSE
          row.job_class
        END AS job_class,

        previous_state AS previous_state,
        current_state  AS current_state
    ) t;

    PERFORM pg_notify('que_state', message::text);

    RETURN null;
  END
$$;

You need to config.active_job.queue_adapter = :que ๐Ÿ˜„ Or did you do that in one of the config/environments/ files?

You need to config.active_job.queue_adapter = :que smile Or did you do that in one of the config/environments/ files?

Sure, in all of them, actually. A basic thing I don't understand: What part on my computer is supposed to actually run the jobs? As far as I understand, it is not part of my web app, right?

No, you need to run bundle exec que which will spawn a process which will run the jobs ๐Ÿ˜„

Come and join our discord server where I can help you out, because this isn't a bug: https://discord.gg/B3EW32H