Implement Edit Feature #28

Merged
kfickel merged 12 commits from 8-edit-feature into main 2026-02-01 10:34:32 +01:00
20 changed files with 1091 additions and 129 deletions

View file

@ -21,6 +21,3 @@ jobs:
- name: Test with PyTest - name: Test with PyTest
run: nix develop .#impure --command bash -c "uv run pytest --junit-xml test-report.xml" run: nix develop .#impure --command bash -c "uv run pytest --junit-xml test-report.xml"
- name: Check with PyRight
run: nix develop .#impure --command bash -c "uv run pyright"

View file

@ -88,12 +88,14 @@
# Package a virtual environment as our main application. # Package a virtual environment as our main application.
# #
# Enable no optional dependencies for production build. # Enable no optional dependencies for production build.
packages.x86_64-linux = let packages.x86_64-linux =
streamer = pythonSet.mkVirtualEnv "streamer-env" workspace.deps.default; let
in { streamer = pythonSet.mkVirtualEnv "streamer-env" workspace.deps.default;
inherit streamer; in
default = streamer; {
}; inherit streamer;
default = streamer;
};
# Make streamer runnable with `nix run` # Make streamer runnable with `nix run`
apps.x86_64-linux = { apps.x86_64-linux = {
@ -116,18 +118,17 @@
pre-commit pre-commit
bashInteractive bashInteractive
]; ];
env = env = {
{ # Prevent uv from managing Python downloads
# Prevent uv from managing Python downloads UV_PYTHON_DOWNLOADS = "never";
UV_PYTHON_DOWNLOADS = "never"; # Force uv to use nixpkgs Python interpreter
# Force uv to use nixpkgs Python interpreter UV_PYTHON = python.interpreter;
UV_PYTHON = python.interpreter; }
} // lib.optionalAttrs pkgs.stdenv.isLinux {
// lib.optionalAttrs pkgs.stdenv.isLinux { # Python libraries often load native shared objects using dlopen(3).
# Python libraries often load native shared objects using dlopen(3). # Setting LD_LIBRARY_PATH makes the dynamic library loader aware of libraries without using RPATH for lookup.
# Setting LD_LIBRARY_PATH makes the dynamic library loader aware of libraries without using RPATH for lookup. LD_LIBRARY_PATH = lib.makeLibraryPath pkgs.pythonManylinuxPackages.manylinux1;
LD_LIBRARY_PATH = lib.makeLibraryPath pkgs.pythonManylinuxPackages.manylinux1; };
};
shellHook = '' shellHook = ''
unset PYTHONPATH unset PYTHONPATH
''; '';

View file

@ -1,43 +1,74 @@
import glob import glob
import os import os
import typer from datetime import datetime
from shutil import move
from typing import Annotated, Generator
import click import click
import typer
from rich import print from rich import print
from rich.markdown import Markdown from rich.markdown import Markdown
from rich.panel import Panel from rich.panel import Panel
from shutil import move
from datetime import datetime from streamer.localize import (
LocalizedShard,
from streamer.parse.parse import parse_markdown_file RepositoryConfiguration,
from streamer.query.task import find_by_markers localize_stream_file,
)
from streamer.localize.preconfigured_configurations import TaskConfiguration
from streamer.parse import parse_markdown_file
from streamer.query import find_shard_by_position
from streamer.settings import Settings from streamer.settings import Settings
app = typer.Typer() app = typer.Typer()
@app.command() def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]:
def todo() -> None:
for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"): for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"):
with open(file_name, "r") as file: with open(file_name, "r") as file:
file_content = file.read() file_content = file.read()
sharded_document = parse_markdown_file(file_name, file_content) if shard := localize_stream_file(
if sharded_document.shard: parse_markdown_file(file_name, file_content), config
open_tasks = find_by_markers(sharded_document.shard, ["Task"], ["Done"]) ):
yield shard
for task_shard in open_tasks:
print( @app.command()
Panel( def todo() -> None:
Markdown( all_shards = list(all_files(TaskConfiguration))
"\n".join(
file_content.splitlines()[ for task_shard in find_shard_by_position(all_shards, "task", "open"):
task_shard.start_line - 1 : task_shard.end_line with open(task_shard.location["file"], "r") as file:
] file_content = file.read().splitlines()
) print(
), Panel(
title=f"{file_name}:{task_shard.start_line}", Markdown(
"\n".join(
file_content[
task_shard.start_line - 1 : task_shard.end_line
]
) )
) ),
title=f"{task_shard.location['file']}:{task_shard.start_line}",
)
)
@app.command()
def edit(number: Annotated[int, typer.Argument()] = 1) -> None:
all_shards = list(all_files(TaskConfiguration))
sorted_shards = sorted(all_shards, key=lambda s: s.moment)
if abs(number) >= len(sorted_shards):
raise ValueError("Argument out of range")
selected_number = number
if selected_number >= 0:
selected_number = len(sorted_shards) - selected_number
else:
selected_number = -selected_number
click.edit(None, filename=sorted_shards[selected_number].location["file"])
@app.command() @app.command()
@ -59,7 +90,9 @@ def new() -> None:
parsed_content = parse_markdown_file(prelimary_path, content) parsed_content = parse_markdown_file(prelimary_path, content)
final_file_name = f"{timestamp}.md" final_file_name = f"{timestamp}.md"
if parsed_content.shard is not None and len(markers := parsed_content.shard.markers): if parsed_content.shard is not None and len(
markers := parsed_content.shard.markers
):
final_file_name = f"{timestamp} {' '.join(markers)}.md" final_file_name = f"{timestamp} {' '.join(markers)}.md"
final_path = os.path.join(streamer_directory, final_file_name) final_path = os.path.join(streamer_directory, final_file_name)

View file

