fix: resolve all basedpyright warnings

- Use collections.abc.Generator/Iterable instead of deprecated typing imports
- Replace Optional with union syntax (X | None)
- Add explicit type annotations to eliminate reportUnknownVariableType
- Use typing.cast for untyped mistletoe attributes (content, level, line_number)
- Replace mutable default arguments with None defaults (reportCallInDefaultInitializer)
- Add ClassVar annotation for model_config (reportIncompatibleVariableOverride)
- Add @override decorator for settings_customise_sources (reportImplicitOverride)
- Annotate class attributes in Tag (reportUnannotatedClassAttribute)
- Add parameter type annotations in test (reportMissingParameterType)
- Assign unused call result to _ (reportUnusedCallResult)
This commit is contained in:
Konstantin Fickel 2026-02-15 17:10:09 +01:00
parent 1e203d9db3
commit 49cd9bcfa0
Signed by: kfickel
GPG key ID: A793722F9933C1A5
10 changed files with 770 additions and 16 deletions

126
src/streamd/__init__.py Normal file
View file

@ -0,0 +1,126 @@
import glob
import os
from collections.abc import Generator
from datetime import datetime
from shutil import move
from typing import Annotated
import click
import typer
from rich import print
from rich.markdown import Markdown
from rich.panel import Panel
from streamd.localize import (
LocalizedShard,
RepositoryConfiguration,
localize_stream_file,
)
from streamd.localize.preconfigured_configurations import TaskConfiguration
from streamd.parse import parse_markdown_file
from streamd.query import find_shard_by_position
from streamd.settings import Settings
from streamd.timesheet.configuration import BasicTimesheetConfiguration
from streamd.timesheet.extract import extract_timesheets
app = typer.Typer()
def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]:
    """Yield a localized shard for every markdown file in the base folder.

    Files whose localization yields a falsy result are skipped.
    """
    pattern = f"{glob.escape(Settings().base_folder)}/*.md"
    for path in glob.glob(pattern):
        with open(path, "r") as handle:
            text = handle.read()
        parsed = parse_markdown_file(path, text)
        localized = localize_stream_file(parsed, config)
        if localized:
            yield localized
@app.command()
def todo() -> None:
    """Print every open task shard as a rendered markdown panel."""
    shards = list(all_files(TaskConfiguration))
    for shard in find_shard_by_position(shards, "task", "open"):
        source_file = shard.location["file"]
        with open(source_file, "r") as handle:
            lines = handle.read().splitlines()
        # Shard lines are 1-based and inclusive; slice accordingly.
        excerpt = "\n".join(lines[shard.start_line - 1 : shard.end_line])
        print(
            Panel(
                Markdown(excerpt),
                title=f"{source_file}:{shard.start_line}",
            )
        )
@app.command()
def edit(number: Annotated[int, typer.Argument()] = 1) -> None:
    """Open the *number*-th most recent task shard's file in an editor.

    Positive numbers count back from the newest shard (1 == newest);
    negative numbers count forward from the oldest.

    Raises:
        ValueError: if *number* does not address an existing shard.
    """
    all_shards = list(all_files(TaskConfiguration))
    sorted_shards = sorted(all_shards, key=lambda s: s.moment)
    # Map the 1-based "from the end" convention onto a list index, then
    # validate the index itself: the previous `abs(number) >= len(...)`
    # check let number == 0 through and indexed one past the end
    # (IndexError), while wrongly rejecting number == len (the oldest).
    if number >= 0:
        index = len(sorted_shards) - number
    else:
        index = -number
    if not 0 <= index < len(sorted_shards):
        raise ValueError("Argument out of range")
    click.edit(None, filename=sorted_shards[index].location["file"])
@app.command()
def timesheet() -> None:
    """Print timesheets by date, each followed by its timecard intervals."""
    shards = list(all_files(BasicTimesheetConfiguration))
    for sheet in sorted(extract_timesheets(shards), key=lambda card: card.date):
        print(sheet.date)
        entries = [f"{card.from_time},{card.to_time}" for card in sheet.timecards]
        print(",".join(entries))
