DyfanJones/noctua

Support AWS Athena UNLOAD

DyfanJones opened this issue · 7 comments

AWS Athena supports UNLOAD, which allows Athena to write query results out in different file formats, e.g. Parquet.

This will allow noctua/RAthena to utilise AWS Athena UNLOAD queries and read the Parquet output, similar to how AWS Data Wrangler currently does. (Note: AWS Data Wrangler wraps queries with CTAS: https://docs.aws.amazon.com/athena/latest/ug/ctas.html.)
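Roughly, UNLOAD works by wrapping a plain `SELECT` and redirecting its result set to an S3 location in the requested format. A minimal sketch of such a wrapper in Python (the `wrap_unload` helper and the staging path are hypothetical illustrations, not noctua's actual implementation):

```python
def wrap_unload(sql: str, s3_staging_path: str, file_format: str = "PARQUET") -> str:
    """Wrap a SELECT statement in Athena's UNLOAD syntax so the
    result set is written to S3 in the given format instead of CSV."""
    return (
        f"UNLOAD ({sql.rstrip(';')}) "
        f"TO '{s3_staging_path}' "
        f"WITH (format = '{file_format}')"
    )

print(wrap_unload("SELECT * FROM awswrangler_test.noaa", "s3://my-bucket/unload/"))
```

The client then lists the Parquet objects under the staging path and reads them locally, which is where the speed-up over CSV comes from.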

AWS Data Wrangler example with pros/cons of their current CTAS method:
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/006%20-%20Amazon%20Athena.html

Set up the awswrangler example data for the noctua benchmarks:

import awswrangler as wr

import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"


if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
)

wr.catalog.table(database="awswrangler_test", table="noaa")

Initial benchmark testing:

remotes::install_github("dyfanjones/noctua", ref="parquet_unload")

library(DBI)

con <- dbConnect(noctua::athena())

# AWS Athena result is output as CSV and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 81.819  38.409 667.637 

# AWS Athena result is output as Parquet and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 17.899   2.992  57.022 

When caching is enabled, extra performance is gained:

noctua::noctua_options(cache_size = 1)

# AWS Athena result is output as Parquet and then read into R.
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 17.899   2.992  57.022 

system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})

# Info: (Data scanned: 80.88 MB)
# user  system elapsed 
# 14.094   3.117  34.663 
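The caching behaviour above can be sketched as a small bounded query cache keyed by the SQL string, so a repeated query skips Athena entirely. This is a hypothetical illustration of the idea behind `noctua_options(cache_size = 1)`, not noctua's internals:

```python
from collections import OrderedDict

class QueryCache:
    """Tiny LRU-style cache mapping a SQL string to its fetched result,
    bounded like noctua_options(cache_size = n)."""
    def __init__(self, cache_size: int = 1):
        self.cache_size = cache_size
        self._store = OrderedDict()

    def get_query(self, sql, run_query):
        if sql in self._store:           # cache hit: skip re-running on Athena
            self._store.move_to_end(sql)
            return self._store[sql]
        result = run_query(sql)          # cache miss: execute and remember
        self._store[sql] = result
        while len(self._store) > self.cache_size:
            self._store.popitem(last=False)
        return result

calls = []
def fake_run(sql):
    calls.append(sql)
    return f"rows for {sql}"

cache = QueryCache(cache_size=1)
cache.get_query("SELECT * FROM noaa", fake_run)
cache.get_query("SELECT * FROM noaa", fake_run)  # served from cache
print(len(calls))  # the query only ran once
```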

The unload method does come with some downsides. Possibly create a vignette documenting the pros and cons of both methods, so that users are fully aware of what to expect.

Benchmark run on a SageMaker ml.t3.xlarge instance:

library(DBI)

con <- dbConnect(noctua::athena())

# Query ran using CSV output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed
#  57.004   8.430 160.567 

noctua::noctua_options(cache_size = 1)

# Query ran using UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  21.622   2.350  39.232 

# Query ran using cache
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  17.514   2.430  14.249 

# Query ran using cache with unload parameter not specified
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  15.319   2.445  11.924 

Results:

csv:
seconds: 160.567

unload:
seconds: 39.232
4X faster

cache unload:
seconds: 11.924
13.4X faster

noctua: 2.2.0.9000
R: 4.1.1
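As a quick sanity check of the summary, the speed-up factors follow directly from the elapsed times above (13.4X is the truncated form of ~13.47):

```python
# Elapsed times (seconds) from the benchmark runs above.
csv_s = 160.567
unload_s = 39.232
cache_s = 11.924

print(f"unload: {csv_s / unload_s:.1f}X faster")        # 4.1X
print(f"cached unload: {csv_s / cache_s:.1f}X faster")  # 13.5X
```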

Comparison to awswrangler:

Note: benchmarks ran on a SageMaker ml.t3.xlarge instance:

import awswrangler as wr

# Query with AWS Athena csv output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
# CPU times: user 1min 30s, sys: 5.78 s, total: 1min 36s
# Wall time: 3min 9s
# seconds: 189

# Query with AWS Athena Parquet output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
# CPU times: user 1min 3s, sys: 6.53 s, total: 1min 10s
# Wall time: 1min 32s
# seconds: 92

# Query with AWS Athena Parquet output
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
# CPU times: user 18.1 s, sys: 2.21 s, total: 20.3 s
# Wall time: 35.9 s
# seconds: 35.9

Results:

csv:
seconds: 189

ctas:
seconds: 92
2.05X faster

ctas with categories:
seconds: 35.9
5.26X faster

awswrangler version: 2.11.0
python: 3.6.13

Closing issue as AWS Athena UNLOAD has been added to RAthena and noctua.

Great stuff!