@ -1,6 +1,6 @@
from .localize import localize_stream_file from .localize import localize_stream_file
from .repostory_configuration import RepositoryConfiguration
from .localized_shard import LocalizedShard from .localized_shard import LocalizedShard
from .repository_configuration import RepositoryConfiguration
__all__ = [ __all__ = [
"RepositoryConfiguration", "RepositoryConfiguration",

View file

@ -1,9 +1,9 @@
from datetime import datetime
import re
import os import os
import re
from datetime import date, datetime, time
def extract_date_from_file_name(file_name: str) -> datetime | None: def extract_datetime_from_file_name(file_name: str) -> datetime | None:
FILE_NAME_REGEX = r"^(?P<date>\d{8})(?:-(?P<time>\d{4,6}))?.+.md$" FILE_NAME_REGEX = r"^(?P<date>\d{8})(?:-(?P<time>\d{4,6}))?.+.md$"
base_name = os.path.basename(file_name) base_name = os.path.basename(file_name)
match = re.match(FILE_NAME_REGEX, base_name) match = re.match(FILE_NAME_REGEX, base_name)
@ -17,4 +17,76 @@ def extract_date_from_file_name(file_name: str) -> datetime | None:
return None return None
__all__ = ["extract_date_from_file_name"] def extract_datetime_from_marker(marker: str) -> datetime | None:
"""
Extract a datetime from a marker string in the exact format: YYYYMMDDHHMMSS.
Returns:
Parsed datetime if the format is fulfilled and values are valid, else None.
"""
if not re.fullmatch(r"\d{14}", marker or ""):
return None
try:
return datetime.strptime(marker, "%Y%m%d%H%M%S")
except ValueError:
return None
def extract_date_from_marker(marker: str) -> date | None:
"""
Extract a date from a marker string in the exact format: YYYYMMDD.
Returns:
Parsed date if the format is fulfilled and values are valid, else None.
"""
if not re.fullmatch(r"\d{8}", marker or ""):
return None
try:
return datetime.strptime(marker, "%Y%m%d").date()
except ValueError:
return None
def extract_time_from_marker(marker: str) -> time | None: # noqa: F821
"""
Extract a time from a marker string in the exact format: HHMMSS.
Returns:
Parsed time if the format is fulfilled and values are valid, else None.
"""
if not re.fullmatch(r"\d{6}", marker or ""):
return None
try:
return datetime.strptime(marker, "%H%M%S").time()
except ValueError:
return None
def extract_datetime_from_marker_list(
    markers: list[str], inherited_datetime: datetime
) -> datetime:
    """Resolve a concrete datetime from date/time/datetime markers.

    The EARLIEST matching marker in the list wins for each component: the list
    is scanned in reverse so later assignments (earlier markers) overwrite
    previous ones. A 14-digit marker sets both components at once.

    Args:
        markers: Raw marker strings; entries matching no format are ignored.
        inherited_datetime: Fallback for components no marker supplies.

    Returns:
        The combined datetime. A date-only result snaps to midnight rather
        than inheriting the fallback time.
    """
    shard_time: time | None = None
    shard_date: date | None = None
    for marker in reversed(markers):
        # The three formats have distinct lengths (6/8/14 digits), so at most
        # one parser matches a given marker.
        # Explicit `is not None` checks instead of truthiness: relying on the
        # truth value of date/time objects is fragile (midnight was falsy
        # before Python 3.5) and obscures intent.
        parsed_time = extract_time_from_marker(marker)
        if parsed_time is not None:
            shard_time = parsed_time
        parsed_date = extract_date_from_marker(marker)
        if parsed_date is not None:
            shard_date = parsed_date
        parsed_datetime = extract_datetime_from_marker(marker)
        if parsed_datetime is not None:
            shard_date = parsed_datetime.date()
            shard_time = parsed_datetime.time()
    if shard_date is not None and shard_time is None:
        # An explicit date without a time means the start of that day.
        return datetime.combine(shard_date, time(0, 0, 0))
    return datetime.combine(
        shard_date if shard_date is not None else inherited_datetime.date(),
        shard_time if shard_time is not None else inherited_datetime.time(),
    )
__all__ = [
"extract_datetime_from_file_name",
"extract_datetime_from_marker",
"extract_date_from_marker",
"extract_time_from_marker",
"extract_datetime_from_marker_list",
]

View file

@ -1,46 +1,66 @@
from datetime import datetime
from streamer.parse.shard import Shard, StreamFile from streamer.parse.shard import Shard, StreamFile
from .repostory_configuration import RepositoryConfiguration from .extract_datetime import (
extract_datetime_from_file_name,
extract_datetime_from_marker_list,
)
from .localized_shard import LocalizedShard from .localized_shard import LocalizedShard
from .extract_datetime import extract_date_from_file_name from .repository_configuration import RepositoryConfiguration
def localize_shard( def localize_shard(
shard: Shard, config: RepositoryConfiguration, propagated: dict[str, str] shard: Shard,
config: RepositoryConfiguration,
propagated: dict[str, str],
moment: datetime,
) -> LocalizedShard: ) -> LocalizedShard:
position = {**propagated} position = {**propagated}
private_position: dict[str, str] = {} private_position: dict[str, str] = {}
adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)
for marker in shard.markers: for marker in shard.markers:
normalized_marker = marker.lower() if marker in config.markers:
if marker_definition := config.markers[normalized_marker]: marker_definition = config.markers[marker]
dimension_name = marker_definition.dimension for placement in marker_definition.placements:
dimension = config.dimensions[marker_definition.dimension] if placement.if_with <= set(shard.markers):
dimension = config.dimensions[placement.dimension]
if dimension.propagate: value = placement.value or marker
position[dimension_name] = normalized_marker
else:
private_position[dimension_name] = normalized_marker
children = [localize_shard(child, config, position) for child in shard.children] if dimension.propagate:
position[placement.dimension] = value
else:
private_position[placement.dimension] = value
children = [
localize_shard(child, config, position, adjusted_moment)
for child in shard.children
]
position.update(private_position) position.update(private_position)
return LocalizedShard( return LocalizedShard(
**shard.model_dump(exclude={"children"}), location=position, children=children **shard.model_dump(exclude={"children"}),
location=position,
children=children,
moment=adjusted_moment,
) )
def localize_stream_file( def localize_stream_file(
stream_file: StreamFile, config: RepositoryConfiguration stream_file: StreamFile, config: RepositoryConfiguration
) -> LocalizedShard | None: ) -> LocalizedShard | None:
shard_date = extract_date_from_file_name(stream_file.filename) shard_date = extract_datetime_from_file_name(stream_file.file_name)
if not shard_date or not stream_file.shard: if not shard_date or not stream_file.shard:
raise ValueError("Could not extract date") raise ValueError("Could not extract date")
return localize_shard(stream_file.shard, config, {"moment": shard_date.isoformat()}) return localize_shard(
stream_file.shard, config, {"file": stream_file.file_name}, shard_date
)
__all__ = ["localize_stream_file"] __all__ = ["localize_stream_file"]

View file

@ -1,8 +1,12 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from streamer.parse.shard import Shard from streamer.parse.shard import Shard
class LocalizedShard(Shard): class LocalizedShard(Shard):
moment: datetime
location: dict[str, str] location: dict[str, str]
children: list[LocalizedShard] = [] # pyright: ignore[reportIncompatibleVariableOverride] children: list[LocalizedShard] = [] # pyright: ignore[reportIncompatibleVariableOverride]

View file

@ -0,0 +1,95 @@
from streamer.localize.repository_configuration import (
Dimension,
Marker,
MarkerPlacement,
RepositoryConfiguration,
)
# Built-in marker configuration for task tracking.
#
# Declares two dimensions:
#   * "task"    — non-propagating: a shard carrying a task marker IS a task,
#                 and the placement value encodes its state.
#   * "project" — propagating: a project marker applies to child shards too.
TaskConfiguration = RepositoryConfiguration(
    dimensions={
        "task": Dimension(
            display_name="Task",
            comment="If placed, the given shard is a task. The placement determines the state.",
            propagate=False,
        ),
        "project": Dimension(
            display_name="Project",
            comment="Project the task is attached to",
            propagate=True,
        ),
    },
    markers={
        "Task": Marker(
            display_name="Task",
            placements=[
                # Default state when no qualifying co-marker is present.
                MarkerPlacement(dimension="task", value="open"),
                MarkerPlacement(if_with={"Done"}, dimension="task", value="done"),
                MarkerPlacement(if_with={"Waiting"}, dimension="task", value="waiting"),
                MarkerPlacement(
                    if_with={"Cancelled"}, dimension="task", value="cancelled"
                ),
                # "NotDone" acts as an alias for cancellation.
                MarkerPlacement(
                    if_with={"NotDone"}, dimension="task", value="cancelled"
                ),
            ],
        ),
        # NOTE(review): display_name "Task" below looks copy-pasted from the
        # marker above — presumably "Waiting For" was intended; confirm.
        "WaitingFor": Marker(
            display_name="Task",
            placements=[
                MarkerPlacement(dimension="task", value="waiting"),
            ],
        ),
    },
)
# Built-in marker configuration for timesheet generation.
#
# Every marker places a value in the non-propagating "timesheet" dimension and
# only takes effect when the shard also carries the "Timesheet" marker.
BasicTimesheetConfiguration = RepositoryConfiguration(
    dimensions={
        "timesheet": Dimension(
            display_name="Timesheet",
            comment="Used by Timesheet-Subcommand to create Timecards",
            propagate=False,
        )
    },
    markers={
        # NOTE(review): "VacationDay" maps to "day_off_sick_leave", identical
        # to "SickLeave" below — this looks like a copy-paste slip (a distinct
        # value such as "day_off_vacation" was presumably intended). Left
        # unchanged because the consumers of this value are not visible here;
        # confirm before altering.
        "VacationDay": Marker(
            display_name="Vacation Day",
            placements=[
                MarkerPlacement(
                    if_with={"Timesheet"},
                    dimension="timesheet",
                    value="day_off_sick_leave",
                )
            ],
        ),
        "Holiday": Marker(
            display_name="Official Holiday",  # fixed typo: was "Offical"
            placements=[
                MarkerPlacement(
                    if_with={"Timesheet"},
                    dimension="timesheet",
                    value="day_off_holiday",
                )
            ],
        ),
        "SickLeave": Marker(
            display_name="Sick Leave",
            placements=[
                MarkerPlacement(
                    if_with={"Timesheet"},
                    dimension="timesheet",
                    value="day_off_sick_leave",
                )
            ],
        ),
        "UndertimeDay": Marker(
            display_name="Undertime Leave",
            placements=[
                MarkerPlacement(
                    if_with={"Timesheet"},
                    dimension="timesheet",
                    value="day_off_undertime",
                )
            ],
        ),
    },
)

View file

@ -0,0 +1,107 @@
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class Dimension(BaseModel):
    """One axis of shard classification (e.g. task state, project).

    Annotation uses `str | None` for consistency with MarkerPlacement; the
    module's `from __future__ import annotations` makes the syntax safe.
    """

    # Human-readable label for this dimension.
    display_name: str
    # Optional free-text explanation of the dimension's purpose.
    comment: str | None = None
    # When True, a placement in this dimension is carried over to child
    # shards during localization; otherwise it stays on the shard itself.
    propagate: bool = False
class MarkerPlacement(BaseModel):
    """One effect of a marker: a value placed into a dimension.

    A placement applies only when every marker named in `if_with` is present
    on the shard alongside the marker that owns this placement.
    """

    # Co-markers required for this placement to apply; empty set = always.
    # (Pydantic copies field defaults per instance, so the set() default is safe.)
    if_with: set[str] = set()
    # Key of the target dimension in RepositoryConfiguration.dimensions.
    dimension: str
    # Value written into the dimension; None means "use the marker name itself".
    value: str | None = None
class Marker(BaseModel):
    """A marker string recognized in stream files and the placements it triggers."""

    # Human-readable label for the marker.
    display_name: str
    # Effects evaluated against the shard's full marker set.
    placements: list[MarkerPlacement] = []
class RepositoryConfiguration(BaseModel):
    """Complete marker/dimension configuration for a stream repository."""

    # Dimension key -> definition.
    dimensions: dict[str, Dimension]
    # Marker string (as written in files) -> definition.
    markers: dict[str, Marker]
def merge_single_dimension(base: Dimension, second: Dimension) -> Dimension:
    """Overlay `second` on `base`, field by field.

    `second` wins wherever it provides a value: a non-empty display name, a
    non-None comment, or an explicitly assigned `propagate` flag (detected via
    pydantic's `model_fields_set`, with a safe fallback for plain objects).
    """
    explicitly_set = getattr(second, "model_fields_set", set())
    if "propagate" in explicitly_set:
        propagate = second.propagate
    else:
        propagate = base.propagate
    display_name = second.display_name if second.display_name else base.display_name
    comment = second.comment if second.comment is not None else base.comment
    return Dimension(display_name=display_name, comment=comment, propagate=propagate)
def merge_dimensions(
    base: dict[str, Dimension], second: dict[str, Dimension]
) -> dict[str, Dimension]:
    """Merge two dimension maps without mutating either input.

    Keys present in both maps are combined via `merge_single_dimension`
    (second overlays base); keys unique to either map are copied through.
    Insertion order matches the original: base keys first, then new keys
    from `second` in their own order.
    """
    merged = {
        key: merge_single_dimension(dimension, second[key])
        if key in second
        else dimension
        for key, dimension in base.items()
    }
    merged.update(
        (key, dimension) for key, dimension in second.items() if key not in base
    )
    return merged
def _placement_identity(p: MarkerPlacement) -> tuple[frozenset[str], str]:
return (frozenset(p.if_with), p.dimension)
def merge_single_marker(base: Marker, second: Marker) -> Marker:
    """Overlay marker `second` on `base`.

    Placements are keyed by `_placement_identity`: a placement from `second`
    that matches an existing identity replaces the base placement in place
    (keeping its position); otherwise it is appended at the end.
    """
    placements: list[MarkerPlacement] = list(base.placements)
    # Map each identity to its (last) index among the base placements.
    index_by_identity = {
        _placement_identity(placement): index
        for index, placement in enumerate(placements)
    }
    for placement in second.placements:
        identity = _placement_identity(placement)
        existing_index = index_by_identity.get(identity)
        if existing_index is None:
            index_by_identity[identity] = len(placements)
            placements.append(placement)
        else:
            placements[existing_index] = placement
    return Marker(
        display_name=second.display_name or base.display_name,
        placements=placements,
    )
def merge_markers(
    base: dict[str, Marker], second: dict[str, Marker]
) -> dict[str, Marker]:
    """Merge two marker maps without mutating either input.

    Keys present in both maps are combined via `merge_single_marker` (second
    overlays base); keys unique to either map are copied through. Insertion
    order: base keys first, then new keys from `second` in their own order.
    """
    merged = {
        key: merge_single_marker(marker, second[key]) if key in second else marker
        for key, marker in base.items()
    }
    merged.update(
        (key, marker) for key, marker in second.items() if key not in base
    )
    return merged
def merge_repository_configuration(
    base: RepositoryConfiguration, second: RepositoryConfiguration
) -> RepositoryConfiguration:
    """Overlay configuration `second` on `base`, merging dimensions and markers."""
    merged_dimensions = merge_dimensions(base.dimensions, second.dimensions)
    merged_markers = merge_markers(base.markers, second.markers)
    return RepositoryConfiguration(
        dimensions=merged_dimensions,
        markers=merged_markers,
    )
__all__ = [
"Dimension",
"Marker",
"MarkerPlacement",
"RepositoryConfiguration",
"merge_repository_configuration",
]

View file

@ -1,21 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class Dimension(BaseModel):
display_name: str
comment: Optional[str] = None
propagate: bool = False
class Marker(BaseModel):
display_name: str
dimension: str
class RepositoryConfiguration(BaseModel):
dimensions: dict[str, Dimension]
markers: dict[str, Marker]
__all__ = ["Dimension", "Marker", "RepositoryConfiguration"]

View file

@ -20,8 +20,8 @@ def get_line_number(block_token: BlockToken) -> int:
def build_shard( def build_shard(
start_line, start_line: int,
end_line, end_line: int,
markers: list[str] = [], markers: list[str] = [],
tags: list[str] = [], tags: list[str] = [],
children: list[Shard] = [], children: list[Shard] = [],
@ -236,7 +236,7 @@ def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
): ):
shard = parsed_shard shard = parsed_shard
return StreamFile(shard=shard, filename=file_name) return StreamFile(shard=shard, file_name=file_name)
__all__ = ["Shard", "StreamFile", "parse_markdown_file"] __all__ = ["Shard", "StreamFile", "parse_markdown_file"]

View file

@ -1,4 +1,5 @@
from __future__ import annotations from __future__ import annotations
from pydantic import BaseModel from pydantic import BaseModel
@ -11,7 +12,7 @@ class Shard(BaseModel):
class StreamFile(BaseModel): class StreamFile(BaseModel):
filename: str file_name: str
shard: Shard | None = None shard: Shard | None = None

View file

@ -0,0 +1,3 @@
from .find import find_shard, find_shard_by_position
__all__ = ["find_shard_by_position", "find_shard"]

View file

@ -0,0 +1,29 @@
from typing import Callable
from streamer.localize import LocalizedShard
def find_shard(
    shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool]
) -> list[LocalizedShard]:
    """Depth-first search over shard trees.

    Returns every shard (including nested children) for which
    `query_function` is truthy, in pre-order: a matching shard appears before
    any of its matching descendants.
    """
    matches: list[LocalizedShard] = []
    for candidate in shards:
        if query_function(candidate):
            matches.append(candidate)
        matches += find_shard(candidate.children, query_function)
    return matches
