feat: base task command on position instead of tags

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2026-01-31 20:19:00 +01:00
parent 84ad91d4c4
commit 027bf531ce
Signed by: kfickel
GPG key ID: A793722F9933C1A5
7 changed files with 61 additions and 155 deletions

View file

@ -10,34 +10,46 @@ from rich import print
from rich.markdown import Markdown
from rich.panel import Panel
from streamer.parse.attach_markdown import StreamFileWithMarkdown, attach_markdown
from streamer.parse.parse import parse_markdown_file
from streamer.query.task import find_by_markers
from streamer.localize import (
LocalizedShard,
RepositoryConfiguration,
localize_stream_file,
)
from streamer.localize.preconfigured_configurations import TaskConfiguration
from streamer.parse import parse_markdown_file
from streamer.query import find_shard_by_position
from streamer.settings import Settings
app = typer.Typer()
def all_files() -> Generator[StreamFileWithMarkdown]:
def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]:
for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"):
with open(file_name, "r") as file:
file_content = file.read()
yield attach_markdown(
parse_markdown_file(file_name, file_content), file_content
)
if shard := localize_stream_file(
parse_markdown_file(file_name, file_content), config
):
yield shard
@app.command()
def todo() -> None:
for sharded_document in all_files():
if sharded_document.shard:
open_tasks = find_by_markers(sharded_document.shard, ["Task"], ["Done"])
all_shards = list(all_files(TaskConfiguration))
for task_shard in open_tasks:
for task_shard in find_shard_by_position(all_shards, "task", "open"):
with open(task_shard.location["file"], "r") as file:
file_content = file.read().splitlines()
print(
Panel(
Markdown(task_shard.markdown_content),
title=f"{sharded_document.file_name}:{task_shard.start_line}",
Markdown(
"\n".join(
file_content[
task_shard.start_line - 1 : task_shard.end_line
]
)
),
title=f"{task_shard.location['file']}:{task_shard.start_line}",
)
)

View file

@ -22,7 +22,7 @@ TaskConfiguration = RepositoryConfiguration(
"Task": Marker(
display_name="Task",
placements=[
MarkerPlacement(dimension="task", value="ready"),
MarkerPlacement(dimension="task", value="open"),
MarkerPlacement(if_with={"Done"}, dimension="task", value="done"),
MarkerPlacement(if_with={"Waiting"}, dimension="task", value="waiting"),
MarkerPlacement(

View file

@ -1,43 +0,0 @@
from __future__ import annotations
from pydantic import BaseModel
from streamer.parse.shard import Shard, StreamFile
class ShardWithMarkdown(BaseModel):
markers: list[str] = []
tags: list[str] = []
start_line: int
end_line: int
children: list["ShardWithMarkdown"] = []
markdown_content: str
class StreamFileWithMarkdown(BaseModel):
file_name: str
shard: ShardWithMarkdown | None = None
def attach_markdown_shard(shard: Shard, markdown_text: str) -> ShardWithMarkdown:
lines = markdown_text.splitlines()
markdown_content = "\n".join(lines[shard.start_line - 1 : shard.end_line])
return ShardWithMarkdown(
**shard.model_dump(exclude={"children"}),
children=[
attach_markdown_shard(child, markdown_text) for child in shard.children
],
markdown_content=markdown_content,
)
def attach_markdown(file: StreamFile, markdown_text: str) -> StreamFileWithMarkdown:
if file.shard is None:
return StreamFileWithMarkdown(file_name=file.file_name, shard=None)
attached_shard = attach_markdown_shard(file.shard, markdown_text)
return StreamFileWithMarkdown(file_name=file.file_name, shard=attached_shard)
__all__ = ["attach_markdown"]

View file

@ -0,0 +1,3 @@
from .find import find_shard, find_shard_by_position
__all__ = ["find_shard_by_position", "find_shard"]

View file

@ -0,0 +1,29 @@
from typing import Callable
from streamer.localize import LocalizedShard
def find_shard(
shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool]
) -> list[LocalizedShard]:
found_shards = []
for shard in shards:
if query_function(shard):
found_shards.append(shard)
found_shards.extend(find_shard(shard.children, query_function))
return found_shards
def find_shard_by_position(
shards: list[LocalizedShard], dimension: str, value: str
) -> list[LocalizedShard]:
return find_shard(
shards,
lambda shard: dimension in shard.location
and shard.location[dimension] == value,
)
__all__ = ["find_shard_by_position", "find_shard"]

View file

@ -1,19 +0,0 @@
from typing import TypeVar
from streamer.parse.shard import Shard
T = TypeVar("T", bound="Shard")
def find_by_markers(shard: T, has: list[str], has_not: list[str]) -> list[T]:
found_shards = []
if any(tag in has for tag in shard.markers) and not any(
tag in has_not for tag in shard.markers
):
found_shards.append(shard)
for child in shard.children:
found_shards.extend(find_by_markers(child, has, has_not))
return found_shards

View file

@ -1,76 +0,0 @@
from faker import Faker
from streamer.parse import Shard, StreamFile
from streamer.parse.attach_markdown import (
ShardWithMarkdown,
StreamFileWithMarkdown,
attach_markdown,
)
fake = Faker()
class TestAttachMarkdown:
file_name: str = fake.file_name(extension="md")
def test_attach_markdown_with_shard(self):
markdown_text = "Hello World\n\nThis is a test."
shard = Shard(start_line=1, end_line=3)
stream_file = StreamFile(file_name=self.file_name, shard=shard)
result = attach_markdown(stream_file, markdown_text)
assert result == StreamFileWithMarkdown(
file_name=self.file_name,
shard=ShardWithMarkdown(
start_line=1,
end_line=3,
markdown_content=markdown_text,
markers=[],
tags=[],
children=[],
),
)
def test_attach_markdown_without_shard(self):
stream_file = StreamFile(file_name=self.file_name, shard=None)
result = attach_markdown(stream_file, "Some markdown text")
assert result == StreamFileWithMarkdown(file_name=self.file_name, shard=None)
def test_attach_markdown_with_nested_shards(self):
markdown_text = "Header\n\n@Marker1 Content 1\n\n@Marker2 Content 2"
shard = Shard(
start_line=1,
end_line=5,
children=[
Shard(markers=["Marker1"], start_line=3, end_line=3),
Shard(markers=["Marker2"], start_line=5, end_line=5),
],
)
stream_file = StreamFile(file_name=self.file_name, shard=shard)
result = attach_markdown(stream_file, markdown_text)
assert result.file_name == self.file_name
assert result.shard is not None
assert result.shard.start_line == 1
assert result.shard.end_line == 5
assert (
result.shard.markdown_content
== "Header\n\n@Marker1 Content 1\n\n@Marker2 Content 2"
)
assert len(result.shard.children) == 2
# Check first child
assert result.shard.children[0].markers == ["Marker1"]
assert result.shard.children[0].start_line == 3
assert result.shard.children[0].end_line == 3
assert result.shard.children[0].markdown_content == "@Marker1 Content 1"
# Check second child
assert result.shard.children[1].markers == ["Marker2"]
assert result.shard.children[1].start_line == 5
assert result.shard.children[1].end_line == 5
assert result.shard.children[1].markdown_content == "@Marker2 Content 2"