aws/aws-sdk-rails

Missing SQS message argument when queued for ActiveJob

kgorshkov opened this issue · 2 comments

I am struggling to integrate Aws::Rails::SqsActiveJob so that I can pull events from an AWS SQS queue. I keep getting the error 'wrong number of arguments (given 0, expected 1)' when the job's perform method is called with a queued message.

Is this a serialization issue? Is the message not being serialized for ActiveJob.perform_later? Do I need a custom serializer? It appears the message argument from the queued event is not being passed to the ApplicationJob perform() method. I restart the aws_sqs_active_job process between attempts. Here are some of the things I have tried, all with the same result:

  • various function argument signatures such as a keyword job_data:
  • inheriting from ActiveJob::Base and ApplicationJob
  • using :amazon_sqs, :amazon_sqs_async and :shoryuken for config.active_job.queue_adapter
  • using both FIFO and standard SQS queues

The gemfile contains:

ruby '2.6.6'
gem 'aws-sdk-rails'
gem 'rails', '~> 5.2'

Here is the ActiveJob class for testing the issue:

class CartsUpdateJob < ApplicationJob
  queue_as :default

  rescue_from ActiveJob::DeserializationError do |ex|
    Rails.logger.error ex
    Rails.logger.error ex.backtrace.join("\n")
  end

  def perform(job_data)
    Rails.logger.info "data: " + job_data.inspect
    Rails.logger.info "data: " + job_data['job_class']
  end
end

I know that the message payload is arriving from the SQS queue successfully, because the JobRunner reads the event's ['job_class'] value and therefore knows which ActiveJob class to use.

Starting Poller with options={:threads=>12, :max_messages=>1, :visibility_timeout=>120, :shutdown_timeout=>15, :backpressure=>10, :queues=>{:default=>"https://sqs.us-west-2.amazonaws.com/redacted/default"}, :logger=>#<ActiveSupport::Logger:0x00000000095b4920 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00000000095b4718 @datetime_format=nil>, @formatter=#<ActiveSupport::Logger::SimpleFormatter:0x00000000095bfe88 @datetime_format=nil>, @logdev=#<Logger::LogDevice:0x00000000095b4420 @shift_period_suffix="%Y%m%d", @shift_size=1048576, @shift_age=0, @filename="log/aws.log", @dev=#<File:log/aws.log>, @mon_mutex=#<Thread::Mutex:0x00000000095b42b8>, @mon_mutex_owner_object_id=78488080, @mon_owner=nil, @mon_count=0>, @local_levels=#<Concurrent::Map:0x00000000095bf960 entries=0 default_proc=nil>>, :message_group_id=>"SqsActiveJobGroup", :config_file=>#<Pathname:C:/Users/KG/theapp/config/aws_sqs_active_job.yml>, :client=>#<Aws::SQS::Client>, :queue=>"default", :environment=>"development"}
Polling on: default => https://sqs.us-west-2.amazonaws.com/redactedid/default
Processing batch of 1 messages
Running job: [CartsUpdateJob]
Error processing job [CartsUpdateJob]: wrong number of arguments (given 0, expected 1)
C:/Users/KG/theapp/app/jobs/carts_update_job.rb:9:in `perform'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/execution.rb:39:in `block in perform_now'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:109:in `block in run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/i18n-1.8.10/lib/i18n.rb:314:in `with_locale'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/translation.rb:9:in `block (2 levels) in <module:Translation>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `instance_exec'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/logging.rb:26:in `block (4 levels) in <module:Logging>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/notifications.rb:168:in `block in instrument'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/notifications/instrumenter.rb:23:in `instrument'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/notifications.rb:168:in `instrument'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/logging.rb:25:in `block (3 levels) in <module:Logging>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/logging.rb:46:in `block in tag_logger'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/tagged_logging.rb:71:in `block in tagged'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/tagged_logging.rb:28:in `tagged'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/tagged_logging.rb:71:in `tagged'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/logging.rb:46:in `tag_logger'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/logging.rb:22:in `block (2 levels) in <module:Logging>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `instance_exec'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:136:in `run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/execution.rb:38:in `perform_now'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/execution.rb:24:in `block in execute'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:109:in `block in run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/railtie.rb:28:in `block (4 levels) in <class:Railtie>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/execution_wrapper.rb:87:in `wrap'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/reloader.rb:73:in `block in wrap'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/execution_wrapper.rb:87:in `wrap'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/reloader.rb:72:in `wrap'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/railtie.rb:27:in `block (3 levels) in <class:Railtie>'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `instance_exec'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activesupport-5.2.5/lib/active_support/callbacks.rb:136:in `run_callbacks'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/activejob-5.2.5/lib/active_job/execution.rb:22:in `execute'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/aws-sdk-rails-3.6.0/lib/aws/rails/sqs_active_job/job_runner.rb:17:in `run'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/aws-sdk-rails-3.6.0/lib/aws/rails/sqs_active_job/executor.rb:30:in `block in execute'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:363:in `run_task'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:352:in `block (3 levels) in create_worker'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:335:in `loop'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:335:in `block (2 levels) in create_worker'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:334:in `catch'
C:/Ruby26-x64/lib/ruby/gems/2.6.0/gems/concurrent-ruby-1.1.8/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb:334:in `block in create_worker'

SQS events are queued in an SQS queue by a Ruby Lambda function that is triggered from an EventBridge rule. The test message is 20.56KB, and here are some pertinent parts of the JSON, taken from the SQS 'send and receive messages' utility:

{
  "version": "0",
  ...
  "region": "us-west-2",
  "resources": [],
  "detail": {
    "metadata": {
	...
    }
  },
  "webhook": {
    "line_items": [
	...
    ],
    "note": null,
    "updated_at": "2021-04-16T00:03:52.020Z",
    "created_at": "2021-04-10T00:05:43.173Z"
  },
  "job_class": "CartsUpdateJob"
}

I am able to simulate a successful ActiveJob handoff and processing with my business logic added back into the ActiveJob CartsUpdateJob perform_now() function and using the same test event via the console:

client = Aws::SQS::Client.new
queue_url = client.get_queue_url(queue_name: "theapp-dev-aws-queue")
resp = client.receive_message(queue_url: queue_url.queue_url)
job_data = JSON.parse(resp.messages[0].body)
CartsUpdateJob.perform_now(job_data)

but when I run that same code in the console with .perform_later:

Enqueued CartsUpdateJob (Job ID: e84635b8-75bd-462b-bd83-4606fb9cfa54)
to AmazonSqs(default) with arguments: ...

I can see the same argument error in the worker process output:

8:18:51 PM aws.1 |  Running job: e84635b8-75bd-462b-bd83-4606fb9cfa54[CartsUpdateJob]
8:18:51 PM aws.1 |  Running job: [CartsUpdateJob]
8:18:51 PM aws.1 |  Error processing job [CartsUpdateJob]: wrong number of arguments (given 0, expected 1)
8:18:51 PM aws.1 | C:/Users/KG/theapp/app/jobs/carts_update_job.rb:5:in `perform'

Thank you for any insight or help you can provide.

How are you creating the message in your Ruby lambda? I haven't been able to reproduce this yet.

I copied your job class, and calling CartsUpdateJob.perform_later("my-test-string") sends the following message:

{
  "job_class": "CartsUpdateJob",
  "job_id": "aecfa167-4ddf-4e94-83bf-425f2494303f",
  "provider_job_id": null,
  "queue_name": "default",
  "priority": null,
  "arguments": [
    "my-test-string"
  ],
  "executions": 0,
  "exception_executions": {},
  "locale": "en",
  "timezone": "UTC",
  "enqueued_at": "2021-04-20T21:44:28Z"
}

Running the worker then picks up the job and runs it successfully, and job_data in perform is the string "my-test-string". The arguments passed to perform are the same arguments that were passed to perform_later; they do not include the full dump of job data used by SQS and the serializer/deserializer (i.e., job_class is not included, just what is under arguments).
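
For a producer that is not Rails (such as a Lambda), one way to apply this is to wrap the payload in the same envelope as the message shown above. The following is only a rough sketch: build_active_job_message is a hypothetical helper, not part of aws-sdk-rails, and the field set simply mirrors the example message rather than any documented contract. The event is passed as a JSON string because a string is the simplest argument type to reason about; how nested hashes deserialize may depend on the Rails version.

```ruby
require 'json'
require 'securerandom'
require 'time'

# Hypothetical helper (not part of aws-sdk-rails): wraps a payload in the
# ActiveJob message shape shown in the example message above.
def build_active_job_message(job_class:, arguments:, queue_name: 'default')
  JSON.generate(
    'job_class' => job_class,
    'job_id' => SecureRandom.uuid,
    'provider_job_id' => nil,
    'queue_name' => queue_name,
    'priority' => nil,
    'arguments' => arguments, # each element becomes one positional argument to perform
    'executions' => 0,
    'exception_executions' => {},
    'locale' => 'en',
    'timezone' => 'UTC',
    'enqueued_at' => Time.now.utc.iso8601
  )
end

# Stand-in for the EventBridge event received by the Lambda (illustrative data only).
event = { 'version' => '0', 'region' => 'us-west-2', 'webhook' => { 'note' => nil } }

# Pass the event as a single JSON string so perform receives exactly one argument.
body = build_active_job_message(
  job_class: 'CartsUpdateJob',
  arguments: [JSON.generate(event)]
)
puts body
```

The resulting body could then be sent with Aws::SQS::Client#send_message (queue_url:, message_body:), and perform would call JSON.parse(job_data) to get the hash back. With the arguments key missing, there is nothing to pass to perform, which matches the 'given 0, expected 1' error.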

I was missing the arguments key in my message. Thank you.