def find_shard_by_position(
    shards: list[LocalizedShard], dimension: str, value: str
) -> list[LocalizedShard]:
    """Find all shards whose location maps `dimension` to exactly `value`."""

    def matches_position(shard: LocalizedShard) -> bool:
        # Explicit membership test so a missing dimension never raises.
        if dimension not in shard.location:
            return False
        return shard.location[dimension] == value

    return find_shard(shards, matches_position)
__all__ = ["find_shard_by_position", "find_shard"]

View file

@ -1,15 +0,0 @@
from streamer.parse.shard import Shard
def find_by_markers(shard: Shard, has: list[str], has_not: list[str]) -> list[Shard]:
found_shards = []
if any(tag in has for tag in shard.markers) and not any(
tag in has_not for tag in shard.markers
):
found_shards.append(shard)
for child in shard.children:
found_shards.extend(find_by_markers(child, has, has_not))
return found_shards

View file

@ -1,32 +1,157 @@
from datetime import datetime from datetime import date, datetime, time
from streamer.localize.extract_datetime import extract_date_from_file_name
from streamer.localize.extract_datetime import (
extract_date_from_marker,
extract_datetime_from_file_name,
extract_datetime_from_marker,
extract_datetime_from_marker_list,
extract_time_from_marker,
)
class TestExtractDateTime: class TestExtractDateTime:
def test_extract_date_from_file_name_valid(self): def test_extract_date_from_file_name_valid(self):
file_name = "20230101-123456 Some Text.md" file_name = "20230101-123456 Some Text.md"
assert datetime(2023, 1, 1, 12, 34, 56) == extract_date_from_file_name( assert datetime(2023, 1, 1, 12, 34, 56) == extract_datetime_from_file_name(
file_name file_name
) )
def test_extract_date_from_file_name_invalid(self): def test_extract_date_from_file_name_invalid(self):
file_name = "invalid-file-name.md" file_name = "invalid-file-name.md"
assert extract_date_from_file_name(file_name) is None assert extract_datetime_from_file_name(file_name) is None
def test_extract_date_from_file_name_without_time(self): def test_extract_date_from_file_name_without_time(self):
file_name = "20230101 Some Text.md" file_name = "20230101 Some Text.md"
assert datetime(2023, 1, 1, 0, 0, 0) == extract_date_from_file_name(file_name) assert datetime(2023, 1, 1, 0, 0, 0) == extract_datetime_from_file_name(
file_name
)
def test_extract_date_from_file_name_short_time(self): def test_extract_date_from_file_name_short_time(self):
file_name = "20230101-1234 Some Text.md" file_name = "20230101-1234 Some Text.md"
assert datetime(2023, 1, 1, 12, 34, 0) == extract_date_from_file_name(file_name) assert datetime(2023, 1, 1, 12, 34, 0) == extract_datetime_from_file_name(
file_name
)
def test_extract_date_from_file_name_empty_string(self): def test_extract_date_from_file_name_empty_string(self):
file_name = "" file_name = ""
assert extract_date_from_file_name(file_name) is None assert extract_datetime_from_file_name(file_name) is None
def test_extract_date_from_file_name_with_full_path(self): def test_extract_date_from_file_name_with_full_path(self):
file_name = "/path/to/20230101-123456 Some Text.md" file_name = "/path/to/20230101-123456 Some Text.md"
assert datetime(2023, 1, 1, 12, 34, 56) == extract_date_from_file_name( assert datetime(2023, 1, 1, 12, 34, 56) == extract_datetime_from_file_name(
file_name file_name
) )
class TestExtractMarkerDateTime:
    """Tests for extract_datetime_from_marker (exact format YYYYMMDDHHMMSS)."""

    def test_extract_datetime_from_marker_valid(self):
        assert extract_datetime_from_marker("20250101150000") == datetime(
            2025, 1, 1, 15, 0, 0
        )

    def test_extract_datetime_from_marker_invalid_format(self):
        # Wrong length, separators, or non-digits must all be rejected.
        for bad_marker in (
            "2025010115000",  # too short
            "202501011500000",  # too long
            "2025-01-01T150000",  # separators
            "2025010115000a",  # non-digit
            "",
        ):
            assert extract_datetime_from_marker(bad_marker) is None

    def test_extract_datetime_from_marker_invalid_values(self):
        assert extract_datetime_from_marker("20250230120000") is None  # Feb 30
        assert extract_datetime_from_marker("20250101126000") is None  # minute 60
        assert extract_datetime_from_marker("20250101240000") is None  # hour 24
