Support AWS Athena UNLOAD
DyfanJones opened this issue · 7 comments
AWS Athena supports UNLOAD, which allows Athena to write query results out in different file formats, e.g. Parquet.
This will allow noctua/RAthena to utilise AWS Athena UNLOAD queries and read the Parquet format, similar to how AWS Data Wrangler currently does (NOTE: AWS Data Wrangler wraps queries with CTAS: https://docs.aws.amazon.com/athena/latest/ug/ctas.html).
AWS Data Wrangler example with the pros/cons of their current CTAS method:
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/006%20-%20Amazon%20Athena.html
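To make the idea concrete, here is a sketch of how a SELECT query could be wrapped in an Athena UNLOAD statement. The function name, S3 path, and wrapping details are illustrative assumptions, not noctua's actual implementation:

```python
import uuid

def wrap_unload(select_sql: str, s3_staging_dir: str, file_format: str = "PARQUET") -> str:
    """Wrap a SELECT query in an Athena UNLOAD statement (illustrative sketch).

    Athena then writes the results to the S3 location in the requested
    format instead of the default CSV result file.
    """
    # Use a unique prefix so repeated runs don't collide:
    # Athena requires the UNLOAD target location to be empty.
    location = f"{s3_staging_dir.rstrip('/')}/{uuid.uuid4()}/"
    return (
        f"UNLOAD ({select_sql})\n"
        f"TO '{location}'\n"
        f"WITH (format = '{file_format}')"
    )

print(wrap_unload("SELECT * FROM awswrangler_test.noaa", "s3://my-bucket/athena-unload"))
```

The client would then list the Parquet objects under the target prefix and read them locally, which is where the speed-up over parsing a single large CSV comes from.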
Set up the awswrangler example data for the noctua benchmarks:
import awswrangler as wr
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
);

wr.catalog.table(database="awswrangler_test", table="noaa")
Initial benchmark testing:
remotes::install_github("dyfanjones/noctua", ref="parquet_unload")
library(DBI)
con <- dbConnect(noctua::athena())
# AWS Athena result is output as CSV and then read into R.
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 81.819 38.409 667.637
# AWS Athena result is output as Parquet and then read into R.
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 17.899 2.992 57.022
When caching is enabled, additional performance is gained:
noctua::noctua_options(cache_size = 1)
# AWS Athena result is output as Parquet and then read into R.
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 17.899 2.992 57.022
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 14.094 3.117 34.663
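The caching speed-up above can be illustrated with a minimal sketch. This is only an illustration of the idea (results keyed on the query text, repeated queries served without a round trip to Athena); the `QueryCache` class and its methods are hypothetical and not noctua's actual implementation:

```python
import time

class QueryCache:
    """Minimal query-result cache keyed on the SQL text (illustrative only)."""

    def __init__(self, cache_size: int = 1):
        self.cache_size = cache_size
        self._store = {}  # sql -> result

    def get_query(self, sql, run_query):
        # Cache hit: skip the (slow) round trip to Athena entirely.
        if sql in self._store:
            return self._store[sql]
        result = run_query(sql)
        # Evict the oldest entry once the cache is full.
        if len(self._store) >= self.cache_size:
            self._store.pop(next(iter(self._store)))
        self._store[sql] = result
        return result

def slow_athena_query(sql):
    time.sleep(0.1)  # stand-in for the Athena round trip
    return [("row",)]

cache = QueryCache(cache_size=1)
cache.get_query("SELECT * FROM noaa", slow_athena_query)  # cold: runs the query
cache.get_query("SELECT * FROM noaa", slow_athena_query)  # warm: served from cache
```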
The unload method does come with some downsides. It may be worth creating a vignette documenting the pros and cons of both methods, so that users are fully aware of what to expect.
Benchmark run on a SageMaker ml.t3.xlarge instance:
library(DBI)
con <- dbConnect(noctua::athena())
# Query ran using CSV output
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 57.004 8.430 160.567
noctua::noctua_options(cache_size = 1)
# Query ran using UNLOAD Parquet output
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 21.622 2.350 39.232
# Query ran using cache
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = T)
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 17.514 2.430 14.249
# Query ran using cache with unload parameter not specified
system.time({
df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user system elapsed
# 15.319 2.445 11.924
Results:
csv: 160.567 seconds
unload: 39.232 seconds (4X faster)
cache unload: 11.924 seconds (13.4X faster)
noctua: 2.2.0.9000
R: 4.1.1
Comparison to awswrangler:
Note: benchmarks run on a SageMaker ml.t3.xlarge instance:
import awswrangler as wr
# Query with AWS Athena csv output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
# CPU times: user 1min 30s, sys: 5.78 s, total: 1min 36s
# Wall time: 3min 9s
# seconds: 189
# Query with AWS Athena Parquet output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
# CPU times: user 1min 3s, sys: 6.53 s, total: 1min 10s
# Wall time: 1min 32s
# seconds: 92
# Query with AWS Athena Parquet output, reading columns as pandas categoricals
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
# CPU times: user 18.1 s, sys: 2.21 s, total: 20.3 s
# Wall time: 35.9 s
# seconds: 35.9
Results:
csv: 189 seconds
ctas: 92 seconds (2.05X faster)
ctas with categories: 35.9 seconds (5.26X faster)
awswrangler version: 2.11.0
python: 3.6.13
Closing issue as AWS Athena UNLOAD has been added to RAthena and noctua.
Great stuff!