refactor: rename test/ to tests/

This commit is contained in:
Konstantin Fickel 2026-02-15 17:30:44 +01:00
parent af2debc19b
commit f9ed0463f7
Signed by: kfickel
GPG key ID: A793722F9933C1A5
6 changed files with 16 additions and 17 deletions

View file

@@ -0,0 +1,157 @@
from datetime import date, datetime, time
from streamd.localize.extract_datetime import (
extract_date_from_marker,
extract_datetime_from_file_name,
extract_datetime_from_marker,
extract_datetime_from_marker_list,
extract_time_from_marker,
)
class TestExtractDateTime:
    """Tests for deriving a datetime from a stream file's name prefix."""

    def test_extract_date_from_file_name_valid(self):
        result = extract_datetime_from_file_name("20230101-123456 Some Text.md")
        assert result == datetime(2023, 1, 1, 12, 34, 56)

    def test_extract_date_from_file_name_invalid(self):
        # A name without a leading date stamp yields no datetime at all.
        assert extract_datetime_from_file_name("invalid-file-name.md") is None

    def test_extract_date_from_file_name_without_time(self):
        # A date-only prefix defaults the time component to midnight.
        result = extract_datetime_from_file_name("20230101 Some Text.md")
        assert result == datetime(2023, 1, 1, 0, 0, 0)

    def test_extract_date_from_file_name_short_time(self):
        # A four-digit time (HHMM) leaves the seconds at zero.
        result = extract_datetime_from_file_name("20230101-1234 Some Text.md")
        assert result == datetime(2023, 1, 1, 12, 34, 0)

    def test_extract_date_from_file_name_empty_string(self):
        assert extract_datetime_from_file_name("") is None

    def test_extract_date_from_file_name_with_full_path(self):
        # Directory components before the file name are ignored.
        result = extract_datetime_from_file_name(
            "/path/to/20230101-123456 Some Text.md"
        )
        assert result == datetime(2023, 1, 1, 12, 34, 56)
class TestExtractMarkerDateTime:
    """Tests for parsing a 14-digit YYYYMMDDHHMMSS marker into a datetime."""

    def test_extract_datetime_from_marker_valid(self):
        assert extract_datetime_from_marker("20250101150000") == datetime(
            2025, 1, 1, 15, 0, 0
        )

    def test_extract_datetime_from_marker_invalid_format(self):
        # Wrong length, separators, non-digits, and the empty string are all
        # rejected.
        malformed = [
            "2025010115000",  # too short
            "202501011500000",  # too long
            "2025-01-01T150000",  # separators
            "2025010115000a",  # non-digit
            "",
        ]
        for candidate in malformed:
            assert extract_datetime_from_marker(candidate) is None

    def test_extract_datetime_from_marker_invalid_values(self):
        # Correct shape but impossible calendar/clock values.
        impossible = [
            "20250230120000",  # Feb 30
            "20250101126000",  # minute 60
            "20250101240000",  # hour 24
        ]
        for candidate in impossible:
            assert extract_datetime_from_marker(candidate) is None
class TestExtractMarkerDate:
    """Tests for parsing an 8-digit YYYYMMDD marker into a date."""

    def test_extract_date_from_marker_valid(self):
        assert extract_date_from_marker("20250101") == date(2025, 1, 1)

    def test_extract_date_from_marker_invalid_format(self):
        malformed = [
            "2025010",  # too short
            "202501011",  # too long
            "2025-01-01",  # separators
            "2025010a",  # non-digit
            "",
        ]
        for candidate in malformed:
            assert extract_date_from_marker(candidate) is None

    def test_extract_date_from_marker_invalid_values(self):
        # Correct shape, impossible calendar values.
        impossible = [
            "20250230",  # Feb 30
            "20251301",  # month 13
            "20250132",  # day 32
        ]
        for candidate in impossible:
            assert extract_date_from_marker(candidate) is None
class TestExtractMarkerTime:
    """Tests for parsing a 6-digit HHMMSS marker into a time."""

    def test_extract_time_from_marker_valid(self):
        assert extract_time_from_marker("150000") == time(15, 0, 0)

    def test_extract_time_from_marker_invalid_format(self):
        malformed = [
            "15000",  # too short
            "1500000",  # too long
            "15:00:00",  # separators
            "15000a",  # non-digit
            "",
        ]
        for candidate in malformed:
            assert extract_time_from_marker(candidate) is None

    def test_extract_time_from_marker_invalid_values(self):
        # Correct shape, impossible clock values.
        impossible = [
            "240000",  # hour 24
            "156000",  # minute 60
            "150060",  # second 60
        ]
        for candidate in impossible:
            assert extract_time_from_marker(candidate) is None