class TestExtractMarkerDate:
    """Tests for extract_date_from_marker (exact format YYYYMMDD)."""

    def test_extract_date_from_marker_valid(self):
        assert extract_date_from_marker("20250101") == date(2025, 1, 1)

    def test_extract_date_from_marker_invalid_format(self):
        # Wrong length, separators, or non-digits must all be rejected.
        for bad_marker in (
            "2025010",  # too short
            "202501011",  # too long
            "2025-01-01",  # separators
            "2025010a",  # non-digit
            "",
        ):
            assert extract_date_from_marker(bad_marker) is None

    def test_extract_date_from_marker_invalid_values(self):
        assert extract_date_from_marker("20250230") is None  # Feb 30
        assert extract_date_from_marker("20251301") is None  # month 13
        assert extract_date_from_marker("20250132") is None  # day 32
class TestExtractMarkerTime:
    """Tests for extract_time_from_marker (exact format HHMMSS)."""

    def test_extract_time_from_marker_valid(self):
        assert extract_time_from_marker("150000") == time(15, 0, 0)

    def test_extract_time_from_marker_invalid_format(self):
        # Wrong length, separators, or non-digits must all be rejected.
        for bad_marker in (
            "15000",  # too short
            "1500000",  # too long
            "15:00:00",  # separators
            "15000a",  # non-digit
            "",
        ):
            assert extract_time_from_marker(bad_marker) is None

    def test_extract_time_from_marker_invalid_values(self):
        assert extract_time_from_marker("240000") is None  # hour 24
        assert extract_time_from_marker("156000") is None  # minute 60
        assert extract_time_from_marker("150060") is None  # second 60