@app.command()
def new() -> None:
    """Create a new stream file, open it in an editor, then rename it.

    The final file name keeps the timestamp and appends any markers parsed
    from the edited content.
    """
    base_folder = Settings().base_folder
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    preliminary_path = os.path.join(base_folder, f"{timestamp}_wip.md")
    # Seed the file with a heading stub before handing it to the editor.
    with open(preliminary_path, "w") as handle:
        _ = handle.write("# ")
    click.edit(None, filename=preliminary_path)
    with open(preliminary_path, "r") as handle:
        edited_content = handle.read()
    parsed_content = parse_markdown_file(preliminary_path, edited_content)
    final_file_name = f"{timestamp}.md"
    if parsed_content.shard is not None and len(
        markers := parsed_content.shard.markers
    ):
        final_file_name = f"{timestamp} {' '.join(markers)}.md"
    final_path = os.path.join(base_folder, final_file_name)
    _ = move(preliminary_path, final_path)
    print(f"Saved as [yellow]{final_file_name}")
@app.callback(invoke_without_command=True)
def main(ctx: typer.Context) -> None:
    """Default to creating a new entry when no subcommand is given."""
    if ctx.invoked_subcommand is None:
        new()
# Script entry point: run the Typer application.
if __name__ == "__main__":
    app()

View file

@ -0,0 +1,73 @@
from datetime import datetime
from streamd.parse.shard import Shard, StreamFile
from .extract_datetime import (
extract_datetime_from_file_name,
extract_datetime_from_marker_list,
)
from .localized_shard import LocalizedShard
from .repository_configuration import RepositoryConfiguration
def localize_shard(
    shard: Shard,
    config: RepositoryConfiguration,
    propagated: dict[str, str],
    moment: datetime,
) -> LocalizedShard:
    """Resolve a shard's position within the repository's dimension space.

    Walks the shard's markers and applies every placement whose ``if_with``
    precondition is satisfied by the shard's marker set. Values land either
    in the propagating position (inherited by children) or in a private
    position (visible to this shard only).

    Args:
        shard: The parsed shard to localize.
        config: Marker and dimension definitions to resolve against.
        propagated: Dimension values inherited from the parent shard.
        moment: The parent's timestamp; date-like markers may override it.

    Returns:
        A ``LocalizedShard`` with resolved location, adjusted moment, and
        recursively localized children.
    """
    position = {**propagated}
    private_position: dict[str, str] = {}
    # Markers (e.g. dates) can shift this shard's effective moment.
    adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)
    for marker in shard.markers:
        if marker in config.markers:
            marker_definition = config.markers[marker]
            for placement in marker_definition.placements:
                if placement.if_with <= set(shard.markers):
                    dimension = config.dimensions[placement.dimension]
                    # A placement without an explicit value uses the marker text.
                    value = placement.value or marker
                    # Apply only when overwriting is allowed, or when no
                    # earlier placement has set this dimension yet.
                    if placement.overwrites or (
                        placement.dimension not in position
                        and placement.dimension not in private_position
                    ):
                        if dimension.propagate:
                            position[placement.dimension] = value
                        else:
                            private_position[placement.dimension] = value
    # Children are localized BEFORE private values are merged in, so they
    # inherit only the propagating dimensions.
    children = [
        localize_shard(child, config, position, adjusted_moment)
        for child in shard.children
    ]
    position.update(private_position)
    return LocalizedShard(
        markers=shard.markers,
        tags=shard.tags,
        start_line=shard.start_line,
        end_line=shard.end_line,
        location=position,
        children=children,
        moment=adjusted_moment,
    )
def localize_stream_file(
    stream_file: StreamFile, config: RepositoryConfiguration
) -> LocalizedShard | None:
    """Localize a whole stream file, seeding its position with the file path.

    Raises:
        ValueError: when no date can be derived from the file name, or the
            file contains no shard.
    """
    shard_date = extract_datetime_from_file_name(stream_file.file_name)
    if shard_date is None or stream_file.shard is None:
        raise ValueError("Could not extract date")
    initial_position = {"file": stream_file.file_name}
    return localize_shard(stream_file.shard, config, initial_position, shard_date)
__all__ = ["localize_stream_file"]

View file

@ -0,0 +1,106 @@
from __future__ import annotations

from pydantic import BaseModel, Field
class Dimension(BaseModel):
    """A coordinate axis along which shards can be positioned."""

    # Human-readable name shown in output.
    display_name: str
    # Optional free-form description of the dimension.
    comment: str | None = None
    # Whether values on this dimension are inherited by child shards.
    propagate: bool = False
