swoop-inc/spark-alchemy

AbstractMethodError with Spark Scala integration

Closed this issue · 3 comments

I am getting the following error:

Exception in thread "main" java.lang.AbstractMethodError: com.swoop.alchemy.spark.expressions.hll.HyperLogLogInitSimpleAgg.inputTypes()Lscala/collection/Seq;
at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.checkInputDataTypes(interfaces.scala:158)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:69)
at scala.collection.immutable.List.forall(List.scala:83)
at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:69)
at scala.collection.immutable.List.forall(List.scala:83)
at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Alias.resolved$lzycompute(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.expressions.Alias.resolved(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$resolved$4.apply(basicLogicalOperators.scala:502)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$resolved$4.apply(basicLogicalOperators.scala:502)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:79)
at scala.collection.immutable.Stream.exists(Stream.scala:188)
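
For context, an AbstractMethodError on a signature such as inputTypes()Lscala/collection/Seq; almost always indicates a binary mismatch: the library on the classpath was compiled against a different Scala or Spark version than the one running it. A minimal build.sbt sketch that keeps the versions aligned (all version numbers below are illustrative assumptions, not the reporter's actual build):

  // Pin one Scala binary version and build Spark and spark-alchemy
  // against it, so trait methods such as inputTypes() resolve to the
  // same bytecode at runtime.
  scalaVersion := "2.11.12"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql"     % "2.4.4" % Provided,
    "com.swoop"        %% "spark-alchemy" % "0.5.5"
  )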

Code:
import org.apache.spark.sql.SparkSession
import com.swoop.alchemy.spark.expressions.hll.functions._

/** Hello world! */
object App {

  def main(args: Array[String]): Unit = {
    // Backslashes must be escaped in Scala string literals; the original
    // "C:\mohit\..." would not even compile because \u starts a unicode escape.
    val path1 = new java.io.File("C:\\mohit\\views\\hlldemo\\user.csv").getCanonicalPath
    val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()

    val df1 = spark
      .read
      .option("header", "true")
      .option("charset", "UTF8")
      .csv(path1)

    // One HLL sketch of user_id per first_name group
    val df2 = df1
      .groupBy("first_name")
      .agg(hll_init_agg("user_id"))

    df2.show()
  }
}
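
A quick way to check for such a mismatch is to print the versions the application actually runs with and compare them to the versions spark-alchemy was built against (a diagnostic sketch, not part of the original report):

  // Inside main(), after the SparkSession has been created:
  // print the runtime Spark and Scala versions.
  println(s"Spark version: ${spark.version}")
  println(s"Scala version: ${scala.util.Properties.versionString}")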

pidge commented

@mohitgoyal201617 Thanks for the report. Can you provide the CSV file you are using, or at least the schema necessary to reproduce it?

I am encountering the same error running in cluster mode (it works fine in local mode). I am using the following libraries:

"com.swoop" %% "spark-alchemy" % "0.5.5"
"net.agkn" % "hll" % "1.6.0"

Running this simple example from the wiki:

  import org.apache.spark.sql.functions.{approx_count_distinct, col, countDistinct}
  import com.swoop.alchemy.spark.expressions.hll.functions._

  spark.range(1000000).select(
    // exact distinct count
    countDistinct(col("id")).as("cntd"),
    // Spark's HLL implementation with default 5% precision
    approx_count_distinct(col("id")).as("anctd_spark_default"),
    // approximate distinct count with default 5% precision
    hll_cardinality(hll_init_agg("id")).as("acntd_default"),
    // approximate distinct count with custom 2% precision
    hll_cardinality(hll_init_agg("id", 0.02)).as("acntd_02")
  ).show(false)

If it works in local mode and fails on the cluster, the cluster environment is not set up correctly.
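
One common fix is to make sure the library actually reaches the executors, for example via spark-submit's --packages flag. A sketch, assuming artifact coordinates com.swoop:spark-alchemy_2.11:0.5.5 (the _2.11 suffix must match the Scala build of your cluster; app.jar and the App class name are placeholders):

  spark-submit \
    --master yarn --deploy-mode cluster \
    --packages com.swoop:spark-alchemy_2.11:0.5.5 \
    --class App app.jar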