Skip to content

Implementation of an Ownership factory #3072

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/databricks/labs/ucx/assessment/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import ClassVar
from typing import ClassVar, Any

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
Expand Down Expand Up @@ -191,6 +191,9 @@ class ClusterOwnership(Ownership[ClusterInfo]):
This is the cluster creator (if known).
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a ClusterInfo)."""
    return isinstance(record, ClusterInfo)

def _maybe_direct_owner(self, record: ClusterInfo) -> str | None:
    """Return the recorded cluster creator, or None when unknown."""
    return record.creator

Expand Down Expand Up @@ -256,5 +259,8 @@ class ClusterPolicyOwnership(Ownership[PolicyInfo]):
This is the creator of the cluster policy (if known).
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a PolicyInfo)."""
    return isinstance(record, PolicyInfo)

def _maybe_direct_owner(self, record: PolicyInfo) -> str | None:
    """Return the recorded cluster-policy creator, or None when unknown."""
    return record.creator
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/assessment/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from typing import ClassVar
from typing import ClassVar, Any

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
Expand Down Expand Up @@ -158,6 +158,9 @@ class JobOwnership(Ownership[JobInfo]):
This is the job creator (if known).
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a JobInfo)."""
    return isinstance(record, JobInfo)

def _maybe_direct_owner(self, record: JobInfo) -> str | None:
    """Return the recorded job creator, or None when unknown."""
    return record.creator

Expand Down
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/assessment/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import ClassVar
from typing import ClassVar, Any

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
Expand Down Expand Up @@ -86,5 +86,8 @@ class PipelineOwnership(Ownership[PipelineInfo]):
This is the pipeline creator (if known).
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a PipelineInfo)."""
    return isinstance(record, PipelineInfo)

def _maybe_direct_owner(self, record: PipelineInfo) -> str | None:
    """Return the recorded pipeline creator name, or None when unknown."""
    return record.creator_name