class TestExtractDateTimeFromMarkerList:
    """Precedence rules of extract_datetime_from_marker_list."""

    def test_no_markers_inherits_datetime(self):
        fallback = datetime(2025, 1, 2, 3, 4, 5)
        assert extract_datetime_from_marker_list([], fallback) == fallback

    def test_unrelated_markers_inherits_datetime(self):
        fallback = datetime(2025, 1, 2, 3, 4, 5)
        markers = ["not-a-marker", "2025-01-01", "1500", "1234567"]
        assert extract_datetime_from_marker_list(markers, fallback) == fallback

    def test_date_only_marker_sets_midnight(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        result = extract_datetime_from_marker_list(["20250101"], fallback)
        assert result == datetime(2025, 1, 1, 0, 0, 0)

    def test_time_only_marker_inherits_date(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        result = extract_datetime_from_marker_list(["150000"], fallback)
        assert result == datetime(2025, 6, 7, 15, 0, 0)

    def test_datetime_marker_overrides_both_date_and_time(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        result = extract_datetime_from_marker_list(["20250101150000"], fallback)
        assert result == datetime(2025, 1, 1, 15, 0, 0)

    def test_combined_date_and_time_markers(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        result = extract_datetime_from_marker_list(["20250101", "150000"], fallback)
        assert result == datetime(2025, 1, 1, 15, 0, 0)

    def test_first_marker_wins_when_multiple_dates_or_times(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101", "150000", "20250102", "160000"]
        result = extract_datetime_from_marker_list(markers, fallback)
        assert result == datetime(2025, 1, 1, 15, 0, 0)

    def test_last_separated_date_and_time_win(self):
        # Earlier separate date/time markers beat a later combined marker.
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20250101", "150000", "20250102160000"]
        result = extract_datetime_from_marker_list(markers, fallback)
        assert result == datetime(2025, 1, 1, 15, 0, 0)

    def test_invalid_date_or_time_markers_are_ignored(self):
        fallback = datetime(2025, 6, 7, 8, 9, 10)
        markers = ["20251301", "240000", "20250101", "150000"]
        result = extract_datetime_from_marker_list(markers, fallback)
        assert result == datetime(2025, 1, 1, 15, 0, 0)

View file

@ -0,0 +1,365 @@
import pytest
from streamer.localize.repository_configuration import (
Dimension,
Marker,
MarkerPlacement,
RepositoryConfiguration,
merge_dimensions,
merge_markers,
merge_repository_configuration,
merge_single_dimension,
merge_single_marker,
)
class TestMergeSingleDimension:
    """Field-level overlay semantics of merge_single_dimension."""

    def test_second_overrides_display_name_when_non_empty(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=False),
        )
        assert merged.display_name == "Second"
        assert merged.comment == "c2"
        assert merged.propagate is False

    def test_second_empty_display_name_falls_back_to_base(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="", comment="c2", propagate=False),
        )
        assert merged.display_name == "Base"
        assert merged.comment == "c2"
        assert merged.propagate is False

    def test_second_comment_none_does_not_erase_base_comment(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="keep", propagate=True),
            Dimension(display_name="Second", comment=None, propagate=False),
        )
        assert merged.display_name == "Second"
        assert merged.comment == "keep"

    def test_second_comment_non_none_overrides_base_comment(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=True),
        )
        assert merged.comment == "c2"

    def test_second_propagate_overrides_base_when_provided(self):
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2", propagate=False),
        )
        assert merged.propagate is False

    def test_propagate_merging_retains_base_when_second_not_provided(self):
        # `propagate` left unset on second -> base's value must survive.
        merged = merge_single_dimension(
            Dimension(display_name="Base", comment="c1", propagate=True),
            Dimension(display_name="Second", comment="c2"),
        )
        assert merged.propagate is True