class MarkerPlacement(BaseModel):
    """Places a marker's value onto a dimension, optionally guarded.

    The placement only applies when all markers in ``if_with`` are present
    on the shard.
    """

    # Markers that must accompany this one for the placement to apply.
    # default_factory avoids a call in the default initializer and makes
    # the per-instance copy explicit (reportCallInDefaultInitializer).
    if_with: set[str] = Field(default_factory=set)
    # Name of the dimension that receives the value.
    dimension: str
    # Explicit value; falls back to the marker text when None.
    value: str | None = None
    # Whether this placement replaces an already-set dimension value.
    overwrites: bool = True
class Marker(BaseModel):
    """A recognized marker and where it places shards."""

    # Human-readable name of the marker.
    display_name: str
    # Dimension placements this marker triggers. default_factory replaces
    # the mutable default `[]`, giving each instance its own list.
    placements: list[MarkerPlacement] = Field(default_factory=list)
class RepositoryConfiguration(BaseModel):
    """Full repository configuration: dimensions plus marker definitions."""

    # All known dimensions, keyed by dimension name.
    dimensions: dict[str, Dimension]
    # All known markers, keyed by marker text.
    markers: dict[str, Marker]
def merge_single_dimension(base: Dimension, second: Dimension) -> Dimension:
    """Merge two dimension definitions, letting *second* win where it is set.

    ``propagate`` only overrides when it was explicitly provided on *second*
    (tracked via pydantic's ``model_fields_set``).
    """
    explicitly_set: set[str] = getattr(second, "model_fields_set", set())
    merged_display_name = second.display_name or base.display_name
    merged_comment = second.comment if second.comment is not None else base.comment
    merged_propagate = (
        second.propagate if "propagate" in explicitly_set else base.propagate
    )
    return Dimension(
        display_name=merged_display_name,
        comment=merged_comment,
        propagate=merged_propagate,
    )
def merge_dimensions(
    base: dict[str, Dimension], second: dict[str, Dimension]
) -> dict[str, Dimension]:
    """Combine two dimension maps; overlapping keys are field-merged."""
    result: dict[str, Dimension] = dict(base)
    for name, overlay in second.items():
        result[name] = (
            merge_single_dimension(result[name], overlay)
            if name in result
            else overlay
        )
    return result
def _placement_identity(p: MarkerPlacement) -> tuple[frozenset[str], str]:
return (frozenset(p.if_with), p.dimension)
def merge_single_marker(base: Marker, second: Marker) -> Marker:
    """Merge two marker definitions.

    Placements are identified by (guard set, dimension). A placement from
    *second* replaces the base placement with the same identity in place;
    otherwise it is appended. Order of first appearance is preserved.
    """
    placements: list[MarkerPlacement] = []
    index_by_identity: dict[tuple[frozenset[str], str], int] = {}
    # Base placements are taken as-is; later duplicates track the last slot.
    for placement in base.placements:
        index_by_identity[_placement_identity(placement)] = len(placements)
        placements.append(placement)
    # Second's placements either overwrite an existing slot or extend.
    for placement in second.placements:
        identity = _placement_identity(placement)
        if identity in index_by_identity:
            placements[index_by_identity[identity]] = placement
        else:
            index_by_identity[identity] = len(placements)
            placements.append(placement)
    return Marker(
        display_name=second.display_name or base.display_name,
        placements=placements,
    )
def merge_markers(
    base: dict[str, Marker], second: dict[str, Marker]
) -> dict[str, Marker]:
    """Combine two marker maps; overlapping keys are merged placement-wise."""
    result: dict[str, Marker] = dict(base)
    for name, overlay in second.items():
        result[name] = (
            merge_single_marker(result[name], overlay) if name in result else overlay
        )
    return result
def merge_repository_configuration(
    base: RepositoryConfiguration, second: RepositoryConfiguration
) -> RepositoryConfiguration:
    """Merge two configurations; *second* takes precedence on conflicts."""
    merged_dimensions = merge_dimensions(base.dimensions, second.dimensions)
    merged_markers = merge_markers(base.markers, second.markers)
    return RepositoryConfiguration(
        dimensions=merged_dimensions,
        markers=merged_markers,
    )
__all__ = [
"Dimension",
"Marker",
"MarkerPlacement",
"RepositoryConfiguration",
"merge_repository_configuration",
]

View file

