Skip to content

Implement migration sequencing (phase 1) #2980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
171 changes: 171 additions & 0 deletions src/databricks/labs/ucx/sequencing/sequencing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from __future__ import annotations

import itertools
from collections.abc import Iterable
from dataclasses import dataclass, field

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

from databricks.labs.ucx.source_code.graph import DependencyGraph


@dataclass
class MigrationStep:
    """A single entry in the migration sequence.

    ``step_number`` orders execution: steps with lower numbers must be
    migrated before steps with higher numbers.
    """

    step_id: int
    step_number: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str
    required_step_ids: list[int] = field(default_factory=list)


@dataclass
class MigrationNode:
    """A node in the migration dependency tree.

    Every node in ``required_steps`` must be migrated before this node.
    """

    # class-level counter callers use to allocate unique node ids
    last_node_id = 0
    node_id: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str
    required_steps: list[MigrationNode] = field(default_factory=list)

    def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]:
        """Return this node's step plus the steps of every node beneath it.

        Depth-first traversal: ultimate leaves get step number 1, and each
        node gets one more than the highest step number among its
        prerequisites.
        """
        deepest = 0
        prerequisite_ids: list[int] = []
        collected: list[Iterable[MigrationStep]] = []
        for prerequisite in self.required_steps:
            pre_step, pre_descendants = prerequisite.generate_steps()
            if pre_step.step_number > deepest:
                deepest = pre_step.step_number
            prerequisite_ids.append(pre_step.step_id)
            collected.append(pre_descendants)
            collected.append([pre_step])
        own_step = MigrationStep(
            step_id=self.node_id,
            step_number=deepest + 1,
            object_type=self.object_type,
            object_id=self.object_id,
            object_name=self.object_name,
            object_owner=self.object_owner,
            required_step_ids=prerequisite_ids,
        )
        return own_step, itertools.chain.from_iterable(collected)

    def find(self, object_type: str, object_id: str) -> MigrationNode | None:
        """Depth-first search for the node matching ``(object_type, object_id)``."""
        if (self.object_type, self.object_id) == (object_type, object_id):
            return self
        for candidate in self.required_steps:
            match = candidate.find(object_type, object_id)
            if match is not None:
                return match
        return None


class MigrationSequencer:

def __init__(self, ws: WorkspaceClient):
self._ws = ws
self._root = MigrationNode(
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE"
node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE",

make fmt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make fmt doesn't change this code...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with trailing comma it should

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that line is gone anyway

)

def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
def _register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode:

make this one private. we should start from the job, not the task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current thinking is to leverage the existing dependency graph in WorkflowLinter.refresh_report in order to avoid rebuilding it and re-fetch all assets from the workspace. Starting from the job would make that impossible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

task_id = f"{job.job_id}/{task.task_key}"
task_node = self._find_node(object_type="TASK", object_id=task_id)
if task_node:
return task_node
job_node = self.register_workflow_job(job)
MigrationNode.last_node_id += 1
task_node = MigrationNode(
node_id=MigrationNode.last_node_id,
object_type="TASK",
object_id=task_id,
object_name=task.task_key,
object_owner=job_node.object_owner, # no task owner so use job one
)
job_node.required_steps.append(task_node)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this algorithm is a bit convoluted and hard to follow. please rewrite with the adjacency map and do a topological sorting with kahn's algorithm in this class. e.g.

self._adjacency = collections.defaultdict(set)

...

# have a central _nodes field to have all the nodes addressable by (TYPE, ID) tuple
self._nodes[('TASK', task_id)] = MigrationNode(
            node_id=self._last_node_id,
            object_type="TASK",
            object_id=task_id,
            object_name=task.task_key,
            object_owner=job_node.object_owner,
        )
self._adjacency[('JOB', job_id)].append(('TASK', task_id))

... and actual toposort is pretty straightforward with Kahn's algo:

indegrees = collections.defaultdict(int)
for src, dep_set in self._adjacency.items():
  indegrees[src] = len(dep_set) # count incoming dependencies for a node

queue, toposorted, sequence_num = [], [], 0
for src, incoming in indegrees.items():
  if incoming > 0: continue
  queue.append(src) # start with zero-dependencies nodes

while queue:
  curr = queue.popleft()
  toposorted.append(replace(self._nodes[curr], sequence_num=sequence_num))
  sequence_num += 1
  for dep in self._adjacency[curr]:
    indegrees[dep] -= 1
    if indegrees[dep] == 0:
      queue.append(dep)