class TestMergeDimensions:
    """Dict-level merging of dimension maps."""

    def test_adds_new_keys_from_second(self):
        merged = merge_dimensions(
            {"a": Dimension(display_name="A", propagate=True)},
            {"b": Dimension(display_name="B", propagate=False)},
        )
        assert set(merged) == {"a", "b"}
        assert merged["a"].display_name == "A"
        assert merged["b"].display_name == "B"

    def test_merges_existing_keys(self):
        merged = merge_dimensions(
            {"a": Dimension(display_name="A", comment="c1", propagate=True)},
            {"a": Dimension(display_name="A2", comment=None, propagate=False)},
        )
        assert merged["a"].display_name == "A2"
        assert merged["a"].comment == "c1"
        assert merged["a"].propagate is False

    def test_does_not_mutate_inputs(self):
        base = {"a": Dimension(display_name="A", comment="c1", propagate=True)}
        second = {"b": Dimension(display_name="B", comment="c2", propagate=False)}
        merged = merge_dimensions(base, second)
        assert "b" not in base
        assert "a" not in second
        assert set(merged) == {"a", "b"}
class TestMergeSingleMarker:
    """Placement-level overlay semantics of merge_single_marker."""

    def test_second_overrides_display_name_when_non_empty(self):
        merged = merge_single_marker(
            Marker(
                display_name="Base",
                placements=[MarkerPlacement(dimension="project", value=None)],
            ),
            Marker(
                display_name="Second",
                placements=[MarkerPlacement(dimension="timesheet", value="coding")],
            ),
        )
        assert merged.display_name == "Second"
        assert merged.placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(dimension="timesheet", value="coding", if_with=set()),
        ]

    def test_second_empty_display_name_falls_back_to_base(self):
        merged = merge_single_marker(
            Marker(display_name="Base", placements=[]),
            Marker(display_name="", placements=[]),
        )
        assert merged.display_name == "Base"

    def test_appends_new_placements(self):
        merged = merge_single_marker(
            Marker(
                display_name="Base",
                placements=[MarkerPlacement(dimension="project")],
            ),
            Marker(
                display_name="Second",
                placements=[
                    MarkerPlacement(
                        if_with={"Timesheet"}, dimension="timesheet", value="x"
                    )
                ],
            ),
        )
        assert merged.placements == [
            MarkerPlacement(dimension="project"),
            MarkerPlacement(if_with={"Timesheet"}, dimension="timesheet", value="x"),
        ]

    def test_deduplicates_by_identity_and_second_overrides_base(self):
        merged = merge_single_marker(
            Marker(
                display_name="Base",
                placements=[
                    MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
                    MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
                ],
            ),
            Marker(
                display_name="Second",
                placements=[
                    MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
                    MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
                ],
            ),
        )
        # Identity matches keep their base position; new identities append.
        assert merged.placements == [
            MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
            MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
            MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
        ]

    def test_identity_is_order_insensitive_for_if_with(self):
        merged = merge_single_marker(
            Marker(
                display_name="Base",
                placements=[
                    MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v")
                ],
            ),
            Marker(
                display_name="Second",
                placements=[
                    MarkerPlacement(if_with={"B", "A"}, dimension="d", value="v2")
                ],
            ),
        )
        # `if_with` is a set, so identity ignores ordering; second wins.
        assert merged.placements == [
            MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v2"),
        ]
class TestMergeMarkers:
    """Dict-level merging of marker maps."""

    def test_adds_new_marker_keys_from_second(self):
        merged = merge_markers(
            {"M1": Marker(display_name="M1", placements=[])},
            {"M2": Marker(display_name="M2", placements=[])},
        )
        assert set(merged) == {"M1", "M2"}

    def test_merges_existing_marker_keys(self):
        merged = merge_markers(
            {
                "M": Marker(
                    display_name="Base",
                    placements=[MarkerPlacement(dimension="project")],
                )
            },
            {
                "M": Marker(
                    display_name="Second",
                    placements=[
                        MarkerPlacement(
                            if_with={"Timesheet"},
                            dimension="timesheet",
                            value="coding",
                        )
                    ],
                )
            },
        )
        assert merged["M"].display_name == "Second"
        assert merged["M"].placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(
                if_with={"Timesheet"}, dimension="timesheet", value="coding"
            ),
        ]

    def test_does_not_mutate_inputs(self):
        base = {"M1": Marker(display_name="M1", placements=[])}
        second = {"M2": Marker(display_name="M2", placements=[])}
        merged = merge_markers(base, second)
        assert "M2" not in base
        assert "M1" not in second
        assert set(merged) == {"M1", "M2"}
