Documentation Index
Fetch the complete documentation index at: https://docs.dados.rio/llms.txt
Use this file to discover all available pages before exploring further.
Migração de Pipeline: Prefect 1.4 → Prefect 3.0
Visão Geral
Este guia irá ajudá-lo a entender as principais diferenças e como migrar sua pipeline de forma segura.
Principais Mudanças
| Componente | Prefect 1.4 | Prefect 3.0 |
|---|
| Schedules | Python code | YAML configuration |
| Flows | @task + Flow() | @flow + @task |
| Deployments | Python scripts | YAML files |
| Agents | Agents | Work Pools |
| Configuration | Python/JSON | YAML |
Pré-requisitos
Antes de começar a migração, certifique-se de que você possui:
- ✅ Pipeline Prefect 1.4 funcionando corretamente
- ✅ Acesso completo ao código fonte da pipeline
- ✅ Conhecimento básico de YAML
- ✅ Acesso ao repositório Git da pipeline
Estrutura de Arquivos
Estrutura Atual (Prefect 1.4)
minha-pipeline/
├── flow.py # Definição do flow e tasks
├── schedules.py # Configuração de schedules
Estrutura Nova (Prefect 3.0)
minha-pipeline/
├── flow.py # Flow com @flow decorator
├── prefect.yaml # Configuração principal
├── Dockerfile # Container
└── pyproject.toml # Dependências Python
Passo a Passo da Migração
💡 Dica para Usuários de IDEs com LLM: Se você usa Cursor ou outra IDE com LLM integrada, pode usar o seguinte prompt para automatizar a migração:
Quero migrar um flow do Prefect 1.4 para o Prefect 3.0. Para isso, siga as etapas abaixo, usando apenas os arquivos cujos caminhos fornecerei:
### Arquivos:
- Schedule original (Prefect 1.4): [CAMINHO_DO_SCHEDULE_ANTIGO]
- YAML de referência (Prefect 3.0): [CAMINHO_YAML_REFERENCIA]
- YAML a ser criado (Prefect 3.0): [CAMINHO_YAML_NOVO]
- Flow base (bem escrito): [CAMINHO_FLOW_REFERENCIA]
- Flow a ser ajustado: [CAMINHO_FLOW_A_MODIFICAR]
---
### Etapas:
1. **Migrar o Schedule:**
- Pegue as informações do `schedule` do Prefect 1.4 (arquivo original).
- Converta para formato compatível com o Prefect 3.0.
- Aplique o estilo e formatação usados no YAML de referência.
- Gere um novo arquivo YAML no local especificado.
2. **Adaptar o Flow:**
- Use o flow de referência como modelo de estilo e boas práticas.
- Modifique o flow de destino para fazer a mesma coisa (ou o equivalente), mas adaptando os nomes de variáveis, funções e fluxo de execução conforme o nome da pasta onde o flow de destino está localizado (use o nome da pasta final do caminho como referência).
- Garanta compatibilidade com Prefect 3.0.
3. **Output:**
- Gere o conteúdo final do YAML convertido e do novo flow modificado.
- Comente resumidamente o que foi alterado em relação aos originais.
Use nosso template do cookiecutter para gerar rapidamente a estrutura de diretórios e arquivos necessários para uma nova pipeline Prefect. Para criar uma nova pipeline, instale uv e rode:
uvx cookiecutter templates --output-dir=pipelines
Você será solicitado a informar valores como secretaria e pipeline, que serão utilizados para preencher os nomes dos diretórios, arquivos e variáveis nos templates. O template gerado já segue o padrão de nomenclatura definido anteriormente.
O template irá gerar automaticamente:
flow.py - Estrutura base do flow
prefect.yaml - Configuração do deployment
Dockerfile - Container para execução
pyproject.toml - Dependências do projeto
.dockerignore - Arquivos ignorados no build
2. Análise da Pipeline Atual
Antes de migrar, analise sua pipeline atual:
2.1 Analisar Configurações Atuais
Identifique as configurações da sua pipeline de dump de banco:
- Schedules: Frequência, horário e timezone de execução
- Parâmetros: Configurações específicas do banco e tabelas
- Dependências: Imports e bibliotecas utilizadas
3. Migração do Schedule
3.1 Analisar o Schedule Atual
Examine seu arquivo schedules.py atual:
# Exemplo: schedules.py (Prefect 1.4)
from prefect.schedules import Schedule
from datetime import timedelta, datetime
import pytz
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules
# Configuração do schedule
sme_clocks = generate_dump_db_schedules(
interval=timedelta(days=1),
start_date=datetime(2022, 1, 1, 21, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
table_parameters=sme_core_sso_queries,
)
schedule = Schedule(clocks=untuple(sme_clocks))
3.2 Converter para YAML
Crie a seção de schedules no prefect.yaml:
# prefect.yaml
name: minha-pipeline
prefect-version: 3.4.3
deployments:
- name: minha-pipeline--prod
entrypoint: flow.py:main_flow
work_pool:
name: default-agent-pool
work_queue_name: default
schedules:
- interval: 86400 # 24 horas em segundos
anchor_date: "2022-01-01T21:00:00"
timezone: America/Sao_Paulo
slug: daily-execution
parameters:
table_id: minha_tabela
execute_query: |
SELECT * FROM minha_tabela
WHERE data_atualizacao >= CURRENT_DATE - 1;
3.3 Mapeamento de Configurações
| Prefect 1.4 | Prefect 3.0 | Exemplo |
|---|
timedelta(days=1) | interval: 86400 | 86400 segundos = 1 dia |
timedelta(hours=6) | interval: 21600 | 21600 segundos = 6 horas |
start_date | anchor_date | ”2022-01-01T21:00:00” |
timezone | timezone | ”America/Sao_Paulo” |
table_parameters | parameters | Parâmetros específicos |
4. Migração do Flow
4.1 Identificar o Template
Para pipelines de dump de banco, use o template específico:
from iplanrio.pipelines_templates.dump_db.tasks import (
dump_upload_batch_task,
format_partitioned_query_task,
)
4.2 Adaptar Imports
# Antes (Prefect 1.4)
from prefect import task, Flow
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules
# Depois (Prefect 3.0)
from prefect import task, flow
from iplanrio.pipelines_templates.dump_db.tasks import (
dump_upload_batch_task,
format_partitioned_query_task,
)
4.3 Adaptar Decorators
# Antes (Prefect 1.4)
@task
def minha_task():
return "resultado"
def main_flow():
result = minha_task()
return result
# Criar o flow
with Flow("minha-pipeline") as flow:
main_flow()
# Depois (Prefect 3.0)
@task
def minha_task():
return "resultado"
@flow(log_prints=True, name="minha-pipeline")
def main_flow():
result = minha_task()
return result
4.4 Adaptar Parâmetros
# Antes (Prefect 1.4)
def main_flow(param1, param2="default"):
# lógica da pipeline
pass
# Depois (Prefect 3.0)
@flow(log_prints=True)
def main_flow(
param1: str,
param2: str = "default",
param3: Optional[str] = None,
):
# lógica da pipeline
pass
5. Configuração do prefect.yaml
5.1 Estrutura Básica
name: minha-pipeline
prefect-version: 3.4.3
build:
- prefect.deployments.steps.run_shell_script:
id: get-commit-hash
script: git rev-parse --short HEAD
stream_output: false
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.6.5
image_name: seu-registro/imagem
tag: "minha-pipeline-{{ get-commit-hash.stdout }}"
dockerfile: Dockerfile
deployments:
- name: minha-pipeline--staging
version: "{{ build-image.tag }}"
entrypoint: flow.py:main_flow
work_pool:
name: default-agent-pool
work_queue_name: default
job_variables:
image: "{{ build-image.image_name }}:{{ build-image.tag }}"
command: python -m prefect flow-run execute
5.2 Configuração de Schedules
deployments:
- name: minha-pipeline--prod
version: "{{ build-image.tag }}"
entrypoint: flow.py:main_flow
work_pool:
name: default-agent-pool
work_queue_name: default
job_variables:
image: "{{ build-image.image_name }}:{{ build-image.tag }}"
command: python -m prefect flow-run execute
schedules:
- interval: 86400 # 24 horas
anchor_date: "2022-01-01T21:00:00"
timezone: America/Sao_Paulo
slug: daily-execution
parameters:
param1: valor1
param2: valor2
6. Configuração de Work Pool
Este repositório utiliza dois work pools principais para execução dos deployments Prefect:
-
default-pool: Destinado à execução geral de pipelines, incluindo fluxos que não possuem requisitos especiais de rede ou infraestrutura. É o pool padrão para a maioria dos deployments.
-
datario-pool: Utilizado para pipelines que acessam bancos de dados ou sistemas internos da IplanRio, especialmente aqueles que exigem conexão via VPN. Esse pool garante que os jobs sejam executados em ambientes com acesso seguro e autorizado aos recursos internos.
Para pipelines de dump de banco de dados, use o datario-pool:
work_pool:
name: datario-pool
work_queue_name: default
job_variables:
image: "{{ build-image.image_name }}:{{ build-image.tag }}"
command: python -m prefect flow-run execute
7. Migração de Secrets para Infisical
Para pipelines de dump de banco de dados, é necessário configurar as credenciais de acesso no Infisical. Acesse infisical.iplan.dados.rio e configure os secrets nos projetos:
- prefect-jobs (produção)
- prefect-jobs-staging (staging)
7.1 Estrutura de Pastas
Crie uma pasta com o nome da pipeline seguindo o seguinte padrão. Exemplo para a pipeline db-gestao-escolar:
Pipeline: db-gestao-escolar
├── Pasta: db-gestao-escolar
├── DB_GESTAO_ESCOLAR__DB_USERNAME
├── DB_GESTAO_ESCOLAR__DB_PASSWORD
└── Outras variáveis específicas
7.2 Padrão de Nomenclatura
Para pipelines de ingestão de banco de dados, use sempre o padrão:
{PIPELINE_NAME_UPPER}__DB_USERNAME
{PIPELINE_NAME_UPPER}__DB_PASSWORD
7.3 Configuração no Flow
No seu flow, referencie as variáveis usando o caminho do Infisical:
@flow(log_prints=True)
def dump_database_flow(
# ... outros parâmetros ...
infisical_secret_path: str = "/db-gestao-escolar",
):
# O Prefect irá buscar automaticamente as variáveis
# DB_GESTAO_ESCOLAR__DB_USERNAME e DB_GESTAO_ESCOLAR__DB_PASSWORD
# no caminho /db-gestao-escolar do Infisical
Exemplos Práticos
Exemplo Completo
Flow (flow.py)
from prefect import flow, task
from iplanrio.pipelines_templates.dump_db.tasks import (
dump_upload_batch_task,
format_partitioned_query_task,
)
@flow(log_prints=True)
def dump_database_flow(
db_database: str = "meu_banco",
db_host: str = "localhost",
db_port: str = "1433",
db_type: str = "sql_server",
execute_query: str = "SELECT * FROM minha_tabela",
dataset_id: str = "meu_dataset",
table_id: str = "minha_tabela",
infisical_secret_path: str = "/secrets/database",
dump_mode: str = "overwrite",
batch_size: int = 50000,
biglake_table: bool = True,
):
print(f"Iniciando dump da tabela {table_id}")
# Executar o dump
result = dump_upload_batch_task(
db_database=db_database,
db_host=db_host,
db_port=db_port,
db_type=db_type,
execute_query=execute_query,
dataset_id=dataset_id,
table_id=table_id,
infisical_secret_path=infisical_secret_path,
dump_mode=dump_mode,
batch_size=batch_size,
biglake_table=biglake_table,
)
print(f"Dump concluído: {result}")
return result
Configuração (prefect.yaml)
name: dump-database-pipeline
prefect-version: 3.4.3
build:
- prefect.deployments.steps.run_shell_script:
id: get-commit-hash
script: git rev-parse --short HEAD
stream_output: false
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.6.5
image_name: seu-registro/imagem
tag: "dump-database-{{ get-commit-hash.stdout }}"
dockerfile: Dockerfile
deployments:
- name: dump-database--prod
version: "{{ build-image.tag }}"
entrypoint: flow.py:dump_database_flow
work_pool:
name: datario-pool
work_queue_name: default
job_variables:
image: "{{ build-image.image_name }}:{{ build-image.tag }}"
command: python -m prefect flow-run execute
schedules:
- interval: 86400
anchor_date: "2022-01-01T21:00:00"
timezone: America/Sao_Paulo
slug: daily-dump
parameters:
db_database: "meu_banco"
table_id: "minha_tabela"
execute_query: "SELECT * FROM minha_tabela WHERE data_atualizacao >= CURRENT_DATE - 1"