return toposorted

i find it confusing with all those _deduplicate_steps and _find_node methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with Kahn is it only works for DAGs, which we can't guarantee. We have duplicates and recursive loops. I'm trying it out but I suspect it will break on solacc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we have recursive loops? incomplete DAGs would also be fine, as nodes without dependencies would always be first

Copy link
Contributor Author

@ericvergnaud ericvergnaud Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the following code:

file A.py:

class A:
 
    def return_a_bee(self):
        from B import B
        return B()

file B.py:

from A import A

class B(A):
     pass

file T.py:

from A import A
from B import B

the above will return the following dependency graph fragment:

     T
    / \
   A    B
  /      \
 B        A

When building the dependency graph we detect the recursive cycle (and we break it).
In the above, there is no 0-dependency node, so not sure Kahn will work...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented Kahn. I'll check whether it fails in PR for phase 2 (which deals with the dependency graph)

if task.existing_cluster_id:
cluster_node = self.register_cluster(task.existing_cluster_id)
cluster_node.required_steps.append(task_node)
if job_node not in cluster_node.required_steps:
cluster_node.required_steps.append(job_node)
# TODO register dependency graph
return task_node

def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
job_node = self._find_node(object_type="JOB", object_id=str(job.job_id))
if job_node:
return job_node
MigrationNode.last_node_id += 1
job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id)
job_node = MigrationNode(
node_id=MigrationNode.last_node_id,
object_type="JOB",
object_id=str(job.job_id),
object_name=job_name,
object_owner=job.creator_user_name or "<UNKNOWN>",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inject use databricks.labs.ucx.framework.owners.Ownership subclasses to properly determine object owners.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Requires #2999

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2999 no longer needed

)
top_level = True
if job.settings and job.settings.job_clusters:
for job_cluster in job.settings.job_clusters:
cluster_node = self.register_job_cluster(job_cluster)
if cluster_node:
top_level = False
cluster_node.required_steps.append(job_node)
if top_level:
self._root.required_steps.append(job_node)
return job_node

def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None:
    """Register a job cluster, or return ``None`` for ephemeral (new) clusters.

    Clusters spun up on the fly for a job run need no migration step of
    their own, so only clusters referenced by key are registered.
    """
    return None if cluster.new_cluster else self.register_cluster(cluster.job_cluster_key)

def register_cluster(self, cluster_key: str) -> MigrationNode:
    """Return the migration node for a cluster, creating it on first sight.

    Newly created cluster nodes are attached to the root because nothing
    is known to depend on them yet.
    """
    existing = self._find_node(object_type="CLUSTER", object_id=cluster_key)
    if existing:
        return existing
    details = self._ws.clusters.get(cluster_key)
    if details and details.cluster_name:
        cluster_name = details.cluster_name
    else:
        cluster_name = cluster_key
    if details and details.creator_user_name:
        owner = details.creator_user_name
    else:
        owner = "<UNKNOWN>"  # creator may be absent, e.g. a deleted user
    MigrationNode.last_node_id += 1
    node = MigrationNode(
        node_id=MigrationNode.last_node_id,
        object_type="CLUSTER",
        object_id=cluster_key,
        object_name=cluster_name,
        object_owner=owner,
    )
    # TODO register warehouses and policies
    self._root.required_steps.append(node)
    return node

def generate_steps(self) -> Iterable[MigrationStep]:
    """Produce migration steps for every registered object, lowest step number first."""
    # the synthetic root's own step is irrelevant; only its descendants matter
    _, all_steps = self._root.generate_steps()
    return self._sorted_steps(self._deduplicate_steps(all_steps))

@staticmethod
def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
    """Order steps ascending by step number so prerequisites come first."""

    def step_number_of(step: MigrationStep) -> int:
        return step.step_number

    return sorted(steps, key=step_number_of)

@staticmethod
def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
    """Collapse duplicate steps, keeping the one with the highest step number.

    Shared dependencies are visited once per parent, so the same step id can
    appear multiple times with different step numbers.
    """
    best: dict[int, MigrationStep] = {}
    for candidate in steps:
        current = best.get(candidate.step_id)
        # keep the step with the highest step number
        # TODO this possibly affects the step_number of steps that depend on this one
        # but it's probably OK to not be 100% accurate initially
        if current is None or candidate.step_number > current.step_number:
            best[candidate.step_id] = candidate
    return best.values()

