diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py
index 3c0efbd7af..4359344313 100644
--- a/src/databricks/labs/ucx/assessment/workflows.py
+++ b/src/databricks/labs/ucx/assessment/workflows.py
@@ -213,7 +213,7 @@ def assess_workflows(self, ctx: RuntimeContext):
 
         Also, stores direct filesystem accesses for display in the migration dashboard.
         """
-        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+        ctx.workflow_linter.snapshot()
 
 
 class Failing(Workflow):
diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py
index 0965ffb583..5406cd52d6 100644
--- a/src/databricks/labs/ucx/contexts/application.py
+++ b/src/databricks/labs/ucx/contexts/application.py
@@ -589,6 +589,8 @@ def dependency_resolver(self) -> DependencyResolver:
     def workflow_linter(self) -> WorkflowLinter:
         return WorkflowLinter(
             self.workspace_client,
+            self.sql_backend,
+            self.inventory_database,
             self.jobs_crawler,
             self.dependency_resolver,
             self.path_lookup,
diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py
index bcd0555b95..86f9e01544 100644
--- a/src/databricks/labs/ucx/contexts/workflow_task.py
+++ b/src/databricks/labs/ucx/contexts/workflow_task.py
@@ -32,6 +32,7 @@
 from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
 from databricks.labs.ucx.progress.tables import TableProgressEncoder
 from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
+from databricks.labs.ucx.source_code.jobs import JobProblem, JobProblemOwnership
 
 # As with GlobalContext, service factories unavoidably have a lot of public methods.
 # pylint: disable=too-many-public-methods
@@ -81,6 +82,15 @@ def jobs_crawler(self) -> JobsCrawler:
     def job_ownership(self) -> JobOwnership:
         return JobOwnership(self.administrator_locator)
 
+    @cached_property
+    def workflow_problem_ownership(self) -> JobProblemOwnership:
+        return JobProblemOwnership(
+            self.administrator_locator,
+            self.workspace_client,
+            self.workspace_path_ownership,
+            self.job_ownership,
+        )
+
     @cached_property
     def submit_runs_crawler(self) -> SubmitRunsCrawler:
         return SubmitRunsCrawler(
@@ -217,6 +227,17 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]:
             self.config.ucx_catalog,
         )
 
+    @cached_property
+    def workflow_problem_progress(self) -> ProgressEncoder[JobProblem]:
+        return ProgressEncoder(
+            self.sql_backend,
+            self.workflow_problem_ownership,
+            JobProblem,
+            self.parent_run_id,
+            self.workspace_id,
+            self.config.ucx_catalog,
+        )
+
     @cached_property
     def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
         return ProgressEncoder(
diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py
index b0ffd45a86..3784293049 100644
--- a/src/databricks/labs/ucx/progress/workflows.py
+++ b/src/databricks/labs/ucx/progress/workflows.py
@@ -180,8 +180,13 @@ def update_lakeview_dashboards_history_log(self, ctx: RuntimeContext) -> None:
     def assess_workflows(self, ctx: RuntimeContext):
         """Scans all jobs for migration issues in notebooks.
         Also stores direct filesystem accesses for display in the migration dashboard."""
-        # TODO: Ensure these are captured in the history log.
-        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+        ctx.workflow_linter.snapshot(force_refresh=True)
+
+    @job_task(depends_on=[assess_workflows], job_cluster="user_isolation")
+    def update_workflow_problems_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest workflow problems snapshot."""
+        workflow_problems_snapshot = ctx.workflow_linter.snapshot(force_refresh=False)
+        ctx.workflow_problem_progress.append_inventory_snapshot(workflow_problems_snapshot)
 
     @job_task(
         depends_on=[
diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py
index bacc4554d3..e5b2c5ee2e 100644
--- a/src/databricks/labs/ucx/source_code/jobs.py
+++ b/src/databricks/labs/ucx/source_code/jobs.py
@@ -7,6 +7,7 @@
 from importlib import metadata
 from pathlib import Path
 from urllib import parse
+from typing import ClassVar
 
 from databricks.labs.blueprint.paths import DBFSPath
 from databricks.sdk import WorkspaceClient
@@ -16,10 +17,10 @@
 from databricks.sdk.service.jobs import Source
 
 from databricks.labs.ucx.assessment.crawlers import runtime_version_tuple
+from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
+from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership
 from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache, InvalidPath
-from databricks.labs.ucx.source_code.base import (
-    LineageAtom,
-)
+from databricks.labs.ucx.source_code.base import LineageAtom
 from databricks.labs.ucx.source_code.graph import (
     Dependency,
     DependencyGraph,
@@ -37,7 +38,7 @@ class JobProblem:
     job_id: int
     job_name: str
     task_key: str
-    path: str
+    path: str  # str for legacy support
     code: str
     message: str
     start_line: int
@@ -45,10 +46,25 @@ class JobProblem:
     end_line: int
     end_col: int
 
+    __id_attributes__: ClassVar[tuple[str, ...]] = (
+        "job_id",
+        "task_key",
+        "path",
+        "code",
+        "start_line",
+        "start_col",
+        "end_line",
+        "end_col",
+    )
+
     def as_message(self) -> str:
         message = f"{self.path}:{self.start_line} [{self.code}] {self.message}"
         return message
 
+    def has_missing_path(self) -> bool:
+        """Flag whether the path is missing."""
+        return self.path == Path("").as_posix()  # Reusing flag from DependencyProblem
+
 
 class WorkflowTask(Dependency):
     def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
@@ -358,3 +374,31 @@ def _new_job_cluster_metadata(self, new_cluster) -> Iterable[DependencyProblem]:
         self._spark_version = new_cluster.spark_version
         self._data_security_mode = new_cluster.data_security_mode
         return []
+
+
+class JobProblemOwnership(Ownership[JobProblem]):
+    """Determine ownership of job (workflow) problems.
+
+    This is the owner of the problematic source path, or the job creator (if known) when the path is missing.
+ """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + ws: WorkspaceClient, + workspace_path_ownership: WorkspacePathOwnership, + job_ownership: JobOwnership, + ) -> None: + super().__init__(administrator_locator) + self._ws = ws + self._workspace_path_ownership = workspace_path_ownership + self._job_ownership = job_ownership + + def _maybe_direct_owner(self, record: JobProblem) -> str | None: + if not record.has_missing_path(): + return self._workspace_path_ownership.owner_of_path(record.path) + try: + job = self._ws.jobs.get(record.job_id) + return self._job_ownership.owner_of(JobInfo.from_job(job)) + except DatabricksError: + return None diff --git a/src/databricks/labs/ucx/source_code/linters/jobs.py b/src/databricks/labs/ucx/source_code/linters/jobs.py index 721fdc8a89..4a09a7eb17 100644 --- a/src/databricks/labs/ucx/source_code/linters/jobs.py +++ b/src/databricks/labs/ucx/source_code/linters/jobs.py @@ -4,7 +4,6 @@ from collections.abc import Iterable from datetime import datetime, timezone -from pathlib import Path from databricks.labs.blueprint.parallel import Threads from databricks.labs.lsql.backends import SqlBackend @@ -12,6 +11,8 @@ from databricks.sdk.errors import NotFound from databricks.sdk.service import jobs +from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.assessment.jobs import JobsCrawler from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.source_code.base import ( @@ -37,10 +38,19 @@ logger = logging.getLogger(__name__) -class WorkflowLinter: - def __init__( +class WorkflowLinter(CrawlerBase): + """Lint workflows for UC compatibility and references to data assets. + + Data assets linted: + - Table and view references (UsedTables) + - Direct file system access (DirectFsAccess) + """ + + def __init__( # pylint: disable=too-many-arguments self, ws: WorkspaceClient, + sql_backend: SqlBackend, + schema: str, jobs_crawler: JobsCrawler, resolver: DependencyResolver, path_lookup: PathLookup, @@ -48,6 +58,8 @@ def __init__( directfs_crawler: DirectFsAccessCrawler, used_tables_crawler: UsedTablesCrawler, ): + super().__init__(sql_backend, "hive_metastore", schema, "workflow_problems", JobProblem) + self._ws = ws self._jobs_crawler = jobs_crawler self._resolver = resolver @@ -56,31 +68,34 @@ def __init__( self._directfs_crawler = directfs_crawler self._used_tables_crawler = used_tables_crawler - def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None: - tasks = [] - for job in self._jobs_crawler.snapshot(): - tasks.append(functools.partial(self.lint_job, job.job_id)) + def _try_fetch(self) -> Iterable[JobProblem]: + """Fetch all linting problems from the inventory table. + + If trying to fetch the linted data assets, use their respective crawlers. + """ + for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): + yield JobProblem(*row) + + def _crawl(self) -> Iterable[JobProblem]: + """Crawl the workflow jobs and lint them. 
+
+        Besides the workflow problems, the crawler also collects:
+        - Table and view references (UsedTables)
+        - Direct file system access (DirectFsAccess)
+        """
+        tasks = [functools.partial(self.lint_job, job.job_id) for job in self._jobs_crawler.snapshot()]
+        if not tasks:
+            return
         logger.info(f"Running {len(tasks)} linting tasks in parallel...")
-        job_results, errors = Threads.gather('linting workflows', tasks)
-        job_problems: list[JobProblem] = []
-        job_dfsas: list[DirectFsAccess] = []
-        job_tables: list[UsedTable] = []
-        for problems, dfsas, tables in job_results:
-            job_problems.extend(problems)
-            job_dfsas.extend(dfsas)
-            job_tables.extend(tables)
-        logger.info(f"Saving {len(job_problems)} linting problems...")
-        sql_backend.save_table(
-            f'{inventory_database}.workflow_problems',
-            job_problems,
-            JobProblem,
-            mode='overwrite',
-        )
-        self._directfs_crawler.dump_all(job_dfsas)
-        self._used_tables_crawler.dump_all(job_tables)
-        if len(errors) > 0:
+        results, errors = Threads.gather("linting workflows", tasks)
+        if errors:
             error_messages = "\n".join([str(error) for error in errors])
             logger.warning(f"Errors occurred during linting:\n{error_messages}")
+        if results:
+            problem_lists, dfsa_lists, table_lists = zip(*results)
+            self._directfs_crawler.dump_all([dfsa for dfsas in dfsa_lists for dfsa in dfsas])
+            self._used_tables_crawler.dump_all([table for tables in table_lists for table in tables])
+            yield from (problem for problems in problem_lists for problem in problems)
 
     def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
         try:
@@ -95,8 +110,6 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess],
             logger.warning(f"Found job problems:\n{problem_messages}")
         return problems, dfsas, tables
 
-    _UNKNOWN = Path('')
-
     def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
         problems: list[JobProblem] = []
         dfsas: list[DirectFsAccess] = []
@@ -111,12 +124,11 @@ def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAcces
             if not advices:
                 advices = self._lint_task(graph, session_state)
             for advice in advices:
-                absolute_path = "UNKNOWN" if advice.has_missing_path() else advice.path.absolute().as_posix()
                 job_problem = JobProblem(
                     job_id=job.job_id,
                     job_name=job.settings.name,
                     task_key=task.task_key,
-                    path=absolute_path,
+                    path=advice.path.as_posix(),
                     code=advice.advice.code,
                     message=advice.advice.message,
                     start_line=advice.advice.start_line,
diff --git a/src/databricks/labs/ucx/source_code/linters/queries.py b/src/databricks/labs/ucx/source_code/linters/queries.py
index aca7e6e0a2..7ca50070c2 100644
--- a/src/databricks/labs/ucx/source_code/linters/queries.py
+++ b/src/databricks/labs/ucx/source_code/linters/queries.py
@@ -40,6 +40,10 @@ class _ReportingContext:
 
 
 class QueryLinter:
+    """Lint queries.
+
+    TODO: Let QueryLinter inherit from `CrawlerBase`
+    """
 
     def __init__(
         self,
diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py
index d9d0e53e91..4239be3a4e 100644
--- a/tests/integration/assessment/test_workflows.py
+++ b/tests/integration/assessment/test_workflows.py
@@ -4,6 +4,8 @@
 from databricks.sdk.retries import retried
 from databricks.sdk.service.iam import PermissionLevel
 
+from databricks.labs.ucx.source_code.jobs import JobProblem
+
 
 @retried(on=[NotFound, InvalidParameterValue])
 def test_running_real_assessment_job(
@@ -47,5 +49,9 @@ def test_running_real_assessment_job(
     assert actual_tables == expected_tables
 
     query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
-    workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
+    workflow_problems_without_path = []
+    for record in sql_backend.fetch(query):
+        job_problem = JobProblem(**record.asDict())
+        if job_problem.has_missing_path():
+            workflow_problems_without_path.append(job_problem)
     assert not workflow_problems_without_path
diff --git a/tests/integration/source_code/test_directfs_access.py b/tests/integration/source_code/test_directfs_access.py
index fc4b966b9d..0edee2c30c 100644
--- a/tests/integration/source_code/test_directfs_access.py
+++ b/tests/integration/source_code/test_directfs_access.py
@@ -79,8 +79,6 @@ def test_lakeview_query_dfsa_ownership(runtime_ctx) -> None:
 def test_path_dfsa_ownership(
     runtime_ctx,
     make_directory,
-    inventory_schema,
-    sql_backend,
 ) -> None:
     """Verify the ownership of a direct-fs record for a notebook/source path associated with a job."""
 
@@ -92,6 +90,8 @@
     # Produce a DFSA record for the job.
     linter = WorkflowLinter(
         runtime_ctx.workspace_client,
+        runtime_ctx.sql_backend,
+        runtime_ctx.inventory_database,
         runtime_ctx.jobs_crawler,
         runtime_ctx.dependency_resolver,
         runtime_ctx.path_lookup,
@@ -99,7 +99,7 @@
         runtime_ctx.directfs_access_crawler_for_paths,
         runtime_ctx.used_tables_crawler_for_paths,
     )
-    linter.refresh_report(sql_backend, inventory_schema)
+    linter.snapshot()
 
     # Find a record for our job.
     records = runtime_ctx.directfs_access_crawler_for_paths.snapshot()
diff --git a/tests/integration/source_code/test_jobs.py b/tests/integration/source_code/test_jobs.py
index 80522e7f83..086b72f173 100644
--- a/tests/integration/source_code/test_jobs.py
+++ b/tests/integration/source_code/test_jobs.py
@@ -36,7 +36,7 @@ def test_linter_from_context(simple_ctx, make_job) -> None:
     # Ensure we have at least 1 job that fails: "Deprecated file system path in call to: /mnt/things/e/f/g"
     job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n")
     simple_ctx.config.include_job_ids = [job.job_id]
-    simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database)
+    simple_ctx.workflow_linter.snapshot()
 
     # Verify that the 'problems' table has content.
     cursor = simple_ctx.sql_backend.fetch(
diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py
index 3557257644..e0bafd875b 100644
--- a/tests/unit/progress/test_workflows.py
+++ b/tests/unit/progress/test_workflows.py
@@ -3,6 +3,7 @@
 from unittest.mock import create_autospec
 
 import pytest
+
 from databricks.labs.ucx.hive_metastore import TablesCrawler
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
 from databricks.labs.ucx.progress.history import ProgressEncoder
@@ -12,6 +13,8 @@
 
 from databricks.labs.ucx.progress.workflows import MigrationProgress
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
+from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter
+from databricks.labs.ucx.source_code.linters.queries import QueryLinter
 
 
 @pytest.mark.parametrize(
@@ -98,21 +101,24 @@ def test_migration_progress_runtime_tables_refresh_update_history_log(run_workfl
     mock_history_log.append_inventory_snapshot.assert_called_once()
 
 
-@pytest.mark.parametrize(
-    "task, linter",
-    (
-        (MigrationProgress.assess_dashboards, RuntimeContext.query_linter),
-        (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter),
-    ),
-)
-def test_linter_runtime_refresh(run_workflow, task, linter) -> None:
-    linter_class = get_type_hints(linter.func)["return"]
-    mock_linter = create_autospec(linter_class)
-    linter_name = linter.attrname
-    run_workflow(task, **{linter_name: mock_linter})
+def test_migration_progress_assess_dashboards_calls_query_linter_refresh_report(run_workflow) -> None:
+    mock_linter = create_autospec(QueryLinter)
+    run_workflow(MigrationProgress.assess_dashboards, query_linter=mock_linter)
     mock_linter.refresh_report.assert_called_once()
 
 
+def test_migration_progress_assess_workflows_calls_workflow_linter_snapshot(run_workflow) -> None:
+    mock_linter = create_autospec(WorkflowLinter)
+    mock_history_log = create_autospec(ProgressEncoder)
+    run_workflow(MigrationProgress.assess_workflows, workflow_linter=mock_linter)
+    run_workflow(
+        MigrationProgress.update_workflow_problems_history_log,
+        workflow_problem_progress=mock_history_log,
+        parent_run_id=1234,
+    )
+    mock_linter.snapshot.assert_called_once()
+
+
 def test_migration_progress_with_valid_prerequisites(run_workflow) -> None:
     ws = create_autospec(WorkspaceClient)
     ws.metastores.current.return_value = MetastoreAssignment(metastore_id="test", workspace_id=123456789)
diff --git a/tests/unit/source_code/test_jobs.py b/tests/unit/source_code/test_jobs.py
index 5024834eb0..96e3078e4f 100644
--- a/tests/unit/source_code/test_jobs.py
+++ b/tests/unit/source_code/test_jobs.py
@@ -230,14 +230,17 @@ def test_workflow_task_container_builds_dependency_graph_spark_python_task(
 
 
 def test_workflow_linter_lint_job_logs_problems(dependency_resolver, mock_path_lookup, empty_index, caplog) -> None:
-    expected_message = "Found job problems:\nUNKNOWN:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library"
+    expected_message = "Found job problems:\n:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library"
     ws = create_autospec(WorkspaceClient)
+    sql_backend = MockBackend()
     jobs_crawler = create_autospec(JobsCrawler)
     directfs_crawler = create_autospec(DirectFsAccessCrawler)
     used_tables_crawler = create_autospec(UsedTablesCrawler)
 
     linter = WorkflowLinter(
         ws,
+        sql_backend,
+        "test",
         jobs_crawler,
         dependency_resolver,
         mock_path_lookup,
@@ -567,6 +570,8 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m
     used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test")
     linter = WorkflowLinter(
         ws,
+        sql_backend,
+        "test",
         jobs_crawler,
         dependency_resolver,
         mock_path_lookup,
@@ -574,7 +579,7 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m
         directfs_crawler,
         used_tables_crawler,
     )
-    linter.refresh_report(sql_backend, 'test')
+    linter.snapshot()
 
     jobs_crawler.snapshot.assert_called_once()
     sql_backend.has_rows_written_for('test.workflow_problems')