@ -0,0 +1,92 @@
import re
from collections.abc import Iterable
from typing import cast
from mistletoe.block_token import BlockToken
from mistletoe.span_token import Emphasis, Link, RawText, Strikethrough, Strong
from mistletoe.token import Token
from .markdown_tag import Tag
def extract_markers_and_tags_from_single_token(
    token: Token,
    marker_boundary_encountered: bool,
    return_at_first_marker: bool = False,
) -> tuple[list[str], list[str], bool]:
    """Classify a single span token as contributing markers or tags.

    ``@name`` tokens seen before the first substantive content count as
    markers; once the boundary has been encountered they count as tags.

    Args:
        token: The span token to inspect.
        marker_boundary_encountered: True once non-marker content was seen.
        return_at_first_marker: Passed through to the recursive scan of
            formatting children; allows early exit after one marker.

    Returns:
        ``(markers, tags, boundary)`` where *boundary* reports whether the
        marker boundary has been encountered after this token.
    """
    result_markers: list[str] = []
    result_tags: list[str] = []
    result_marker_boundary_encountered = marker_boundary_encountered
    if isinstance(token, Tag):
        content = cast(str, token.content)
        if marker_boundary_encountered:
            result_tags.append(content)
        else:
            result_markers.append(content)
    elif isinstance(token, (Emphasis, Strong, Strikethrough, Link)):
        # Formatting wrappers are transparent: scan their children.
        children = list(token.children or [])
        markers, tags, child_marker_boundary_encountered = (
            extract_markers_and_tags_from_tokens(
                children,
                marker_boundary_encountered,
                return_at_first_marker,
            )
        )
        result_markers.extend(markers)
        result_tags.extend(tags)
        result_marker_boundary_encountered = (
            marker_boundary_encountered or child_marker_boundary_encountered
        )
    elif isinstance(token, RawText):
        content_raw = cast(str, token.content)
        # Whitespace-only raw text does not end the leading marker run.
        if not re.match(r"^[\s]*$", content_raw):
            result_marker_boundary_encountered = True
    else:
        # Any other span type (code, images, ...) ends the marker run.
        result_marker_boundary_encountered = True
    return result_markers, result_tags, result_marker_boundary_encountered
def extract_markers_and_tags_from_tokens(
    tokens: Iterable[Token],
    marker_boundary_encountered: bool,
    return_at_first_marker: bool = False,
) -> tuple[list[str], list[str], bool]:
    """Scan a token sequence, splitting ``@name`` spans into markers and tags.

    Args:
        tokens: Span tokens to scan in order.
        marker_boundary_encountered: Whether non-marker content was already
            seen before this sequence.
        return_at_first_marker: Stop scanning as soon as one marker has been
            collected (used by ``has_markers``).

    Returns:
        ``(markers, tags, boundary)`` accumulated over the sequence.
    """
    result_markers: list[str] = []
    result_tags: list[str] = []
    result_marker_boundary_encountered = marker_boundary_encountered
    for child in tokens:
        markers, tags, child_marker_boundary_encountered = (
            extract_markers_and_tags_from_single_token(
                child, result_marker_boundary_encountered, return_at_first_marker
            )
        )
        result_markers.extend(markers)
        result_tags.extend(tags)
        # NOTE(review): this ORs against the *initial* flag rather than the
        # running one; the child result already folds the running flag in,
        # so the net effect appears identical — confirm before simplifying.
        result_marker_boundary_encountered = (
            marker_boundary_encountered or child_marker_boundary_encountered
        )
        if len(result_markers) > 0 and return_at_first_marker:
            break
    return result_markers, result_tags, result_marker_boundary_encountered
def extract_markers_and_tags(block_token: BlockToken) -> tuple[list[str], list[str]]:
    """Return the markers and tags found among a block token's children."""
    child_tokens = list(block_token.children or [])
    markers, tags, _boundary = extract_markers_and_tags_from_tokens(
        child_tokens, False
    )
    return markers, tags
def has_markers(block_token: BlockToken) -> bool:
    """Whether the block token contains at least one leading ``@marker``."""
    child_tokens = list(block_token.children or [])
    found_markers, _tags, _boundary = extract_markers_and_tags_from_tokens(
        child_tokens, False, return_at_first_marker=True
    )
    return bool(found_markers)
__all__ = ["extract_markers_and_tags", "has_markers"]

View file

