/prefect-gcp

Prefect integrations with Google Cloud Platform.

Primary LanguagePythonApache License 2.0Apache-2.0

prefect-gcp

PyPI

Welcome!

prefect-gcp is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.

Getting Started

Python setup

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Installation

To use prefect-gcp and Cloud Run:

pip install prefect-gcp

To use Cloud Storage:

pip install "prefect-gcp[cloud_storage]"

To use BigQuery:

pip install "prefect-gcp[bigquery]"

To use Secret Manager:

pip install "prefect-gcp[secret_manager]"

To use Vertex AI:

pip install "prefect-gcp[aiplatform]"

A list of available blocks in prefect-gcp and their setup instructions can be found here.

Write and run a flow

Download blob from bucket

from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket

@flow
def donwload_flow():
    gcs_bucket = GcsBucket.load("my-bucket")
    path = gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
    return path

download_flow()

Deploy command on Cloud Run

Save the following as prefect_gcp_flow.py:

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob

@flow
def cloud_run_job_flow():
    cloud_run_job = CloudRunJob(
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        credentials=GcpCredentials.load("MY_BLOCK_NAME"),
        region="us-central1",
        command=["echo", "hello world"],
    )
    return cloud_run_job.run()

Deploy prefect_gcp_flow.py:

from prefect.deployments import Deployment
from prefect_gcp_flow import cloud_run_job_flow

deployment = Deployment.build_from_flow(
    flow=cloud_run_job_flow,
    name="cloud_run_job_deployment", 
    version=1, 
    work_queue_name="demo",
)
deployment.apply()

Run the deployment either on the UI or through the CLI:

prefect deployment run cloud-run-job-flow/cloud_run_job_deployment

Visit Prefect Deployments for more information about deployments.

Get Google auth credentials from GcpCredentials

To instantiate a Google Cloud client, like bigquery.Client, GcpCredentials is not a valid input. Instead, use the get_credentials_from_service_account method.

import google.cloud.bigquery
from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
    google_auth_credentials = gcp_credentials_block.get_credentials_from_service_account()
    bigquery_client = bigquery.Client(credentials=google_auth_credentials)

Or simply call get_bigquery_client from GcpCredentials.

from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
    bigquery_client = gcp_credentials_block.get_bigquery_client()

Deploy command on Vertex AI as a flow

Save the following as prefect_gcp_flow.py:

from prefect import flow
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.aiplatform import VertexAICustomTrainingJob

@flow
def vertex_ai_job_flow():
    gcp_credentials = GcpCredentials.load("MY_BLOCK")
    job = VertexAICustomTrainingJob(
        command=["echo", "hello world"],
        region="us-east1",
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        gcp_credentials=gcp_credentials,
    )
    job.run()

vertex_ai_job_flow()

Deploy prefect_gcp_flow.py:

from prefect.deployments import Deployment
from prefect_gcp_flow import vertex_ai_job_flow

deployment = Deployment.build_from_flow(
    flow=vertex_ai_job_flow,
    name="vertex-ai-job-deployment", 
    version=1, 
    work_queue_name="demo",
)
deployment.apply()

Run the deployment either on the UI or through the CLI:

prefect deployment run vertex-ai-job-flow/vertex-ai-job-deployment

Visit Prefect Deployments for more information about deployments.

Use with_options to customize options on any existing task or flow

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

custom_download = cloud_storage_download_blob_as_bytes.with_options(
    name="My custom task name",
    retries=2,
    retry_delay_seconds=10,
)
 
 @flow
 def example_with_options_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    contents = custom_download("bucket", "blob", gcp_credentials)
    return contents()
 
example_with_options_flow()

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Resources

If you encounter any bugs while using prefect-gcp, feel free to open an issue in the prefect-gcp repository.

If you have any questions or issues while using prefect-gcp, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to ⭐️ or watch prefect-gcp for updates too!

Development

If you'd like to install a version of prefect-gcp for development, clone the repository and perform an editable install with pip:

git clone https://github.com/PrefectHQ/prefect-gcp.git

cd prefect-gcp/

pip install -e ".[dev]"

# Install linting pre-commit hooks
pre-commit install