Skip to content

AzurePipelinesRunner

kedro_azureml_pipeline.runner.AzurePipelinesRunner

Bases: SequentialRunner

Sequential runner that rewires dataset paths for Azure ML.

Parameters

Name Type Description Default
is_async bool

Whether to run nodes asynchronously.

False
data_paths dict of str to str or None

Mapping of dataset name to Azure ML mount/download path.

None

See Also

AzureMLPipelineDataset : Dataset whose paths are rewired. AzureMLAssetDataset : Versioned asset dataset. AzureMLLocalRunHook : Hook that detects this runner.

Source Code

Show/Hide source
class AzurePipelinesRunner(SequentialRunner):
    """Sequential runner that rewires dataset paths for Azure ML.

    Before delegating to ``SequentialRunner.run``, the runner builds a fresh
    ``DataCatalog`` from the one it is given and points every dataset named in
    ``data_paths`` at its Azure ML location.

    Parameters
    ----------
    is_async : bool
        Whether to run nodes asynchronously.
    data_paths : dict of str to str or None
        Mapping of dataset name to Azure ML mount/download path.
        ``None`` is stored as an empty mapping.

    See Also
    --------
    [AzureMLPipelineDataset][kedro_azureml_pipeline.datasets.AzureMLPipelineDataset] : Dataset whose paths are rewired.
    [AzureMLAssetDataset][kedro_azureml_pipeline.datasets.AzureMLAssetDataset] : Versioned asset dataset.
    [AzureMLLocalRunHook][kedro_azureml_pipeline.hooks.AzureMLLocalRunHook] : Hook that detects this runner.
    """

    def __init__(
        self,
        is_async: bool = False,
        data_paths: dict[str, str] | None = None,
    ):
        super().__init__(is_async)
        self.data_paths = data_paths if data_paths is not None else {}

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False,
    ) -> dict[str, Any]:
        """Execute the pipeline with Azure ML dataset path rewiring.

        Parameters
        ----------
        pipeline : Pipeline
            Kedro pipeline to run.
        catalog : DataCatalog
            Data catalog.
        hook_manager : PluginManager or None
            Pluggy hook manager.
        run_id : str or None
            Unique run identifier.
        only_missing_outputs : bool
            If ``True``, only run nodes whose outputs are missing.

        Returns
        -------
        dict of str to Any
            Mapping of output dataset names to their values.

        Raises
        ------
        TypeError
            If ``catalog`` is not a ``DataCatalog``.
        KeyError
            If a pipeline input is neither resolvable from ``catalog`` nor
            listed in ``data_paths``, so no default dataset can be created.
        """
        # Explicit raise instead of ``assert``: asserts are stripped under
        # ``python -O``, which would let an unsupported catalog slip through.
        if not isinstance(catalog, DataCatalog):
            raise TypeError(f"AzurePipelinesRunner requires a DataCatalog, got {type(catalog)}")

        # Preserve Azure configs from existing datasets before copying
        azure_configs = {}
        for ds_name in catalog.filter():
            ds = catalog[ds_name]
            if isinstance(ds, AzureMLAssetDataset) and hasattr(ds, "azure_config"):
                azure_configs[ds_name] = ds.azure_config

        # Use Kedro 1.0 copy mechanism instead of shallow_copy
        # For now, create a new catalog with the same datasets
        # NOTE(review): the dataset objects themselves are reused, not cloned,
        # so the rewiring below presumably also shows through the original
        # catalog -- confirm this is intended.
        updated_catalog = DataCatalog()

        # Copy all existing datasets to the new catalog
        for ds_name in catalog.filter():
            updated_catalog[ds_name] = catalog[ds_name]

        # Collect the copied names once; both the restore loop and the
        # rewiring loop only need membership tests against this snapshot.
        catalog_set = set(updated_catalog.filter())

        # Restore Azure configs after copying
        for ds_name, azure_config in azure_configs.items():
            if ds_name in catalog_set:  # pragma: no branch
                ds = updated_catalog[ds_name]
                if isinstance(ds, AzureMLAssetDataset):  # pragma: no branch
                    ds.azure_config = azure_config

        # Loop over datasets in arguments to set their paths
        for ds_name, azure_dataset_path in self.data_paths.items():
            if ds_name in catalog_set:
                ds = updated_catalog[ds_name]
                if isinstance(ds, AzureMLPipelineDataset):
                    if isinstance(ds, AzureMLAssetDataset) and ds._azureml_type == "uri_file":
                        # For a ``uri_file`` asset the root is the directory
                        # containing the given path, not the path itself.
                        ds.root_dir = Path(azure_dataset_path).parent.as_posix()
                    else:
                        ds.root_dir = azure_dataset_path
                    logger.debug("Rewired dataset '%s' root_dir to '%s'", ds_name, ds.root_dir)
                    updated_catalog[ds_name] = ds
            elif ds_name in catalog:
                # Resolve factory patterns (e.g. "product.{group}.{variant}.output")
                ds = catalog[ds_name]
                updated_catalog[ds_name] = ds
                logger.debug("Resolved factory dataset '%s'", ds_name)
            else:
                updated_catalog[ds_name] = self.create_default_data_set(ds_name)
                logger.debug("Created default pickle dataset for '%s'", ds_name)

        # Loop over remaining input datasets to add them to the catalog
        unsatisfied = pipeline.inputs() - set(updated_catalog.filter())
        for ds_name in unsatisfied:
            if ds_name in catalog:
                # Dataset is resolvable including as a factory dataset
                updated_catalog[ds_name] = catalog[ds_name]
            else:
                # Raises KeyError when ``ds_name`` has no entry in ``data_paths``.
                updated_catalog[ds_name] = self.create_default_data_set(ds_name)  # pragma: no cover

        return super().run(
            pipeline=pipeline,
            catalog=updated_catalog,
            hook_manager=hook_manager,
            only_missing_outputs=only_missing_outputs,
            run_id=run_id,
        )

    def create_default_data_set(self, ds_name: str) -> AbstractDataset:
        """Create a default pickle dataset for an intermediate output.

        Parameters
        ----------
        ds_name : str
            Dataset name to create. It must be a key of ``data_paths``; the
            mapped path becomes the dataset's ``root_dir``.

        Returns
        -------
        AbstractDataset
            An ``AzureMLPipelineDataset`` wrapping a pickle backend.

        Raises
        ------
        KeyError
            If ``ds_name`` has no entry in ``data_paths``.
        """
        # Look the path up first so a missing entry fails with a message that
        # names the dataset, instead of a bare ``KeyError('<name>')``.
        try:
            root_dir = self.data_paths[ds_name]
        except KeyError:
            raise KeyError(
                f"No Azure ML path for dataset '{ds_name}' in data_paths; cannot create a default pickle dataset"
            ) from None

        return AzureMLPipelineDataset(
            {
                "type": PickleDataset,
                "backend": "cloudpickle",
                "filepath": f"{ds_name}.pickle",
            },
            root_dir=root_dir,
        )

