diff --git a/adlfs/__init__.py b/adlfs/__init__.py
index 54cf1a9b..f55776e6 100644
--- a/adlfs/__init__.py
+++ b/adlfs/__init__.py
@@ -1,5 +1,12 @@
 from .gen1 import AzureDatalakeFileSystem
+from .onelake import OneLakeFile, OneLakeFileSystem
 from .spec import AzureBlobFile, AzureBlobFileSystem
 from .utils import __version__, version_tuple  # noqa: F401
 
-__all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"]
+__all__ = [
+    "AzureBlobFileSystem",
+    "AzureBlobFile",
+    "AzureDatalakeFileSystem",
+    "OneLakeFileSystem",
+    "OneLakeFile",
+]
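A quick smoke check of the new exports; nothing here is adlfs-specific beyond the names added above, and it assumes this branch is installed in the active environment:

```python
# Verify the new OneLake classes are importable and exported (assumes this
# branch of adlfs is installed locally).
import adlfs

assert "OneLakeFileSystem" in adlfs.__all__
assert "OneLakeFile" in adlfs.__all__

# The classes should be importable from the package root, like the existing ones.
from adlfs import OneLakeFile, OneLakeFileSystem  # noqa: F401

print("OneLake exports resolve")
```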
diff --git a/adlfs/onelake.py b/adlfs/onelake.py
new file mode 100644
index 00000000..5cb2551d
--- /dev/null
+++ b/adlfs/onelake.py
@@ -0,0 +1,501 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import, division, print_function
+
+import logging
+import os
+
+from azure.core.exceptions import ResourceNotFoundError
+from azure.storage.filedatalake.aio import (
+    DataLakeServiceClient as AIODataLakeServiceClient,
+)
+from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
+from fsspec.spec import AbstractBufferedFile
+from fsspec.utils import infer_storage_options
+
+logger = logging.getLogger(__name__)
+
+
+class OneLakeFileSystem(AsyncFileSystem):
+    """
+    Access Microsoft OneLake using the Azure Data Lake Storage Gen2 API.
+
+    OneLake is the built-in data lake for Microsoft Fabric that's automatically
+    provisioned with every Microsoft Fabric tenant.
+
+    Parameters
+    ----------
+    account_name : str
+        OneLake account name (typically "onelake", yielding the endpoint
+        onelake.dfs.fabric.microsoft.com)
+    workspace_name : str, optional
+        The name of the Fabric workspace
+    lakehouse_name : str, optional
+        The name of the lakehouse within the workspace
+    tenant_id : str
+        Azure tenant ID for authentication
+    client_id : str
+        Azure AD application client ID
+    client_secret : str
+        Azure AD application client secret
+    credential : azure.core.credentials.TokenCredential, optional
+        Azure credential object for authentication
+    anon : bool, optional
+        Use anonymous access (default: False)
+
+    Examples
+    --------
+    >>> from adlfs import OneLakeFileSystem
+    >>> fs = OneLakeFileSystem(
+    ...     account_name="onelake",
+    ...     tenant_id="your-tenant-id",
+    ...     client_id="your-client-id",
+    ...     client_secret="your-client-secret",
+    ... )
+    >>> fs.ls("")
+    """
+
+    protocol = ("onelake", "abfss")
+
+    def __init__(
+        self,
+        account_name: str = None,
+        workspace_name: str = None,
+        lakehouse_name: str = None,
+        tenant_id: str = None,
+        client_id: str = None,
+        client_secret: str = None,
+        credential=None,
+        anon: bool = False,
+        loop=None,
+        asynchronous: bool = False,
+        **kwargs,
+    ):
+        # Import here to avoid circular imports
+        from fsspec.asyn import get_loop
+
+        super().__init__(asynchronous=asynchronous, loop=loop or get_loop(), **kwargs)
+
+        self.account_name = account_name or "onelake"
+        self.workspace_name = workspace_name
+        self.lakehouse_name = lakehouse_name
+        self.tenant_id = tenant_id or os.getenv("AZURE_TENANT_ID")
+        self.client_id = client_id or os.getenv("AZURE_CLIENT_ID")
+        self.client_secret = client_secret or os.getenv("AZURE_CLIENT_SECRET")
+        self.credential = credential
+        self.anon = anon
+
+        # OneLake uses a single fixed endpoint format
+        self.account_url = f"https://{self.account_name}.dfs.fabric.microsoft.com"
+
+        self._setup_credentials()
+        self.do_connect()
+
+    def _setup_credentials(self):
+        """Set up authentication credentials for OneLake access."""
+        if (
+            not self.anon
+            and not self.credential
+            and self.client_id
+            and self.client_secret
+            and self.tenant_id
+        ):
+            from azure.identity import ClientSecretCredential
+            from azure.identity.aio import (
+                ClientSecretCredential as AIOClientSecretCredential,
+            )
+
+            self.credential = AIOClientSecretCredential(
+                tenant_id=self.tenant_id,
+                client_id=self.client_id,
+                client_secret=self.client_secret,
+            )
+            self.sync_credential = ClientSecretCredential(
+                tenant_id=self.tenant_id,
+                client_id=self.client_id,
+                client_secret=self.client_secret,
+            )
+
+    def do_connect(self):
+        """Establish a connection to the OneLake service."""
+        import weakref
+
+        if self.credential:
+            self.service_client = AIODataLakeServiceClient(
+                account_url=self.account_url, credential=self.credential
+            )
+        elif self.anon:
+            self.service_client = AIODataLakeServiceClient(
+                account_url=self.account_url
+            )
+        else:
+            raise ValueError(
+                "OneLake requires authentication. "
+                "Provide credentials or set anon=True"
+            )
+
+        # Set up cleanup for the service client
+        weakref.finalize(self, self._cleanup_service_client, self.service_client)
+
+    @staticmethod
+    def _cleanup_service_client(service_client):
+        """Clean up service client resources."""
+        try:
+            if hasattr(service_client, "close"):
+                # close() is a coroutine, so cleanup has to go through asyncio
+                import asyncio
+
+                try:
+                    loop = asyncio.get_event_loop()
+                    if loop.is_running():
+                        # Can't run cleanup in the running loop; schedule it instead
+                        loop.create_task(service_client.close())
+                    else:
+                        loop.run_until_complete(service_client.close())
+                except Exception:
+                    # If we can't clean up properly, don't raise during teardown
+                    pass
+        except Exception:
+            pass
+
+    @classmethod
+    def _strip_protocol(cls, path: str):
+        """Remove the protocol from the path."""
+        # Handle both onelake:// and abfss:// protocols
+        if path.startswith("onelake://"):
+            path = path[10:]
+        elif path.startswith("abfss://"):
+            # For abfss URLs, parse the URL and extract the path
+            from urllib.parse import urlparse
+
+            parsed = urlparse(path)
+            if "onelake.dfs.fabric.microsoft.com" in parsed.netloc:
+                # Extract the workspace from the username part and prepend it to the path
+                if "@" in parsed.netloc:
+                    workspace = parsed.netloc.split("@")[0]
+                    path = f"{workspace}{parsed.path}"
+                else:
+                    path = parsed.path
+            else:
+                # Not a OneLake URL; return it as-is without the protocol
+                path = path[8:]  # Remove "abfss://"
+        return path.lstrip("/")
+
+    @staticmethod
+    def _get_kwargs_from_urls(urlpath):
+        """Extract parameters from OneLake URLs."""
+        ops = infer_storage_options(urlpath)
+        out = {}
+
+        # Parse the OneLake-specific URL structure
+        host = ops.get("host", "")
+        if "onelake.dfs.fabric.microsoft.com" in host:
+            out["account_name"] = "onelake"
+
+            # Check for a username (from the abfss://username@host format)
+            username = ops.get("username")
+            if username:
+                # For abfss URLs, the workspace is the username part
+                out["workspace_name"] = username
+
+                # The lakehouse is the first part of the path
+                path_parts = ops.get("path", "").strip("/").split("/")
+                if len(path_parts) >= 1 and path_parts[0]:
+                    out["lakehouse_name"] = path_parts[0]
+            else:
+                # For plain onelake:// URLs, extract both from the path
+                path_parts = ops.get("path", "").strip("/").split("/")
+                if len(path_parts) >= 1 and path_parts[0]:
+                    out["workspace_name"] = path_parts[0]
+                if len(path_parts) >= 2 and path_parts[1]:
+                    out["lakehouse_name"] = path_parts[1]
+
+        return out
+
+    def split_path(self, path: str):
+        """Split a OneLake path into workspace, lakehouse, and file path components."""
+        path = self._strip_protocol(path).strip("/")
+
+        if not path:
+            return "", "", ""
+
+        parts = path.split("/", 2)
+        workspace = parts[0] if len(parts) > 0 else ""
+        lakehouse = parts[1] if len(parts) > 1 else ""
+        file_path = parts[2] if len(parts) > 2 else ""
+
+        return workspace, lakehouse, file_path
+
+    async def _ls(self, path: str = "", detail: bool = True, **kwargs):
+        """List files and directories in OneLake."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace:
+            # Listing all workspaces would require the Fabric API; simplified for now
+            if self.workspace_name:
+                return [
+                    {"name": self.workspace_name, "type": "directory", "size": None}
+                ]
+            else:
+                raise NotImplementedError(
+                    "Listing all workspaces requires Fabric API access"
+                )
+
+        if not lakehouse:
+            # List lakehouses in the workspace (simplified)
+            if self.lakehouse_name:
+                full_path = f"{workspace}/{self.lakehouse_name}"
+                return [{"name": full_path, "type": "directory", "size": None}]
+            else:
+                raise NotImplementedError(
+                    "Listing lakehouses requires Fabric API access"
+                )
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_path = f"{lakehouse}/{file_path}" if file_path else lakehouse
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                paths = file_system_client.get_paths(
+                    path=lakehouse_path, recursive=False
+                )
+
+                results = []
+                async for path_item in paths:
+                    # Construct the full path from the Azure response; get_paths
+                    # returns names relative to the file system (workspace)
+                    full_name = f"{workspace}/{path_item.name}"
+                    results.append(
+                        {
+                            "name": full_name,
+                            "type": "directory" if path_item.is_directory else "file",
+                            "size": path_item.content_length
+                            if hasattr(path_item, "content_length")
+                            else None,
+                            "last_modified": path_item.last_modified
+                            if hasattr(path_item, "last_modified")
+                            else None,
+                        }
+                    )
+
+                return results if detail else [r["name"] for r in results]
+
+        except ResourceNotFoundError:
+            raise FileNotFoundError(f"Path not found: {path}")
+
+    async def _info(self, path: str, **kwargs):
+        """Get information about a file or directory."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace or not lakehouse:
+            return {"name": path, "type": "directory", "size": None}
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_file_path = f"{lakehouse}/{file_path}" if file_path else lakehouse
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                if file_path:
+                    async with file_system_client.get_file_client(
+                        lakehouse_file_path
+                    ) as file_client:
+                        properties = await file_client.get_file_properties()
+                        return {
+                            "name": path,
+                            "type": "file",
+                            "size": properties.size,
+                            "last_modified": properties.last_modified,
+                        }
+                else:
+                    # Directory
+                    return {"name": path, "type": "directory", "size": None}
+
+        except ResourceNotFoundError:
+            raise FileNotFoundError(f"Path not found: {path}")
+
+    async def _cat_file(self, path: str, start: int = None, end: int = None, **kwargs):
+        """Read file content from OneLake."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace or not lakehouse or not file_path:
+            raise ValueError("Invalid path format for OneLake file")
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_file_path = f"{lakehouse}/{file_path}"
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                async with file_system_client.get_file_client(
+                    lakehouse_file_path
+                ) as file_client:
+                    # Guard against end being set without start, and against
+                    # falsy-but-valid offsets like 0
+                    download_stream = await file_client.download_file(
+                        offset=start,
+                        length=(end - (start or 0)) if end is not None else None,
+                    )
+                    return await download_stream.readall()
+
+        except ResourceNotFoundError:
+            raise FileNotFoundError(f"File not found: {path}")
+
+    async def _pipe_file(self, path: str, data: bytes, **kwargs):
+        """Write data to a file in OneLake."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace or not lakehouse or not file_path:
+            raise ValueError("Invalid path format for OneLake file")
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_file_path = f"{lakehouse}/{file_path}"
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                async with file_system_client.get_file_client(
+                    lakehouse_file_path
+                ) as file_client:
+                    await file_client.create_file()
+                    await file_client.append_data(data, offset=0, length=len(data))
+                    await file_client.flush_data(len(data))
+
+        except Exception as e:
+            raise IOError(f"Failed to write file {path}: {e}")
+
+    async def _rm_file(self, path: str, **kwargs):
+        """Delete a file from OneLake."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace or not lakehouse or not file_path:
+            raise ValueError("Invalid path format for OneLake file")
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_file_path = f"{lakehouse}/{file_path}"
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                async with file_system_client.get_file_client(
+                    lakehouse_file_path
+                ) as file_client:
+                    await file_client.delete_file()
+
+        except ResourceNotFoundError:
+            pass  # File already deleted
+
+    async def _mkdir(self, path: str, create_parents: bool = True, **kwargs):
+        """Create a directory in OneLake."""
+        path = self._strip_protocol(path).strip("/")
+        workspace, lakehouse, file_path = self.split_path(path)
+
+        if not workspace or not lakehouse:
+            # Workspaces and lakehouses can't be created through this API
+            raise NotImplementedError(
+                "Creating workspaces/lakehouses requires Fabric API"
+            )
+
+        # For OneLake, the file system is the workspace and the lakehouse is part of the path
+        file_system_name = workspace
+        lakehouse_dir_path = f"{lakehouse}/{file_path}"
+
+        try:
+            async with self.service_client.get_file_system_client(
+                file_system_name
+            ) as file_system_client:
+                async with file_system_client.get_directory_client(
+                    lakehouse_dir_path
+                ) as directory_client:
+                    await directory_client.create_directory()
+
+        except Exception as e:
+            raise IOError(f"Failed to create directory {path}: {e}")
+
+    # Sync wrappers
+    ls = sync_wrapper(_ls)
+    info = sync_wrapper(_info)
+    cat_file = sync_wrapper(_cat_file)
+    pipe_file = sync_wrapper(_pipe_file)
+    rm_file = sync_wrapper(_rm_file)
+    mkdir = sync_wrapper(_mkdir)
+
+    def _open(
+        self,
+        path: str,
+        mode: str = "rb",
+        block_size: int = None,
+        autocommit: bool = True,
+        cache_options: dict = None,
+        **kwargs,
+    ):
+        """Open a file for reading or writing."""
+        return OneLakeFile(
+            fs=self,
+            path=path,
+            mode=mode,
+            block_size=block_size,
+            autocommit=autocommit,
+            cache_options=cache_options or {},
+            **kwargs,
+        )
+
+
+class OneLakeFile(AbstractBufferedFile):
+    """File-like operations on OneLake files."""
+
+    def __init__(
+        self,
+        fs: OneLakeFileSystem,
+        path: str,
+        mode: str = "rb",
+        block_size: int = None,
+        autocommit: bool = True,
+        cache_options: dict = None,
+        **kwargs,
+    ):
+        self.fs = fs
+        self.path = path
+        self.mode = mode
+        self.autocommit = autocommit
+
+        workspace, lakehouse, file_path = fs.split_path(path)
+        self.workspace = workspace
+        self.lakehouse = lakehouse
+        self.file_path = file_path
+
+        if not workspace or not lakehouse or not file_path:
+            raise ValueError("Invalid OneLake path format")
+
+        self.container_path = f"{workspace}/{lakehouse}"
+
+        super().__init__(
+            fs=fs,
+            path=path,
+            mode=mode,
+            block_size=block_size,
+            cache_options=cache_options or {},
+            **kwargs,
+        )
+
+    def _fetch_range(self, start: int, end: int):
+        """Fetch a range of bytes from the file."""
+        return sync(self.fs.loop, self.fs._cat_file, self.path, start=start, end=end)
+
+    def _upload_chunk(self, final: bool = False, **kwargs):
+        """Upload a chunk of data."""
+        if self.mode in {"wb", "ab"}:
+            data = self.buffer.getvalue()
+            if data or final:
+                sync(self.fs.loop, self.fs._pipe_file, self.path, data)
+                self.offset = self.offset + len(data) if self.offset else len(data)
+                return True
+        return False
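Taken together, the module supports a workflow like the sketch below. The workspace, lakehouse, and file names are placeholders, and a real run needs a service principal with access to the Fabric workspace:

```python
from adlfs import OneLakeFileSystem

# Falls back to AZURE_TENANT_ID / AZURE_CLIENT_ID / AZURE_CLIENT_SECRET
# when these kwargs are omitted (see _setup_credentials above).
fs = OneLakeFileSystem(
    tenant_id="your-tenant-id",          # placeholder
    client_id="your-client-id",          # placeholder
    client_secret="your-client-secret",  # placeholder
)

# Paths follow workspace/lakehouse/<path>; these names are hypothetical.
print(fs.ls("my_workspace/my_lakehouse"))

# Round-trip a small file through the sync wrappers defined on the class.
fs.pipe_file("my_workspace/my_lakehouse/Files/hello.txt", b"hello onelake")
assert fs.cat_file("my_workspace/my_lakehouse/Files/hello.txt") == b"hello onelake"
```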
diff --git a/adlfs/spec.py b/adlfs/spec.py
index 5ff509e1..c4b104fb 100644
--- a/adlfs/spec.py
+++ b/adlfs/spec.py
@@ -436,6 +436,12 @@ def _get_kwargs_from_urls(urlpath):
     out = {}
     host = ops.get("host", None)
     if host:
+        # Check if this is a OneLake URL (should be routed to OneLakeFileSystem)
+        if "onelake.dfs.fabric.microsoft.com" in host:
+            # This is a OneLake URL, don't process it here
+            # The fsspec registry should route to OneLakeFileSystem
+            return out
+
         match = re.match(
             r"(?P<account_name>.+)\.(dfs|blob)\.core\.windows\.net", host
         )
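The effect of this guard is easy to check from a REPL: with it in place, URL-derived kwargs for OneLake hosts come back empty from `AzureBlobFileSystem` but populated from `OneLakeFileSystem` (expected values follow from the parsing logic in this diff):

```python
from adlfs import AzureBlobFileSystem, OneLakeFileSystem

onelake_url = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/x"
azure_url = "abfss://container@account.dfs.core.windows.net/path"

print(AzureBlobFileSystem._get_kwargs_from_urls(onelake_url))  # {}
print(AzureBlobFileSystem._get_kwargs_from_urls(azure_url))    # {'account_name': 'account'}
print(OneLakeFileSystem._get_kwargs_from_urls(onelake_url))
# {'account_name': 'onelake', 'workspace_name': 'workspace',
#  'lakehouse_name': 'lakehouse'}
```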
diff --git a/adlfs/tests/test_onelake.py b/adlfs/tests/test_onelake.py
new file mode 100644
index 00000000..67f9f384
--- /dev/null
+++ b/adlfs/tests/test_onelake.py
@@ -0,0 +1,670 @@
+import os
+from unittest import mock
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from azure.core.exceptions import ResourceNotFoundError
+
+from adlfs import OneLakeFile, OneLakeFileSystem
+
+
+def create_async_context_manager_mock():
+    """Helper to create a proper async context manager mock."""
+    mock_context = AsyncMock()
+    mock_context.__aenter__ = AsyncMock(return_value=mock_context)
+    mock_context.__aexit__ = AsyncMock(return_value=False)
+    return mock_context
+
+
+class TestOneLakeFileSystem:
+    """Test cases for OneLakeFileSystem"""
+
+    def test_init_with_credentials(self):
+        """Test initialization with client credentials"""
+        fs = OneLakeFileSystem(
+            account_name="onelake",
+            workspace_name="test_workspace",
+            lakehouse_name="test_lakehouse",
+            tenant_id="test-tenant",
+            client_id="test-client",
+            client_secret="test-secret",
+        )
+
+        assert fs.account_name == "onelake"
+        assert fs.workspace_name == "test_workspace"
+        assert fs.lakehouse_name == "test_lakehouse"
+        assert fs.tenant_id == "test-tenant"
+        assert fs.client_id == "test-client"
+        assert fs.client_secret == "test-secret"
+        assert not fs.anon
+        assert fs.account_url == "https://onelake.dfs.fabric.microsoft.com"
+
+    def test_init_anonymous(self):
+        """Test initialization with anonymous access"""
+        fs = OneLakeFileSystem(anon=True)
+
+        assert fs.account_name == "onelake"
+        assert fs.anon is True
+
+    def test_init_with_env_vars(self):
+        """Test initialization using environment variables"""
+        with mock.patch.dict(
+            os.environ,
+            {
+                "AZURE_TENANT_ID": "env-tenant",
+                "AZURE_CLIENT_ID": "env-client",
+                "AZURE_CLIENT_SECRET": "env-secret",
+            },
+        ):
+            fs = OneLakeFileSystem()
+
+            assert fs.tenant_id == "env-tenant"
+            assert fs.client_id == "env-client"
+            assert fs.client_secret == "env-secret"
+
+    def test_strip_protocol(self):
+        """Test URL protocol stripping"""
+        assert (
+            OneLakeFileSystem._strip_protocol("onelake://workspace/lakehouse/file.txt")
+            == "workspace/lakehouse/file.txt"
+        )
+        assert (
+            OneLakeFileSystem._strip_protocol("workspace/lakehouse/file.txt")
+            == "workspace/lakehouse/file.txt"
+        )
+        assert (
+            OneLakeFileSystem._strip_protocol("/workspace/lakehouse/file.txt")
+            == "workspace/lakehouse/file.txt"
+        )
+
+        # Test abfss URL stripping for OneLake
+        assert (
+            OneLakeFileSystem._strip_protocol(
+                "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/file.txt"
+            )
+            == "workspace/lakehouse/Files/file.txt"
+        )
+
+    def test_get_kwargs_from_urls(self):
+        """Test URL parsing for kwargs extraction"""
+        kwargs = OneLakeFileSystem._get_kwargs_from_urls(
+            "onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/file.txt"
+        )
+
+        assert kwargs.get("account_name") == "onelake"
+        assert kwargs.get("workspace_name") == "workspace"
+        assert kwargs.get("lakehouse_name") == "lakehouse"
+
+        # Test abfss URL parsing for OneLake
+        abfss_kwargs = OneLakeFileSystem._get_kwargs_from_urls(
+            "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test"
+        )
+
+        assert abfss_kwargs.get("account_name") == "onelake"
+        assert abfss_kwargs.get("workspace_name") == "q_dev_workspace"
+        assert abfss_kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse"
+
+    def test_split_path(self):
+        """Test path splitting into components"""
+        fs = OneLakeFileSystem(anon=True)
+
+        # Test a full path
+        workspace, lakehouse, file_path = fs.split_path(
+            "workspace/lakehouse/folder/file.txt"
+        )
+        assert workspace == "workspace"
+        assert lakehouse == "lakehouse"
+        assert file_path == "folder/file.txt"
+
+        # Test partial paths
+        workspace, lakehouse, file_path = fs.split_path("workspace/lakehouse")
+        assert workspace == "workspace"
+        assert lakehouse == "lakehouse"
+        assert file_path == ""
+
+        workspace, lakehouse, file_path = fs.split_path("workspace")
+        assert workspace == "workspace"
+        assert lakehouse == ""
+        assert file_path == ""
+
+        # Test an empty path
+        workspace, lakehouse, file_path = fs.split_path("")
+        assert workspace == ""
+        assert lakehouse == ""
+        assert file_path == ""
+
+    @pytest.mark.asyncio
+    async def test_ls_with_workspace_and_lakehouse(self):
+        """Test listing files when workspace and lakehouse are known"""
+        with patch("adlfs.onelake.AIODataLakeServiceClient"):
+            fs = OneLakeFileSystem(
+                workspace_name="test_workspace",
+                lakehouse_name="test_lakehouse",
+                anon=True,
+            )
+
+            # Mock the service client; plain MagicMock because
+            # get_file_system_client is a regular (non-async) call
+            mock_service_client = MagicMock()
+            fs.service_client = mock_service_client
+
+            # Create the file system client as an async context manager
+            mock_file_system_client = MagicMock()  # non-async methods only
+            mock_fs_context = create_async_context_manager_mock()
+            mock_fs_context.__aenter__.return_value = mock_file_system_client
+            mock_service_client.get_file_system_client.return_value = mock_fs_context
+
+            # Mock path items; get_paths returns names relative to the file
+            # system (the workspace), so they include the lakehouse prefix
+            mock_path1 = MagicMock()
+            mock_path1.name = "test_lakehouse/file1.txt"
+            mock_path1.is_directory = False
+            mock_path1.content_length = 1024
+
+            mock_path2 = MagicMock()
+            mock_path2.name = "test_lakehouse/folder1"
+            mock_path2.is_directory = True
+
+            class MockAsyncIterator:
+                def __init__(self, items):
+                    self.items = items
+                    self.index = 0
+
+                def __aiter__(self):
+                    return self
+
+                async def __anext__(self):
+                    if self.index >= len(self.items):
+                        raise StopAsyncIteration
+                    item = self.items[self.index]
+                    self.index += 1
+                    return item
+
+            mock_file_system_client.get_paths.return_value = MockAsyncIterator(
+                [mock_path1, mock_path2]
+            )
+
+            result = await fs._ls("test_workspace/test_lakehouse/")
+
+            assert len(result) == 2
+            assert result[0]["name"] == "test_workspace/test_lakehouse/file1.txt"
+            assert result[0]["type"] == "file"
+            assert result[0]["size"] == 1024
+            assert result[1]["name"] == "test_workspace/test_lakehouse/folder1"
+            assert result[1]["type"] == "directory"
+
+    @pytest.mark.asyncio
+    async def test_ls_file_not_found(self):
+        """Test listing when the path doesn't exist"""
+        fs = OneLakeFileSystem(anon=True)
+
+        # get_file_system_client is a sync call returning an async context
+        # manager, so use MagicMock and route __aenter__ back to the client
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_paths.side_effect = ResourceNotFoundError(
+            "Path not found"
+        )
+
+        fs.service_client = mock_service_client
+
+        with pytest.raises(FileNotFoundError):
+            await fs._ls("workspace/lakehouse/nonexistent")
+
+    @pytest.mark.asyncio
+    async def test_info_file(self):
+        """Test getting file information"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+
+        mock_properties = MagicMock()
+        mock_properties.size = 2048
+        mock_properties.last_modified = "2023-01-01T00:00:00Z"
+
+        mock_file_client.get_file_properties.return_value = mock_properties
+        fs.service_client = mock_service_client
+
+        result = await fs._info("workspace/lakehouse/file.txt")
+
+        assert result["name"] == "workspace/lakehouse/file.txt"
+        assert result["type"] == "file"
+        assert result["size"] == 2048
+
+    @pytest.mark.asyncio
+    async def test_info_directory(self):
+        """Test getting directory information"""
+        fs = OneLakeFileSystem(anon=True)
+
+        result = await fs._info("workspace/lakehouse")
+
+        assert result["name"] == "workspace/lakehouse"
+        assert result["type"] == "directory"
+        assert result["size"] is None
+
+    @pytest.mark.asyncio
+    async def test_cat_file(self):
+        """Test reading file content"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+        mock_download_stream = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+        mock_file_client.download_file.return_value = mock_download_stream
+        mock_download_stream.readall.return_value = b"file content"
+
+        fs.service_client = mock_service_client
+
+        result = await fs._cat_file("workspace/lakehouse/file.txt")
+
+        assert result == b"file content"
+        mock_file_client.download_file.assert_called_once_with(offset=None, length=None)
+
+    @pytest.mark.asyncio
+    async def test_cat_file_with_range(self):
+        """Test reading file content with a byte range"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+        mock_download_stream = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+        mock_file_client.download_file.return_value = mock_download_stream
+        mock_download_stream.readall.return_value = b"content"
+
+        fs.service_client = mock_service_client
+
+        result = await fs._cat_file("workspace/lakehouse/file.txt", start=10, end=20)
+
+        assert result == b"content"
+        mock_file_client.download_file.assert_called_once_with(offset=10, length=10)
+
+    @pytest.mark.asyncio
+    async def test_cat_file_not_found(self):
+        """Test reading a non-existent file"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+        mock_file_client.download_file.side_effect = ResourceNotFoundError(
+            "File not found"
+        )
+
+        fs.service_client = mock_service_client
+
+        with pytest.raises(FileNotFoundError):
+            await fs._cat_file("workspace/lakehouse/nonexistent.txt")
+
+    @pytest.mark.asyncio
+    async def test_pipe_file(self):
+        """Test writing file content"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+
+        fs.service_client = mock_service_client
+
+        test_data = b"test data"
+        await fs._pipe_file("workspace/lakehouse/newfile.txt", test_data)
+
+        mock_file_client.create_file.assert_called_once()
+        mock_file_client.append_data.assert_called_once_with(
+            test_data, offset=0, length=len(test_data)
+        )
+        mock_file_client.flush_data.assert_called_once_with(len(test_data))
+
+    @pytest.mark.asyncio
+    async def test_rm_file(self):
+        """Test deleting a file"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+
+        fs.service_client = mock_service_client
+
+        await fs._rm_file("workspace/lakehouse/file.txt")
+
+        mock_file_client.delete_file.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_rm_file_not_found(self):
+        """Test deleting a non-existent file (should not raise)"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_file_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_file_client.return_value = mock_file_client
+        mock_file_client.__aenter__.return_value = mock_file_client
+        mock_file_client.delete_file.side_effect = ResourceNotFoundError(
+            "File not found"
+        )
+
+        fs.service_client = mock_service_client
+
+        # Should not raise an exception
+        await fs._rm_file("workspace/lakehouse/nonexistent.txt")
+
+    @pytest.mark.asyncio
+    async def test_mkdir(self):
+        """Test creating a directory"""
+        fs = OneLakeFileSystem(anon=True)
+
+        mock_service_client = MagicMock()
+        mock_file_system_client = MagicMock()
+        mock_directory_client = AsyncMock()
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.__aenter__.return_value = mock_file_system_client
+        mock_file_system_client.get_directory_client.return_value = (
+            mock_directory_client
+        )
+        mock_directory_client.__aenter__.return_value = mock_directory_client
+
+        fs.service_client = mock_service_client
+
+        await fs._mkdir("workspace/lakehouse/newfolder")
+
+        mock_directory_client.create_directory.assert_called_once()
+
+    def test_mkdir_invalid_path(self):
+        """Test creating a directory with an invalid path"""
+        fs = OneLakeFileSystem(anon=True)
+
+        with pytest.raises(NotImplementedError):
+            fs.mkdir("workspace")  # Can't create a workspace
+
+    def test_invalid_path_formats(self):
+        """Test handling of invalid path formats"""
+        fs = OneLakeFileSystem(anon=True)
+
+        # Test invalid paths for file operations
+        with pytest.raises(ValueError):
+            OneLakeFile(fs, "invalid_path", mode="rb")
+
+        with pytest.raises(ValueError):
+            OneLakeFile(fs, "workspace", mode="rb")  # Missing lakehouse and file
+
+        with pytest.raises(ValueError):
+            OneLakeFile(fs, "workspace/lakehouse", mode="rb")  # Missing file
+
+
+class TestOneLakeFile:
+    """Test cases for OneLakeFile"""
+
+    def test_init_valid_path(self):
+        """Test file initialization with a valid path"""
+        fs = OneLakeFileSystem(anon=True)
+
+        # Pass an explicit size so AbstractBufferedFile doesn't call
+        # fs.info() against the real (anonymous) service client
+        file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="rb", size=100)
+
+        assert file_obj.workspace == "workspace"
+        assert file_obj.lakehouse == "lakehouse"
+        assert file_obj.file_path == "file.txt"
+        assert file_obj.container_path == "workspace/lakehouse"
+
+    def test_init_invalid_path(self):
+        """Test file initialization with an invalid path"""
+        fs = OneLakeFileSystem(anon=True)
+
+        with pytest.raises(ValueError):
+            OneLakeFile(fs, "invalid", mode="rb")
+
+    @patch("adlfs.onelake.sync")
+    def test_fetch_range(self, mock_sync):
+        """Test fetching a byte range from a file"""
+        fs = OneLakeFileSystem(anon=True)
+        file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="rb", size=100)
+
+        mock_sync.return_value = b"test data"
+
+        result = file_obj._fetch_range(0, 10)
+
+        assert result == b"test data"
+        mock_sync.assert_called_once()
+
+    @patch("adlfs.onelake.sync")
+    def test_upload_chunk(self, mock_sync):
+        """Test uploading a chunk of data"""
+        fs = OneLakeFileSystem(anon=True)
+        file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="wb")
+
+        # Mock the buffer
+        file_obj.buffer = MagicMock()
+        file_obj.buffer.getvalue.return_value = b"test data"
+        file_obj.offset = 0
+
+        result = file_obj._upload_chunk(final=True)
+
+        assert result is True
+        mock_sync.assert_called_once()
+
+    def test_protocols(self):
+        """Test that the OneLake protocols are registered"""
+        assert "onelake" in OneLakeFileSystem.protocol
+        assert "abfss" in OneLakeFileSystem.protocol
+
+
+class TestOneLakeURLRouting:
+    """Test URL routing between AzureBlobFileSystem and OneLakeFileSystem"""
+
+    def test_onelake_url_routing(self):
+        """Test that OneLake URLs are properly parsed and routed."""
+        # OneLake URLs should be handled by OneLakeFileSystem
+        onelake_url = (
+            "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/"
+            "qdata_dev_lh.Lakehouse/Files/Upload_Test"
+        )
+
+        # Test that OneLakeFileSystem handles both protocols
+        assert "abfss" in OneLakeFileSystem.protocol
+        assert "onelake" in OneLakeFileSystem.protocol
+
+        # Test URL parsing for OneLake
+        kwargs = OneLakeFileSystem._get_kwargs_from_urls(onelake_url)
+        assert kwargs.get("account_name") == "onelake"
+        assert kwargs.get("workspace_name") == "q_dev_workspace"
+        assert kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse"
+
+        # Test path stripping for OneLake URLs
+        stripped = OneLakeFileSystem._strip_protocol(onelake_url)
+        assert stripped == "q_dev_workspace/qdata_dev_lh.Lakehouse/Files/Upload_Test"
+
+    def test_azure_blob_url_routing(self):
+        """Test that regular Azure Storage URLs are handled by AzureBlobFileSystem."""
+        from adlfs import AzureBlobFileSystem
+
+        # Regular Azure Storage URL
+        azure_url = "abfss://container@storageaccount.dfs.core.windows.net/path/to/file"
+
+        # Test that AzureBlobFileSystem doesn't process OneLake URLs
+        onelake_url = (
+            "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/path"
+        )
+        onelake_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(onelake_url)
+        assert not onelake_kwargs  # Should return an empty dict for OneLake URLs
+
+        # Test that AzureBlobFileSystem handles regular Azure URLs
+        azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url)
+        assert azure_kwargs.get("account_name") == "storageaccount"
+
+    def test_onelake_strip_protocol_variations(self):
+        """Test OneLake URL stripping with different URL formats."""
+
+        test_cases = [
+            # (input_url, expected_stripped_path)
+            (
+                "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/test.txt",
+                "workspace/lakehouse/Files/test.txt",
+            ),
+            (
+                "onelake://workspace/lakehouse/Files/test.txt",
+                "workspace/lakehouse/Files/test.txt",
+            ),
+            (
+                "workspace/lakehouse/Files/test.txt",
+                "workspace/lakehouse/Files/test.txt",
+            ),
+        ]
+
+        for url, expected in test_cases:
+            result = OneLakeFileSystem._strip_protocol(url)
+            assert result == expected, f"Failed for URL: {url}"
+
+    def test_onelake_get_kwargs_variations(self):
+        """Test OneLake URL parameter extraction with different formats."""
+
+        test_cases = [
+            # abfss format with the workspace in the host part
+            {
+                "url": (
+                    "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/"
+                    "qdata_dev_lh.Lakehouse/Files/Upload_Test"
+                ),
+                "expected": {
+                    "account_name": "onelake",
+                    "workspace_name": "q_dev_workspace",
+                    "lakehouse_name": "qdata_dev_lh.Lakehouse",
+                },
+            },
+            # onelake format with the workspace in the path
+            {
+                "url": (
+                    "onelake://onelake.dfs.fabric.microsoft.com/"
+                    "workspace/lakehouse/Files/test.txt"
+                ),
+                "expected": {
+                    "account_name": "onelake",
+                    "workspace_name": "workspace",
+                    "lakehouse_name": "lakehouse",
+                },
+            },
+        ]
+
+        for test_case in test_cases:
+            kwargs = OneLakeFileSystem._get_kwargs_from_urls(test_case["url"])
+            for key, expected_value in test_case["expected"].items():
+                assert (
+                    kwargs.get(key) == expected_value
+                ), f"Failed for URL: {test_case['url']}, key: {key}, got: {kwargs.get(key)}, expected: {expected_value}"
+
+    def test_azure_blob_ignores_onelake_domains(self):
+        """Test that AzureBlobFileSystem ignores OneLake domain URLs."""
+        from adlfs import AzureBlobFileSystem
+
+        onelake_urls = [
+            "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file",
+            "abfs://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file",
+        ]
+
+        for url in onelake_urls:
+            kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url)
+            # Should return an empty dict (no account_name extracted)
+            assert kwargs == {}, f"AzureBlobFileSystem should ignore OneLake URL: {url}"
+
+    def test_protocol_overlap_handling(self):
+        """Test that protocol overlap between the filesystems is handled correctly."""
+        from adlfs import AzureBlobFileSystem
+
+        # Both filesystems support the abfss protocol
+        assert "abfss" in AzureBlobFileSystem.protocol
+        assert "abfss" in OneLakeFileSystem.protocol
+
+        # But they should handle different domains
+        azure_url = "abfss://container@account.dfs.core.windows.net/file"
+        onelake_url = (
+            "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file"
+        )
+
+        # Azure should handle core.windows.net and ignore fabric.microsoft.com
+        azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url)
+        onelake_kwargs_from_azure = AzureBlobFileSystem._get_kwargs_from_urls(
+            onelake_url
+        )
+
+        assert azure_kwargs.get("account_name") == "account"
+        assert onelake_kwargs_from_azure == {}  # Should be empty
+
+        # OneLake should handle fabric.microsoft.com URLs
+        onelake_kwargs = OneLakeFileSystem._get_kwargs_from_urls(onelake_url)
+        assert onelake_kwargs.get("account_name") == "onelake"
+        assert onelake_kwargs.get("workspace_name") == "workspace"
"""Integration tests for OneLake functionality""" + + def test_fsspec_integration(self): + """Test that OneLake can be used with fsspec.open""" + import fsspec + + # Register the protocol + fsspec.register_implementation("onelake", OneLakeFileSystem) + + # Test that the protocol is registered + assert "onelake" in fsspec.available_protocols() + + # Test URL parsing + with mock.patch("adlfs.onelake.OneLakeFileSystem.do_connect"): + fs = fsspec.filesystem("onelake", anon=True) + assert isinstance(fs, OneLakeFileSystem) + + def test_sync_methods(self): + """Test that sync wrapper methods work""" + fs = OneLakeFileSystem(anon=True) + + # These should be callable (though they might raise without proper mocking) + assert hasattr(fs, "ls") + assert hasattr(fs, "info") + assert hasattr(fs, "cat_file") + assert hasattr(fs, "pipe_file") + assert hasattr(fs, "rm_file") + assert hasattr(fs, "mkdir") + assert callable(fs.ls) + assert callable(fs.info) + assert callable(fs.cat_file) + assert callable(fs.pipe_file) + assert callable(fs.rm_file) + assert callable(fs.mkdir) diff --git a/adlfs/tests/test_uri_format.py b/adlfs/tests/test_uri_format.py index 5f42ef03..3f20fced 100644 --- a/adlfs/tests/test_uri_format.py +++ b/adlfs/tests/test_uri_format.py @@ -54,3 +54,59 @@ def test_account_name_from_url(): "abfs://test@some_account_name.dfs.core.windows.net/some_file" ) assert kwargs["account_name"] == "some_account_name" + + +def test_azure_storage_url_routing(): + """Test that AzureBlobFileSystem correctly handles Azure Storage URLs""" + + # Test various Azure Storage URL formats + azure_urls_and_expected = [ + ("abfss://container@account.dfs.core.windows.net/file", "account"), + ("abfs://container@account.dfs.core.windows.net/file", "account"), + ("abfss://container@account.blob.core.windows.net/file", "account"), + ("az://container@account.blob.core.windows.net/file", "account"), + ] + + for url, expected_account in azure_urls_and_expected: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert kwargs.get("account_name") == expected_account, f"Failed for URL: {url}" + + +def test_onelake_url_ignored_by_azure_blob_fs(): + """Test that AzureBlobFileSystem ignores OneLake URLs""" + + # OneLake URLs should be ignored by AzureBlobFileSystem + onelake_urls = [ + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + "abfs://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + ] + + for url in onelake_urls: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + # Should return empty dict (no account_name extracted) + assert kwargs == {}, f"AzureBlobFileSystem should ignore OneLake URL: {url}" + + +def test_azure_vs_onelake_domain_routing(): + """Test that domain-based routing works correctly""" + + # Azure Storage domains should be handled by AzureBlobFileSystem + azure_domains = [ + "abfss://container@account.dfs.core.windows.net/file", + "abfss://container@account.blob.core.windows.net/file", + ] + + for url in azure_domains: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert ( + kwargs.get("account_name") == "account" + ), f"Azure domain not handled correctly: {url}" + + # OneLake domains should be ignored by AzureBlobFileSystem + onelake_domains = [ + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + ] + + for url in onelake_domains: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert kwargs == {}, f"OneLake domain should be ignored: {url}" diff --git a/docs/api.md b/docs/api.md index 9332a4ab..0ea586c6 100644 --- 
diff --git a/docs/api.md b/docs/api.md
index 9332a4ab..0ea586c6 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -1,6 +1,8 @@
 # API Reference
 
-`adlfs.AzureBlobFileSystem` provides an interface for Azure Blob Storage.
+## Azure Blob Storage / Data Lake Storage Gen2
+
+`adlfs.AzureBlobFileSystem` provides an interface for Azure Blob Storage and Azure Data Lake Storage Gen2.
 
 ```{eval-rst}
 .. autoclass:: adlfs.AzureBlobFileSystem
@@ -12,3 +14,27 @@
     :members:
 ```
 
+## Microsoft OneLake
+
+`adlfs.OneLakeFileSystem` provides an interface for Microsoft OneLake (part of Microsoft Fabric).
+
+```{eval-rst}
+.. autoclass:: adlfs.OneLakeFileSystem
+    :show-inheritance:
+    :members:
+
+.. autoclass:: adlfs.OneLakeFile
+    :show-inheritance:
+    :members:
+```
+
+## Azure Data Lake Storage Gen1 (Legacy)
+
+`adlfs.AzureDatalakeFileSystem` provides an interface for Azure Data Lake Storage Gen1 (being retired).
+
+```{eval-rst}
+.. autoclass:: adlfs.AzureDatalakeFileSystem
+    :show-inheritance:
+    :members:
+```
+
diff --git a/docs/index.md b/docs/index.md
index 0013d04f..737d46ed 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,6 +1,6 @@
 # adlfs
 
-`adlfs` provides an [`fsspec`][fsspec]-compatible interface to [Azure Blob storage], [Azure Data Lake Storage Gen2], and [Azure Data Lake Storage Gen1].
+`adlfs` provides an [`fsspec`][fsspec]-compatible interface to [Azure Blob storage], [Azure Data Lake Storage Gen2], [Azure Data Lake Storage Gen1], and [Microsoft OneLake].
 
 ## Installation
 
@@ -14,15 +14,48 @@ or conda from the conda-forge channel
 
     conda install -c conda-forge adlfs
 
+## Microsoft Storage Ecosystem Overview
+
+Microsoft provides several data storage solutions for different use cases:
+
+### Azure Data Lake Storage Gen1 (ADLS Gen1)
+- **Status**: Legacy, being retired
+- **Type**: Hierarchical file system, POSIX compliant
+- **Endpoint**: `https://<account_name>.azuredatalakestore.net`
+- **Use Case**: Legacy big data workloads
+
+### Azure Data Lake Storage Gen2 (ADLS Gen2)
+- **Status**: Current recommended solution
+- **Type**: Based on Blob storage with a hierarchical namespace
+- **Endpoints**:
+  - Blob Service: `https://<account_name>.blob.core.windows.net`
+  - Data Lake Service: `https://<account_name>.dfs.core.windows.net`
+- **Use Case**: Modern data lake and analytics workloads
+
+### Microsoft OneLake
+- **Status**: Newest, part of the Microsoft Fabric platform
+- **Type**: Unified data lake with Delta Lake format, ACID transactions
+- **Endpoint**: `https://onelake.dfs.fabric.microsoft.com`
+- **Use Case**: Microsoft Fabric analytics platform
+
+### OneDrive/SharePoint
+- **Note**: For OneDrive, Teams files, and SharePoint document libraries, use [`msgraphfs`](https://github.com/acsone/msgraphfs) instead of `adlfs`
+
 ## `fsspec` protocols
 
 `adlfs` registers the following protocols with `fsspec`.
 
-protocol | filesystem
--------- | ----------
-`abfs` | `adlfs.AzureBlobFileSystem`
-`az` | `adlfs.AzureBlobFileSystem`
-`adl` | `adlfs.AzureDatalakeFileSystem`
+protocol | filesystem | storage type
+-------- | ---------- | ------------
+`abfs` | `adlfs.AzureBlobFileSystem` | Azure Blob Storage / ADLS Gen2
+`abfss` | `adlfs.AzureBlobFileSystem` or `adlfs.OneLakeFileSystem`* | Azure Blob Storage / ADLS Gen2 / OneLake
+`az` | `adlfs.AzureBlobFileSystem` | Azure Blob Storage / ADLS Gen2
+`adl` | `adlfs.AzureDatalakeFileSystem` | Azure Data Lake Storage Gen1
+`onelake` | `adlfs.OneLakeFileSystem` | Microsoft OneLake
+
+*`abfss` URLs are automatically routed to the correct filesystem based on the domain:
+- `*.dfs.core.windows.net` → `AzureBlobFileSystem`
+- `onelake.dfs.fabric.microsoft.com` → `OneLakeFileSystem`
 
 ## Authentication
 
@@ -60,6 +93,29 @@ Additionally, some methods will include the account URL and authentication credentials
 
 >>> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
 ```
 
+### OneLake Authentication
+
+OneLake requires Azure Active Directory authentication. You can authenticate with a service principal (client credentials):
+
+```{code-block} python
+>>> from adlfs import OneLakeFileSystem
+>>> fs = OneLakeFileSystem(
+...     tenant_id="your-tenant-id",
+...     client_id="your-client-id",
+...     client_secret="your-client-secret"
+... )
+```
+
+Or using environment variables:
+
+```{code-block} python
+>>> import os
+>>> os.environ["AZURE_TENANT_ID"] = "your-tenant-id"
+>>> os.environ["AZURE_CLIENT_ID"] = "your-client-id"
+>>> os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret"
+>>> fs = OneLakeFileSystem()
+```
+
 ## Usage
 
 See the [fsspec documentation] on usage.
@@ -76,6 +132,28 @@ to list all the files or directories in the top-level of a storage container, you
 ['gbif/occurrence']
 ```
 
+### OneLake Usage
+
+OneLake paths follow the structure `workspace/lakehouse/path/to/file`. You can use both the `onelake://` and `abfss://` protocols:
+
+```{code-block} python
+>>> from adlfs import OneLakeFileSystem
+>>> fs = OneLakeFileSystem(tenant_id="...", client_id="...", client_secret="...")
+
+# List the contents of a lakehouse
+>>> fs.ls("my_workspace/my_lakehouse")
+['my_workspace/my_lakehouse/Files', 'my_workspace/my_lakehouse/Tables']
+
+# Using with fsspec
+>>> import fsspec
+>>> with fsspec.open("onelake://my_workspace/my_lakehouse/Files/data.parquet") as f:
+...     data = f.read()
+
+# Using the abfss protocol (automatically routes to OneLake)
+>>> with fsspec.open("abfss://my_workspace@onelake.dfs.fabric.microsoft.com/my_lakehouse/Files/data.parquet") as f:
+...     data = f.read()
+```
+
 ```{toctree}
 :maxdepth: 2
 
@@ -94,5 +172,6 @@ api.md
 [Azure Blob storage]: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction
 [Azure Data Lake Storage Gen2]: https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction
 [Azure Data Lake Storage Gen1]: https://docs.microsoft.com/en-us/azure/data-lake-store/
+[Microsoft OneLake]: https://docs.microsoft.com/en-us/fabric/onelake/
 [`azure.storage.blob`]: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python
 [fsspec documentation]: https://filesystem-spec.readthedocs.io/en/latest/usage.html
diff --git a/pyproject.toml b/pyproject.toml
index efe3b32a..4c60f5df 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,6 +29,7 @@ dependencies = [
     "azure-datalake-store>=0.0.53,<0.1",
     "azure-identity",
     "azure-storage-blob>=12.17.0",
+    "azure-storage-file-datalake>=12.17.0",
     "fsspec>=2023.12.0",
     "aiohttp>=3.7.0",
 ]
@@ -44,6 +45,7 @@ tests = ["pytest", "docker", "pytest-mock", "arrow", "dask[dataframe]"]
 
 [project.entry-points."fsspec.specs"]
 abfss = "adlfs:AzureBlobFileSystem"
+onelake = "adlfs:OneLakeFileSystem"
 
 [tool.setuptools.packages.find]
 include = ["adlfs*"]
diff --git a/requirements/latest.txt b/requirements/latest.txt
index 0ef2232b..c4b7f4d1 100644
--- a/requirements/latest.txt
+++ b/requirements/latest.txt
@@ -2,4 +2,5 @@
 fsspec
 azure-core<2.0.0
 azure-datalake-store
-azure-storage-blob
\ No newline at end of file
+azure-storage-blob
+azure-storage-file-datalake
\ No newline at end of file
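With the `onelake` entry point registered in `pyproject.toml`, fsspec can resolve the protocol without an explicit `register_implementation` call. A minimal check, assuming the package is reinstalled so the new entry point is picked up:

```python
import fsspec

# Resolves via the "fsspec.specs" entry point added above.
cls = fsspec.get_filesystem_class("onelake")
print(cls)  # <class 'adlfs.onelake.OneLakeFileSystem'>

# Instantiating through fsspec forwards kwargs to OneLakeFileSystem.__init__.
fs = fsspec.filesystem("onelake", anon=True)
print(type(fs).__name__)  # OneLakeFileSystem
```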