class TestExtractDateTimeFromMarkerList:
    """Tests for `extract_datetime_from_marker_list`.

    The function scans a marker list for date ("YYYYMMDD"), time ("HHMMSS"),
    and combined ("YYYYMMDDHHMMSS") markers and merges them with an inherited
    datetime for any component no marker provides.
    """

    def test_no_markers_inherits_datetime(self):
        inherited = datetime(2025, 1, 2, 3, 4, 5)
        assert inherited == extract_datetime_from_marker_list([], inherited)

    def test_unrelated_markers_inherits_datetime(self):
        inherited = datetime(2025, 1, 2, 3, 4, 5)
        # Wrong lengths or separators: none of these qualify as markers.
        markers = ["not-a-marker", "2025-01-01", "1500", "1234567"]
        assert inherited == extract_datetime_from_marker_list(markers, inherited)

    def test_date_only_marker_sets_midnight(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101"]
        # A date marker alone resets the time to 00:00:00 rather than
        # inheriting the time component.
        assert datetime(2025, 1, 1, 0, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_time_only_marker_inherits_date(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["150000"]
        assert datetime(2025, 6, 7, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_datetime_marker_overrides_both_date_and_time(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101150000"]
        assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_combined_date_and_time_markers(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101", "150000"]
        assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_first_marker_wins_when_multiple_dates_or_times(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101", "150000", "20250102", "160000"]
        assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_separated_date_and_time_win_over_later_combined_marker(self):
        # Renamed from `test_last_separated_date_and_time_win`: the asserted
        # result is the FIRST separate date and time markers, which take
        # precedence over the combined marker that appears after them — the
        # old name contradicted the assertion.
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101", "150000", "20250102160000"]
        assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

    def test_invalid_date_or_time_markers_are_ignored(self):
        inherited = datetime(2025, 6, 7, 8, 9, 10)
        # "20251301" (month 13) and "240000" (hour 24) are skipped entirely.
        markers = ["20251301", "240000", "20250101", "150000"]
        assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
            markers, inherited
        )

View file

@@ -0,0 +1,367 @@
import pytest
from streamd.localize.repository_configuration import (
Dimension,
Marker,
MarkerPlacement,
RepositoryConfiguration,
merge_dimensions,
merge_markers,
merge_repository_configuration,
merge_single_dimension,
merge_single_marker,
)
class TestMergeSingleDimension:
    """Tests for `merge_single_dimension`: the second dimension's fields win
    when provided; otherwise the base's values are retained."""

    def test_second_overrides_display_name_when_non_empty(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=False),
        )
        assert merged.display_name == "Second"
        assert merged.comment == "c2"
        assert merged.propagate is False

    def test_second_empty_display_name_falls_back_to_base(self):
        # An empty display name on the second dimension is "not provided".
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="", comment="c2", propagate=False),
        )
        assert merged.display_name == "Base"
        assert merged.comment == "c2"
        assert merged.propagate is False

    def test_second_comment_none_does_not_erase_base_comment(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="keep", propagate=True),
            Dimension(display_name="Second", comment=None, propagate=False),
        )
        assert merged.display_name == "Second"
        assert merged.comment == "keep"

    def test_second_comment_non_none_overrides_base_comment(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=True),
        )
        assert merged.comment == "c2"

    def test_second_propagate_overrides_base_when_provided(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=False),
        )
        assert merged.propagate is False

    def test_propagate_merging_retains_base_when_second_not_provided(self):
        # `propagate` omitted on the second dimension -> base value is kept.
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2"),
        )
        assert merged.propagate is True
class TestMergeDimensions:
    """Tests for merging dimension dictionaries key by key."""

    def test_adds_new_keys_from_second(self):
        merged = merge_dimensions(
            {"a": Dimension(display_name="A", propagate=True)},
            {"b": Dimension(display_name="B", propagate=False)},
        )
        assert set(merged.keys()) == {"a", "b"}
        assert merged["a"].display_name == "A"
        assert merged["b"].display_name == "B"

    def test_merges_existing_keys(self):
        # Overlapping keys are field-merged via the single-dimension rules.
        merged = merge_dimensions(
            {"a": Dimension(display_name="A", comment="c1", propagate=True)},
            {"a": Dimension(display_name="A2", comment=None, propagate=False)},
        )
        entry = merged["a"]
        assert entry.display_name == "A2"
        assert entry.comment == "c1"
        assert entry.propagate is False

    def test_does_not_mutate_inputs(self):
        left = {"a": Dimension(display_name="A", comment="c1", propagate=True)}
        right = {"b": Dimension(display_name="B", comment="c2", propagate=False)}
        merged = merge_dimensions(left, right)
        assert "b" not in left
        assert "a" not in right
        assert set(merged.keys()) == {"a", "b"}