Methods

run(pipeline, catalog, hook_manager=None, run_id=None, only_missing_outputs=False)

Execute the pipeline with Azure ML dataset path rewiring.

Parameters
Name Type Description Default
pipeline Pipeline

Kedro pipeline to run.

required
catalog DataCatalog

Data catalog.

required
hook_manager PluginManager or None

Pluggy hook manager.

None
run_id str or None

Unique run identifier.

None
only_missing_outputs bool

If True, only run nodes whose outputs are missing.

False
Returns
Type Description
dict of str to Any

Mapping of output dataset names to their values.

Source Code
Show/Hide source
def run(
    self,
    pipeline: Pipeline,
    catalog: CatalogProtocol,
    hook_manager: PluginManager | None = None,
    run_id: str | None = None,
    only_missing_outputs: bool = False,
) -> dict[str, Any]:
    """Execute the pipeline with Azure ML dataset path rewiring.

    Parameters
    ----------
    pipeline : Pipeline
        Kedro pipeline to run.
    catalog : DataCatalog
        Data catalog.
    hook_manager : PluginManager or None
        Pluggy hook manager.
    run_id : str or None
        Unique run identifier.
    only_missing_outputs : bool
        If ``True``, only run nodes whose outputs are missing.

    Returns
    -------
    dict of str to Any
        Mapping of output dataset names to their values.

    Raises
    ------
    TypeError
        If ``catalog`` is not a ``DataCatalog``.
    KeyError
        If a pipeline input is neither resolvable from ``catalog`` nor
        listed in ``data_paths``, so no default dataset can be created.
    """
    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would let an unsupported catalog slip through.
    if not isinstance(catalog, DataCatalog):
        raise TypeError(f"AzurePipelinesRunner requires a DataCatalog, got {type(catalog)}")

    # Preserve Azure configs from existing datasets before copying
    azure_configs = {}
    for ds_name in catalog.filter():
        ds = catalog[ds_name]
        if isinstance(ds, AzureMLAssetDataset) and hasattr(ds, "azure_config"):
            azure_configs[ds_name] = ds.azure_config

    # Use Kedro 1.0 copy mechanism instead of shallow_copy
    # For now, create a new catalog with the same datasets
    # NOTE(review): the dataset objects themselves are reused, not cloned,
    # so the rewiring below presumably also shows through the original
    # catalog -- confirm this is intended.
    updated_catalog = DataCatalog()

    # Copy all existing datasets to the new catalog
    for ds_name in catalog.filter():
        updated_catalog[ds_name] = catalog[ds_name]

    # Collect the copied names once; both the restore loop and the
    # rewiring loop only need membership tests against this snapshot.
    catalog_set = set(updated_catalog.filter())

    # Restore Azure configs after copying
    for ds_name, azure_config in azure_configs.items():
        if ds_name in catalog_set:  # pragma: no branch
            ds = updated_catalog[ds_name]
            if isinstance(ds, AzureMLAssetDataset):  # pragma: no branch
                ds.azure_config = azure_config

    # Loop over datasets in arguments to set their paths
    for ds_name, azure_dataset_path in self.data_paths.items():
        if ds_name in catalog_set:
            ds = updated_catalog[ds_name]
            if isinstance(ds, AzureMLPipelineDataset):
                if isinstance(ds, AzureMLAssetDataset) and ds._azureml_type == "uri_file":
                    # For a ``uri_file`` asset the root is the directory
                    # containing the given path, not the path itself.
                    ds.root_dir = Path(azure_dataset_path).parent.as_posix()
                else:
                    ds.root_dir = azure_dataset_path
                logger.debug("Rewired dataset '%s' root_dir to '%s'", ds_name, ds.root_dir)
                updated_catalog[ds_name] = ds
        elif ds_name in catalog:
            # Resolve factory patterns (e.g. "product.{group}.{variant}.output")
            ds = catalog[ds_name]
            updated_catalog[ds_name] = ds
            logger.debug("Resolved factory dataset '%s'", ds_name)
        else:
            updated_catalog[ds_name] = self.create_default_data_set(ds_name)
            logger.debug("Created default pickle dataset for '%s'", ds_name)

    # Loop over remaining input datasets to add them to the catalog
    unsatisfied = pipeline.inputs() - set(updated_catalog.filter())
    for ds_name in unsatisfied:
        if ds_name in catalog:
            # Dataset is resolvable including as a factory dataset
            updated_catalog[ds_name] = catalog[ds_name]
        else:
            # Raises KeyError when ``ds_name`` has no entry in ``data_paths``.
            updated_catalog[ds_name] = self.create_default_data_set(ds_name)  # pragma: no cover

    return super().run(
        pipeline=pipeline,
        catalog=updated_catalog,
        hook_manager=hook_manager,
        only_missing_outputs=only_missing_outputs,
        run_id=run_id,
    )

