logstash-plugins/logstash-integration-kafka

Add Support for AWS MSK IAM authorization

aydasraf opened this issue · 0 comments

I am trying to integrate Logstash version 8.2.2 with AWS Managed Kafka Service (MSK) with Kafka version of 2.6.3 that is using the IAM Authorization and logstash-integration-kafka v10.10.0

I used the following Kafka output config in my pipeline

        kafka {
          bootstrap_servers => msk_bootstrap_servers
          client_id => "logstash-to-kafka"
          compression_type => "gzip"
          sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;"
          sasl_mechanism => "AWS_MSK_IAM"
          security_protocol => "SASL_SSL"
          codec => "json_lines"
          topic_id => "logs"
    }

once the pipeline is deployed, the output plugin fails with the following exception javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule

below is the full-stack trace for the error:

"line":939,"classLoaderName":"app"},{"class":"org.jruby.RubyClass$INVOKER$i$newInstance","method":"call","file":"RubyClass$INVOKER$i$newInstance.gen","line":-1,"classLoaderName":"app"},{"class":"org.jruby.java.proxies.ConcreteJavaProxy$NewMethod","method":"call","file":"ConcreteJavaProxy.java","line":109,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka","method":"RUBY$method$create_producer$0","file":"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb","line":369},{"class":"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka","method":"RUBY$method$create_producer$0$__VARARGS__","file":"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb","line":330},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":80,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":70,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka","method":"RUBY$method$register$0","file":"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb","line":201},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":93,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":105,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.DynamicMethod","method":"call","file":"DynamicMethod.java","line":192,"classLoaderName":"app"},{"class":"org.jruby.RubyClass","method":"finvoke","file":"RubyClass.java","line":572,"classLoaderName":"app"},{"class":"org.jruby.runtime.Helpers","method":"invoke","file":"Helpers.java","line":635,"classLoaderName":"app"},{"class":"org.jruby.RubyBasicObject","method":"callMethod","file":"RubyBasicObject.java","line":354,"classLoaderName":"app"},{"class":"org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt","method":"reg","file":"OutputStrategyExt.java","line":275,"classLoaderName":"app"},{"class":"org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt","method":"register","file":"OutputStrategyExt.java","line":131,"classLoaderName":"app"},{"class":"org.logstash.config.ir.compiler.OutputDelegatorExt","method":"doRegister","file":"OutputDelegatorExt.java","line":117,"classLoaderName":"app"},{"class":"org.logstash.config.ir.compiler.AbstractOutputDelegatorExt","method":"register","file":"AbstractOutputDelegatorExt.java","line":68,"classLoaderName":"app"},{"class":"org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register","method":"call","file":"AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen","line":-1},{"class":"org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN","method":"call","file":"JavaMethod.java","line":831,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"fail","file":"InvokeSite.java","line":237,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"fail","file":"InvokeSite.java","line":255,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$block$register_plugins$1","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":233},{"class":"org.jruby.runtime.CompiledIRBlockBody","method":"yieldDirect","file":"CompiledIRBlockBody.java","line":148,"classLoaderName":"app"},{"class":"org.jruby.runtime.BlockBody","method":"yield","file":"BlockBody.java","line":106,"classLoaderName":"app"},{"class":"org.jruby.runtime.Block","method":"yield","file":"Block.java","line":184,"classLoaderName":"app"},{"class":"org.jruby.RubyArray","method":"each","file":"RubyArray.java","line":1821,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$register_plugins$0","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":232},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$register_plugins$0$__VARARGS__","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":230},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":80,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":70,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$maybe_setup_out_plugins$0","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":598},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$maybe_setup_out_plugins$0$__VARARGS__","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":596},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":80,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":70,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$start_workers$0","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":245},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$start_workers$0$__VARARGS__","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":241},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":80,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":70,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$run$0","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":190},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$method$run$0$__VARARGS__","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":185},{"class":"org.jruby.internal.runtime.methods.CompiledIRMethod","method":"call","file":"CompiledIRMethod.java","line":80,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.methods.MixedModeIRMethod","method":"call","file":"MixedModeIRMethod.java","line":70,"classLoaderName":"app"},{"class":"org.jruby.ir.targets.InvokeSite","method":"invoke","file":"InvokeSite.java","line":207,"classLoaderName":"app"},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"RUBY$block$start$1","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":142},{"class":"org.jruby.runtime.CompiledIRBlockBody","method":"callDirect","file":"CompiledIRBlockBody.java","line":138,"classLoaderName":"app"},{"class":"org.jruby.runtime.IRBlockBody","method":"call","file":"IRBlockBody.java","line":58,"classLoaderName":"app"},{"class":"org.jruby.runtime.IRBlockBody","method":"call","file":"IRBlockBody.java","line":52,"classLoaderName":"app"},{"class":"org.jruby.runtime.Block","method":"call","file":"Block.java","line":139,"classLoaderName":"app"},{"class":"org.jruby.RubyProc","method":"call","file":"RubyProc.java","line":318,"classLoaderName":"app"},{"class":"org.jruby.internal.runtime.RubyRunnable","method":"run","file":"RubyRunnable.java","line":105,"classLoaderName":"app"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":829,"moduleName":"java.base","moduleVersion":"11.0.14.1"}],"message":"javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule","localizedMessage":"javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule"},"stackTrace":[{"class":"org.apache.kafka.clients.producer.KafkaProducer","method":"<init>","file":"org/apache/kafka/clients/producer/KafkaProducer.java","line":434},{"class":"org.apache.kafka.clients.producer.KafkaProducer","method":"<init>","file":"org/apache/kafka/clients/producer/KafkaProducer.java","line":298},{"class":"jdk.internal.reflect.NativeConstructorAccessorImpl","method":"newInstance0","file":"jdk/internal/reflect/NativeConstructorAccessorImpl.java","line":-2},{"class":"jdk.internal.reflect.NativeConstructorAccessorImpl","method":"newInstance","file":"jdk/internal/reflect/NativeConstructorAccessorImpl.java","line":62},{"class":"jdk.internal.reflect.DelegatingConstructorAccessorImpl","method":"newInstance","file":"jdk/internal/reflect/DelegatingConstructorAccessorImpl.java","line":45},{"class":"java.lang.reflect.Constructor","method":"newInstance","file":"java/lang/reflect/Constructor.java","line":490},{"class":"org.jruby.javasupport.JavaConstructor","method":"newInstanceDirect","file":"org/jruby/javasupport/JavaConstructor.java","line":253},{"class":"org.jruby.RubyClass","method":"newInstance","file":"org/jruby/RubyClass.java","line":939},{"class":"org.jruby.RubyClass$INVOKER$i$newInstance","method":"call","file":"org/jruby/RubyClass$INVOKER$i$newInstance.gen","line":-1},{"class":"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka","method":"create_producer","file":"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb","line":369},{"class":"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka","method":"register","file":"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb","line":201},{"class":"org.jruby.RubyClass","method":"finvoke","file":"org/jruby/RubyClass.java","line":572},{"class":"org.jruby.RubyBasicObject","method":"callMethod","file":"org/jruby/RubyBasicObject.java","line":354},{"class":"org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt","method":"reg","file":"org/logstash/config/ir/compiler/OutputStrategyExt.java","line":275},{"class":"org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt","method":"register","file":"org/logstash/config/ir/compiler/OutputStrategyExt.java","line":131},{"class":"org.logstash.config.ir.compiler.OutputDelegatorExt","method":"doRegister","file":"org/logstash/config/ir/compiler/OutputDelegatorExt.java","line":117},{"class":"org.logstash.config.ir.compiler.AbstractOutputDelegatorExt","method":"register","file":"org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java","line":68},{"class":"org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register","method":"call","file":"org/logstash/config/ir/compiler/AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen","line":-1},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"register_plugins","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":233},{"class":"org.jruby.RubyArray","method":"each","file":"org/jruby/RubyArray.java","line":1821},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"register_plugins","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":232},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"maybe_setup_out_plugins","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":598},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"start_workers","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":245},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"run","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":190},{"class":"usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline","method":"start","file":"/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb","line":142},{"class":"org.jruby.RubyProc","method":"call","file":"org/jruby/RubyProc.java","line":318},{"class":"java.lang.Thread","method":"run","file":"java/lang/Thread.java","line":829}],"message":"Failed to construct kafka producer","localizedMessage":"Failed to construct kafka producer"},"backtrace":["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:434)","org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:298)","jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)","jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:62)","jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)","java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:490)","org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:253)","org.jruby.RubyClass.newInstance(org/jruby/RubyClass.java:939)","org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)","usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka.create_producer(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb:369)","usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_10_dot_0_minus_java.lib.logstash.outputs.kafka.register(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.10.0-java/lib/logstash/outputs/kafka.rb:201)","org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:572)","org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:354)","org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.reg(org/logstash/config/ir/compiler/OutputStrategyExt.java:275)","org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.register(org/logstash/config/ir/compiler/OutputStrategyExt.java:131)","org.logstash.config.ir.compiler.OutputDelegatorExt.doRegister(org/logstash/config/ir/compiler/OutputDelegatorExt.java:117)","org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.register(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:68)","org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register.call(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen)","usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:233)"

Can you add support for AWS_MSK_IAM to both the input and output plugins? Is there a proper way to add this jar lib ourselves to the Logstash if not?