A minimal Airflow Hook for interacting with Microsoft SQL Server
Enables the usage of DbApiHook methods that the provided Hook for SQL Server does not support, such as .get_sqlalchemy_engine and .get_pandas_df.
python -m pip install git+https://github.com/Harduim/mssql_airflow.git
- Use SQLAlchemy Connections with
MsSQLHook.get_sqlalchemy_engine
- Get a pandas dataframe from a query using
MsSQLHook.get_pandas_df
- Multiline inserts with
MsSQLHook.batch_insert_rows
- All other methods already implemented by DbApiHook
- Create a connection on Admin => Connections
- Conn Id: Name of the connection, used on the parameter mssql_conn_id
- Conn Type: Microsoft SQL Server
- Host: The IP address or hostname of the server
- Schema: The database, not the actual schema. Airflow's connection form has no "database" field, so this field is used by convention.
- Password: The password
- Login: The user name. Use 'domain\username' for Windows auth.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.alternative_mssql_hook import MsSQLHook
from airflow.operators.python_operator import PythonOperator
def sample_usage():
    """Demonstrate the MsSQLHook methods missing from the stock MSSQL hook."""
    # "schema" actually selects the database, following Airflow's convention.
    hook = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # get_pandas_df is not available on the regular mssql plugin.
    frame = hook.get_pandas_df("SELECT * FROM TABLE")

    # Every standard DbApiHook method works as usual.
    records = hook.get_records("SELECT col1, col2 FROM THE_TABLE")
    hook.run("DELETE FROM othet_staging_table_name")

    # Persist the dataframe to a staging table through pandas to_sql.
    engine = hook.get_sqlalchemy_engine()
    frame.to_sql("staging_table_name", con=engine, if_exists="replace")
# Schedule the sample callable to run once a day at midnight.
default_args = {
    "owner": "Arthur Harduim",
    "depends_on_past": False,
    "start_date": datetime(2020, 12, 1),
    "email": ["arthur@rioenergy.com.br"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=6),
}

with DAG(
    "Sample_DAG",
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
    default_args=default_args,
    catchup=False,
) as dag:
    # Single task exercising the hook's database operations.
    some_db_tsk = PythonOperator(task_id="some_db_tsk", python_callable=sample_usage)
from datetime import datetime, timedelta
from airflow import DAG
from alternative_mssql_hook import MsSQLHook
from airflow.operators.python import PythonOperator
def sample_usage():
    """Show the DbApiHook features that MsSQLHook makes available for SQL Server."""
    # Airflow convention: the "schema" parameter holds the database name.
    mssql_hook = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # Unlike the regular mssql plugin, get_pandas_df works here.
    dataframe = mssql_hook.get_pandas_df("SELECT * FROM TABLE")

    # The inherited DbApiHook methods all work as well.
    rows = mssql_hook.get_records("SELECT col1, col2 FROM THE_TABLE")
    mssql_hook.run("DELETE FROM othet_staging_table_name")

    # Write the dataframe into a staging table using pandas to_sql.
    sa_engine = mssql_hook.get_sqlalchemy_engine()
    dataframe.to_sql("staging_table_name", con=sa_engine, if_exists="replace")
# Daily DAG (midnight cron) wrapping the sample database task.
with DAG(
    "Sample_DAG",
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
    default_args={
        "owner": "Arthur Harduim",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 1),
        "email": ["arthur@rioenergy.com.br"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=6),
    },
    catchup=False,
) as dag:
    # The callable above runs as a single PythonOperator task.
    some_db_tsk = PythonOperator(
        task_id="some_db_tsk",
        python_callable=sample_usage,
    )