Collect direct filesystem access from queries (#2599)
## Changes
Adds support for extracting DirectFileSystemAccess records from
workspace queries.

### Linked issues
Resolves #2350

### Functionality
- [x] added a new table `directfs_in_queries`
- [x] added a new view `directfs` unioning `directfs_in_paths` with the
above

### Tests
- [x] added unit tests
- [x] manually tested schema upgrade:

![Screenshot 2024-09-13 at 16:17:22](https://github.com/user-attachments/assets/52b19491-66f4-480b-b843-185dc73a6893)


Integration with `ExperimentalWorkflowLinter` will be done in a separate
PR.
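
A quick way to sanity-check the result once the crawlers have run is to query the new view directly. A minimal sketch, assuming a `ucx` inventory schema and an existing SQL warehouse (both hypothetical here, not part of this PR):

```python
# Sketch only: inspect collected direct-filesystem-access records via lsql.
# The `ucx` schema name and the warehouse id are assumptions.
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
backend = StatementExecutionBackend(ws, "<warehouse-id>")
for row in backend.fetch("SELECT path, is_read, is_write, source_id FROM ucx.directfs"):
    print(row.path, row.is_read, row.is_write, row.source_id)
```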

---------

Co-authored-by: Eric Vergnaud <eric.vergnaud@databricks.com>
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
Co-authored-by: Andrew Snare <asnare@users.noreply.github.com>
4 people authored and JCZuurmond committed Sep 18, 2024
1 parent 67d91b9 commit fa5b7b8
Showing 9 changed files with 243 additions and 91 deletions.
17 changes: 13 additions & 4 deletions src/databricks/labs/ucx/contexts/application.py
@@ -15,7 +15,7 @@
from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
from databricks.labs.ucx.recon.migration_recon import MigrationRecon
from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawlers
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
from databricks.sdk import AccountClient, WorkspaceClient, core
from databricks.sdk.errors import ResourceDoesNotExist
@@ -54,6 +54,7 @@
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.graph import DependencyResolver
from databricks.labs.ucx.source_code.known import KnownList
from databricks.labs.ucx.source_code.queries import QueryLinter
from databricks.labs.ucx.source_code.redash import Redash
from databricks.labs.ucx.workspace_access import generic, redash
from databricks.labs.ucx.workspace_access.groups import GroupManager
@@ -426,13 +427,21 @@ def workflow_linter(self):
self.dependency_resolver,
self.path_lookup,
TableMigrationIndex([]), # TODO: bring back self.tables_migrator.index()
self.directfs_access_crawlers,
self.directfs_access_crawler_for_paths,
self.config.include_job_ids,
)

@cached_property
def directfs_access_crawlers(self):
return DirectFsAccessCrawlers(self.sql_backend, self.inventory_database)
def query_linter(self):
return QueryLinter(self.workspace_client, self.directfs_access_crawler_for_queries)

@cached_property
def directfs_access_crawler_for_paths(self):
return DirectFsAccessCrawler.for_paths(self.sql_backend, self.inventory_database)

@cached_property
def directfs_access_crawler_for_queries(self):
return DirectFsAccessCrawler.for_queries(self.sql_backend, self.inventory_database)

@cached_property
def redash(self):
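
Outside of the cached-property wiring above, the same setup can be reproduced by hand. A minimal sketch, assuming `sql_backend`, `inventory_database`, and a `WorkspaceClient` named `ws` already exist:

```python
# Mirrors the cached properties above: one crawler per source kind, plus the linter.
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
from databricks.labs.ucx.source_code.queries import QueryLinter

paths_crawler = DirectFsAccessCrawler.for_paths(sql_backend, inventory_database)
queries_crawler = DirectFsAccessCrawler.for_queries(sql_backend, inventory_database)
query_linter = QueryLinter(ws, queries_crawler)
```
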
9 changes: 4 additions & 5 deletions src/databricks/labs/ucx/install.py
@@ -75,7 +75,7 @@
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.recon.migration_recon import ReconResult
from databricks.labs.ucx.runtime import Workflows
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessInPath, DirectFsAccessInQuery
from databricks.labs.ucx.source_code.jobs import JobProblem
from databricks.labs.ucx.workspace_access.base import Permissions
from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo
@@ -121,9 +121,8 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "udfs", Udf),
functools.partial(table, "logs", LogRecord),
functools.partial(table, "recon_results", ReconResult),
functools.partial(
table, "directfs_in_paths", DirectFsAccess
), # directfs_in_queries will be added in upcoming PR
functools.partial(table, "directfs_in_paths", DirectFsAccessInPath),
functools.partial(table, "directfs_in_queries", DirectFsAccessInQuery),
],
)
deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
@@ -132,7 +131,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
deployer.deploy_view("misc_patterns", "queries/views/misc_patterns.sql")
deployer.deploy_view("code_patterns", "queries/views/code_patterns.sql")
deployer.deploy_view("reconciliation_results", "queries/views/reconciliation_results.sql")
# direct_file_system_access view will be added in upcoming PR
deployer.deploy_view("directfs", "queries/views/directfs.sql")


def extract_major_minor(version_string):
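
Taken in isolation, the two new inventory objects deploy like any other. A sketch, assuming the `deployer` (a `SchemaDeployer`) that `deploy_schema` already builds, and the record classes imported above:

```python
# Registers the new table schemas and the view unioning them; mirrors the diff above.
deployer.deploy_table("directfs_in_paths", DirectFsAccessInPath)
deployer.deploy_table("directfs_in_queries", DirectFsAccessInQuery)
deployer.deploy_view("directfs", "queries/views/directfs.sql")
```
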
27 changes: 27 additions & 0 deletions src/databricks/labs/ucx/queries/views/directfs.sql
@@ -0,0 +1,27 @@
SELECT
path,
is_read,
is_write,
source_id,
source_timestamp,
source_lineage,
assessment_start_timestamp,
assessment_end_timestamp,
job_id,
job_name,
task_key
FROM $inventory.directfs_in_paths
UNION ALL
SELECT
path,
is_read,
is_write,
source_id,
source_timestamp,
source_lineage,
assessment_start_timestamp,
assessment_end_timestamp,
NULL AS job_id,
NULL AS job_name,
NULL AS task_key
FROM $inventory.directfs_in_queries
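
Query-sourced records carry no job context, so the second branch pads `job_id`, `job_name`, and `task_key` with NULLs (`$inventory` is substituted with the inventory schema at deploy time). That makes the two populations easy to tell apart; a sketch, assuming a `ucx` schema and an existing `sql_backend`:

```python
# Query-sourced DFSAs are exactly the rows with a NULL job_id in the unioned view.
for row in sql_backend.fetch(
    "SELECT path, source_id FROM ucx.directfs WHERE job_id IS NULL AND is_write"
):
    print(f"query {row.source_id} writes directly to {row.path}")
```
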
133 changes: 72 additions & 61 deletions src/databricks/labs/ucx/source_code/directfs_access.py
@@ -1,15 +1,25 @@
from __future__ import annotations


import dataclasses
import logging
import sys
from collections.abc import Sequence, Iterable
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, TypeVar

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import DatabricksError

from databricks.labs.ucx.framework.utils import escape_sql_identifier

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


logger = logging.getLogger(__name__)


@@ -25,6 +35,14 @@ class LineageAtom:
class DirectFsAccess:
"""A record describing a Direct File System Access"""

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
source_lineage = data.get("source_lineage", None)
if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
data["source_lineage"] = lineage_atoms
return cls(**data)

UNKNOWN = "unknown"

path: str
@@ -33,9 +51,6 @@ class DirectFsAccess:
source_id: str = UNKNOWN
source_timestamp: datetime = datetime.fromtimestamp(0)
source_lineage: list[LineageAtom] = field(default_factory=list)
job_id: int = -1
job_name: str = UNKNOWN
task_key: str = UNKNOWN
assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

@@ -45,71 +60,80 @@ def replace_source(
source_lineage: list[LineageAtom] | None = None,
source_timestamp: datetime | None = None,
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
return dataclasses.replace(
self,
source_id=source_id or self.source_id,
source_timestamp=source_timestamp or self.source_timestamp,
source_lineage=source_lineage or self.source_lineage,
job_id=self.job_id,
job_name=self.job_name,
task_key=self.task_key,
assessment_start_timestamp=self.assessment_start_timestamp,
assessment_end_timestamp=self.assessment_start_timestamp,
)

def replace_assessment_infos(
self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
):
return dataclasses.replace(
self,
assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
)


@dataclass
class DirectFsAccessInQuery(DirectFsAccess):

pass


@dataclass
class DirectFsAccessInPath(DirectFsAccess):
job_id: int = -1
job_name: str = DirectFsAccess.UNKNOWN
task_key: str = DirectFsAccess.UNKNOWN

def replace_job_infos(
self,
job_id: int | None = None,
job_name: str | None = None,
task_key: str | None = None,
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
source_id=self.source_id,
source_timestamp=self.source_timestamp,
source_lineage=self.source_lineage,
job_id=job_id or self.job_id,
job_name=job_name or self.job_name,
task_key=task_key or self.task_key,
assessment_start_timestamp=self.assessment_start_timestamp,
assessment_end_timestamp=self.assessment_start_timestamp,
return dataclasses.replace(
self, job_id=job_id or self.job_id, job_name=job_name or self.job_name, task_key=task_key or self.task_key
)

def replace_assessment_infos(
self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
source_id=self.source_id,
source_timestamp=self.source_timestamp,
source_lineage=self.source_lineage,
job_id=self.job_id,
job_name=self.job_name,
task_key=self.task_key,
assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
assessment_end_timestamp=assessment_end or self.assessment_start_timestamp,
)

T = TypeVar("T", bound=DirectFsAccess)

class _DirectFsAccessCrawler(CrawlerBase[DirectFsAccess]):

def __init__(self, backend: SqlBackend, schema: str, table: str):
class DirectFsAccessCrawler(CrawlerBase[T]):

@classmethod
def for_paths(cls, backend: SqlBackend, schema: str) -> DirectFsAccessCrawler:
return DirectFsAccessCrawler[DirectFsAccessInPath](
backend,
schema,
"directfs_in_paths",
DirectFsAccessInPath,
)

@classmethod
def for_queries(cls, backend: SqlBackend, schema: str) -> DirectFsAccessCrawler:
return DirectFsAccessCrawler[DirectFsAccessInQuery](
backend,
schema,
"directfs_in_queries",
DirectFsAccessInQuery,
)

def __init__(self, backend: SqlBackend, schema: str, table: str, klass: type[T]):
"""
Initializes a DirectFsAccessCrawler instance.
Args:
backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark).
schema: The schema name for the inventory persistence.
table: The table to persist the records to.
klass: The record type persisted by this crawler.
"""
super().__init__(backend, "hive_metastore", schema, table, DirectFsAccess)
super().__init__(backend, "hive_metastore", schema, table, klass)

def dump_all(self, dfsas: Sequence[DirectFsAccess]):
def dump_all(self, dfsas: Sequence[T]):
"""This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one
It's not **bad** because all records are pushed at once.
Providing a multi-entity crawler is out-of-scope of this PR
@@ -120,22 +144,9 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]):
except DatabricksError as e:
logger.error("Failed to store DFSAs", exc_info=e)

def _try_fetch(self) -> Iterable[DirectFsAccess]:
sql = f"SELECT * FROM {self.full_name}"
def _try_fetch(self) -> Iterable[T]:
sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}"
yield from self._backend.fetch(sql)

def _crawl(self) -> Iterable[DirectFsAccess]:
def _crawl(self) -> Iterable[T]:
raise NotImplementedError()


class DirectFsAccessCrawlers:

def __init__(self, sql_backend: SqlBackend, schema: str):
self._sql_backend = sql_backend
self._schema = schema

def for_paths(self) -> _DirectFsAccessCrawler:
return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_paths")

def for_queries(self) -> _DirectFsAccessCrawler:
return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_queries")
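
For illustration, a round trip through the reshaped record types; the `LineageAtom` field names below are assumptions for the sake of the example, not taken from this diff:

```python
# Sketch: build a record from a raw dict (the shape _try_fetch produces) and
# stamp it with source info via the dataclasses.replace-based helpers.
from datetime import datetime, timezone

from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessInQuery

record = DirectFsAccessInQuery.from_dict(
    {
        "path": "s3a://bucket/raw/events",
        "is_read": True,
        "is_write": False,
        "source_lineage": [{"object_type": "QUERY", "object_id": "12345"}],  # assumed fields
    }
)
record = record.replace_source(source_id="12345", source_timestamp=datetime.now(timezone.utc))
```
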
31 changes: 19 additions & 12 deletions src/databricks/labs/ucx/source_code/jobs.py
@@ -4,7 +4,7 @@
import tempfile
from collections.abc import Generator, Iterable
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from importlib import metadata
from pathlib import Path
@@ -27,7 +27,12 @@
file_language,
guess_encoding,
)
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess, LineageAtom, DirectFsAccessCrawlers
from databricks.labs.ucx.source_code.directfs_access import (
DirectFsAccess,
LineageAtom,
DirectFsAccessCrawler,
DirectFsAccessInPath,
)
from databricks.labs.ucx.source_code.graph import (
Dependency,
DependencyGraph,
@@ -336,20 +341,20 @@ def __init__(
resolver: DependencyResolver,
path_lookup: PathLookup,
migration_index: TableMigrationIndex,
directfs_crawlers: DirectFsAccessCrawlers,
directfs_crawler: DirectFsAccessCrawler,
include_job_ids: list[int] | None = None,
):
self._ws = ws
self._resolver = resolver
self._path_lookup = path_lookup
self._migration_index = migration_index
self._directfs_crawlers = directfs_crawlers
self._directfs_crawler = directfs_crawler
self._include_job_ids = include_job_ids

def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
tasks = []
all_jobs = list(self._ws.jobs.list())
logger.info(f"Preparing {len(all_jobs)} linting jobs...")
logger.info(f"Preparing {len(all_jobs)} linting tasks...")
for job in all_jobs:
if self._include_job_ids and job.job_id not in self._include_job_ids:
logger.info(f"Skipping job {job.job_id}...")
@@ -358,7 +363,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
logger.info(f"Running {tasks} linting tasks in parallel...")
job_results, errors = Threads.gather('linting workflows', tasks)
job_problems: list[JobProblem] = []
job_dfsas: list[DirectFsAccess] = []
job_dfsas: list[DirectFsAccessInPath] = []
for problems, dfsas in job_results:
job_problems.extend(problems)
job_dfsas.extend(dfsas)
@@ -369,11 +374,11 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
JobProblem,
mode='overwrite',
)
self._directfs_crawlers.for_paths().dump_all(job_dfsas)
self._directfs_crawler.dump_all(job_dfsas)
if len(errors) > 0:
raise ManyError(errors)

def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess]]:
def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccessInPath]]:
try:
job = self._ws.jobs.get(job_id)
except NotFound:
@@ -388,9 +393,9 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccess]]

_UNKNOWN = Path('<UNKNOWN>')

def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccess]]:
def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAccessInPath]]:
problems: list[JobProblem] = []
dfsas: list[DirectFsAccess] = []
dfsas: list[DirectFsAccessInPath] = []
assert job.job_id is not None
assert job.settings is not None
assert job.settings.name is not None
@@ -458,12 +463,14 @@ def _lint_task(

def _collect_task_dfsas(
self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph, session_state: CurrentSessionState
) -> Iterable[DirectFsAccess]:
) -> Iterable[DirectFsAccessInPath]:
collector = DfsaCollectorWalker(graph, set(), self._path_lookup, session_state)
assert job.settings is not None # as already done in _lint_job
job_name = job.settings.name
for dfsa in collector:
yield dfsa.replace_job_infos(job_id=job.job_id, job_name=job_name, task_key=task.task_key)
yield DirectFsAccessInPath(**asdict(dfsa)).replace_job_infos(
job_id=job.job_id, job_name=job_name, task_key=task.task_key
)


class LintingWalker(DependencyGraphWalker[LocatedAdvice]):
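
Taken together, a single job can now be linted end to end. A usage sketch, assuming a fully wired `workflow_linter` from `contexts/application.py` and a hypothetical job id:

```python
# Returns lint problems plus the DirectFsAccessInPath records collected along the way.
problems, dfsas = workflow_linter.lint_job(123456789)  # job id is hypothetical
for problem in problems:
    print(problem)
for dfsa in dfsas:
    print(dfsa.path, dfsa.job_id, dfsa.task_key)
```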