-
Notifications
You must be signed in to change notification settings - Fork 94
Persist workflow problems in UCX multiworkspace catalog #3756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 8 commits
bd1758c
5b943f4
b2428b7
cfc813e
5418bd2
0803921
78da9ff
63ee0e0
8b13c92
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,15 @@ | |
|
||
from collections.abc import Iterable | ||
from datetime import datetime, timezone | ||
from pathlib import Path | ||
|
||
from databricks.labs.blueprint.parallel import Threads | ||
from databricks.labs.lsql.backends import SqlBackend | ||
from databricks.sdk import WorkspaceClient | ||
from databricks.sdk.errors import NotFound | ||
from databricks.sdk.service import jobs | ||
|
||
from databricks.labs.ucx.framework.crawlers import CrawlerBase | ||
from databricks.labs.ucx.framework.utils import escape_sql_identifier | ||
from databricks.labs.ucx.assessment.jobs import JobsCrawler | ||
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex | ||
from databricks.labs.ucx.source_code.base import ( | ||
|
@@ -37,17 +38,28 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
class WorkflowLinter: | ||
def __init__( | ||
class WorkflowLinter(CrawlerBase): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asnare : This "crawler" does not fit the usual crawler format as it "crawls" multiple resources: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was a very difficult problem to solve: I'm not sure I reached a conclusion. I had a sense that we could use some sort of "composite" crawler that produced the different things all at once and then split them up, but never really went through the detailed effort of figuring out how to graft those two worlds together. |
||
"""Lint workflows for UC compatibility and references to data assets. | ||
|
||
Data assets linted: | ||
- Table and view references (UsedTables) | ||
- Direct file system access (DirectFsAccess) | ||
""" | ||
|
||
def __init__( # pylint: disable=too-many-arguments | ||
self, | ||
ws: WorkspaceClient, | ||
sql_backend: SqlBackend, | ||
schema: str, | ||
jobs_crawler: JobsCrawler, | ||
resolver: DependencyResolver, | ||
path_lookup: PathLookup, | ||
migration_index: TableMigrationIndex, | ||
directfs_crawler: DirectFsAccessCrawler, | ||
used_tables_crawler: UsedTablesCrawler, | ||
): | ||
super().__init__(sql_backend, "hive_metastore", schema, "workflow_problems", JobProblem) | ||
|
||
self._ws = ws | ||
self._jobs_crawler = jobs_crawler | ||
self._resolver = resolver | ||
|
@@ -56,31 +68,34 @@ def __init__( | |
self._directfs_crawler = directfs_crawler | ||
self._used_tables_crawler = used_tables_crawler | ||
|
||
def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None: | ||
tasks = [] | ||
for job in self._jobs_crawler.snapshot(): | ||
tasks.append(functools.partial(self.lint_job, job.job_id)) | ||
def _try_fetch(self) -> Iterable[JobProblem]: | ||
"""Fetch all linting problems from the inventory table. | ||
|
||
If trying to fetch the linted data assets, use their respective crawlers. | ||
""" | ||
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): | ||
yield JobProblem(*row) | ||
|
||
def _crawl(self) -> Iterable[JobProblem]: | ||
"""Crawl the workflow jobs and lint them. | ||
|
||
Next to linted workflow problems, the crawler also collects: | ||
- Table and view references (UsedTables) | ||
- Direct file system access (DirectFsAccess) | ||
""" | ||
tasks = [functools.partial(self.lint_job, job.job_id) for job in self._jobs_crawler.snapshot()] | ||
if not tasks: | ||
return | ||
logger.info(f"Running {len(tasks)} linting tasks in parallel...") | ||
job_results, errors = Threads.gather('linting workflows', tasks) | ||
job_problems: list[JobProblem] = [] | ||
job_dfsas: list[DirectFsAccess] = [] | ||
job_tables: list[UsedTable] = [] | ||
for problems, dfsas, tables in job_results: | ||
job_problems.extend(problems) | ||
job_dfsas.extend(dfsas) | ||
job_tables.extend(tables) | ||
logger.info(f"Saving {len(job_problems)} linting problems...") | ||
sql_backend.save_table( | ||
f'{inventory_database}.workflow_problems', | ||
job_problems, | ||
JobProblem, | ||
mode='overwrite', | ||
) | ||
self._directfs_crawler.dump_all(job_dfsas) | ||
self._used_tables_crawler.dump_all(job_tables) | ||
if len(errors) > 0: | ||
results, errors = Threads.gather("linting workflows", tasks) | ||
if errors: | ||
error_messages = "\n".join([str(error) for error in errors]) | ||
logger.warning(f"Errors occurred during linting:\n{error_messages}") | ||
if results: | ||
problems, dfsas, tables = zip(*results) | ||
self._directfs_crawler.dump_all(dfsas) | ||
self._used_tables_crawler.dump_all(tables) | ||
yield from problems | ||
|
||
def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: | ||
try: | ||
|
@@ -95,8 +110,6 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess], | |
logger.warning(f"Found job problems:\n{problem_messages}") | ||
return problems, dfsas, tables | ||
|
||
_UNKNOWN = Path('<UNKNOWN>') | ||
|
||
def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccess], list[UsedTable]]: | ||
problems: list[JobProblem] = [] | ||
dfsas: list[DirectFsAccess] = [] | ||
|
@@ -111,12 +124,11 @@ def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAcces | |
if not advices: | ||
advices = self._lint_task(graph, session_state) | ||
for advice in advices: | ||
absolute_path = "UNKNOWN" if advice.has_missing_path() else advice.path.absolute().as_posix() | ||
job_problem = JobProblem( | ||
job_id=job.job_id, | ||
job_name=job.settings.name, | ||
task_key=task.task_key, | ||
path=absolute_path, | ||
path=advice.path.as_posix(), | ||
code=advice.advice.code, | ||
message=advice.advice.message, | ||
start_line=advice.advice.start_line, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asnare : This can only be
str
according to the current implementation. Do you know why? And/or what to do with non-string fields? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are constrained because the mechanism we use to encode them as an identifier is only defined for when they're a string. I vaguely recall proposing JSON-encoding the sequence, but that was rejected.
Anything that isn't a string needs to be made available (unambiguously) as a string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make these available as strings without changing the dataclass attributes? (We want to avoid that as this is persisted as a table in the UCX inventory, thus difficult to update the data types.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think read-only properties will work?