@ -0,0 +1,23 @@
import re
from typing import cast
from mistletoe.markdown_renderer import Fragment, MarkdownRenderer
from mistletoe.span_token import SpanToken
class Tag(SpanToken):
    """Span token matching ``@name`` mentions in markdown text."""

    # Tag content is taken verbatim; no nested span parsing inside it.
    parse_inner: bool = False
    # Matches "@" followed by characters that cannot delimit other spans
    # (no whitespace, "*", backtick (\x60), "~", or square brackets).
    pattern: re.Pattern[str] = re.compile(r"@([^\s*\x60~\[\]]+)")
class TagMarkdownRenderer(MarkdownRenderer):
    """Markdown renderer that round-trips ``Tag`` spans as ``@content``."""

    def __init__(self) -> None:
        # Register the Tag span token with the renderer (and the parser,
        # while this renderer is used as a context manager).
        super().__init__(Tag)  # pyright: ignore[reportUnknownMemberType]

    def render_tag(self, token: Tag):
        """Yield the fragments that re-serialize a tag span."""
        content = cast(str, token.content)
        yield Fragment("@")
        yield Fragment(content)
__all__ = ["Tag", "TagMarkdownRenderer"]

258
src/streamd/parse/parse.py Normal file
View file

@ -0,0 +1,258 @@
from collections import Counter
from typing import cast
from mistletoe.block_token import (
BlockToken,
Document,
Heading,
List,
ListItem,
Paragraph,
)
from .extract_tag import extract_markers_and_tags, has_markers
from .list import split_at
from .markdown_tag import TagMarkdownRenderer
from .shard import Shard, StreamFile
def get_line_number(block_token: BlockToken) -> int:
    """Return the 1-based source line where *block_token* starts."""
    line = block_token.line_number  # pyright: ignore[reportAttributeAccessIssue]
    return cast(int, line)
def build_shard(
    start_line: int,
    end_line: int,
    markers: list[str] | None = None,
    tags: list[str] | None = None,
    children: list[Shard] | None = None,
) -> Shard:
    """Construct a shard, collapsing a redundant single-child wrapper.

    When the shard would carry no markers or tags of its own and wrap
    exactly one child spanning the same lines, that child is returned
    directly instead of being nested.
    """
    markers = markers if markers is not None else []
    tags = tags if tags is not None else []
    children = children if children is not None else []
    is_transparent_wrapper = (
        not markers
        and not tags
        and len(children) == 1
        and children[0].start_line == start_line
        and children[0].end_line == end_line
    )
    if is_transparent_wrapper:
        return children[0]
    return Shard(
        markers=markers,
        tags=tags,
        children=children,
        start_line=start_line,
        end_line=end_line,
    )
def merge_into_first_shard(
    shards: list[Shard],
    start_line: int,
    end_line: int,
    additional_tags: list[str] | None = None,
) -> Shard:
    """Promote the first shard to cover the full span, absorbing siblings.

    The remaining shards become its children and *additional_tags* are
    appended to its tag list.
    """
    head, *rest = shards
    extra_tags = additional_tags if additional_tags is not None else []
    return head.model_copy(
        update={
            "start_line": start_line,
            "end_line": end_line,
            "children": rest,
            "tags": head.tags + extra_tags,
        }
    )
def find_paragraph_shard_positions(block_tokens: list[BlockToken]) -> list[int]:
    """Indices of paragraph tokens that carry at least one marker."""
    positions: list[int] = []
    for index, token in enumerate(block_tokens):
        if isinstance(token, Paragraph) and has_markers(token):
            positions.append(index)
    return positions
def _heading_level(heading: Heading) -> int:
return cast(int, heading.level)
def find_headings_by_level(
    block_tokens: list[BlockToken], header_level: int
) -> list[int]:
    """Indices of heading tokens with exactly *header_level*."""
    matches: list[int] = []
    for index, token in enumerate(block_tokens):
        if isinstance(token, Heading) and _heading_level(token) == header_level:
            matches.append(index)
    return matches
