Implement Edit Feature #28
20 changed files with 1091 additions and 129 deletions
|
|
@ -21,6 +21,3 @@ jobs:
|
|||
|
||||
- name: Test with PyTest
|
||||
run: nix develop .#impure --command bash -c "uv run pytest --junit-xml test-report.xml"
|
||||
|
||||
- name: Check with PyRight
|
||||
run: nix develop .#impure --command bash -c "uv run pyright"
|
||||
|
|
|
|||
|
|
@ -88,9 +88,11 @@
|
|||
# Package a virtual environment as our main application.
|
||||
#
|
||||
# Enable no optional dependencies for production build.
|
||||
packages.x86_64-linux = let
|
||||
packages.x86_64-linux =
|
||||
let
|
||||
streamer = pythonSet.mkVirtualEnv "streamer-env" workspace.deps.default;
|
||||
in {
|
||||
in
|
||||
{
|
||||
inherit streamer;
|
||||
default = streamer;
|
||||
};
|
||||
|
|
@ -116,8 +118,7 @@
|
|||
pre-commit
|
||||
bashInteractive
|
||||
];
|
||||
env =
|
||||
{
|
||||
env = {
|
||||
# Prevent uv from managing Python downloads
|
||||
UV_PYTHON_DOWNLOADS = "never";
|
||||
# Force uv to use nixpkgs Python interpreter
|
||||
|
|
|
|||
|
|
@ -1,45 +1,76 @@
|
|||
import glob
|
||||
import os
|
||||
import typer
|
||||
from datetime import datetime
|
||||
from shutil import move
|
||||
from typing import Annotated, Generator
|
||||
|
||||
import click
|
||||
import typer
|
||||
from rich import print
|
||||
from rich.markdown import Markdown
|
||||
from rich.panel import Panel
|
||||
from shutil import move
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from streamer.parse.parse import parse_markdown_file
|
||||
from streamer.query.task import find_by_markers
|
||||
from streamer.localize import (
|
||||
LocalizedShard,
|
||||
RepositoryConfiguration,
|
||||
localize_stream_file,
|
||||
)
|
||||
from streamer.localize.preconfigured_configurations import TaskConfiguration
|
||||
from streamer.parse import parse_markdown_file
|
||||
from streamer.query import find_shard_by_position
|
||||
from streamer.settings import Settings
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
|
||||
@app.command()
|
||||
def todo() -> None:
|
||||
def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]:
|
||||
for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"):
|
||||
with open(file_name, "r") as file:
|
||||
file_content = file.read()
|
||||
sharded_document = parse_markdown_file(file_name, file_content)
|
||||
if sharded_document.shard:
|
||||
open_tasks = find_by_markers(sharded_document.shard, ["Task"], ["Done"])
|
||||
if shard := localize_stream_file(
|
||||
parse_markdown_file(file_name, file_content), config
|
||||
):
|
||||
yield shard
|
||||
|
||||
for task_shard in open_tasks:
|
||||
|
||||
@app.command()
|
||||
def todo() -> None:
|
||||
all_shards = list(all_files(TaskConfiguration))
|
||||
|
||||
for task_shard in find_shard_by_position(all_shards, "task", "open"):
|
||||
with open(task_shard.location["file"], "r") as file:
|
||||
file_content = file.read().splitlines()
|
||||
print(
|
||||
Panel(
|
||||
Markdown(
|
||||
"\n".join(
|
||||
file_content.splitlines()[
|
||||
file_content[
|
||||
task_shard.start_line - 1 : task_shard.end_line
|
||||
]
|
||||
)
|
||||
),
|
||||
title=f"{file_name}:{task_shard.start_line}",
|
||||
title=f"{task_shard.location['file']}:{task_shard.start_line}",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@app.command()
|
||||
def edit(number: Annotated[int, typer.Argument()] = 1) -> None:
|
||||
all_shards = list(all_files(TaskConfiguration))
|
||||
sorted_shards = sorted(all_shards, key=lambda s: s.moment)
|
||||
|
||||
if abs(number) >= len(sorted_shards):
|
||||
raise ValueError("Argument out of range")
|
||||
|
||||
selected_number = number
|
||||
if selected_number >= 0:
|
||||
selected_number = len(sorted_shards) - selected_number
|
||||
else:
|
||||
selected_number = -selected_number
|
||||
|
||||
click.edit(None, filename=sorted_shards[selected_number].location["file"])
|
||||
|
||||
|
||||
@app.command()
|
||||
def new() -> None:
|
||||
streamer_directory = Settings().base_folder
|
||||
|
|
@ -59,7 +90,9 @@ def new() -> None:
|
|||
parsed_content = parse_markdown_file(prelimary_path, content)
|
||||
|
||||
final_file_name = f"{timestamp}.md"
|
||||
if parsed_content.shard is not None and len(markers := parsed_content.shard.markers):
|
||||
if parsed_content.shard is not None and len(
|
||||
markers := parsed_content.shard.markers
|
||||
):
|
||||
final_file_name = f"{timestamp} {' '.join(markers)}.md"
|
||||
|
||||
final_path = os.path.join(streamer_directory, final_file_name)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
from .localize import localize_stream_file
|
||||
from .repostory_configuration import RepositoryConfiguration
|
||||
from .localized_shard import LocalizedShard
|
||||
from .repository_configuration import RepositoryConfiguration
|
||||
|
||||
__all__ = [
|
||||
"RepositoryConfiguration",
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
from datetime import datetime
|
||||
import re
|
||||
import os
|
||||
import re
|
||||
from datetime import date, datetime, time
|
||||
|
||||
|
||||
def extract_date_from_file_name(file_name: str) -> datetime | None:
|
||||
def extract_datetime_from_file_name(file_name: str) -> datetime | None:
|
||||
FILE_NAME_REGEX = r"^(?P<date>\d{8})(?:-(?P<time>\d{4,6}))?.+.md$"
|
||||
base_name = os.path.basename(file_name)
|
||||
match = re.match(FILE_NAME_REGEX, base_name)
|
||||
|
|
@ -17,4 +17,76 @@ def extract_date_from_file_name(file_name: str) -> datetime | None:
|
|||
return None
|
||||
|
||||
|
||||
__all__ = ["extract_date_from_file_name"]
|
||||
def extract_datetime_from_marker(marker: str) -> datetime | None:
|
||||
"""
|
||||
Extract a datetime from a marker string in the exact format: YYYYMMDDHHMMSS.
|
||||
|
||||
Returns:
|
||||
Parsed datetime if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{14}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%Y%m%d%H%M%S")
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_date_from_marker(marker: str) -> date | None:
|
||||
"""
|
||||
Extract a date from a marker string in the exact format: YYYYMMDD.
|
||||
|
||||
Returns:
|
||||
Parsed date if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{8}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%Y%m%d").date()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_time_from_marker(marker: str) -> time | None: # noqa: F821
|
||||
"""
|
||||
Extract a time from a marker string in the exact format: HHMMSS.
|
||||
|
||||
Returns:
|
||||
Parsed time if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{6}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%H%M%S").time()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_datetime_from_marker_list(markers: list[str], inherited_datetime: datetime):
|
||||
shard_time: time | None = None
|
||||
shard_date: date | None = None
|
||||
|
||||
for marker in markers[::-1]:
|
||||
if parsed_time := extract_time_from_marker(marker):
|
||||
shard_time = parsed_time
|
||||
if parsed_date := extract_date_from_marker(marker):
|
||||
shard_date = parsed_date
|
||||
if parsed_datetime := extract_datetime_from_marker(marker):
|
||||
shard_date = parsed_datetime.date()
|
||||
shard_time = parsed_datetime.time()
|
||||
|
||||
if shard_date and not shard_time:
|
||||
return datetime.combine(shard_date, time(0, 0, 0))
|
||||
|
||||
return datetime.combine(
|
||||
shard_date or inherited_datetime.date(), shard_time or inherited_datetime.time()
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"extract_datetime_from_file_name",
|
||||
"extract_datetime_from_marker",
|
||||
"extract_date_from_marker",
|
||||
"extract_time_from_marker",
|
||||
"extract_datetime_from_marker_list",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,46 +1,66 @@
|
|||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard, StreamFile
|
||||
|
||||
from .repostory_configuration import RepositoryConfiguration
|
||||
from .extract_datetime import (
|
||||
extract_datetime_from_file_name,
|
||||
extract_datetime_from_marker_list,
|
||||
)
|
||||
from .localized_shard import LocalizedShard
|
||||
from .extract_datetime import extract_date_from_file_name
|
||||
from .repository_configuration import RepositoryConfiguration
|
||||
|
||||
|
||||
def localize_shard(
|
||||
shard: Shard, config: RepositoryConfiguration, propagated: dict[str, str]
|
||||
shard: Shard,
|
||||
config: RepositoryConfiguration,
|
||||
propagated: dict[str, str],
|
||||
moment: datetime,
|
||||
) -> LocalizedShard:
|
||||
position = {**propagated}
|
||||
private_position: dict[str, str] = {}
|
||||
|
||||
adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)
|
||||
|
||||
for marker in shard.markers:
|
||||
normalized_marker = marker.lower()
|
||||
if marker_definition := config.markers[normalized_marker]:
|
||||
dimension_name = marker_definition.dimension
|
||||
dimension = config.dimensions[marker_definition.dimension]
|
||||
if marker in config.markers:
|
||||
marker_definition = config.markers[marker]
|
||||
for placement in marker_definition.placements:
|
||||
if placement.if_with <= set(shard.markers):
|
||||
dimension = config.dimensions[placement.dimension]
|
||||
|
||||
value = placement.value or marker
|
||||
|
||||
if dimension.propagate:
|
||||
position[dimension_name] = normalized_marker
|
||||
position[placement.dimension] = value
|
||||
else:
|
||||
private_position[dimension_name] = normalized_marker
|
||||
private_position[placement.dimension] = value
|
||||
|
||||
children = [localize_shard(child, config, position) for child in shard.children]
|
||||
children = [
|
||||
localize_shard(child, config, position, adjusted_moment)
|
||||
for child in shard.children
|
||||
]
|
||||
|
||||
position.update(private_position)
|
||||
|
||||
return LocalizedShard(
|
||||
**shard.model_dump(exclude={"children"}), location=position, children=children
|
||||
**shard.model_dump(exclude={"children"}),
|
||||
location=position,
|
||||
children=children,
|
||||
moment=adjusted_moment,
|
||||
)
|
||||
|
||||
|
||||
def localize_stream_file(
|
||||
stream_file: StreamFile, config: RepositoryConfiguration
|
||||
) -> LocalizedShard | None:
|
||||
shard_date = extract_date_from_file_name(stream_file.filename)
|
||||
shard_date = extract_datetime_from_file_name(stream_file.file_name)
|
||||
|
||||
if not shard_date or not stream_file.shard:
|
||||
raise ValueError("Could not extract date")
|
||||
|
||||
return localize_shard(stream_file.shard, config, {"moment": shard_date.isoformat()})
|
||||
return localize_shard(
|
||||
stream_file.shard, config, {"file": stream_file.file_name}, shard_date
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["localize_stream_file"]
|
||||
|
|
|
|||
|
|
@ -1,8 +1,12 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard
|
||||
|
||||
|
||||
class LocalizedShard(Shard):
|
||||
moment: datetime
|
||||
location: dict[str, str]
|
||||
children: list[LocalizedShard] = [] # pyright: ignore[reportIncompatibleVariableOverride]
|
||||
|
||||
|
|
|
|||
95
src/streamer/localize/preconfigured_configurations.py
Normal file
95
src/streamer/localize/preconfigured_configurations.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
from streamer.localize.repository_configuration import (
|
||||
Dimension,
|
||||
Marker,
|
||||
MarkerPlacement,
|
||||
RepositoryConfiguration,
|
||||
)
|
||||
|
||||
TaskConfiguration = RepositoryConfiguration(
|
||||
dimensions={
|
||||
"task": Dimension(
|
||||
display_name="Task",
|
||||
comment="If placed, the given shard is a task. The placement determines the state.",
|
||||
propagate=False,
|
||||
),
|
||||
"project": Dimension(
|
||||
display_name="Project",
|
||||
comment="Project the task is attached to",
|
||||
propagate=True,
|
||||
),
|
||||
},
|
||||
markers={
|
||||
"Task": Marker(
|
||||
display_name="Task",
|
||||
placements=[
|
||||
MarkerPlacement(dimension="task", value="open"),
|
||||
MarkerPlacement(if_with={"Done"}, dimension="task", value="done"),
|
||||
MarkerPlacement(if_with={"Waiting"}, dimension="task", value="waiting"),
|
||||
MarkerPlacement(
|
||||
if_with={"Cancelled"}, dimension="task", value="cancelled"
|
||||
),
|
||||
MarkerPlacement(
|
||||
if_with={"NotDone"}, dimension="task", value="cancelled"
|
||||
),
|
||||
],
|
||||
),
|
||||
"WaitingFor": Marker(
|
||||
display_name="Task",
|
||||
placements=[
|
||||
MarkerPlacement(dimension="task", value="waiting"),
|
||||
],
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
BasicTimesheetConfiguration = RepositoryConfiguration(
|
||||
dimensions={
|
||||
"timesheet": Dimension(
|
||||
display_name="Timesheet",
|
||||
comment="Used by Timesheet-Subcommand to create Timecards",
|
||||
propagate=False,
|
||||
)
|
||||
},
|
||||
markers={
|
||||
"VacationDay": Marker(
|
||||
display_name="Vacation Day",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"},
|
||||
dimension="timesheet",
|
||||
value="day_off_sick_leave",
|
||||
)
|
||||
],
|
||||
),
|
||||
"Holiday": Marker(
|
||||
display_name="Offical Holiday",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"},
|
||||
dimension="timesheet",
|
||||
value="day_off_holiday",
|
||||
)
|
||||
],
|
||||
),
|
||||
"SickLeave": Marker(
|
||||
display_name="Sick Leave",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"},
|
||||
dimension="timesheet",
|
||||
value="day_off_sick_leave",
|
||||
)
|
||||
],
|
||||
),
|
||||
"UndertimeDay": Marker(
|
||||
display_name="Undertime Leave",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"},
|
||||
dimension="timesheet",
|
||||
value="day_off_undertime",
|
||||
)
|
||||
],
|
||||
),
|
||||
},
|
||||
)
|
||||
107
src/streamer/localize/repository_configuration.py
Normal file
107
src/streamer/localize/repository_configuration.py
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Dimension(BaseModel):
|
||||
display_name: str
|
||||
comment: Optional[str] = None
|
||||
propagate: bool = False
|
||||
|
||||
|
||||
class MarkerPlacement(BaseModel):
|
||||
if_with: set[str] = set()
|
||||
dimension: str
|
||||
value: str | None = None
|
||||
|
||||
|
||||
class Marker(BaseModel):
|
||||
display_name: str
|
||||
placements: list[MarkerPlacement] = []
|
||||
|
||||
|
||||
class RepositoryConfiguration(BaseModel):
|
||||
dimensions: dict[str, Dimension]
|
||||
markers: dict[str, Marker]
|
||||
|
||||
|
||||
def merge_single_dimension(base: Dimension, second: Dimension) -> Dimension:
|
||||
second_fields_set = getattr(second, "model_fields_set", set())
|
||||
|
||||
return Dimension(
|
||||
display_name=second.display_name or base.display_name,
|
||||
comment=base.comment if second.comment is None else second.comment,
|
||||
propagate=second.propagate
|
||||
if "propagate" in second_fields_set
|
||||
else base.propagate,
|
||||
)
|
||||
|
||||
|
||||
def merge_dimensions(
|
||||
base: dict[str, Dimension], second: dict[str, Dimension]
|
||||
) -> dict[str, Dimension]:
|
||||
merged: dict[str, Dimension] = dict(base)
|
||||
for key, second_dimension in second.items():
|
||||
if key in merged:
|
||||
merged[key] = merge_single_dimension(merged[key], second_dimension)
|
||||
else:
|
||||
merged[key] = second_dimension
|
||||
return merged
|
||||
|
||||
|
||||
def _placement_identity(p: MarkerPlacement) -> tuple[frozenset[str], str]:
|
||||
return (frozenset(p.if_with), p.dimension)
|
||||
|
||||
|
||||
def merge_single_marker(base: Marker, second: Marker) -> Marker:
|
||||
merged_display_name = second.display_name or base.display_name
|
||||
|
||||
merged_placements: list[MarkerPlacement] = []
|
||||
seen: dict[tuple[frozenset[str], str], int] = {}
|
||||
|
||||
for placement in base.placements:
|
||||
ident = _placement_identity(placement)
|
||||
seen[ident] = len(merged_placements)
|
||||
merged_placements.append(placement)
|
||||
|
||||
for placement in second.placements:
|
||||
ident = _placement_identity(placement)
|
||||
if ident in seen:
|
||||
merged_placements[seen[ident]] = placement
|
||||
else:
|
||||
seen[ident] = len(merged_placements)
|
||||
merged_placements.append(placement)
|
||||
|
||||
return Marker(display_name=merged_display_name, placements=merged_placements)
|
||||
|
||||
|
||||
def merge_markers(
|
||||
base: dict[str, Marker], second: dict[str, Marker]
|
||||
) -> dict[str, Marker]:
|
||||
merged: dict[str, Marker] = dict(base)
|
||||
for key, second_marker in second.items():
|
||||
if key in merged:
|
||||
merged[key] = merge_single_marker(merged[key], second_marker)
|
||||
else:
|
||||
merged[key] = second_marker
|
||||
return merged
|
||||
|
||||
|
||||
def merge_repository_configuration(
|
||||
base: RepositoryConfiguration, second: RepositoryConfiguration
|
||||
) -> RepositoryConfiguration:
|
||||
return RepositoryConfiguration(
|
||||
dimensions=merge_dimensions(base.dimensions, second.dimensions),
|
||||
markers=merge_markers(base.markers, second.markers),
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Dimension",
|
||||
"Marker",
|
||||
"MarkerPlacement",
|
||||
"RepositoryConfiguration",
|
||||
"merge_repository_configuration",
|
||||
]
|
||||
|
|
@ -1,21 +0,0 @@
|
|||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Dimension(BaseModel):
|
||||
display_name: str
|
||||
comment: Optional[str] = None
|
||||
propagate: bool = False
|
||||
|
||||
|
||||
class Marker(BaseModel):
|
||||
display_name: str
|
||||
dimension: str
|
||||
|
||||
|
||||
class RepositoryConfiguration(BaseModel):
|
||||
dimensions: dict[str, Dimension]
|
||||
markers: dict[str, Marker]
|
||||
|
||||
|
||||
__all__ = ["Dimension", "Marker", "RepositoryConfiguration"]
|
||||
|
|
@ -20,8 +20,8 @@ def get_line_number(block_token: BlockToken) -> int:
|
|||
|
||||
|
||||
def build_shard(
|
||||
start_line,
|
||||
end_line,
|
||||
start_line: int,
|
||||
end_line: int,
|
||||
markers: list[str] = [],
|
||||
tags: list[str] = [],
|
||||
children: list[Shard] = [],
|
||||
|
|
@ -236,7 +236,7 @@ def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
|
|||
):
|
||||
shard = parsed_shard
|
||||
|
||||
return StreamFile(shard=shard, filename=file_name)
|
||||
return StreamFile(shard=shard, file_name=file_name)
|
||||
|
||||
|
||||
__all__ = ["Shard", "StreamFile", "parse_markdown_file"]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
|
|
@ -11,7 +12,7 @@ class Shard(BaseModel):
|
|||
|
||||
|
||||
class StreamFile(BaseModel):
|
||||
filename: str
|
||||
file_name: str
|
||||
shard: Shard | None = None
|
||||
|
||||
|
||||
|
|
|
|||
3
src/streamer/query/__init__.py
Normal file
3
src/streamer/query/__init__.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
from .find import find_shard, find_shard_by_position
|
||||
|
||||
__all__ = ["find_shard_by_position", "find_shard"]
|
||||
29
src/streamer/query/find.py
Normal file
29
src/streamer/query/find.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
from typing import Callable
|
||||
|
||||
from streamer.localize import LocalizedShard
|
||||
|
||||
|
||||
def find_shard(
|
||||
shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool]
|
||||
) -> list[LocalizedShard]:
|
||||
found_shards = []
|
||||
|
||||
for shard in shards:
|
||||
if query_function(shard):
|
||||
found_shards.append(shard)
|
||||
found_shards.extend(find_shard(shard.children, query_function))
|
||||
|
||||
return found_shards
|
||||
|
||||
|
||||
def find_shard_by_position(
|
||||
shards: list[LocalizedShard], dimension: str, value: str
|
||||
) -> list[LocalizedShard]:
|
||||
return find_shard(
|
||||
shards,
|
||||
lambda shard: dimension in shard.location
|
||||
and shard.location[dimension] == value,
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["find_shard_by_position", "find_shard"]
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
from streamer.parse.shard import Shard
|
||||
|
||||
|
||||
def find_by_markers(shard: Shard, has: list[str], has_not: list[str]) -> list[Shard]:
|
||||
found_shards = []
|
||||
|
||||
if any(tag in has for tag in shard.markers) and not any(
|
||||
tag in has_not for tag in shard.markers
|
||||
):
|
||||
found_shards.append(shard)
|
||||
|
||||
for child in shard.children:
|
||||
found_shards.extend(find_by_markers(child, has, has_not))
|
||||
|
||||
return found_shards
|
||||
|
|
@ -1,32 +1,157 @@
|
|||
from datetime import datetime
|
||||
from streamer.localize.extract_datetime import extract_date_from_file_name
|
||||
from datetime import date, datetime, time
|
||||
|
||||
from streamer.localize.extract_datetime import (
|
||||
extract_date_from_marker,
|
||||
extract_datetime_from_file_name,
|
||||
extract_datetime_from_marker,
|
||||
extract_datetime_from_marker_list,
|
||||
extract_time_from_marker,
|
||||
)
|
||||
|
||||
|
||||
class TestExtractDateTime:
|
||||
def test_extract_date_from_file_name_valid(self):
|
||||
file_name = "20230101-123456 Some Text.md"
|
||||
assert datetime(2023, 1, 1, 12, 34, 56) == extract_date_from_file_name(
|
||||
assert datetime(2023, 1, 1, 12, 34, 56) == extract_datetime_from_file_name(
|
||||
file_name
|
||||
)
|
||||
|
||||
def test_extract_date_from_file_name_invalid(self):
|
||||
file_name = "invalid-file-name.md"
|
||||
assert extract_date_from_file_name(file_name) is None
|
||||
assert extract_datetime_from_file_name(file_name) is None
|
||||
|
||||
def test_extract_date_from_file_name_without_time(self):
|
||||
file_name = "20230101 Some Text.md"
|
||||
assert datetime(2023, 1, 1, 0, 0, 0) == extract_date_from_file_name(file_name)
|
||||
assert datetime(2023, 1, 1, 0, 0, 0) == extract_datetime_from_file_name(
|
||||
file_name
|
||||
)
|
||||
|
||||
def test_extract_date_from_file_name_short_time(self):
|
||||
file_name = "20230101-1234 Some Text.md"
|
||||
assert datetime(2023, 1, 1, 12, 34, 0) == extract_date_from_file_name(file_name)
|
||||
assert datetime(2023, 1, 1, 12, 34, 0) == extract_datetime_from_file_name(
|
||||
file_name
|
||||
)
|
||||
|
||||
def test_extract_date_from_file_name_empty_string(self):
|
||||
file_name = ""
|
||||
assert extract_date_from_file_name(file_name) is None
|
||||
assert extract_datetime_from_file_name(file_name) is None
|
||||
|
||||
def test_extract_date_from_file_name_with_full_path(self):
|
||||
file_name = "/path/to/20230101-123456 Some Text.md"
|
||||
assert datetime(2023, 1, 1, 12, 34, 56) == extract_date_from_file_name(
|
||||
assert datetime(2023, 1, 1, 12, 34, 56) == extract_datetime_from_file_name(
|
||||
file_name
|
||||
)
|
||||
|
||||
|
||||
class TestExtractMarkerDateTime:
|
||||
def test_extract_datetime_from_marker_valid(self):
|
||||
marker = "20250101150000"
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker(marker)
|
||||
|
||||
def test_extract_datetime_from_marker_invalid_format(self):
|
||||
assert extract_datetime_from_marker("2025010115000") is None # too short
|
||||
assert extract_datetime_from_marker("202501011500000") is None # too long
|
||||
assert extract_datetime_from_marker("2025-01-01T150000") is None # separators
|
||||
assert extract_datetime_from_marker("2025010115000a") is None # non-digit
|
||||
assert extract_datetime_from_marker("") is None
|
||||
|
||||
def test_extract_datetime_from_marker_invalid_values(self):
|
||||
assert extract_datetime_from_marker("20250230120000") is None # Feb 30
|
||||
assert extract_datetime_from_marker("20250101126000") is None # minute 60
|
||||
assert extract_datetime_from_marker("20250101240000") is None # hour 24
|
||||
|
||||
|
||||
class TestExtractMarkerDate:
|
||||
def test_extract_date_from_marker_valid(self):
|
||||
marker = "20250101"
|
||||
assert date(2025, 1, 1) == extract_date_from_marker(marker)
|
||||
|
||||
def test_extract_date_from_marker_invalid_format(self):
|
||||
assert extract_date_from_marker("2025010") is None # too short
|
||||
assert extract_date_from_marker("202501011") is None # too long
|
||||
assert extract_date_from_marker("2025-01-01") is None # separators
|
||||
assert extract_date_from_marker("2025010a") is None # non-digit
|
||||
assert extract_date_from_marker("") is None
|
||||
|
||||
def test_extract_date_from_marker_invalid_values(self):
|
||||
assert extract_date_from_marker("20250230") is None # Feb 30
|
||||
assert extract_date_from_marker("20251301") is None # month 13
|
||||
assert extract_date_from_marker("20250132") is None # day 32
|
||||
|
||||
|
||||
class TestExtractMarkerTime:
|
||||
def test_extract_time_from_marker_valid(self):
|
||||
marker = "150000"
|
||||
assert time(15, 0, 0) == extract_time_from_marker(marker)
|
||||
|
||||
def test_extract_time_from_marker_invalid_format(self):
|
||||
assert extract_time_from_marker("15000") is None # too short
|
||||
assert extract_time_from_marker("1500000") is None # too long
|
||||
assert extract_time_from_marker("15:00:00") is None # separators
|
||||
assert extract_time_from_marker("15000a") is None # non-digit
|
||||
assert extract_time_from_marker("") is None
|
||||
|
||||
def test_extract_time_from_marker_invalid_values(self):
|
||||
assert extract_time_from_marker("240000") is None # hour 24
|
||||
assert extract_time_from_marker("156000") is None # minute 60
|
||||
assert extract_time_from_marker("150060") is None # second 60
|
||||
|
||||
|
||||
class TestExtractDateTimeFromMarkerList:
|
||||
def test_no_markers_inherits_datetime(self):
|
||||
inherited = datetime(2025, 1, 2, 3, 4, 5)
|
||||
assert inherited == extract_datetime_from_marker_list([], inherited)
|
||||
|
||||
def test_unrelated_markers_inherits_datetime(self):
|
||||
inherited = datetime(2025, 1, 2, 3, 4, 5)
|
||||
markers = ["not-a-marker", "2025-01-01", "1500", "1234567"]
|
||||
assert inherited == extract_datetime_from_marker_list(markers, inherited)
|
||||
|
||||
def test_date_only_marker_sets_midnight(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20250101"]
|
||||
assert datetime(2025, 1, 1, 0, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_time_only_marker_inherits_date(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["150000"]
|
||||
assert datetime(2025, 6, 7, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_datetime_marker_overrides_both_date_and_time(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20250101150000"]
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_combined_date_and_time_markers(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20250101", "150000"]
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_first_marker_wins_when_multiple_dates_or_times(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20250101", "150000", "20250102", "160000"]
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_last_separated_date_and_time_win(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20250101", "150000", "20250102160000"]
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
||||
def test_invalid_date_or_time_markers_are_ignored(self):
|
||||
inherited = datetime(2025, 6, 7, 8, 9, 10)
|
||||
markers = ["20251301", "240000", "20250101", "150000"]
|
||||
assert datetime(2025, 1, 1, 15, 0, 0) == extract_datetime_from_marker_list(
|
||||
markers, inherited
|
||||
)
|
||||
|
|
|
|||
365
test/localize/test_repository_configuration_merge.py
Normal file
365
test/localize/test_repository_configuration_merge.py
Normal file
|
|
@ -0,0 +1,365 @@
|
|||
import pytest
|
||||
|
||||
from streamer.localize.repository_configuration import (
|
||||
Dimension,
|
||||
Marker,
|
||||
MarkerPlacement,
|
||||
RepositoryConfiguration,
|
||||
merge_dimensions,
|
||||
merge_markers,
|
||||
merge_repository_configuration,
|
||||
merge_single_dimension,
|
||||
merge_single_marker,
|
||||
)
|
||||
|
||||
|
||||
class TestMergeSingleDimension:
|
||||
def test_second_overrides_display_name_when_non_empty(self):
|
||||
base = Dimension(display_name="Base", comment="c1", propagate=True)
|
||||
second = Dimension(display_name="Second", comment="c2", propagate=False)
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.display_name == "Second"
|
||||
assert merged.comment == "c2"
|
||||
assert merged.propagate is False
|
||||
|
||||
def test_second_empty_display_name_falls_back_to_base(self):
|
||||
base = Dimension(display_name="Base", comment="c1", propagate=True)
|
||||
second = Dimension(display_name="", comment="c2", propagate=False)
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.display_name == "Base"
|
||||
assert merged.comment == "c2"
|
||||
assert merged.propagate is False
|
||||
|
||||
def test_second_comment_none_does_not_erase_base_comment(self):
|
||||
base = Dimension(display_name="Base", comment="keep", propagate=True)
|
||||
second = Dimension(display_name="Second", comment=None, propagate=False)
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.display_name == "Second"
|
||||
assert merged.comment == "keep"
|
||||
|
||||
def test_second_comment_non_none_overrides_base_comment(self):
|
||||
base = Dimension(display_name="Base", comment="c1", propagate=True)
|
||||
second = Dimension(display_name="Second", comment="c2", propagate=True)
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.comment == "c2"
|
||||
|
||||
def test_second_propagate_overrides_base_when_provided(self):
|
||||
base = Dimension(display_name="Base", comment="c1", propagate=True)
|
||||
second = Dimension(display_name="Second", comment="c2", propagate=False)
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.propagate is False
|
||||
|
||||
def test_propagate_merging_retains_base_when_second_not_provided(self):
|
||||
base = Dimension(display_name="Base", comment="c1", propagate=True)
|
||||
second = Dimension(display_name="Second", comment="c2")
|
||||
|
||||
merged = merge_single_dimension(base, second)
|
||||
|
||||
assert merged.propagate is True
|
||||
|
||||
|
||||
class TestMergeDimensions:
|
||||
def test_adds_new_keys_from_second(self):
|
||||
base = {"a": Dimension(display_name="A", propagate=True)}
|
||||
second = {"b": Dimension(display_name="B", propagate=False)}
|
||||
|
||||
merged = merge_dimensions(base, second)
|
||||
|
||||
assert set(merged.keys()) == {"a", "b"}
|
||||
assert merged["a"].display_name == "A"
|
||||
assert merged["b"].display_name == "B"
|
||||
|
||||
def test_merges_existing_keys(self):
|
||||
base = {"a": Dimension(display_name="A", comment="c1", propagate=True)}
|
||||
second = {"a": Dimension(display_name="A2", comment=None, propagate=False)}
|
||||
|
||||
merged = merge_dimensions(base, second)
|
||||
|
||||
assert merged["a"].display_name == "A2"
|
||||
assert merged["a"].comment == "c1"
|
||||
assert merged["a"].propagate is False
|
||||
|
||||
def test_does_not_mutate_inputs(self):
|
||||
base = {"a": Dimension(display_name="A", comment="c1", propagate=True)}
|
||||
second = {"b": Dimension(display_name="B", comment="c2", propagate=False)}
|
||||
|
||||
merged = merge_dimensions(base, second)
|
||||
|
||||
assert "b" not in base
|
||||
assert "a" not in second
|
||||
assert set(merged.keys()) == {"a", "b"}
|
||||
|
||||
|
||||
class TestMergeSingleMarker:
|
||||
def test_second_overrides_display_name_when_non_empty(self):
|
||||
base = Marker(
|
||||
display_name="Base",
|
||||
placements=[MarkerPlacement(dimension="project", value=None)],
|
||||
)
|
||||
second = Marker(
|
||||
display_name="Second",
|
||||
placements=[MarkerPlacement(dimension="timesheet", value="coding")],
|
||||
)
|
||||
|
||||
merged = merge_single_marker(base, second)
|
||||
|
||||
assert merged.display_name == "Second"
|
||||
assert merged.placements == [
|
||||
MarkerPlacement(dimension="project", value=None, if_with=set()),
|
||||
MarkerPlacement(dimension="timesheet", value="coding", if_with=set()),
|
||||
]
|
||||
|
||||
def test_second_empty_display_name_falls_back_to_base(self):
|
||||
base = Marker(display_name="Base", placements=[])
|
||||
second = Marker(display_name="", placements=[])
|
||||
|
||||
merged = merge_single_marker(base, second)
|
||||
|
||||
assert merged.display_name == "Base"
|
||||
|
||||
def test_appends_new_placements(self):
|
||||
base = Marker(
|
||||
display_name="Base",
|
||||
placements=[
|
||||
MarkerPlacement(dimension="project"),
|
||||
],
|
||||
)
|
||||
second = Marker(
|
||||
display_name="Second",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="x"
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
merged = merge_single_marker(base, second)
|
||||
|
||||
assert merged.placements == [
|
||||
MarkerPlacement(dimension="project"),
|
||||
MarkerPlacement(if_with={"Timesheet"}, dimension="timesheet", value="x"),
|
||||
]
|
||||
|
||||
def test_deduplicates_by_identity_and_second_overrides_base(self):
|
||||
base = Marker(
|
||||
display_name="Base",
|
||||
placements=[
|
||||
MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
|
||||
MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
|
||||
],
|
||||
)
|
||||
second = Marker(
|
||||
display_name="Second",
|
||||
placements=[
|
||||
MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
|
||||
MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
|
||||
],
|
||||
)
|
||||
|
||||
merged = merge_single_marker(base, second)
|
||||
|
||||
assert merged.placements == [
|
||||
MarkerPlacement(if_with={"A"}, dimension="d", value="v"),
|
||||
MarkerPlacement(if_with={"B"}, dimension="d", value="v2"),
|
||||
MarkerPlacement(if_with={"C"}, dimension="d", value="v3"),
|
||||
]
|
||||
|
||||
def test_identity_is_order_insensitive_for_if_with(self):
|
||||
base = Marker(
|
||||
display_name="Base",
|
||||
placements=[MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v")],
|
||||
)
|
||||
second = Marker(
|
||||
display_name="Second",
|
||||
placements=[MarkerPlacement(if_with={"B", "A"}, dimension="d", value="v2")],
|
||||
)
|
||||
|
||||
merged = merge_single_marker(base, second)
|
||||
|
||||
# With `if_with` as a set, identity is order-insensitive; second overrides base.
|
||||
assert merged.placements == [
|
||||
MarkerPlacement(if_with={"A", "B"}, dimension="d", value="v2"),
|
||||
]
|
||||
|
||||
|
||||
class TestMergeMarkers:
|
||||
def test_adds_new_marker_keys_from_second(self):
|
||||
base = {"M1": Marker(display_name="M1", placements=[])}
|
||||
second = {"M2": Marker(display_name="M2", placements=[])}
|
||||
|
||||
merged = merge_markers(base, second)
|
||||
|
||||
assert set(merged.keys()) == {"M1", "M2"}
|
||||
|
||||
def test_merges_existing_marker_keys(self):
|
||||
base = {
|
||||
"M": Marker(
|
||||
display_name="Base",
|
||||
placements=[MarkerPlacement(dimension="project")],
|
||||
)
|
||||
}
|
||||
second = {
|
||||
"M": Marker(
|
||||
display_name="Second",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="coding"
|
||||
)
|
||||
],
|
||||
)
|
||||
}
|
||||
|
||||
merged = merge_markers(base, second)
|
||||
|
||||
assert merged["M"].display_name == "Second"
|
||||
assert merged["M"].placements == [
|
||||
MarkerPlacement(dimension="project", value=None, if_with=set()),
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="coding"
|
||||
),
|
||||
]
|
||||
|
||||
def test_does_not_mutate_inputs(self):
|
||||
base = {"M1": Marker(display_name="M1", placements=[])}
|
||||
second = {"M2": Marker(display_name="M2", placements=[])}
|
||||
|
||||
merged = merge_markers(base, second)
|
||||
|
||||
assert "M2" not in base
|
||||
assert "M1" not in second
|
||||
assert set(merged.keys()) == {"M1", "M2"}
|
||||
|
||||
|
||||
class TestMergeRepositoryConfiguration:
|
||||
def test_merges_dimensions_and_markers(self):
|
||||
base = RepositoryConfiguration(
|
||||
dimensions={
|
||||
"project": Dimension(
|
||||
display_name="Project", comment="c1", propagate=True
|
||||
),
|
||||
"moment": Dimension(
|
||||
display_name="Moment", comment="c2", propagate=True
|
||||
),
|
||||
},
|
||||
markers={
|
||||
"Streamer": Marker(
|
||||
display_name="Streamer",
|
||||
placements=[MarkerPlacement(dimension="project")],
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
second = RepositoryConfiguration(
|
||||
dimensions={
|
||||
"project": Dimension(display_name="Project2", propagate=False),
|
||||
"timesheet": Dimension(
|
||||
display_name="Timesheet", comment="c3", propagate=False
|
||||
),
|
||||
},
|
||||
markers={
|
||||
"Streamer": Marker(
|
||||
display_name="Streamer2",
|
||||
placements=[
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="coding"
|
||||
)
|
||||
],
|
||||
),
|
||||
"JobHunting": Marker(
|
||||
display_name="JobHunting",
|
||||
placements=[MarkerPlacement(dimension="project")],
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
merged = merge_repository_configuration(base, second)
|
||||
|
||||
assert set(merged.dimensions.keys()) == {"project", "moment", "timesheet"}
|
||||
assert merged.dimensions["project"].display_name == "Project2"
|
||||
assert merged.dimensions["project"].comment == "c1"
|
||||
assert merged.dimensions["project"].propagate is False
|
||||
assert merged.dimensions["moment"].display_name == "Moment"
|
||||
assert merged.dimensions["timesheet"].display_name == "Timesheet"
|
||||
|
||||
assert set(merged.markers.keys()) == {"Streamer", "JobHunting"}
|
||||
assert merged.markers["Streamer"].display_name == "Streamer2"
|
||||
assert merged.markers["Streamer"].placements == [
|
||||
MarkerPlacement(dimension="project", value=None, if_with=set()),
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="coding"
|
||||
),
|
||||
]
|
||||
assert merged.markers["JobHunting"].placements == [
|
||||
MarkerPlacement(dimension="project", value=None, if_with=set())
|
||||
]
|
||||
|
||||
def test_does_not_mutate_base_or_second(self):
|
||||
base = RepositoryConfiguration(
|
||||
dimensions={"a": Dimension(display_name="A", propagate=True)},
|
||||
markers={"M": Marker(display_name="M", placements=[])},
|
||||
)
|
||||
second = RepositoryConfiguration(
|
||||
dimensions={"b": Dimension(display_name="B", propagate=False)},
|
||||
markers={"N": Marker(display_name="N", placements=[])},
|
||||
)
|
||||
|
||||
_ = merge_repository_configuration(base, second)
|
||||
|
||||
assert set(base.dimensions.keys()) == {"a"}
|
||||
assert set(second.dimensions.keys()) == {"b"}
|
||||
assert set(base.markers.keys()) == {"M"}
|
||||
assert set(second.markers.keys()) == {"N"}
|
||||
|
||||
def test_merge_is_associative_for_non_conflicting_inputs(self):
|
||||
a = RepositoryConfiguration(
|
||||
dimensions={"d1": Dimension(display_name="D1", propagate=True)},
|
||||
markers={"m1": Marker(display_name="M1", placements=[])},
|
||||
)
|
||||
b = RepositoryConfiguration(
|
||||
dimensions={"d2": Dimension(display_name="D2", propagate=False)},
|
||||
markers={"m2": Marker(display_name="M2", placements=[])},
|
||||
)
|
||||
c = RepositoryConfiguration(
|
||||
dimensions={"d3": Dimension(display_name="D3", propagate=False)},
|
||||
markers={"m3": Marker(display_name="M3", placements=[])},
|
||||
)
|
||||
|
||||
left = merge_repository_configuration(merge_repository_configuration(a, b), c)
|
||||
right = merge_repository_configuration(a, merge_repository_configuration(b, c))
|
||||
|
||||
assert left == right
|
||||
assert set(left.dimensions.keys()) == {"d1", "d2", "d3"}
|
||||
assert set(left.markers.keys()) == {"m1", "m2", "m3"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("base", "second", "expected_propagate"),
|
||||
[
|
||||
(
|
||||
RepositoryConfiguration(
|
||||
dimensions={"d": Dimension(display_name="D", propagate=True)},
|
||||
markers={},
|
||||
),
|
||||
RepositoryConfiguration(
|
||||
dimensions={"d": Dimension(display_name="D2")},
|
||||
markers={},
|
||||
),
|
||||
True,
|
||||
)
|
||||
],
|
||||
)
|
||||
def test_merge_repository_configuration_propagate_preserves_base_when_omitted(
|
||||
base, second, expected_propagate
|
||||
):
|
||||
merged = merge_repository_configuration(base, second)
|
||||
assert merged.dimensions["d"].propagate is expected_propagate
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
from streamer.parse import StreamFile, parse_markdown_file, Shard
|
||||
from faker import Faker
|
||||
|
||||
from streamer.parse import Shard, StreamFile, parse_markdown_file
|
||||
|
||||
fake = Faker()
|
||||
|
||||
|
||||
|
|
@ -9,13 +10,13 @@ class TestParseProcess:
|
|||
|
||||
def test_parse_empty_file(self):
|
||||
assert parse_markdown_file(self.file_name, "") == StreamFile(
|
||||
filename=self.file_name, shard=Shard(start_line=1, end_line=1)
|
||||
file_name=self.file_name, shard=Shard(start_line=1, end_line=1)
|
||||
)
|
||||
|
||||
def test_parse_basic_one_line_file(self):
|
||||
test_file = "Hello World"
|
||||
assert parse_markdown_file(self.file_name, test_file) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
start_line=1,
|
||||
end_line=1,
|
||||
|
|
@ -25,7 +26,7 @@ class TestParseProcess:
|
|||
def test_parse_basic_multi_line_file(self):
|
||||
test_file = "Hello World\n\nHello again!"
|
||||
assert parse_markdown_file(self.file_name, test_file) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
start_line=1,
|
||||
end_line=3,
|
||||
|
|
@ -35,7 +36,7 @@ class TestParseProcess:
|
|||
def test_parse_single_line_with_tag(self):
|
||||
test_file = "@Tag Hello World"
|
||||
assert parse_markdown_file(self.file_name, test_file) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
markers=["Tag"],
|
||||
start_line=1,
|
||||
|
|
@ -46,7 +47,7 @@ class TestParseProcess:
|
|||
def test_parse_single_line_with_two_tags(self):
|
||||
test_file = "@Marker1 @Marker2 Hello World"
|
||||
assert parse_markdown_file(self.file_name, test_file) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
markers=["Marker1", "Marker2"],
|
||||
start_line=1,
|
||||
|
|
@ -57,7 +58,7 @@ class TestParseProcess:
|
|||
def test_parse_single_line_with_two_tags_and_misplaced_tag(self):
|
||||
test_file = "@Tag1 @Tag2 Hello World @Tag3"
|
||||
assert parse_markdown_file(self.file_name, test_file) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
markers=["Tag1", "Tag2"],
|
||||
tags=["Tag3"],
|
||||
|
|
@ -70,7 +71,7 @@ class TestParseProcess:
|
|||
file_text = "Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2"
|
||||
|
||||
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
start_line=1,
|
||||
end_line=5,
|
||||
|
|
@ -113,7 +114,7 @@ class TestParseProcess:
|
|||
file_text = "# Heading @Tag1\n\n## @Marker1 Subheading @Tag2\n\n# Heading @Tag3"
|
||||
|
||||
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
start_line=1,
|
||||
end_line=5,
|
||||
|
|
@ -140,7 +141,7 @@ class TestParseProcess:
|
|||
file_text = "# @Marker1 Heading @Tag1\n\n## Subheading @Tag2"
|
||||
|
||||
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
|
||||
filename=self.file_name,
|
||||
file_name=self.file_name,
|
||||
shard=Shard(
|
||||
markers=["Marker1"],
|
||||
tags=["Tag1", "Tag2"],
|
||||
104
test/query/test_find.py
Normal file
104
test/query/test_find.py
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from streamer.localize import LocalizedShard
|
||||
from streamer.query.find import find_shard, find_shard_by_position
|
||||
|
||||
|
||||
def generate_localized_shard(
|
||||
*,
|
||||
location: dict[str, str] | None = None,
|
||||
children: list[LocalizedShard] | None = None,
|
||||
) -> LocalizedShard:
|
||||
return LocalizedShard(
|
||||
start_line=1,
|
||||
end_line=1,
|
||||
moment=datetime(2020, 1, 1),
|
||||
location=location or {},
|
||||
children=children or [],
|
||||
markers=[],
|
||||
tags=[],
|
||||
)
|
||||
|
||||
|
||||
class TestFindShard:
|
||||
def test_returns_empty_when_no_match(self) -> None:
|
||||
root = generate_localized_shard(location={"file": "a.md"})
|
||||
shards = [root]
|
||||
|
||||
result = find_shard(shards, lambda s: "missing" in s.location)
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_finds_matches_depth_first_and_preserves_order(self) -> None:
|
||||
grandchild = generate_localized_shard(location={"k": "match"})
|
||||
child1 = generate_localized_shard(
|
||||
location={"k": "match"}, children=[grandchild]
|
||||
)
|
||||
child2 = generate_localized_shard(location={"k": "nope"})
|
||||
root = generate_localized_shard(
|
||||
location={"k": "nope"}, children=[child1, child2]
|
||||
)
|
||||
|
||||
result = find_shard([root], lambda s: s.location.get("k") == "match")
|
||||
|
||||
assert result == [child1, grandchild]
|
||||
|
||||
def test_includes_root_if_it_matches(self) -> None:
|
||||
root = generate_localized_shard(
|
||||
location={"k": "match"},
|
||||
children=[generate_localized_shard(location={"k": "match"})],
|
||||
)
|
||||
|
||||
result = find_shard([root], lambda s: s.location.get("k") == "match")
|
||||
|
||||
assert result[0] is root
|
||||
assert len(result) == 2
|
||||
|
||||
def test_multiple_roots_keeps_left_to_right_order(self) -> None:
|
||||
a = generate_localized_shard(location={"k": "match"})
|
||||
b = generate_localized_shard(location={"k": "match"})
|
||||
c = generate_localized_shard(location={"k": "nope"})
|
||||
|
||||
result = find_shard([a, b, c], lambda s: s.location.get("k") == "match")
|
||||
|
||||
assert result == [a, b]
|
||||
|
||||
def test_query_function_can_use_arbitrary_logic(self) -> None:
|
||||
# Ensures typing/behavior supports any callable that returns bool.
|
||||
a = generate_localized_shard(location={"x": "1"})
|
||||
b = generate_localized_shard(location={"x": "2"})
|
||||
c = generate_localized_shard(location={"x": "3"})
|
||||
root = generate_localized_shard(location={}, children=[a, b, c])
|
||||
|
||||
def is_even_x(shard: LocalizedShard) -> bool:
|
||||
x = shard.location.get("x")
|
||||
return x is not None and int(x) % 2 == 0
|
||||
|
||||
result = find_shard([root], is_even_x)
|
||||
|
||||
assert result == [b]
|
||||
|
||||
|
||||
class TestFindShardByPosition:
|
||||
def test_matches_only_when_dimension_present_and_equal(self) -> None:
|
||||
match = generate_localized_shard(location={"file": "a.md", "line": "10"})
|
||||
wrong_value = generate_localized_shard(location={"file": "a.md", "line": "11"})
|
||||
missing_dim = generate_localized_shard(location={"file": "a.md"})
|
||||
root = generate_localized_shard(
|
||||
location={"root": "x"}, children=[match, wrong_value, missing_dim]
|
||||
)
|
||||
|
||||
result = find_shard_by_position([root], "line", "10")
|
||||
|
||||
assert result == [match]
|
||||
|
||||
def test_recurses_through_children(self) -> None:
|
||||
deep = generate_localized_shard(location={"section": "s1"})
|
||||
mid = generate_localized_shard(location={"section": "s0"}, children=[deep])
|
||||
root = generate_localized_shard(location={}, children=[mid])
|
||||
|
||||
result = find_shard_by_position([root], "section", "s1")
|
||||
|
||||
assert result == [deep]
|
||||
|
|
@ -1,8 +1,11 @@
|
|||
from datetime import datetime
|
||||
|
||||
from streamer.localize.localize import localize_stream_file
|
||||
from streamer.localize.localized_shard import LocalizedShard
|
||||
from streamer.localize.repostory_configuration import (
|
||||
from streamer.localize.repository_configuration import (
|
||||
Dimension,
|
||||
Marker,
|
||||
MarkerPlacement,
|
||||
RepositoryConfiguration,
|
||||
)
|
||||
from streamer.parse.shard import Shard, StreamFile
|
||||
|
|
@ -19,28 +22,66 @@ repository_configuration = RepositoryConfiguration(
|
|||
comment="Timestamp this entry was created at",
|
||||
propagate=True,
|
||||
),
|
||||
"timesheet": Dimension(
|
||||
display_name="Timesheet",
|
||||
comment="Time Cards for Time Tracking",
|
||||
propagate=True,
|
||||
),
|
||||
},
|
||||
markers={
|
||||
"streamer": Marker(display_name="Streamer", dimension="project"),
|
||||
"jobhunting": Marker(display_name="JobHunting", dimension="project"),
|
||||
"Streamer": Marker(
|
||||
display_name="Streamer",
|
||||
placements=[
|
||||
MarkerPlacement(dimension="project"),
|
||||
MarkerPlacement(
|
||||
if_with={"Timesheet"}, dimension="timesheet", value="coding"
|
||||
),
|
||||
],
|
||||
),
|
||||
"JobHunting": Marker(
|
||||
display_name="JobHunting", placements=[MarkerPlacement(dimension="project")]
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class TestExtractDateTime:
|
||||
class TestLocalize:
|
||||
def test_project_simple_stream_file(self):
|
||||
stream_file = StreamFile(
|
||||
filename="20250622-121000 Test File.md",
|
||||
file_name="20250622-121000 Test File.md",
|
||||
shard=Shard(start_line=1, end_line=1, markers=["Streamer"]),
|
||||
)
|
||||
|
||||
assert localize_stream_file(
|
||||
stream_file, repository_configuration
|
||||
) == LocalizedShard(
|
||||
moment=datetime(2025, 6, 22, 12, 10, 0, 0),
|
||||
markers=["Streamer"],
|
||||
tags=[],
|
||||
start_line=1,
|
||||
end_line=1,
|
||||
children=[],
|
||||
location={"moment": "2025-06-22T12:10:00", "project": "streamer"},
|
||||
location={"project": "Streamer", "file": stream_file.file_name},
|
||||
)
|
||||
|
||||
def test_timesheet_use_case(self):
|
||||
stream_file = StreamFile(
|
||||
file_name="20260131-210000 Test File.md",
|
||||
shard=Shard(start_line=1, end_line=1, markers=["Timesheet", "Streamer"]),
|
||||
)
|
||||
|
||||
assert localize_stream_file(
|
||||
stream_file, repository_configuration
|
||||
) == LocalizedShard(
|
||||
moment=datetime(2026, 1, 31, 21, 0, 0, 0),
|
||||
markers=["Timesheet", "Streamer"],
|
||||
tags=[],
|
||||
start_line=1,
|
||||
end_line=1,
|
||||
children=[],
|
||||
location={
|
||||
"file": stream_file.file_name,
|
||||
"project": "Streamer",
|
||||
"timesheet": "coding",
|
||||
},
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue