Skip to content

Persist workflow problems in UCX multiworkspace catalog #3756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def assess_workflows(self, ctx: RuntimeContext):

Also, stores direct filesystem accesses for display in the migration dashboard.
"""
ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
ctx.workflow_linter.snapshot()


class Failing(Workflow):
Expand Down
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ def dependency_resolver(self) -> DependencyResolver:
def workflow_linter(self) -> WorkflowLinter:
return WorkflowLinter(
self.workspace_client,
self.sql_backend,
self.inventory_database,
self.jobs_crawler,
self.dependency_resolver,
self.path_lookup,
Expand Down
21 changes: 21 additions & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
from databricks.labs.ucx.source_code.jobs import JobProblem, JobProblemOwnership

# As with GlobalContext, service factories unavoidably have a lot of public methods.
# pylint: disable=too-many-public-methods
Expand Down Expand Up @@ -81,6 +82,15 @@ def jobs_crawler(self) -> JobsCrawler:
def job_ownership(self) -> JobOwnership:
return JobOwnership(self.administrator_locator)

@cached_property
def workflow_problem_ownership(self) -> JobProblemOwnership:
return JobProblemOwnership(
self.administrator_locator,
self.workspace_client,
self.workspace_path_ownership,
self.job_ownership,
)

@cached_property
def submit_runs_crawler(self) -> SubmitRunsCrawler:
return SubmitRunsCrawler(
Expand Down Expand Up @@ -217,6 +227,17 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]:
self.config.ucx_catalog,
)

@cached_property
def workflow_problem_progress(self) -> ProgressEncoder[JobProblem]:
return ProgressEncoder(
self.sql_backend,
self.workflow_problem_ownership,
JobProblem,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
return ProgressEncoder(
Expand Down
9 changes: 7 additions & 2 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ def update_lakeview_dashboards_history_log(self, ctx: RuntimeContext) -> None:
def assess_workflows(self, ctx: RuntimeContext):
"""Scans all jobs for migration issues in notebooks.
Also stores direct filesystem accesses for display in the migration dashboard."""
# TODO: Ensure these are captured in the history log.
ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
ctx.workflow_linter.snapshot(force_refresh=True)

@job_task(depends_on=[assess_workflows], job_cluster="user_isolation")
def update_workflow_problems_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest workflow problems snapshot."""
workflow_problems_snapshot = ctx.workflow_linter.snapshot(force_refresh=False)
ctx.workflow_problem_progress.append_inventory_snapshot(workflow_problems_snapshot)