create_default_data_set(ds_name)

Create a default pickle dataset for an intermediate output.

Parameters
Name Type Description Default
ds_name str

Dataset name to create.

required
Returns
Type Description
AbstractDataset

An AzureMLPipelineDataset wrapping a pickle backend.

Source Code
Show/Hide source
def create_default_data_set(self, ds_name: str) -> AbstractDataset:
    """Create a default pickle dataset for an intermediate output.

    Parameters
    ----------
    ds_name : str
        Dataset name to create. It must be a key of ``data_paths``; the
        mapped path becomes the dataset's ``root_dir``.

    Returns
    -------
    AbstractDataset
        An ``AzureMLPipelineDataset`` wrapping a pickle backend.

    Raises
    ------
    KeyError
        If ``ds_name`` has no entry in ``data_paths``.
    """
    # Look the path up first so a missing entry fails with a message that
    # names the dataset, instead of a bare ``KeyError('<name>')``.
    try:
        root_dir = self.data_paths[ds_name]
    except KeyError:
        raise KeyError(
            f"No Azure ML path for dataset '{ds_name}' in data_paths; cannot create a default pickle dataset"
        ) from None

    return AzureMLPipelineDataset(
        {
            "type": PickleDataset,
            "backend": "cloudpickle",
            "filepath": f"{ds_name}.pickle",
        },
        root_dir=root_dir,
    )