Do not reproduce reingestion workflow documentation (#4072)
* Do not emit documentation for a DAG when it duplicates workflow documentation that has already been generated.

Generate links to the shared documentation instead, and update the tests to match.

* Run linter.

* Add logic to map shared DAGs onto the same documentation

* Sort documents in the type sections by mapped DAG ID as well

---------

Co-authored-by: Matt Fergoda <mattfergoda@users.noreply.github.com>
Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>
3 people authored Apr 19, 2024
1 parent f31b84d commit 9ac2586
Showing 4 changed files with 236 additions and 573 deletions.
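In short, the change keys each DAG's documentation by its doc string and collapses equivalent DAG IDs (media-type variants, environment variants, and reingestion workflows) onto a single mapped DAG ID, so each doc is emitted once and every reference links to that one anchor. A minimal Python sketch of the idea, with names simplified from the diff below:

def build_doc_index(dags_info):
    """Map each distinct doc string to the mapped DAG ID that will carry it."""
    dag_by_doc_md: dict[str, str] = {}
    for dag in dags_info:
        # The first DAG seen with a given doc string wins; later duplicates
        # reuse its anchor instead of emitting the doc again.
        dag_by_doc_md.setdefault(dag.doc, dag.mapped_dag_id)
    return dag_by_doc_md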
55 changes: 49 additions & 6 deletions catalog/tests/utilities/dag_doc_gen/test_dag_doc_generation.py
@@ -27,6 +27,15 @@ class DagMock(NamedTuple):
_MODULE = "utilities.dag_doc_gen.dag_doc_generation"


@pytest.mark.parametrize(
"dag_id, expected_mapped_dag_id",
[
("sample_dag_123", "sample_dag_123"),
("sample_dag_audio_123", "sample_dag_{media_type}_123"),
("staging_database_restore", "staging_database_restore"),
("wikimedia_reingestion_workflow", "wikimedia_commons_workflow"),
],
)
@pytest.mark.parametrize("schedule", ["@daily", None])
@pytest.mark.parametrize(
"doc, expected_doc",
@@ -75,20 +84,30 @@ class DagMock(NamedTuple):
],
)
def test_get_dags_info(
- schedule, doc, expected_doc, tags, type_, provider_workflow, catchup, expected_dated
+ dag_id,
+ expected_mapped_dag_id,
+ schedule,
+ doc,
+ expected_doc,
+ tags,
+ type_,
+ provider_workflow,
+ catchup,
+ expected_dated,
):
dag = DagMock(schedule_interval=schedule, doc_md=doc, catchup=catchup, tags=tags)
expected = DagInfo(
- dag_id=DAG_ID,
+ dag_id=dag_id,
schedule=schedule,
doc=expected_doc,
dated=expected_dated,
type_=type_,
provider_workflow=provider_workflow,
mapped_dag_id=expected_mapped_dag_id,
)
with mock.patch(f"{_MODULE}.get_provider_workflows") as provider_workflow_mock:
provider_workflow_mock.return_value.get.return_value = provider_workflow
- actual = dag_doc_generation.get_dags_info({DAG_ID: dag})[0]
+ actual = dag_doc_generation.get_dags_info({dag_id: dag})[0]
assert actual == expected


@@ -104,6 +123,7 @@ def test_get_dags_info(
type_="",
dated=False,
provider_workflow=None,
mapped_dag_id=DAG_ID,
),
False,
"""
@@ -123,6 +143,7 @@ def test_get_dags_info(
type_="",
dated=False,
provider_workflow=None,
mapped_dag_id=DAG_ID,
),
False,
"""
@@ -142,6 +163,7 @@ def test_get_dags_info(
type_="",
dated=False,
provider_workflow=PROVIDER_WORKFLOW_INSTANCE,
mapped_dag_id=DAG_ID,
),
True,
"""
@@ -150,13 +172,34 @@
| DAG ID | Schedule Interval | Dated | Media Type(s) |
| --- | --- | --- | --- |
| [`sample_dag_123`](#sample_dag_123) | `@daily` | `False` | m1, m2 |
""",
),
# Separate mapped DAG ID
(
DagInfo(
dag_id=DAG_ID,
schedule="@daily",
doc="A doc does exist here",
type_="",
dated=False,
provider_workflow=None,
mapped_dag_id="something_entirely_different",
),
False,
"""
### Special Name
| DAG ID | Schedule Interval |
| --- | --- |
| [`sample_dag_123`](#something_entirely_different) | `@daily` |
""",
),
],
)
def test_generate_type_subsection(dag_info, is_provider, expected):
dag_by_doc_md = {dag_info.doc: dag_info.mapped_dag_id}
actual = dag_doc_generation.generate_type_subsection(
"Special Name", [dag_info], is_provider
"Special Name", [dag_info], is_provider, dag_by_doc_md
)
assert actual.strip() == expected.strip()

@@ -188,8 +231,8 @@ def test_generate_dag_doc():
with mock.patch(f"{_MODULE}.get_dags_info") as get_dags_info_mock:
# Return in reverse order to ensure they show up in the correct order
get_dags_info_mock.return_value = [
DagInfo("b", None, "this one has a doc", "t1", False, None),
DagInfo("a", None, None, "t1", False, None),
DagInfo("b", None, "this one has a doc", "t1", False, None, "b"),
DagInfo("a", None, None, "t1", False, None, "a"),
]
actual = dag_doc_generation.generate_dag_doc()
assert actual == expected
71 changes: 61 additions & 10 deletions catalog/utilities/dag_doc_gen/dag_doc_generation.py
@@ -57,6 +57,23 @@
The following is documentation associated with each DAG (where available):
"""
# Mapping of terms in DAG IDs to collapse into a single term when displaying
# the single-documentation section.
DAG_ID_TERMS_TO_COLLAPSE = {
"image": "{media_type}",
"audio": "{media_type}",
"production": "{environment}",
"staging": "{environment}",
}
# DAG IDs to ignore when collapsing reference documentation for a mapped term
DAG_IDS_TO_IGNORE_COLLAPSE = {
"recreate_full_staging_index",
"staging_database_restore",
"create_proportional_by_source_staging_index",
}
REINGESTION_SPECIFIC_MAPPING = {
"wikimedia_reingestion_workflow": "wikimedia_commons_workflow"
}

# Typing
DagMapping = dict[str, DAG]
@@ -69,6 +86,7 @@ class DagInfo(NamedTuple):
type_: str
dated: bool
provider_workflow: ProviderWorkflow | None
mapped_dag_id: str


def load_dags(dag_folder: str) -> DagMapping:
@@ -112,6 +130,26 @@ def fix_headings(doc: str) -> str:
return doc


def determine_mapped_dag_id(dag_id: str) -> str:
"""
Determine the mapped DAG ID for the provided DAG ID.
This is used to collapse multiple references to the same DAG documentation into
a single definition.
"""
if dag_id in DAG_IDS_TO_IGNORE_COLLAPSE:
return dag_id
if dag_id in REINGESTION_SPECIFIC_MAPPING:
return REINGESTION_SPECIFIC_MAPPING[dag_id]
if "_reingestion" in dag_id:
return dag_id.replace("_reingestion", "")
parts = dag_id.split("_")
for idx, part in enumerate(parts):
if part in DAG_ID_TERMS_TO_COLLAPSE:
parts[idx] = DAG_ID_TERMS_TO_COLLAPSE[part]
return "_".join(parts)


def get_dags_info(dags: DagMapping) -> list[DagInfo]:
"""
Convert the provided DAG ID -> DAG mapping into a list of DagInfo instances.
@@ -139,14 +177,18 @@ def get_dags_info(dags: DagMapping) -> list[DagInfo]:
type_=type_,
dated=dated,
provider_workflow=provider_workflow,
mapped_dag_id=determine_mapped_dag_id(dag_id),
)
)

return dags_info


def generate_type_subsection(
- name: str, dags_info: list[DagInfo], is_provider: bool
+ name: str,
+ dags_info: list[DagInfo],
+ is_provider: bool,
+ dag_by_doc_md: dict[str, str],
) -> str:
"""Generate the documentation for a "DAGs by type" subsection."""
log.info(f"Building subsection for '{name}'")
@@ -166,12 +208,12 @@
text += header + "\n"
text += "| " + " | ".join(["---"] * column_count) + " |"

- for dag in dags_info:
+ for dag in sorted(dags_info, key=lambda d: d.mapped_dag_id):
dag_id = f"`{dag.dag_id}`"
# If we have documentation for the DAG, we'll want to link to it within the
# markdown, so we reference it using the heading text (the DAG ID)
if dag.doc:
dag_id = f"[{dag_id}](#{dag.dag_id})"
dag_id = f"[{dag_id}](#{dag_by_doc_md[dag.doc]})"
text += f"\n| {dag_id} | `{dag.schedule}` |"
if is_provider:
text += f" `{dag.dated}` | {', '.join(dag.provider_workflow.media_types)} |"
@@ -184,7 +226,7 @@
def generate_single_documentation(dag: DagInfo) -> str:
"""Generate the documentation for a single DAG."""
return f"""
- ### `{dag.dag_id}`
+ ### `{dag.mapped_dag_id}`
{dag.doc}
@@ -210,7 +252,14 @@ def generate_dag_doc(dag_folder: Path = DAG_FOLDER) -> str:
for dag in dags_info:
dags_by_type[dag.type_].append(dag)

dag_by_doc_md: dict[str, str] = {}
for type_, dags in sorted(dags_by_type.items()):
for dag in dags:
if dag.doc not in dag_by_doc_md:
# Sphinx removes the brackets from heading references
dag_by_doc_md[dag.doc] = dag.mapped_dag_id.replace("{", "").replace(
"}", ""
)
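# Illustrative example (not part of the diff): a mapped_dag_id of
# "sample_dag_{media_type}_123" is registered under the anchor
# "sample_dag_media_type_123", the slug Sphinx derives from the heading.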
# Create a more human-readable name
name = type_.replace("_", " ").replace("-", " ").title()
# Special case for provider tables since they have extra information
@@ -219,23 +268,25 @@
# sub-list as part of a table of contents, but defer adding the sub-lists until
# all are generated.
text += f" 1. [{name}](#{type_.replace('_', '-')})\n"
- dag_types.append(generate_type_subsection(name, dags, is_provider))
+ dag_types.append(
+ generate_type_subsection(name, dags, is_provider, dag_by_doc_md)
+ )

text += "\n" + "\n\n".join(dag_types)

text += MIDAMBLE
- dag_docs = []
- for dag in sorted(dags_info, key=lambda d: d.dag_id):
+ dag_docs = set()
+ for dag in sorted(dags_info, key=lambda d: d.mapped_dag_id):
# This section only contains subsections for DAGs where we have documentation
if not dag.doc:
continue
# Similar to the DAGs-by-type section, we add the reference to a table of
# contents first, and then defer adding all the generated individual docs until
# the very end.
text += f" 1. [`{dag.dag_id}`](#{dag.dag_id})\n"
dag_docs.append(generate_single_documentation(dag))
text += f" 1. [`{dag.dag_id}`](#{dag_by_doc_md[dag.doc]})\n"
dag_docs.add(generate_single_documentation(dag))

text += "\n" + "".join(dag_docs)
text += "\n" + "".join(sorted(dag_docs))

# Normalize the newlines at the end of the file and add one more to make sure
# our pre-commit checks are happy!
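A sketch of driving the generator end to end; the diff shows only generate_dag_doc, so the output path below is an assumption:

# Sketch only: the module path comes from the tests above; writing the result
# to "DAGs.md" is a hypothetical destination, not confirmed by this commit.
from pathlib import Path

from utilities.dag_doc_gen import dag_doc_generation

doc_text = dag_doc_generation.generate_dag_doc()  # build the full markdown doc
Path("DAGs.md").write_text(doc_text)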