diff --git a/src/streamd/__init__.py b/src/streamd/__init__.py new file mode 100644 index 0000000..c72d098 --- /dev/null +++ b/src/streamd/__init__.py @@ -0,0 +1,126 @@ +import glob +import os +from collections.abc import Generator +from datetime import datetime +from shutil import move +from typing import Annotated + +import click +import typer +from rich import print +from rich.markdown import Markdown +from rich.panel import Panel + +from streamd.localize import ( + LocalizedShard, + RepositoryConfiguration, + localize_stream_file, +) +from streamd.localize.preconfigured_configurations import TaskConfiguration +from streamd.parse import parse_markdown_file +from streamd.query import find_shard_by_position +from streamd.settings import Settings +from streamd.timesheet.configuration import BasicTimesheetConfiguration +from streamd.timesheet.extract import extract_timesheets + +app = typer.Typer() + + +def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]: + for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"): + with open(file_name, "r") as file: + file_content = file.read() + if shard := localize_stream_file( + parse_markdown_file(file_name, file_content), config + ): + yield shard + + +@app.command() +def todo() -> None: + all_shards = list(all_files(TaskConfiguration)) + + for task_shard in find_shard_by_position(all_shards, "task", "open"): + with open(task_shard.location["file"], "r") as file: + file_content = file.read().splitlines() + print( + Panel( + Markdown( + "\n".join( + file_content[ + task_shard.start_line - 1 : task_shard.end_line + ] + ) + ), + title=f"{task_shard.location['file']}:{task_shard.start_line}", + ) + ) + + +@app.command() +def edit(number: Annotated[int, typer.Argument()] = 1) -> None: + all_shards = list(all_files(TaskConfiguration)) + sorted_shards = sorted(all_shards, key=lambda s: s.moment) + + if abs(number) >= len(sorted_shards): + raise ValueError("Argument out of range") + + selected_number = number + if selected_number >= 0: + selected_number = len(sorted_shards) - selected_number + else: + selected_number = -selected_number + + click.edit(None, filename=sorted_shards[selected_number].location["file"]) + + +@app.command() +def timesheet() -> None: + all_shards = list(all_files(BasicTimesheetConfiguration)) + sheets = sorted(extract_timesheets(all_shards), key=lambda card: card.date) + for sheet in sheets: + print(sheet.date) + print( + ",".join( + map(lambda card: f"{card.from_time},{card.to_time}", sheet.timecards) + ), + ) + + +@app.command() +def new() -> None: + streamd_directory = Settings().base_folder + + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + preliminary_file_name = f"{timestamp}_wip.md" + prelimary_path = os.path.join(streamd_directory, preliminary_file_name) + + content = "# " + with open(prelimary_path, "w") as file: + _ = file.write(content) + + click.edit(None, filename=prelimary_path) + + with open(prelimary_path, "r") as file: + content = file.read() + parsed_content = parse_markdown_file(prelimary_path, content) + + final_file_name = f"{timestamp}.md" + if parsed_content.shard is not None and len( + markers := parsed_content.shard.markers + ): + final_file_name = f"{timestamp} {' '.join(markers)}.md" + + final_path = os.path.join(streamd_directory, final_file_name) + _ = move(prelimary_path, final_path) + print(f"Saved as [yellow]{final_file_name}") + + +@app.callback(invoke_without_command=True) +def main(ctx: typer.Context): + if ctx.invoked_subcommand is None: + new() + + +if __name__ == "__main__": + app() diff --git a/src/streamd/localize/localize.py b/src/streamd/localize/localize.py new file mode 100644 index 0000000..bc084f4 --- /dev/null +++ b/src/streamd/localize/localize.py @@ -0,0 +1,73 @@ +from datetime import datetime + +from streamd.parse.shard import Shard, StreamFile + +from .extract_datetime import ( + extract_datetime_from_file_name, + extract_datetime_from_marker_list, +) +from .localized_shard import LocalizedShard +from .repository_configuration import RepositoryConfiguration + + +def localize_shard( + shard: Shard, + config: RepositoryConfiguration, + propagated: dict[str, str], + moment: datetime, +) -> LocalizedShard: + position = {**propagated} + private_position: dict[str, str] = {} + + adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment) + + for marker in shard.markers: + if marker in config.markers: + marker_definition = config.markers[marker] + for placement in marker_definition.placements: + if placement.if_with <= set(shard.markers): + dimension = config.dimensions[placement.dimension] + + value = placement.value or marker + + if placement.overwrites or ( + placement.dimension not in position + and placement.dimension not in private_position + ): + if dimension.propagate: + position[placement.dimension] = value + else: + private_position[placement.dimension] = value + + children = [ + localize_shard(child, config, position, adjusted_moment) + for child in shard.children + ] + + position.update(private_position) + + return LocalizedShard( + markers=shard.markers, + tags=shard.tags, + start_line=shard.start_line, + end_line=shard.end_line, + location=position, + children=children, + moment=adjusted_moment, + ) + + +def localize_stream_file( + stream_file: StreamFile, config: RepositoryConfiguration +) -> LocalizedShard | None: + shard_date = extract_datetime_from_file_name(stream_file.file_name) + + if not shard_date or not stream_file.shard: + raise ValueError("Could not extract date") + + return localize_shard( + stream_file.shard, config, {"file": stream_file.file_name}, shard_date + ) + + +__all__ = ["localize_stream_file"] diff --git a/src/streamd/localize/repository_configuration.py b/src/streamd/localize/repository_configuration.py new file mode 100644 index 0000000..4b83a03 --- /dev/null +++ b/src/streamd/localize/repository_configuration.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class Dimension(BaseModel): + display_name: str + comment: str | None = None + propagate: bool = False + + +class MarkerPlacement(BaseModel): + if_with: set[str] = set() + dimension: str + value: str | None = None + overwrites: bool = True + + +class Marker(BaseModel): + display_name: str + placements: list[MarkerPlacement] = [] + + +class RepositoryConfiguration(BaseModel): + dimensions: dict[str, Dimension] + markers: dict[str, Marker] + + +def merge_single_dimension(base: Dimension, second: Dimension) -> Dimension: + second_fields_set: set[str] = getattr(second, "model_fields_set", set()) + + return Dimension( + display_name=second.display_name or base.display_name, + comment=base.comment if second.comment is None else second.comment, + propagate=second.propagate + if "propagate" in second_fields_set + else base.propagate, + ) + + +def merge_dimensions( + base: dict[str, Dimension], second: dict[str, Dimension] +) -> dict[str, Dimension]: + merged: dict[str, Dimension] = dict(base) + for key, second_dimension in second.items(): + if key in merged: + merged[key] = merge_single_dimension(merged[key], second_dimension) + else: + merged[key] = second_dimension + return merged + + +def _placement_identity(p: MarkerPlacement) -> tuple[frozenset[str], str]: + return (frozenset(p.if_with), p.dimension) + + +def merge_single_marker(base: Marker, second: Marker) -> Marker: + merged_display_name = second.display_name or base.display_name + + merged_placements: list[MarkerPlacement] = [] + seen: dict[tuple[frozenset[str], str], int] = {} + + for placement in base.placements: + ident = _placement_identity(placement) + seen[ident] = len(merged_placements) + merged_placements.append(placement) + + for placement in second.placements: + ident = _placement_identity(placement) + if ident in seen: + merged_placements[seen[ident]] = placement + else: + seen[ident] = len(merged_placements) + merged_placements.append(placement) + + return Marker(display_name=merged_display_name, placements=merged_placements) + + +def merge_markers( + base: dict[str, Marker], second: dict[str, Marker] +) -> dict[str, Marker]: + merged: dict[str, Marker] = dict(base) + for key, second_marker in second.items(): + if key in merged: + merged[key] = merge_single_marker(merged[key], second_marker) + else: + merged[key] = second_marker + return merged + + +def merge_repository_configuration( + base: RepositoryConfiguration, second: RepositoryConfiguration +) -> RepositoryConfiguration: + return RepositoryConfiguration( + dimensions=merge_dimensions(base.dimensions, second.dimensions), + markers=merge_markers(base.markers, second.markers), + ) + + +__all__ = [ + "Dimension", + "Marker", + "MarkerPlacement", + "RepositoryConfiguration", + "merge_repository_configuration", +] diff --git a/src/streamd/parse/extract_tag.py b/src/streamd/parse/extract_tag.py new file mode 100644 index 0000000..b7bfd45 --- /dev/null +++ b/src/streamd/parse/extract_tag.py @@ -0,0 +1,92 @@ +import re +from collections.abc import Iterable +from typing import cast + +from mistletoe.block_token import BlockToken +from mistletoe.span_token import Emphasis, Link, RawText, Strikethrough, Strong +from mistletoe.token import Token + +from .markdown_tag import Tag + + +def extract_markers_and_tags_from_single_token( + token: Token, + marker_boundary_encountered: bool, + return_at_first_marker: bool = False, +) -> tuple[list[str], list[str], bool]: + result_markers: list[str] = [] + result_tags: list[str] = [] + result_marker_boundary_encountered = marker_boundary_encountered + + if isinstance(token, Tag): + content = cast(str, token.content) + if marker_boundary_encountered: + result_tags.append(content) + else: + result_markers.append(content) + elif isinstance(token, (Emphasis, Strong, Strikethrough, Link)): + children = list(token.children or []) + markers, tags, child_marker_boundary_encountered = ( + extract_markers_and_tags_from_tokens( + children, + marker_boundary_encountered, + return_at_first_marker, + ) + ) + result_markers.extend(markers) + result_tags.extend(tags) + result_marker_boundary_encountered = ( + marker_boundary_encountered or child_marker_boundary_encountered + ) + elif isinstance(token, RawText): + content_raw = cast(str, token.content) + if not re.match(r"^[\s]*$", content_raw): + result_marker_boundary_encountered = True + else: + result_marker_boundary_encountered = True + + return result_markers, result_tags, result_marker_boundary_encountered + + +def extract_markers_and_tags_from_tokens( + tokens: Iterable[Token], + marker_boundary_encountered: bool, + return_at_first_marker: bool = False, +) -> tuple[list[str], list[str], bool]: + result_markers: list[str] = [] + result_tags: list[str] = [] + result_marker_boundary_encountered = marker_boundary_encountered + + for child in tokens: + markers, tags, child_marker_boundary_encountered = ( + extract_markers_and_tags_from_single_token( + child, result_marker_boundary_encountered, return_at_first_marker + ) + ) + result_markers.extend(markers) + result_tags.extend(tags) + result_marker_boundary_encountered = ( + marker_boundary_encountered or child_marker_boundary_encountered + ) + + if len(result_markers) > 0 and return_at_first_marker: + break + + return result_markers, result_tags, result_marker_boundary_encountered + + +def extract_markers_and_tags(block_token: BlockToken) -> tuple[list[str], list[str]]: + children = list(block_token.children or []) + markers, tags, _ = extract_markers_and_tags_from_tokens(children, False) + return markers, tags + + +def has_markers(block_token: BlockToken) -> bool: + children = list(block_token.children or []) + markers, _, _ = extract_markers_and_tags_from_tokens( + children, False, return_at_first_marker=True + ) + return len(markers) > 0 + + +__all__ = ["extract_markers_and_tags", "has_markers"] diff --git a/src/streamd/parse/markdown_tag.py b/src/streamd/parse/markdown_tag.py new file mode 100644 index 0000000..798f10e --- /dev/null +++ b/src/streamd/parse/markdown_tag.py @@ -0,0 +1,23 @@ +import re +from typing import cast + +from mistletoe.markdown_renderer import Fragment, MarkdownRenderer +from mistletoe.span_token import SpanToken + + +class Tag(SpanToken): + parse_inner: bool = False + pattern: re.Pattern[str] = re.compile(r"@([^\s*\x60~\[\]]+)") + + +class TagMarkdownRenderer(MarkdownRenderer): + def __init__(self) -> None: + super().__init__(Tag) # pyright: ignore[reportUnknownMemberType] + + def render_tag(self, token: Tag): + content = cast(str, token.content) + yield Fragment("@") + yield Fragment(content) + + +__all__ = ["Tag", "TagMarkdownRenderer"] diff --git a/src/streamd/parse/parse.py b/src/streamd/parse/parse.py new file mode 100644 index 0000000..4d14fa3 --- /dev/null +++ b/src/streamd/parse/parse.py @@ -0,0 +1,258 @@ +from collections import Counter +from typing import cast + +from mistletoe.block_token import ( + BlockToken, + Document, + Heading, + List, + ListItem, + Paragraph, +) + +from .extract_tag import extract_markers_and_tags, has_markers +from .list import split_at +from .markdown_tag import TagMarkdownRenderer +from .shard import Shard, StreamFile + + +def get_line_number(block_token: BlockToken) -> int: + return cast(int, block_token.line_number) # pyright: ignore[reportAttributeAccessIssue] + + +def build_shard( + start_line: int, + end_line: int, + markers: list[str] | None = None, + tags: list[str] | None = None, + children: list[Shard] | None = None, +) -> Shard: + markers = markers or [] + tags = tags or [] + children = children or [] + + if ( + len(children) == 1 + and len(tags) == 0 + and len(markers) == 0 + and children[0].start_line == start_line + and children[0].end_line == end_line + ): + return children[0] + + return Shard( + markers=markers, + tags=tags, + children=children, + start_line=start_line, + end_line=end_line, + ) + + +def merge_into_first_shard( + shards: list[Shard], + start_line: int, + end_line: int, + additional_tags: list[str] | None = None, +) -> Shard: + return shards[0].model_copy( + update={ + "start_line": start_line, + "end_line": end_line, + "children": shards[1:], + "tags": shards[0].tags + (additional_tags or []), + } + ) + + +def find_paragraph_shard_positions(block_tokens: list[BlockToken]) -> list[int]: + return [ + index + for index, block_token in enumerate(block_tokens) + if isinstance(block_token, Paragraph) and has_markers(block_token) + ] + + +def _heading_level(heading: Heading) -> int: + return cast(int, heading.level) + + +def find_headings_by_level( + block_tokens: list[BlockToken], header_level: int +) -> list[int]: + return [ + index + for index, block_token in enumerate(block_tokens) + if isinstance(block_token, Heading) + and _heading_level(block_token) == header_level + ] + + +def calculate_heading_level_for_next_split( + block_tokens: list[BlockToken], +) -> int | None: + """ + If there is no marker in any heading, then return None. + If only the first token is a heading with a marker, then return None. + Otherwise: Return the heading level with the lowest level (h1 < h2), of which there are two or which has a marker (and doesn't stem from first) + """ + level_of_headings_without_first_with_marker: list[int] = [ + _heading_level(token) + for token in block_tokens[1:] + if isinstance(token, Heading) and has_markers(token) + ] + + if len(level_of_headings_without_first_with_marker) == 0: + return None + + heading_level_counter: Counter[int] = Counter( + [_heading_level(token) for token in block_tokens if isinstance(token, Heading)] + ) + + return min( + [level for level, count in heading_level_counter.items() if count >= 2] + + level_of_headings_without_first_with_marker + ) + + +def parse_single_block_shards( + block_token: BlockToken, start_line: int, end_line: int +) -> tuple[Shard | None, list[str]]: + markers: list[str] = [] + tags: list[str] = [] + children: list[Shard] = [] + + if isinstance(block_token, List): + list_items: list[ListItem] = ( # pyright: ignore[reportAssignmentType] + list(block_token.children) if block_token.children is not None else [] + ) + for index, list_item in enumerate(list_items): + list_item_start_line = get_line_number(list_item) + list_item_end_line = ( + get_line_number(list_items[index + 1]) - 1 + if index + 1 < len(list_items) + else end_line + ) + list_item_shard, list_item_tags = parse_multiple_block_shards( + list_item.children, # pyright: ignore[reportArgumentType] + list_item_start_line, + list_item_end_line, + ) + if list_item_shard is not None: + children.append(list_item_shard) + tags.extend(list_item_tags) + + elif isinstance(block_token, (Paragraph, Heading)): + markers, tags = extract_markers_and_tags(block_token) + + if len(markers) == 0 and len(children) == 0: + return None, tags + + return build_shard( + start_line, end_line, markers=markers, tags=tags, children=children + ), [] + + +def parse_multiple_block_shards( + block_tokens: list[BlockToken], + start_line: int, + end_line: int, + enforce_shard: bool = False, +) -> tuple[Shard | None, list[str]]: + is_first_block_heading = isinstance(block_tokens[0], Heading) and has_markers( + block_tokens[0] + ) + + paragraph_positions = find_paragraph_shard_positions(block_tokens) + children: list[Shard] = [] + tags: list[str] = [] + + is_first_block_only_with_marker = False + + for i, token in enumerate(block_tokens): + if i in paragraph_positions: + is_first_block_only_with_marker = i == 0 + + child_start_line = get_line_number(token) + child_end_line = ( + get_line_number(block_tokens[i + 1]) - 1 + if i + 1 < len(block_tokens) + else end_line + ) + + child_shard, child_tags = parse_single_block_shards( + token, child_start_line, child_end_line + ) + + if child_shard is not None: + children.append(child_shard) + if len(child_tags) > 0: + tags.extend(child_tags) + + if len(children) == 0 and not enforce_shard: + return None, tags + if is_first_block_heading or is_first_block_only_with_marker: + return merge_into_first_shard(children, start_line, end_line, tags), [] + else: + return build_shard(start_line, end_line, tags=tags, children=children), [] + + +def parse_header_shards( + block_tokens: list[BlockToken], + start_line: int, + end_line: int, + use_first_child_as_header: bool = False, +) -> Shard | None: + if len(block_tokens) == 0: + return build_shard(start_line, end_line) + + split_at_heading_level = calculate_heading_level_for_next_split(block_tokens) + + if split_at_heading_level is None: + return parse_multiple_block_shards( + block_tokens, start_line, end_line, enforce_shard=True + )[0] + + heading_positions = find_headings_by_level(block_tokens, split_at_heading_level) + + block_tokens_split_by_heading = split_at(block_tokens, heading_positions) + + children: list[Shard] = [] + for i, child_blocks in enumerate(block_tokens_split_by_heading): + child_start_line = get_line_number(child_blocks[0]) + child_end_line = ( + get_line_number(block_tokens_split_by_heading[i + 1][0]) - 1 + if i + 1 < len(block_tokens_split_by_heading) + else end_line + ) + if child_shard := parse_header_shards( + child_blocks, + child_start_line, + child_end_line, + use_first_child_as_header=i > 0 or 0 in heading_positions, + ): + children.append(child_shard) + + if use_first_child_as_header and len(children) > 0: + return merge_into_first_shard(children, start_line, end_line) + else: + return build_shard(start_line, end_line, children=children) + + +def parse_markdown_file(file_name: str, file_content: str) -> StreamFile: + shard = build_shard(1, max([len(file_content.splitlines()), 1])) + + with TagMarkdownRenderer(): + ast = Document(file_content) + + block_tokens: list[BlockToken] = ast.children # pyright: ignore[reportAssignmentType] + if len(block_tokens) > 0: + if parsed_shard := parse_header_shards( + block_tokens, shard.start_line, shard.end_line + ): + shard = parsed_shard + + return StreamFile(shard=shard, file_name=file_name) + + +__all__ = ["Shard", "StreamFile", "parse_markdown_file"] diff --git a/src/streamd/query/find.py b/src/streamd/query/find.py new file mode 100644 index 0000000..428e05a --- /dev/null +++ b/src/streamd/query/find.py @@ -0,0 +1,36 @@ +from typing import Callable + +from streamd.localize import LocalizedShard + + +def find_shard( + shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool] +) -> list[LocalizedShard]: + found_shards: list[LocalizedShard] = [] + + for shard in shards: + if query_function(shard): + found_shards.append(shard) + found_shards.extend(find_shard(shard.children, query_function)) + + return found_shards + + +def find_shard_by_position( + shards: list[LocalizedShard], dimension: str, value: str +) -> list[LocalizedShard]: + return find_shard( + shards, + lambda shard: ( + dimension in shard.location and shard.location[dimension] == value + ), + ) + + +def find_shard_by_set_dimension( + shards: list[LocalizedShard], dimension: str +) -> list[LocalizedShard]: + return find_shard(shards, lambda shard: dimension in shard.location) + + +__all__ = ["find_shard_by_position", "find_shard", "find_shard_by_set_dimension"] diff --git a/src/streamd/settings/__init__.py b/src/streamd/settings/__init__.py new file mode 100644 index 0000000..bdf28b4 --- /dev/null +++ b/src/streamd/settings/__init__.py @@ -0,0 +1,38 @@ +import os +from typing import ClassVar, override + +from pydantic_settings import ( + BaseSettings, + PydanticBaseSettingsSource, + SettingsConfigDict, + YamlConfigSettingsSource, +) +from xdg_base_dirs import xdg_config_home + +SETTINGS_FILE = xdg_config_home() / "streamd" / "config.yaml" + + +class Settings(BaseSettings): + model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict( + env_file_encoding="utf-8" + ) + + base_folder: str = os.getcwd() + + @classmethod + @override + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return ( + init_settings, + YamlConfigSettingsSource(settings_cls, yaml_file=SETTINGS_FILE), + dotenv_settings, + env_settings, + file_secret_settings, + ) diff --git a/test/localize/test_repository_configuration_merge.py b/test/localize/test_repository_configuration_merge.py index f5d345d..ba74c76 100644 --- a/test/localize/test_repository_configuration_merge.py +++ b/test/localize/test_repository_configuration_merge.py @@ -1,6 +1,6 @@ import pytest -from streamer.localize.repository_configuration import ( +from streamd.localize.repository_configuration import ( Dimension, Marker, MarkerPlacement, @@ -252,8 +252,8 @@ class TestMergeRepositoryConfiguration: ), }, markers={ - "Streamer": Marker( - display_name="Streamer", + "Streamd": Marker( + display_name="Streamd", placements=[MarkerPlacement(dimension="project")], ) }, @@ -267,8 +267,8 @@ class TestMergeRepositoryConfiguration: ), }, markers={ - "Streamer": Marker( - display_name="Streamer2", + "Streamd": Marker( + display_name="Streamd2", placements=[ MarkerPlacement( if_with={"Timesheet"}, dimension="timesheet", value="coding" @@ -291,9 +291,9 @@ class TestMergeRepositoryConfiguration: assert merged.dimensions["moment"].display_name == "Moment" assert merged.dimensions["timesheet"].display_name == "Timesheet" - assert set(merged.markers.keys()) == {"Streamer", "JobHunting"} - assert merged.markers["Streamer"].display_name == "Streamer2" - assert merged.markers["Streamer"].placements == [ + assert set(merged.markers.keys()) == {"Streamd", "JobHunting"} + assert merged.markers["Streamd"].display_name == "Streamd2" + assert merged.markers["Streamd"].placements == [ MarkerPlacement(dimension="project", value=None, if_with=set()), MarkerPlacement( if_with={"Timesheet"}, dimension="timesheet", value="coding" @@ -359,7 +359,9 @@ class TestMergeRepositoryConfiguration: ], ) def test_merge_repository_configuration_propagate_preserves_base_when_omitted( - base, second, expected_propagate + base: RepositoryConfiguration, + second: RepositoryConfiguration, + expected_propagate: bool, ): merged = merge_repository_configuration(base, second) assert merged.dimensions["d"].propagate is expected_propagate diff --git a/test/timesheet/test_extract_timesheets.py b/test/timesheet/test_extract_timesheets.py index c5a0bbe..b54befc 100644 --- a/test/timesheet/test_extract_timesheets.py +++ b/test/timesheet/test_extract_timesheets.py @@ -4,13 +4,13 @@ from datetime import datetime, time import pytest -from streamer.localize.localized_shard import LocalizedShard -from streamer.timesheet.configuration import ( +from streamd.localize.localized_shard import LocalizedShard +from streamd.timesheet.configuration import ( TIMESHEET_DIMENSION_NAME, TimesheetPointType, ) -from streamer.timesheet.extract import extract_timesheets -from streamer.timesheet.timecard import SpecialDayType, Timecard, Timesheet +from streamd.timesheet.extract import extract_timesheets +from streamd.timesheet.timecard import SpecialDayType, Timecard, Timesheet def point(at: datetime, type: TimesheetPointType) -> LocalizedShard: @@ -243,7 +243,7 @@ class TestExtractTimesheets: ] with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"): - extract_timesheets(shards) + _ = extract_timesheets(shards) def test_two_special_day_types_same_day_is_invalid(self): """ @@ -257,7 +257,7 @@ class TestExtractTimesheets: ] with pytest.raises(ValueError, match=r"is both .* and .*"): - extract_timesheets(shards) + _ = extract_timesheets(shards) def test_points_with_mixed_dates_inside_one_group_raises(self): """ @@ -273,7 +273,7 @@ class TestExtractTimesheets: ] with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"): - extract_timesheets(shards) + _ = extract_timesheets(shards) def test_day_with_only_breaks_is_ignored(self): """