class TestMergeSingleMarker:
    """Tests for `merge_single_marker`: display-name override plus placement
    concatenation with identity-based deduplication."""

    def test_second_overrides_display_name_when_non_empty(self):
        base = Marker(
            display_name="Base",
            placements=[MarkerPlacement(dimension="project", value=None)],
        )
        second = Marker(
            display_name="Second",
            placements=[MarkerPlacement(dimension="timesheet", value="coding")],
        )
        merged = merge_single_marker(base, second)
        assert merged.display_name == "Second"
        # Placements from both markers are kept, base first. The expected
        # values spell out the `if_with=set()` default explicitly.
        assert merged.placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(dimension="timesheet", value="coding", if_with=set()),
        ]

    def test_second_empty_display_name_falls_back_to_base(self):
        base = Marker(display_name="Base", placements=[])
        second = Marker(display_name="", placements=[])
        merged = merge_single_marker(base, second)
        assert merged.display_name == "Base"

    def test_appends_new_placements(self):
        base = Marker(
            display_name="Base",
            placements=[
                MarkerPlacement(dimension="project"),
            ],
        )
        second = Marker(
            display_name="Second",
            placements=[
                MarkerPlacement(
                    if_with={"Timesheet"}, dimension="timesheet", value="x"
                ),
            ],
        )
        merged = merge_single_marker(base, second)
        assert merged.placements == [
            MarkerPlacement(dimension="project"),
            MarkerPlacement(if_with={"Timesheet"}, dimension="timesheet", value="x"),
        ]

    def test_deduplicates_by_identity_and_second_overrides_base(self):
        # The (if_with, dimension, value) triple acts as the placement
        # identity: the {"A"} placement appears in both markers and is kept
        # only once, while distinct identities are all retained.
        base = Marker(
            display_name="Base",
            placements=[
                MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
                MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
            ],
        )
        second = Marker(
            display_name="Second",
            placements=[
                MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
                MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
            ],
        )
        merged = merge_single_marker(base, second)
        assert merged.placements == [
            MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
            MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
            MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
        ]

    def test_identity_is_order_insensitive_for_if_with(self):
        base = Marker(
            display_name="Base",
            placements=[MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v")],
        )
        second = Marker(
            display_name="Second",
            placements=[MarkerPlacement(if_with={"B", "A"}, dimension="d", value="v2")],
        )
        merged = merge_single_marker(base, second)
        # With `if_with` as a set, identity is order-insensitive; second overrides base.
        assert merged.placements == [
            MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v2"),
        ]
class TestMergeMarkers:
    """Tests for merging marker dictionaries key by key."""

    def test_adds_new_marker_keys_from_second(self):
        merged = merge_markers(
            {"M1": Marker(display_name="M1", placements=[])},
            {"M2": Marker(display_name="M2", placements=[])},
        )
        assert set(merged.keys()) == {"M1", "M2"}

    def test_merges_existing_marker_keys(self):
        first = {
            "M": Marker(
                display_name="Base",
                placements=[MarkerPlacement(dimension="project")],
            )
        }
        override = {
            "M": Marker(
                display_name="Second",
                placements=[
                    MarkerPlacement(
                        if_with={"Timesheet"}, dimension="timesheet", value="coding"
                    )
                ],
            )
        }
        merged = merge_markers(first, override)
        result = merged["M"]
        assert result.display_name == "Second"
        # Both placements survive, base's first; defaults spelled out.
        assert result.placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(
                if_with={"Timesheet"}, dimension="timesheet", value="coding"
            ),
        ]

    def test_does_not_mutate_inputs(self):
        left = {"M1": Marker(display_name="M1", placements=[])}
        right = {"M2": Marker(display_name="M2", placements=[])}
        merged = merge_markers(left, right)
        assert "M2" not in left
        assert "M1" not in right
        assert set(merged.keys()) == {"M1", "M2"}
class TestMergeRepositoryConfiguration:
    """Tests for merging two whole `RepositoryConfiguration` objects."""

    def test_merges_dimensions_and_markers(self):
        base = RepositoryConfiguration(
            dimensions={
                "project": Dimension(
                    display_name="Project", comment="c1", propagate=True
                ),
                "moment": Dimension(
                    display_name="Moment", comment="c2", propagate=True
                ),
            },
            markers={
                "Streamd": Marker(
                    display_name="Streamd",
                    placements=[MarkerPlacement(dimension="project")],
                )
            },
        )
        second = RepositoryConfiguration(
            dimensions={
                "project": Dimension(display_name="Project2", propagate=False),
                "timesheet": Dimension(
                    display_name="Timesheet", comment="c3", propagate=False
                ),
            },
            markers={
                "Streamd": Marker(
                    display_name="Streamd2",
                    placements=[
                        MarkerPlacement(
                            if_with={"Timesheet"}, dimension="timesheet", value="coding"
                        )
                    ],
                ),
                "JobHunting": Marker(
                    display_name="JobHunting",
                    placements=[MarkerPlacement(dimension="project")],
                ),
            },
        )
        merged = merge_repository_configuration(base, second)
        # Dimensions: union of keys; the overlapping "project" is field-merged
        # (second's display_name and propagate win, base's comment survives
        # because the second omits it).
        assert set(merged.dimensions.keys()) == {"project", "moment", "timesheet"}
        assert merged.dimensions["project"].display_name == "Project2"
        assert merged.dimensions["project"].comment == "c1"
        assert merged.dimensions["project"].propagate is False
        assert merged.dimensions["moment"].display_name == "Moment"
        assert merged.dimensions["timesheet"].display_name == "Timesheet"
        # Markers: union of keys; the overlapping "Streamd" concatenates
        # placements with base's first.
        assert set(merged.markers.keys()) == {"Streamd", "JobHunting"}
        assert merged.markers["Streamd"].display_name == "Streamd2"
        assert merged.markers["Streamd"].placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(
                if_with={"Timesheet"}, dimension="timesheet", value="coding"
            ),
        ]
        assert merged.markers["JobHunting"].placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set())
        ]

    def test_does_not_mutate_base_or_second(self):
        base = RepositoryConfiguration(
            dimensions={"a": Dimension(display_name="A", propagate=True)},
            markers={"M": Marker(display_name="M", placements=[])},
        )
        second = RepositoryConfiguration(
            dimensions={"b": Dimension(display_name="B", propagate=False)},
            markers={"N": Marker(display_name="N", placements=[])},
        )
        _ = merge_repository_configuration(base, second)
        assert set(base.dimensions.keys()) == {"a"}
        assert set(second.dimensions.keys()) == {"b"}
        assert set(base.markers.keys()) == {"M"}
        assert set(second.markers.keys()) == {"N"}

    def test_merge_is_associative_for_non_conflicting_inputs(self):
        a = RepositoryConfiguration(
            dimensions={"d1": Dimension(display_name="D1", propagate=True)},
            markers={"m1": Marker(display_name="M1", placements=[])},
        )
        b = RepositoryConfiguration(
            dimensions={"d2": Dimension(display_name="D2", propagate=False)},
            markers={"m2": Marker(display_name="M2", placements=[])},
        )
        c = RepositoryConfiguration(
            dimensions={"d3": Dimension(display_name="D3", propagate=False)},
            markers={"m3": Marker(display_name="M3", placements=[])},
        )
        # (a + b) + c must equal a + (b + c) when keys do not overlap.
        left = merge_repository_configuration(merge_repository_configuration(a, b), c)
        right = merge_repository_configuration(a, merge_repository_configuration(b, c))
        assert left == right
        assert set(left.dimensions.keys()) == {"d1", "d2", "d3"}
        assert set(left.markers.keys()) == {"m1", "m2", "m3"}
