Dagster Integration
Dagster Integration
DataHub supports the integration of
- Dagster Pipeline metadata
- Job and Op run information as well as
- Lineage information when present
Using Datahub's Dagster Sensor
Dagster sensors allow us to perform some actions based on some state change. Datahub's defined dagster sensor will emit metadata after every dagster pipeline run execution. This sensor is able to emit both pipeline success as well as failures. For more details about Dagster sensors please refer Sensors.
Prerequisites
- You need to create a new dagster project. See https://docs.dagster.io/getting-started/create-new-project.
- There are two ways to define Dagster definition before starting dagster UI. One using Definitions class (recommended) and second using Repositories.
- Creation of new dagster project by default uses Definition class to define Dagster definition.
Setup
- You need to install the required dependency.
pip install acryl_datahub_dagster_plugin
- You need to import DataHub dagster plugin provided sensor definition and add it in Dagster definition or dagster repository before starting dagster UI as show below: Using Definitions class:
from dagster import Definitions
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub_dagster_plugin.sensors.datahub_sensors import (
DatahubDagsterSourceConfig,
make_datahub_sensor,
)
config = DatahubDagsterSourceConfig(
datahub_client_config=DatahubClientConfig(
server="https://your_datahub_url/gms", token="your_datahub_token"
),
dagster_url="https://my-dagster-cloud.dagster.cloud",
)
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(
sensors=[datahub_sensor],
)
The DataHub dagster plugin provided sensor internally uses below configs. You can set these configs using environment variables. If not set, the sensor will take the default value.
Configuration options:
Configuration Option Default value Description datahub_client_config The DataHub client config dagster_url The url to your Dagster Webserver. capture_asset_materialization True Whether to capture asset keys as Dataset on AssetMaterialization event capture_input_output True Whether to capture and try to parse input and output from HANDLED_OUTPUT,.LOADED_INPUT events. (currently only PathMetadataValue metadata supported (EXPERIMENTAL) platform_instance The instance of the platform that all assets produced by this recipe belong to. It is optional asset_lineage_extractor You can implement your own logic to capture asset lineage information. See example for details[] Once Dagster UI is up, you need to turn on the provided sensor execution. To turn on the sensor, click on Overview tab and then on Sensors tab. You will see a toggle button in front of all defined sensors to turn it on/off.
DataHub dagster plugin provided sensor is ready to emit metadata after every dagster pipeline run execution.
How to validate installation
- Go and check in Dagster UI at Overview -> Sensors menu if you can see the 'datahub_sensor'.
- Run a Dagster Job. In the dagster daemon logs, you should see DataHub related log messages like:
datahub_sensor - Emitting metadata...
Dagster Ins and Out
We can provide inputs and outputs to both assets and ops explicitly using a dictionary of Ins
and Out
corresponding to the decorated function arguments. While providing inputs and outputs explicitly we can provide metadata as well.
To create dataset upstream and downstream dependency for the assets and ops you can use an ins and out dictionary with metadata provided. For reference, look at the sample jobs created using assets assets_job.py
, or ops ops_job.py
.
Define your custom logic to capture asset lineage information
You can define your own logic to capture asset lineage information.
The output Tuple contains two dictionaries, one for input assets and the other for output assets. The key of the dictionary is the op key and the value is the set of asset urns that are upstream or downstream of the op.
from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator, DatasetLineage
def asset_lineage_extractor(
context: RunStatusSensorContext,
dagster_generator: DagsterGenerator,
graph: DataHubGraph,
) -> Dict[str, DatasetLineage]:
dataset_lineage: Dict[str, DatasetLineage] = {}
# Extracting input and output assets from the context
return dataset_lineage
Debugging
Connection error for Datahub Rest URL
If you get ConnectionError: HTTPConnectionPool(host='localhost', port=8080), then in that case your DataHub GMS service is not up.