feat: also use header hierarchy to determine shards

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2025-06-20 18:23:04 +02:00
parent 695a28e715
commit 6d61d67d2e
2 changed files with 148 additions and 8 deletions

View file

@ -5,7 +5,7 @@ from pydantic import BaseModel
from mistletoe import Document from mistletoe import Document
from mistletoe.markdown_renderer import MarkdownRenderer, Fragment from mistletoe.markdown_renderer import MarkdownRenderer, Fragment
from mistletoe.span_token import SpanToken, RawText from mistletoe.span_token import SpanToken, RawText
from mistletoe.block_token import Paragraph, BlockToken from mistletoe.block_token import Paragraph, BlockToken, Heading
from mistletoe.token import Token from mistletoe.token import Token
from itertools import pairwise from itertools import pairwise
import re import re
@ -42,12 +42,8 @@ T = TypeVar("T")
def extract_tags(tokens: list[Token]) -> list[str]: def extract_tags(tokens: list[Token]) -> list[str]:
return list( tags: iter[Tag] = filter(lambda token: isinstance(token, Tag), tokens)
map( return list(map(lambda marker: marker.content, tags))
lambda marker: marker.content,
filter(lambda token: isinstance(token, Tag), tokens),
)
)
def extract_markers_and_tags(tokens: list[Token]) -> tuple[list[str], list[str]]: def extract_markers_and_tags(tokens: list[Token]) -> tuple[list[str], list[str]]:
@ -127,6 +123,85 @@ def parse_paragraph_shards(block_tokens: list[BlockToken], end_line: int) -> Sha
return to_shard(own_elements, start_line, end_line, children=child_shards) return to_shard(own_elements, start_line, end_line, children=child_shards)
def optional_max(source_array: list[int]) -> Optional[int]:
try:
return max(source_array)
except ValueError:
return None
def optional_min(source_array: list[int]) -> Optional[int]:
try:
return min(source_array)
except ValueError:
return None
def find_heading_positions(
block_tokens: list[BlockToken], header_level: int
) -> list[int]:
return [
index
for index, block_token in enumerate(block_tokens)
if isinstance(block_token, Heading) and block_token.level == header_level
]
def parse_header_shards(
block_tokens: list[BlockToken], end_line: int, ignore_first_token: bool = False
) -> Shard:
max_header_level_with_marker = optional_max(
map(
lambda heading: heading.level,
filter(
lambda block_token: isinstance(block_token, Heading)
and has_markers(block_token),
block_tokens[1:] if ignore_first_token else block_tokens,
),
)
)
if max_header_level_with_marker is None:
return parse_paragraph_shards(block_tokens, end_line)
header_level_for_slicing = optional_min(
map(
lambda heading: heading.level,
filter(
lambda block_token: isinstance(block_token, Heading),
block_tokens[1:] if ignore_first_token else block_tokens,
),
)
)
slice_positions = find_heading_positions(block_tokens, header_level_for_slicing)
is_first_slice_part_of_parent_shard = 0 not in slice_positions
sliced_by_heading_level = split_at(block_tokens, slice_positions)
own_elements = (
sliced_by_heading_level[0] if is_first_slice_part_of_parent_shard else []
)
child_elements = (
sliced_by_heading_level[1:]
if is_first_slice_part_of_parent_shard
else sliced_by_heading_level
)
child_shards = []
for i in range(len(child_elements)):
child_end_line = (
child_elements[i + 1][0].line_number - 1
if i + 1 < len(child_elements)
else end_line
)
child_shards.append(
parse_header_shards(child_elements[i], child_end_line, True)
)
return to_shard(own_elements, block_tokens[0].line_number, end_line, child_shards)
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile: def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
shard = None shard = None
with TagMarkdownRenderer(): with TagMarkdownRenderer():
@ -134,7 +209,7 @@ def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
line_count = len(file_content.splitlines()) line_count = len(file_content.splitlines())
if block_tokens := ast.children: if block_tokens := ast.children:
shard = parse_paragraph_shards(block_tokens, line_count) shard = parse_header_shards(block_tokens, line_count)
return StreamFile(shard=shard, filename=file_name) return StreamFile(shard=shard, filename=file_name)

View file

@ -127,3 +127,68 @@ class TestParseProcess:
], ],
), ),
) )
def test_parse_header_without_markers(self):
file_text = "# Heading\n\n## Subheading"
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name,
shard=Shard(
markers=[],
tags=[],
start_line=1,
end_line=3,
children=[],
),
)
def test_parse_split_at_headin_if_marker_on_subheading(self):
file_text = "# Heading @Tag1\n\n## @Marker1 Subheading @Tag2\n\n# Heading @Tag3"
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name,
shard=Shard(
markers=[],
tags=[],
start_line=1,
end_line=5,
children=[
Shard(
markers=[],
tags=["Tag1"],
start_line=1,
end_line=4,
children=[
Shard(
markers=["Marker1"],
tags=["Tag2"],
start_line=3,
end_line=4,
children=[],
)
],
),
Shard(
markers=[],
tags=["Tag3"],
start_line=5,
end_line=5,
children=[],
),
],
),
)
def test_parse_only_parse_releveant_levels(self):
file_text = "# @Marker1 Heading @Tag1\n\n## Subheading @Tag2"
assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name,
shard=Shard(
markers=["Marker1"],
tags=["Tag1", "Tag2"],
start_line=1,
end_line=3,
children=[],
),
)