# Cases: (base config, second config, propagate expected after merging).
_PROPAGATE_CASES = [
    (
        RepositoryConfiguration(
            dimensions={"d": Dimension(display_name="D", propagate=True)},
            markers={},
        ),
        RepositoryConfiguration(
            dimensions={"d": Dimension(display_name="D2")},
            markers={},
        ),
        True,
    )
]


@pytest.mark.parametrize(("base", "second", "expected_propagate"), _PROPAGATE_CASES)
def test_merge_repository_configuration_propagate_preserves_base_when_omitted(
    base: RepositoryConfiguration,
    second: RepositoryConfiguration,
    expected_propagate: bool,
):
    """Omitting `propagate` on the second config keeps the base's value."""
    merged = merge_repository_configuration(base, second)
    assert merged.dimensions["d"].propagate is expected_propagate

343
tests/parse/test_parse.py Normal file
View file

@@ -0,0 +1,343 @@
from faker import Faker
from streamd.parse import Shard, StreamFile, parse_markdown_file
# Module-level Faker instance; used once below to generate a random markdown
# file name shared by all tests in TestParseProcess.
fake = Faker()
class TestParseProcess:
    """Tests for `parse_markdown_file`: splitting markdown into a Shard tree.

    Conventions demonstrated by the assertions below: an `@Name` token at the
    start of a block becomes a *marker* on that shard, while an `@Name`
    elsewhere in the text becomes a *tag*; headings and list nesting create
    child shards with their own line ranges.

    Two test names were fixed in this revision:
    `test_parse_only_parse_releveant_levels` -> `..._relevant_...` and
    `test_parse_fullly_before_headings_start` -> `test_parse_fully_...`.
    """

    # One shared random markdown file name for every test in this class.
    file_name: str = fake.file_name(extension="md")

    def test_parse_empty_file(self):
        assert parse_markdown_file(self.file_name, "") == StreamFile(
            file_name=self.file_name, shard=Shard(start_line=1, end_line=1)
        )

    def test_parse_basic_one_line_file(self):
        test_file = "Hello World"
        assert parse_markdown_file(self.file_name, test_file) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                start_line=1,
                end_line=1,
            ),
        )

    def test_parse_basic_multi_line_file(self):
        test_file = "Hello World\n\nHello again!"
        assert parse_markdown_file(self.file_name, test_file) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                start_line=1,
                end_line=3,
            ),
        )

    def test_parse_single_line_with_tag(self):
        # A leading @Tag becomes a marker of the shard.
        test_file = "@Tag Hello World"
        assert parse_markdown_file(self.file_name, test_file) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                markers=["Tag"],
                start_line=1,
                end_line=1,
            ),
        )

    def test_parse_single_line_with_two_tags(self):
        test_file = "@Marker1 @Marker2 Hello World"
        assert parse_markdown_file(self.file_name, test_file) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                markers=["Marker1", "Marker2"],
                start_line=1,
                end_line=1,
            ),
        )

    def test_parse_single_line_with_two_tags_and_misplaced_tag(self):
        # A non-leading @Tag3 is recorded as a tag, not a marker.
        test_file = "@Tag1 @Tag2 Hello World @Tag3"
        assert parse_markdown_file(self.file_name, test_file) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                markers=["Tag1", "Tag2"],
                tags=["Tag3"],
                start_line=1,
                end_line=1,
            ),
        )

    def test_parse_split_paragraphs_into_shards(self):
        file_text = "Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2"
        assert parse_markdown_file(self.file_name, file_text) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                start_line=1,
                end_line=5,
                children=[
                    Shard(
                        markers=["Tag1"],
                        start_line=3,
                        end_line=3,
                    ),
                    Shard(
                        markers=["Tag2"],
                        start_line=5,
                        end_line=5,
                    ),
                ],
            ),
        )

    def test_parse_split_paragraph_with_inner_tags_at_more_positions(self):
        # Inner @Tags bubble up to the root shard; only the @Marker paragraph
        # becomes a child shard.
        file_text = "Hello @Tag1 World!\n\n@Marker Block 1\n\nBlock 2 @Tag2"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            tags=["Tag1", "Tag2"],
            start_line=1,
            end_line=5,
            children=[
                Shard(markers=["Marker"], start_line=3, end_line=3, children=[]),
            ],
        )

    def test_parse_header_without_markers(self):
        # Headings without any @-tokens do not split the document.
        file_text = "# Heading\n\n## Subheading"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            start_line=1,
            end_line=3,
        )

    def test_parse_split_at_heading_if_marker_on_subheading(self):
        file_text = "# Heading @Tag1\n\n## @Marker1 Subheading @Tag2\n\n# Heading @Tag3"
        assert parse_markdown_file(self.file_name, file_text) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                start_line=1,
                end_line=5,
                children=[
                    Shard(
                        tags=["Tag1"],
                        start_line=1,
                        end_line=4,
                        children=[
                            Shard(
                                markers=["Marker1"],
                                tags=["Tag2"],
                                start_line=3,
                                end_line=4,
                            ),
                        ],
                    ),
                    Shard(tags=["Tag3"], start_line=5, end_line=5, children=[]),
                ],
            ),
        )

    def test_parse_only_parse_relevant_levels(self):
        # Renamed from `test_parse_only_parse_releveant_levels` (typo).
        # An untagged subheading does not create a child shard; its tag is
        # attached to the enclosing shard instead.
        file_text = "# @Marker1 Heading @Tag1\n\n## Subheading @Tag2"
        assert parse_markdown_file(self.file_name, file_text) == StreamFile(
            file_name=self.file_name,
            shard=Shard(
                markers=["Marker1"],
                tags=["Tag1", "Tag2"],
                start_line=1,
                end_line=3,
            ),
        )

    def test_parse_fully_before_headings_start(self):
        # Renamed from `test_parse_fullly_before_headings_start` (typo).
        # Content before the first heading forms its own subtree.
        file_text = "Hello\n\n@Marker1 World!\n\n# @Marker2 I'm a heading!"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            start_line=1,
            end_line=5,
            children=[
                Shard(
                    start_line=1,
                    end_line=4,
                    children=[
                        Shard(
                            markers=["Marker1"],
                            start_line=3,
                            end_line=3,
                        )
                    ],
                ),
                Shard(markers=["Marker2"], start_line=5, end_line=5, children=[]),
            ],
        )

    def test_parse_complex_heading_structure(self):
        file_text = "Preamble @Preamble\n## @Intro\n# @Title\n## @Chapter1\n## @Chapter2\n### Section 1\n### Section 2"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            start_line=1,
            end_line=7,
            children=[
                Shard(
                    start_line=1,
                    end_line=2,
                    children=[
                        Shard(
                            tags=["Preamble"],
                            start_line=1,
                            end_line=1,
                        ),
                        Shard(
                            markers=["Intro"],
                            start_line=2,
                            end_line=2,
                        ),
                    ],
                ),
                Shard(
                    markers=["Title"],
                    start_line=3,
                    end_line=7,
                    children=[
                        Shard(
                            markers=["Chapter1"],
                            start_line=4,
                            end_line=4,
                        ),
                        Shard(
                            markers=["Chapter2"],
                            start_line=5,
                            end_line=7,
                        ),
                    ],
                ),
            ],
        )

    def test_simple_list(self):
        file_text = "* hello world\n * @Marker i've got a marker"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=[],
            tags=[],
            start_line=1,
            end_line=2,
            children=[
                Shard(
                    markers=["Marker"], tags=[], start_line=2, end_line=2, children=[]
                )
            ],
        )

    def test_parse_complex_list(self):
        # NOTE(review): the nesting indentation inside this literal may have
        # been lost in transport — verify the sub-list items are indented as
        # the expected shard structure (lines 2-6 nested under line 1, line 6
        # nested under line 5) requires.
        file_text = """* I'm the parent!
* @Marker1 I've got a marker\n
* I've got no marker!
* I've got a child with a marker!
* @Marker2 I'm the child with the marker
"""
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=[],
            tags=[],
            start_line=1,
            end_line=6,
            children=[
                Shard(
                    markers=[],
                    tags=[],
                    start_line=2,
                    end_line=6,
                    children=[
                        Shard(
                            markers=["Marker1"],
                            tags=[],
                            start_line=2,
                            end_line=3,
                            children=[],
                        ),
                        Shard(
                            markers=[],
                            tags=[],
                            start_line=5,
                            end_line=6,
                            children=[
                                Shard(
                                    markers=["Marker2"],
                                    tags=[],
                                    start_line=6,
                                    end_line=6,
                                    children=[],
                                )
                            ],
                        ),
                    ],
                )
            ],
        )

    def test_parse_ignores_tags_in_code(self):
        # @-tokens inside fenced code blocks are not markers or tags.
        file_text = "```\n@Marker\n```"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=[],
            tags=[],
            start_line=1,
            end_line=3,
            children=[],
        )

    def test_parse_finds_tags_in_italic_text(self):
        file_text = "*@ItalicMarker*"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=["ItalicMarker"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
        )

    def test_parse_finds_tags_in_bold_text(self):
        file_text = "**@BoldMarker**"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=["BoldMarker"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
        )

    def test_parse_finds_tags_in_strikethrough_text(self):
        file_text = "~~@StrikeMarker~~"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=["StrikeMarker"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
        )

    def test_parse_finds_tags_in_link(self):
        file_text = "[@LinkMarker](https://konstantinfickel.de)"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=["LinkMarker"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
        )

    def test_parse_continues_looking_for_markers_after_first_link_marker(self):
        file_text = "[@LinkMarker1](https://konstantinfickel.de1) [@LinkMarker2](https://konstantinfickel.de)"
        assert parse_markdown_file(self.file_name, file_text).shard == Shard(
            markers=["LinkMarker1", "LinkMarker2"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
        )

104
tests/query/test_find.py Normal file
View file

@@ -0,0 +1,104 @@
from __future__ import annotations
from datetime import datetime
from streamd.localize import LocalizedShard
from streamd.query.find import find_shard, find_shard_by_position
def generate_localized_shard(
    *,
    location: dict[str, str] | None = None,
    children: list[LocalizedShard] | None = None,
) -> LocalizedShard:
    """Return a minimal one-line LocalizedShard fixture dated 2020-01-01.

    Only `location` and `children` vary between tests; everything else is
    fixed to keep the fixtures small.
    """
    return LocalizedShard(
        markers=[],
        tags=[],
        start_line=1,
        end_line=1,
        moment=datetime(2020, 1, 1),
        location=location or {},
        children=children or [],
    )
class TestFindShard:
    """Tests for `find_shard`, a depth-first predicate search over shard trees."""

    def test_returns_empty_when_no_match(self) -> None:
        tree = generate_localized_shard(location={"file": "a.md"})
        assert find_shard([tree], lambda s: "missing" in s.location) == []

    def test_finds_matches_depth_first_and_preserves_order(self) -> None:
        leaf = generate_localized_shard(location={"k": "match"})
        matching_child = generate_localized_shard(
            location={"k": "match"}, children=[leaf]
        )
        other_child = generate_localized_shard(location={"k": "nope"})
        tree = generate_localized_shard(
            location={"k": "nope"}, children=[matching_child, other_child]
        )
        found = find_shard([tree], lambda s: s.location.get("k") == "match")
        # Parent before its own child, depth-first.
        assert found == [matching_child, leaf]

    def test_includes_root_if_it_matches(self) -> None:
        tree = generate_localized_shard(
            location={"k": "match"},
            children=[generate_localized_shard(location={"k": "match"})],
        )
        found = find_shard([tree], lambda s: s.location.get("k") == "match")
        assert len(found) == 2
        assert found[0] is tree

    def test_multiple_roots_keeps_left_to_right_order(self) -> None:
        first = generate_localized_shard(location={"k": "match"})
        second = generate_localized_shard(location={"k": "match"})
        third = generate_localized_shard(location={"k": "nope"})
        found = find_shard(
            [first, second, third], lambda s: s.location.get("k") == "match"
        )
        assert found == [first, second]

    def test_query_function_can_use_arbitrary_logic(self) -> None:
        # The query is any callable returning bool, not just key lookups.
        odd_one = generate_localized_shard(location={"x": "1"})
        even_one = generate_localized_shard(location={"x": "2"})
        odd_two = generate_localized_shard(location={"x": "3"})
        tree = generate_localized_shard(
            location={}, children=[odd_one, even_one, odd_two]
        )

        def has_even_x(shard: LocalizedShard) -> bool:
            value = shard.location.get("x")
            return value is not None and int(value) % 2 == 0

        assert find_shard([tree], has_even_x) == [even_one]
class TestFindShardByPosition:
    """Tests for `find_shard_by_position`, which matches one dimension's exact value."""

    def test_matches_only_when_dimension_present_and_equal(self) -> None:
        hit = generate_localized_shard(location={"file": "a.md", "line": "10"})
        wrong = generate_localized_shard(location={"file": "a.md", "line": "11"})
        absent = generate_localized_shard(location={"file": "a.md"})
        tree = generate_localized_shard(
            location={"root": "x"}, children=[hit, wrong, absent]
        )
        # Only the shard whose "line" dimension equals "10" is returned.
        assert find_shard_by_position([tree], "line", "10") == [hit]

    def test_recurses_through_children(self) -> None:
        leaf = generate_localized_shard(location={"section": "s1"})
        middle = generate_localized_shard(location={"section": "s0"}, children=[leaf])
        tree = generate_localized_shard(location={}, children=[middle])
        assert find_shard_by_position([tree], "section", "s1") == [leaf]

231
tests/test_localize.py Normal file
View file

@@ -0,0 +1,231 @@
from datetime import datetime
from streamd.localize.localize import localize_stream_file
from streamd.localize.localized_shard import LocalizedShard
from streamd.localize.repository_configuration import (
Dimension,
Marker,
MarkerPlacement,
RepositoryConfiguration,
)
from streamd.parse.shard import Shard, StreamFile
# Shared fixture: three propagating dimensions and two markers, used by the
# first two tests in TestLocalize. The "Streamd" marker always places into
# "project", and additionally places "coding" into "timesheet" when combined
# with the "Timesheet" marker (via `if_with`) — see test_timesheet_use_case.
repository_configuration = RepositoryConfiguration(
    dimensions={
        "project": Dimension(
            display_name="Project",
            comment="GTD Project that is being worked on",
            propagate=True,
        ),
        "moment": Dimension(
            display_name="Moment",
            comment="Timestamp this entry was created at",
            propagate=True,
        ),
        "timesheet": Dimension(
            display_name="Timesheet",
            comment="Time Cards for Time Tracking",
            propagate=True,
        ),
    },
    markers={
        "Streamd": Marker(
            display_name="Streamd",
            placements=[
                MarkerPlacement(dimension="project"),
                MarkerPlacement(
                    if_with={"Timesheet"}, dimension="timesheet", value="coding"
                ),
            ],
        ),
        "JobHunting": Marker(
            display_name="JobHunting", placements=[MarkerPlacement(dimension="project")]
        ),
    },
)
class TestLocalize:
    """Tests for `localize_stream_file`: turning a parsed StreamFile plus a
    RepositoryConfiguration into a LocalizedShard with a `location` mapping."""

    def test_project_simple_stream_file(self):
        stream_file = StreamFile(
            file_name="20250622-121000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["Streamd"]),
        )
        # The file-name timestamp becomes `moment`; the marker without
        # `if_with` conditions places its own name into "project".
        assert localize_stream_file(
            stream_file, repository_configuration
        ) == LocalizedShard(
            moment=datetime(2025, 6, 22, 12, 10, 0, 0),
            markers=["Streamd"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={"project": "Streamd", "file": stream_file.file_name},
        )

    def test_timesheet_use_case(self):
        stream_file = StreamFile(
            file_name="20260131-210000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["Timesheet", "Streamd"]),
        )
        # With both markers present, the `if_with={"Timesheet"}` placement of
        # "Streamd" also fires and sets timesheet="coding".
        assert localize_stream_file(
            stream_file, repository_configuration
        ) == LocalizedShard(
            moment=datetime(2026, 1, 31, 21, 0, 0, 0),
            markers=["Timesheet", "Streamd"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={
                "file": stream_file.file_name,
                "project": "Streamd",
                "timesheet": "coding",
            },
        )

    def test_overwrites_true_propagated_dimension_overwrites_existing_value(self):
        config = RepositoryConfiguration(
            dimensions={
                "project": Dimension(display_name="Project", propagate=True),
            },
            markers={
                "A": Marker(
                    display_name="A",
                    placements=[MarkerPlacement(dimension="project", value="a")],
                ),
                "B": Marker(
                    display_name="B",
                    placements=[
                        MarkerPlacement(dimension="project", value="b", overwrites=True)
                    ],
                ),
            },
        )
        stream_file = StreamFile(
            file_name="20260131-210000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["A", "B"]),
        )
        # B's placement has overwrites=True, so "b" replaces the earlier "a".
        assert localize_stream_file(stream_file, config) == LocalizedShard(
            moment=datetime(2026, 1, 31, 21, 0, 0, 0),
            markers=["A", "B"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={"file": stream_file.file_name, "project": "b"},
        )

    def test_overwrites_false_propagated_dimension_does_not_overwrite_existing_value(
        self,
    ):
        config = RepositoryConfiguration(
            dimensions={
                "project": Dimension(display_name="Project", propagate=True),
            },
            markers={
                "A": Marker(
                    display_name="A",
                    placements=[MarkerPlacement(dimension="project", value="a")],
                ),
                "B": Marker(
                    display_name="B",
                    placements=[
                        MarkerPlacement(
                            dimension="project", value="b", overwrites=False
                        )
                    ],
                ),
            },
        )
        stream_file = StreamFile(
            file_name="20260131-210000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["A", "B"]),
        )
        # B's placement has overwrites=False, so the first value "a" wins.
        assert localize_stream_file(stream_file, config) == LocalizedShard(
            moment=datetime(2026, 1, 31, 21, 0, 0, 0),
            markers=["A", "B"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={"file": stream_file.file_name, "project": "a"},
        )

    def test_overwrites_true_non_propagated_dimension_overwrites_private_value(self):
        config = RepositoryConfiguration(
            dimensions={
                "label": Dimension(display_name="Label", propagate=False),
            },
            markers={
                "A": Marker(
                    display_name="A",
                    placements=[MarkerPlacement(dimension="label", value="a")],
                ),
                "B": Marker(
                    display_name="B",
                    placements=[
                        MarkerPlacement(dimension="label", value="b", overwrites=True)
                    ],
                ),
            },
        )
        stream_file = StreamFile(
            file_name="20260131-210000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["A", "B"]),
        )
        # Same overwrite semantics apply when the dimension does not propagate.
        assert localize_stream_file(stream_file, config) == LocalizedShard(
            moment=datetime(2026, 1, 31, 21, 0, 0, 0),
            markers=["A", "B"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={"file": stream_file.file_name, "label": "b"},
        )

    def test_overwrites_false_non_propagated_dimension_does_not_overwrite_private_value(
        self,
    ):
        config = RepositoryConfiguration(
            dimensions={
                "label": Dimension(display_name="Label", propagate=False),
            },
            markers={
                "A": Marker(
                    display_name="A",
                    placements=[
                        MarkerPlacement(dimension="label", value="a", overwrites=True)
                    ],
                ),
                "B": Marker(
                    display_name="B",
                    placements=[
                        MarkerPlacement(dimension="label", value="b", overwrites=False)
                    ],
                ),
            },
        )
        stream_file = StreamFile(
            file_name="20260131-210000 Test File.md",
            shard=Shard(start_line=1, end_line=1, markers=["A", "B"]),
        )
        assert localize_stream_file(stream_file, config) == LocalizedShard(
            moment=datetime(2026, 1, 31, 21, 0, 0, 0),
            markers=["A", "B"],
            tags=[],
            start_line=1,
            end_line=1,
            children=[],
            location={"file": stream_file.file_name, "label": "a"},
        )

View file

@@ -0,0 +1,288 @@
from __future__ import annotations
from datetime import datetime, time
import pytest
from streamd.localize.localized_shard import LocalizedShard
from streamd.timesheet.configuration import (
TIMESHEET_DIMENSION_NAME,
TimesheetPointType,
)
from streamd.timesheet.extract import extract_timesheets
from streamd.timesheet.timecard import SpecialDayType, Timecard, Timesheet
def point(at: datetime, type: TimesheetPointType) -> LocalizedShard:
    """
    Build a minimal LocalizedShard that will be interpreted as a timesheet point.

    The extract pipeline uses set-dimension filtering, so the timesheet
    dimension must be present in `location` for the shard to be picked up.
    """
    location = {TIMESHEET_DIMENSION_NAME: type.value, "file": "dummy.md"}
    return LocalizedShard(
        moment=at,
        markers=["Timesheet"],
        tags=[],
        start_line=1,
        end_line=1,
        children=[],
        location=location,
    )
class TestExtractTimesheets:
def test_single_work_block(self):
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day.replace(hour=17, minute=30), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=None,
timecards=[Timecard(from_time=time(9, 0), to_time=time(17, 30))],
)
]
def test_three_work_blocks_separated_by_breaks(self):
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=7, minute=15), TimesheetPointType.Card),
point(day.replace(hour=12, minute=0), TimesheetPointType.Break),
point(day.replace(hour=12, minute=45), TimesheetPointType.Card),
point(day.replace(hour=15, minute=0), TimesheetPointType.Break),
point(day.replace(hour=16, minute=0), TimesheetPointType.Card),
point(day.replace(hour=17, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=None,
timecards=[
Timecard(from_time=time(7, 15), to_time=time(12, 0)),
Timecard(from_time=time(12, 45), to_time=time(15, 0)),
Timecard(from_time=time(16, 0), to_time=time(17, 0)),
],
)
]
def test_input_order_is_not_required_within_a_day(self):
"""
Points may come unsorted; extraction should sort by timestamp within a day.
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=15, minute=0), TimesheetPointType.Break),
point(day.replace(hour=7, minute=15), TimesheetPointType.Card),
point(day.replace(hour=12, minute=0), TimesheetPointType.Break),
point(day.replace(hour=12, minute=45), TimesheetPointType.Card),
point(day.replace(hour=17, minute=0), TimesheetPointType.Break),
point(day.replace(hour=16, minute=0), TimesheetPointType.Card),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=None,
timecards=[
Timecard(from_time=time(7, 15), to_time=time(12, 0)),
Timecard(from_time=time(12, 45), to_time=time(15, 0)),
Timecard(from_time=time(16, 0), to_time=time(17, 0)),
],
)
]
def test_groups_by_day(self):
"""
If points span multiple days, we should get one Timesheet per day.
"""
day1 = datetime(2026, 2, 1, 0, 0, 0)
day2 = datetime(2026, 2, 2, 0, 0, 0)
shards = [
point(day2.replace(hour=10, minute=0), TimesheetPointType.Card),
point(day2.replace(hour=18, minute=0), TimesheetPointType.Break),
point(day1.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day1.replace(hour=17, minute=0), TimesheetPointType.Break),
]
# Note: current implementation groups by date using `itertools.groupby` on the
# incoming order; to be robust, we pass day1 points first, then day2 points.
# This asserts the intended behavior.
shards = [
point(day1.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day1.replace(hour=17, minute=0), TimesheetPointType.Break),
point(day2.replace(hour=10, minute=0), TimesheetPointType.Card),
point(day2.replace(hour=18, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day1.date(),
is_sick_leave=False,
special_day_type=None,
timecards=[Timecard(from_time=time(9, 0), to_time=time(17, 0))],
),
Timesheet(
date=day2.date(),
is_sick_leave=False,
special_day_type=None,
timecards=[Timecard(from_time=time(10, 0), to_time=time(18, 0))],
),
]
def test_day_with_only_special_day_type_vacation(self):
"""
A day can be marked as Vacation without timecards; it should still produce a Timesheet.
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=8, minute=0), TimesheetPointType.Vacation),
point(day.replace(hour=9, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=SpecialDayType.Vacation,
timecards=[],
)
]
def test_day_with_only_special_day_type_holiday(self):
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=8, minute=0), TimesheetPointType.Holiday),
point(day.replace(hour=9, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=SpecialDayType.Holiday,
timecards=[],
)
]
def test_day_with_only_special_day_type_undertime(self):
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=8, minute=0), TimesheetPointType.Undertime),
point(day.replace(hour=9, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=False,
special_day_type=SpecialDayType.Undertime,
timecards=[],
)
]
def test_day_with_sick_leave_and_timecards(self):
"""
SickLeave should set the flag but not prevent timecard aggregation.
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=7, minute=30), TimesheetPointType.SickLeave),
point(day.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day.replace(hour=12, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=True,
special_day_type=None,
timecards=[Timecard(from_time=time(9, 0), to_time=time(12, 0))],
)
]
def test_day_with_sick_leave_only(self):
"""
A day with only SickLeave should still produce a Timesheet (no timecards).
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=8, minute=0), TimesheetPointType.SickLeave),
point(day.replace(hour=9, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == [
Timesheet(
date=day.date(),
is_sick_leave=True,
special_day_type=None,
timecards=[],
)
]
def test_empty_input(self):
assert extract_timesheets([]) == []
def test_day_with_only_cards_and_no_break_is_invalid(self):
"""
A day ending 'in work' (last point not a Break) should raise.
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day.replace(hour=12, minute=0), TimesheetPointType.Card),
]
with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"):
_ = extract_timesheets(shards)
def test_two_special_day_types_same_day_is_invalid(self):
"""
A day cannot be both Vacation and Holiday (or any two distinct special types).
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=8, minute=0), TimesheetPointType.Vacation),
point(day.replace(hour=8, minute=5), TimesheetPointType.Holiday),
point(day.replace(hour=9, minute=0), TimesheetPointType.Break),
]
with pytest.raises(ValueError, match=r"is both .* and .*"):
_ = extract_timesheets(shards)
def test_points_with_mixed_dates_inside_one_group_raises(self):
"""
Defensive: if aggregation receives points spanning multiple dates for a single day,
it should raise. (This can occur if higher-level grouping is incorrect.)
"""
day1 = datetime(2026, 2, 1, 0, 0, 0)
day2 = datetime(2026, 2, 2, 0, 0, 0)
shards = [
point(day1.replace(hour=9, minute=0), TimesheetPointType.Card),
point(day2.replace(hour=9, minute=30), TimesheetPointType.Break),
]
with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"):
_ = extract_timesheets(shards)
def test_day_with_only_breaks_is_ignored(self):
"""
A day with no timecards and no sick/special markers should not emit a Timesheet.
"""
day = datetime(2026, 2, 1, 0, 0, 0)
shards = [
point(day.replace(hour=12, minute=0), TimesheetPointType.Break),
point(day.replace(hour=13, minute=0), TimesheetPointType.Break),
]
assert extract_timesheets(shards) == []