quintoandar/butterfree

Historical and Online Feature Store Writer - Write directly to HDFS

igorpereirabr1 opened this issue · 5 comments

Is there any way to write the Historical and Online feature stores directly to HDFS?

Summary

I would like to write the Historical and Online feature stores to two different HDFS paths, using the Parquet format.
I've tried using the S3Config class to do it, but it doesn't work.

What exactly is not working?
About the S3Config:
Currently it writes using the s3a fs protocol. If you open the file s3_config.py you can check this line: "path": os.path.join(f"s3a://{self.bucket}/", key). You can try creating an HdfsConfig, replacing s3a so the line becomes "path": os.path.join(f"hdfs://{self.bucket}/", key), or, if using Databricks, "path": os.path.join(f"dbfs://{self.bucket}/", key).

But it raises a good point. Maybe a more generic configuration entity would be better, something like a FileSystemConfig where you can define the protocol to use: s3, s3a, hdfs or dbfs. It should work on any cluster as long as that filesystem is configured on the cluster.
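For illustration, a minimal HdfsConfig along those lines could look like the sketch below. It is not an existing Butterfree class, and it assumes S3Config can be imported from butterfree.configs.db next to AbstractWriteConfig:

import os

from butterfree.configs.db import S3Config


class HdfsConfig(S3Config):
    """Hypothetical config reusing S3Config, but pointing to hdfs:// paths."""

    def get_options(self, key: str) -> dict:
        # Reuse the S3 options and only swap the hard-coded s3a protocol
        options = super().get_options(key)
        options["path"] = os.path.join(f"hdfs://{self.bucket}/", key)
        return options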

Hey @rafael, thanks for your suggestion!

I've created a new generic configuration class, just to test:

"""Holds configurations to read and write with Spark to AWS S3."""

import os
from typing import Dict, List

from butterfree.configs import environment
from butterfree.configs.db import AbstractWriteConfig


class GenericConfig(AbstractWriteConfig):
    """Configuration for Spark metastore database stored on AWS S3.
    Attributes:
        database: database name.
        mode: writing mode used be writers.
        format_: expected stored file format.
        path: database root location.
        partition_by: partition column to use when writing.
    """

    def __init__(
        self, bucket: str = None, mode: str = None, format_: str = None,
    ):
        self.bucket = bucket
        self.mode = mode
        self.format_ = format_

    @property
    def bucket(self) -> str:
        """Bucket name."""
        return self.__bucket

    @bucket.setter
    def bucket(self, value: str):
        self.__bucket = value or environment.get_variable("FEATURE_STORE_S3_BUCKET")

    @property
    def format_(self) -> str:
        """Expected stored file format."""
        return self.__format

    @format_.setter
    def format_(self, value: str):
        self.__format = value or "parquet"

    @property
    def mode(self) -> str:
        """Writing mode used be writers."""
        return self.__mode

    @mode.setter
    def mode(self, value):
        self.__mode = value or "overwrite"

    def get_options(self, key: str) -> dict:
        """Get write options for Spark.

        Options will be a dictionary with the write configuration for
        Spark, pointing to a DBFS path.

        Args:
            key: path, relative to the bucket, to save data into.

        Returns:
            Options configuration for the Spark writers.
        """
        return {
            "mode": self.mode,
            "format_": self.format_,
            "path": os.path.join(f"dbfs://{self.bucket}/", key),
        }

    def translate(self, schema) -> List[Dict]:
        """Translate feature set spark schema to the corresponding database."""
        pass

Then

from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)
from butterfree.load import Sink

config = GenericConfig(bucket="/Users/ju195/feature_set", mode="append", format_="delta")

writers = [HistoricalFeatureStoreWriter(db_config=config, debug_mode=False)]
sink = Sink(writers=writers)

from butterfree.pipelines import FeatureSetPipeline

pipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)

pipeline.run()

Unfortunately it returned the error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o82708.saveAsTable.
: org.apache.spark.sql.AnalysisException: Database 'test' not found;;
CreateDeltaTableCommand `test`.`feature_set`, Append, RepartitionByExpression [year#2763572, month#2763808, day#2764045], 200

Usually the syntax below is used to write in the Delta format to a DBFS path:

source_df.write.format("delta").mode("append").save("dbfs:/Users/ju195/feature_set/")

@igorpereirabr1

Butterfree's HistoricalFeatureStoreWriter writes the results to a table. It uses the configured path, but it also creates a table on the metastore that points to the data at the target path. So you will be able to query the table from the metastore afterwards, and not only access the data by reading the files on S3 directly.
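Conceptually it amounts to something like the call below (an illustrative sketch with a made-up path and table name, not Butterfree's exact code):

# Data is written to the configured path, and a table pointing to that
# path is registered in the metastore so it can be queried by name later.
(
    feature_set_df.write.mode("overwrite")
    .format("parquet")
    .partitionBy("year", "month", "day")
    .option("path", "s3a://my-bucket/historical/feature_store/feature_set")
    .saveAsTable("my_database.feature_set")
)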

About the error: it is trying to access the table test.feature_set. You probably didn't configure any database in the definition of the HistoricalFeatureStoreWriter. You can check this logic in the constructor of the HistoricalFeatureStoreWriter:

    def __init__(
        self,
        db_config=None,
        database=None,
        num_partitions=None,
        validation_threshold: float = DEFAULT_VALIDATION_THRESHOLD,
        debug_mode: bool = False,
    ):
        self.db_config = db_config or S3Config()
        self.database = database or environment.get_variable(
            "FEATURE_STORE_HISTORICAL_DATABASE"
        )
        self.num_partitions = num_partitions or DEFAULT_NUM_PARTITIONS
        self.validation_threshold = validation_threshold
        self.debug_mode = debug_mode

It tries to use the database argument or to get this value from the env variable FEATURE_STORE_HISTORICAL_DATABASE. You need to use an existing database on your metastore, and either pass the name of this database in the arguments or set this env variable. If neither is provided, it will use the default value test.
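For example, either of the following would work (a sketch, using a hypothetical database name feature_store and the config object from the snippet above):

import os

from butterfree.load.writers import HistoricalFeatureStoreWriter

# Option 1: pass an existing metastore database explicitly
writer = HistoricalFeatureStoreWriter(db_config=config, database="feature_store")

# Option 2: set the environment variable before instantiating the writer
os.environ["FEATURE_STORE_HISTORICAL_DATABASE"] = "feature_store"
writer = HistoricalFeatureStoreWriter(db_config=config)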

So, concluding: if you decide to create a database called test (Spark SQL: create database test;), your job will probably work. But if you intend to use any other existing database on your metastore, you need to configure it on the writer.
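For reference, creating that default database from a Spark session is a one-liner (sketch):

# Create the database the writer falls back to when none is configured
spark.sql("CREATE DATABASE IF NOT EXISTS test")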

Thanks @rafaelleinio, I'll try to test with a valid database, but first I need to make sure that I have access to create one and write to the Hive Metastore. If not, I'll need to keep writing and reading the files directly...