def calculate_heading_level_for_next_split(
    block_tokens: list[BlockToken],
) -> int | None:
    """
    If there is no marker in any heading, then return None.
    If only the first token is a heading with a marker, then return None.
    Otherwise: Return the heading level with the lowest level (h1 < h2), of which there are two or which has a marker (and doesn't stem from first)
    """
    # Levels of marker-bearing headings, ignoring a marker on the very first
    # token: that heading titles the current span rather than a new split.
    level_of_headings_without_first_with_marker: list[int] = [
        _heading_level(token)
        for token in block_tokens[1:]
        if isinstance(token, Heading) and has_markers(token)
    ]
    if len(level_of_headings_without_first_with_marker) == 0:
        return None
    # Count every heading level so repeated levels can also force a split.
    heading_level_counter: Counter[int] = Counter(
        [_heading_level(token) for token in block_tokens if isinstance(token, Heading)]
    )
    return min(
        [level for level, count in heading_level_counter.items() if count >= 2]
        + level_of_headings_without_first_with_marker
    )
def parse_single_block_shards(
    block_token: BlockToken, start_line: int, end_line: int
) -> tuple[Shard | None, list[str]]:
    """Turn one block token into a shard, or collect its stray tags.

    Lists recurse into their items; paragraphs and headings contribute
    their markers and tags directly. Other block types are ignored.

    Args:
        block_token: The block to convert.
        start_line: First source line covered by this block.
        end_line: Last source line covered by this block.

    Returns:
        ``(shard, tags)``: the shard (None when the block yields neither
        markers nor children) and the tags to propagate to the enclosing
        shard when no shard was produced.
    """
    markers: list[str] = []
    tags: list[str] = []
    children: list[Shard] = []
    if isinstance(block_token, List):
        list_items: list[ListItem] = (  # pyright: ignore[reportAssignmentType]
            list(block_token.children) if block_token.children is not None else []
        )
        for index, list_item in enumerate(list_items):
            list_item_start_line = get_line_number(list_item)
            # An item ends one line before the next item, or at the list's end.
            list_item_end_line = (
                get_line_number(list_items[index + 1]) - 1
                if index + 1 < len(list_items)
                else end_line
            )
            list_item_shard, list_item_tags = parse_multiple_block_shards(
                list_item.children,  # pyright: ignore[reportArgumentType]
                list_item_start_line,
                list_item_end_line,
            )
            if list_item_shard is not None:
                children.append(list_item_shard)
            tags.extend(list_item_tags)
    elif isinstance(block_token, (Paragraph, Heading)):
        markers, tags = extract_markers_and_tags(block_token)
    if len(markers) == 0 and len(children) == 0:
        # Nothing shard-worthy here; hand the tags to the caller instead.
        return None, tags
    return build_shard(
        start_line, end_line, markers=markers, tags=tags, children=children
    ), []
def parse_multiple_block_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    enforce_shard: bool = False,
) -> tuple[Shard | None, list[str]]:
    """Parse sibling block tokens into one shard spanning the given lines.

    When the first block is a marker-carrying heading, or the only marker
    paragraph is the first block, the result is merged into that first
    child so its markers label the whole span.

    Args:
        block_tokens: The sibling blocks; assumed non-empty.
        start_line: First source line of the span.
        end_line: Last source line of the span.
        enforce_shard: Produce a shard even when no child yields one.

    Returns:
        ``(shard, tags)``: the combined shard (None when nothing was found
        and *enforce_shard* is False) plus the tags to bubble up.
    """
    is_first_block_heading = isinstance(block_tokens[0], Heading) and has_markers(
        block_tokens[0]
    )
    paragraph_positions = find_paragraph_shard_positions(block_tokens)
    children: list[Shard] = []
    tags: list[str] = []
    is_first_block_only_with_marker = False
    for i, token in enumerate(block_tokens):
        if i in paragraph_positions:
            # NOTE(review): overwritten on every marker paragraph, so this
            # ends up True only when the *last* such paragraph sits at index
            # 0 (i.e. it is the only one) — confirm this is intended.
            is_first_block_only_with_marker = i == 0
        child_start_line = get_line_number(token)
        # A child ends one line before the next sibling, or at the span's end.
        child_end_line = (
            get_line_number(block_tokens[i + 1]) - 1
            if i + 1 < len(block_tokens)
            else end_line
        )
        child_shard, child_tags = parse_single_block_shards(
            token, child_start_line, child_end_line
        )
        if child_shard is not None:
            children.append(child_shard)
        if len(child_tags) > 0:
            tags.extend(child_tags)
    if len(children) == 0 and not enforce_shard:
        return None, tags
    if is_first_block_heading or is_first_block_only_with_marker:
        return merge_into_first_shard(children, start_line, end_line, tags), []
    else:
        return build_shard(start_line, end_line, tags=tags, children=children), []