@job_task(
depends_on=[
Expand Down
50 changes: 47 additions & 3 deletions src/databricks/labs/ucx/source_code/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from importlib import metadata
from pathlib import Path
from urllib import parse
from typing import ClassVar

from databricks.labs.blueprint.paths import DBFSPath
from databricks.sdk import WorkspaceClient
Expand All @@ -16,10 +17,10 @@
from databricks.sdk.service.jobs import Source

from databricks.labs.ucx.assessment.crawlers import runtime_version_tuple
from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership, WorkspacePathOwnership
from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache, InvalidPath
from databricks.labs.ucx.source_code.base import (
LineageAtom,
)
from databricks.labs.ucx.source_code.base import LineageAtom
from databricks.labs.ucx.source_code.graph import (
Dependency,
DependencyGraph,
Expand All @@ -45,10 +46,25 @@ class JobProblem:
end_line: int
end_col: int

__id_attributes__: ClassVar[tuple[str, ...]] = (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asnare : This can only be str according to the current implementation. Do you know why? And/or what what to do with non-string fields?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are constrained because the mechanism we use to encode them as an identifier is only defined for when they're a string. I vaguely recall proposing JSON-encoding the sequence, but that was rejected.

Anything that isn't a string needs to be made available (unambiguously) as a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything that isn't a string needs to be made available (unambiguously) as a string.

Can we made these available as string without changing the dataclass attributes? (We want to avoid that as this is persisted as table in the UCX inventory, thus difficult to update the data types.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think read-only properties will work?

"job_id",
"task_key",
"path",
"code",
"start_line",
"start_col",
"end_line",
"end_col",
)

def as_message(self) -> str:
message = f"{self.path}:{self.start_line} [{self.code}] {self.message}"
return message

def has_missing_path(self) -> bool:
"""Flag if the path is missing, or not."""
return self.path == Path("<MISSING_SOURCE_PATH>") # Reusing flag from DependencyProblem


class WorkflowTask(Dependency):
def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
Expand Down Expand Up @@ -358,3 +374,31 @@ def _new_job_cluster_metadata(self, new_cluster) -> Iterable[DependencyProblem]:
self._spark_version = new_cluster.spark_version
self._data_security_mode = new_cluster.data_security_mode
return []


class JobProblemOwnership(Ownership[JobProblem]):
"""Determine ownership of job (workflow) problems.

This is the job creator (if known).
"""

def __init__(
self,
administrator_locator: AdministratorLocator,
ws: WorkspaceClient,
workspace_path_ownership: WorkspacePathOwnership,
job_ownership: JobOwnership,
) -> None:
super().__init__(administrator_locator)
self._ws = ws
self._workspace_path_ownership = workspace_path_ownership
self._job_ownership = job_ownership

def _maybe_direct_owner(self, record: JobProblem) -> str | None:
if not record.has_missing_path():
return self._workspace_path_ownership.owner_of_path(record.path)
try:
job = self._ws.jobs.get(record.job_id)
return self._job_ownership.owner_of(JobInfo.from_job(job))
except DatabricksError:
return None
70 changes: 41 additions & 29 deletions src/databricks/labs/ucx/source_code/linters/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

from collections.abc import Iterable
from datetime import datetime, timezone
from pathlib import Path

from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service import jobs

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.source_code.base import (
Expand All @@ -37,17 +38,28 @@
logger = logging.getLogger(__name__)


class WorkflowLinter:
def __init__(
class WorkflowLinter(CrawlerBase):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asnare : This "crawler" does not fit the usual crawler format as it "crawls" multiple resources: JobProblems, UsedTable and DirectFsAccess. You looked into this. What is your conclusion on handling this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a very difficult problem to solve: I'm not sure I reached a conclusion. I had a sense that we could use some sort of "composite" crawler than produced the different things all at once and then split them up, but never really went through the detailed effort of figuring out the details to graft those two worlds together.

"""Lint workflows for UC compatibility and references to data assets.

Data assets linted:
- Table and view references (UsedTables)
- Direct file system access (DirectFsAccess)
"""

def __init__( # pylint: disable=too-many-arguments
self,
ws: WorkspaceClient,
sql_backend: SqlBackend,
schema: str,
jobs_crawler: JobsCrawler,
resolver: DependencyResolver,
path_lookup: PathLookup,
migration_index: TableMigrationIndex,
directfs_crawler: DirectFsAccessCrawler,
used_tables_crawler: UsedTablesCrawler,
):
super().__init__(sql_backend, "hive_metastore", schema, "workflow_problems", JobProblem)

self._ws = ws
self._jobs_crawler = jobs_crawler
self._resolver = resolver
Expand All @@ -56,31 +68,34 @@ def __init__(
self._directfs_crawler = directfs_crawler
self._used_tables_crawler = used_tables_crawler

def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None:
tasks = []
for job in self._jobs_crawler.snapshot():
tasks.append(functools.partial(self.lint_job, job.job_id))
def _try_fetch(self) -> Iterable[JobProblem]:
"""Fetch all linting problems from the inventory table.

If trying to fetch the linted data assets, use their respective crawlers.
"""
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield JobProblem(*row)

def _crawl(self) -> Iterable[JobProblem]:
"""Crawl the workflow jobs and lint them.

Next to linted workflow problems, the crawler also collects:
- Table and view references (UsedTables)
- Direct file system access (DirectFsAccess)
"""
tasks = [functools.partial(self.lint_job, job.job_id) for job in self._jobs_crawler.snapshot()]
if not tasks:
return
logger.info(f"Running {len(tasks)} linting tasks in parallel...")
job_results, errors = Threads.gather('linting workflows', tasks)
job_problems: list[JobProblem] = []
job_dfsas: list[DirectFsAccess] = []
job_tables: list[UsedTable] = []
for problems, dfsas, tables in job_results:
job_problems.extend(problems)
job_dfsas.extend(dfsas)
job_tables.extend(tables)
logger.info(f"Saving {len(job_problems)} linting problems...")
sql_backend.save_table(
f'{inventory_database}.workflow_problems',
job_problems,
JobProblem,
mode='overwrite',
)
self._directfs_crawler.dump_all(job_dfsas)
self._used_tables_crawler.dump_all(job_tables)
if len(errors) > 0:
results, errors = Threads.gather("linting workflows", tasks)
if errors:
error_messages = "\n".join([str(error) for error in errors])
logger.warning(f"Errors occurred during linting:\n{error_messages}")
if results:
problems, dfsas, tables = zip(*results)
self._directfs_crawler.dump_all(dfsas)
self._used_tables_crawler.dump_all(tables)
yield from problems

def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
try:
Expand All @@ -95,8 +110,6 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess],
logger.warning(f"Found job problems:\n{problem_messages}")
return problems, dfsas, tables

_UNKNOWN = Path('<UNKNOWN>')

def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]:
problems: list[JobProblem] = []
dfsas: list[DirectFsAccess] = []
Expand All @@ -111,12 +124,11 @@ def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAcces
if not advices:
advices = self._lint_task(graph, session_state)
for advice in advices:
absolute_path = "UNKNOWN" if advice.has_missing_path() else advice.path.absolute().as_posix()
job_problem = JobProblem(
job_id=job.job_id,
job_name=job.settings.name,
task_key=task.task_key,
path=absolute_path,
path=advice.path.as_posix(),
code=advice.advice.code,
message=advice.advice.message,
start_line=advice.advice.start_line,
Expand Down
4 changes: 4 additions & 0 deletions src/databricks/labs/ucx/source_code/linters/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class _ReportingContext:


class QueryLinter:
"""Lint queries

TODO: Let QueryLinter inherit from `BaseCrawler`
"""

def __init__(
self,
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/assessment/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from databricks.sdk.retries import retried
from databricks.sdk.service.iam import PermissionLevel

from databricks.labs.ucx.source_code.jobs import JobProblem


@retried(on=[NotFound, InvalidParameterValue])
def test_running_real_assessment_job(
Expand Down Expand Up @@ -47,5 +49,9 @@ def test_running_real_assessment_job(
assert actual_tables == expected_tables

query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
workflow_problems_without_path = []
for record in sql_backend.fetch(query):
job_problem = JobProblem(**record.asDict())
if job_problem.has_missing_path():
workflow_problems_without_path.append(job_problem)
assert not workflow_problems_without_path
6 changes: 3 additions & 3 deletions tests/integration/source_code/test_directfs_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ def test_lakeview_query_dfsa_ownership(runtime_ctx) -> None:
def test_path_dfsa_ownership(
runtime_ctx,
make_directory,
inventory_schema,
sql_backend,
) -> None:
"""Verify the ownership of a direct-fs record for a notebook/source path associated with a job."""

Expand All @@ -92,14 +90,16 @@ def test_path_dfsa_ownership(
# Produce a DFSA record for the job.
linter = WorkflowLinter(
runtime_ctx.workspace_client,
runtime_ctx.sql_backend,
runtime_ctx.inventory_database,
runtime_ctx.jobs_crawler,
runtime_ctx.dependency_resolver,
runtime_ctx.path_lookup,
TableMigrationIndex([]),
runtime_ctx.directfs_access_crawler_for_paths,
runtime_ctx.used_tables_crawler_for_paths,
)
linter.refresh_report(sql_backend, inventory_schema)
linter.snapshot()

# Find a record for our job.
records = runtime_ctx.directfs_access_crawler_for_paths.snapshot()
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/source_code/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_linter_from_context(simple_ctx, make_job) -> None:
# Ensure we have at least 1 job that fails: "Deprecated file system path in call to: /mnt/things/e/f/g"
job = make_job(content="spark.read.table('a_table').write.csv('/mnt/things/e/f/g')\n")
simple_ctx.config.include_job_ids = [job.job_id]
simple_ctx.workflow_linter.refresh_report(simple_ctx.sql_backend, simple_ctx.inventory_database)
simple_ctx.workflow_linter.refresh_report()

# Verify that the 'problems' table has content.
cursor = simple_ctx.sql_backend.fetch(
Expand Down
Loading
Loading