Cognite Replicator is a Python package for replicating data across Cognite Data Fusion (CDF) projects. This package is built on top of the Cognite Python SDK. This component is Community content and not officially supported by Cognite. Bugs and changes will be fixed on a best effort basis. Feel free to open issues and pull requests, we will review them as soon as we can.
Copyright 2023 Cognite AS
In order to start using the Replicator, you need:
- Python3 (>= 3.6)
- Credentials for both the source and destination projects:
- CLIENT_ID ("Client ID from Azure")
- CLIENT_SECRET ("Client secret from Azure", only if using authentication via secret)
- CLUSTER ("Name of CDF cluster")
- TENANT_ID ("Tenant ID from Azure"
- PROJECT ("Name of CDF project")
This is how you set the client secret as an environment variable on Mac OS and Linux:
$ export SOURCE_CLIENT_SECRET=<your source client secret>
$ export DEST_CLIENT_SECRET=<your destination client secret>
The replicator is available on PyPI, and can also be executed .
To run it from command line, run:
pip install cognite-replicator
Alternatively, build and run it as a docker container. The image is avaible on docker hub:
docker build -t cognite-replicator .
Create a configuration file based on the config/default.yml and update the values corresponding to your environment If no file is specified then replicator will use config/default.yml.
via Python
python -m cognite.replicator config/filepath.yml
or alternatively via docker If no access to a browser, you should use the client secret authentication
docker run -e SOURCE_CLIENT_SECRET -e DEST_CLIENT_SECRET -v /absolute/path/to/config/config.yml:/config.yml cognite-replicator /config.yml
It will copy everything from source to destination and use your own credentials to run the code, you need to have the right permissions to read on the source project and write on the destination project
import os
import yaml
from cognite.client.credentials import OAuthInteractive
from cognite.client import CogniteClient, ClientConfig
from cognite.replicator import assets, events, files, time_series, datapoints, sequences, sequence_rows
# SOURCE
SOURCE_TENANT_ID = "48d5043c-cf70-4c49-881c-c638f5796997"
SOURCE_CLIENT_ID = "1b90ede3-271e-401b-81a0-a4d52bea3273"
SOURCE_PROJECT = "publicdata"
SOURCE_CLUSTER = "api"
# DESTINATION
DEST_TENANT_ID = "d4febcbc-db24-4823-bffd-92fd05b9c6bc"
DEST_CLIENT_ID = "189e8b95-f1ce-47d2-aa66-4c2fe3567f91"
DEST_PROJECT = "sa-team"
DEST_CLUSTER = "bluefield"
### Autogenerated variables
SOURCE_SCOPES = [f"https://{SOURCE_CLUSTER}.cognitedata.com/.default"]
SOURCE_BASE_URL = f"https://{SOURCE_CLUSTER}.cognitedata.com"
SOURCE_AUTHORITY_URL = f"https://login.microsoftonline.com/{SOURCE_TENANT_ID}"
DEST_SCOPES = [f"https://{DEST_CLUSTER}.cognitedata.com/.default"]
DEST_BASE_URL = f"https://{DEST_CLUSTER}.cognitedata.com"
DEST_AUTHORITY_URL = f"https://login.microsoftonline.com/{DEST_TENANT_ID}"
# Config
BATCH_SIZE = 10000 # this is the max size of a batch to be posted
NUM_THREADS = 10 # this is the max number of threads to be used
TIMEOUT = 90
PORT = 53000
SOURCE_CLIENT = CogniteClient(
ClientConfig(
credentials=OAuthInteractive(
authority_url=SOURCE_AUTHORITY_URL,
client_id=SOURCE_CLIENT_ID,
scopes=SOURCE_SCOPES,
),
project=SOURCE_PROJECT,
base_url=SOURCE_BASE_URL,
client_name="cognite-replicator-source",
)
)
DEST_CLIENT = CogniteClient(
ClientConfig(
credentials=OAuthInteractive(
authority_url=DEST_AUTHORITY_URL,
client_id=DEST_CLIENT_ID,
scopes=DEST_SCOPES,
),
project=DEST_PROJECT,
base_url=DEST_BASE_URL,
client_name="cognite-replicator-destination",
)
)
if __name__ == "__main__": # this is necessary because threading
#### Uncomment the resources you would like to copy
assets.replicate(SOURCE_CLIENT, DEST_CLIENT)
#events.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#files.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#time_series.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#datapoints.replicate(SOURCE_CLIENT, DEST_CLIENT)
#sequences.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#sequence_rows.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
It will copy everything from source to destination and use your own credentials to run the code, you need to have the right permissions to read on the source project and write on the destination project (in the example below, the secrets are stored as environment variables)
import os
from cognite.client.credentials import OAuthClientCredentials
from cognite.client import CogniteClient, ClientConfig
from cognite.replicator import assets, events, files, time_series, datapoints, sequences, sequence_rows
# SOURCE
SOURCE_TENANT_ID = "48d5043c-cf70-4c49-881c-c638f5796997"
SOURCE_CLIENT_ID = "1b90ede3-271e-401b-81a0-a4d52bea3273"
SOURCE_CLIENT_SECRET = os.environ.get("SOURCE_CLIENT_SECRET")
SOURCE_PROJECT = "publicdata"
SOURCE_CLUSTER = "api"
# DESTINATION
DEST_TENANT_ID = "d4febcbc-db24-4823-bffd-92fd05b9c6bc"
DEST_CLIENT_ID = "189e8b95-f1ce-47d2-aa66-4c2fe3567f91"
DEST_CLIENT_SECRET = os.environ.get("DEST_CLIENT_SECRET")
DEST_PROJECT = "sa-team"
DEST_CLUSTER = "bluefield"
### Autogenerated variables
SOURCE_SCOPES = [f"https://{SOURCE_CLUSTER}.cognitedata.com/.default"]
SOURCE_BASE_URL = f"https://{SOURCE_CLUSTER}.cognitedata.com"
SOURCE_TOKEN_URL = f"https://login.microsoftonline.com/{SOURCE_TENANT_ID}/oauth2/v2.0/token"
DEST_SCOPES = [f"https://{DEST_CLUSTER}.cognitedata.com/.default"]
DEST_BASE_URL = f"https://{DEST_CLUSTER}.cognitedata.com"
DEST_TOKEN_URL = f"https://login.microsoftonline.com/{DEST_TENANT_ID}/oauth2/v2.0/token"
COGNITE_CONFIG_FILE = "config/config.yml"
# Config
BATCH_SIZE = 10000 # this is the max size of a batch to be posted
NUM_THREADS = 10 # this is the max number of threads to be used
TIMEOUT = 90
PORT = 53000
SOURCE_CLIENT = CogniteClient(
ClientConfig(
credentials=OAuthClientCredentials(
token_url=SOURCE_TOKEN_URL,
client_id=SOURCE_CLIENT_ID,
scopes=SOURCE_SCOPES,
client_secret=SOURCE_CLIENT_SECRET,
),
project=SOURCE_PROJECT,
base_url=SOURCE_BASE_URL,
client_name="cognite-replicator-source",
)
)
DEST_CLIENT = CogniteClient(
ClientConfig(
credentials=OAuthClientCredentials(
token_url=DEST_TOKEN_URL,
client_id=DEST_CLIENT_ID,
scopes=DEST_SCOPES,
client_secret=DEST_CLIENT_SECRET,
),
project=DEST_PROJECT,
base_url=DEST_BASE_URL,
client_name="cognite-replicator-destination",
)
)
if __name__ == "__main__": # this is necessary because threading
#### Uncomment the resources you would like to copy
assets.replicate(SOURCE_CLIENT, DEST_CLIENT)
#events.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#files.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#time_series.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#datapoints.replicate(SOURCE_CLIENT, DEST_CLIENT)
#sequences.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
#sequence_rows.replicate(SOURCE_CLIENT, DEST_CLIENT, BATCH_SIZE, NUM_THREADS)
Refer to default configuration file or example configuration file for all keys in the configuration file Start with client creation from either step 2.1 or 2.2
if __name__ == "__main__": # this is necessary because threading
config = {
"timeseries_external_ids": ["pi:160670", "pi:160623"],
"datapoints_start": "100d-ago",
"datapoints_end": "now",
}
time_series.replicate(
client_src=SOURCE_CLIENT,
client_dst=DEST_CLIENT,
batch_size=BATCH_SIZE,
num_threads=NUM_THREADS,
config=config,
)
datapoints.replicate(
client_src=SOURCE_CLIENT,
client_dst=DEST_CLIENT,
external_ids=config.get("timeseries_external_ids"),
start=config.get("datapoints_start"),
end=config.get("datapoints_end"),
)
It will use the configuration file to determine what will be copied In this case, no need to create the client, it will be created based on what is in the configuration file
import yaml
from cognite.replicator.__main__ import main
import os
if __name__ == "__main__": # this is necessary because threading
COGNITE_CONFIG_FILE = yaml.safe_load("config/config.yml")
os.environ["COGNITE_CONFIG_FILE"] = COGNITE_CONFIG_FILE
main()
It will use the configuration file to determine what will be copied In this case, no need to create the client, it will be created based on what is in the configuration file
import yaml
import sys
sys.path.append("cognite-replicator") ### Path of the local version of the replicator. Importing from outside of the current working directory requires sys.path, which is a list of all directories Python searches through.
import os
if __name__ == "__main__": # this is necessary because threading
COGNITE_CONFIG_FILE = yaml.safe_load("config/config.yml")
os.environ["COGNITE_CONFIG_FILE"] = COGNITE_CONFIG_FILE
main()
sys.path.remove("cognite-replicator") ## Python will also search these paths for future projects unless they are removed. Removes unwanted search paths
Change the version in the files
Wondering about upcoming or previous changes? Take a look at the CHANGELOG.
Want to contribute? Check out CONTRIBUTING.