Código nos links abaixo atendendo o desafio de extração das tabelas dinâmicas do arquivo "vendas-combustiveis-m3.xls".
- Sales of oil derivative fuels by UF and product
- Sales of diesel by UF and type
Para uma melhor avaliação optei por disponibilizar o código no Google Colab, conforme link abaixo.
Segue também o arquivo ".py".
Arquivos gerados pelo código.
Dataframes gerados pelo código.
- Sales of oil derivative fuels by UF and product
- Sales of diesel by UF and type
O desafio também propôs orquestrar um pipeline de dados.
A solução que eu entendi ser viável seria utilizando o Airflow, contudo ainda não tinha utilizado o mesmo.
Me propus a aprender e, por utilizar no meu computador o Windows no momento, tentei de duas maneiras, utilizando o Docker e com um Ubuntu habilitando o subsistema do Windows para Linux (WSL).
Pelo Docker consegui subir o Airflow, escrever algumas Dags simples que funcionaram, com um exemplo mais abaixo. A ideia era segmentar as etapas do código em várias tasks, porém obtive alguns erros no momento da utilização de alguns pacotes python.
Já no Ubuntu, pensei em utilizar a Dag para rodar o script como um todo para posteriormente reorganizar em tasks. Uma dificuldade foi fornecer o caminho correto do script (provavelmente por que o caminho do arquivo.py é do Windows), para que assim o airflow conseguisse rodar o pipeline.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': seven_days_ago,
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(days=1),
}
dag = DAG('execute_Case_Raizen', default_args=default_args)
t1 = BashOperator(
task_id='execute_Case_Raizen.py',
bash_command='python Case_Raizen.py',
dag=dag)
Apenas a nível de aprendizado consegui fazer funcionar DAGs mais simples como esta, com parte do código do desafio.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def file_request():
# File request
file_url = 'http://raw.githubusercontent.com/raizen-analytics/data-engineering-test/master/assets/vendas-combustiveis-m3.xls'
filename = 'vendas_combustiveis_m3.xls'
request.urlretrieve(file_url, filename)
with DAG(dag_id='file_request', schedule=None, start_date=datetime(2023,5,9)) as dag:
request_file2 = PythonOperator(
task_id='file_request',
python_callable=file_request
)