/cognite-replicator

A package of scripts for replicating data between CDF tenants

Cognite Python Replicator

Cognite Replicator is a Python package for replicating data across Cognite Data Fusion (CDF) projects. It is built on top of the Cognite Python SDK. This component is community content and is not officially supported by Cognite. Bugs and changes will be fixed on a best-effort basis. Feel free to open issues and pull requests; we will review them as soon as we can.

Copyright 2023 Cognite AS

Prerequisites

In order to start using the Replicator, you need:

  • Python3 (>= 3.6)
  • Credentials for both the source and destination projects:
    • CLIENT_ID ("Client ID from Azure")
    • CLIENT_SECRET ("Client secret from Azure", only if using authentication via secret)
    • CLUSTER ("Name of CDF cluster")
    • TENANT_ID ("Tenant ID from Azure")
    • PROJECT ("Name of CDF project")

This is how you set the client secrets as environment variables on macOS and Linux:

$ export SOURCE_CLIENT_SECRET=<your source client secret>
$ export DEST_CLIENT_SECRET=<your destination client secret>
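
Before running the replicator with client secret authentication, it can help to confirm that both variables are actually set. The following is a minimal sketch, not part of the package; the helper name `missing_secrets` is hypothetical.

```python
import os

# Names of the environment variables exported above.
REQUIRED_SECRETS = ("SOURCE_CLIENT_SECRET", "DEST_CLIENT_SECRET")


def missing_secrets(environ=os.environ, names=REQUIRED_SECRETS):
    """Return the names of secrets that are unset or empty (hypothetical helper)."""
    return [name for name in names if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_secrets()
    if missing:
        raise SystemExit("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dictionary instead of `os.environ` makes the check easy to try without touching the real environment.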

Installation

The replicator is available on PyPI and can also be run as a Docker container.

To install it from the command line, run:

pip install cognite-replicator

Alternatively, build and run it as a Docker container. The image is also available on Docker Hub. To build it locally:

docker build -t cognite-replicator .

Usage

1. Run with a configuration file as a standalone script

Create a configuration file based on config/default.yml and update the values to match your environment. If no file is specified, the replicator uses config/default.yml.

via Python

python -m cognite.replicator config/filepath.yml

or alternatively via Docker. If you do not have access to a browser, use client secret authentication.

docker run -e SOURCE_CLIENT_SECRET -e DEST_CLIENT_SECRET -v /absolute/path/to/config/config.yml:/config.yml cognite-replicator /config.yml
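
The fallback described in this section (use the given file, otherwise config/default.yml) can be sketched as below. This is an illustration of the behavior only, not the replicator's actual implementation; `resolve_config_path` and `DEFAULT_CONFIG` are hypothetical names.

```python
from pathlib import Path

# Default configuration file named in this section.
DEFAULT_CONFIG = Path("config/default.yml")


def resolve_config_path(argv):
    """Return the config path given as the first argument, else the default."""
    return Path(argv[0]) if argv else DEFAULT_CONFIG
```

For example, `resolve_config_path(["config/filepath.yml"])` returns the path that was passed in, while `resolve_config_path([])` falls back to the default file.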

2. Set up as a Python library

2.1 Without configuration file and interactive login

This copies everything from the source to the destination, using your own credentials to run the code. You need permission to read from the source project and to write to the destination project.

from cognite.client.credentials import OAuthInteractive
from cognite.client import CogniteClient, ClientConfig
from cognite.replicator import assets, events, files, time_series, datapoints, sequences, sequence_rows

# SOURCE
SOURCE_TENANT_ID = "48d5043c-cf70-4c49-881c-c638f5796997"
SOURCE_CLIENT_ID = "1b90ede3-271e-401b-81a0-a4d52bea3273"
SOURCE_PROJECT = "publicdata"
SOURCE_CLUSTER = "api"

# DESTINATION
DEST_TENANT_ID = "d4febcbc-db24-4823-bffd-92fd05b9c6bc"
DEST_CLIENT_ID = "189e8b95-f1ce-47d2-aa66-4c2fe3567f91"
DEST_PROJECT = "sa-team"
DEST_CLUSTER = "bluefield"

### Autogenerated variables
SOURCE_SCOPES = [f"https://{SOURCE_CLUSTER}.cognitedata.com/.default"]
SOURCE_BASE_URL = f"https://{SOURCE_CLUSTER}.cognitedata.com"
SOURCE_AUTHORITY_URL = f"https://login.microsoftonline.com/{SOURCE_TENANT_ID}"
DEST_SCOPES = [f"https://{DEST_CLUSTER}.cognitedata.com/.default"]
DEST_BASE_URL = f"https://{DEST_CLUSTER}.cognitedata.com"
DEST_AUTHORITY_URL = f"https://login.microsoftonline.com/{DEST_TENANT_ID}"

# Config
BATCH_SIZE = 10000  # this is the max size of a batch to be posted
NUM_THREADS = 10  # this is the max number of threads to be used
TIMEOUT = 90
PORT = 53000

SOURCE_CLIENT = CogniteClient(
    ClientConfig(
        credentials=OAuthInteractive(
            authority_url=SOURCE_AUTHORITY_URL,
            client_id=SOURCE_CLIENT_ID,
            scopes=SOURCE_SCOPES,
        ),
        project=SOURCE_PROJECT,
        base_url=SOURCE_BASE_URL,
        client_name="cognite-replicator-source",
    )
)
DEST_CLIENT = CogniteClient(
    ClientConfig(
        credentials=OAuthInteractive(
            authority_url=DEST_AUTHORITY_URL,
            client_id=DEST_CLIENT_ID,
            scopes=DEST_SCOPES,
        ),
        project=DEST_PROJECT,
        base_url=DEST_BASE_URL,
        client_name="cognite-replicator-destination",
    )
)

if __name__ == "__main__":  # this is necessary because threading

    #### Uncomment the resources you would like to copy
    assets.replicate(SOURCE_CLIENT, DEST_CLIENT)
    #events.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #files.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #time_series.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #datapoints.replicate(SOURCE_CLIENT, DEST_CLIENT)
    #sequences.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #sequence_rows.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
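
The autogenerated variables follow the same pattern for the source and the destination, so they can be derived by one small function. This is a sketch under that assumption; `build_endpoints` is a hypothetical helper, not part of the replicator or the SDK. The token URL is the one used with client credentials authentication.

```python
def build_endpoints(cluster: str, tenant_id: str) -> dict:
    """Derive the CDF and Azure AD URLs from a cluster name and tenant ID."""
    base_url = f"https://{cluster}.cognitedata.com"
    authority_url = f"https://login.microsoftonline.com/{tenant_id}"
    return {
        "base_url": base_url,
        "scopes": [f"{base_url}/.default"],
        "authority_url": authority_url,  # used for interactive login
        "token_url": f"{authority_url}/oauth2/v2.0/token",  # used for client credentials
    }
```

Calling it once per project, for example `build_endpoints(SOURCE_CLUSTER, SOURCE_TENANT_ID)`, avoids keeping two hand-written sets of f-strings in sync.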

2.2 Without configuration file and with client credentials authentication

This copies everything from the source to the destination, authenticating with client credentials. The credentials need permission to read from the source project and to write to the destination project. In the example below, the secrets are stored as environment variables.

import os
from cognite.client.credentials import OAuthClientCredentials
from cognite.client import CogniteClient, ClientConfig
from cognite.replicator import assets, events, files, time_series, datapoints, sequences, sequence_rows

# SOURCE
SOURCE_TENANT_ID = "48d5043c-cf70-4c49-881c-c638f5796997"
SOURCE_CLIENT_ID = "1b90ede3-271e-401b-81a0-a4d52bea3273"
SOURCE_CLIENT_SECRET = os.environ.get("SOURCE_CLIENT_SECRET")
SOURCE_PROJECT = "publicdata"
SOURCE_CLUSTER = "api"

# DESTINATION
DEST_TENANT_ID = "d4febcbc-db24-4823-bffd-92fd05b9c6bc"
DEST_CLIENT_ID = "189e8b95-f1ce-47d2-aa66-4c2fe3567f91"
DEST_CLIENT_SECRET = os.environ.get("DEST_CLIENT_SECRET")
DEST_PROJECT = "sa-team"
DEST_CLUSTER = "bluefield"

### Autogenerated variables
SOURCE_SCOPES = [f"https://{SOURCE_CLUSTER}.cognitedata.com/.default"]
SOURCE_BASE_URL = f"https://{SOURCE_CLUSTER}.cognitedata.com"
SOURCE_TOKEN_URL = f"https://login.microsoftonline.com/{SOURCE_TENANT_ID}/oauth2/v2.0/token"
DEST_SCOPES = [f"https://{DEST_CLUSTER}.cognitedata.com/.default"]
DEST_BASE_URL = f"https://{DEST_CLUSTER}.cognitedata.com"
DEST_TOKEN_URL = f"https://login.microsoftonline.com/{DEST_TENANT_ID}/oauth2/v2.0/token"

# Config
BATCH_SIZE = 10000  # this is the max size of a batch to be posted
NUM_THREADS = 10  # this is the max number of threads to be used
TIMEOUT = 90
PORT = 53000

SOURCE_CLIENT = CogniteClient(
    ClientConfig(
        credentials=OAuthClientCredentials(
            token_url=SOURCE_TOKEN_URL,
            client_id=SOURCE_CLIENT_ID,
            scopes=SOURCE_SCOPES,
            client_secret=SOURCE_CLIENT_SECRET,
        ),
        project=SOURCE_PROJECT,
        base_url=SOURCE_BASE_URL,
        client_name="cognite-replicator-source",
    )
)

DEST_CLIENT = CogniteClient(
    ClientConfig(
        credentials=OAuthClientCredentials(
            token_url=DEST_TOKEN_URL,
            client_id=DEST_CLIENT_ID,
            scopes=DEST_SCOPES,
            client_secret=DEST_CLIENT_SECRET,
        ),
        project=DEST_PROJECT,
        base_url=DEST_BASE_URL,
        client_name="cognite-replicator-destination",
    )
)

if __name__ == "__main__":  # this is necessary because threading

    #### Uncomment the resources you would like to copy
    assets.replicate(SOURCE_CLIENT, DEST_CLIENT)
    #events.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #files.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #time_series.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #datapoints.replicate(SOURCE_CLIENT, DEST_CLIENT)
    #sequences.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
    #sequence_rows.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)

2.3 Alternatively, pass some elements of the configuration file as variables

Refer to the default configuration file or the example configuration file for all keys in the configuration file. Start with the client creation from either step 2.1 or 2.2.

if __name__ == "__main__":  # this is necessary because threading
    config = {
        "timeseries_external_ids": ["pi:160670", "pi:160623"],
        "datapoints_start": "100d-ago",
        "datapoints_end": "now",
    }
    time_series.replicate(
        client_src=SOURCE_CLIENT,
        client_dst=DEST_CLIENT,
        batch_size=BATCH_SIZE,
        num_threads=NUM_THREADS,
        config=config,
    )
    datapoints.replicate(
        client_src=SOURCE_CLIENT,
        client_dst=DEST_CLIENT,
        external_ids=config.get("timeseries_external_ids"),
        start=config.get("datapoints_start"),
        end=config.get("datapoints_end"),
    )

3. With configuration file

This uses the configuration file to determine what will be copied. In this case there is no need to create the clients; they are created from what is in the configuration file.

import os
from cognite.replicator.__main__ import main

if __name__ == "__main__":  # this is necessary because threading
    # The environment variable holds the path to the file, not its parsed contents.
    COGNITE_CONFIG_FILE = "config/config.yml"
    os.environ["COGNITE_CONFIG_FILE"] = COGNITE_CONFIG_FILE
    main()

4. Local testing

This runs a local version of the replicator. As in step 3, it uses the configuration file to determine what will be copied, so there is no need to create the clients; they are created from what is in the configuration file.

import os
import sys

# Path of the local version of the replicator. Python searches the directories in sys.path when importing.
sys.path.append("cognite-replicator")
from cognite.replicator.__main__ import main

if __name__ == "__main__":  # this is necessary because threading
    # The environment variable holds the path to the file, not its parsed contents.
    COGNITE_CONFIG_FILE = "config/config.yml"
    os.environ["COGNITE_CONFIG_FILE"] = COGNITE_CONFIG_FILE
    main()
    # Remove the path again so later imports in this process are not affected.
    sys.path.remove("cognite-replicator")
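
Adding and later removing the search path by hand leaves the path in place if the run fails partway. One way to make the cleanup automatic is a small context manager. This is a sketch, not part of the replicator; `temporary_sys_path` is a hypothetical name. It also inserts the path at the front of sys.path rather than appending it, on the assumption that the local checkout should take precedence over an installed copy.

```python
import sys
from contextlib import contextmanager


@contextmanager
def temporary_sys_path(path):
    """Put `path` first on sys.path for the duration of the block, then remove it."""
    sys.path.insert(0, path)
    try:
        yield
    finally:
        # Runs even if the block raised; tolerate the entry already being gone.
        try:
            sys.path.remove(path)
        except ValueError:
            pass


# Usage (assumes a local checkout in ./cognite-replicator):
# with temporary_sys_path("cognite-replicator"):
#     from cognite.replicator.__main__ import main
#     main()
```

Because Python searches sys.path in order, an entry at the front is consulted before site-packages, whereas an appended entry is only reached if no earlier directory provides the module.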

Development

Change the version in the files

Changelog

Wondering about upcoming or previous changes? Take a look at the CHANGELOG.

Contributing

Want to contribute? Check out CONTRIBUTING.