
A minimal Airflow Hook for interacting with Microsoft SQL Server

Alternative Microsoft SQL Server Hook for Airflow

Enables the usage of DbApiHook methods that the provided Hook for SQL Server does not support, such as .get_sqlalchemy_engine and .get_pandas_df.


python -m pip install git+https://github.com/Harduim/mssql_airflow.git


  • Use SQLAlchemy Connections with MsSQLHook.get_sqlalchemy_engine
  • Get a pandas dataframe from a query using MsSQLHook.get_pandas_df
  • Multiline inserts with MsSQLHook.batch_insert_rows
  • All other methods already implemented by DbApiHook

Instructions and sample usage

  • Create a connection on Admin => Conections
    • Conn Id: Name of the conection, used on the parameter mssql_conn_id
    • Conn Type: Microsoft SQL Server
    • Host: The IP address or hostname of the server
    • Schema: The Database not actual schema. Not sure why there is no "database" field, I'm just following Airflow's convention
    • Password: The password
    • Login: The user name. Use 'domain\username' for Windows auth.

Airflow 1.10

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.alternative_mssql_hook import MsSQLHook
from airflow.operators.python_operator import PythonOperator

def sample_usage():
    # Schema is the database, not the actual schema.
    mssql = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # This method (get_pandas_df) does not work with the regular mssql plugin
    df = mssql.get_pandas_df("SELECT * FROM TABLE")

    # All the regular dbapihook methods works
    my_records = mssql.get_records("SELECT col1, col2 FROM THE_TABLE")
    mssql.run("DELETE FROM othet_staging_table_name")

    # Saving data to a staging table using pandas to_sql
    conn = mssql.get_sqlalchemy_engine()
    df.to_sql("staging_table_name", con=conn, if_exists="replace")

with DAG(
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
        "owner": "Arthur Harduim",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 1),
        "email": ["arthur@rioenergy.com.br"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=6),
) as dag:
    some_db_tsk = PythonOperator(task_id="some_db_tsk", python_callable=sample_usage)

Airflow 2.0

from datetime import datetime, timedelta

from airflow import DAG

from alternative_mssql_hook import MsSQLHook
from airflow.operators.python import PythonOperator

def sample_usage():
    # Schema is the database, not the actual schema.
    mssql = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # This method (get_pandas_df) does not work with the regular mssql plugin
    df = mssql.get_pandas_df("SELECT * FROM TABLE")

    # All the default dbapihook methods works
    my_records = mssql.get_records("SELECT col1, col2 FROM THE_TABLE")
    mssql.run("DELETE FROM othet_staging_table_name")

    # Saving data to a staging table using pandas to_sql
    conn = mssql.get_sqlalchemy_engine()
    df.to_sql("staging_table_name", con=conn, if_exists="replace")

with DAG(
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
        "owner": "Arthur Harduim",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 1),
        "email": ["arthur@rioenergy.com.br"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=6),
) as dag:
    some_db_tsk = PythonOperator(task_id="some_db_tsk", python_callable=sample_usage)