diff --git a/src/usethis/_integrations/ci/bitbucket/errors.py b/src/usethis/_integrations/ci/bitbucket/errors.py index 1c26abb2..4a463421 100644 --- a/src/usethis/_integrations/ci/bitbucket/errors.py +++ b/src/usethis/_integrations/ci/bitbucket/errors.py @@ -5,3 +5,7 @@ class UnexpectedImportPipelineError(UsethisError): """Raised when an import pipeline is unexpectedly encountered.""" + + +class MissingStepError(ValueError): + """Raised when a step cannot be found in the pipeline.""" diff --git a/src/usethis/_integrations/ci/bitbucket/pipeweld.py b/src/usethis/_integrations/ci/bitbucket/pipeweld.py index ec5f9e6b..f681f04d 100644 --- a/src/usethis/_integrations/ci/bitbucket/pipeweld.py +++ b/src/usethis/_integrations/ci/bitbucket/pipeweld.py @@ -8,33 +8,33 @@ import usethis._pipeweld.containers from usethis._integrations.ci.bitbucket.dump import bitbucket_fancy_dump -from usethis._integrations.ci.bitbucket.errors import UnexpectedImportPipelineError +from usethis._integrations.ci.bitbucket.errors import ( + MissingStepError, + UnexpectedImportPipelineError, +) from usethis._integrations.ci.bitbucket.io_ import ( edit_bitbucket_pipelines_yaml, ) from usethis._integrations.ci.bitbucket.schema import ( ImportPipeline, Items, + Parallel, ParallelExpanded, ParallelItem, ParallelSteps, Pipeline, Pipelines, StageItem, + Step, StepItem, ) from usethis._integrations.ci.bitbucket.schema_utils import step1tostep from usethis._integrations.file.yaml.update import update_ruamel_yaml_map -from usethis._pipeweld.ops import Instruction +from usethis._pipeweld.ops import InsertParallel, InsertSuccessor, Instruction if TYPE_CHECKING: - from usethis._integrations.ci.bitbucket.io_ import ( - BitbucketPipelinesYAMLDocument, - ) - from usethis._integrations.ci.bitbucket.schema import ( - PipelinesConfiguration, - Step, - ) + from usethis._integrations.ci.bitbucket.io_ import BitbucketPipelinesYAMLDocument + from usethis._integrations.ci.bitbucket.schema import PipelinesConfiguration def get_pipeweld_step(step: Step) -> str: @@ -116,9 +116,13 @@ def _(item: StageItem): ) -def apply_pipeweld_instruction(instruction: Instruction, *, new_step: Step) -> None: +def apply_pipeweld_instruction( + instruction: Instruction, *, step_to_insert: Step +) -> None: with edit_bitbucket_pipelines_yaml() as doc: - apply_pipeweld_instruction_via_doc(instruction, doc=doc, new_step=new_step) + apply_pipeweld_instruction_via_doc( + instruction, doc=doc, step_to_insert=step_to_insert + ) dump = bitbucket_fancy_dump(doc.model, reference=doc.content) update_ruamel_yaml_map(doc.content, dump, preserve_comments=True) @@ -126,13 +130,9 @@ def apply_pipeweld_instruction(instruction: Instruction, *, new_step: Step) -> N def apply_pipeweld_instruction_via_doc( instruction: Instruction, *, - new_step: Step, + step_to_insert: Step, doc: BitbucketPipelinesYAMLDocument, ) -> None: - if get_pipeweld_step(new_step) != instruction.step: - # N.B. This doesn't currently handle moving existing steps - return - if doc.model.pipelines is None: doc.model.pipelines = Pipelines() @@ -143,27 +143,239 @@ def apply_pipeweld_instruction_via_doc( items = [] elif isinstance(default.root, ImportPipeline): msg = ( - f"Cannot add step '{new_step.name}' to default pipeline in " + f"Cannot add step '{step_to_insert.name}' to default pipeline in " f"'bitbucket-pipelines.yml' because it is an import pipeline." ) raise UnexpectedImportPipelineError(msg) else: items = default.root.root + # Check if this instruction is for a new step or an existing step + # When pipeweld determines we need to add a step (e.g., adding "lint" step to a pipeline), + # it generates instructions for how to insert it. If the pipeline already has steps in + # parallel that need to be rearranged to satisfy dependencies, pipeweld also generates + # instructions to move those existing steps. So: + # - New step: inserting the actual new step being added (e.g., "lint") + # - Existing step: rearranging an existing step (e.g., moving "build" from a parallel + # block so "lint" can be inserted between "build" and "test") + try: + # Try to extract an existing step with this name + step = _extract_step_from_items(items, step_name=instruction.step) + except MissingStepError: + # Step not found in pipeline, so this must be the new step being added + step = step_to_insert + + _apply_instruction_to_items( + instruction=instruction, items=items, step_to_insert=step + ) + + if default is None and items: + pipelines.default = Pipeline(Items(items)) + + +def _apply_instruction_to_items( + *, + instruction: Instruction, + items: list[StepItem | ParallelItem | StageItem], + step_to_insert: Step, +) -> None: + """Apply an instruction to insert a step into the items list. + + Args: + instruction: The instruction specifying how to insert the step. + items: The list of pipeline items to modify. + step_to_insert: The step to insert. + """ if instruction.after is None: - items.insert(0, StepItem(step=new_step)) - else: + # Insert at the beginning - always as a simple step + items.insert(0, StepItem(step=step_to_insert)) + elif isinstance(instruction, InsertSuccessor): + # Insert in series after the specified step for item in items: if _is_insertion_necessary(item, instruction=instruction): - # N.B. This doesn't currently handle InsertParallel properly items.insert( items.index(item) + 1, - StepItem(step=new_step), + StepItem(step=step_to_insert), + ) + break + elif isinstance(instruction, InsertParallel): + # Insert in parallel with the specified step + for idx, item in enumerate(items): + if _is_insertion_necessary(item, instruction=instruction): + _insert_parallel_step( + item, items=items, idx=idx, step_to_insert=step_to_insert ) break + else: + assert_never(instruction) - if default is None and items: - pipelines.default = Pipeline(Items(items)) + +def _extract_step_from_items( + items: list[StepItem | ParallelItem | StageItem], *, step_name: str +) -> Step: + """Find and remove a step from the items list. + + This function searches for a step with the given name, removes it from the + items list, and returns the step. If the step is found in a parallel block + with other steps, only that step is removed from the parallel block. + + Raises: + MissingStepError: If the step cannot be found in the pipeline. + """ + for idx, item in enumerate(items): + extracted = _extract_step_from_item( + item, step_name=step_name, items=items, idx=idx + ) + if extracted is not None: + return extracted + msg = f"Step '{step_name}' not found in pipeline" + raise MissingStepError(msg) + + +@singledispatch +def _extract_step_from_item( + item: StepItem | ParallelItem | StageItem, + *, + step_name: str, + items: list[StepItem | ParallelItem | StageItem], + idx: int, +) -> Step | None: + """Extract a step from an item, potentially modifying the items list.""" + raise NotImplementedError + + +@_extract_step_from_item.register +def _( + item: StepItem, + *, + step_name: str, + items: list[StepItem | ParallelItem | StageItem], + idx: int, +) -> Step | None: + if get_pipeweld_step(item.step) == step_name: + # Remove this item from the list + items.pop(idx) + return item.step + return None + + +@_extract_step_from_item.register +def _( + item: ParallelItem, + *, + step_name: str, + items: list[StepItem | ParallelItem | StageItem], + idx: int, +) -> Step | None: + if item.parallel is not None: + if isinstance(item.parallel.root, ParallelSteps): + step_items = item.parallel.root.root + elif isinstance(item.parallel.root, ParallelExpanded): + step_items = item.parallel.root.steps.root + else: + assert_never(item.parallel.root) + + for step_idx, step_item in enumerate(step_items): + if get_pipeweld_step(step_item.step) == step_name: + # Found it - remove from the parallel block + extracted_step = step_item.step + step_items.pop(step_idx) + + # If only one step remains in the parallel, convert to a simple step + if len(step_items) == 1: + items[idx] = step_items[0] + elif len(step_items) == 0: + # No steps left, remove the parallel item + items.pop(idx) + + return extracted_step + return None + + +@_extract_step_from_item.register +def _( + # https://github.com/astral-sh/ruff/issues/18654 + item: StageItem, # noqa: ARG001 + *, + step_name: str, # noqa: ARG001 + items: list[StepItem | ParallelItem | StageItem], # noqa: ARG001 + idx: int, # noqa: ARG001 +) -> Step | None: + # We don't extract steps from within stages as they represent deployment + # stages and their internal structure should be preserved. + return None + + +@singledispatch +def _insert_parallel_step( + item: StepItem | ParallelItem | StageItem, + *, + items: list[StepItem | ParallelItem | StageItem], + idx: int, + step_to_insert: Step, +) -> None: + """Insert a step in parallel with an existing item. + + This function handles the logic of converting a single step to a parallel block + or adding to an existing parallel block. + """ + raise NotImplementedError + + +@_insert_parallel_step.register +def _( + item: StepItem, + *, + items: list[StepItem | ParallelItem | StageItem], + idx: int, + step_to_insert: Step, +) -> None: + # Replace the single step with a parallel block containing both steps + parallel_item = ParallelItem( + parallel=Parallel( + ParallelSteps( + [ + StepItem(step=item.step), + StepItem(step=step_to_insert), + ] + ) + ) + ) + items[idx] = parallel_item + + +@_insert_parallel_step.register +def _( + item: ParallelItem, + *, + # https://github.com/astral-sh/ruff/issues/18654 + items: list[StepItem | ParallelItem | StageItem], # noqa: ARG001 + idx: int, # noqa: ARG001 + step_to_insert: Step, +) -> None: + if item.parallel is not None: + if isinstance(item.parallel.root, ParallelSteps): + # Add to the existing list of parallel steps + item.parallel.root.root.append(StepItem(step=step_to_insert)) + elif isinstance(item.parallel.root, ParallelExpanded): + # Add to the expanded parallel steps + item.parallel.root.steps.root.append(StepItem(step=step_to_insert)) + else: + assert_never(item.parallel.root) + + +@_insert_parallel_step.register +def _( + item: StageItem, + *, + items: list[StepItem | ParallelItem | StageItem], + idx: int, + step_to_insert: Step, +) -> None: + # StageItems are trickier since they aren't supported in ParallelSteps. But we + # never need to add them in practice anyway. The only reason this is really here + # is for type safety. + raise NotImplementedError @singledispatch diff --git a/src/usethis/_integrations/ci/bitbucket/steps.py b/src/usethis/_integrations/ci/bitbucket/steps.py index 8b0396f3..878717ec 100644 --- a/src/usethis/_integrations/ci/bitbucket/steps.py +++ b/src/usethis/_integrations/ci/bitbucket/steps.py @@ -195,7 +195,7 @@ def _add_step_in_default_via_doc( ).add() for instruction in weld_result.instructions: apply_pipeweld_instruction_via_doc( - instruction=instruction, new_step=step, doc=doc + instruction=instruction, step_to_insert=step, doc=doc ) diff --git a/tests/usethis/_integrations/ci/bitbucket/test_pipeweld.py b/tests/usethis/_integrations/ci/bitbucket/test_pipeweld.py index 6651b417..97095cc2 100644 --- a/tests/usethis/_integrations/ci/bitbucket/test_pipeweld.py +++ b/tests/usethis/_integrations/ci/bitbucket/test_pipeweld.py @@ -41,7 +41,7 @@ def test_add_to_brand_new_pipeline(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after=None), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -74,7 +74,7 @@ def test_import_pipeline_fails(self, tmp_path: Path): with change_cwd(tmp_path), pytest.raises(UnexpectedImportPipelineError): apply_pipeweld_instruction( InsertSuccessor(step="foo", after=None), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) def test_existing_pipeline(self, tmp_path: Path): @@ -95,7 +95,7 @@ def test_existing_pipeline(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after=None), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -136,7 +136,7 @@ def test_step_item(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after="bar"), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -181,7 +181,7 @@ def test_parallel_item(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after="qux"), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -232,7 +232,7 @@ def test_parallel_expanded(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after="qux"), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -280,7 +280,7 @@ def test_stage_item(self, tmp_path: Path): with change_cwd(tmp_path): apply_pipeweld_instruction( InsertSuccessor(step="foo", after="baz"), - new_step=Step(name="foo", script=Script(["echo foo"])), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), ) # Assert @@ -304,6 +304,295 @@ def test_stage_item(self, tmp_path: Path): """ ) + class TestInsertParallel: + def test_simple_step(self, tmp_path: Path): + # Arrange: Pipeline with a single step + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +pipelines: + default: + - step: + name: bar + script: + - echo bar +""" + ) + + # Act: Insert foo in parallel to bar + with change_cwd(tmp_path): + apply_pipeweld_instruction( + InsertParallel(step="foo", after="bar"), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), + ) + + # Assert: Should create a parallel block with both steps + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + - step: + name: bar + script: + - echo bar + - step: + name: foo + script: + - echo foo +""" + ) + + def test_add_to_existing_parallel(self, tmp_path: Path): + # Arrange: Pipeline with an existing parallel block + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + - step: + name: bar + script: + - echo bar + - step: + name: baz + script: + - echo baz +""" + ) + + # Act: Insert foo in parallel to bar (which is already in a parallel block) + with change_cwd(tmp_path): + apply_pipeweld_instruction( + InsertParallel(step="foo", after="bar"), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), + ) + + # Assert: Should add to the existing parallel block + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + - step: + name: bar + script: + - echo bar + - step: + name: baz + script: + - echo baz + - step: + name: foo + script: + - echo foo +""" + ) + + def test_parallel_after_none(self, tmp_path: Path): + # Arrange: Empty pipeline + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +""" + ) + + # Act: Insert foo in parallel at the beginning (after=None) + with change_cwd(tmp_path): + apply_pipeweld_instruction( + InsertParallel(step="foo", after=None), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), + ) + + # Assert: Should just add the step (parallel with no other steps is just a step) + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - step: + name: foo + script: + - echo foo +""" + ) + + def test_add_to_existing_parallel_expanded_format(self, tmp_path: Path): + # Arrange: Pipeline with an existing parallel block using expanded format + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + steps: + - step: + name: bar + script: + - echo bar + - step: + name: baz + script: + - echo baz +""" + ) + + # Act: Insert foo in parallel to bar (expanded format) + with change_cwd(tmp_path): + apply_pipeweld_instruction( + InsertParallel(step="foo", after="bar"), + step_to_insert=Step(name="foo", script=Script(["echo foo"])), + ) + + # Assert: Should add to the existing parallel block + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + steps: + - step: + name: bar + script: + - echo bar + - step: + name: baz + script: + - echo baz + - step: + name: foo + script: + - echo foo +""" + ) + + +class TestBreakUpParallelism: + """Test breaking up an existing parallel block to satisfy dependencies.""" + + def test_full_parallel_breakup_sequence(self, tmp_path: Path): + # Arrange: Start with parallel(A, B), want to insert C after A + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + - step: + name: A + script: + - echo A + - step: + name: B + script: + - echo B +""" + ) + + with change_cwd(tmp_path): + # Simulate pipeweld instructions for: series("A", "C", "B") + # Note: step_to_insert is always C (the actual new step being added) + # but instruction.step varies (A, B, or C) to indicate which step + # the instruction is about (existing or new) + + # Step 1: Move A to beginning + apply_pipeweld_instruction( + InsertSuccessor(step="A", after=None), + step_to_insert=Step(name="C", script=Script(["echo C"])), + ) + + # Step 2: Move B after A + apply_pipeweld_instruction( + InsertSuccessor(step="B", after="A"), + step_to_insert=Step(name="C", script=Script(["echo C"])), + ) + + # Step 3: Insert C after A (C will go between A and B) + apply_pipeweld_instruction( + InsertSuccessor(step="C", after="A"), + step_to_insert=Step(name="C", script=Script(["echo C"])), + ) + + # Assert: Should result in A, then C, then B in series + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - step: + name: A + script: + - echo A + - step: + name: C + script: + - echo C + - step: + name: B + script: + - echo B +""" + ) + + def test_extract_last_step_from_parallel(self, tmp_path: Path): + # Arrange: Parallel block with a single step + (tmp_path / "bitbucket-pipelines.yml").write_text( + """\ +image: atlassian/default-image:3 +pipelines: + default: + - parallel: + - step: + name: A + script: + - echo A + - step: + name: B + script: + - echo B +""" + ) + + # Act: Extract the only step from the parallel block + with change_cwd(tmp_path): + apply_pipeweld_instruction( + InsertSuccessor(step="A", after=None), + step_to_insert=Step(name="C", script=Script(["echo C"])), + ) + + # Assert: Parallel block should be removed entirely, leaving A and B + content = (tmp_path / "bitbucket-pipelines.yml").read_text() + assert ( + content + == """\ +image: atlassian/default-image:3 +pipelines: + default: + - step: + name: A + script: + - echo A + - step: + name: B + script: + - echo B +""" + ) + class TestGetInstructionsForInsertion: class TestStr: