diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3e37cb9..9833f8f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,8 +11,8 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.9] - os: [macos-latest, ubuntu-latest, windows-latest] + python-version: ['3.10'] + os: [ubuntu-latest] steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/updateMarkdown.yml b/.github/workflows/updateMarkdown.yml deleted file mode 100644 index 291a609..0000000 --- a/.github/workflows/updateMarkdown.yml +++ /dev/null @@ -1,22 +0,0 @@ -name: on-push-do-doco -on: - push: -jobs: - release: - runs-on: windows-latest - steps: - - uses: actions/checkout@v3 - - name: Run MarkdownSnippets - run: | - dotnet tool install --global MarkdownSnippets.Tool - mdsnippets ${GITHUB_WORKSPACE} - shell: bash - - name: Push changes - run: | - git config --local user.email "action@github.com" - git config --local user.name "GitHub Action" - git commit -m "Doco changes" -a || echo "nothing to commit" - remote="https://${GITHUB_ACTOR}:${{secrets.GITHUB_TOKEN}}@github.com/${GITHUB_REPOSITORY}.git" - branch="${GITHUB_REF:11}" - git push "${remote}" ${branch} || echo "nothing to push" - shell: bash diff --git a/README.md b/README.md index 3cb6d41..fe91c05 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,6 @@ # Data Pipelines in Python Library that makes it easy to write Python programs that get data from multiple sources, manipulate it, and then output the results. - ## Contents * [Who is this project for?](#who-is-this-project-for) @@ -12,7 +11,7 @@ Library that makes it easy to write Python programs that get data from multiple * [Watch the video](#watch-the-video) * [What is included?](#what-is-included) * [Recommended Tooling?](#recommended-tooling) - * [Next steps](#next-steps) + * [Next steps](#next-steps) ## Who is this project for? Anyone implementing a project that manipulates data from one or more sources and writes the results to one or more sinks. 
diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..044963b --- /dev/null +++ b/conftest.py @@ -0,0 +1 @@ +collect_ignore = ["setup.py"] diff --git a/mdsnippets.json b/mdsnippets.json deleted file mode 100644 index 27dddee..0000000 --- a/mdsnippets.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "ExcludeDirectories": [ "target" ], - "Convention": "InPlaceOverwrite", - "TocLevel": 5 -} diff --git a/pyproject.toml b/pyproject.toml index c668604..c157773 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,7 @@ [tool.pytest.ini_options] pythonpath = "src" +testpaths = 'tests' +python_files = '*.py' addopts = [ "--import-mode=importlib", ] diff --git a/pytest.ini b/pytest.ini index 923020e..b8ae217 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,7 @@ [pytest] +asyncio_mode=auto +pythonpath = src testpaths = tests python_files = *.py -addopts = --showlocals +norecursedirs = .pytest_cache .github .idea +addopts = --showlocals --import-mode=importlib diff --git a/requirements.txt b/requirements.txt index 56e6704..4f8c766 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ approvaltests pytest assertpy +pytest-asyncio diff --git a/setup.py b/setup.py index 9caf8f0..4c88cfc 100644 --- a/setup.py +++ b/setup.py @@ -1,16 +1,12 @@ -import re from pathlib import Path from setuptools import setup, find_packages HERE = Path(__file__).parent -_version_file_contents = (HERE / "src" / "version.py").read_text() -matched = re.search(r'"(.*)"', _version_file_contents) -VERSION = matched.group(1) if matched is not None else "UNKNOWN VERSION" setup( name="datapipelines", - version=VERSION, + version="0.1.0", description="Data transform pipeline library to simplify ETL-style applications", author="Deep Roots", author_email="", diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index d6e2ac7..e36597e 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -1,4 +1,6 @@ from __future__ import annotations from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep -from datapipeline.segmentimpl import DataProcessingSegment +from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, \ + TransformSegment, SinkSegment, DataProcessingSegment, PipeHeadSegment +from datapipeline.pipeline import Pipeline, is_valid_pipeline, pipeline, start_with, source, transform, sink diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index 830f8bf..463602b 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -11,19 +11,54 @@ class NamedStep(Protocol): @property @abstractmethod - def name(self) -> str: + def __name__(self) -> str: pass @runtime_checkable class ProcessingStep(NamedStep, Protocol[TIn]): @abstractmethod - def transform(self, data: TIn) -> None: + def __call__(self, data: TIn) -> None: + pass + + +@runtime_checkable +class ProcessingStepAsync(NamedStep, Protocol[TIn]): + @abstractmethod + async def __call__(self, data: TIn) -> None: pass @runtime_checkable class RestructuringStep(NamedStep, Protocol[TIn, TOut]): @abstractmethod - def restructure(self, data: TIn) -> TOut: + def __call__(self, data: TIn) -> TOut: + pass + + +@runtime_checkable +class Loader(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + async def __call__(self, data: TIn) -> TOut: + pass + + +@runtime_checkable +class ParseImpl(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + def __call__(self, data: TIn, new_data: TOut) -> None: + pass + + +@runtime_checkable +class 
Extractor(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + def __call__(self, data: TIn) -> TOut: + pass + + +@runtime_checkable +class StoreImpl(NamedStep, Protocol[TIn]): + @abstractmethod + async def __call__(self, data: TIn) -> None: pass diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py new file mode 100644 index 0000000..7d064f5 --- /dev/null +++ b/src/datapipeline/pipeline.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import asyncio +from typing import Callable, TypeVar, Awaitable, Generic, List, Iterable + +import assertpy +from assertpy import soft_assertions, assert_that, soft_fail + +from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment, clientapi +from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment, SinkSegment + +T = TypeVar("T") +TRaw = TypeVar("TRaw") +TSrc = TypeVar("TSrc") +TNext = TypeVar("TNext") +TDest = TypeVar("TDest") + + +def needs(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_needs_ = list(info_items) + return f + + return inner + + +def gives(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_gives_ = list(info_items) + return f + + return inner + + +SegmentBuilder = Callable[[_PipeSegment[TNext, T] | None], DataProcessingSegment[TNext]] +RestructureBuilder = Callable[[_PipeSegment[TNext, T] | None], RestructuringSegment[TSrc, TNext]] +PipeHeadBuilder = Callable[[_PipeSegment[TNext, T] | None], PipeHeadSegment[TNext]] +AnyBuilder = Callable[[_PipeSegment | None], _PipeSegment] + + +def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T, T]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> PipeHeadSegment[T]: + return PipeHeadSegment(data_constructor, next_segment) + + return IncompletePipeline[T, T]([build]) + + +def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> SegmentBuilder[T]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> SourceSegment[T, TRaw]: + return SourceSegment(load, parse, next_segment) + + return build + + +def transform(process: Callable[[T], None]) -> SegmentBuilder[T]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> TransformSegment[T]: + return TransformSegment(process, next_segment) + + return build + + +def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> SegmentBuilder[TSrc]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> SinkSegment[T, TRaw]: + return SinkSegment(extract, store, next_segment) + + return build + + +class IncompletePipeline(Generic[TSrc, T]): + def __init__(self, prior_steps: List[AnyBuilder]): + self._prior_steps = prior_steps + + def then(self, *steps: SegmentBuilder[T]) -> PotentiallyCompletePipeline[TSrc, T]: + return PotentiallyCompletePipeline(self._prior_steps + list(steps)) + + +class PotentiallyCompletePipeline(Generic[TSrc, T]): + def __init__(self, prior_steps: List[AnyBuilder]): + self._prior_steps = prior_steps + + def restructure_to(self, + data_constructor: Callable[[], TDest], + restructure: Callable[[T], TDest]) \ + -> IncompletePipeline[TSrc, TDest]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> RestructuringSegment[T, TDest]: + return RestructuringSegment(restructure, next_segment) + + return IncompletePipeline(self._prior_steps + [build]) + + def build(self) -> Pipeline: + next_segment = None + for builder in reversed(self._prior_steps): + next_segment = builder(next_segment) + return Pipeline(next_segment) + + 
+class Pipeline: + _first_segment: _PipeSegment + + def __init__(self, first_segment): + self._first_segment = first_segment + + def to_verification_string(self): + return self._first_segment.to_verification_string() + + @property + def _segments(self) -> Iterable[_PipeSegment]: + return iter(self._first_segment) + + def run(self): + asyncio.run(self._first_segment.process(None)) + + +def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: + return builder.build() + + +def is_valid_pipeline(self): + if not isinstance(self.val, Pipeline): + raise TypeError('val must be a pipeline.') + provided = set() + with soft_assertions(): + kind = self.kind + self.kind = 'soft' + for segment in self.val._segments: + needed = set(segment.needs) + if not needed <= provided: + desc = self.description + self.description = f'Segment {segment.descriptor}' + self.error(f'Had unmet needs {needed - provided}. The pipeline to that point had provided {provided}.') + self.description = desc + provided.update(segment.gives) + self.kind = kind + return self diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 14d5439..0a37a8d 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -1,89 +1,176 @@ from __future__ import annotations +import inspect from abc import ABCMeta, abstractmethod -from typing import Generic, TypeVar +from typing import Generic, TypeVar, Callable, List -from datapipeline.clientapi import ProcessingStep, RestructuringStep +from datapipeline import clientapi U = TypeVar('U') TIn = TypeVar('TIn') TOut = TypeVar('TOut') +TRaw = TypeVar('TRaw') class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): _next_segment: _PipeSegment[TOut, U] + needs: List[str] + gives: List[str] - def __init__(self): - self._next_segment = NullTerminator() + class Iterator: + next_segment: _PipeSegment + + def __init__(self, next_segment: _PipeSegment): + self.next_segment = next_segment + + def __iter__(self): + return self + + def __next__(self): + self.next_segment = self.next_segment._next_segment + if isinstance(self.next_segment, NullTerminator): + raise StopIteration() + return self.next_segment + + def __init__(self, needs: List[str], gives: List[str], next_segment: _PipeSegment[TOut, U] = None): + if next_segment is None: + next_segment = NullTerminator() + self._next_segment = next_segment + self.needs = needs + self.gives = gives + + def __iter__(self): + return _PipeSegment.Iterator(self) @property @abstractmethod - def name(self) -> str: - pass + def descriptor(self) -> str: + raise NotImplementedError def to_verification_string(self) -> str: - return f'|\n+--{self.name}\n' + self._next_segment.to_verification_string() + return f' |\n{self.descriptor}\n | info change: {self.needs} ==> {self.gives}\n{self._next_segment.to_verification_string()}' @abstractmethod - def _process(self, data: TIn) -> TOut: + async def _process(self, data: TIn) -> TOut: pass - def process(self, data: TIn) -> None: - result = self._process(data) - self._next_segment.process(result) + async def process(self, data: TIn) -> None: + result = await self._process(data) + await self._next_segment.process(result) + - def then(self, next_segment: ProcessingStep[TOut] | RestructuringStep[TOut, U]) -> _PipeSegment[TOut, U]: - self._next_segment = DataProcessingSegment(next_segment) if isinstance(next_segment, ProcessingStep) else \ - RestructuringSegment(next_segment) - return self._next_segment +class PipeHeadSegment(_PipeSegment[TIn, TIn], Generic[TIn]): + 
_impl: Callable[[], TIn] + + def __init__(self, impl: Callable[[], TIn], next_segment: _PipeSegment[TIn, U] = None): + super(PipeHeadSegment, self).__init__([], [], next_segment) + self._impl = impl + + def to_verification_string(self) -> str: + return f' Start with empty <{self._impl.__name__}>\n{self._next_segment.to_verification_string()}' + + async def _process(self, data: TIn) -> TIn: + return self._impl() + + @property + def descriptor(self) -> str: + raise NotImplementedError class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): - _impl: ProcessingStep[TIn] + _impl: clientapi.ProcessingStep[TIn] | clientapi.ProcessingStepAsync[TIn] - def __init__(self, impl: ProcessingStep[TIn]): - super(DataProcessingSegment, self).__init__() + def __init__(self, impl: clientapi.ProcessingStep[TIn] | clientapi.ProcessingStepAsync[TIn], + next_segment: _PipeSegment[TIn, U] = None): + assert isinstance(impl, clientapi.ProcessingStep) + super(DataProcessingSegment, self).__init__(getattr(impl, "_p_needs_", []), getattr(impl, "_p_gives_", []), next_segment) self._impl = impl @property - def name(self) -> str: - return self._impl.name + def descriptor(self) -> str: + return f'{self.symbol()} {self._impl.__name__}' - def _process(self, data: TIn) -> TIn: - self._impl.transform(data) + @abstractmethod + def symbol(self) -> str: + raise NotImplementedError + + async def _process(self, data: TIn) -> TIn: + result = self._impl(data) + if inspect.isawaitable(result): + await result return data +class SourceSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): + def __init__(self, load: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl[TIn, TRaw], + next_segment: _PipeSegment[TIn, U] = None): + async def impl(data: TIn) -> None: + parse(data, await load(data)) + + assert isinstance(load, clientapi.Loader) + assert isinstance(parse, clientapi.ParseImpl) + impl.__name__ = f'load:{load.__name__}, parse: {parse.__name__}' + impl._p_needs_ = getattr(load, "_p_needs_", []) + impl._p_gives_ = getattr(load, "_p_gives_", []) + super(SourceSegment, self).__init__(impl, next_segment) + + def symbol(self) -> str: + return ">-| " + + +class TransformSegment(DataProcessingSegment[TIn], Generic[TIn]): + def symbol(self) -> str: + return " + " + + +class SinkSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): + def __init__(self, extract: clientapi.Extractor[TIn, TRaw], store: clientapi.StoreImpl[TRaw], + next_segment: _PipeSegment[TIn, U] = None): + async def impl(data: TIn) -> None: + await store(extract(data)) + + assert isinstance(extract, clientapi.Extractor) + assert isinstance(store, clientapi.StoreImpl) + impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' + impl._p_needs_ = getattr(extract, "_p_needs_", []) + impl._p_gives_ = getattr(extract, "_p_gives_", []) + super(SinkSegment, self).__init__(impl, next_segment) + + def symbol(self) -> str: + return " |->" + + class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): - _impl: RestructuringStep[TIn, TOut] + _impl: Callable[[TIn], TOut] - def __init__(self, impl: RestructuringStep[TIn, TOut]): - super(RestructuringSegment, self).__init__() + def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, U] = None): + assert isinstance(impl, clientapi.RestructuringStep) + super(RestructuringSegment, self).__init__(getattr(impl, "_p_needs_", []), getattr(impl, "_p_gives_", []), next_segment) self._impl = impl @property - def name(self) -> str: - return self._impl.name + def descriptor(self) 
-> str: + return f' <-> Changed to <{inspect.get_annotations(self._impl, eval_str=True)["return"].__name__}> using {self._impl.__name__}' - def _process(self, data: TIn) -> TOut: - return self._impl.restructure(data) + async def _process(self, data: TIn) -> TOut: + return self._impl(data) class NullTerminator(_PipeSegment[TIn, TIn], Generic[TIn]): def __init__(self): pass # Don't call the superclass. - def name(self) -> str: + def __next__(self): + raise StopIteration() + + def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: - return"" + return "" - def process(self, data: TIn) -> None: + async def process(self, data: TIn) -> None: pass - def _process(self, data: TIn) -> TIn: - raise NotImplementedError - - def then(self, next_segment: ProcessingStep[TIn] | RestructuringStep[TIn, U]) -> _PipeSegment[TIn, U]: + async def _process(self, data: TIn) -> TIn: raise NotImplementedError diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 195fb70..f63a914 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -1,4 +1,147 @@ +from __future__ import annotations +from typing import Any -class ValidSegmentImpl: +from datapipeline.pipeline import needs, gives, source, transform, sink, pipeline, start_with + + +class RawCustomerData: + value: Any + + def __init__(self): + self.value = None + + +class CustomerGraph: + pass + + +@needs() +@gives("customers from filesystem", "customer list") +async def load_customer_csv(data: RawCustomerData) -> str: + pass + + +def parse_customers_from_csv(data: RawCustomerData, new_data: str) -> None: + pass + + +@needs() +@gives("customers from api", "customer list") +async def load_customer_crm_api(data: RawCustomerData) -> str: + pass + + +def parse_customers_from_json(data: RawCustomerData, new_data: str) -> None: + pass + + +@needs("customer list") +@gives("customer emails") +def remove_invalid_emails(data: RawCustomerData) -> None: + pass + + +@needs("customer list") +@gives("non-test customers") +def remove_test_customers(data: RawCustomerData) -> None: + pass + + +@needs("customer emails", "non-test customers") +@gives("customer orders") +async def load_customer_orders(data: RawCustomerData) -> dict: + pass + + +def parse_orders_from_json(data: RawCustomerData, new_data: dict) -> None: + pass + + +@needs("customer orders") +@gives("valid orders") +def remove_empty_orders(data: RawCustomerData) -> None: + pass + + +@needs("customer list", "customer orders") +@gives("order cohorts") +def group_orders_into_customer_cohorts(data: RawCustomerData) -> None: pass + + +@needs("valid orders", "order cohorts") +@gives("relative order timing") +def compute_cohort_relative_date_per_order(data: RawCustomerData) -> None: + pass + + +def create_customer_object_graph(data: RawCustomerData) -> CustomerGraph: + pass + + +@needs("customer list", "order cohorts", "customer emails") +@gives("alpha", "omega") +def understand_something(data: CustomerGraph) -> None: + pass + + +@needs("customer list", "relative order timing", "alpha") +@gives("beta") +def understand_another(data: CustomerGraph) -> None: + pass + + +@needs("omega", "beta") +@gives("totality") +def keep_understanding(data: CustomerGraph) -> None: + pass + + +class DestStructureOne: + pass + + +@needs("alpha", "beta") +def extract_cohort_analysis(data: CustomerGraph) -> DestStructureOne: + pass + + +async def email_analysis_to_sales_team(data: DestStructureOne) -> None: + pass + + +class DestStructureTwo: + pass + + +@needs("totality") 
+def extract_revenue_projections(data: CustomerGraph) -> DestStructureTwo: + pass + + +async def put_projections_into_quickbooks(data: DestStructureTwo) -> None: + pass + + +def create_pipeline(): + return pipeline( + start_with(RawCustomerData) + .then( + source(load_customer_csv, parse_customers_from_csv), + source(load_customer_crm_api, parse_customers_from_json), + transform(remove_invalid_emails), + transform(remove_test_customers), + source(load_customer_orders, parse_orders_from_json), + transform(remove_empty_orders), + transform(group_orders_into_customer_cohorts), + transform(compute_cohort_relative_date_per_order)) + .restructure_to(CustomerGraph, create_customer_object_graph) + .then( + transform(understand_something), + transform(understand_another), + transform(keep_understanding), + sink(extract_cohort_analysis, email_analysis_to_sales_team), + sink(extract_revenue_projections, put_projections_into_quickbooks), + ) + ) diff --git a/src/version.py b/src/version.py index 54f129d..dbfe2f6 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -version_number = "0.0.0" +version_number = "0.1.0" diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index fdeb8cc..ed6c8b2 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -1,60 +1,132 @@ +import asyncio +from typing import Any, Generic, TypeVar + import pytest from assertpy import assert_that from approvaltests import verify -from datapipeline import DataProcessingSegment +from datapipeline import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment, PipeHeadSegment + + +T = TypeVar('T') class DataTransferObject: - def __init__(self): - self.some_num = None - self.dumb_obj = None + some_value: Any + dumb_object: Any + + def __init__(self, some_value=None): + self.some_value = some_value + self.dumb_object = None + + +class SecondDTO: + message: str + + def __init__(self, message: str = ""): + self.message = message + + +def format_dumb_object(data: DataTransferObject) -> None: + data.dumb_object = f"{{ saw: {data.some_value} }}" + + +async def first(data: DataTransferObject) -> dict: + pass + + +def parse_first(data: DataTransferObject, new_data: dict) -> None: + pass + +def second(data: DataTransferObject) -> None: + pass -class ValidSegmentImpl: - name = "Task 1" - def transform(self, data): - data.dumb_object = f"{{ saw: {data.some_num} }}" +def extract_for_third(data: SecondDTO) -> dict: + pass -class SegmentNamed: - def __init__(self, name): - self.name = name +async def third(data: dict) -> None: + pass - def transform(self, data): - raise NotImplementedError +def convert(data: DataTransferObject) -> SecondDTO: + return SecondDTO(f"made by convert function from {data.some_value}") -class CapturingSink: - name = "Capture for test" + +class Capture(Generic[T]): + result: T + + def __init__(self): + self.result = None + self.__name__ = 'Capture' + + def __call__(self, arg: T) -> None: + self.result = arg + + +class CaptureAsync(Generic[T]): + result: T def __init__(self): self.result = None + self.__name__ = 'CaptureAsync' + + async def __call__(self, arg: T) -> None: + await asyncio.sleep(0.005) + self.result = arg + + +async def test_restructuring_segment_uses_its_impl(): + def has_right_message(arg: SecondDTO): + assert_that(arg.message).is_equal_to("made by convert function from 2") + + test_subject = RestructuringSegment(convert, TransformSegment(has_right_message)) + await test_subject.process(DataTransferObject(2)) + + +async def 
test_pipe_segment_calls_its_transform_impl_to_process_data(): + capture = Capture[DataTransferObject]() + test_subject = TransformSegment(format_dumb_object, TransformSegment(capture)) + await test_subject.process(DataTransferObject(4)) + assert_that(capture.result.dumb_object).is_equal_to("{ saw: 4 }") + + +async def test_source_segment_chains_its_two_implementations(): + async def fetch_data(arg: DataTransferObject) -> str: + await asyncio.sleep(0.005) + return "fetched value" - def transform(self, data): - self.result = data + def parse_data(arg: DataTransferObject, new_data: str) -> None: + arg.some_value = new_data + capture = Capture[DataTransferObject]() + test_subject = SourceSegment(fetch_data, parse_data, TransformSegment(capture)) + await test_subject.process(DataTransferObject("original value")) + assert_that(capture.result.some_value).is_equal_to("fetched value") -def test_pipe_segment_names_are_used_when_verifying_them(): - processor = ValidSegmentImpl() - test_subject = DataProcessingSegment(processor) - assert_that(test_subject.to_verification_string()).contains(processor.name) +async def test_sink_segment_chains_its_two_implementations(): + def extract(arg: DataTransferObject) -> str: + return "extracted value" -def test_pipe_segment_calls_its_transform_impl_to_process_data(): - processor = ValidSegmentImpl() - sink = CapturingSink() - test_subject = DataProcessingSegment(processor) - test_subject.then(sink) - data = DataTransferObject() - data.some_num = 4 - test_subject.process(data) - assert_that(sink.result.dumb_object).is_equal_to("{ saw: 4 }") + capture = CaptureAsync[DataTransferObject]() + test_subject = SinkSegment(extract, capture) + await test_subject.process(DataTransferObject("original value")) + assert_that(capture.result).is_equal_to("extracted value") def test_pipelines_can_be_validated_as_strings(): - test_subject = DataProcessingSegment(SegmentNamed("first")) - test_subject \ - .then(SegmentNamed("second"))\ - .then(SegmentNamed("third")) + test_subject = PipeHeadSegment( + DataTransferObject, + SourceSegment( + first, + parse_first, + TransformSegment( + second, + RestructuringSegment( + convert, + SinkSegment( + extract_for_third, + third))))) verify(test_subject.to_verification_string()) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index c5089fb..4979f47 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,6 +1,13 @@ -| -+--first -| -+--second -| -+--third + Start with empty <DataTransferObject> + | +>-| load:first, parse: parse_first + | info change: [] ==> [] + | + + second + | info change: [] ==> [] + | + <-> Changed to <SecondDTO> using convert + | info change: [] ==> [] + | + |-> extract: extract_for_third, store: third + | info change: [] ==> [] diff --git a/tests/pipeline.py b/tests/pipeline.py new file mode 100644 index 0000000..670d285 --- /dev/null +++ b/tests/pipeline.py @@ -0,0 +1,71 @@ +import asyncio +from typing import Generic, TypeVar + +from approvaltests import verify +from assertpy import assert_that, add_extension + +from datapipeline import pipeline, start_with, source, transform, is_valid_pipeline, sink +from examplecode import product + +add_extension(is_valid_pipeline) + +T = TypeVar('T') + + +class CaptureAsync(Generic[T]): + result: T + + def __init__(self): + self.result = None + self.__name__ = 'CaptureAsync' + +
async def __call__(self, arg: T) -> None: + await asyncio.sleep(0.005) + self.result = arg + + +def test_valid_pipeline_passes_test(): + result = product.create_pipeline() + assert_that(result).is_valid_pipeline() + verify(result.to_verification_string()) + + +def test_pipeline_runs_its_segments(): + async def fetch(data: product.RawCustomerData) -> str: + await asyncio.sleep(0.005) + return "fetched data" + + def parse(data: product.RawCustomerData, new_data: str) -> None: + data.value = new_data.split()[0] + + def change(data: product.RawCustomerData) -> None: + data.value = f"changed {data.value}" + + def extract(data: product.RawCustomerData) -> str: + return data.value + + capture = CaptureAsync[str]() + test_subject = pipeline( + start_with(product.RawCustomerData) + .then( + source(fetch, parse), + transform(change), + sink(extract, capture)) + ) + test_subject.run() + assert_that(capture.result).is_equal_to("changed fetched") + + +def test_pipeline_with_missing_requirement_is_invalid(): + result = pipeline( + start_with(product.RawCustomerData) + .then( + source(product.load_customer_csv, product.parse_customers_from_csv), + transform(product.remove_empty_orders)) + ) + + def make_assertion(): + assert_that(result).is_valid_pipeline() + + assert_that(make_assertion).raises(AssertionError).when_called_with().contains( + "[Segment + remove_empty_orders] Had unmet needs {'customer orders'}. The pipeline to that point had provided {'") diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt new file mode 100644 index 0000000..f3a9eeb --- /dev/null +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -0,0 +1,43 @@ + Start with empty <RawCustomerData> + | +>-| load:load_customer_csv, parse: parse_customers_from_csv + | info change: [] ==> ['customers from filesystem', 'customer list'] + | +>-| load:load_customer_crm_api, parse: parse_customers_from_json + | info change: [] ==> ['customers from api', 'customer list'] + | + + remove_invalid_emails + | info change: ['customer list'] ==> ['customer emails'] + | + + remove_test_customers + | info change: ['customer list'] ==> ['non-test customers'] + | +>-| load:load_customer_orders, parse: parse_orders_from_json + | info change: ['customer emails', 'non-test customers'] ==> ['customer orders'] + | + + remove_empty_orders + | info change: ['customer orders'] ==> ['valid orders'] + | + + group_orders_into_customer_cohorts + | info change: ['customer list', 'customer orders'] ==> ['order cohorts'] + | + + compute_cohort_relative_date_per_order + | info change: ['valid orders', 'order cohorts'] ==> ['relative order timing'] + | + <-> Changed to <CustomerGraph> using create_customer_object_graph + | info change: [] ==> [] + | + + understand_something + | info change: ['customer list', 'order cohorts', 'customer emails'] ==> ['alpha', 'omega'] + | + + understand_another + | info change: ['customer list', 'relative order timing', 'alpha'] ==> ['beta'] + | + + keep_understanding + | info change: ['omega', 'beta'] ==> ['totality'] + | + |-> extract: extract_cohort_analysis, store: email_analysis_to_sales_team + | info change: ['alpha', 'beta'] ==> [] + | + |-> extract: extract_revenue_projections, store: put_projections_into_quickbooks + | info change: ['totality'] ==> [] diff --git a/tox.ini b/tox.ini index b2c52fb..398708d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ - + # run with: tox -e dev [tox] -envlist = tests-{py38,py39} +envlist = tests-{py} [testenv] commands = python -m
pytest {posargs} @@ -10,4 +10,3 @@ deps = -rrequirements.txt [testenv:dev] usedevelop = True commands = -
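
Reviewer note: below is a minimal usage sketch of the fluent API this patch introduces in src/datapipeline/pipeline.py (start_with / source / transform / sink / pipeline and Pipeline.run), modelled on test_pipeline_runs_its_segments in tests/pipeline.py. The Record class and the fetch/parse/normalise/extract/store functions are illustrative placeholders, not part of the change.

import asyncio
from typing import Any

from datapipeline import pipeline, start_with, source, transform, sink


class Record:
    # Mutable data object threaded through the pipeline (illustrative only).
    def __init__(self) -> None:
        self.value: Any = None


async def fetch(data: Record) -> str:
    # Loader: async step that pulls raw data from some source.
    await asyncio.sleep(0)
    return "raw payload"


def parse(data: Record, new_data: str) -> None:
    # ParseImpl: copy the loaded payload onto the shared data object.
    data.value = new_data


def normalise(data: Record) -> None:
    # ProcessingStep: in-place transformation of the shared data object.
    data.value = data.value.upper()


def extract(data: Record) -> str:
    # Extractor: pick out the piece of data the sink should store.
    return data.value


async def store(payload: str) -> None:
    # StoreImpl: async write to the destination (stdout here).
    print(payload)


demo = pipeline(
    start_with(Record)
    .then(
        source(fetch, parse),
        transform(normalise),
        sink(extract, store))
)

print(demo.to_verification_string())  # ASCII diagram of the segments
demo.run()                            # prints "RAW PAYLOAD"

When steps carry @needs/@gives metadata (as in src/examplecode/product.py), the same object can also be checked with assert_that(p).is_valid_pipeline() after registering the extension via assertpy's add_extension, and the to_verification_string() output is what the approved files in tests/ capture.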