ianmcook/implyr

use spark with tbl() returned object

Closed this issue · 7 comments

Hi Ian -

Thanks for your work on implyr! I created an Impala source object following these steps:

library(implyr)
library(dplyr)
library(dbplyr)   # for in_schema()

# drv is an ODBC driver object, e.g. odbc::odbc()
impala <- implyr::src_impala(
  drv = drv,
  dsn = "Impala",
  database = "my_db"
)
my_impala_tbl <- tbl(impala, in_schema("my_db", "mytable"))
test_df <- my_impala_tbl %>% select(colA, colB) %>% filter(colA == "a")

Is there a way to convert the returned test_df to a Spark DataFrame without having to collect it into R first?

Thanks!

Hi! When you're using implyr, a command like tbl(impala, "tablename") returns an object of class tbl_impala. You can think of it as the Impala equivalent of a Spark DataFrame: it doesn't exist in memory in your R session; it exists as a lazily executed Impala query result set.

You can do a few things with a tbl_impala, mainly: (a) use dplyr verbs on it (which returns another tbl_impala) or (b) collect it (which returns a tbl_df: a local in-memory R data frame).
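
For example, continuing from the code in your question (just a sketch, not run against a real cluster):

library(dplyr)

# dplyr verbs return another tbl_impala; nothing is pulled into R yet
result <- my_impala_tbl %>%
  select(colA, colB) %>%
  filter(colA == "a")

class(result)       # includes "tbl_impala": still a lazily executed Impala query
show_query(result)  # inspect the SQL that Impala will run

# collect() executes the query and brings the rows into a local data frame
local_df <- collect(result)
class(local_df)     # "tbl_df" / "data.frame"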

To return a Spark DataFrame, you should use sparklyr instead of implyr. sparklyr lets you access tables that are defined in a Hive metastore just like implyr does (using the tbl() function). With sparklyr, this returns an object of class tbl_spark.

Both implyr and sparklyr allow you to manipulate data using dplyr verbs, and they both push down the operations to the compute engine and lazily execute them. The difference is that implyr uses Impala as the compute engine, whereas sparklyr uses Spark as the compute engine.
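
For comparison, the sparklyr equivalent of the code in your question looks roughly like this (the connection settings are placeholders for your cluster):

library(sparklyr)
library(dplyr)

# Connect to Spark; the master/config values depend on your environment
sc <- spark_connect(master = "yarn-client")

# tbl() on a sparklyr connection returns a tbl_spark; in_schema() works here too
my_spark_tbl <- tbl(sc, dbplyr::in_schema("my_db", "mytable"))

# The same dplyr verbs push down to Spark and execute lazily
test_sdf <- my_spark_tbl %>% select(colA, colB) %>% filter(colA == "a")
class(test_sdf)  # includes "tbl_spark"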

I hope that helps!

Thanks for explaining! The reason I asked is that we have some Spark functions written with sparklyr, and we'd like to apply them to a large dataset that we read in using implyr. We currently read the data in using sparklyr, but it's so much faster to load the data with implyr than to read it in through an HQL query! (Like this one:)

# Run an HQL query through the Spark session and register the result as a Spark DataFrame
test_data_jobj = session %>% invoke("sql", query)
test_df = sdf_register(test_data_jobj)

From your comment it sounds like it's impossible to convert a tbl_impala object to a Spark DataFrame directly. Is it possible to write the tbl_impala object out as Parquet files, though? That way we could use sparklyr::spark_read_parquet to load the data as a Spark DataFrame and then pipe it into our Spark functions.

Many thanks!

Right, there is no way to use Spark functions with Impala.

At the end of all your operations on the tbl_impala, you could use compute() to store the result as a new Impala table. In the call to compute(), specify file_format = "PARQUET" to store the new table's data as Parquet files. Then you could read those files (from the Hive warehouse directory) with sparklyr. That's an indirect way of solving the problem, but it could work if you have permission to create new Impala tables.
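
A rough sketch of that route, picking up from the test_df above (the warehouse path is a placeholder for your environment):

# Materialize the lazy result as a real Impala table stored as Parquet files
test_df %>% compute(name = "test_tbl",
                    temporary = FALSE,
                    file_format = "PARQUET")

# Then, from a sparklyr session, read the new table's Parquet files
spark_tbl <- sparklyr::spark_read_parquet(
  sc,
  name = "test_tbl",
  path = "hdfs:///user/hive/warehouse/test_tbl"  # placeholder warehouse path
)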

Awesome - compute() is what I was looking for! Is there a way to specify the database name in the function? I'd like to save it in database MyDB under the name 'test_tbl', so I did:

compute(test_tbl_impala, name = "MyDB.test_tbl", file_format = "PARQUET", temporary = F, overwrite = T, force = T)

And got an error:

nanodbc/nanodbc.cpp:1587: HY000: [Cloudera][ImpalaODBC] (110) Error while executing a query in Impala: [HY000] : AuthorizationException: User 'hadoop_user1' does not have privileges to execute 'CREATE' on: MyDB.test_tbl

hadoop_user1 does have the privilege to create tables in MyDB, though.

Then I thought, it should know that I'm trying to save it to MyDB since I specified the database in src_impala:

impala <- implyr::src_impala(
  drv = drv,
  dsn = "Impala",
  database = "MyDB"
)

Yet doing this:

compute(dt_imp, name = "test_tbl", file_format = "PARQUET", temporary = F, overwrite = T, force = T)

Gives an error:

nanodbc/nanodbc.cpp:1587: HY000: [Cloudera][ImpalaODBC] (110) Error while executing a query in Impala: [HY000] : AuthorizationException: User 'hadoop_user1' does not have privileges to execute 'CREATE' on: default.test_tbl

database doesn't seem to be an argument in compute(). What am I missing here?

You can use in_schema() to specify the database name in the name argument passed to compute(), like this:

compute(dt_imp, name = in_schema("mydb", "test_tbl"), file_format = "PARQUET", temporary = FALSE)
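
One more note: once that compute() succeeds, mydb.test_tbl is a regular table in the metastore, so (assuming your Impala and Spark installations share the same Hive metastore) sparklyr can also pick it up by name, without going through the file path at all:

library(sparklyr)
library(dplyr)

# Access the newly created table directly through the metastore
new_sdf <- tbl(sc, dbplyr::in_schema("mydb", "test_tbl"))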

Let me know if that works for you!

This works indeed! 👍 I was able to save it as Parquet, and I used pyspark to load the .parq files into Spark; the result is exactly the same as what I saved. (The Spark DataFrame loaded with sparklyr::spark_read_parquet reads the character columns in 'raw' format, but that's a different issue to solve.)

Thank you so much!!

Great, thanks!

Is sparklyr treating the small integer columns as raw? If so, I've run into that. See sparklyr/sparklyr#973 and look at the workaround I posted in the comment there.