feat: fix heading & paragraph parsing

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2025-06-21 16:02:16 +02:00
parent de99ffe83e
commit 0f645e7e9b
2 changed files with 228 additions and 202 deletions

View file

@ -3,11 +3,12 @@ from itertools import takewhile, dropwhile
from typing import Optional, TypeVar from typing import Optional, TypeVar
from pydantic import BaseModel from pydantic import BaseModel
from mistletoe import Document from mistletoe import Document
from mistletoe.markdown_renderer import MarkdownRenderer, Fragment, BlankLine from mistletoe.markdown_renderer import MarkdownRenderer, Fragment
from mistletoe.span_token import SpanToken, RawText from mistletoe.span_token import SpanToken, RawText
from mistletoe.block_token import Paragraph, BlockToken, Heading from mistletoe.block_token import Paragraph, BlockToken, Heading
from mistletoe.token import Token from mistletoe.token import Token
from itertools import pairwise from itertools import pairwise
from collections import Counter
import re import re
@ -46,28 +47,28 @@ def extract_tags(tokens: list[Token]) -> list[str]:
return list(map(lambda marker: marker.content, tags)) return list(map(lambda marker: marker.content, tags))
def extract_markers_and_tags(tokens: list[Token]) -> tuple[list[str], list[str]]: def extract_markers_and_tags(tokens: BlockToken) -> tuple[list[str], list[str]]:
def marker_boundary_check(token: Token): def marker_boundary_check(token: Token):
return isinstance(token, Tag) or ( return isinstance(token, Tag) or (
isinstance(token, RawText) and re.match(r"^[\s]*$", token.content) isinstance(token, RawText) and re.match(r"^[\s]*$", token.content)
) )
marker_region = takewhile(marker_boundary_check, tokens[0].children) marker_region = takewhile(marker_boundary_check, tokens.children)
tag_region_first = dropwhile(marker_boundary_check, tokens[0].children) tag_region = dropwhile(marker_boundary_check, tokens.children)
tags: list[str] = extract_tags(tag_region_first) return extract_tags(marker_region), extract_tags(tag_region)
for token in tokens[1:]:
tags.extend(extract_tags(token.children))
return extract_tags(marker_region), tags
def has_markers(token: Token) -> bool: def has_markers(block_token: BlockToken) -> bool:
markers, _ = extract_markers_and_tags([token]) for child in block_token.children:
return len(markers) > 0 if isinstance(child, Tag):
return True
if isinstance(child, RawText) and re.match(r"^[\s]*$", child.content):
continue
return False
def find_shard_positions(block_tokens: list[BlockToken]) -> list[int]: def find_paragraph_shard_positions(block_tokens: list[BlockToken]) -> list[int]:
return [ return [
index index
for index, block_token in enumerate(block_tokens) for index, block_token in enumerate(block_tokens)
@ -84,80 +85,7 @@ def split_at(list_to_be_split: list[A], positions: list[int]):
return [list_to_be_split[left:right] for left, right in pairwise(positions)] return [list_to_be_split[left:right] for left, right in pairwise(positions)]
def to_shard( def find_headings_by_level(
tokens: list[Token], start_line: int, end_line: int, children: list[Shard] = []
) -> Shard:
if len(children) == 1 and len(tokens) == 0:
return children[0]
markers, tags = extract_markers_and_tags(tokens) if len(tokens) > 0 else ([], [])
return Shard(
markers=markers,
tags=tags,
start_line=start_line,
end_line=end_line,
children=children,
)
def append_children(
    shard: Shard, new_children: Optional[list[Shard]] = None
) -> Shard:
    """
    Return a shard whose children are those of `shard` followed by `new_children`.

    When the combined child list holds exactly one shard and `shard` itself
    carries neither markers nor tags, that single child is returned directly
    instead of a wrapper around it. `shard` itself is not modified.
    """
    # A None default avoids sharing one list object between calls.
    combined_children = shard.children + (new_children or [])

    # A wrapper without own markers/tags around a single child adds nothing.
    if len(combined_children) == 1 and not shard.markers and not shard.tags:
        return combined_children[0]

    return Shard(
        markers=shard.markers,
        tags=shard.tags,
        start_line=shard.start_line,
        end_line=shard.end_line,
        children=combined_children,
    )
def parse_paragraph_shards(block_tokens: list[BlockToken], end_line: int) -> Shard:
    """
    Build one shard from a run of block tokens.

    Blocks at the positions reported by `find_shard_positions` each become a
    child shard that ends on the line before the next block (or on `end_line`
    for the last block). All remaining blocks are handed to `to_shard` as the
    parent's own tokens, together with the collected children.
    """
    first_line = block_tokens[0].line_number
    marker_positions = find_shard_positions(block_tokens)
    token_count = len(block_tokens)

    nested: list[Shard] = []
    remainder: list[BlockToken] = []
    for position, block in enumerate(block_tokens):
        if position not in marker_positions:
            remainder.append(block)
            continue

        following = position + 1
        if following < token_count:
            last_line = block_tokens[following].line_number - 1
        else:
            last_line = end_line
        nested.append(to_shard([block], block.line_number, last_line))

    return to_shard(remainder, first_line, end_line, children=nested)
def optional_max(source_array: list[int]) -> Optional[int]:
    """
    Return the largest value in `source_array`, or None when it is empty.

    Using `default=None` handles only the empty case, so a ValueError raised
    while the input is being iterated is no longer silently turned into None.
    """
    return max(source_array, default=None)
def optional_min(source_array: list[int]) -> Optional[int]:
    """
    Return the smallest value in `source_array`, or None when it is empty.

    Using `default=None` handles only the empty case, so a ValueError raised
    while the input is being iterated is no longer silently turned into None.
    """
    return min(source_array, default=None)
def find_heading_positions(
block_tokens: list[BlockToken], header_level: int block_tokens: list[BlockToken], header_level: int
) -> list[int]: ) -> list[int]:
return [ return [
@ -167,89 +95,143 @@ def find_heading_positions(
] ]
def calculate_heading_level_for_next_split(
    block_tokens: list[BlockToken],
) -> Optional[int]:
    """
    Pick the heading level at which the given blocks are split next.

    Only headings after the first block can trigger a split: when none of
    them carries a marker, None is returned (this includes the case where
    only the first block is a heading with a marker). Otherwise the result
    is the smallest level (h1 < h2) among the levels of those marked
    headings and the levels used by at least two headings in the blocks.
    """
    marked_levels_after_first = []
    for token in block_tokens[1:]:
        if isinstance(token, Heading) and has_markers(token):
            marked_levels_after_first.append(token.level)

    if not marked_levels_after_first:
        return None

    # Count every heading, including a leading one, per level.
    occurrences = Counter(
        token.level for token in block_tokens if isinstance(token, Heading)
    )
    repeated_levels = [level for level, count in occurrences.items() if count >= 2]

    return min(repeated_levels + marked_levels_after_first)
def merge_into_first_shard(
    shards: list[Shard],
    start_line: int,
    end_line: int,
    additional_tags: Optional[list[str]] = None,
) -> Shard:
    """
    Turn the first shard into the parent of all following shards.

    Returns a copy of `shards[0]` spanning `start_line` to `end_line`, with
    the remaining shards as its children and `additional_tags` appended to
    its own tags. The input shards are not modified.

    Raises IndexError when `shards` is empty.
    """
    # A None default avoids sharing one list object between calls.
    extra_tags = additional_tags if additional_tags is not None else []
    return shards[0].model_copy(
        update={
            "start_line": start_line,
            "end_line": end_line,
            "children": shards[1:],
            "tags": shards[0].tags + extra_tags,
        }
    )
def parse_single_block_shards(block_token: BlockToken, start_line: int, end_line: int):
    """
    Build a childless shard for one block token.

    The shard covers `start_line` to `end_line` and carries the markers and
    tags that `extract_markers_and_tags` reports for `block_token`.
    """
    shard_fields = dict(
        zip(("markers", "tags"), extract_markers_and_tags(block_token), strict=True)
    )
    return Shard(start_line=start_line, end_line=end_line, **shard_fields)
def parse_paragraph_shards(
    block_tokens: list[BlockToken], start_line: int, end_line: int
) -> Optional[Shard]:
    """
    Build a shard from a run of blocks that is not split at headings.

    Blocks at the positions reported by `find_paragraph_shard_positions`, and
    a first block that is a heading with markers, each become a child shard
    running from their own line to the line before the next block (or to
    `end_line` for the last block). Tags of every other block that has
    children are collected for the parent.

    Returns None when neither child shards nor tags were found. Otherwise the
    children are either merged into the first child or wrapped in a new shard
    spanning `start_line` to `end_line`.
    """
    first_block = block_tokens[0]
    merge_into_first = isinstance(first_block, Heading) and has_markers(first_block)
    marker_positions = find_paragraph_shard_positions(block_tokens)
    last_index = len(block_tokens) - 1

    child_shards = []
    loose_tags = []
    for index, block in enumerate(block_tokens):
        starts_shard = index in marker_positions
        if starts_shard:
            # NOTE(review): this overwrites the flag for every marker block,
            # so it ends up reflecting only the last one - confirm intended.
            merge_into_first = index == 0

        if starts_shard or (index == 0 and merge_into_first):
            if index < last_index:
                child_end = block_tokens[index + 1].line_number - 1
            else:
                child_end = end_line
            child_shards.append(
                parse_single_block_shards(block, block.line_number, child_end)
            )
        elif block.children:
            loose_tags.extend(extract_tags(block.children))

    if not child_shards and not loose_tags:
        return None

    if not merge_into_first:
        return Shard(
            start_line=start_line,
            end_line=end_line,
            children=child_shards,
            tags=loose_tags,
        )
    return merge_into_first_shard(child_shards, start_line, end_line, loose_tags)
def parse_header_shards( def parse_header_shards(
block_tokens: list[BlockToken], block_tokens: list[BlockToken],
start_line: int, start_line: int,
end_line: int, end_line: int,
first_token_is_header: bool = False, use_first_child_as_header: bool = False,
) -> Shard: ) -> Optional[Shard]:
if len(block_tokens) == 0: if len(block_tokens) == 0:
return Shard(start_line=start_line, end_line=end_line) return Shard(start_line=start_line, end_line=end_line)
max_header_level_with_marker = optional_max( split_at_heading_level = calculate_heading_level_for_next_split(block_tokens)
map(
lambda heading: heading.level,
filter(
lambda block_token: isinstance(block_token, Heading)
and has_markers(block_token),
block_tokens[1:] if first_token_is_header else block_tokens,
),
)
)
if max_header_level_with_marker is None: if split_at_heading_level is None:
return parse_paragraph_shards(block_tokens, end_line) return parse_paragraph_shards(block_tokens, start_line, end_line)
header_level_for_slicing = optional_min( heading_positions = find_headings_by_level(block_tokens, split_at_heading_level)
map(
lambda heading: heading.level,
filter(
lambda block_token: isinstance(block_token, Heading),
block_tokens[1:] if first_token_is_header else block_tokens,
),
)
)
slice_positions = find_heading_positions(block_tokens, header_level_for_slicing) block_tokens_split_by_heading = split_at(block_tokens, heading_positions)
if first_token_is_header:
for end_of_header in range(0, len(block_tokens) - 1):
if not isinstance(block_tokens[end_of_header + 1], BlankLine):
continue
slice_positions.append(end_of_header)
is_first_slice_part_of_parent_shard = 0 not in slice_positions
sliced_by_heading_level = split_at(block_tokens, slice_positions) children = []
for i, child_blocks in enumerate(block_tokens_split_by_heading):
own_elements = ( child_start_line = child_blocks[0].line_number
sliced_by_heading_level[0] if is_first_slice_part_of_parent_shard else []
)
child_elements = (
sliced_by_heading_level[1:]
if is_first_slice_part_of_parent_shard
else sliced_by_heading_level
)
child_shards = []
for i in range(len(child_elements)):
child_start_line = child_elements[i][0].line_number
child_end_line = ( child_end_line = (
child_elements[i + 1][0].line_number - 1 block_tokens_split_by_heading[i + 1][0].line_number - 1
if i + 1 < len(child_elements) if i + 1 < len(block_tokens_split_by_heading)
else end_line else end_line
) )
child_shards.append( if child_shard := parse_header_shards(
parse_header_shards( child_blocks,
child_elements[i],
child_start_line, child_start_line,
child_end_line, child_end_line,
first_token_is_header=True, use_first_child_as_header=i > 0 or 0 in heading_positions,
) ):
) children.append(child_shard)
own_shard = parse_header_shards( if use_first_child_as_header and len(children) > 0:
own_elements, start_line, end_line, first_token_is_header=False return merge_into_first_shard(children, start_line, end_line)
) else:
return append_children(own_shard, child_shards) return Shard(start_line=start_line, end_line=end_line, children=children)
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile: def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
shard = None shard = Shard(start_line=1, end_line=max([len(file_content.splitlines()), 1]))
with TagMarkdownRenderer(): with TagMarkdownRenderer():
ast = Document(file_content) ast = Document(file_content)
line_count = len(file_content.splitlines())
if block_tokens := ast.children: if block_tokens := ast.children:
shard = parse_header_shards(block_tokens, 1, line_count) if parsed_shard := parse_header_shards(
block_tokens, shard.start_line, shard.end_line
):
shard = parsed_shard
return StreamFile(shard=shard, filename=file_name) return StreamFile(shard=shard, filename=file_name)

View file

@ -9,7 +9,7 @@ class TestParseProcess:
def test_parse_empty_file(self): def test_parse_empty_file(self):
assert parse_markdown_file(self.file_name, "") == StreamFile( assert parse_markdown_file(self.file_name, "") == StreamFile(
filename=self.file_name, shard=None filename=self.file_name, shard=Shard(start_line=1, end_line=1)
) )
def test_parse_basic_one_line_file(self): def test_parse_basic_one_line_file(self):
@ -17,11 +17,8 @@ class TestParseProcess:
assert parse_markdown_file(self.file_name, test_file) == StreamFile( assert parse_markdown_file(self.file_name, test_file) == StreamFile(
filename=self.file_name, filename=self.file_name,
shard=Shard( shard=Shard(
markers=[],
tags=[],
start_line=1, start_line=1,
end_line=1, end_line=1,
children=[],
), ),
) )
@ -109,37 +106,27 @@ class TestParseProcess:
def test_parse_split_paragraph_with_inner_tags_at_more_positions(self): def test_parse_split_paragraph_with_inner_tags_at_more_positions(self):
file_text = "Hello @Tag1 World!\n\n@Marker Block 1\n\nBlock 2 @Tag2" file_text = "Hello @Tag1 World!\n\n@Marker Block 1\n\nBlock 2 @Tag2"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text).shard == Shard(
filename=self.file_name,
shard=Shard(
markers=[], markers=[],
tags=["Tag1", "Tag2"], tags=["Tag1", "Tag2"],
start_line=1, start_line=1,
end_line=5, end_line=5,
children=[ children=[
Shard( Shard(
markers=["Marker"], markers=["Marker"], tags=[], start_line=3, end_line=3, children=[]
tags=[],
start_line=3,
end_line=3,
children=[],
), ),
], ],
),
) )
def test_parse_header_without_markers(self): def test_parse_header_without_markers(self):
file_text = "# Heading\n\n## Subheading" file_text = "# Heading\n\n## Subheading"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text).shard == Shard(
filename=self.file_name,
shard=Shard(
markers=[], markers=[],
tags=[], tags=[],
start_line=1, start_line=1,
end_line=3, end_line=3,
children=[], children=[],
),
) )
def test_parse_split_at_heading_if_marker_on_subheading(self): def test_parse_split_at_heading_if_marker_on_subheading(self):
@ -192,13 +179,17 @@ class TestParseProcess:
def test_continue_full_parsing_before_headings_start(self): def test_continue_full_parsing_before_headings_start(self):
file_text = "Hello\n\n@Marker1 World!\n\n# @Marker2 I'm a heading!" file_text = "Hello\n\n@Marker1 World!\n\n# @Marker2 I'm a heading!"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text).shard == Shard(
filename=self.file_name,
shard=Shard(
markers=[], markers=[],
tags=[], tags=[],
start_line=1, start_line=1,
end_line=5, end_line=5,
children=[
Shard(
markers=[],
tags=[],
start_line=1,
end_line=4,
children=[ children=[
Shard( Shard(
markers=["Marker1"], markers=["Marker1"],
@ -206,14 +197,67 @@ class TestParseProcess:
start_line=3, start_line=3,
end_line=3, end_line=3,
children=[], children=[],
)
],
), ),
Shard( Shard(
markers=["Marker2"], markers=["Marker2"], tags=[], start_line=5, end_line=5, children=[]
),
],
)
def test_complex_heading_structure(self):
file_text = "Preamble @Preamble\n## @Intro\n# @Title\n## @Chapter1\n## @Chapter2\n### Section 1\n### Section 2"
assert parse_markdown_file(self.file_name, file_text).shard == Shard(
markers=[],
tags=[], tags=[],
start_line=5, start_line=1,
end_line=5, end_line=7,
children=[
Shard(
markers=[],
tags=[],
start_line=1,
end_line=2,
children=[
Shard(
markers=[],
tags=["Preamble"],
start_line=1,
end_line=1,
children=[],
),
Shard(
markers=["Intro"],
tags=[],
start_line=2,
end_line=2,
children=[], children=[],
), ),
], ],
), ),
Shard(
markers=["Title"],
tags=[],
start_line=3,
end_line=7,
children=[
Shard(
markers=["Chapter1"],
tags=[],
start_line=4,
end_line=4,
children=[],
),
Shard(
markers=["Chapter2"],
tags=[],
start_line=5,
end_line=7,
children=[],
),
],
),
],
) )