This repo has been moved to https://github.com/seabow/spark-lucene
A Lucene data source for Spark. Supports filter push-down and aggregation push-down. Tested on Spark 3.0.2.
```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val userDefinedSchema_01 = StructType(
  List(
    StructField("Day", IntegerType, true),
    StructField("Customer ID", StringType, true),
    StructField("Customer Name", StringType, true),
    StructField("Standard Package", IntegerType, true),
    StructField("Extra Option 1", LongType, true),
    StructField("Extra Option 2", FloatType, true),
    StructField("Extra Option 3", DoubleType, true),
    StructField("Array Info", ArrayType(StringType), true),
    StructField("Map Info", MapType(StringType, StringType), true),
    StructField("Struct Info", StructType(Seq(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    )), true)
  )
)

val expectedData_01 = List(
  Row(1, "CA869", "Phạm Uyển Trinh", null, null, 2200.01f, null, Array("1", "2", "3"), Map(), Row("John Doe", 30)),
  Row(2, "CA870", "Nguyễn Liên Thảo", null, null, 2000.02f, 1350.05d, Array("1", "2", "3"), null, Row("Jane Smith", 25)),
  Row(4, "CA871", "Lê Thị Nga", 17000, null, null, null, Array("1", "2", "3"), Map("color" -> "yellow", "0.3" -> "1"), Row("David Johnson", 40)),
  Row(1, "CA872", "Phan Tố Nga", null, null, 2000.02f, null, Array("1", "2", "3"), Map(), Row("Sarah Williams", 35)),
  Row(1, "CA873", "Nguyễn Thị Teresa Teng", null, 132324L, 1200.03f, null, Array("1", "2", "3", "4"), Map("color" -> "red", "0.5" -> "2"), Row("Michael Brown", 45))
).asJava

val df = spark.createDataFrame(expectedData_01, userDefinedSchema_01)
df.write.format("lucene").mode("overwrite").save("spark_lucene")

val dfRead = spark.read.format("lucene").load("spark_lucene")
```
- Read and write data with a schema using the Spark SQL / DataFrame API, as shown above.
- Pushed filters are supported not only for atomic types (`int`, `string`, ...) but also for complex types, e.g. `Map<String,String>`, `Array`, and `StructType`. If you have a `tags: Map<String,String>` field, spark-lucene lets you filter on any key of the `tags` field, like so:

  ```scala
  df.filter("tags.`your_key` = 'your_value'")
  ```

  You can also push down filters for the `array_contains()` expression:

  ```scala
  df.filter("array_contains(your_array_col, 'your_value')")
  ```

  Both of the above cases are converted to a Lucene `TermQuery`. Of course, you can also push down filters for any other expression, e.g. `>`, `<`, `in`, ...
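As a conceptual sketch (not the data source's actual implementation), the conversion of these two filters to a Lucene `TermQuery` can be modeled as a mapping from a Spark filter to a (field, term) pair; all column and key names below are illustrative:

```scala
// Conceptual model of the filter-to-TermQuery conversion described above.
// This is an illustration only, not the project's real code.
final case class TermQueryLike(field: String, term: String)

object FilterToTermQuery {
  // tags.`your_key` = 'your_value'  →  term query on field "tags.your_key"
  def mapKeyEquals(mapCol: String, key: String, value: String): TermQueryLike =
    TermQueryLike(s"$mapCol.$key", value)

  // array_contains(your_array_col, 'your_value')  →  term query on the array field
  def arrayContains(arrayCol: String, value: String): TermQueryLike =
    TermQueryLike(arrayCol, value)
}
```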
- Push-down aggregation is supported for `count`, `max`, `min`, and `sum`, backed by Lucene `DocValuesField`s. (Due to limitations of Spark 3.0.2, aggregation push-down for `avg` is not supported. The alternative is to compute the pushed-down `sum` and `count` first, then divide.)
- Preferred locations are implemented for partition files. When a DataFrame is written in `lucene` format, each partition file is written as a Lucene index directory on HDFS with a `.lucene` suffix. Since opening a Lucene index is a relatively expensive operation, we want to open an index only when the Lucene-format data source is loaded for the first time; this requires that queries against a given partition file be scheduled onto a fixed executor as much as possible.
- Facet acceleration is supported for `ArrayType` columns.
  ```scala
  spark.read.option("enforceFacetSchema", "true").lucene("lucene_path").groupBy("ArrayInfo").count()
  ```

  is the same as

  ```scala
  spark.read.orc("orc_path").withColumn("ArrayInfo", explode_outer(col("ArrayInfo"))).groupBy("ArrayInfo").count()
  ```

  but faster.
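As noted above, `avg` cannot be pushed down on Spark 3.0.2 and has to be reconstructed from the pushed-down `sum` and `count`. A minimal plain-Scala sketch of that arithmetic (no SparkSession needed; the sequence stands in for a DataFrame column):

```scala
// Sketch of the avg-from-sum-and-count workaround. In Spark this would be
// something like df.agg(sum("col"), count("col")) followed by a division
// on the driver; plain Scala collections stand in for the aggregates here.
object AvgWorkaround {
  def avgFromSumAndCount(values: Seq[Double]): Double = {
    val s = values.sum    // push-down-able aggregate: sum
    val n = values.size   // push-down-able aggregate: count
    s / n
  }
}
```

For example, `AvgWorkaround.avgFromSumAndCount(Seq(10.0, 20.0, 30.0))` yields `20.0`.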
Test setup:
- Data generator: see `RandomTestDataGenerator.scala`
- Test data size: 100 million records
- Result storage format: ORC/Lucene files
ORC Write Performance:
- Duration: 53 seconds
- File Size: 3.0GB
Lucene Indexing Performance:
- Duration: 139 seconds
- Index Size: 13.9GB
Conditional Query Tests:
Condition: `array_contains(map_tags.sports, 'basketball')`
| Storage Format | Query Result Count | Query 1 Duration (secs) | Query 2 Duration (secs) | Query 3 Duration (secs) |
|---|---|---|---|---|
| ORC | 24,996,139 | 11 | 12 | 15 |
| Lucene | 24,996,139 | 7 | 2 | 1 |
Condition: `map_tags['sports'] is not null`
| Storage Format | Query Result Count | Query 1 Duration (secs) | Query 2 Duration (secs) | Query 3 Duration (secs) |
|---|---|---|---|---|
| ORC | 49,950,531 | 16 | 14 | 12 |
| Lucene | 49,950,531 | 8 | 2 | 2 |
Faceted Aggregation Test:
| Storage Format | Aggregation 1 Duration (secs) | Aggregation 2 Duration (secs) | Aggregation 3 Duration (secs) |
|---|---|---|---|
| ORC | 55 | 31 | 26 |
| Lucene | 52 | 8 | 7 |
Based on the test results, the following conclusions can be drawn:
- In the case of 100 million records, ORC demonstrates good write performance with a duration of 53 seconds and a file size of 3.0GB.
- Lucene indexing takes longer, at 139 seconds, and produces a considerably larger index of 13.9 GB.
- For conditional queries, both ORC and Lucene quickly find the records that satisfy the given conditions. Lucene outperforms ORC overall, and the gap is largest on repeated queries, where Lucene's response times drop to 1-2 seconds.
- For faceted aggregation, ORC's fastest run takes 26 seconds, while Lucene completes repeated aggregations in 7-8 seconds after an initial 52-second run.
In conclusion, the choice of data storage and query engine should be based on specific usage scenarios and requirements. ORC is suitable for storing and querying large-scale data efficiently, while Lucene is more suitable for multi-dimensional analysis and filtering scenarios.