From 36586e00cd500bfc8626c6fedbf20d0754a92fc9 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 16:53:04 -0500 Subject: [PATCH 01/46] F - Sketch out some example product code that uses the API I'd like to expose. Everything just returns either Null objects or None. --- src/examplecode/product.py | 197 ++++++++++++++++++++++++++++++++++++- tests/pipeline.py | 8 ++ 2 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 tests/pipeline.py diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 195fb70..4f86ac5 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -1,4 +1,199 @@ +from __future__ import annotations +import asyncio +from typing import TypeVar, Callable, Awaitable, Generic +from datapipeline import DataProcessingSegment -class ValidSegmentImpl: + +class DataCollectingDTO: + pass + + +class AbstractingDTO: + pass + + +T = TypeVar("T") +TRaw = TypeVar("TRaw") +TSrc = TypeVar("TSrc") +TDest = TypeVar("TDest") + + +def needs(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_needs_ = info_items + return f + + return inner + + +def gives(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_gives_ = info_items + return f + + return inner + + +@needs() +@gives("foo from filesystem", "foo list") +async def fetch_one(data: DataCollectingDTO) -> str: + pass + + +def parse_one(data: DataCollectingDTO, new_data: str) -> None: + pass + + +@needs() +@gives("foo from api", "foo list") +async def fetch_two(data: DataCollectingDTO) -> str: + pass + + +def parse_two(data: DataCollectingDTO, new_data: str) -> None: + pass + + +@needs("foo list") +@gives("foo.core") +def clean_some(data: DataCollectingDTO) -> None: + pass + + +@needs("foo list") +@gives("foo.bar") +def clean_more(data: DataCollectingDTO) -> None: + pass + + +@needs("foo.core", "foo.bar") +@gives("related baz") +async def fetch_based_on_data_so_far(data: DataCollectingDTO) -> dict: + pass + + +def parse_new_source(data: DataCollectingDTO, new_data: dict) -> None: + pass + + +@needs("related baz") +@gives("baz.something") +def clean_a(data: DataCollectingDTO) -> None: + pass + + +@needs("foo list", "related baz") +@gives("baz.association") +def clean_b(data: DataCollectingDTO) -> None: + pass + + +@needs("baz.something", "baz.association") +@gives("quux") +def clean_c(data: DataCollectingDTO) -> None: + pass + + +def restructuring_function(data: DataCollectingDTO) -> AbstractingDTO: pass + + +@needs("foo list", "baz.association", "baz.something") +@gives("alpha", "omega") +def understand_something(data: AbstractingDTO) -> None: + pass + + +@needs("foo list", "quux", "alpha") +@gives("beta") +def understand_another(data: AbstractingDTO) -> None: + pass + + +@needs("omega", "beta") +@gives("totality") +def keep_understanding(data: AbstractingDTO) -> None: + pass + + +class DestStructureOne: + pass + + +@needs("alpha", "beta") +def extract_and_format_one(data: AbstractingDTO) -> DestStructureOne: + pass + + +async def put_one(data: DestStructureOne) -> None: + pass + + +class DestStructureTwo: + pass + + +@needs("totality") +def extract_and_format_two(data: AbstractingDTO) -> DestStructureTwo: + pass + + +async def put_two(data: DestStructureTwo) -> None: + pass + + +def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> DataProcessingSegment[T]: + pass + + +def transform(process: Callable[[T], None]) -> DataProcessingSegment[T]: + pass + + +def sink(extract: Callable[[TSrc], TDest], store: 
Callable[[TDest], Awaitable[None]]) -> DataProcessingSegment[TSrc]: + pass + + +class IncompletePipeline(Generic[TSrc, T]): + def then(self, *steps: DataProcessingSegment[T]) -> PotentiallyCompletePipeline[TSrc, T]: + return PotentiallyCompletePipeline() + + +class PotentiallyCompletePipeline(Generic[TSrc, T]): + def restructure_to(self, + data_constructor: Callable[[], TDest], + restructure: Callable[[T], TDest]) \ + -> IncompletePipeline[TSrc, TDest]: + return IncompletePipeline() + + +def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]): + pass + + +def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T]: + return IncompletePipeline() + + +def create_pipeline(): + return pipeline( + start_with(DataCollectingDTO) + .then( + source(fetch_one, parse_one), + source(fetch_two, parse_two), + transform(clean_some), + transform(clean_more), + source(fetch_based_on_data_so_far, parse_new_source), + transform(clean_a), + transform(clean_b), + transform(clean_c)) + .restructure_to(AbstractingDTO, restructuring_function) + .then( + transform(understand_something), + transform(understand_another), + transform(keep_understanding), + sink(extract_and_format_one, put_one), + sink(extract_and_format_two, put_two), + ) + ) diff --git a/tests/pipeline.py b/tests/pipeline.py new file mode 100644 index 0000000..e544288 --- /dev/null +++ b/tests/pipeline.py @@ -0,0 +1,8 @@ +import pytest +from assertpy import assert_that + +from examplecode.product import create_pipeline + + +def test_create_pipeline_fluently(): + pipeline = create_pipeline() From 12fedde9c2bcea42379308ff2b25d3998e8dbed3 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 16:58:31 -0500 Subject: [PATCH 02/46] r - Move decorator. --- src/datapipeline/pipeline.py | 16 ++++++++++++++++ src/examplecode/product.py | 10 +--------- 2 files changed, 17 insertions(+), 9 deletions(-) create mode 100644 src/datapipeline/pipeline.py diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py new file mode 100644 index 0000000..cfa9e42 --- /dev/null +++ b/src/datapipeline/pipeline.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from typing import Callable, TypeVar + +T = TypeVar("T") +TRaw = TypeVar("TRaw") +TSrc = TypeVar("TSrc") +TDest = TypeVar("TDest") + + +def needs(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_needs_ = info_items + return f + + return inner diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 4f86ac5..ca800dd 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -1,8 +1,8 @@ from __future__ import annotations -import asyncio from typing import TypeVar, Callable, Awaitable, Generic from datapipeline import DataProcessingSegment +from datapipeline.pipeline import needs class DataCollectingDTO: @@ -19,14 +19,6 @@ class AbstractingDTO: TDest = TypeVar("TDest") -def needs(*info_items: str) -> Callable[[T], T]: - def inner(f: T) -> T: - f._p_needs_ = info_items - return f - - return inner - - def gives(*info_items: str) -> Callable[[T], T]: def inner(f: T) -> T: f._p_gives_ = info_items From b82cc27ec067cb477a43d7a48632dead352e4075 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:03:21 -0500 Subject: [PATCH 03/46] r - Move many functions and classes. 
--- src/datapipeline/pipeline.py | 50 ++++++++++++++++++++++++++++++++++- src/examplecode/product.py | 51 +----------------------------------- 2 files changed, 50 insertions(+), 51 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index cfa9e42..5f3ed6c 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -1,6 +1,9 @@ from __future__ import annotations -from typing import Callable, TypeVar +from typing import Callable, TypeVar, Awaitable, Generic + +from datapipeline import DataProcessingSegment + T = TypeVar("T") TRaw = TypeVar("TRaw") @@ -14,3 +17,48 @@ def inner(f: T) -> T: return f return inner + + +def gives(*info_items: str) -> Callable[[T], T]: + def inner(f: T) -> T: + f._p_gives_ = info_items + return f + + return inner + + +def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> DataProcessingSegment[T]: + pass + + +def transform(process: Callable[[T], None]) -> DataProcessingSegment[T]: + pass + + +def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> DataProcessingSegment[TSrc]: + pass + + +class IncompletePipeline(Generic[TSrc, T]): + def then(self, *steps: DataProcessingSegment[T]) -> PotentiallyCompletePipeline[TSrc, T]: + return PotentiallyCompletePipeline() + + +class PotentiallyCompletePipeline(Generic[TSrc, T]): + def restructure_to(self, + data_constructor: Callable[[], TDest], + restructure: Callable[[T], TDest]) \ + -> IncompletePipeline[TSrc, TDest]: + return IncompletePipeline() + + +class Pipeline: + pass + + +def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: + pass + + +def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T]: + return IncompletePipeline() diff --git a/src/examplecode/product.py b/src/examplecode/product.py index ca800dd..9da0d0c 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -1,8 +1,6 @@ from __future__ import annotations -from typing import TypeVar, Callable, Awaitable, Generic -from datapipeline import DataProcessingSegment -from datapipeline.pipeline import needs +from datapipeline.pipeline import needs, gives, source, transform, sink, pipeline, start_with class DataCollectingDTO: @@ -13,20 +11,6 @@ class AbstractingDTO: pass -T = TypeVar("T") -TRaw = TypeVar("TRaw") -TSrc = TypeVar("TSrc") -TDest = TypeVar("TDest") - - -def gives(*info_items: str) -> Callable[[T], T]: - def inner(f: T) -> T: - f._p_gives_ = info_items - return f - - return inner - - @needs() @gives("foo from filesystem", "foo list") async def fetch_one(data: DataCollectingDTO) -> str: @@ -135,39 +119,6 @@ async def put_two(data: DestStructureTwo) -> None: pass -def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> DataProcessingSegment[T]: - pass - - -def transform(process: Callable[[T], None]) -> DataProcessingSegment[T]: - pass - - -def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> DataProcessingSegment[TSrc]: - pass - - -class IncompletePipeline(Generic[TSrc, T]): - def then(self, *steps: DataProcessingSegment[T]) -> PotentiallyCompletePipeline[TSrc, T]: - return PotentiallyCompletePipeline() - - -class PotentiallyCompletePipeline(Generic[TSrc, T]): - def restructure_to(self, - data_constructor: Callable[[], TDest], - restructure: Callable[[T], TDest]) \ - -> IncompletePipeline[TSrc, TDest]: - return IncompletePipeline() - - -def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]): - pass - - -def 
start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T]: - return IncompletePipeline() - - def create_pipeline(): return pipeline( start_with(DataCollectingDTO) From a45945d9d5ff62c4230a885442471a21ffbd3728 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:14:38 -0500 Subject: [PATCH 04/46] R!! Convert segments to expect the client step to have __name__, not name. First step in making it work with callables. :) --- src/datapipeline/clientapi.py | 2 +- src/datapipeline/segmentimpl.py | 8 ++++---- tests/pipe_segments.py | 20 ++++++++++++++++---- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index 830f8bf..d853a58 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -11,7 +11,7 @@ class NamedStep(Protocol): @property @abstractmethod - def name(self) -> str: + def __name__(self) -> str: pass diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 14d5439..c3ce655 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -19,10 +19,10 @@ def __init__(self): @property @abstractmethod def name(self) -> str: - pass + raise NotImplementedError def to_verification_string(self) -> str: - return f'|\n+--{self.name}\n' + self._next_segment.to_verification_string() + return f'|\n+--{self.name}\n{self._next_segment.to_verification_string()}' @abstractmethod def _process(self, data: TIn) -> TOut: @@ -47,7 +47,7 @@ def __init__(self, impl: ProcessingStep[TIn]): @property def name(self) -> str: - return self._impl.name + return self._impl.__name__ def _process(self, data: TIn) -> TIn: self._impl.transform(data) @@ -63,7 +63,7 @@ def __init__(self, impl: RestructuringStep[TIn, TOut]): @property def name(self) -> str: - return self._impl.name + return self._impl.__name__ def _process(self, data: TIn) -> TOut: return self._impl.restructure(data) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index fdeb8cc..74a9828 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -11,22 +11,34 @@ def __init__(self): class ValidSegmentImpl: - name = "Task 1" + __name__ = "Task 1" def transform(self, data): data.dumb_object = f"{{ saw: {data.some_num} }}" +def first(data): + pass + + +def second(data): + pass + + +def third(data): + pass + + class SegmentNamed: def __init__(self, name): - self.name = name + self.__name__ = name def transform(self, data): raise NotImplementedError class CapturingSink: - name = "Capture for test" + __name__ = "Capture for test" def __init__(self): self.result = None @@ -38,7 +50,7 @@ def transform(self, data): def test_pipe_segment_names_are_used_when_verifying_them(): processor = ValidSegmentImpl() test_subject = DataProcessingSegment(processor) - assert_that(test_subject.to_verification_string()).contains(processor.name) + assert_that(test_subject.to_verification_string()).contains(processor.__name__) def test_pipe_segment_calls_its_transform_impl_to_process_data(): From 317de3848691c54b51a61f765f9402f8f1b32bc8 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:17:45 -0500 Subject: [PATCH 05/46] R!! Data processign segments now expect their implementations to implement __call__. 
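Illustrative sketch (not part of this diff): once steps are invoked as callables, a bare
function can serve as a segment implementation, since functions already carry __name__.

    def format_dumb_object(data):
        data.dumb_object = f"{{ saw: {data.some_num} }}"

    DataProcessingSegment(format_dumb_object).process(dto)

Here dto stands for any data transfer object with a some_num field, as in the tests.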
--- src/datapipeline/clientapi.py | 2 +- src/datapipeline/segmentimpl.py | 2 +- tests/pipe_segments.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index d853a58..06aa431 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -18,7 +18,7 @@ def __name__(self) -> str: @runtime_checkable class ProcessingStep(NamedStep, Protocol[TIn]): @abstractmethod - def transform(self, data: TIn) -> None: + def __call__(self, data: TIn) -> None: pass diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index c3ce655..325de2c 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -50,7 +50,7 @@ def name(self) -> str: return self._impl.__name__ def _process(self, data: TIn) -> TIn: - self._impl.transform(data) + self._impl(data) return data diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 74a9828..f1cf675 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -13,7 +13,7 @@ def __init__(self): class ValidSegmentImpl: __name__ = "Task 1" - def transform(self, data): + def __call__(self, data): data.dumb_object = f"{{ saw: {data.some_num} }}" @@ -33,7 +33,7 @@ class SegmentNamed: def __init__(self, name): self.__name__ = name - def transform(self, data): + def __call__(self, data): raise NotImplementedError @@ -43,7 +43,7 @@ class CapturingSink: def __init__(self): self.result = None - def transform(self, data): + def __call__(self, data): self.result = data From 09620079d0afb1ab2769d03ec03e9cf8e7287b56 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:24:20 -0500 Subject: [PATCH 06/46] t - Convert some of the segment tests to just use functions for segment implementations. 
--- tests/pipe_segments.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index f1cf675..6fb7412 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -10,22 +10,19 @@ def __init__(self): self.dumb_obj = None -class ValidSegmentImpl: - __name__ = "Task 1" +def format_dumb_object(data: DataTransferObject) -> None: + data.dumb_object = f"{{ saw: {data.some_num} }}" - def __call__(self, data): - data.dumb_object = f"{{ saw: {data.some_num} }}" - -def first(data): +def first(data: DataTransferObject) -> None: pass -def second(data): +def second(data: DataTransferObject) -> None: pass -def third(data): +def third(data: DataTransferObject) -> None: pass @@ -33,30 +30,29 @@ class SegmentNamed: def __init__(self, name): self.__name__ = name - def __call__(self, data): + def __call__(self, data: DataTransferObject) -> None: raise NotImplementedError class CapturingSink: __name__ = "Capture for test" + result: DataTransferObject def __init__(self): self.result = None - def __call__(self, data): + def __call__(self, data: DataTransferObject) -> None: self.result = data def test_pipe_segment_names_are_used_when_verifying_them(): - processor = ValidSegmentImpl() - test_subject = DataProcessingSegment(processor) - assert_that(test_subject.to_verification_string()).contains(processor.__name__) + test_subject = DataProcessingSegment(format_dumb_object) + assert_that(test_subject.to_verification_string()).contains(format_dumb_object.__name__) def test_pipe_segment_calls_its_transform_impl_to_process_data(): - processor = ValidSegmentImpl() sink = CapturingSink() - test_subject = DataProcessingSegment(processor) + test_subject = DataProcessingSegment(format_dumb_object) test_subject.then(sink) data = DataTransferObject() data.some_num = 4 From 34c39322dce62c18809a74a4ef5fb5bb1393119d Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:26:37 -0500 Subject: [PATCH 07/46] t - Convert more of the segment tests to just use simple function impls. --- tests/pipe_segments.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 6fb7412..a085fb3 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -61,8 +61,8 @@ def test_pipe_segment_calls_its_transform_impl_to_process_data(): def test_pipelines_can_be_validated_as_strings(): - test_subject = DataProcessingSegment(SegmentNamed("first")) + test_subject = DataProcessingSegment(first) test_subject \ - .then(SegmentNamed("second"))\ - .then(SegmentNamed("third")) + .then(second)\ + .then(third) verify(test_subject.to_verification_string()) From 531528b294d6e7160e4a554021636fe2c48ce964 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 17:28:43 -0500 Subject: [PATCH 08/46] F - Distinguish data processing from restructuring segments in the output. This will get fancier later. :) Currently, Restructuring ones aren't supposed to be used by any test, so this sees if we use any accidentally. 
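Sketch of the intended rendering (step names here are placeholders, not from this diff):

    |
    +--parse_customers
    |
    <->build_object_graph

Data processing segments keep the +-- prefix; restructuring segments get <-> instead.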
--- src/datapipeline/segmentimpl.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 325de2c..a60bbda 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -18,11 +18,11 @@ def __init__(self): @property @abstractmethod - def name(self) -> str: + def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: - return f'|\n+--{self.name}\n{self._next_segment.to_verification_string()}' + return f'|\n{self.descriptor}\n{self._next_segment.to_verification_string()}' @abstractmethod def _process(self, data: TIn) -> TOut: @@ -46,8 +46,8 @@ def __init__(self, impl: ProcessingStep[TIn]): self._impl = impl @property - def name(self) -> str: - return self._impl.__name__ + def descriptor(self) -> str: + return f'+--{self._impl.__name__}' def _process(self, data: TIn) -> TIn: self._impl(data) @@ -62,8 +62,8 @@ def __init__(self, impl: RestructuringStep[TIn, TOut]): self._impl = impl @property - def name(self) -> str: - return self._impl.__name__ + def descriptor(self) -> str: + return f'<->{self._impl.__name__}' def _process(self, data: TIn) -> TOut: return self._impl.restructure(data) @@ -73,7 +73,7 @@ class NullTerminator(_PipeSegment[TIn, TIn], Generic[TIn]): def __init__(self): pass # Don't call the superclass. - def name(self) -> str: + def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: From 1f122765c36b92420467761a056157e7db73ed22 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:00:01 -0500 Subject: [PATCH 09/46] F - Pipe segments now expect the next segment upon construction. The fancy API will happen at the pipeline, not the segment. 
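E.g. (sketch mirroring the updated test) a chain is now built by nesting constructors
instead of chaining .then():

    pipe = DataProcessingSegment(
        first,
        DataProcessingSegment(
            second,
            DataProcessingSegment(third)))
    pipe.process(DataTransferObject())

first, second, and third are the no-op steps from tests/pipe_segments.py.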
--- src/datapipeline/segmentimpl.py | 23 +++++++++++----------- tests/pipe_segments.py | 35 ++++++++++++++++----------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index a60bbda..10bb114 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -13,8 +13,8 @@ class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): _next_segment: _PipeSegment[TOut, U] - def __init__(self): - self._next_segment = NullTerminator() + def __init__(self, next_segment): + self._next_segment = next_segment @property @abstractmethod @@ -32,17 +32,14 @@ def process(self, data: TIn) -> None: result = self._process(data) self._next_segment.process(result) - def then(self, next_segment: ProcessingStep[TOut] | RestructuringStep[TOut, U]) -> _PipeSegment[TOut, U]: - self._next_segment = DataProcessingSegment(next_segment) if isinstance(next_segment, ProcessingStep) else \ - RestructuringSegment(next_segment) - return self._next_segment - class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): _impl: ProcessingStep[TIn] - def __init__(self, impl: ProcessingStep[TIn]): - super(DataProcessingSegment, self).__init__() + def __init__(self, impl: ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): + if next_segment is None: + next_segment = NullTerminator() + super(DataProcessingSegment, self).__init__(next_segment) self._impl = impl @property @@ -57,8 +54,10 @@ def _process(self, data: TIn) -> TIn: class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): _impl: RestructuringStep[TIn, TOut] - def __init__(self, impl: RestructuringStep[TIn, TOut]): - super(RestructuringSegment, self).__init__() + def __init__(self, impl: RestructuringStep[TIn, TOut], next_segment: _PipeSegment[TIn, U] = None): + if next_segment is None: + next_segment = NullTerminator() + super(RestructuringSegment, self).__init__(next_segment) self._impl = impl @property @@ -77,7 +76,7 @@ def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: - return"" + return "" def process(self, data: TIn) -> None: pass diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index a085fb3..028a976 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -1,3 +1,5 @@ +from typing import Any + import pytest from assertpy import assert_that from approvaltests import verify @@ -5,9 +7,12 @@ class DataTransferObject: + some_num: Any + dumb_object: Any + def __init__(self): self.some_num = None - self.dumb_obj = None + self.dumb_object = None def format_dumb_object(data: DataTransferObject) -> None: @@ -26,17 +31,9 @@ def third(data: DataTransferObject) -> None: pass -class SegmentNamed: - def __init__(self, name): - self.__name__ = name - - def __call__(self, data: DataTransferObject) -> None: - raise NotImplementedError - - class CapturingSink: __name__ = "Capture for test" - result: DataTransferObject + result: DataTransferObject | None def __init__(self): self.result = None @@ -51,18 +48,20 @@ def test_pipe_segment_names_are_used_when_verifying_them(): def test_pipe_segment_calls_its_transform_impl_to_process_data(): - sink = CapturingSink() - test_subject = DataProcessingSegment(format_dumb_object) - test_subject.then(sink) + result: Any = None + + def capture(arg: DataTransferObject): + nonlocal result + assert_that(arg.dumb_object).is_equal_to("{ saw: 4 }") + result = arg.dumb_object + + test_subject = DataProcessingSegment(format_dumb_object, 
DataProcessingSegment(capture)) data = DataTransferObject() data.some_num = 4 test_subject.process(data) - assert_that(sink.result.dumb_object).is_equal_to("{ saw: 4 }") + assert_that(result).is_equal_to("{ saw: 4 }") def test_pipelines_can_be_validated_as_strings(): - test_subject = DataProcessingSegment(first) - test_subject \ - .then(second)\ - .then(third) + test_subject = DataProcessingSegment(first, DataProcessingSegment(second, DataProcessingSegment(third))) verify(test_subject.to_verification_string()) From 423903220cddffe6dd639677515f70ac0107809e Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:00:56 -0500 Subject: [PATCH 10/46] t - Remove redundant assertion. --- tests/pipe_segments.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 028a976..07f1134 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -52,7 +52,6 @@ def test_pipe_segment_calls_its_transform_impl_to_process_data(): def capture(arg: DataTransferObject): nonlocal result - assert_that(arg.dumb_object).is_equal_to("{ saw: 4 }") result = arg.dumb_object test_subject = DataProcessingSegment(format_dumb_object, DataProcessingSegment(capture)) From b89ce24f7d93b256b81d2210140efa02048058a4 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:02:08 -0500 Subject: [PATCH 11/46] T - Remove unused code. --- tests/pipe_segments.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 07f1134..76811e4 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -31,17 +31,6 @@ def third(data: DataTransferObject) -> None: pass -class CapturingSink: - __name__ = "Capture for test" - result: DataTransferObject | None - - def __init__(self): - self.result = None - - def __call__(self, data: DataTransferObject) -> None: - self.result = data - - def test_pipe_segment_names_are_used_when_verifying_them(): test_subject = DataProcessingSegment(format_dumb_object) assert_that(test_subject.to_verification_string()).contains(format_dumb_object.__name__) From 32ae052232745142fa0a26fa22c0fdc9548dcfbd Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:06:58 -0500 Subject: [PATCH 12/46] F - Update validation strings. 
--- src/datapipeline/segmentimpl.py | 6 +++--- ...ipelines_can_be_validated_as_strings.approved.txt | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 10bb114..d241d30 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -22,7 +22,7 @@ def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: - return f'|\n{self.descriptor}\n{self._next_segment.to_verification_string()}' + return f' |\n{self.descriptor}\n{self._next_segment.to_verification_string()}' @abstractmethod def _process(self, data: TIn) -> TOut: @@ -44,7 +44,7 @@ def __init__(self, impl: ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] @property def descriptor(self) -> str: - return f'+--{self._impl.__name__}' + return f' +--{self._impl.__name__}' def _process(self, data: TIn) -> TIn: self._impl(data) @@ -62,7 +62,7 @@ def __init__(self, impl: RestructuringStep[TIn, TOut], next_segment: _PipeSegmen @property def descriptor(self) -> str: - return f'<->{self._impl.__name__}' + return f' <-> {self._impl.__name__}' def _process(self, data: TIn) -> TOut: return self._impl.restructure(data) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index c5089fb..0f53b62 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,6 +1,6 @@ -| -+--first -| -+--second -| -+--third + | + +--first + | + +--second + | + +--third From 0c4633e2cd5628f0a367b45cc5152bcb78dff1d2 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:20:18 -0500 Subject: [PATCH 13/46] F - Include restructuring segment in the verification output. 
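E.g. (sketch based on the updated test) a restructuring step is now just a function
from one DTO type to the next:

    def convert(data: DataTransferObject) -> SecondDTO:
        return SecondDTO()

    RestructuringSegment(convert, DataProcessingSegment(third))

The verification output picks up convert's __name__ like any other step.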
--- src/datapipeline/__init__.py | 2 +- src/datapipeline/clientapi.py | 2 +- src/datapipeline/segmentimpl.py | 8 +++---- tests/pipe_segments.py | 21 ++++++++++++++++--- ...s_can_be_validated_as_strings.approved.txt | 2 ++ 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index d6e2ac7..e78d387 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -1,4 +1,4 @@ from __future__ import annotations from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep -from datapipeline.segmentimpl import DataProcessingSegment +from datapipeline.segmentimpl import DataProcessingSegment, RestructuringSegment diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index 06aa431..e1b8f3d 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -25,5 +25,5 @@ def __call__(self, data: TIn) -> None: @runtime_checkable class RestructuringStep(NamedStep, Protocol[TIn, TOut]): @abstractmethod - def restructure(self, data: TIn) -> TOut: + def __call__(self, data: TIn) -> TOut: pass diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index d241d30..cbb3f59 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod -from typing import Generic, TypeVar +from typing import Generic, TypeVar, Callable from datapipeline.clientapi import ProcessingStep, RestructuringStep @@ -52,9 +52,9 @@ def _process(self, data: TIn) -> TIn: class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): - _impl: RestructuringStep[TIn, TOut] + _impl: Callable[[TIn], TOut] - def __init__(self, impl: RestructuringStep[TIn, TOut], next_segment: _PipeSegment[TIn, U] = None): + def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, U] = None): if next_segment is None: next_segment = NullTerminator() super(RestructuringSegment, self).__init__(next_segment) @@ -65,7 +65,7 @@ def descriptor(self) -> str: return f' <-> {self._impl.__name__}' def _process(self, data: TIn) -> TOut: - return self._impl.restructure(data) + return self._impl(data) class NullTerminator(_PipeSegment[TIn, TIn], Generic[TIn]): diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 76811e4..89d53ab 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -3,7 +3,7 @@ import pytest from assertpy import assert_that from approvaltests import verify -from datapipeline import DataProcessingSegment +from datapipeline import DataProcessingSegment, RestructuringSegment class DataTransferObject: @@ -15,6 +15,10 @@ def __init__(self): self.dumb_object = None +class SecondDTO: + pass + + def format_dumb_object(data: DataTransferObject) -> None: data.dumb_object = f"{{ saw: {data.some_num} }}" @@ -27,7 +31,11 @@ def second(data: DataTransferObject) -> None: pass -def third(data: DataTransferObject) -> None: +def third(data: SecondDTO) -> None: + pass + + +def convert(data: DataTransferObject) -> SecondDTO: pass @@ -51,5 +59,12 @@ def capture(arg: DataTransferObject): def test_pipelines_can_be_validated_as_strings(): - test_subject = DataProcessingSegment(first, DataProcessingSegment(second, DataProcessingSegment(third))) + test_subject = DataProcessingSegment( + first, + DataProcessingSegment( + second, + RestructuringSegment( + convert, + DataProcessingSegment( + third)))) verify(test_subject.to_verification_string()) diff --git 
a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index 0f53b62..b5cb1e0 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -3,4 +3,6 @@ | +--second | + <-> convert + | +--third From 33e1886bb20ab6a3461596b6f3d6ca3bd504ede0 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:26:28 -0500 Subject: [PATCH 14/46] F - Updated restructuring node verification to include the new DTO. --- src/datapipeline/segmentimpl.py | 5 +++-- ...est_pipelines_can_be_validated_as_strings.approved.txt | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index cbb3f59..114a51a 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect from abc import ABCMeta, abstractmethod from typing import Generic, TypeVar, Callable @@ -44,7 +45,7 @@ def __init__(self, impl: ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] @property def descriptor(self) -> str: - return f' +--{self._impl.__name__}' + return f' +-- {self._impl.__name__}' def _process(self, data: TIn) -> TIn: self._impl(data) @@ -62,7 +63,7 @@ def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, @property def descriptor(self) -> str: - return f' <-> {self._impl.__name__}' + return f' <-> Changed to <{inspect.get_annotations(self._impl)["return"].__name__}> using {self._impl.__name__}' def _process(self, data: TIn) -> TOut: return self._impl(data) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index b5cb1e0..5043fe5 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,8 +1,8 @@ | - +--first + +-- first | - +--second + +-- second | - <-> convert + <-> Changed to using convert | - +--third + +-- third From ed53de65d851d4c8fab86ec4cbda2bc787544868 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 18:31:20 -0500 Subject: [PATCH 15/46] F - Now I have a transform segment. 
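E.g. (sketch, names taken from the updated test) transforms are now built through the
subclass, which supplies the segment symbol:

    TransformSegment(format_dumb_object, TransformSegment(capture))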
--- src/datapipeline/__init__.py | 2 +- src/datapipeline/segmentimpl.py | 11 ++++++++++- tests/pipe_segments.py | 12 ++++++------ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index e78d387..ca566eb 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -1,4 +1,4 @@ from __future__ import annotations from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep -from datapipeline.segmentimpl import DataProcessingSegment, RestructuringSegment +from datapipeline.segmentimpl import DataProcessingSegment, RestructuringSegment, TransformSegment diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 114a51a..9038241 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -45,13 +45,22 @@ def __init__(self, impl: ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] @property def descriptor(self) -> str: - return f' +-- {self._impl.__name__}' + return f'{self.symbol()} {self._impl.__name__}' + + @abstractmethod + def symbol(self) -> str: + raise NotImplementedError def _process(self, data: TIn) -> TIn: self._impl(data) return data +class TransformSegment(DataProcessingSegment[TIn], Generic[TIn]): + def symbol(self) -> str: + return " +--" + + class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): _impl: Callable[[TIn], TOut] diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 89d53ab..7e704cf 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -3,7 +3,7 @@ import pytest from assertpy import assert_that from approvaltests import verify -from datapipeline import DataProcessingSegment, RestructuringSegment +from datapipeline import RestructuringSegment, TransformSegment class DataTransferObject: @@ -40,7 +40,7 @@ def convert(data: DataTransferObject) -> SecondDTO: def test_pipe_segment_names_are_used_when_verifying_them(): - test_subject = DataProcessingSegment(format_dumb_object) + test_subject = TransformSegment(format_dumb_object) assert_that(test_subject.to_verification_string()).contains(format_dumb_object.__name__) @@ -51,7 +51,7 @@ def capture(arg: DataTransferObject): nonlocal result result = arg.dumb_object - test_subject = DataProcessingSegment(format_dumb_object, DataProcessingSegment(capture)) + test_subject = TransformSegment(format_dumb_object, TransformSegment(capture)) data = DataTransferObject() data.some_num = 4 test_subject.process(data) @@ -59,12 +59,12 @@ def capture(arg: DataTransferObject): def test_pipelines_can_be_validated_as_strings(): - test_subject = DataProcessingSegment( + test_subject = TransformSegment( first, - DataProcessingSegment( + TransformSegment( second, RestructuringSegment( convert, - DataProcessingSegment( + TransformSegment( third)))) verify(test_subject.to_verification_string()) From 27a9a117b7dd42cb8bb08edeac7bdce0d05c7072 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 20:20:42 -0500 Subject: [PATCH 16/46] F - Pipeline approvals now have different symbols for source, sink, and transform. 
--- src/datapipeline/__init__.py | 2 +- src/datapipeline/pipeline.py | 15 +++++++++------ src/datapipeline/segmentimpl.py | 12 +++++++++++- tests/pipe_segments.py | 6 +++--- ...lines_can_be_validated_as_strings.approved.txt | 6 +++--- 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index ca566eb..181a227 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -1,4 +1,4 @@ from __future__ import annotations from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep -from datapipeline.segmentimpl import DataProcessingSegment, RestructuringSegment, TransformSegment +from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment, DataProcessingSegment diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index 5f3ed6c..eaadc68 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -2,8 +2,7 @@ from typing import Callable, TypeVar, Awaitable, Generic -from datapipeline import DataProcessingSegment - +from datapipeline import DataProcessingSegment, RestructuringSegment T = TypeVar("T") TRaw = TypeVar("TRaw") @@ -27,20 +26,24 @@ def inner(f: T) -> T: return inner -def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> DataProcessingSegment[T]: +SegmentBuilder = Callable[[DataProcessingSegment[T] | None], DataProcessingSegment[T]] +RestructureBuilder = Callable[[DataProcessingSegment[TDest] | None], RestructuringSegment[TSrc, TDest]] + + +def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> SegmentBuilder[T]: pass -def transform(process: Callable[[T], None]) -> DataProcessingSegment[T]: +def transform(process: Callable[[T], None]) -> SegmentBuilder[T]: pass -def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> DataProcessingSegment[TSrc]: +def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> SegmentBuilder[TSrc]: pass class IncompletePipeline(Generic[TSrc, T]): - def then(self, *steps: DataProcessingSegment[T]) -> PotentiallyCompletePipeline[TSrc, T]: + def then(self, *steps: SegmentBuilder[T]) -> PotentiallyCompletePipeline[TSrc, T]: return PotentiallyCompletePipeline() diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 9038241..f5fac6e 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -56,9 +56,19 @@ def _process(self, data: TIn) -> TIn: return data +class SourceSegment(DataProcessingSegment[TIn], Generic[TIn]): + def symbol(self) -> str: + return ">-| " + + class TransformSegment(DataProcessingSegment[TIn], Generic[TIn]): def symbol(self) -> str: - return " +--" + return " + " + + +class SinkSegment(DataProcessingSegment[TIn], Generic[TIn]): + def symbol(self) -> str: + return " |->" class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 7e704cf..a21f5f8 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -3,7 +3,7 @@ import pytest from assertpy import assert_that from approvaltests import verify -from datapipeline import RestructuringSegment, TransformSegment +from datapipeline import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment class DataTransferObject: @@ -59,12 +59,12 @@ def capture(arg: DataTransferObject): def test_pipelines_can_be_validated_as_strings(): - 
test_subject = TransformSegment( + test_subject = SourceSegment( first, TransformSegment( second, RestructuringSegment( convert, - TransformSegment( + SinkSegment( third)))) verify(test_subject.to_verification_string()) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index 5043fe5..75c21bf 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,8 +1,8 @@ | - +-- first +>-| first | - +-- second + + second | <-> Changed to using convert | - +-- third + |-> third From 9101b9d98d19fe6ab055fd455a82c672644baded Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 20:21:31 -0500 Subject: [PATCH 17/46] r - line wrapping. --- src/datapipeline/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index 181a227..10369fa 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -1,4 +1,5 @@ from __future__ import annotations from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep -from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment, DataProcessingSegment +from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, \ + TransformSegment, SinkSegment, DataProcessingSegment From e7b2bd255bb030e32e2a05b9ddc5cc10eb004f21 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 20:41:15 -0500 Subject: [PATCH 18/46] F - Example code is slightly less abstract. --- src/examplecode/product.py | 88 +++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 9da0d0c..98b7990 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -3,93 +3,93 @@ from datapipeline.pipeline import needs, gives, source, transform, sink, pipeline, start_with -class DataCollectingDTO: +class RawCustomerData: pass -class AbstractingDTO: +class CustomerGraph: pass @needs() -@gives("foo from filesystem", "foo list") -async def fetch_one(data: DataCollectingDTO) -> str: +@gives("customers from filesystem", "customer list") +async def load_customer_csv(data: RawCustomerData) -> str: pass -def parse_one(data: DataCollectingDTO, new_data: str) -> None: +def parse_customers_from_csv(data: RawCustomerData, new_data: str) -> None: pass @needs() -@gives("foo from api", "foo list") -async def fetch_two(data: DataCollectingDTO) -> str: +@gives("customers from api", "customer list") +async def load_customer_crm_api(data: RawCustomerData) -> str: pass -def parse_two(data: DataCollectingDTO, new_data: str) -> None: +def parse_customers_from_json(data: RawCustomerData, new_data: str) -> None: pass -@needs("foo list") -@gives("foo.core") -def clean_some(data: DataCollectingDTO) -> None: +@needs("customer list") +@gives("customer emails") +def remove_invalid_emails(data: RawCustomerData) -> None: pass @needs("foo list") -@gives("foo.bar") -def clean_more(data: DataCollectingDTO) -> None: +@gives("non-test customers") +def remove_test_customers(data: RawCustomerData) -> None: pass -@needs("foo.core", "foo.bar") -@gives("related baz") -async def fetch_based_on_data_so_far(data: DataCollectingDTO) -> dict: +@needs("customer emails", "non-test customers") +@gives("customer orders") 
+async def load_customer_orders(data: RawCustomerData) -> dict: pass -def parse_new_source(data: DataCollectingDTO, new_data: dict) -> None: +def parse_orders_from_json(data: RawCustomerData, new_data: dict) -> None: pass -@needs("related baz") -@gives("baz.something") -def clean_a(data: DataCollectingDTO) -> None: +@needs("customer orders") +@gives("valid orders") +def remove_empty_orders(data: RawCustomerData) -> None: pass -@needs("foo list", "related baz") -@gives("baz.association") -def clean_b(data: DataCollectingDTO) -> None: +@needs("customer list", "customer orders") +@gives("order cohorts") +def group_orders_into_customer_cohorts(data: RawCustomerData) -> None: pass -@needs("baz.something", "baz.association") -@gives("quux") -def clean_c(data: DataCollectingDTO) -> None: +@needs("valid orders", "order cohorts") +@gives("relative order timing") +def compute_cohort_relative_date_per_order(data: RawCustomerData) -> None: pass -def restructuring_function(data: DataCollectingDTO) -> AbstractingDTO: +def create_customer_object_graph(data: RawCustomerData) -> CustomerGraph: pass -@needs("foo list", "baz.association", "baz.something") +@needs("customer list", "order cohorts", "customer emails") @gives("alpha", "omega") -def understand_something(data: AbstractingDTO) -> None: +def understand_something(data: CustomerGraph) -> None: pass -@needs("foo list", "quux", "alpha") +@needs("customer list", "relative order timing", "alpha") @gives("beta") -def understand_another(data: AbstractingDTO) -> None: +def understand_another(data: CustomerGraph) -> None: pass @needs("omega", "beta") @gives("totality") -def keep_understanding(data: AbstractingDTO) -> None: +def keep_understanding(data: CustomerGraph) -> None: pass @@ -98,7 +98,7 @@ class DestStructureOne: @needs("alpha", "beta") -def extract_and_format_one(data: AbstractingDTO) -> DestStructureOne: +def extract_and_format_one(data: CustomerGraph) -> DestStructureOne: pass @@ -111,7 +111,7 @@ class DestStructureTwo: @needs("totality") -def extract_and_format_two(data: AbstractingDTO) -> DestStructureTwo: +def extract_and_format_two(data: CustomerGraph) -> DestStructureTwo: pass @@ -121,17 +121,17 @@ async def put_two(data: DestStructureTwo) -> None: def create_pipeline(): return pipeline( - start_with(DataCollectingDTO) + start_with(RawCustomerData) .then( - source(fetch_one, parse_one), - source(fetch_two, parse_two), - transform(clean_some), - transform(clean_more), - source(fetch_based_on_data_so_far, parse_new_source), - transform(clean_a), - transform(clean_b), - transform(clean_c)) - .restructure_to(AbstractingDTO, restructuring_function) + source(load_customer_csv, parse_customers_from_csv), + source(load_customer_crm_api, parse_customers_from_json), + transform(remove_invalid_emails), + transform(remove_test_customers), + source(load_customer_orders, parse_orders_from_json), + transform(remove_empty_orders), + transform(group_orders_into_customer_cohorts), + transform(compute_cohort_relative_date_per_order)) + .restructure_to(CustomerGraph, create_customer_object_graph) .then( transform(understand_something), transform(understand_another), From fc8bd0008f5e66e774d27dc16b9cc918c396c5eb Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 20:43:57 -0500 Subject: [PATCH 19/46] F - And more concreteness in the pipeline example. 
--- src/examplecode/product.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 98b7990..ea273d0 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -98,11 +98,11 @@ class DestStructureOne: @needs("alpha", "beta") -def extract_and_format_one(data: CustomerGraph) -> DestStructureOne: +def extract_cohort_analysis(data: CustomerGraph) -> DestStructureOne: pass -async def put_one(data: DestStructureOne) -> None: +async def email_analysis_to_sales_team(data: DestStructureOne) -> None: pass @@ -111,11 +111,11 @@ class DestStructureTwo: @needs("totality") -def extract_and_format_two(data: CustomerGraph) -> DestStructureTwo: +def extract_revenue_projections(data: CustomerGraph) -> DestStructureTwo: pass -async def put_two(data: DestStructureTwo) -> None: +async def put_projections_into_quickbooks(data: DestStructureTwo) -> None: pass @@ -136,7 +136,7 @@ def create_pipeline(): transform(understand_something), transform(understand_another), transform(keep_understanding), - sink(extract_and_format_one, put_one), - sink(extract_and_format_two, put_two), + sink(extract_cohort_analysis, email_analysis_to_sales_team), + sink(extract_revenue_projections, put_projections_into_quickbooks), ) ) From a4357060249f87d39f0d90147901b3f630d8f7f2 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 21:27:48 -0500 Subject: [PATCH 20/46] F - Starting to implement the ability to test pipeline sequencing. --- src/datapipeline/__init__.py | 1 + src/datapipeline/pipeline.py | 17 ++++++++++-- tests/pipeline.py | 27 ++++++++++++++++--- ...st_valid_pipeline_passes_test.approved.txt | 1 + 4 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 tests/pipeline.test_valid_pipeline_passes_test.approved.txt diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index 10369fa..11cb6b1 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -3,3 +3,4 @@ from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, \ TransformSegment, SinkSegment, DataProcessingSegment +from datapipeline.pipeline import Pipeline, is_valid_pipeline, pipeline, start_with, source, transform, sink diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index eaadc68..f2aa17e 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -56,12 +56,25 @@ def restructure_to(self, class Pipeline: - pass + def __init__(self): + self.segments = [] + + def to_verification_string(self): + return "" def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: - pass + return Pipeline() def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T]: return IncompletePipeline() + + +def is_valid_pipeline(self): + if not isinstance(self.val, Pipeline): + raise TypeError('val must be a pipeline.') + for segment in self.val.segments: + if segment != 5: + self.error(f'{self.val} is NOT 5!') + return self diff --git a/tests/pipeline.py b/tests/pipeline.py index e544288..fdcffdf 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -1,8 +1,27 @@ import pytest -from assertpy import assert_that +from approvaltests import verify +from assertpy import assert_that, add_extension, fail -from examplecode.product import create_pipeline +from datapipeline import pipeline, start_with, source, transform, is_valid_pipeline +from examplecode 
import product +add_extension(is_valid_pipeline) -def test_create_pipeline_fluently(): - pipeline = create_pipeline() + +def test_valid_pipeline_passes_test(): + pipeline = product.create_pipeline() + assert_that(pipeline).is_valid_pipeline() + verify(pipeline.to_verification_string()) + +def test_pipeline_with_missing_requirement_is_invalid(): + pipe = pipeline( + start_with(product.RawCustomerData) + .then( + source(product.load_customer_csv, product.parse_customers_from_csv), + transform(product.remove_empty_orders)) + ) + try: + assert_that(pipe).is_valid_pipeline() + # fail("Pipeline should not have been valid") + except AssertionError as e: + assert_that(str(e)).is_equal_to("") diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -0,0 +1 @@ + From e06aead8b7f367f54536fba5f18f4514c5fd1c8f Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Thu, 26 Jan 2023 21:52:08 -0500 Subject: [PATCH 21/46] F - Now have a pipe segment for the head of a pipe - the one that creates the initial empty DTO & doesn't expect one as input. --- src/datapipeline/__init__.py | 2 +- src/datapipeline/segmentimpl.py | 20 +++++++++++++++++++ tests/pipe_segments.py | 20 ++++++++++--------- ...s_can_be_validated_as_strings.approved.txt | 1 + 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/datapipeline/__init__.py b/src/datapipeline/__init__.py index 11cb6b1..e36597e 100644 --- a/src/datapipeline/__init__.py +++ b/src/datapipeline/__init__.py @@ -2,5 +2,5 @@ from datapipeline.clientapi import NamedStep, ProcessingStep, RestructuringStep from datapipeline.segmentimpl import RestructuringSegment, SourceSegment, \ - TransformSegment, SinkSegment, DataProcessingSegment + TransformSegment, SinkSegment, DataProcessingSegment, PipeHeadSegment from datapipeline.pipeline import Pipeline, is_valid_pipeline, pipeline, start_with, source, transform, sink diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index f5fac6e..08771c5 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -34,6 +34,26 @@ def process(self, data: TIn) -> None: self._next_segment.process(result) +class PipeHeadSegment(_PipeSegment[TIn, TIn], Generic[TIn]): + _impl: Callable[[], TIn] + + def __init__(self, impl: Callable[[], TIn], next_segment: _PipeSegment[TIn, U] = None): + if next_segment is None: + next_segment = NullTerminator() + super(PipeHeadSegment, self).__init__(next_segment) + self._impl = impl + + def to_verification_string(self) -> str: + return f' Start with empty <{self._impl.__name__}>\n{self._next_segment.to_verification_string()}' + + def _process(self, data: TIn) -> TIn: + return self._impl() + + @property + def descriptor(self) -> str: + raise NotImplementedError + + class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): _impl: ProcessingStep[TIn] diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index a21f5f8..a11bcc8 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -3,7 +3,7 @@ import pytest from assertpy import assert_that from approvaltests import verify -from datapipeline import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment +from datapipeline import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment, PipeHeadSegment class DataTransferObject: @@ -59,12 +59,14 @@ def 
capture(arg: DataTransferObject): def test_pipelines_can_be_validated_as_strings(): - test_subject = SourceSegment( - first, - TransformSegment( - second, - RestructuringSegment( - convert, - SinkSegment( - third)))) + test_subject = PipeHeadSegment( + DataTransferObject, + SourceSegment( + first, + TransformSegment( + second, + RestructuringSegment( + convert, + SinkSegment( + third))))) verify(test_subject.to_verification_string()) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index 75c21bf..d1b1636 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,3 +1,4 @@ + Start with empty | >-| first | From 38d667b0d1632d0a9b48c4421baad3d3a3b62351 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Fri, 27 Jan 2023 10:51:24 -0500 Subject: [PATCH 22/46] t - Remove redundant test. --- tests/pipe_segments.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index a11bcc8..cd86efa 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -39,11 +39,6 @@ def convert(data: DataTransferObject) -> SecondDTO: pass -def test_pipe_segment_names_are_used_when_verifying_them(): - test_subject = TransformSegment(format_dumb_object) - assert_that(test_subject.to_verification_string()).contains(format_dumb_object.__name__) - - def test_pipe_segment_calls_its_transform_impl_to_process_data(): result: Any = None From 157d8a9d8d3100bec73b89985ec96b1ad406fd7b Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Fri, 27 Jan 2023 10:56:45 -0500 Subject: [PATCH 23/46] T - Verify restructuring segment works correctly. --- tests/pipe_segments.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index cd86efa..42baf93 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -10,13 +10,16 @@ class DataTransferObject: some_num: Any dumb_object: Any - def __init__(self): - self.some_num = None + def __init__(self, some_num=None): + self.some_num = some_num self.dumb_object = None class SecondDTO: - pass + message: str + + def __init__(self, message: str = ""): + self.message = message def format_dumb_object(data: DataTransferObject) -> None: @@ -36,7 +39,15 @@ def third(data: SecondDTO) -> None: def convert(data: DataTransferObject) -> SecondDTO: - pass + return SecondDTO(f"made by convert function from {data.some_num}") + + +def test_restructuring_segment_uses_its_impl(): + def has_right_message(arg: SecondDTO): + assert_that(arg.message).is_equal_to("made by convert function from 2") + + test_subject = RestructuringSegment(convert, TransformSegment(has_right_message)) + test_subject.process(DataTransferObject(2)) def test_pipe_segment_calls_its_transform_impl_to_process_data(): @@ -47,8 +58,7 @@ def capture(arg: DataTransferObject): result = arg.dumb_object test_subject = TransformSegment(format_dumb_object, TransformSegment(capture)) - data = DataTransferObject() - data.some_num = 4 + data = DataTransferObject(4) test_subject.process(data) assert_that(result).is_equal_to("{ saw: 4 }") From 115fb6d235fda79d1e3c420b8b9e14d9b8da0a13 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Sun, 29 Jan 2023 19:28:01 -0500 Subject: [PATCH 24/46] F - Source pipe segments have a fetcher and a parser. Both are still sync...for now. 
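E.g. (sketch following the new test in tests/pipe_segments.py) a source segment composes
a fetch callable and a parse callable over the same DTO:

    def fetch_data(dto: DataTransferObject) -> int:
        return 8

    def parse_data(dto: DataTransferObject, new_data: int) -> None:
        dto.some_num = new_data

    SourceSegment(fetch_data, parse_data, TransformSegment(format_dumb_object))

Internally it runs parse(data, fetch(data)) and reports itself as
"fetch_data and parse_data" in the verification string.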
--- src/datapipeline/clientapi.py | 14 +++++++++++ src/datapipeline/pipeline.py | 19 +++++++++++---- src/datapipeline/segmentimpl.py | 16 +++++++++---- tests/pipe_segments.py | 24 ++++++++++++++++++- ...s_can_be_validated_as_strings.approved.txt | 2 +- tests/pipeline.py | 11 +++++---- 6 files changed, 70 insertions(+), 16 deletions(-) diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index e1b8f3d..c5d6497 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -27,3 +27,17 @@ class RestructuringStep(NamedStep, Protocol[TIn, TOut]): @abstractmethod def __call__(self, data: TIn) -> TOut: pass + + +@runtime_checkable +class Loader(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + def __call__(self, data: TIn) -> TOut: + pass + + +@runtime_checkable +class ParseImpl(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + def __call__(self, data: TIn, new_data: TOut) -> None: + pass diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index f2aa17e..c2e360e 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -2,11 +2,13 @@ from typing import Callable, TypeVar, Awaitable, Generic -from datapipeline import DataProcessingSegment, RestructuringSegment +from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment +from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment T = TypeVar("T") TRaw = TypeVar("TRaw") TSrc = TypeVar("TSrc") +TNext = TypeVar("TNext") TDest = TypeVar("TDest") @@ -26,16 +28,20 @@ def inner(f: T) -> T: return inner -SegmentBuilder = Callable[[DataProcessingSegment[T] | None], DataProcessingSegment[T]] -RestructureBuilder = Callable[[DataProcessingSegment[TDest] | None], RestructuringSegment[TSrc, TDest]] +SegmentBuilder = Callable[[_PipeSegment[TNext, T] | None], DataProcessingSegment[TNext]] +RestructureBuilder = Callable[[_PipeSegment[TNext, T] | None], RestructuringSegment[TSrc, TNext]] +PipeHeadBuilder = Callable[[_PipeSegment[TNext, T] | None], PipeHeadSegment[TNext]] def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> SegmentBuilder[T]: + # return SourceSegment(load, parse) pass def transform(process: Callable[[T], None]) -> SegmentBuilder[T]: - pass + def build(next_segment: _PipeSegment[TNext, T] | None) -> TransformSegment[TNext]: + return TransformSegment(process, next_segment) + return build def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> SegmentBuilder[TSrc]: @@ -44,10 +50,13 @@ def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[No class IncompletePipeline(Generic[TSrc, T]): def then(self, *steps: SegmentBuilder[T]) -> PotentiallyCompletePipeline[TSrc, T]: - return PotentiallyCompletePipeline() + return PotentiallyCompletePipeline(steps) class PotentiallyCompletePipeline(Generic[TSrc, T]): + def __init__(self, prior_steps: SegmentBuilder[T]): + self._prior_steps = prior_steps + def restructure_to(self, data_constructor: Callable[[], TDest], restructure: Callable[[T], TDest]) \ diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 08771c5..3ee8668 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -4,11 +4,12 @@ from abc import ABCMeta, abstractmethod from typing import Generic, TypeVar, Callable -from datapipeline.clientapi import ProcessingStep, RestructuringStep +from datapipeline import clientapi U = TypeVar('U') TIn = TypeVar('TIn') TOut = 
TypeVar('TOut') +TRaw = TypeVar('TRaw') class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): @@ -55,9 +56,9 @@ def descriptor(self) -> str: class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): - _impl: ProcessingStep[TIn] + _impl: clientapi.ProcessingStep[TIn] - def __init__(self, impl: ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): + def __init__(self, impl: clientapi.ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): if next_segment is None: next_segment = NullTerminator() super(DataProcessingSegment, self).__init__(next_segment) @@ -76,7 +77,14 @@ def _process(self, data: TIn) -> TIn: return data -class SourceSegment(DataProcessingSegment[TIn], Generic[TIn]): +class SourceSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): + def __init__(self, fetch: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl[TIn, TRaw], + next_segment: _PipeSegment[TIn, U] = None): + def impl(data: TIn) -> None: + parse(data, fetch(data)) + impl.__name__ = f'{fetch.__name__} and {parse.__name__}' + super(SourceSegment, self).__init__(impl, next_segment) + def symbol(self) -> str: return ">-| " diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 42baf93..97827ce 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -26,7 +26,11 @@ def format_dumb_object(data: DataTransferObject) -> None: data.dumb_object = f"{{ saw: {data.some_num} }}" -def first(data: DataTransferObject) -> None: +def first(data: DataTransferObject) -> dict: + pass + + +def parse_first(data: DataTransferObject, new_data: dict) -> None: pass @@ -63,11 +67,29 @@ def capture(arg: DataTransferObject): assert_that(result).is_equal_to("{ saw: 4 }") +def test_source_segment_chains_its_two_implementations(): + result: Any = None + + def capture(arg: DataTransferObject): + nonlocal result + result = arg.some_num + + def fetch_data(arg: DataTransferObject) -> int: + return 8 + + def parse_data(arg: DataTransferObject, new_data: int) -> None: + arg.some_num = new_data + test_subject = SourceSegment(fetch_data, parse_data, TransformSegment(capture)) + test_subject.process(DataTransferObject(4)) + assert_that(result).is_equal_to(8) + + def test_pipelines_can_be_validated_as_strings(): test_subject = PipeHeadSegment( DataTransferObject, SourceSegment( first, + parse_first, TransformSegment( second, RestructuringSegment( diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index d1b1636..deacefc 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,6 +1,6 @@ Start with empty | ->-| first +>-| first and parse_first | + second | diff --git a/tests/pipeline.py b/tests/pipeline.py index fdcffdf..21983ae 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -9,19 +9,20 @@ def test_valid_pipeline_passes_test(): - pipeline = product.create_pipeline() - assert_that(pipeline).is_valid_pipeline() - verify(pipeline.to_verification_string()) + result = product.create_pipeline() + assert_that(result).is_valid_pipeline() + verify(result.to_verification_string()) + def test_pipeline_with_missing_requirement_is_invalid(): - pipe = pipeline( + result = pipeline( start_with(product.RawCustomerData) .then( source(product.load_customer_csv, product.parse_customers_from_csv), transform(product.remove_empty_orders)) ) try: - 
assert_that(pipe).is_valid_pipeline() + assert_that(result).is_valid_pipeline() # fail("Pipeline should not have been valid") except AssertionError as e: assert_that(str(e)).is_equal_to("") From e0d6188e89afd8c6adac6dc050c35ddf041a4a1c Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Sun, 29 Jan 2023 19:42:55 -0500 Subject: [PATCH 25/46] F - Sinks now take and extractor and a storer, not just one impl function. --- src/datapipeline/clientapi.py | 14 ++++++++++++++ src/datapipeline/segmentimpl.py | 15 +++++++++++---- tests/pipe_segments.py | 7 ++++++- ...lines_can_be_validated_as_strings.approved.txt | 4 ++-- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index c5d6497..57a488b 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -41,3 +41,17 @@ class ParseImpl(NamedStep, Protocol[TIn, TOut]): @abstractmethod def __call__(self, data: TIn, new_data: TOut) -> None: pass + + +@runtime_checkable +class Extractor(NamedStep, Protocol[TIn, TOut]): + @abstractmethod + def __call__(self, data: TIn) -> TOut: + pass + + +@runtime_checkable +class StoreImpl(NamedStep, Protocol[TIn]): + @abstractmethod + def __call__(self, data: TIn) -> None: + pass diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 3ee8668..788843b 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -78,11 +78,11 @@ def _process(self, data: TIn) -> TIn: class SourceSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): - def __init__(self, fetch: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl[TIn, TRaw], + def __init__(self, load: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl[TIn, TRaw], next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: - parse(data, fetch(data)) - impl.__name__ = f'{fetch.__name__} and {parse.__name__}' + parse(data, load(data)) + impl.__name__ = f'load:{load.__name__}, parse: {parse.__name__}' super(SourceSegment, self).__init__(impl, next_segment) def symbol(self) -> str: @@ -94,7 +94,14 @@ def symbol(self) -> str: return " + " -class SinkSegment(DataProcessingSegment[TIn], Generic[TIn]): +class SinkSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): + def __init__(self, extract: clientapi.Extractor[TIn, TRaw], store: clientapi.StoreImpl[TRaw], + next_segment: _PipeSegment[TIn, U] = None): + def impl(data: TIn) -> None: + store(data, extract(data)) + impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' + super(SinkSegment, self).__init__(impl, next_segment) + def symbol(self) -> str: return " |->" diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 97827ce..f8593ba 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -38,7 +38,11 @@ def second(data: DataTransferObject) -> None: pass -def third(data: SecondDTO) -> None: +def extract_for_third(data: SecondDTO) -> dict: + pass + + +def third(data: dict) -> None: pass @@ -95,5 +99,6 @@ def test_pipelines_can_be_validated_as_strings(): RestructuringSegment( convert, SinkSegment( + extract_for_third, third))))) verify(test_subject.to_verification_string()) diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index deacefc..7c91ebe 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ 
b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,9 +1,9 @@ Start with empty | ->-| first and parse_first +>-| load:first, parse: parse_first | + second | <-> Changed to using convert | - |-> third + |-> extract: extract_for_third, store: third From 82628ee44dc7844978b4eb3f223f5e9dd7ab1dd5 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Sun, 29 Jan 2023 19:56:23 -0500 Subject: [PATCH 26/46] r - extract a capture object. --- tests/pipe_segments.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index f8593ba..0da4ce7 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Generic, TypeVar import pytest from assertpy import assert_that @@ -6,6 +6,9 @@ from datapipeline import RestructuringSegment, SourceSegment, TransformSegment, SinkSegment, PipeHeadSegment +T = TypeVar('T') + + class DataTransferObject: some_num: Any dumb_object: Any @@ -50,6 +53,16 @@ def convert(data: DataTransferObject) -> SecondDTO: return SecondDTO(f"made by convert function from {data.some_num}") +class Capture(Generic[T]): + result: T + + def __init__(self): + self.result = None + + def __call__(self, arg: T) -> None: + self.result = arg + + def test_restructuring_segment_uses_its_impl(): def has_right_message(arg: SecondDTO): assert_that(arg.message).is_equal_to("made by convert function from 2") @@ -59,33 +72,24 @@ def has_right_message(arg: SecondDTO): def test_pipe_segment_calls_its_transform_impl_to_process_data(): - result: Any = None - - def capture(arg: DataTransferObject): - nonlocal result - result = arg.dumb_object - + capture = Capture[DataTransferObject]() test_subject = TransformSegment(format_dumb_object, TransformSegment(capture)) data = DataTransferObject(4) test_subject.process(data) - assert_that(result).is_equal_to("{ saw: 4 }") + assert_that(capture.result.dumb_object).is_equal_to("{ saw: 4 }") def test_source_segment_chains_its_two_implementations(): - result: Any = None - - def capture(arg: DataTransferObject): - nonlocal result - result = arg.some_num - def fetch_data(arg: DataTransferObject) -> int: return 8 def parse_data(arg: DataTransferObject, new_data: int) -> None: arg.some_num = new_data + + capture = Capture[DataTransferObject]() test_subject = SourceSegment(fetch_data, parse_data, TransformSegment(capture)) test_subject.process(DataTransferObject(4)) - assert_that(result).is_equal_to(8) + assert_that(capture.result.some_num).is_equal_to(8) def test_pipelines_can_be_validated_as_strings(): From 2ceaab9899f4938d05a01ab43000e251e131c2cb Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Sun, 29 Jan 2023 20:39:48 -0500 Subject: [PATCH 27/46] B - Sink now composes its functions correctly. 
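
The composed impl was handing the whole DTO to the store step as an extra
argument; the store function should only ever see what the extractor
produced. The intended composition, as a one-line sketch:

    def impl(data: TIn) -> None:
        store(extract(data))  # store receives only the extracted value, never the DTO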
--- src/datapipeline/segmentimpl.py | 2 +- tests/pipe_segments.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 788843b..4e0c46b 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -98,7 +98,7 @@ class SinkSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): def __init__(self, extract: clientapi.Extractor[TIn, TRaw], store: clientapi.StoreImpl[TRaw], next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: - store(data, extract(data)) + store(extract(data)) impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' super(SinkSegment, self).__init__(impl, next_segment) diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 0da4ce7..5f27414 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -58,6 +58,7 @@ class Capture(Generic[T]): def __init__(self): self.result = None + self.__name__ = 'Capture' def __call__(self, arg: T) -> None: self.result = arg @@ -92,6 +93,16 @@ def parse_data(arg: DataTransferObject, new_data: int) -> None: assert_that(capture.result.some_num).is_equal_to(8) +def test_sink_segment_chains_its_two_implementations(): + def extract(arg: DataTransferObject) -> int: + return 8 + + capture = Capture[DataTransferObject]() + test_subject = SinkSegment(extract, capture) + test_subject.process(DataTransferObject(4)) + assert_that(capture.result).is_equal_to(8) + + def test_pipelines_can_be_validated_as_strings(): test_subject = PipeHeadSegment( DataTransferObject, From 6ad1847da59430d31bbc07f5da28d3e9ab9d9895 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Sun, 29 Jan 2023 20:59:39 -0500 Subject: [PATCH 28/46] F - All pipeline builder functions return build functions to do the work. 
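
Each of source/transform/sink now returns a closure that captures the client
functions and only constructs the real segment once its downstream neighbour
is known, so the pipeline can be assembled back to front. The pattern,
roughly (transform shown; source and sink follow the same shape):

    def transform(process: Callable[[T], None]) -> SegmentBuilder[T]:
        def build(next_segment):  # next_segment is None at the tail of the pipe
            return TransformSegment(process, next_segment)
        return build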
--- src/datapipeline/pipeline.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index c2e360e..145ac1c 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -3,7 +3,7 @@ from typing import Callable, TypeVar, Awaitable, Generic from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment -from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment +from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment, SinkSegment T = TypeVar("T") TRaw = TypeVar("TRaw") @@ -34,18 +34,21 @@ def inner(f: T) -> T: def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> SegmentBuilder[T]: - # return SourceSegment(load, parse) - pass + def build(next_segment: _PipeSegment[T, TNext] | None) -> SourceSegment[T, TRaw]: + return SourceSegment(load, parse, next_segment) + return build def transform(process: Callable[[T], None]) -> SegmentBuilder[T]: - def build(next_segment: _PipeSegment[TNext, T] | None) -> TransformSegment[TNext]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> TransformSegment[T]: return TransformSegment(process, next_segment) return build def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> SegmentBuilder[TSrc]: - pass + def build(next_segment: _PipeSegment[T, TNext] | None) -> SinkSegment[T, TRaw]: + return SinkSegment(extract, store, next_segment) + return build class IncompletePipeline(Generic[TSrc, T]): From db6e30df098efae1894287f6b4a519fc5b03cff8 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:31:56 -0500 Subject: [PATCH 29/46] F - Can create a pipeline! It has all the segments and can be verified using an approvals test. 
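
Building works by folding the collected builder closures in reverse, so each
segment is constructed with its successor already in hand. The core of
PotentiallyCompletePipeline.build(), roughly:

    next_segment = None
    for builder in reversed(self._prior_steps):
        next_segment = builder(next_segment)
    return Pipeline(next_segment)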
--- src/datapipeline/pipeline.py | 58 ++++++++++++++----- src/datapipeline/segmentimpl.py | 19 +++++- ...st_valid_pipeline_passes_test.approved.txt | 30 +++++++++- 3 files changed, 89 insertions(+), 18 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index 145ac1c..bd24821 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -1,8 +1,8 @@ from __future__ import annotations -from typing import Callable, TypeVar, Awaitable, Generic +from typing import Callable, TypeVar, Awaitable, Generic, List, Iterable -from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment +from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment, clientapi from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment, SinkSegment T = TypeVar("T") @@ -31,62 +31,88 @@ def inner(f: T) -> T: SegmentBuilder = Callable[[_PipeSegment[TNext, T] | None], DataProcessingSegment[TNext]] RestructureBuilder = Callable[[_PipeSegment[TNext, T] | None], RestructuringSegment[TSrc, TNext]] PipeHeadBuilder = Callable[[_PipeSegment[TNext, T] | None], PipeHeadSegment[TNext]] +AnyBuilder = Callable[[_PipeSegment | None], _PipeSegment] + + +def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T, T]: + def build(next_segment: _PipeSegment[T, TNext] | None) -> PipeHeadSegment[T]: + return PipeHeadSegment(data_constructor, next_segment) + + return IncompletePipeline[T, T]([build]) def source(load: Callable[[T], Awaitable[TRaw]], parse: Callable[[T, TRaw], None]) -> SegmentBuilder[T]: def build(next_segment: _PipeSegment[T, TNext] | None) -> SourceSegment[T, TRaw]: return SourceSegment(load, parse, next_segment) + return build def transform(process: Callable[[T], None]) -> SegmentBuilder[T]: def build(next_segment: _PipeSegment[T, TNext] | None) -> TransformSegment[T]: return TransformSegment(process, next_segment) + return build def sink(extract: Callable[[TSrc], TDest], store: Callable[[TDest], Awaitable[None]]) -> SegmentBuilder[TSrc]: def build(next_segment: _PipeSegment[T, TNext] | None) -> SinkSegment[T, TRaw]: return SinkSegment(extract, store, next_segment) + return build class IncompletePipeline(Generic[TSrc, T]): + def __init__(self, prior_steps: List[AnyBuilder]): + self._prior_steps = prior_steps + def then(self, *steps: SegmentBuilder[T]) -> PotentiallyCompletePipeline[TSrc, T]: - return PotentiallyCompletePipeline(steps) + return PotentiallyCompletePipeline(self._prior_steps + list(steps)) class PotentiallyCompletePipeline(Generic[TSrc, T]): - def __init__(self, prior_steps: SegmentBuilder[T]): + def __init__(self, prior_steps: List[AnyBuilder]): self._prior_steps = prior_steps def restructure_to(self, data_constructor: Callable[[], TDest], restructure: Callable[[T], TDest]) \ -> IncompletePipeline[TSrc, TDest]: - return IncompletePipeline() + def build(next_segment: _PipeSegment[T, TNext] | None) -> RestructuringSegment[T, TDest]: + return RestructuringSegment(restructure, next_segment) + + return IncompletePipeline(self._prior_steps + [build]) + + def build(self) -> Pipeline: + next_segment = None + for builder in reversed(self._prior_steps): + next_segment = builder(next_segment) + return Pipeline(next_segment) class Pipeline: - def __init__(self): - self.segments = [] + _first_segment: _PipeSegment - def to_verification_string(self): - return "" + def __init__(self, first_segment): + self._first_segment = first_segment + def to_verification_string(self): + return 
self._first_segment.to_verification_string() -def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: - return Pipeline() + @property + def segments(self) -> Iterable[_PipeSegment]: + return iter(self._first_segment) -def start_with(data_constructor: Callable[[], T]) -> IncompletePipeline[T]: - return IncompletePipeline() +def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: + return builder.build() def is_valid_pipeline(self): if not isinstance(self.val, Pipeline): raise TypeError('val must be a pipeline.') - for segment in self.val.segments: - if segment != 5: - self.error(f'{self.val} is NOT 5!') + # for segment in self.val.segments: + # pass + # if segment != 5: + # self.error(f'{segment} is NOT 5!') return self diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 4e0c46b..9505e09 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -18,6 +18,14 @@ class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): def __init__(self, next_segment): self._next_segment = next_segment + def __iter__(self): + return self + + def __next__(self): + if isinstance(self._next_segment, NullTerminator): + raise StopIteration() + return self._next_segment + @property @abstractmethod def descriptor(self) -> str: @@ -59,6 +67,7 @@ class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): _impl: clientapi.ProcessingStep[TIn] def __init__(self, impl: clientapi.ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): + assert isinstance(impl, clientapi.ProcessingStep) if next_segment is None: next_segment = NullTerminator() super(DataProcessingSegment, self).__init__(next_segment) @@ -82,6 +91,8 @@ def __init__(self, load: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: parse(data, load(data)) + assert isinstance(load, clientapi.Loader) + assert isinstance(parse, clientapi.ParseImpl) impl.__name__ = f'load:{load.__name__}, parse: {parse.__name__}' super(SourceSegment, self).__init__(impl, next_segment) @@ -99,6 +110,8 @@ def __init__(self, extract: clientapi.Extractor[TIn, TRaw], store: clientapi.Sto next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: store(extract(data)) + assert isinstance(extract, clientapi.Extractor) + assert isinstance(store, clientapi.StoreImpl) impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' super(SinkSegment, self).__init__(impl, next_segment) @@ -110,6 +123,7 @@ class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): _impl: Callable[[TIn], TOut] def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, U] = None): + assert isinstance(impl, clientapi.RestructuringStep) if next_segment is None: next_segment = NullTerminator() super(RestructuringSegment, self).__init__(next_segment) @@ -117,7 +131,7 @@ def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, @property def descriptor(self) -> str: - return f' <-> Changed to <{inspect.get_annotations(self._impl)["return"].__name__}> using {self._impl.__name__}' + return f' <-> Changed to <{inspect.get_annotations(self._impl, eval_str=True)["return"].__name__}> using {self._impl.__name__}' def _process(self, data: TIn) -> TOut: return self._impl(data) @@ -127,6 +141,9 @@ class NullTerminator(_PipeSegment[TIn, TIn], Generic[TIn]): def __init__(self): pass # Don't call the superclass. 
+ def __next__(self): + raise StopIteration() + def descriptor(self) -> str: raise NotImplementedError diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt index 8b13789..2f78e7d 100644 --- a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -1 +1,29 @@ - + Start with empty + | +>-| load:load_customer_csv, parse: parse_customers_from_csv + | +>-| load:load_customer_crm_api, parse: parse_customers_from_json + | + + remove_invalid_emails + | + + remove_test_customers + | +>-| load:load_customer_orders, parse: parse_orders_from_json + | + + remove_empty_orders + | + + group_orders_into_customer_cohorts + | + + compute_cohort_relative_date_per_order + | + <-> Changed to using create_customer_object_graph + | + + understand_something + | + + understand_another + | + + keep_understanding + | + |-> extract: extract_cohort_analysis, store: email_analysis_to_sales_team + | + |-> extract: extract_revenue_projections, store: put_projections_into_quickbooks From 1dd3e75606ba86400d1aa923877d30176643671e Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:41:00 -0500 Subject: [PATCH 30/46] F - Can now iterate over the segments of a pipeline. CheckValid does that, but doesn't have anything to check yet. --- src/datapipeline/pipeline.py | 4 ++-- src/datapipeline/segmentimpl.py | 25 +++++++++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index bd24821..558e060 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -111,8 +111,8 @@ def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: def is_valid_pipeline(self): if not isinstance(self.val, Pipeline): raise TypeError('val must be a pipeline.') - # for segment in self.val.segments: - # pass + for segment in self.val.segments: + pass # if segment != 5: # self.error(f'{segment} is NOT 5!') return self diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 9505e09..1112e3d 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -12,19 +12,30 @@ TRaw = TypeVar('TRaw') -class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): - _next_segment: _PipeSegment[TOut, U] +class PipeIterator: + next_segment: _PipeSegment - def __init__(self, next_segment): - self._next_segment = next_segment + def __init__(self, next_segment: _PipeSegment): + self.next_segment = next_segment def __iter__(self): return self def __next__(self): - if isinstance(self._next_segment, NullTerminator): + if isinstance(self.next_segment, NullTerminator): raise StopIteration() - return self._next_segment + self.next_segment = self.next_segment._next_segment + return self.next_segment + + +class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): + _next_segment: _PipeSegment[TOut, U] + + def __init__(self, next_segment): + self._next_segment = next_segment + + def __iter__(self): + return PipeIterator(self) @property @abstractmethod @@ -91,6 +102,7 @@ def __init__(self, load: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: parse(data, load(data)) + assert isinstance(load, clientapi.Loader) assert isinstance(parse, clientapi.ParseImpl) impl.__name__ = f'load:{load.__name__}, parse: {parse.__name__}' @@ -110,6 +122,7 @@ def __init__(self, 
extract: clientapi.Extractor[TIn, TRaw], store: clientapi.Sto next_segment: _PipeSegment[TIn, U] = None): def impl(data: TIn) -> None: store(extract(data)) + assert isinstance(extract, clientapi.Extractor) assert isinstance(store, clientapi.StoreImpl) impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' From 15c0d2bc676d818c5819a3a95d3fe453d17fa4e6 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:43:53 -0500 Subject: [PATCH 31/46] r - Move iterator to be a nested class. --- src/datapipeline/segmentimpl.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 1112e3d..74bc1cf 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -12,30 +12,29 @@ TRaw = TypeVar('TRaw') -class PipeIterator: - next_segment: _PipeSegment +class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): + _next_segment: _PipeSegment[TOut, U] - def __init__(self, next_segment: _PipeSegment): - self.next_segment = next_segment + class Iterator: + next_segment: _PipeSegment - def __iter__(self): - return self + def __init__(self, next_segment: _PipeSegment): + self.next_segment = next_segment - def __next__(self): - if isinstance(self.next_segment, NullTerminator): - raise StopIteration() - self.next_segment = self.next_segment._next_segment - return self.next_segment + def __iter__(self): + return self - -class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): - _next_segment: _PipeSegment[TOut, U] + def __next__(self): + if isinstance(self.next_segment, NullTerminator): + raise StopIteration() + self.next_segment = self.next_segment._next_segment + return self.next_segment def __init__(self, next_segment): self._next_segment = next_segment def __iter__(self): - return PipeIterator(self) + return _PipeSegment.Iterator(self) @property @abstractmethod From 5ee186b3987603a0b41a68d2298d514d9281f94d Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:54:13 -0500 Subject: [PATCH 32/46] r - Delete unused code. --- src/datapipeline/segmentimpl.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 74bc1cf..79f2ed8 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -167,6 +167,3 @@ def process(self, data: TIn) -> None: def _process(self, data: TIn) -> TIn: raise NotImplementedError - - def then(self, next_segment: ProcessingStep[TIn] | RestructuringStep[TIn, U]) -> _PipeSegment[TIn, U]: - raise NotImplementedError From 5e3e9e2079251e2503445cab4cd7fda78b5948ce Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:54:43 -0500 Subject: [PATCH 33/46] F - Pipe segments now have needs and gives lists, which are always empty. 
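
This only threads the two lists through the _PipeSegment constructor; nothing
populates or reads them yet. Sketch of the new base signature:

    def __init__(self, needs: List[str], gives: List[str],
                 next_segment: _PipeSegment[TOut, U] = None):
        ...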
--- src/datapipeline/segmentimpl.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 79f2ed8..4492963 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -2,7 +2,7 @@ import inspect from abc import ABCMeta, abstractmethod -from typing import Generic, TypeVar, Callable +from typing import Generic, TypeVar, Callable, List from datapipeline import clientapi @@ -30,8 +30,12 @@ def __next__(self): self.next_segment = self.next_segment._next_segment return self.next_segment - def __init__(self, next_segment): + def __init__(self, needs: List[str], gives: List[str], next_segment: _PipeSegment[TOut, U] = None): + if next_segment is None: + next_segment = NullTerminator() self._next_segment = next_segment + self.needs = needs + self.gives = gives def __iter__(self): return _PipeSegment.Iterator(self) @@ -57,9 +61,7 @@ class PipeHeadSegment(_PipeSegment[TIn, TIn], Generic[TIn]): _impl: Callable[[], TIn] def __init__(self, impl: Callable[[], TIn], next_segment: _PipeSegment[TIn, U] = None): - if next_segment is None: - next_segment = NullTerminator() - super(PipeHeadSegment, self).__init__(next_segment) + super(PipeHeadSegment, self).__init__([], [], next_segment) self._impl = impl def to_verification_string(self) -> str: @@ -78,9 +80,7 @@ class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): def __init__(self, impl: clientapi.ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): assert isinstance(impl, clientapi.ProcessingStep) - if next_segment is None: - next_segment = NullTerminator() - super(DataProcessingSegment, self).__init__(next_segment) + super(DataProcessingSegment, self).__init__([], [], next_segment) self._impl = impl @property @@ -136,9 +136,7 @@ class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, U] = None): assert isinstance(impl, clientapi.RestructuringStep) - if next_segment is None: - next_segment = NullTerminator() - super(RestructuringSegment, self).__init__(next_segment) + super(RestructuringSegment, self).__init__([], [], next_segment) self._impl = impl @property From b66ed4df186a28cf268259502a4fce1e1283a7d3 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:55:28 -0500 Subject: [PATCH 34/46] r - Annotate types for needs/gives lists. --- src/datapipeline/segmentimpl.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 4492963..96de7b2 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -14,6 +14,8 @@ class _PipeSegment(Generic[TIn, TOut], metaclass=ABCMeta): _next_segment: _PipeSegment[TOut, U] + needs: List[str] + gives: List[str] class Iterator: next_segment: _PipeSegment From 977399924600a5d635f2bd8935add1af7b77a665 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 14:57:03 -0500 Subject: [PATCH 35/46] R - Change function property for needs/gives to be a list of strings, rather than a tuple. 
--- src/datapipeline/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index 558e060..b6debce 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -14,7 +14,7 @@ def needs(*info_items: str) -> Callable[[T], T]: def inner(f: T) -> T: - f._p_needs_ = info_items + f._p_needs_ = list(info_items) return f return inner @@ -22,7 +22,7 @@ def inner(f: T) -> T: def gives(*info_items: str) -> Callable[[T], T]: def inner(f: T) -> T: - f._p_gives_ = info_items + f._p_gives_ = list(info_items) return f return inner From 5be92e7f6109d134064cf091c169f5ff2de0f477 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 15:02:22 -0500 Subject: [PATCH 36/46] F - Include info changes (needs & gives) in the verification string. --- src/datapipeline/segmentimpl.py | 2 +- ...elines_can_be_validated_as_strings.approved.txt | 4 ++++ ...ne.test_valid_pipeline_passes_test.approved.txt | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 96de7b2..17d6afd 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -48,7 +48,7 @@ def descriptor(self) -> str: raise NotImplementedError def to_verification_string(self) -> str: - return f' |\n{self.descriptor}\n{self._next_segment.to_verification_string()}' + return f' |\n{self.descriptor}\n | info change: {self.needs} ==> {self.gives}\n{self._next_segment.to_verification_string()}' @abstractmethod def _process(self, data: TIn) -> TOut: diff --git a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt index 7c91ebe..4979f47 100644 --- a/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt +++ b/tests/pipe_segments.test_pipelines_can_be_validated_as_strings.approved.txt @@ -1,9 +1,13 @@ Start with empty | >-| load:first, parse: parse_first + | info change: [] ==> [] | + second + | info change: [] ==> [] | <-> Changed to using convert + | info change: [] ==> [] | |-> extract: extract_for_third, store: third + | info change: [] ==> [] diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt index 2f78e7d..7827977 100644 --- a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -1,29 +1,43 @@ Start with empty | >-| load:load_customer_csv, parse: parse_customers_from_csv + | info change: [] ==> [] | >-| load:load_customer_crm_api, parse: parse_customers_from_json + | info change: [] ==> [] | + remove_invalid_emails + | info change: [] ==> [] | + remove_test_customers + | info change: [] ==> [] | >-| load:load_customer_orders, parse: parse_orders_from_json + | info change: [] ==> [] | + remove_empty_orders + | info change: [] ==> [] | + group_orders_into_customer_cohorts + | info change: [] ==> [] | + compute_cohort_relative_date_per_order + | info change: [] ==> [] | <-> Changed to using create_customer_object_graph + | info change: [] ==> [] | + understand_something + | info change: [] ==> [] | + understand_another + | info change: [] ==> [] | + keep_understanding + | info change: [] ==> [] | |-> extract: extract_cohort_analysis, store: email_analysis_to_sales_team + | info change: [] ==> [] | |-> extract: extract_revenue_projections, store: 
put_projections_into_quickbooks + | info change: [] ==> [] From 06c9b8912c7aaa3643281a2d8f4482802c0b29ba Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 15:10:37 -0500 Subject: [PATCH 37/46] F - Now we store the gives / needs lists defined by the decorators into the pipe segments. --- src/datapipeline/segmentimpl.py | 8 ++++-- ...st_valid_pipeline_passes_test.approved.txt | 26 +++++++++---------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 17d6afd..5ad3007 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -82,7 +82,7 @@ class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): def __init__(self, impl: clientapi.ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): assert isinstance(impl, clientapi.ProcessingStep) - super(DataProcessingSegment, self).__init__([], [], next_segment) + super(DataProcessingSegment, self).__init__(getattr(impl, "_p_needs_", []), getattr(impl, "_p_gives_", []), next_segment) self._impl = impl @property @@ -107,6 +107,8 @@ def impl(data: TIn) -> None: assert isinstance(load, clientapi.Loader) assert isinstance(parse, clientapi.ParseImpl) impl.__name__ = f'load:{load.__name__}, parse: {parse.__name__}' + impl._p_needs_ = getattr(load, "_p_needs_", []) + impl._p_gives_ = getattr(load, "_p_gives_", []) super(SourceSegment, self).__init__(impl, next_segment) def symbol(self) -> str: @@ -127,6 +129,8 @@ def impl(data: TIn) -> None: assert isinstance(extract, clientapi.Extractor) assert isinstance(store, clientapi.StoreImpl) impl.__name__ = f'extract: {extract.__name__}, store: {store.__name__}' + impl._p_needs_ = getattr(extract, "_p_needs_", []) + impl._p_gives_ = getattr(extract, "_p_gives_", []) super(SinkSegment, self).__init__(impl, next_segment) def symbol(self) -> str: @@ -138,7 +142,7 @@ class RestructuringSegment(_PipeSegment[TIn, TOut], Generic[TIn, TOut]): def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, U] = None): assert isinstance(impl, clientapi.RestructuringStep) - super(RestructuringSegment, self).__init__([], [], next_segment) + super(RestructuringSegment, self).__init__(getattr(impl, "_p_needs_", []), getattr(impl, "_p_gives_", []), next_segment) self._impl = impl @property diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt index 7827977..91971c5 100644 --- a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -1,43 +1,43 @@ Start with empty | >-| load:load_customer_csv, parse: parse_customers_from_csv - | info change: [] ==> [] + | info change: [] ==> ['customers from filesystem', 'customer list'] | >-| load:load_customer_crm_api, parse: parse_customers_from_json - | info change: [] ==> [] + | info change: [] ==> ['customers from api', 'customer list'] | + remove_invalid_emails - | info change: [] ==> [] + | info change: ['customer list'] ==> ['customer emails'] | + remove_test_customers - | info change: [] ==> [] + | info change: ['foo list'] ==> ['non-test customers'] | >-| load:load_customer_orders, parse: parse_orders_from_json - | info change: [] ==> [] + | info change: ['customer emails', 'non-test customers'] ==> ['customer orders'] | + remove_empty_orders - | info change: [] ==> [] + | info change: ['customer orders'] ==> ['valid orders'] | + group_orders_into_customer_cohorts - | info 
change: [] ==> [] + | info change: ['customer list', 'customer orders'] ==> ['order cohorts'] | + compute_cohort_relative_date_per_order - | info change: [] ==> [] + | info change: ['valid orders', 'order cohorts'] ==> ['relative order timing'] | <-> Changed to using create_customer_object_graph | info change: [] ==> [] | + understand_something - | info change: [] ==> [] + | info change: ['customer list', 'order cohorts', 'customer emails'] ==> ['alpha', 'omega'] | + understand_another - | info change: [] ==> [] + | info change: ['customer list', 'relative order timing', 'alpha'] ==> ['beta'] | + keep_understanding - | info change: [] ==> [] + | info change: ['omega', 'beta'] ==> ['totality'] | |-> extract: extract_cohort_analysis, store: email_analysis_to_sales_team - | info change: [] ==> [] + | info change: ['alpha', 'beta'] ==> [] | |-> extract: extract_revenue_projections, store: put_projections_into_quickbooks - | info change: [] ==> [] + | info change: ['totality'] ==> [] From 281e1d65bc7e7a102929287000de7d934808b697 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:28:25 -0500 Subject: [PATCH 38/46] F - Can now validate that a pipeline provides all needed info before depending on it, as long as people use @needs() and @gives(). --- src/datapipeline/pipeline.py | 20 +++++++++++++++---- src/datapipeline/segmentimpl.py | 2 +- src/examplecode/product.py | 2 +- tests/pipeline.py | 19 +++++++++--------- ...st_valid_pipeline_passes_test.approved.txt | 2 +- 5 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index b6debce..5227d8a 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -2,6 +2,9 @@ from typing import Callable, TypeVar, Awaitable, Generic, List, Iterable +import assertpy +from assertpy import soft_assertions, assert_that, soft_fail + from datapipeline import DataProcessingSegment, RestructuringSegment, PipeHeadSegment, clientapi from datapipeline.segmentimpl import _PipeSegment, SourceSegment, TransformSegment, SinkSegment @@ -111,8 +114,17 @@ def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: def is_valid_pipeline(self): if not isinstance(self.val, Pipeline): raise TypeError('val must be a pipeline.') - for segment in self.val.segments: - pass - # if segment != 5: - # self.error(f'{segment} is NOT 5!') + provided = set() + with soft_assertions(): + kind = self.kind + self.kind = 'soft' + for segment in self.val.segments: + needed = set(segment.needs) + if not needed <= provided: + desc = self.description + self.description = f'Segment {segment.descriptor}' + self.error(f'Had unmet needs {needed - provided}. 
The pipeline to that point had provided {provided}.') + self.description = desc + provided.update(segment.gives) + self.kind = kind return self diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 5ad3007..41c1200 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -27,9 +27,9 @@ def __iter__(self): return self def __next__(self): + self.next_segment = self.next_segment._next_segment if isinstance(self.next_segment, NullTerminator): raise StopIteration() - self.next_segment = self.next_segment._next_segment return self.next_segment def __init__(self, needs: List[str], gives: List[str], next_segment: _PipeSegment[TOut, U] = None): diff --git a/src/examplecode/product.py b/src/examplecode/product.py index ea273d0..2b8c8c3 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -37,7 +37,7 @@ def remove_invalid_emails(data: RawCustomerData) -> None: pass -@needs("foo list") +@needs("customer list") @gives("non-test customers") def remove_test_customers(data: RawCustomerData) -> None: pass diff --git a/tests/pipeline.py b/tests/pipeline.py index 21983ae..8a6f7ed 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -16,13 +16,14 @@ def test_valid_pipeline_passes_test(): def test_pipeline_with_missing_requirement_is_invalid(): result = pipeline( - start_with(product.RawCustomerData) - .then( - source(product.load_customer_csv, product.parse_customers_from_csv), - transform(product.remove_empty_orders)) - ) - try: + start_with(product.RawCustomerData) + .then( + source(product.load_customer_csv, product.parse_customers_from_csv), + transform(product.remove_empty_orders)) + ) + + def make_assertion(): assert_that(result).is_valid_pipeline() - # fail("Pipeline should not have been valid") - except AssertionError as e: - assert_that(str(e)).is_equal_to("") + + assert_that(make_assertion).raises(AssertionError).when_called_with().contains( + "[Segment + remove_empty_orders] Had unmet needs {'customer orders'}. The pipeline to that point had provided {'") diff --git a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt index 91971c5..f3a9eeb 100644 --- a/tests/pipeline.test_valid_pipeline_passes_test.approved.txt +++ b/tests/pipeline.test_valid_pipeline_passes_test.approved.txt @@ -10,7 +10,7 @@ | info change: ['customer list'] ==> ['customer emails'] | + remove_test_customers - | info change: ['foo list'] ==> ['non-test customers'] + | info change: ['customer list'] ==> ['non-test customers'] | >-| load:load_customer_orders, parse: parse_orders_from_json | info change: ['customer emails', 'non-test customers'] ==> ['customer orders'] From b4d53f9fa57ee26004bc23e1987eac0809c835ef Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:29:09 -0500 Subject: [PATCH 39/46] r - Optimize imports. 
--- tests/pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/pipeline.py b/tests/pipeline.py index 8a6f7ed..94b8ac4 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -1,6 +1,5 @@ -import pytest from approvaltests import verify -from assertpy import assert_that, add_extension, fail +from assertpy import assert_that, add_extension from datapipeline import pipeline, start_with, source, transform, is_valid_pipeline from examplecode import product From b040e3a1cec09229900b508a6f5d27b223fd6438 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:40:02 -0500 Subject: [PATCH 40/46] E** Trying to get tests to run on CI. CI env config isn't working out of the box and I don't really understand it, so... --- .github/workflows/test.yml | 4 ++-- pyproject.toml | 2 ++ pytest.ini | 2 -- src/version.py | 2 +- tox.ini | 5 ++--- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3e37cb9..86b8d71 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,8 +11,8 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.9] - os: [macos-latest, ubuntu-latest, windows-latest] + python-version: [3.10] + os: [ubuntu-latest] steps: - uses: actions/checkout@v2 diff --git a/pyproject.toml b/pyproject.toml index c668604..c157773 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,7 @@ [tool.pytest.ini_options] pythonpath = "src" +testpaths = 'tests' +python_files = '*.py' addopts = [ "--import-mode=importlib", ] diff --git a/pytest.ini b/pytest.ini index 923020e..0c98ff0 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,2 @@ [pytest] -testpaths = tests -python_files = *.py addopts = --showlocals diff --git a/src/version.py b/src/version.py index 54f129d..dbfe2f6 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -version_number = "0.0.0" +version_number = "0.1.0" diff --git a/tox.ini b/tox.ini index b2c52fb..398708d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ - + # run with: tox -e dev [tox] -envlist = tests-{py38,py39} +envlist = tests-{py} [testenv] commands = python -m pytest {posargs} @@ -10,4 +10,3 @@ deps = -rrequirements.txt [testenv:dev] usedevelop = True commands = - From 202e085786f11ecff5b0a9e3ae8f503f27213a71 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:45:10 -0500 Subject: [PATCH 41/46] E** Perhaps now get python 3.10? --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 86b8d71..9833f8f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,7 +11,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.10] + python-version: ['3.10'] os: [ubuntu-latest] steps: From e46209bda885d3beebfcb6427402099c39a4dee4 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:48:10 -0500 Subject: [PATCH 42/46] E!! Remove markdown snippets, since that was taking a ton of CI build time (way more than the tests). I'll manage my own md. 
--- .github/workflows/updateMarkdown.yml | 22 ---------------------- README.md | 3 +-- mdsnippets.json | 5 ----- 3 files changed, 1 insertion(+), 29 deletions(-) delete mode 100644 .github/workflows/updateMarkdown.yml delete mode 100644 mdsnippets.json diff --git a/.github/workflows/updateMarkdown.yml b/.github/workflows/updateMarkdown.yml deleted file mode 100644 index 291a609..0000000 --- a/.github/workflows/updateMarkdown.yml +++ /dev/null @@ -1,22 +0,0 @@ -name: on-push-do-doco -on: - push: -jobs: - release: - runs-on: windows-latest - steps: - - uses: actions/checkout@v3 - - name: Run MarkdownSnippets - run: | - dotnet tool install --global MarkdownSnippets.Tool - mdsnippets ${GITHUB_WORKSPACE} - shell: bash - - name: Push changes - run: | - git config --local user.email "action@github.com" - git config --local user.name "GitHub Action" - git commit -m "Doco changes" -a || echo "nothing to commit" - remote="https://${GITHUB_ACTOR}:${{secrets.GITHUB_TOKEN}}@github.com/${GITHUB_REPOSITORY}.git" - branch="${GITHUB_REF:11}" - git push "${remote}" ${branch} || echo "nothing to push" - shell: bash diff --git a/README.md b/README.md index 3cb6d41..fe91c05 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,6 @@ # Data Pipelines in Python Library that makes it easy to write Python programs that get data from multiple sources, manipulate it, and then output the results. - ## Contents * [Who is this project for?](#who-is-this-project-for) @@ -12,7 +11,7 @@ Library that makes it easy to write Python programs that get data from multiple * [Watch the video](#watch-the-video) * [What is included?](#what-is-included) * [Recommended Tooling?](#recommended-tooling) - * [Next steps](#next-steps) + * [Next steps](#next-steps) ## Who is this project for? Anyone implementing a project that manipulates data from one or more sources and writes the results to one or more sinks. diff --git a/mdsnippets.json b/mdsnippets.json deleted file mode 100644 index 27dddee..0000000 --- a/mdsnippets.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "ExcludeDirectories": [ "target" ], - "Convention": "InPlaceOverwrite", - "TocLevel": 5 -} From 0aa4afcfe214630d811683fb77fc02b62e94779a Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Mon, 30 Jan 2023 17:52:45 -0500 Subject: [PATCH 43/46] E** Stop being fancy with the version number. It isn't working. --- setup.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/setup.py b/setup.py index 9caf8f0..4c88cfc 100644 --- a/setup.py +++ b/setup.py @@ -1,16 +1,12 @@ -import re from pathlib import Path from setuptools import setup, find_packages HERE = Path(__file__).parent -_version_file_contents = (HERE / "src" / "version.py").read_text() -matched = re.search(r'"(.*)"', _version_file_contents) -VERSION = matched.group(1) if matched is not None else "UNKNOWN VERSION" setup( name="datapipelines", - version=VERSION, + version="0.1.0", description="Data transform pipeline library to simplify ETL-style applications", author="Deep Roots", author_email="", From 4edc807d8bc917a36babf4b43c18c670dd265d22 Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Tue, 31 Jan 2023 10:19:49 -0500 Subject: [PATCH 44/46] B** - Set pytest.ini to perhaps work for CI builds. 
--- pytest.ini | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pytest.ini b/pytest.ini index 0c98ff0..883b040 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,5 @@ [pytest] -addopts = --showlocals +pythonpath = src +testpaths = tests +python_files = *.py +addopts = --showlocals --import-mode=importlib From 9c201df121a0f392e24b79aec2af9d6a07f7bddf Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Tue, 31 Jan 2023 13:01:19 -0500 Subject: [PATCH 45/46] F - Pipe segments now operate async when appropriate. The pipe's backbone is async, sometimes calling client code synchronously. --- conftest.py | 1 + pytest.ini | 2 ++ requirements.txt | 1 + src/datapipeline/clientapi.py | 11 ++++-- src/datapipeline/segmentimpl.py | 35 +++++++++--------- tests/pipe_segments.py | 63 ++++++++++++++++++++------------- 6 files changed, 70 insertions(+), 43 deletions(-) create mode 100644 conftest.py diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..044963b --- /dev/null +++ b/conftest.py @@ -0,0 +1 @@ +collect_ignore = ["setup.py"] diff --git a/pytest.ini b/pytest.ini index 883b040..b8ae217 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,7 @@ [pytest] +asyncio_mode=auto pythonpath = src testpaths = tests python_files = *.py +norecursedirs = .pytest_cache .github .idea addopts = --showlocals --import-mode=importlib diff --git a/requirements.txt b/requirements.txt index 56e6704..4f8c766 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ approvaltests pytest assertpy +pytest-asyncio diff --git a/src/datapipeline/clientapi.py b/src/datapipeline/clientapi.py index 57a488b..463602b 100644 --- a/src/datapipeline/clientapi.py +++ b/src/datapipeline/clientapi.py @@ -22,6 +22,13 @@ def __call__(self, data: TIn) -> None: pass +@runtime_checkable +class ProcessingStepAsync(NamedStep, Protocol[TIn]): + @abstractmethod + async def __call__(self, data: TIn) -> None: + pass + + @runtime_checkable class RestructuringStep(NamedStep, Protocol[TIn, TOut]): @abstractmethod @@ -32,7 +39,7 @@ def __call__(self, data: TIn) -> TOut: @runtime_checkable class Loader(NamedStep, Protocol[TIn, TOut]): @abstractmethod - def __call__(self, data: TIn) -> TOut: + async def __call__(self, data: TIn) -> TOut: pass @@ -53,5 +60,5 @@ def __call__(self, data: TIn) -> TOut: @runtime_checkable class StoreImpl(NamedStep, Protocol[TIn]): @abstractmethod - def __call__(self, data: TIn) -> None: + async def __call__(self, data: TIn) -> None: pass diff --git a/src/datapipeline/segmentimpl.py b/src/datapipeline/segmentimpl.py index 41c1200..0a37a8d 100644 --- a/src/datapipeline/segmentimpl.py +++ b/src/datapipeline/segmentimpl.py @@ -51,12 +51,12 @@ def to_verification_string(self) -> str: return f' |\n{self.descriptor}\n | info change: {self.needs} ==> {self.gives}\n{self._next_segment.to_verification_string()}' @abstractmethod - def _process(self, data: TIn) -> TOut: + async def _process(self, data: TIn) -> TOut: pass - def process(self, data: TIn) -> None: - result = self._process(data) - self._next_segment.process(result) + async def process(self, data: TIn) -> None: + result = await self._process(data) + await self._next_segment.process(result) class PipeHeadSegment(_PipeSegment[TIn, TIn], Generic[TIn]): @@ -69,7 +69,7 @@ def __init__(self, impl: Callable[[], TIn], next_segment: _PipeSegment[TIn, U] = def to_verification_string(self) -> str: return f' Start with empty <{self._impl.__name__}>\n{self._next_segment.to_verification_string()}' - def _process(self, data: TIn) -> TIn: + async 
def _process(self, data: TIn) -> TIn: return self._impl() @property @@ -78,9 +78,10 @@ def descriptor(self) -> str: class DataProcessingSegment(_PipeSegment[TIn, TIn], Generic[TIn]): - _impl: clientapi.ProcessingStep[TIn] + _impl: clientapi.ProcessingStep[TIn] | clientapi.ProcessingStepAsync[TIn] - def __init__(self, impl: clientapi.ProcessingStep[TIn], next_segment: _PipeSegment[TIn, U] = None): + def __init__(self, impl: clientapi.ProcessingStep[TIn] | clientapi.ProcessingStepAsync[TIn], + next_segment: _PipeSegment[TIn, U] = None): assert isinstance(impl, clientapi.ProcessingStep) super(DataProcessingSegment, self).__init__(getattr(impl, "_p_needs_", []), getattr(impl, "_p_gives_", []), next_segment) self._impl = impl @@ -93,16 +94,18 @@ def descriptor(self) -> str: def symbol(self) -> str: raise NotImplementedError - def _process(self, data: TIn) -> TIn: - self._impl(data) + async def _process(self, data: TIn) -> TIn: + result = self._impl(data) + if inspect.isawaitable(result): + await result return data class SourceSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): def __init__(self, load: clientapi.Loader[TIn, TRaw], parse: clientapi.ParseImpl[TIn, TRaw], next_segment: _PipeSegment[TIn, U] = None): - def impl(data: TIn) -> None: - parse(data, load(data)) + async def impl(data: TIn) -> None: + parse(data, await load(data)) assert isinstance(load, clientapi.Loader) assert isinstance(parse, clientapi.ParseImpl) @@ -123,8 +126,8 @@ def symbol(self) -> str: class SinkSegment(DataProcessingSegment[TIn], Generic[TIn, TRaw]): def __init__(self, extract: clientapi.Extractor[TIn, TRaw], store: clientapi.StoreImpl[TRaw], next_segment: _PipeSegment[TIn, U] = None): - def impl(data: TIn) -> None: - store(extract(data)) + async def impl(data: TIn) -> None: + await store(extract(data)) assert isinstance(extract, clientapi.Extractor) assert isinstance(store, clientapi.StoreImpl) @@ -149,7 +152,7 @@ def __init__(self, impl: Callable[[TIn], TOut], next_segment: _PipeSegment[TOut, def descriptor(self) -> str: return f' <-> Changed to <{inspect.get_annotations(self._impl, eval_str=True)["return"].__name__}> using {self._impl.__name__}' - def _process(self, data: TIn) -> TOut: + async def _process(self, data: TIn) -> TOut: return self._impl(data) @@ -166,8 +169,8 @@ def descriptor(self) -> str: def to_verification_string(self) -> str: return "" - def process(self, data: TIn) -> None: + async def process(self, data: TIn) -> None: pass - def _process(self, data: TIn) -> TIn: + async def _process(self, data: TIn) -> TIn: raise NotImplementedError diff --git a/tests/pipe_segments.py b/tests/pipe_segments.py index 5f27414..ed6c8b2 100644 --- a/tests/pipe_segments.py +++ b/tests/pipe_segments.py @@ -1,3 +1,4 @@ +import asyncio from typing import Any, Generic, TypeVar import pytest @@ -10,11 +11,11 @@ class DataTransferObject: - some_num: Any + some_value: Any dumb_object: Any - def __init__(self, some_num=None): - self.some_num = some_num + def __init__(self, some_value=None): + self.some_value = some_value self.dumb_object = None @@ -26,10 +27,10 @@ def __init__(self, message: str = ""): def format_dumb_object(data: DataTransferObject) -> None: - data.dumb_object = f"{{ saw: {data.some_num} }}" + data.dumb_object = f"{{ saw: {data.some_value} }}" -def first(data: DataTransferObject) -> dict: +async def first(data: DataTransferObject) -> dict: pass @@ -45,12 +46,12 @@ def extract_for_third(data: SecondDTO) -> dict: pass -def third(data: dict) -> None: +async def third(data: dict) -> None: pass def 
convert(data: DataTransferObject) -> SecondDTO: - return SecondDTO(f"made by convert function from {data.some_num}") + return SecondDTO(f"made by convert function from {data.some_value}") class Capture(Generic[T]): @@ -64,43 +65,55 @@ def __call__(self, arg: T) -> None: self.result = arg -def test_restructuring_segment_uses_its_impl(): +class CaptureAsync(Generic[T]): + result: T + + def __init__(self): + self.result = None + self.__name__ = 'CaptureAsync' + + async def __call__(self, arg: T) -> None: + await asyncio.sleep(0.005) + self.result = arg + + +async def test_restructuring_segment_uses_its_impl(): def has_right_message(arg: SecondDTO): assert_that(arg.message).is_equal_to("made by convert function from 2") test_subject = RestructuringSegment(convert, TransformSegment(has_right_message)) - test_subject.process(DataTransferObject(2)) + await test_subject.process(DataTransferObject(2)) -def test_pipe_segment_calls_its_transform_impl_to_process_data(): +async def test_pipe_segment_calls_its_transform_impl_to_process_data(): capture = Capture[DataTransferObject]() test_subject = TransformSegment(format_dumb_object, TransformSegment(capture)) - data = DataTransferObject(4) - test_subject.process(data) + await test_subject.process(DataTransferObject(4)) assert_that(capture.result.dumb_object).is_equal_to("{ saw: 4 }") -def test_source_segment_chains_its_two_implementations(): - def fetch_data(arg: DataTransferObject) -> int: - return 8 +async def test_source_segment_chains_its_two_implementations(): + async def fetch_data(arg: DataTransferObject) -> str: + await asyncio.sleep(0.005) + return "fetched value" - def parse_data(arg: DataTransferObject, new_data: int) -> None: - arg.some_num = new_data + def parse_data(arg: DataTransferObject, new_data: str) -> None: + arg.some_value = new_data capture = Capture[DataTransferObject]() test_subject = SourceSegment(fetch_data, parse_data, TransformSegment(capture)) - test_subject.process(DataTransferObject(4)) - assert_that(capture.result.some_num).is_equal_to(8) + await test_subject.process(DataTransferObject("original value")) + assert_that(capture.result.some_value).is_equal_to("fetched value") -def test_sink_segment_chains_its_two_implementations(): - def extract(arg: DataTransferObject) -> int: - return 8 +async def test_sink_segment_chains_its_two_implementations(): + def extract(arg: DataTransferObject) -> str: + return "extracted value" - capture = Capture[DataTransferObject]() + capture = CaptureAsync[DataTransferObject]() test_subject = SinkSegment(extract, capture) - test_subject.process(DataTransferObject(4)) - assert_that(capture.result).is_equal_to(8) + await test_subject.process(DataTransferObject("original value")) + assert_that(capture.result).is_equal_to("extracted value") def test_pipelines_can_be_validated_as_strings(): From 840dcfde46059ca493edcedeb1a9320456cd83da Mon Sep 17 00:00:00 2001 From: arlobelshee Date: Tue, 31 Jan 2023 13:14:53 -0500 Subject: [PATCH 46/46] F - Pipeline supports run - which is a regular, synchronous function. It encapsulates all the async behavior in the pipe. 
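Illustrative sketch only, not part of this patch (fetch, parse, clean, extract, store, and pipe are hypothetical names): client code builds the pipe fluently and then calls run(), never touching asyncio itself. Async work stays in the Loader/StoreImpl callables and the backbone; plain functions are called synchronously.

    pipe = pipeline(
        start_with(RawCustomerData)
        .then(source(fetch, parse),     # fetch is an async Loader, parse is a plain function
              transform(clean),         # transforms may be sync or async; the segment awaits only when needed
              sink(extract, store))     # extract is plain, store is an async StoreImpl
    )
    pipe.run()  # the only entry point callers need; asyncio.run() happens inside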
--- src/datapipeline/pipeline.py | 8 +++++-- src/examplecode/product.py | 7 +++++- tests/pipeline.py | 45 +++++++++++++++++++++++++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/datapipeline/pipeline.py b/src/datapipeline/pipeline.py index 5227d8a..7d064f5 100644 --- a/src/datapipeline/pipeline.py +++ b/src/datapipeline/pipeline.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import Callable, TypeVar, Awaitable, Generic, List, Iterable import assertpy @@ -103,9 +104,12 @@ def to_verification_string(self): return self._first_segment.to_verification_string() @property - def segments(self) -> Iterable[_PipeSegment]: + def _segments(self) -> Iterable[_PipeSegment]: return iter(self._first_segment) + def run(self): + asyncio.run(self._first_segment.process(None)) + def pipeline(builder: PotentiallyCompletePipeline[TSrc, TDest]) -> Pipeline: return builder.build() @@ -118,7 +122,7 @@ def is_valid_pipeline(self): with soft_assertions(): kind = self.kind self.kind = 'soft' - for segment in self.val.segments: + for segment in self.val._segments: needed = set(segment.needs) if not needed <= provided: desc = self.description diff --git a/src/examplecode/product.py b/src/examplecode/product.py index 2b8c8c3..f63a914 100644 --- a/src/examplecode/product.py +++ b/src/examplecode/product.py @@ -1,10 +1,15 @@ from __future__ import annotations +from typing import Any + from datapipeline.pipeline import needs, gives, source, transform, sink, pipeline, start_with class RawCustomerData: - pass + value: Any + + def __init__(self): + self.value = None class CustomerGraph: diff --git a/tests/pipeline.py b/tests/pipeline.py index 94b8ac4..670d285 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -1,11 +1,28 @@ +import asyncio +from typing import Generic, TypeVar + from approvaltests import verify from assertpy import assert_that, add_extension -from datapipeline import pipeline, start_with, source, transform, is_valid_pipeline +from datapipeline import pipeline, start_with, source, transform, is_valid_pipeline, sink from examplecode import product add_extension(is_valid_pipeline) +T = TypeVar('T') + + +class CaptureAsync(Generic[T]): + result: T + + def __init__(self): + self.result = None + self.__name__ = 'CaptureAsync' + + async def __call__(self, arg: T) -> None: + await asyncio.sleep(0.005) + self.result = arg + def test_valid_pipeline_passes_test(): result = product.create_pipeline() @@ -13,6 +30,32 @@ def test_valid_pipeline_passes_test(): verify(result.to_verification_string()) +def test_pipeline_runs_its_segments(): + async def fetch(data: product.RawCustomerData) -> str: + await asyncio.sleep(0.005) + return "fetched data" + + def parse(data: product.RawCustomerData, new_data: str) -> None: + data.value = new_data.split()[0] + + def change(data: product.RawCustomerData) -> None: + data.value = f"changed {data.value}" + + def extract(data: product.RawCustomerData) -> str: + return data.value + + capture = CaptureAsync[str]() + test_subject = pipeline( + start_with(product.RawCustomerData) + .then( + source(fetch, parse), + transform(change), + sink(extract, capture)) + ) + test_subject.run() + assert_that(capture.result).is_equal_to("changed fetched") + + def test_pipeline_with_missing_requirement_is_invalid(): result = pipeline( start_with(product.RawCustomerData)