Connect to SQL data
To connect to your SQL data, you first create a Data Source which tells GX where your database resides and how to connect to it. You then configure Data Assets for your Data Source to tell GX which sets of records you want to be able to access from your Data Source. Finally, you will define Batch Definitions which allow you to request all the records retrieved from a Data Asset or further partition the returned records based on the contents of a date and time field.
GX supports the following SQL dialects:
- PostgreSQL
- SQLite
- Snowflake
- Databricks SQL
- BigQuery SQL
All other SQL dialects are handled through the Python module SQLAlchemy. You can find more information on the dialects supported by SQLAlchemy on their dialects page.
Configure credentials
To connect GX to your SQL data, you will need your connection string and corresponding credentials. Because your connection string and credentials provide access to your data they should be stored securely outside of version control. GX Core allows you to securely store credentials and connection strings as environment variables or in an uncommitted config file. These variables are then accessed through string substitution in your version controlled code.
Prerequisites
- The ability to set environment variables or a File Data Context.
GX Core also supports referencing credentials that have been stored in the AWS Secrets Manager, Google Cloud Secret Manager, and Azure Key Vault secrets managers. To set up GX Core to access one of these secrets managers you will additionally require:
- The ability to install Python modules with pip.
Procedure
- Determine your connection string format.
Different types of SQL database have different formats for their connection string. In the following table, the text in <> corresponds to the values specific to your credentials and connection string.
Database type | Connection string
PostgreSQL | postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
SQLite | sqlite:///<PATH_TO_DB_FILE>
Snowflake | snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT_NAME>/<DATABASE_NAME>/<SCHEMA_NAME>?warehouse=<WAREHOUSE_NAME>&role=<ROLE_NAME>&application=great_expectations_oss
Databricks SQL | databricks://token:<TOKEN>@<HOST>:<PORT>?http_path=<HTTP_PATH>&catalog=<CATALOG>&schema=<SCHEMA>
BigQuery SQL | bigquery://<GCP_PROJECT>/<BIGQUERY_DATASET>?credentials_path=/path/to/your/credentials.json
You have the option to connect to Snowflake with key-pair authentication instead of a password.
Other connection string formats are valid provided they are for a SQL database that is supported by SQLAlchemy. You can find more information on the dialects supported by SQLAlchemy on their dialects page.
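As a rough illustration of filling in the PostgreSQL format listed in this step, the sketch below assembles a connection string from its parts. The helper name `build_postgres_url` and the sample values are hypothetical and not part of GX. It percent-encodes the username and password with the standard library, on the assumption that your credentials may contain characters such as `@` or `/` that would otherwise break URL parsing.

```python
from urllib.parse import quote_plus


def build_postgres_url(username, password, host, port, database):
    """Fill in the PostgreSQL connection string template.

    Hypothetical helper for illustration only; not part of GX.
    """
    # Percent-encode the credentials so characters like "@" or "/"
    # are not mistaken for URL delimiters.
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"postgresql+psycopg2://{user}:{secret}@{host}:{port}/{database}"


# Sample values are made up for the example.
url = build_postgres_url("gx_user", "p@ss/word", "localhost", 5432, "taxi")
print(url)
```

A string built this way is something you would store as an environment variable or config value rather than keep in version-controlled code.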
- Store the credentials required for your connection.
GX supports the following methods of securely storing credentials. Choose one to implement for your connection:
- Environment Variables
- config.yml
- Key pair (Snowflake only)
Environment variables provide the quickest way to securely set up your credentials.
You can set environment variables by replacing the values in <> with your information and entering export <VARIABLE_NAME>=<VALUE> commands in the terminal or adding the commands to your ~/.bashrc file. If you use the export command from the terminal, the environment variables will not persist beyond the current session. If you add them to the ~/.bashrc file, the variables will be exported each time you log in.
You can export individual credentials or an entire connection string. For example:
Terminal or ~/.bashrc
export MY_POSTGRES_USERNAME=<USERNAME>
export MY_POSTGRES_PASSWORD=<PASSWORD>
or:
Terminal or ~/.bashrc
export POSTGRES_CONNECTION_STRING=postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
You can also reference your stored credentials within a stored connection string by wrapping their corresponding variable in ${ and }. For example:
Terminal or ~/.bashrc
export MY_POSTGRES_USERNAME=<USERNAME>
export MY_POSTGRES_PASSWORD=<PASSWORD>
export POSTGRES_CONNECTION_STRING=postgresql+psycopg2://${MY_POSTGRES_USERNAME}:${MY_POSTGRES_PASSWORD}@<HOST>:<PORT>/<DATABASE>
Because the dollar sign character $ is used to indicate the start of a string substitution, it should be escaped using a backslash \ if it is part of your credentials. For example, if your password is pa$$word then in the previous examples you would use the command:
Terminal or ~/.bashrc
export MY_POSTGRES_PASSWORD=pa\$\$word
YAML files make variables more visible, are easier to edit, and allow for modularization. For example, you can create a YAML file for development and testing and another for production.
A File Data Context is required before you can configure credentials in a YAML file. By default, the credentials file in a File Data Context is located at great_expectations/uncommitted/config_variables.yml. The uncommitted/ directory is included in a default .gitignore and will be excluded from version control.
Save your access credentials or the database connection string to great_expectations/uncommitted/config_variables.yml. For example:
config_variables.yml
MY_POSTGRES_USERNAME: <USERNAME>
MY_POSTGRES_PASSWORD: <PASSWORD>
or:
config_variables.yml
POSTGRES_CONNECTION_STRING: postgresql+psycopg2://<USERNAME>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>
You can also reference your stored credentials within a stored connection string by wrapping their corresponding variable in ${ and }. For example:
config_variables.yml
MY_POSTGRES_USERNAME: <USERNAME>
MY_POSTGRES_PASSWORD: <PASSWORD>
POSTGRES_CONNECTION_STRING: postgresql+psycopg2://${MY_POSTGRES_USERNAME}:${MY_POSTGRES_PASSWORD}@<HOST>:<PORT>/<DATABASE>
Because the dollar sign character $ is used to indicate the start of a string substitution, it should be escaped using a backslash \ if it is part of your credentials. For example, if your password is pa$$word then in the previous examples you would use the entry:
config_variables.yml
MY_POSTGRES_PASSWORD: pa\$\$word
If you're connecting to Snowflake, you can use key-pair authentication instead of a password. This improves security and can be helpful for automations.
Follow Snowflake's docs to configure and store the private and public keys.
- Access your credentials in Python strings.
Securely stored credentials are accessed via string substitution. You can reference your credentials in a Python string by wrapping the variable name in ${ and }. Using individual credentials would look like:
Python
connection_string = "postgresql+psycopg2://${MY_POSTGRES_USERNAME}:${MY_POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE}"
Or you could reference a configured variable that contains the full connection string by providing a Python string that contains just a reference to that variable:
Python
connection_string = "${POSTGRES_CONNECTION_STRING}"
When you pass a string that references your stored credentials to a GX Core method that requires a connection string as a parameter, the referenced variable will be substituted for the corresponding stored value.
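To make the substitution behavior concrete, here is a minimal conceptual sketch of how `${NAME}` references and backslash-escaped dollar signs could be resolved. It is not GX's implementation: the `substitute` function, the regular expression, and the sample values are assumptions made for illustration, and GX performs the real substitution internally when you pass the string to one of its methods.

```python
import re

# Matches ${NAME}, but not a reference whose "$" is escaped as "\$".
PLACEHOLDER = re.compile(r"(?<!\\)\$\{(\w+)\}")


def substitute(template, variables):
    """Replace each ${NAME} with variables[NAME], then unescape \\$ to $.

    Raises KeyError if a referenced variable is not defined.
    """
    def lookup(match):
        return variables[match.group(1)]

    replaced = PLACEHOLDER.sub(lookup, template)
    return replaced.replace("\\$", "$")


# Made-up stored values standing in for environment variables.
stored = {"MY_POSTGRES_USERNAME": "gx_user", "MY_POSTGRES_PASSWORD": "s3cret"}
template = (
    "postgresql+psycopg2://${MY_POSTGRES_USERNAME}:${MY_POSTGRES_PASSWORD}"
    "@localhost:5432/taxi"
)
resolved = substitute(template, stored)
print(resolved)
```

In this sketch a missing variable raises `KeyError`, which is one possible way to surface a credential that was never exported or saved.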
To use key-pair authentication for Snowflake, you will pass the private key as a connection argument with kwargs in addition to passing connection details with the connection_string parameter. Here's an example of how to access your private key in Python.
Python
import pathlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
# Load the encrypted private key from disk.
PRIVATE_KEY_FILE = pathlib.Path("path/to/my/rsa_key.p8").resolve(strict=True)
p_key = serialization.load_pem_private_key(
    PRIVATE_KEY_FILE.read_bytes(),
    password=b"my_password",
    backend=default_backend(),
)
# Serialize the key to unencrypted DER bytes to pass as a connection argument.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
connect_args = {"private_key": pkb}
- Optional. Access credentials stored in a secret manager.
GX Core supports the AWS Secrets Manager, Google Cloud Secret Manager, and Azure Key Vault secrets managers. For more information on how to set up string substitutions that pull credentials from these sources, see Access secrets managers.
Create a SQL Data Source
Data Sources tell GX where your data is located and how to connect to it. With SQL databases this is done through a connection string you will provide.
Prerequisites
- Python version 3.9 to 3.12.
- An installation of GX Core with support for SQL dependencies
- A preconfigured Data Context.
- Credentials stored securely outside of version control.
Procedure
- Instructions
- Sample code
- Import GX and instantiate a Data Context:
Python
import great_expectations as gx
context = gx.get_context()
- Define a name and connection string for your Data Source.
You can assign any name to a Data Source as long as it is unique within your Data Context.
Your connection string or credentials should not be saved in plain text in your code. Instead, you should reference a securely stored connection string or credentials through string substitution. The guidance on how to Configure your credentials covers how to determine the format of your connection string, securely store your connection string or credentials, and how to reference your connection string or credentials in Python.
The following code defines a Data Source name and references a PostgreSQL connection string that has been securely stored in its entirety:
Python
datasource_name = "my_new_datasource"
my_connection_string = "${POSTGRESQL_CONNECTION_STRING}"
- Create a Data Source.
GX Core provides specific methods for creating Data Sources that correspond to supported SQL dialects. All of these methods are accessible from the data_sources attribute of your Data Context. Reference the following table to determine the method used for your data's SQL dialect:
Database type | Data Context method
PostgreSQL | context.data_sources.add_postgres(name: str, connection_string: str)
SQLite | context.data_sources.add_sqlite(name: str, connection_string: str)
Snowflake | context.data_sources.add_snowflake(name: str, connection_string: str)
Databricks SQL | context.data_sources.add_databricks_sql(name: str, connection_string: str)
Other SQL | context.data_sources.add_sql(name: str, connection_string: str)
Once you have the method for your data's SQL dialect, you can call it with the previously defined Data Source name and connection string to create your Data Source. The following example creates a PostgreSQL Data Source:
Python
data_source = context.data_sources.add_postgres(
    name=datasource_name, connection_string=my_connection_string
)
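If you create Data Sources for several dialects, one possible way to pick the method from the table in this step is a small lookup keyed by dialect. The sketch below is only an illustration: the `add_data_source` helper, the dialect keys, and the `${MYSQL_CONNECTION_STRING}` variable are hypothetical, while the method names are the ones listed in the table. A stand-in object replaces the Data Context so the example runs without GX or a database.

```python
from types import SimpleNamespace

# Method names come from the table in this step; the dictionary keys are
# arbitrary labels chosen for this example.
METHOD_BY_DIALECT = {
    "postgresql": "add_postgres",
    "sqlite": "add_sqlite",
    "snowflake": "add_snowflake",
    "databricks": "add_databricks_sql",
}


def add_data_source(context, dialect, name, connection_string):
    """Call the data_sources method matching dialect, else add_sql."""
    method_name = METHOD_BY_DIALECT.get(dialect, "add_sql")
    method = getattr(context.data_sources, method_name)
    return method(name=name, connection_string=connection_string)


# Stand-in for a Data Context that records which method was called.
calls = []


def _recorder(method_name):
    def record(name, connection_string):
        calls.append((method_name, name, connection_string))
        return name
    return record


fake_context = SimpleNamespace(
    data_sources=SimpleNamespace(
        add_postgres=_recorder("add_postgres"),
        add_sql=_recorder("add_sql"),
    )
)

add_data_source(
    fake_context, "postgresql", "my_new_datasource",
    "${POSTGRESQL_CONNECTION_STRING}",
)
add_data_source(
    fake_context, "mysql", "other_source", "${MYSQL_CONNECTION_STRING}"
)
print([call[0] for call in calls])
```

With a real Data Context you would pass `context` in place of `fake_context`; dialects without a dedicated method fall through to `add_sql`.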
- Optional. If you're connecting to Snowflake and want to use key-pair authentication instead of a password, pass the private key with kwargs. Note that a placeholder password is still required to pass the configuration validation, but the password will not be used if a private_key is provided.
Python
# For details on how to access your private key, refer to "Configure credentials" above
connect_args = {"private_key": pkb}
connection_details = {
    "account": "accountname.region",
    "user": "my_user",
    "role": "my_role",
    "password": "placeholder_value",  # must be provided to pass validation but will be ignored
    "warehouse": "my_wh",
    "database": "my_db",
    "schema": "my_schema",
}
data_source = context.data_sources.add_snowflake(
    name=datasource_name,
    connection_string=connection_details,
    kwargs={"connect_args": connect_args},
)
Private key serialized in File Data Context
If you're using a File Data Context, kwargs will be serialized to great_expectations.yml, including the private key.
- Optional. Verify the Data Source is connected:
Python
print(context.data_sources.get(datasource_name))
The details of your Data Source are retrieved from the Data Context and displayed.
Sample code
Python
import great_expectations as gx
context = gx.get_context()
datasource_name = "my_new_datasource"
my_connection_string = "${POSTGRESQL_CONNECTION_STRING}"
data_source = context.data_sources.add_postgres(
    name=datasource_name, connection_string=my_connection_string
)
print(context.data_sources.get(datasource_name))
Create a Data Asset
Data Assets are collections of records within a Data Source. With SQL Data Sources, a Data Asset can consist of the records from a specific table or the records from a specified query.
Prerequisites
- A preconfigured Data Context. The variable context is used for your Data Context in the following example code.
- A Data Source connected to SQL data.
Procedure
- Instructions
- Sample code
- Retrieve your Data Source.
Replace the value of data_source_name with the name of your Data Source and execute the following code to retrieve an existing Data Source from your Data Context:
Python
data_source_name = "my_new_datasource"
data_source = context.data_sources.get(data_source_name)
- Add a Data Asset to your Data Source.
Table Data Asset:
A Table Data Asset consists of the records in a single table. It takes two required parameters:
- table_name: The name of the SQL table that the Table Data Asset will retrieve records from.
- name: The name used to reference the Table Data Asset within GX. You may assign this arbitrarily, but all Data Assets within the same Data Source must have unique names.
Replace the values of asset_name and database_table_name in the following code, then execute it to add a Table Data Asset to your Data Source:
Python
asset_name = "MY_TABLE_ASSET"
database_table_name = "postgres_taxi_data"
table_data_asset = data_source.add_table_asset(
    table_name=database_table_name, name=asset_name
)
Query Data Asset:
A Query Data Asset consists of the records returned by a SQL query. It takes two required parameters:
- query: The SQL query that the Data Asset will retrieve records from.
- name: The name used to reference the Query Data Asset within GX. You may assign this arbitrarily, but all Data Assets within the same Data Source must have unique names.
Replace the values of asset_name and asset_query in the following code, then execute it to add a Query Data Asset to your Data Source:
Python
asset_name = "MY_QUERY_ASSET"
asset_query = "SELECT * from postgres_taxi_data"
query_data_asset = data_source.add_query_asset(query=asset_query, name=asset_name)
- Optional. Verify that your Data Asset was added to your Data Source:
Python
print(data_source.assets)
A list of Data Assets is printed. You can verify your Data Asset was created and added to the Data Source by checking for its name in the printed list.
- Optional. Add additional Data Assets to your Data Source.
A Data Source can have multiple Data Assets. Repeat this procedure to add additional Table or Query Data Assets to your Data Source.
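Because Data Asset names must be unique within a Data Source, rerunning a setup script can collide with an asset that already exists. The following sketch shows one way to guard against that by checking the asset list first. `get_or_add_table_asset` is a hypothetical helper, it assumes each entry in `data_source.assets` exposes a `name` attribute, and `FakeDataSource` is a stand-in so the example runs without GX or a database.

```python
from types import SimpleNamespace


def get_or_add_table_asset(data_source, name, table_name):
    """Return the asset called name, adding a Table Data Asset if absent."""
    for asset in data_source.assets:
        # Assumption: each asset object has a .name attribute.
        if asset.name == name:
            return asset
    return data_source.add_table_asset(table_name=table_name, name=name)


class FakeDataSource:
    """Minimal stand-in for a SQL Data Source, used only in this example."""

    def __init__(self):
        self.assets = []

    def add_table_asset(self, table_name, name):
        asset = SimpleNamespace(name=name, table_name=table_name)
        self.assets.append(asset)
        return asset


source = FakeDataSource()
first = get_or_add_table_asset(source, "MY_TABLE_ASSET", "postgres_taxi_data")
second = get_or_add_table_asset(source, "MY_TABLE_ASSET", "postgres_taxi_data")
print(first is second, len(source.assets))
```

The second call returns the existing asset instead of adding a duplicate, so the script can be run repeatedly.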
Sample code
Python
import great_expectations as gx
context = gx.get_context()
# Set up: Create a Data Source
datasource_name = "my_new_datasource"
my_connection_string = "${POSTGRESQL_CONNECTION_STRING}"
data_source = context.data_sources.add_postgres(
    name=datasource_name, connection_string=my_connection_string
)
print(context.data_sources.get(datasource_name))
# Alternatively, fetch a Data Source from the Data Context.
data_source_name = "my_new_datasource"
data_source = context.data_sources.get(data_source_name)
# Example of creating a Table Asset
asset_name = "MY_TABLE_ASSET"
database_table_name = "postgres_taxi_data"
table_data_asset = data_source.add_table_asset(
    table_name=database_table_name, name=asset_name
)
# Example of creating a Query Asset
asset_name = "MY_QUERY_ASSET"
asset_query = "SELECT * from postgres_taxi_data"
query_data_asset = data_source.add_query_asset(query=asset_query, name=asset_name)
# Verify that the Data Assets were created
print(data_source.assets)
Create a Batch Definition
A Batch Definition allows you to request all the records from a Data Asset or a subset based on the contents of a date and time field.
If you use GX Cloud and GX Core together, note that Batch Definitions you create with the API apply to API-managed Expectations only.
Prerequisites
- A preconfigured Data Context. The variable context is used for your Data Context in the following example code.
- A Data Asset on a SQL Data Source.
Procedure
- Instructions
- Sample code
- Retrieve your Data Asset.
Replace the value of datasource_name with the name of your Data Source and the value of asset_name with the name of your Data Asset in the following code. Then execute it to retrieve an existing Data Source and Data Asset from your Data Context:
Python
# Retrieve a Data Source
datasource_name = "my_datasource"
data_source = context.data_sources.get(datasource_name)
# Get the Data Asset from the Data Source
asset_name = "MY_TABLE_ASSET"
data_asset = data_source.get_asset(asset_name)
- Add a Batch Definition to the Data Asset.
Full table:
A full table Batch Definition returns all of the records in your Data Asset as a single Batch. Therefore, to define a full table Batch Definition you only need to provide a name for the Batch Definition to be referenced by.
Update the name parameter and execute the following code to create a full table Batch Definition:
Python
full_table_batch_definition = data_asset.add_batch_definition_whole_table(
    name="FULL_TABLE"
)
Partitioned:
A partitioned Batch Definition subdivides the records in a Data Asset based on the values in a specified field. GX Core currently supports partitioning Data Assets based on date fields. The records can be grouped by year, month, or day.
Update the date_column variable and name parameters in the following snippet, then execute it to create partitioned Batch Definitions:
Python
date_column = "pickup_datetime"
daily_batch_definition = data_asset.add_batch_definition_daily(
    name="DAILY", column=date_column
)
monthly_batch_definition = data_asset.add_batch_definition_monthly(
    name="MONTHLY", column=date_column
)
yearly_batch_definition = data_asset.add_batch_definition_yearly(
    name="YEARLY", column=date_column
)
- Optional. Verify the Batch Definition is valid.
When retrieving a Batch from a partitioned Batch Definition, you can specify the date of the data to retrieve as shown in the following examples. If you do not specify a date, the most recent date in the data is returned by default.
Full table:
Python
full_table_batch = full_table_batch_definition.get_batch()
full_table_batch.head()
Partitioned:
Python
daily_batch = daily_batch_definition.get_batch(
    batch_parameters={"year": 2020, "month": 1, "day": 14}
)
daily_batch.head()
monthly_batch = monthly_batch_definition.get_batch(
    batch_parameters={"year": 2020, "month": 1}
)
monthly_batch.head()
yearly_batch = yearly_batch_definition.get_batch(batch_parameters={"year": 2020})
yearly_batch.head()
- Optional. Create additional Batch Definitions.
A Data Asset can have multiple Batch Definitions as long as each Batch Definition has a unique name within that Data Asset. Repeat this procedure to add additional full table or partitioned Batch Definitions to your Data Asset.
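To build intuition for what daily, monthly, and yearly partitioning selects, the following plain-Python sketch models the idea on a handful of in-memory records. It is a conceptual illustration only: GX retrieves Batches from your database, and the `select_batch` function, its `parts` argument, and the sample rows are assumptions invented for this example.

```python
from datetime import datetime


def select_batch(records, column, parts, batch_parameters=None):
    """Return the records that fall in a single date partition.

    parts names the datetime attributes that define a partition,
    for example ("year", "month") for a monthly partition.
    """
    def key(record):
        return tuple(getattr(record[column], part) for part in parts)

    if batch_parameters:
        wanted = tuple(batch_parameters[part] for part in parts)
    else:
        # No date requested: fall back to the most recent partition.
        wanted = max(key(record) for record in records)
    return [record for record in records if key(record) == wanted]


# Made-up sample rows.
rows = [
    {"pickup_datetime": datetime(2020, 1, 14, 8, 30), "fare": 12.5},
    {"pickup_datetime": datetime(2020, 1, 14, 19, 5), "fare": 7.0},
    {"pickup_datetime": datetime(2020, 1, 20, 9, 0), "fare": 30.0},
    {"pickup_datetime": datetime(2020, 2, 2, 11, 45), "fare": 9.25},
]

daily = select_batch(
    rows, "pickup_datetime", ("year", "month", "day"),
    {"year": 2020, "month": 1, "day": 14},
)
monthly = select_batch(
    rows, "pickup_datetime", ("year", "month"), {"year": 2020, "month": 1}
)
latest = select_batch(rows, "pickup_datetime", ("year", "month"))
print(len(daily), len(monthly), len(latest))
```

The daily request matches the two rows from 14 January 2020, the monthly request matches all three January rows, and omitting the parameters selects the single February row because that is the most recent month in the sample data.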
Sample code
Python
import great_expectations as gx
context = gx.get_context()
# Retrieve a Data Source
datasource_name = "my_datasource"
data_source = context.data_sources.get(datasource_name)
# Get the Data Asset from the Data Source
asset_name = "MY_TABLE_ASSET"
data_asset = data_source.get_asset(asset_name)
# Example of a full table Batch Definition
full_table_batch_definition = data_asset.add_batch_definition_whole_table(
    name="FULL_TABLE"
)
# Verify that the Batch Definition is valid
full_table_batch = full_table_batch_definition.get_batch()
full_table_batch.head()
# Examples of partitioned Batch Definitions
date_column = "pickup_datetime"
daily_batch_definition = data_asset.add_batch_definition_daily(
    name="DAILY", column=date_column
)
monthly_batch_definition = data_asset.add_batch_definition_monthly(
    name="MONTHLY", column=date_column
)
yearly_batch_definition = data_asset.add_batch_definition_yearly(
    name="YEARLY", column=date_column
)
# Verify that the partitioned Batch Definitions are valid
daily_batch = daily_batch_definition.get_batch(
    batch_parameters={"year": 2020, "month": 1, "day": 14}
)
daily_batch.head()
monthly_batch = monthly_batch_definition.get_batch(
    batch_parameters={"year": 2020, "month": 1}
)
monthly_batch.head()
yearly_batch = yearly_batch_definition.get_batch(batch_parameters={"year": 2020})
yearly_batch.head()