class TestMergeRepositoryConfiguration:
    """merge_repository_configuration merges whole configurations field by field."""

    def test_merges_dimensions_and_markers(self):
        first = RepositoryConfiguration(
            dimensions={
                "project": Dimension(
                    display_name="Project", comment="c1", propagate=True
                ),
                "moment": Dimension(
                    display_name="Moment", comment="c2", propagate=True
                ),
            },
            markers={
                "Streamer": Marker(
                    display_name="Streamer",
                    placements=[MarkerPlacement(dimension="project")],
                )
            },
        )
        other = RepositoryConfiguration(
            dimensions={
                "project": Dimension(display_name="Project2", propagate=False),
                "timesheet": Dimension(
                    display_name="Timesheet", comment="c3", propagate=False
                ),
            },
            markers={
                "Streamer": Marker(
                    display_name="Streamer2",
                    placements=[
                        MarkerPlacement(
                            if_with={"Timesheet"}, dimension="timesheet", value="coding"
                        )
                    ],
                ),
                "JobHunting": Marker(
                    display_name="JobHunting",
                    placements=[MarkerPlacement(dimension="project")],
                ),
            },
        )

        combined = merge_repository_configuration(first, other)

        # Dimension keys are unioned. For the colliding "project" key the
        # second's explicit fields win (display_name, propagate) while the
        # field it omits (comment) is preserved from the base.
        assert set(combined.dimensions.keys()) == {"project", "moment", "timesheet"}
        assert combined.dimensions["project"].display_name == "Project2"
        assert combined.dimensions["project"].comment == "c1"
        assert combined.dimensions["project"].propagate is False
        assert combined.dimensions["moment"].display_name == "Moment"
        assert combined.dimensions["timesheet"].display_name == "Timesheet"

        # Marker keys are unioned; the colliding marker merges its placements.
        assert set(combined.markers.keys()) == {"Streamer", "JobHunting"}
        assert combined.markers["Streamer"].display_name == "Streamer2"
        assert combined.markers["Streamer"].placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set()),
            MarkerPlacement(
                if_with={"Timesheet"}, dimension="timesheet", value="coding"
            ),
        ]
        assert combined.markers["JobHunting"].placements == [
            MarkerPlacement(dimension="project", value=None, if_with=set())
        ]

    def test_does_not_mutate_base_or_second(self):
        first = RepositoryConfiguration(
            dimensions={"a": Dimension(display_name="A", propagate=True)},
            markers={"M": Marker(display_name="M", placements=[])},
        )
        other = RepositoryConfiguration(
            dimensions={"b": Dimension(display_name="B", propagate=False)},
            markers={"N": Marker(display_name="N", placements=[])},
        )

        merge_repository_configuration(first, other)

        # Both inputs still hold only their original entries.
        assert set(first.dimensions.keys()) == {"a"}
        assert set(other.dimensions.keys()) == {"b"}
        assert set(first.markers.keys()) == {"M"}
        assert set(other.markers.keys()) == {"N"}

    def test_merge_is_associative_for_non_conflicting_inputs(self):
        def make(dim_key, dim_name, marker_key, marker_name, propagate):
            # Small factory: one dimension and one empty marker per config.
            return RepositoryConfiguration(
                dimensions={
                    dim_key: Dimension(display_name=dim_name, propagate=propagate)
                },
                markers={marker_key: Marker(display_name=marker_name, placements=[])},
            )

        first = make("d1", "D1", "m1", "M1", True)
        second = make("d2", "D2", "m2", "M2", False)
        third = make("d3", "D3", "m3", "M3", False)

        left = merge_repository_configuration(
            merge_repository_configuration(first, second), third
        )
        right = merge_repository_configuration(
            first, merge_repository_configuration(second, third)
        )

        # (a + b) + c == a + (b + c) when no keys conflict.
        assert left == right
        assert set(left.dimensions.keys()) == {"d1", "d2", "d3"}
        assert set(left.markers.keys()) == {"m1", "m2", "m3"}
@pytest.mark.parametrize(
    ("base", "second", "expected_propagate"),
    [
        (
            RepositoryConfiguration(
                dimensions={"d": Dimension(display_name="D", propagate=True)},
                markers={},
            ),
            RepositoryConfiguration(
                dimensions={"d": Dimension(display_name="D2")},
                markers={},
            ),
            True,
        )
    ],
)
def test_merge_repository_configuration_propagate_preserves_base_when_omitted(
    base, second, expected_propagate
):
    """When the second config omits `propagate`, the base's value survives the merge."""
    result = merge_repository_configuration(base, second)
    dimension = result.dimensions["d"]
    assert dimension.propagate is expected_propagate

View file

