prefeitura-rio/pipelines

[dados] Migrar captura GPS STPL

Closed this issue · 4 comments

  • Reforatorar captura aninhada no Prefect
  • Testar captura em dev (staging)

TODOs:

  • Instalar pacote da BD

O pacote pôde ser instalado no Linux e WSL usando python 3.9.
A versão 3.10 apresentou erro ao consreuir wheel para Pandas

Foi testada a captura_sppo_v2, através do fect-agent.json.
O código executou as tasks até o final, apenas apresentando erro na task rename_current_flow_run_now_time. Isto seria, segundo @Hellcassius , um comportamento esperado nas circunstânciais atuais.

Portanto, o pipelines foi configurado e testado com sucesso. (log em anexo).

log_test_sppo_v2.txt

yxuo commented

Andamento das metas

Reforatorar captura aninhada no Prefect

  • 1. Pegar o flow do brt e stpl, e entender o básico de como o projeto funciona.

  • 2. Verificar quais tasks tem em um (brt?) que não tem no outro (stpl?);
    Verificar:

    • Quais flows há;
    • Quais tarefas e funções cada flow tem;
    • Por que um flow tem uma tarefa e o outro não;
    • O que cada flow consegue fazer com as configurações originais;
    • O que cada flow precisa fazer, idealmente;
    • O que cada flow pode fazer adicionando tais funcionalidades, um do outro.

      BRT:

      • Captura
        • create_current_date_hour_partition() - Como esta função do STPL apenas unifica as funções do BRT para obter o filename, partitions e timestamp, a única mudança é o código do Flow ser simplificado. Não percebeu-se alteração na funcionalidade.
      • Materialização - O STPL não ainda não possui este flow

      STPL:

      • Captura
        • create_api_url_brt_gps() - Poderá preparar o código para lidar com alterações na URL da API. Até o momento não identificou-se tal necessidade para o STPL; a averiguar.
      • Materialização
        • get_materialization_date_range() - O BQ poderá processar os dados apenas usando um período de tempo de até 1h atrás (usado no BRT e ônibus). Não se sabe se é interessante aplicar no STPL. Vale notar que esta task já é usada no BRT e Ônibus.
  • 3. Verificar quais "passos" precisa adicionar no stpl.

    Captura

    • create_api_url_brt_gps() - Baseando-se nas tasks do STPL, presume-se que não será usado um período de tempo na materialização, pois o código original os continha, mas o STPL os removera. Por hora não será adicionado.
    • ✔️ get_current_timestamp(), create_date_hour_partition(), parse_timestamp_to_string() - O STPL já possui uma função unificada para estas três informações, porém está no formato antigo. Segundo @Hellcassius, as versões atuais dos Flows visam modularizar as Tasks. Portanto estas funções substituirão a função create_current_date_hour_partition().
    • ✔️ Demais tasks aparentemente já foram adicionadas ao código.

    Materialização - Não há

    • ✔️ Como ele foi baseado fortemente no BRT, a princípio serão usadas as mesmas tasks.
  • 4. Adicionar os passos necessários no STPL

    Materialização

    • Foram adicionados os mesmos passos usados no BRT, alterando as funções próprias e nomes para se adequar ao STPL.

    Captura

    • Além do flow (e todo o código) ser descongelado, não houve maiores alterações. Apenas mudanças de nomenclaturas para se adequar ao STPL. Por exemplo, o nome stpl_captura foi alterado para captura_stpl, seguindo o padrão dos outros projetos.

Testar captura em dev (staging)

  • 1. Ser capaz de rodar o flow do brt (que já funciona)

    Foi feito um teste (2022-09-26 20:13:16) para obter o log e verificar como o timestamp_gps se apresenta antes e depois da conversão.

  • 2. Testar o flow do stpl e verificar se ele atingiu o objetivo

    • O código foi testado (2021-09-29) e , pelos dados obtidos, os diretórios onde os arquivos devem ser salvos localmente parecem de acordo.
    • As colunas dos arquivos crus (JSON) e tratados (CSV) parecem de acordo com a API.
      Seguem os logs:

    Raw (JSON):

    • data/raw/br_rj_riodejaneiro_stpl_gps/registros/data=2022-09-29/hora=15/2022-09-29-15-21-00.json
    • data/raw/br_rj_riodejaneiro_onibus_gps/registros/data=2022-09-29/hora=15/2022-09-29-15-57-00.json

    Treated (CSV):

    • data/staging/br_rj_riodejaneiro_stpl_gps/registros/data=2022-09-29/hora=15/2022-09-29-15-21-00.csv
    • data/staging/br_rj_riodejaneiro_onibus_gps/registros/data=2022-09-29/hora=15/2022-09-29-15-57-00.csv
    • data/staging/br_rj_riodejaneiro_brt_gps/registros/data=2022-09-29/hora=18/2022-09-29-18-24-00.csv

    Dataset ID:

    • br_rj_riodejaneiro_stpl_gps
    • br_rj_riodejaneiro_onibus_gps
    • br_rj_riodejaneiro_brt_gps_staging

    Table ID:

    • STPL: registros
    • SPPO: registros
    • BRT: registros

    Partitions (STPL, SPPO, BRT):

    • STPL: data=2022-09-29/hora=15
    • SPPO: data=2022-09-29/hora=15
    • BRT: data=2022-09-29/hora=18

    Logs em anexo:

  • 3. Testar pylint em pipelines/rj_smtr e resolver qualquer reclamação dele

    ------------------------------------
    Your code has been rated at 10.00/10