Implement migration sequencing (phase 1) #2980
Changes from 7 commits
New file: the databricks.labs.ucx.sequencing.sequencing module (imported by the unit test below)
@@ -0,0 +1,171 @@
from __future__ import annotations

import itertools
from collections.abc import Iterable
from dataclasses import dataclass, field

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

from databricks.labs.ucx.source_code.graph import DependencyGraph


@dataclass
class MigrationStep:
    step_id: int
    step_number: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str
    required_step_ids: list[int] = field(default_factory=list)


@dataclass
class MigrationNode:
    last_node_id = 0
    node_id: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str
    required_steps: list[MigrationNode] = field(default_factory=list)

    def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]:
        # traverse the nodes using a depth-first algorithm
        # ultimate leaves have a step number of 1
        # use highest required step number + 1 for this step
        highest_step_number = 0
        required_step_ids: list[int] = []
        all_generated_steps: list[Iterable[MigrationStep]] = []
        for required_step in self.required_steps:
            step, generated_steps = required_step.generate_steps()
            highest_step_number = max(highest_step_number, step.step_number)
            required_step_ids.append(step.step_id)
            all_generated_steps.append(generated_steps)
            all_generated_steps.append([step])
        this_step = MigrationStep(
            step_id=self.node_id,
            step_number=highest_step_number + 1,
            object_type=self.object_type,
            object_id=self.object_id,
            object_name=self.object_name,
            object_owner=self.object_owner,
            required_step_ids=required_step_ids,
        )
        return this_step, itertools.chain(*all_generated_steps)
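Aside, not part of the diff: the comments in generate_steps describe the numbering rule — leaves get step number 1, every other node gets the highest step number among its requirements plus one. A minimal sketch of that rule, assuming the module path the unit test below uses and purely hypothetical names and ids:

```python
from databricks.labs.ucx.sequencing.sequencing import MigrationNode

# hypothetical nodes: the job must be migrated before the cluster that runs it
job = MigrationNode(node_id=1, object_type="JOB", object_id="42",
                    object_name="nightly", object_owner="jane")
cluster = MigrationNode(node_id=2, object_type="CLUSTER", object_id="c-123",
                        object_name="shared-cluster", object_owner="jane")
cluster.required_steps.append(job)

step, required = cluster.generate_steps()
print(step.step_number)                    # 2 — one more than its deepest requirement
print([s.step_number for s in required])   # [1] — the job is a leaf, so it gets step number 1
```

The same rule is what makes the cluster land at step number 3 in the unit test at the bottom of this PR.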

    def find(self, object_type: str, object_id: str) -> MigrationNode | None:
        if object_type == self.object_type and object_id == self.object_id:
            return self
        for step in self.required_steps:
            found = step.find(object_type, object_id)
            if found:
                return found
        return None


class MigrationSequencer:

    def __init__(self, ws: WorkspaceClient):
        self._ws = ws
        self._root = MigrationNode(
            node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE"
        )

    def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode:
Review thread (on register_workflow_task):
- Reviewer (with a suggested change): make this one private. we should start from the job, not the task.
- Author: the current thinking is to leverage the existing dependency graph in …
- Reviewer: ok
        task_id = f"{job.job_id}/{task.task_key}"
        task_node = self._find_node(object_type="TASK", object_id=task_id)
        if task_node:
            return task_node
        job_node = self.register_workflow_job(job)
        MigrationNode.last_node_id += 1
        task_node = MigrationNode(
            node_id=MigrationNode.last_node_id,
            object_type="TASK",
            object_id=task_id,
            object_name=task.task_key,
            object_owner=job_node.object_owner,  # no task owner so use job one
        )
        job_node.required_steps.append(task_node)
Review thread (on the sequencing algorithm):

Reviewer: this algorithm is a bit convoluted and hard to follow. please rewrite with the adjacency map and do a topological sorting with Kahn's algorithm in this class, e.g.

```python
self._adjacency = collections.defaultdict(set)
...
# have a central _nodes field to have all the nodes addressable by (TYPE, ID) tuple
self._nodes[('TASK', task_id)] = MigrationNode(
    node_id=self._last_node_id,
    object_type="TASK",
    object_id=task_id,
    object_name=task.task_key,
    object_owner=job_node.object_owner,
)
self._adjacency[('JOB', job_id)].add(('TASK', task_id))
```

... and actual toposort is pretty straightforward with Kahn's algo:

```python
indegrees = collections.defaultdict(int)
for src, dep_set in self._adjacency.items():
    indegrees[src] = len(dep_set)  # count incoming dependencies for a node
queue, toposorted, sequence_num = collections.deque(), [], 0
for src, incoming in indegrees.items():
    if incoming > 0:
        continue
    queue.append(src)  # start with zero-dependency nodes
while queue:
    curr = queue.popleft()
    toposorted.append(replace(self._nodes[curr], sequence_num=sequence_num))
    sequence_num += 1
    for dep in self._adjacency[curr]:
        indegrees[dep] -= 1
        if indegrees[dep] == 0:
            queue.append(dep)
return toposorted
```

i find it confusing with all those …

Author: The problem with Kahn is it only works for DAGs, which we can't guarantee. We have duplicates and recursive loops. I'm trying it out but I suspect it will break on solacc.

Reviewer: where do we have recursive loops? incomplete DAGs would also be fine, as nodes without dependencies would always be first.

Author: Consider code where files import each other in a cycle; the resulting dependency graph fragment contains a loop. When building the dependency graph we detect the recursive cycle (and we break it).

Author: I've implemented Kahn. I'll check whether it fails in the PR for phase 2 (which deals with the dependency graph).
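Aside on the exchange above, not from the PR: a self-contained sketch of the cycle concern. With Kahn's algorithm, nodes that sit on a cycle never reach indegree zero and are simply never emitted, so comparing the output length with the node count is a cheap way to detect that the input was not a DAG. The adjacency orientation and names below are illustrative only:

```python
from collections import defaultdict, deque

def kahn(adjacency: dict[str, set[str]]) -> list[str]:
    # adjacency maps a prerequisite to the nodes that depend on it
    indegrees: dict[str, int] = defaultdict(int)
    for src, deps in adjacency.items():
        indegrees.setdefault(src, 0)
        for dep in deps:
            indegrees[dep] += 1
    queue = deque(node for node, degree in indegrees.items() if degree == 0)
    ordered: list[str] = []
    while queue:
        node = queue.popleft()
        ordered.append(node)
        for dep in adjacency.get(node, set()):
            indegrees[dep] -= 1
            if indegrees[dep] == 0:
                queue.append(dep)
    return ordered

print(kahn({"task": {"job", "cluster"}, "job": {"cluster"}}))  # ['task', 'job', 'cluster']
print(kahn({"a": {"b"}, "b": {"a"}}))  # [] — both nodes sit on a cycle, so neither is emitted
```

This is consistent with the author's point that cycles are detected (and broken) while the dependency graph is being built, which would keep the graph Kahn-friendly before sequencing runs.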
        if task.existing_cluster_id:
            cluster_node = self.register_cluster(task.existing_cluster_id)
            cluster_node.required_steps.append(task_node)
            if job_node not in cluster_node.required_steps:
                cluster_node.required_steps.append(job_node)
        # TODO register dependency graph
        return task_node

    def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
        job_node = self._find_node(object_type="JOB", object_id=str(job.job_id))
        if job_node:
            return job_node
        MigrationNode.last_node_id += 1
        job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id)
        job_node = MigrationNode(
            node_id=MigrationNode.last_node_id,
            object_type="JOB",
            object_id=str(job.job_id),
            object_name=job_name,
            object_owner=job.creator_user_name or "<UNKNOWN>",
Review thread (on object_owner):
- Reviewer: inject … use …
- Author: done
- Author: done. Requires #2999
- Author: #2999 no longer needed
        )
        top_level = True
        if job.settings and job.settings.job_clusters:
            for job_cluster in job.settings.job_clusters:
                cluster_node = self.register_job_cluster(job_cluster)
                if cluster_node:
                    top_level = False
                    cluster_node.required_steps.append(job_node)
        if top_level:
            self._root.required_steps.append(job_node)
        return job_node

    def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None:
        if cluster.new_cluster:
            return None
        return self.register_cluster(cluster.job_cluster_key)

    def register_cluster(self, cluster_key: str) -> MigrationNode:
        cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key)
        if cluster_node:
            return cluster_node
        details = self._ws.clusters.get(cluster_key)
        object_name = details.cluster_name if details and details.cluster_name else cluster_key
        object_owner = details.creator_user_name if details and details.creator_user_name else "<UNKNOWN>"
        MigrationNode.last_node_id += 1
        cluster_node = MigrationNode(
            node_id=MigrationNode.last_node_id,
            object_type="CLUSTER",
            object_id=cluster_key,
            object_name=object_name,
            object_owner=object_owner,
        )
        # TODO register warehouses and policies
        self._root.required_steps.append(cluster_node)
        return cluster_node
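A quick usage note, not from the PR, with a made-up cluster id: clusters can also be registered directly; the display name and owner come from ws.clusters.get, falling back to the cluster key and "<UNKNOWN>" when details are missing:

```python
# ws is a WorkspaceClient (mocked in the unit test below); the cluster id is hypothetical
sequencer = MigrationSequencer(ws)
node = sequencer.register_cluster("0123-456789-abcdef")
print(node.object_type, node.object_name, node.object_owner)  # CLUSTER plus whatever the API returned
```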

    def generate_steps(self) -> Iterable[MigrationStep]:
        _root_step, generated_steps = self._root.generate_steps()
        unique_steps = self._deduplicate_steps(generated_steps)
        return self._sorted_steps(unique_steps)

    @staticmethod
    def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
        # sort by step number, lowest first
        return sorted(steps, key=lambda step: step.step_number)

    @staticmethod
    def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]:
        best_steps: dict[int, MigrationStep] = {}
        for step in steps:
            existing = best_steps.get(step.step_id, None)
            # keep the step with the highest step number
            # TODO this possibly affects the step_number of steps that depend on this one
            # but it's probably OK to not be 100% accurate initially
            if existing and existing.step_number >= step.step_number:
                continue
            best_steps[step.step_id] = step
        return best_steps.values()
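Another aside: MigrationNode.generate_steps re-emits a node once per path from the root, so a node reachable through several paths appears several times at different depths; _deduplicate_steps keeps the deepest copy so the step still sorts after everything it requires. A small sketch, calling the private helper directly with hypothetical values:

```python
from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer, MigrationStep

duplicates = [
    MigrationStep(step_id=7, step_number=1, object_type="TASK", object_id="42/t",
                  object_name="t", object_owner="jane"),
    MigrationStep(step_id=7, step_number=3, object_type="TASK", object_id="42/t",
                  object_name="t", object_owner="jane"),
]
best = list(MigrationSequencer._deduplicate_steps(duplicates))
print([s.step_number for s in best])  # [3] — the deeper of the two copies wins
```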

    def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None:
        return self._root.find(object_type, object_id)
New file: unit test for MigrationSequencer
@@ -0,0 +1,28 @@
from databricks.sdk.service import jobs
from databricks.sdk.service.compute import ClusterDetails

from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer
from databricks.labs.ucx.source_code.base import CurrentSessionState
from databricks.labs.ucx.source_code.graph import DependencyGraph
from databricks.labs.ucx.source_code.jobs import WorkflowTask


def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup):
    ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe")
    task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123")
    settings = jobs.JobSettings(name="test-job", tasks=[task])
    job = jobs.Job(job_id=1234, settings=settings)
    ws.jobs.get.return_value = job
    dependency = WorkflowTask(ws, task, job)
    graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState())
    sequencer = MigrationSequencer(ws)
    sequencer.register_workflow_task(task, job, graph)
    steps = list(sequencer.generate_steps())
    step = steps[-1]
    assert step.step_id
    assert step.object_type == "CLUSTER"
    assert step.object_id == "cluster-123"
    assert step.object_name == "my-cluster"
    assert step.object_owner == "John Doe"
    assert step.step_number == 3
    assert len(step.required_step_ids) == 2
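For orientation, reasoning from the implementation above rather than from an assertion in the test: registration makes the job require the task, and the cluster require both the task and the job, so the full sequence for this fixture comes out as task, job, cluster. A sketch of consuming it:

```python
for step in sequencer.generate_steps():
    print(step.step_number, step.object_type, step.object_id)
# 1 TASK 1234/test-task
# 2 JOB 1234
# 3 CLUSTER cluster-123
```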
Review thread:
- Reviewer: make fmt
- Author: make fmt doesn't change this code...
- Reviewer: with trailing comma it should
- Author: that line is gone anyway