dagster-teradata with Teradata Vantage
This guide walks you through integrating Dagster with Teradata Vantage to create and manage ETL pipelines. It provides step-by-step instructions for installing and configuring the necessary packages, setting up a Dagster project, and implementing a pipeline that interacts with Teradata Vantage.
Dagster
- Dagster is a data orchestrator built for data engineers, with integrated lineage, observability, a declarative programming model and best-in-class testability.
- Data pipelines are automated workflows that ingest raw data, process it through various transformations (such as cleaning and structuring), and produce a final, usable format—much like an assembly line for data.
- Dagster orchestrates this process by defining each stage of the pipeline, ensuring tasks execute in the correct sequence and at scheduled intervals. It provides a structured way to manage dependencies, track execution, and maintain reliable data workflows.
- Dagster orchestrates dbt alongside other technologies. Dagster's asset-oriented approach allows Dagster to understand dbt at the level of individual dbt models.
Prerequisites
-
Access to a Teradata Vantage instance.
NotaIf you need a test instance of Vantage, you can provision one for free at https://clearscape.teradata.com
-
Python 3.9 or higher, Python 3.12 is recommended.
-
pip
Setting Up a Virtual Enviroment
A virtual environment is recommended to isolate project dependencies and avoid conflicts with system-wide Python packages. Here’s how to set it up:
- Windows
- MacOS
- Linux
Run in Powershell:
Install dagster and dagster-teradata
With your virtual environment active, the next step is to install dagster and the Teradata provider package (dagster-teradata) to interact with Teradata Vantage.
-
Install the Required Packages:
-
Note about Optional Dependencies:
a)
dagster-teradata
relies on dagster-aws for ingesting data from an S3 bucket into Teradata Vantage. Sincedagster-aws
is an optional dependency, users can install it by running:b)
dagster-teradata
also relies ondagster-azure
for ingesting data from an Azure Blob Storage container into Teradata Vantage. To install this dependency, run: -
Verify the Installation:
To confirm that Dagster is correctly installed, run:
If installed correctly, it should show the version of Dagster.
Initialize a Dagster Project
Now that you have the necessary packages installed, the next step is to create a new Dagster project.
Scaffold a New Dagster Project
Run the following command:
This command will create a new project named dagster-quickstart. It will automatically generate the following directory structure:
Refer here to know more above this directory structure
Create Sample Data
To simulate an ETL pipeline, create a CSV file with sample data that your pipeline will process.
Create the CSV File: Inside the dagster_quickstart/data/ directory, create a file named sample_data.csv with the following content:
This file represents sample data that will be used as input for your ETL pipeline.
Define Assets for the ETL Pipeline
Now, we’ll define a series of assets for the ETL pipeline inside the assets.py file.
Edit the assets.py File: Open the dagster_quickstart/assets.py file and add the following code to define the pipeline:
This Dagster pipeline defines a series of assets that interact with Teradata. It starts by reading data from a CSV file, then drops and recreates a table in Teradata. After that, it inserts rows from the CSV into the table and finally retrieves the data from the table.
Define the Pipeline Definitions
The next step is to configure the pipeline by defining the necessary resources and jobs.
Edit the definitions.py File: Open dagster_quickstart/definitions.py and define your Dagster pipeline as follows:
This code sets up a Dagster project that interacts with Teradata by defining assets and resources
- It imports necessary modules, including pandas, Dagster, and dagster-teradata.
- It imports asset functions (read_csv_file, read_table, create_table, drop_table, insert_rows) from the assets.py module.
- It registers these assets with Dagster using Definitions, allowing Dagster to track and execute them.
- It defines a Teradata resource (TeradataResource) that reads database connection details from environment variables (TERADATA_HOST, TERADATA_USER, TERADATA_PASSWORD, TERADATA_DATABASE).
Running the Pipeline
After setting up the project, you can now run your Dagster pipeline:
- Start the Dagster Dev Server: In your terminal, navigate to the root directory of your project and run: dagster dev After executing the command dagster dev, the Dagster logs will be displayed directly in the terminal. Any errors encountered during startup will also be logged here. Once you see a message similar to: It indicates that the Dagster web server is running successfully. At this point, you can proceed to the next step.
- Access the Dagster UI: Open a web browser and navigate to http://127.0.0.1:3000. This will open the Dagster UI where you can manage and monitor your pipelines.
- Run the Pipeline:
- In the top navigation of the Dagster UI, click on Assets > View global asset lineage.
- Click Materialize to execute the pipeline.
- In the popup window, click View to see the details of the pipeline run.
- Monitor the Run: The Dagster UI allows you to visualize the pipeline's progress, view logs, and inspect the status of each step. You can switch between different views to see the execution logs and metadata for each asset.
Below are some of the operations provided by the TeradataResource:
1. Execute a Query (execute_query
)
This operation executes a SQL query within Teradata Vantage.
Args:
sql
(str) – The query to be executed.fetch_results
(bool, optional) – If True, fetch the query results. Defaults to False.single_result_row
(bool, optional) – If True, return only the first row of the result set. Effective only iffetch_results
is True. Defaults to False.
2. Execute Multiple Queries (execute_queries
)
This operation executes a series of SQL queries within Teradata Vantage.
Args:
sql_queries
(Sequence[str]) – List of queries to be executed in series.fetch_results
(bool, optional) – If True, fetch the query results. Defaults to False.single_result_row
(bool, optional) – If True, return only the first row of the result set. Effective only iffetch_results
is True. Defaults to False.
3. Drop a Database (drop_database
)
This operation drops one or more databases from Teradata Vantage.
Args:
databases
(Union[str, Sequence[str]]) – Database name or list of database names to drop.
4. Drop a Table (drop_table
)
This operation drops one or more tables from Teradata Vantage.
Args:
tables
(Union[str, Sequence[str]]) – Table name or list of table names to drop.
Summary
This guide provides a step-by-step approach to integrating Dagster with Teradata Vantage for building ETL pipelines