22 changes: 21 additions & 1 deletion src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
from databricks.labs.ucx.assessment.export import AssessmentExporter
from databricks.labs.ucx.aws.credentials import CredentialManager
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
from databricks.labs.ucx.framework.owners import (
AdministratorLocator,
WorkspacePathOwnership,
Ownership,
LegacyQueryOwnership,
Record,
)
from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.grants import (
Expand Down Expand Up @@ -571,6 +577,20 @@ def migration_recon(self) -> MigrationRecon:
def administrator_locator(self) -> AdministratorLocator:
return AdministratorLocator(self.workspace_client)

@cached_property
def ownership_factory(self) -> Callable[[Record], Ownership]:
    """Return a callable that resolves the Ownership instance for a record.

    Touching the cached properties below forces each Ownership subclass to be
    instantiated; Ownership.__init__ registers every instance in the
    class-level registry that backs Ownership.for_record.
    """
    # NOTE(review): the registry lives on the Ownership class (a process-wide
    # singleton), so re-creating the application context — as tests do —
    # accumulates stale instances in it; TODO confirm whether the cache should
    # instead be owned by this context.
    # ensure registration of Ownerships
    _ = [
        self.directfs_access_ownership,
        self.grant_ownership,
        self.legacy_query_ownership,
        self.table_migration_ownership,
        self.table_ownership,
        self.udf_ownership,
        self.workspace_path_ownership,
    ]
    return Ownership.for_record
Comment on lines +582 to +594
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As things stand during tests we re-create the application context for each test that needs one, and this is going to lead to the registry accumulating instances for each test that hits this property.

For this reason (and others) in the past we've preferred to avoid global singletons (which the class-hosted registry is), and move the cache to be owned and managed outside of it so that the lifecycle of the cache is tied to the application context.



class CliContext(GlobalContext, abc.ABC):
@cached_property
Expand Down
34 changes: 30 additions & 4 deletions src/databricks/labs/ucx/framework/owners.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
from dataclasses import dataclass
from datetime import timedelta
from functools import cached_property
from typing import Generic, TypeVar, final
from typing import Any, ClassVar, Generic, TypeVar, final

from databricks.labs.blueprint.paths import WorkspacePath
from databricks.sdk import WorkspaceClient
Expand Down Expand Up @@ -169,8 +171,21 @@ def get_workspace_administrator(self) -> str:
class Ownership(ABC, Generic[Record]):
"""Determine an owner for a given type of object."""

# Class-level registry of every instantiated Ownership; populated by __init__
# and consulted by for_record.  ClassVar is required so type checkers treat
# this as a shared class attribute rather than a per-instance attribute.
_ownerships: ClassVar[set[Ownership]] = set()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the type annotation here is quite correct: it needs to be ClassVar[…]? (Without this I believe it is considered by the type tooling to be an instance property.)


@classmethod
def for_record(cls, record: Any) -> Ownership[Record]:
    """Return a registered Ownership instance applicable to *record*.

    Linearly scans the class-level registry of instantiated ownerships and
    returns the first whose ``is_applicable_to(record)`` is true.
    NOTE(review): if several registered instances match, which one is
    returned depends on set iteration order — TODO confirm this ambiguity
    is acceptable.

    Raises:
        ValueError: when no registered ownership is applicable to *record*.
    """
    for ownership in cls._ownerships:
        if ownership.is_applicable_to(record):
            return ownership
    raise ValueError(f"Ownership not implemented or not registered for {type(record).__name__}")
Comment on lines +176 to +181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere I've commented on this API, and whether it should take the type of the record instead of an instance.

I'd really like this method to have a docstring. The current design allows for some ambiguous corner cases like multiple registered instances supporting a given record. (This remains even if we switch to type-based lookups.)


def __init__(self, administrator_locator: AdministratorLocator) -> None:
    """Keep the administrator locator and self-register in the class-level registry used by for_record."""
    self._administrator_locator = administrator_locator
    self._ownerships.add(self)

@abstractmethod
def is_applicable_to(self, record: Any) -> bool:
    """Whether this ownership instance can determine an owner for *record*."""
    ...
Comment on lines +187 to +188
Copy link
Contributor

@asnare asnare Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a design perspective, I think I'd prefer this to take the type of the record as an argument rather than an instance thereof. The reason for this is that it would then encourage the caller to get the encoder and re-use it for a collection, which is the usual use-case. As it stands the current API encourages the lookup for each record within a loop over a collection, and the lookups are reasonably expensive due to the iteration required.

I'm on the fence about whether it should be a class method or not, although leaning towards a class method being more accurate. (For singletons, which these are, I agree that the distinction is more academic than practical.)

Assuming the current API, I think a default implementation is possible (and preferable) here:

  1. A cached property, initialised via typing.get_args(self), to get the generic type of the instance.
  2. Use isinstance() as in the current implementations against the cached property.

(If we switch to accepting the type rather than an instance then the details obviously change.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this comment indicating that we want to allow instances to support multiple types.

That said, I don't fully follow the reasoning: type-based lookups can also support multiple instance types (A | B) and encourage a single lookup rather than the (current) situation that encourages callers to implicitly linear probe the registry within a loop. I think I'm missing something here, and hope that @nfx can help me understand what he has in mind.


@final
def owner_of(self, record: Record) -> str:
Expand Down Expand Up @@ -201,6 +216,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
super().__init__(administrator_locator)
self._ws = ws

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a WorkspacePath)."""
    return isinstance(record, WorkspacePath)

def owner_of_path(self, path: str) -> str:
    """Resolve the owner of the workspace object at *path* (convenience wrapper over owner_of)."""
    return self.owner_of(WorkspacePath(self._ws, path))

Expand Down Expand Up @@ -242,14 +260,22 @@ def _infer_from_first_can_manage(object_permissions):
return None


class LegacyQueryOwnership(Ownership[str]):
@dataclass
class LegacyQueryPath:
    """Typed wrapper identifying a legacy query, so Ownership dispatch can
    distinguish it from plain strings."""

    # Identifier passed to WorkspaceClient.queries.get; call sites supply a
    # query id — TODO confirm whether "path" is the right name for this field.
    path: str


class LegacyQueryOwnership(Ownership[LegacyQueryPath]):
def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
    """Keep a workspace client for legacy-query lookups; the superclass registers this instance."""
    super().__init__(administrator_locator)
    self._workspace_client = workspace_client

def _maybe_direct_owner(self, record: str) -> str | None:
def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a LegacyQueryPath)."""
    return isinstance(record, LegacyQueryPath)

def _maybe_direct_owner(self, record: LegacyQueryPath) -> str | None:
    """Look up the owner of the legacy query identified by *record*.

    Returns None when the query no longer exists in the workspace.
    """
    try:
        legacy_query = self._workspace_client.queries.get(record.path)
        return legacy_query.owner_user_name
    except NotFound:
        return None
Expand Down
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Callable, Iterable
from dataclasses import dataclass, replace
from functools import partial, cached_property
from typing import ClassVar, Protocol
from typing import ClassVar, Protocol, Any

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.parallel import ManyError, Threads
Expand Down Expand Up @@ -404,6 +404,9 @@ class GrantOwnership(Ownership[Grant]):
At the present we can't determine a specific owner for grants.
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a Grant)."""
    return isinstance(record, Grant)

def _maybe_direct_owner(self, record: Grant) -> None:
    """No direct owner can be determined for grants at present."""
    return None

Expand Down
11 changes: 10 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/ownership.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
from functools import cached_property
from typing import Any

from databricks.labs.ucx.framework.owners import (
Ownership,
AdministratorLocator,
LegacyQueryOwnership,
WorkspacePathOwnership,
LegacyQueryPath,
)
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
Expand Down Expand Up @@ -40,6 +42,9 @@ def __init__(
self._legacy_query_ownership = legacy_query_ownership
self._workspace_path_ownership = workspace_path_ownership

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a Table)."""
    return isinstance(record, Table)

