Provides a simple hook and operators to migrate your Salesforce data to another database.
This can be useful when all of your data is already stored in a warehouse seperate from Salesforce and all of your data analytics/visualization tools are also tied to a different warehouse.
Copyright 2016 Giovanni Briggs
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Here is an example of using the hook and operator to move data from Salesforce into BigQuery:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators import SalesforceToFileOperator
from airflow.contrib.operators import file_to_gcs
from airflow.contrib.operators import gcs_to_bq
import os
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['gbriggs2012@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('bifrost', default_args=default_args)
output_filename = "account.json"
output_filepath = os.path.join(".", output_filename)
output_schemaname = "account_schema.json"
output_schema = os.path.join(".", output_schemaname)
# these values can and should change based on your unique setup
# the GCS BUCKET is the bucket on Google Cloud Storage where you want the files to live
# GCS_CONN_ID is the name of the Airflow connection that holds your credentials to connect to the Google API
# SF_CONN_ID is the name of the Airflow connection that holds your crednetials to connect to your Salesforce API
#
# To setup connections:
# - launch the airflow webserver:
# >$ airflow webserver
# - Go to Admin->Connections and then click on "Create"
# - select the corresponding connection type and enter in your info
# - For Google Cloud Services use "Google Cloud Platform"
# - For Salesforce use "HTTP"
# * username: your salesforce username
# * passsword: your salesforce password
# * extra: you should put a JSON structure here that contains your security token if your SF implemntation requires it
# - {"security_token": "YOUR_SECURITY_TOKEN_HERE"}
GCS_BUCKET = "YOUR_BUCKET_HERE"
GCS_CONN_ID = "google_cloud_storage_default"
SF_CONN_ID = "salesforce_conn"
# query salesforce
# the SalesforceToFileOperator takes in a conection name
# to define the connection, go to Admin -> Connections
# it uses the HTTP connection type
# the security token is included in the "Extras" field, which allows you to define extra attributes in a JSON format
# {"security_token":"asdasdasd"}
t1 = SalesforceToFileOperator(
task_id = "get_model_salesforce",
obj = "Account", # the object we are querying
#fields = ["Name", "Id"], # you can use this to limit the fields that are queried
conn_id = SF_CONN_ID, # name of the Airflow connection that has our SF credentials
output = output_filepath, # file where the resulting data is stored
output_schemafile = output_schema, # tell the operator that we want a file of the resulting schema
output_schematype = "BQ", # specify that we want to generate a schema file for BigQuery
fmt = "ndjson", # write the file as newline deliminated json. Other options include CSV and JSON
record_time_added = True, # add a column to the output that records the time at which the data was fetched
coerce_to_timestamp = True, # coerce all date and datetime fields into Unix timestamps (UTC)
dag = dag
)
# push result to GCS
t2a = file_to_gcs.FileToGoogleCloudStorageOperator(
task_id = "model_to_gcs",
dst = output_filename,
bucket = GCS_BUCKET,
conn_id = GCS_CONN_ID,
src = output_filepath,
dag = dag
)
# push schema to GCS
t2b = file_to_gcs.FileToGoogleCloudStorageOperator(
task_id = "model_schema_to_gcs",
dst = output_schemaname,
bucket = GCS_BUCKET,
conn_id = GCS_CONN_ID,
src = output_schema,
dag = dag
)
# move to BigQuery
# Create and write disposition settings and descriptions
# https://cloud.google.com/bigquery/docs/reference/v2/jobs
#
# Also, valid source formats are not made particularly clear, but this page describes them:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
t3 = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id = "file_to_bigquery",
bucket = GCS_BUCKET,
source_objects = [output_filename],
schema_object = output_schemaname,
source_format = "NEWLINE_DELIMITED_JSON",
destination_project_dataset_table = "scratch.sf_account",
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_APPEND",
dag = dag
)
# moving the files can happen at the same time as soon as the first task as finished
t1.set_downstream(t2a)
t1.set_downstream(t2b)
# moving the data with the appropriate schema can't happen until both of those files are uploaded to GCS
t3.set_upstream([t2a, t2b])