def parse_header_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    use_first_child_as_header: bool = False,
) -> Shard | None:
    """Recursively split blocks at heading boundaries into nested shards.

    Determines the most significant heading level worth splitting at,
    slices the token list at those headings, and recurses into each slice.

    Args:
        block_tokens: Blocks covering the span (may be empty).
        start_line: First source line of the span.
        end_line: Last source line of the span.
        use_first_child_as_header: Merge siblings into the first child so
            its heading labels the whole span.

    Returns:
        The shard for this span, or None when nothing was parsed.
    """
    if len(block_tokens) == 0:
        return build_shard(start_line, end_line)
    split_at_heading_level = calculate_heading_level_for_next_split(block_tokens)
    if split_at_heading_level is None:
        # No further heading split: parse the blocks as one flat span.
        return parse_multiple_block_shards(
            block_tokens, start_line, end_line, enforce_shard=True
        )[0]
    heading_positions = find_headings_by_level(block_tokens, split_at_heading_level)
    block_tokens_split_by_heading = split_at(block_tokens, heading_positions)
    children: list[Shard] = []
    for i, child_blocks in enumerate(block_tokens_split_by_heading):
        child_start_line = get_line_number(child_blocks[0])
        # A slice ends one line before the next slice's first block.
        child_end_line = (
            get_line_number(block_tokens_split_by_heading[i + 1][0]) - 1
            if i + 1 < len(block_tokens_split_by_heading)
            else end_line
        )
        if child_shard := parse_header_shards(
            child_blocks,
            child_start_line,
            child_end_line,
            # Every slice after the first starts with its own heading; the
            # first slice does too when a heading sits at index 0.
            use_first_child_as_header=i > 0 or 0 in heading_positions,
        ):
            children.append(child_shard)
    if use_first_child_as_header and len(children) > 0:
        return merge_into_first_shard(children, start_line, end_line)
    else:
        return build_shard(start_line, end_line, children=children)
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
    """Parse markdown text into a ``StreamFile`` with a shard hierarchy."""
    # Fallback shard spans the whole file (at least one line, even if empty).
    fallback_end_line = max(len(file_content.splitlines()), 1)
    shard = build_shard(1, fallback_end_line)
    # The renderer context registers the Tag span token during parsing.
    with TagMarkdownRenderer():
        ast = Document(file_content)
        block_tokens: list[BlockToken] = ast.children  # pyright: ignore[reportAssignmentType]
    if len(block_tokens) > 0:
        if parsed_shard := parse_header_shards(
            block_tokens, shard.start_line, shard.end_line
        ):
            shard = parsed_shard
    return StreamFile(shard=shard, file_name=file_name)
__all__ = ["Shard", "StreamFile", "parse_markdown_file"]

36
src/streamd/query/find.py Normal file
View file

@ -0,0 +1,36 @@
from typing import Callable
from streamd.localize import LocalizedShard
def find_shard(
    shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool]
) -> list[LocalizedShard]:
    """Depth-first search returning every shard matching *query_function*.

    A matching parent is listed before its matching descendants.
    """
    matches: list[LocalizedShard] = []
    for candidate in shards:
        if query_function(candidate):
            matches.append(candidate)
        matches.extend(find_shard(candidate.children, query_function))
    return matches
def find_shard_by_position(
    shards: list[LocalizedShard], dimension: str, value: str
) -> list[LocalizedShard]:
    """All shards whose location sets *dimension* to exactly *value*."""

    def matches(shard: LocalizedShard) -> bool:
        return dimension in shard.location and shard.location[dimension] == value

    return find_shard(shards, matches)
def find_shard_by_set_dimension(
    shards: list[LocalizedShard], dimension: str
) -> list[LocalizedShard]:
    """All shards that assign any value to *dimension*."""

    def has_dimension(shard: LocalizedShard) -> bool:
        return dimension in shard.location

    return find_shard(shards, has_dimension)
__all__ = ["find_shard_by_position", "find_shard", "find_shard_by_set_dimension"]

View file