def _maybe_direct_owner(self, record: Table) -> str | None:
owner = self._maybe_from_grants(record)
if owner:
Expand All @@ -54,7 +59,7 @@ def _maybe_from_sources(self, record: Table) -> str | None:
if not used_table.is_write:
return None
if used_table.source_type == 'QUERY' and used_table.query_id:
return self._legacy_query_ownership.owner_of(used_table.query_id)
return self._legacy_query_ownership.owner_of(LegacyQueryPath(used_table.query_id))
if used_table.source_type in {'NOTEBOOK', 'FILE'}:
return self._workspace_path_ownership.owner_of_path(used_table.source_id)
logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}")
Expand Down Expand Up @@ -97,7 +102,11 @@ def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnershi
self._table_ownership = table_ownership
self._indexed_tables: dict[tuple[str, str], Table] | None = None

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a TableMigrationStatus)."""
    return isinstance(record, TableMigrationStatus)

def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:

index = self._indexed_tables
if index is None or reindex:
snapshot = self._tables_crawler.snapshot()
Expand Down
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Iterable
from dataclasses import dataclass, replace
from functools import partial
from typing import ClassVar
from typing import ClassVar, Any

from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
Expand Down Expand Up @@ -151,5 +151,8 @@ class UdfOwnership(Ownership[Udf]):
At the present we don't determine a specific owner for UDFs.
"""

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a Udf)."""
    return isinstance(record, Udf)

def _maybe_direct_owner(self, record: Udf) -> None:
    """No direct owner can be determined for UDFs at present."""
    return None
7 changes: 6 additions & 1 deletion src/databricks/labs/ucx/source_code/directfs_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from collections.abc import Sequence, Iterable
from typing import Any

from databricks.labs.blueprint.paths import WorkspacePath
from databricks.sdk import WorkspaceClient
Expand All @@ -15,6 +16,7 @@
AdministratorLocator,
WorkspacePathOwnership,
LegacyQueryOwnership,
LegacyQueryPath,
)
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.source_code.base import DirectFsAccess
Expand Down Expand Up @@ -86,9 +88,12 @@ def __init__(
self._legacy_query_ownership = legacy_query_ownership
self._workspace_client = workspace_client

def is_applicable_to(self, record: Any) -> bool:
    """Report whether this ownership resolver handles *record* (a DirectFsAccess)."""
    return isinstance(record, DirectFsAccess)

def _maybe_direct_owner(self, record: DirectFsAccess) -> str | None:
if record.source_type == 'QUERY' and record.query_id:
return self._legacy_query_ownership.owner_of(record.query_id)
return self._legacy_query_ownership.owner_of(LegacyQueryPath(record.query_id))
if record.source_type in {'NOTEBOOK', 'FILE'}:
return self._notebook_owner(record)
logger.warning(f"Unknown source type {record.source_type} for {record.source_id}")
Expand Down
26 changes: 25 additions & 1 deletion tests/unit/contexts/test_application.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from unittest.mock import create_autospec

import pytest
from databricks.labs.blueprint.paths import WorkspacePath
from databricks.labs.lsql.backends import MockBackend

from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.contexts.workspace_cli import LocalCheckoutContext
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrator
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.source_code.base import DirectFsAccess
from databricks.labs.ucx.source_code.linters.context import LinterContext
from tests.unit import mock_workspace_client

Expand All @@ -23,6 +29,7 @@
"used_tables_crawler_for_paths",
"used_tables_crawler_for_queries",
"verify_has_ucx_catalog",
"ownership_factory",
],
)
def test_global_context_attributes_not_none(attribute: str) -> None:
Expand All @@ -44,3 +51,20 @@ def test_local_context_attributes_not_none(attribute: str) -> None:
ctx.replace(languages=LinterContext(TableMigrationIndex([])), tables_migrator=tables_migrator)
assert hasattr(ctx, attribute)
assert getattr(ctx, attribute) is not None


@pytest.mark.parametrize(
    "record",
    [
        DirectFsAccess(),
        WorkspacePath(mock_workspace_client()),
        Grant("x", "y"),
        Table("a", "b", "c", "d", "e"),
        Udf("a", "b", "c", "d", "e", "a", False, "c", "d", "e"),
        TableMigrationStatus("x", "y"),
    ],
)
def test_ownership_factory_succeeds(record) -> None:
    """The factory resolves a registered Ownership for every supported record type.

    The parametrized values are record *instances* (the previous ``record: type``
    annotation was incorrect), matching the instance-based for_record lookup.
    """
    ctx = GlobalContext().replace(workspace_client=mock_workspace_client(), sql_backend=MockBackend())
    ownership = ctx.ownership_factory(record)
    assert isinstance(ownership, Ownership)
4 changes: 4 additions & 0 deletions tests/unit/framework/test_owners.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from collections.abc import Callable, Sequence
from typing import Any
from unittest.mock import create_autospec, Mock

import pytest
Expand Down Expand Up @@ -27,6 +28,9 @@ def __init__(
self._owner_fn = owner_fn
self.mock_admin_locator = mock_admin_locator

def is_applicable_to(self, record: Any) -> bool:
    """Test stub: declares itself applicable to every record."""
    return True

def _maybe_direct_owner(self, record: Record) -> str | None:
    """Delegate the owner lookup to the injected owner function."""
    return self._owner_fn(record)

Expand Down
Loading