def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None:
    """Locate an already-registered node by its (object_type, object_id) pair, if any."""
    return self._root.find(object_type, object_id)
24 changes: 21 additions & 3 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.source_code.graph import BaseNotebookResolver
from databricks.labs.ucx.source_code.graph import BaseNotebookResolver, DependencyResolver
from databricks.labs.ucx.source_code.known import KnownList
from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader
from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.sdk import AccountClient
from databricks.sdk.config import Config

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver

from . import mock_workspace_client

Expand Down Expand Up @@ -57,8 +61,10 @@ class CustomIterator:
def __init__(self, values):
self._values = iter(values)
self._has_next = True
self._next_value = None

def hasNext(self): # pylint: disable=invalid-name
# pylint: disable=invalid-name
def hasNext(self):
try:
self._next_value = next(self._values)
self._has_next = True
Expand Down Expand Up @@ -150,9 +156,11 @@ def inner(cb, **replace) -> RuntimeContext:
ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listDatabases.return_value = (
mock_list_databases_iterator
)
# pylint: disable=protected-access
ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listTables.return_value = (
mock_list_tables_iterator
)
# pylint: disable=protected-access
ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().getTable.return_value = (
get_table_mock
)
Expand All @@ -165,8 +173,9 @@ def inner(cb, **replace) -> RuntimeContext:

@pytest.fixture
def acc_client():
    """Auto-specced AccountClient with a minimal account-level config.

    The scraped diff showed both the pre- and post-change assignment line;
    only the current (post-diff) line is kept here.
    """
    acc = create_autospec(AccountClient)
    acc.config = Config(host="https://accounts.cloud.databricks.com", account_id="123", token="123")
    acc.assert_not_called()  # fixture set-up itself must not have exercised the mock
    return acc


Expand Down Expand Up @@ -201,3 +210,12 @@ def mock_backend() -> MockBackend:
@pytest.fixture
def ws():
    """Workspace-client test double built by ``mock_workspace_client()``."""
    return mock_workspace_client()


@pytest.fixture
def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver:
    """Dependency resolver wired with the default loaders and the known-packages allow list."""
    known = KnownList()
    file_resolver = ImportFileResolver(FileLoader(), known)
    return DependencyResolver(
        PythonLibraryResolver(known),
        NotebookResolver(NotebookLoader()),
        file_resolver,
        file_resolver,  # same resolver serves both import roles
        mock_path_lookup,
    )
Empty file.
28 changes: 28 additions & 0 deletions tests/unit/sequencing/test_sequencing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from databricks.sdk.service import jobs
from databricks.sdk.service.compute import ClusterDetails

from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer
from databricks.labs.ucx.source_code.base import CurrentSessionState
from databricks.labs.ucx.source_code.graph import DependencyGraph
from databricks.labs.ucx.source_code.jobs import WorkflowTask


def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup):
    """A task pinned to an existing cluster yields a CLUSTER step sequenced after job and task."""
    ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe")
    task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123")
    job = jobs.Job(job_id=1234, settings=jobs.JobSettings(name="test-job", tasks=[task]))
    ws.jobs.get.return_value = job
    graph = DependencyGraph(
        WorkflowTask(ws, task, job), None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()
    )
    sequencer = MigrationSequencer(ws)
    sequencer.register_workflow_task(task, job, graph)
    last_step = list(sequencer.generate_steps())[-1]
    assert last_step.step_id
    assert (last_step.object_type, last_step.object_id) == ("CLUSTER", "cluster-123")
    assert last_step.object_name == "my-cluster"
    assert last_step.object_owner == "John Doe"
    assert last_step.step_number == 3
    assert len(last_step.required_step_ids) == 2
15 changes: 0 additions & 15 deletions tests/unit/source_code/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import pytest

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus
from databricks.labs.ucx.source_code.graph import DependencyResolver
from databricks.labs.ucx.source_code.known import KnownList
from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader
from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, NotebookResolver
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver


@pytest.fixture
Expand Down Expand Up @@ -51,12 +45,3 @@ def extended_test_index():
),
]
)


@pytest.fixture
def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver:
allow_list = KnownList()
library_resolver = PythonLibraryResolver(allow_list)
notebook_resolver = NotebookResolver(NotebookLoader())
import_resolver = ImportFileResolver(FileLoader(), allow_list)
return DependencyResolver(library_resolver, notebook_resolver, import_resolver, import_resolver, mock_path_lookup)
Loading