@ -1,6 +1,7 @@
from streamer.parse import StreamFile, parse_markdown_file, Shard
from faker import Faker from faker import Faker
from streamer.parse import Shard, StreamFile, parse_markdown_file
fake = Faker() fake = Faker()
@ -9,13 +10,13 @@ class TestParseProcess:
def test_parse_empty_file(self): def test_parse_empty_file(self):
assert parse_markdown_file(self.file_name, "") == StreamFile( assert parse_markdown_file(self.file_name, "") == StreamFile(
filename=self.file_name, shard=Shard(start_line=1, end_line=1) file_name=self.file_name, shard=Shard(start_line=1, end_line=1)
) )
def test_parse_basic_one_line_file(self): def test_parse_basic_one_line_file(self):
test_file = "Hello World" test_file = "Hello World"
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
start_line=1, start_line=1,
end_line=1, end_line=1,
@ -25,7 +26,7 @@ class TestParseProcess:
def test_parse_basic_multi_line_file(self): def test_parse_basic_multi_line_file(self):
test_file = "Hello World\n\nHello again!" test_file = "Hello World\n\nHello again!"
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
start_line=1, start_line=1,
end_line=3, end_line=3,
@ -35,7 +36,7 @@ class TestParseProcess:
def test_parse_single_line_with_tag(self): def test_parse_single_line_with_tag(self):
test_file = "@Tag Hello World" test_file = "@Tag Hello World"
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
markers=["Tag"], markers=["Tag"],
start_line=1, start_line=1,
@ -46,7 +47,7 @@ class TestParseProcess:
def test_parse_single_line_with_two_tags(self): def test_parse_single_line_with_two_tags(self):
test_file = "@Marker1 @Marker2 Hello World" test_file = "@Marker1 @Marker2 Hello World"
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
markers=["Marker1", "Marker2"], markers=["Marker1", "Marker2"],
start_line=1, start_line=1,
@ -57,7 +58,7 @@ class TestParseProcess:
def test_parse_single_line_with_two_tags_and_misplaced_tag(self): def test_parse_single_line_with_two_tags_and_misplaced_tag(self):
test_file = "@Tag1 @Tag2 Hello World @Tag3" test_file = "@Tag1 @Tag2 Hello World @Tag3"
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
markers=["Tag1", "Tag2"], markers=["Tag1", "Tag2"],
tags=["Tag3"], tags=["Tag3"],
@ -70,7 +71,7 @@ class TestParseProcess:
file_text = "Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2" file_text = "Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
start_line=1, start_line=1,
end_line=5, end_line=5,
@ -113,7 +114,7 @@ class TestParseProcess:
file_text = "# Heading @Tag1\n\n## @Marker1 Subheading @Tag2\n\n# Heading @Tag3" file_text = "# Heading @Tag1\n\n## @Marker1 Subheading @Tag2\n\n# Heading @Tag3"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
start_line=1, start_line=1,
end_line=5, end_line=5,
@ -140,7 +141,7 @@ class TestParseProcess:
file_text = "# @Marker1 Heading @Tag1\n\n## Subheading @Tag2" file_text = "# @Marker1 Heading @Tag1\n\n## Subheading @Tag2"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name, file_name=self.file_name,
shard=Shard( shard=Shard(
markers=["Marker1"], markers=["Marker1"],
tags=["Tag1", "Tag2"], tags=["Tag1", "Tag2"],

104
test/query/test_find.py Normal file
View file

@ -0,0 +1,104 @@
from __future__ import annotations
from datetime import datetime
from streamer.localize import LocalizedShard
from streamer.query.find import find_shard, find_shard_by_position
def generate_localized_shard(
    *,
    location: dict[str, str] | None = None,
    children: list[LocalizedShard] | None = None,
) -> LocalizedShard:
    """Build a minimal LocalizedShard fixture.

    Only `location` and `children` vary per test; every other field is a
    fixed placeholder so shard equality hinges on the varying parts.
    """
    return LocalizedShard(
        start_line=1,
        end_line=1,
        moment=datetime(2020, 1, 1),
        markers=[],
        tags=[],
        # Falsy (None or empty) arguments fall back to fresh empty containers.
        location=location or {},
        children=children or [],
    )
class TestFindShard:
    """find_shard walks shard trees and collects every match of a predicate."""

    def test_returns_empty_when_no_match(self) -> None:
        tree = generate_localized_shard(location={"file": "a.md"})

        matches = find_shard([tree], lambda shard: "missing" in shard.location)

        assert matches == []

    def test_finds_matches_depth_first_and_preserves_order(self) -> None:
        leaf = generate_localized_shard(location={"k": "match"})
        matching_child = generate_localized_shard(
            location={"k": "match"}, children=[leaf]
        )
        other_child = generate_localized_shard(location={"k": "nope"})
        tree = generate_localized_shard(
            location={"k": "nope"}, children=[matching_child, other_child]
        )

        matches = find_shard([tree], lambda shard: shard.location.get("k") == "match")

        # Parent precedes its descendant; the non-matching child is absent.
        assert matches == [matching_child, leaf]

    def test_includes_root_if_it_matches(self) -> None:
        tree = generate_localized_shard(
            location={"k": "match"},
            children=[generate_localized_shard(location={"k": "match"})],
        )

        matches = find_shard([tree], lambda shard: shard.location.get("k") == "match")

        # The root itself is first, followed by its matching child.
        assert matches[0] is tree
        assert len(matches) == 2

    def test_multiple_roots_keeps_left_to_right_order(self) -> None:
        first = generate_localized_shard(location={"k": "match"})
        second = generate_localized_shard(location={"k": "match"})
        third = generate_localized_shard(location={"k": "nope"})

        matches = find_shard(
            [first, second, third], lambda shard: shard.location.get("k") == "match"
        )

        assert matches == [first, second]

    def test_query_function_can_use_arbitrary_logic(self) -> None:
        # Ensures typing/behavior supports any callable that returns bool.
        shards = [generate_localized_shard(location={"x": str(i)}) for i in (1, 2, 3)]
        tree = generate_localized_shard(location={}, children=shards)

        def has_even_x(shard: LocalizedShard) -> bool:
            raw = shard.location.get("x")
            return raw is not None and int(raw) % 2 == 0

        # Only the shard with x == "2" satisfies the predicate.
        assert find_shard([tree], has_even_x) == [shards[1]]
class TestFindShardByPosition:
    """find_shard_by_position filters shards on a single location dimension."""

    def test_matches_only_when_dimension_present_and_equal(self) -> None:
        hit = generate_localized_shard(location={"file": "a.md", "line": "10"})
        wrong_value = generate_localized_shard(location={"file": "a.md", "line": "11"})
        missing = generate_localized_shard(location={"file": "a.md"})
        tree = generate_localized_shard(
            location={"root": "x"}, children=[hit, wrong_value, missing]
        )

        # Only the shard whose "line" dimension both exists and equals "10" matches.
        assert find_shard_by_position([tree], "line", "10") == [hit]

    def test_recurses_through_children(self) -> None:
        leaf = generate_localized_shard(location={"section": "s1"})
        middle = generate_localized_shard(location={"section": "s0"}, children=[leaf])
        tree = generate_localized_shard(location={}, children=[middle])

        # The match sits two levels down; recursion must reach it.
        assert find_shard_by_position([tree], "section", "s1") == [leaf]

View file

@ -1,8 +1,11 @@
from datetime import datetime
from streamer.localize.localize import localize_stream_file from streamer.localize.localize import localize_stream_file
from streamer.localize.localized_shard import LocalizedShard from streamer.localize.localized_shard import LocalizedShard
from streamer.localize.repostory_configuration import ( from streamer.localize.repository_configuration import (
Dimension, Dimension,
Marker, Marker,
MarkerPlacement,
RepositoryConfiguration, RepositoryConfiguration,
) )
from streamer.parse.shard import Shard, StreamFile from streamer.parse.shard import Shard, StreamFile
@ -19,28 +22,66 @@ repository_configuration = RepositoryConfiguration(
comment="Timestamp this entry was created at", comment="Timestamp this entry was created at",
propagate=True, propagate=True,
), ),
"timesheet": Dimension(
display_name="Timesheet",
comment="Time Cards for Time Tracking",
propagate=True,
),
}, },
markers={ markers={
"streamer": Marker(display_name="Streamer", dimension="project"), "Streamer": Marker(
"jobhunting": Marker(display_name="JobHunting", dimension="project"), display_name="Streamer",
placements=[
MarkerPlacement(dimension="project"),
MarkerPlacement(
if_with={"Timesheet"}, dimension="timesheet", value="coding"
),
],
),
"JobHunting": Marker(
display_name="JobHunting", placements=[MarkerPlacement(dimension="project")]
),
}, },
) )
class TestExtractDateTime: class TestLocalize:
def test_project_simple_stream_file(self): def test_project_simple_stream_file(self):
stream_file = StreamFile( stream_file = StreamFile(
filename="20250622-121000 Test File.md", file_name="20250622-121000 Test File.md",
shard=Shard(start_line=1, end_line=1, markers=["Streamer"]), shard=Shard(start_line=1, end_line=1, markers=["Streamer"]),
) )
assert localize_stream_file( assert localize_stream_file(
stream_file, repository_configuration stream_file, repository_configuration
) == LocalizedShard( ) == LocalizedShard(
moment=datetime(2025, 6, 22, 12, 10, 0, 0),
markers=["Streamer"], markers=["Streamer"],
tags=[], tags=[],
start_line=1, start_line=1,
end_line=1, end_line=1,
children=[], children=[],
location={"moment": "2025-06-22T12:10:00", "project": "streamer"}, location={"project": "Streamer", "file": stream_file.file_name},
)
def test_timesheet_use_case(self):
stream_file = StreamFile(
file_name="20260131-210000 Test File.md",
shard=Shard(start_line=1, end_line=1, markers=["Timesheet", "Streamer"]),
)
assert localize_stream_file(
stream_file, repository_configuration
) == LocalizedShard(
moment=datetime(2026, 1, 31, 21, 0, 0, 0),
markers=["Timesheet", "Streamer"],
tags=[],
start_line=1,
end_line=1,
children=[],
location={
"file": stream_file.file_name,
"project": "Streamer",
"timesheet": "coding",
},
) )