@ -0,0 +1,38 @@
import os
from typing import ClassVar, override
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
SettingsConfigDict,
YamlConfigSettingsSource,
)
from xdg_base_dirs import xdg_config_home
# Path to the user-level YAML configuration file (XDG config directory).
SETTINGS_FILE = xdg_config_home() / "streamd" / "config.yaml"
class Settings(BaseSettings):
    """Application settings, loadable from the XDG YAML config file.

    Source precedence (highest first): constructor arguments, the YAML
    config file, dotenv, environment variables, file secrets.
    """

    # ClassVar so pydantic does not treat the config object as a model field.
    model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
        env_file_encoding="utf-8"
    )
    # Folder holding the markdown stream files; defaults to the CWD.
    base_folder: str = os.getcwd()

    @classmethod
    @override
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Insert the YAML file source just below init-argument precedence."""
        return (
            init_settings,
            YamlConfigSettingsSource(settings_cls, yaml_file=SETTINGS_FILE),
            dotenv_settings,
            env_settings,
            file_secret_settings,
        )

View file

@ -1,6 +1,6 @@
import pytest
from streamer.localize.repository_configuration import (
from streamd.localize.repository_configuration import (
Dimension,
Marker,
MarkerPlacement,
@ -252,8 +252,8 @@ class TestMergeRepositoryConfiguration:
),
},
markers={
"Streamer": Marker(
display_name="Streamer",
"Streamd": Marker(
display_name="Streamd",
placements=[MarkerPlacement(dimension="project")],
)
},
@ -267,8 +267,8 @@ class TestMergeRepositoryConfiguration:
),
},
markers={
"Streamer": Marker(
display_name="Streamer2",
"Streamd": Marker(
display_name="Streamd2",
placements=[
MarkerPlacement(
if_with={"Timesheet"}, dimension="timesheet", value="coding"
@ -291,9 +291,9 @@ class TestMergeRepositoryConfiguration:
assert merged.dimensions["moment"].display_name == "Moment"
assert merged.dimensions["timesheet"].display_name == "Timesheet"
assert set(merged.markers.keys()) == {"Streamer", "JobHunting"}
assert merged.markers["Streamer"].display_name == "Streamer2"
assert merged.markers["Streamer"].placements == [
assert set(merged.markers.keys()) == {"Streamd", "JobHunting"}
assert merged.markers["Streamd"].display_name == "Streamd2"
assert merged.markers["Streamd"].placements == [
MarkerPlacement(dimension="project", value=None, if_with=set()),
MarkerPlacement(
if_with={"Timesheet"}, dimension="timesheet", value="coding"
@ -359,7 +359,9 @@ class TestMergeRepositoryConfiguration:
],
)
def test_merge_repository_configuration_propagate_preserves_base_when_omitted(
base, second, expected_propagate
base: RepositoryConfiguration,
second: RepositoryConfiguration,
expected_propagate: bool,
):
merged = merge_repository_configuration(base, second)
assert merged.dimensions["d"].propagate is expected_propagate

View file

@ -4,13 +4,13 @@ from datetime import datetime, time
import pytest
from streamer.localize.localized_shard import LocalizedShard
from streamer.timesheet.configuration import (
from streamd.localize.localized_shard import LocalizedShard
from streamd.timesheet.configuration import (
TIMESHEET_DIMENSION_NAME,
TimesheetPointType,
)
from streamer.timesheet.extract import extract_timesheets
from streamer.timesheet.timecard import SpecialDayType, Timecard, Timesheet
from streamd.timesheet.extract import extract_timesheets
from streamd.timesheet.timecard import SpecialDayType, Timecard, Timesheet
def point(at: datetime, type: TimesheetPointType) -> LocalizedShard:
@ -243,7 +243,7 @@ class TestExtractTimesheets:
]
with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"):
extract_timesheets(shards)
_ = extract_timesheets(shards)
def test_two_special_day_types_same_day_is_invalid(self):
"""
@ -257,7 +257,7 @@ class TestExtractTimesheets:
]
with pytest.raises(ValueError, match=r"is both .* and .*"):
extract_timesheets(shards)
_ = extract_timesheets(shards)
def test_points_with_mixed_dates_inside_one_group_raises(self):
"""
@ -273,7 +273,7 @@ class TestExtractTimesheets:
]
with pytest.raises(ValueError, match=r"Last Timecard of .* is not a break"):
extract_timesheets(shards)
_ = extract_timesheets(shards)
def test_day_with_only_breaks_is_ignored(self):
"""