[dados] Migrar captura GPS STPL
Closed this issue · 4 comments
- Reforatorar captura aninhada no Prefect
- Testar captura em dev (staging)
- Código em parte já refatorado no Prefect: https://github.com/prefeitura-rio/pipelines/tree/staging/captura-brt/pipelines/rj_smtr/br_rj_riodejaneiro_stpl_gps
- Estrutura de código de referência (captura do BRT, já aninhada): https://github.com/prefeitura-rio/pipelines/tree/staging/captura-brt/pipelines/rj_smtr/br_rj_riodejaneiro_brt_gps
- Código legado de captura do STPL: https://github.com/RJ-SMTR/maestro/tree/main/repositories/capturas/br_rj_riodejaneiro_stpl_gps
- Diagrama de fluxo das pipelines: https://miro.com/app/board/o9J_lqIY7Eg=/
TODOs:
- Instalar pacote da BD
O pacote pôde ser instalado no Linux e WSL usando python 3.9.
A versão 3.10 apresentou erro ao consreuir wheel para Pandas
- Configurar pipelines local (README): https://github.com/prefeitura-rio/pipelines => testar rodar a captura do BRT
Foi testada a
captura_sppo_v2
, através dofect-agent.json
.
O código executou as tasks até o final, apenas apresentando erro na taskrename_current_flow_run_now_time
. Isto seria, segundo @Hellcassius , um comportamento esperado nas circunstânciais atuais.
Portanto, o pipelines foi configurado e testado com sucesso. (log em anexo).
Andamento das metas
Reforatorar captura aninhada no Prefect
-
1. Pegar o flow do brt e stpl, e entender o básico de como o projeto funciona.
-
2. Verificar quais tasks tem em um (brt?) que não tem no outro (stpl?);
Verificar:- Quais flows há;
- Quais tarefas e funções cada flow tem;
- Por que um flow tem uma tarefa e o outro não;
- O que cada flow consegue fazer com as configurações originais;
- O que cada flow precisa fazer, idealmente;
- O que cada flow pode fazer adicionando tais funcionalidades, um do outro.
BRT:
- Captura
create_current_date_hour_partition()
- Como esta função do STPL apenas unifica as funções do BRT para obter o filename, partitions e timestamp, a única mudança é o código do Flow ser simplificado. Não percebeu-se alteração na funcionalidade.
- Materialização - O STPL não ainda não possui este flow
STPL:
- Captura
create_api_url_brt_gps()
- Poderá preparar o código para lidar com alterações na URL da API. Até o momento não identificou-se tal necessidade para o STPL; a averiguar.
- Materialização
- Captura
-
3. Verificar quais "passos" precisa adicionar no stpl.
Captura
- ❌
create_api_url_brt_gps()
- Baseando-se nas tasks do STPL, presume-se que não será usado um período de tempo na materialização, pois o código original os continha, mas o STPL os removera. Por hora não será adicionado. - ✔️
get_current_timestamp()
,create_date_hour_partition()
,parse_timestamp_to_string()
- O STPL já possui uma função unificada para estas três informações, porém está no formato antigo. Segundo @Hellcassius, as versões atuais dos Flows visam modularizar as Tasks. Portanto estas funções substituirão a funçãocreate_current_date_hour_partition()
. - ✔️ Demais tasks aparentemente já foram adicionadas ao código.
Materialização - Não há
- ✔️ Como ele foi baseado fortemente no BRT, a princípio serão usadas as mesmas tasks.
- ❌
-
4. Adicionar os passos necessários no STPL
Materialização
- Foram adicionados os mesmos passos usados no BRT, alterando as funções próprias e nomes para se adequar ao STPL.
Captura
- Além do flow (e todo o código) ser descongelado, não houve maiores alterações. Apenas mudanças de nomenclaturas para se adequar ao STPL. Por exemplo, o nome
stpl_captura
foi alterado paracaptura_stpl
, seguindo o padrão dos outros projetos.
Testar captura em dev (staging)
-
1. Ser capaz de rodar o flow do brt (que já funciona)
Foi feito um teste (2022-09-26 20:13:16) para obter o log e verificar como o timestamp_gps se apresenta antes e depois da conversão.
-
2. Testar o flow do stpl e verificar se ele atingiu o objetivo
- O código foi testado (2021-09-29) e , pelos dados obtidos, os diretórios onde os arquivos devem ser salvos localmente parecem de acordo.
- As colunas dos arquivos crus (JSON) e tratados (CSV) parecem de acordo com a API.
Seguem os logs:
Raw (JSON):
- data/raw/br_rj_riodejaneiro
_stpl_
gps/registros/data=2022-09-29/hora=15/2022-09-29-15-21
-00.json - data/raw/br_rj_riodejaneiro
_onibus_
gps/registros/data=2022-09-29/hora=15/2022-09-29-15-57
-00.json
Treated (CSV):
- data/staging/br_rj_riodejaneiro
_stpl_
gps/registros/data=2022-09-29/hora=15/2022-09-29-15-21
-00.csv - data/staging/br_rj_riodejaneiro
_onibus_
gps/registros/data=2022-09-29/hora=15/2022-09-29-15-57
-00.csv - data/staging/br_rj_riodejaneiro
_brt_
gps/registros/data=2022-09-29/hora=18
/2022-09-29-18-24
-00.csv
Dataset ID:
- br_rj_riodejaneiro
_stpl_
gps - br_rj_riodejaneiro
_onibus_
gps - br_rj_riodejaneiro
_brt_
gps_staging
Table ID:
- STPL: registros
- SPPO: registros
- BRT: registros
Partitions (STPL, SPPO, BRT):
- STPL: data=2022-09-29/hora=15
- SPPO: data=2022-09-29/hora=15
- BRT: data=2022-09-29/hora=
18
Logs em anexo:
-
3. Testar
pylint
empipelines/rj_smtr
e resolver qualquer reclamação dele------------------------------------ Your code has been rated at 10.00/10