def extract_markers_and_tags(tokens: BlockToken) -> tuple[list[str], list[str]]:
    """Split a block token's children into leading markers and trailing tags.

    The marker region is the uninterrupted run of ``Tag`` tokens (and pure
    whitespace) at the start of the block; everything from the first other
    token onwards is the tag region.
    """

    def marker_boundary_check(token: Token):
        # A token still belongs to the marker region if it is a Tag or
        # consists of nothing but whitespace.
        return isinstance(token, Tag) or (
            isinstance(token, RawText) and re.match(r"^[\s]*$", token.content)
        )

    marker_region = takewhile(marker_boundary_check, tokens.children)
    tag_region = dropwhile(marker_boundary_check, tokens.children)

    return extract_tags(marker_region), extract_tags(tag_region)


def has_markers(block_token: BlockToken) -> bool:
    """Return True if the block starts with a Tag, ignoring leading whitespace."""
    for child in block_token.children:
        if isinstance(child, Tag):
            return True
        if isinstance(child, RawText) and re.match(r"^[\s]*$", child.content):
            continue
        return False
    # Fix: previously fell off the loop and implicitly returned None when every
    # child was whitespace; return an explicit bool to honour the annotation.
    return False
def calculate_heading_level_for_next_split(
    block_tokens: list[BlockToken],
) -> Optional[int]:
    """Pick the heading level at which the token list should be split next.

    Returns None when no heading carries a marker, or when only the first
    token is a marker-bearing heading (nothing to split on). Otherwise
    returns the smallest (most significant, h1 < h2) level among headings
    that either occur at least twice or carry a marker and are not the
    first token.
    """
    marked_heading_levels_after_first = [
        token.level
        for token in block_tokens[1:]
        if isinstance(token, Heading) and has_markers(token)
    ]

    if len(marked_heading_levels_after_first) == 0:
        return None

    heading_level_counter = Counter(
        token.level for token in block_tokens if isinstance(token, Heading)
    )

    return min(
        [level for level, count in heading_level_counter.items() if count >= 2]
        + marked_heading_levels_after_first
    )


def merge_into_first_shard(
    shards: list[Shard],
    start_line: int,
    end_line: int,
    additional_tags: Optional[list[str]] = None,
) -> Shard:
    """Fold a shard list into its first element.

    The first shard adopts the given line span, the remaining shards as
    children, and any additional tags.

    Fix: ``additional_tags`` previously defaulted to a shared mutable list;
    a None sentinel is the safe equivalent.
    """
    return shards[0].model_copy(
        update={
            "start_line": start_line,
            "end_line": end_line,
            "children": shards[1:],
            "tags": shards[0].tags + (additional_tags or []),
        }
    )
def parse_single_block_shards(
    block_token: BlockToken, start_line: int, end_line: int
) -> Shard:
    """Build a leaf shard from one block token's markers and tags."""
    markers, tags = extract_markers_and_tags(block_token)
    return Shard(start_line=start_line, end_line=end_line, markers=markers, tags=tags)


def parse_paragraph_shards(
    block_tokens: list[BlockToken], start_line: int, end_line: int
) -> Optional[Shard]:
    """Parse a heading-free run of blocks into a shard (sub)tree.

    Marker-bearing paragraphs — and a marker-bearing leading heading —
    become child shards; tags found in the remaining blocks are collected
    onto the enclosing shard. Returns None when neither children nor tags
    were found.
    """
    # True when the run is introduced by a heading that carries a marker; in
    # that case the heading's shard absorbs the rest as children below.
    is_first_block_heading = isinstance(block_tokens[0], Heading) and has_markers(
        block_tokens[0]
    )

    paragraph_positions = find_paragraph_shard_positions(block_tokens)
    children = []
    added_tags = []

    # NOTE(review): never set to True anywhere, so the merge condition below
    # effectively depends on is_first_block_heading alone — confirm intent.
    is_first_block_only_with_marker = False

    for i, token in enumerate(block_tokens):
        if i in paragraph_positions:
            # NOTE(review): reassigns the "first block" flag so merging only
            # happens when the last marker paragraph is block 0 — verify that
            # this clobbering of the heading flag is intentional.
            is_first_block_heading = i == 0

        if i in paragraph_positions or (i == 0 and is_first_block_heading):
            child_start_line = token.line_number
            # A child span ends one line before the next block (or at end_line).
            child_end_line = (
                block_tokens[i + 1].line_number - 1
                if i + 1 < len(block_tokens)
                else end_line
            )

            children.append(
                parse_single_block_shards(token, child_start_line, child_end_line)
            )
        elif token.children:
            added_tags.extend(extract_tags(token.children))

    if len(children) == 0 and len(added_tags) == 0:
        return None
    if is_first_block_heading or is_first_block_only_with_marker:
        return merge_into_first_shard(children, start_line, end_line, added_tags)
    else:
        return Shard(
            start_line=start_line, end_line=end_line, children=children, tags=added_tags
        )
def parse_header_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    use_first_child_as_header: bool = False,
) -> Optional[Shard]:
    """Recursively split blocks at the next relevant heading level.

    When no heading level warrants a split, falls back to paragraph
    parsing. With use_first_child_as_header=True the first child shard
    (the section's heading) absorbs the following shards as children.
    """
    if len(block_tokens) == 0:
        return Shard(start_line=start_line, end_line=end_line)

    split_at_heading_level = calculate_heading_level_for_next_split(block_tokens)

    if split_at_heading_level is None:
        return parse_paragraph_shards(block_tokens, start_line, end_line)

    heading_positions = find_headings_by_level(block_tokens, split_at_heading_level)
    block_tokens_split_by_heading = split_at(block_tokens, heading_positions)

    children = []
    for i, child_blocks in enumerate(block_tokens_split_by_heading):
        child_start_line = child_blocks[0].line_number
        # A slice ends one line before the next slice starts (or at end_line).
        child_end_line = (
            block_tokens_split_by_heading[i + 1][0].line_number - 1
            if i + 1 < len(block_tokens_split_by_heading)
            else end_line
        )
        if child_shard := parse_header_shards(
            child_blocks,
            child_start_line,
            child_end_line,
            # Only a leading slice without a heading keeps plain parsing; every
            # slice that starts at a heading is headed by its first child.
            use_first_child_as_header=i > 0 or 0 in heading_positions,
        ):
            children.append(child_shard)

    if use_first_child_as_header and len(children) > 0:
        return merge_into_first_shard(children, start_line, end_line)
    else:
        return Shard(start_line=start_line, end_line=end_line, children=children)


def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
    """Parse markdown content into a StreamFile whose shard tree mirrors it."""
    # Default shard spans the whole file; an empty file still covers line 1.
    # Fix: max([...]) built a throwaway list — pass the values directly.
    shard = Shard(start_line=1, end_line=max(len(file_content.splitlines()), 1))

    with TagMarkdownRenderer():
        ast = Document(file_content)

        if block_tokens := ast.children:
            if parsed_shard := parse_header_shards(
                block_tokens, shard.start_line, shard.end_line
            ):
                shard = parsed_shard

    return StreamFile(shard=shard, filename=file_name)
), - ], - ), + assert parse_markdown_file(self.file_name, file_text).shard == Shard( + markers=[], + tags=["Tag1", "Tag2"], + start_line=1, + end_line=5, + children=[ + Shard( + markers=["Marker"], tags=[], start_line=3, end_line=3, children=[] + ), + ], ) def test_parse_header_without_markers(self): file_text = "# Heading\n\n## Subheading" - assert parse_markdown_file(self.file_name, file_text) == StreamFile( - filename=self.file_name, - shard=Shard( - markers=[], - tags=[], - start_line=1, - end_line=3, - children=[], - ), + assert parse_markdown_file(self.file_name, file_text).shard == Shard( + markers=[], + tags=[], + start_line=1, + end_line=3, + children=[], ) def test_parse_split_at_heading_if_marker_on_subheading(self): @@ -192,28 +179,85 @@ class TestParseProcess: def test_continue_full_parsing_before_headings_start(self): file_text = "Hello\n\n@Marker1 World!\n\n# @Marker2 I'm a heading!" - assert parse_markdown_file(self.file_name, file_text) == StreamFile( - filename=self.file_name, - shard=Shard( - markers=[], - tags=[], - start_line=1, - end_line=5, - children=[ - Shard( - markers=["Marker1"], - tags=[], - start_line=3, - end_line=3, - children=[], - ), - Shard( - markers=["Marker2"], - tags=[], - start_line=5, - end_line=5, - children=[], - ), - ], - ), + assert parse_markdown_file(self.file_name, file_text).shard == Shard( + markers=[], + tags=[], + start_line=1, + end_line=5, + children=[ + Shard( + markers=[], + tags=[], + start_line=1, + end_line=4, + children=[ + Shard( + markers=["Marker1"], + tags=[], + start_line=3, + end_line=3, + children=[], + ) + ], + ), + Shard( + markers=["Marker2"], tags=[], start_line=5, end_line=5, children=[] + ), + ], + ) + + def test_complex_heading_structure(self): + file_text = "Preamble @Preamble\n## @Intro\n# @Title\n## @Chapter1\n## @Chapter2\n### Section 1\n### Section 2" + + assert parse_markdown_file(self.file_name, file_text).shard == Shard( + markers=[], + tags=[], + start_line=1, + end_line=7, + 
children=[ + Shard( + markers=[], + tags=[], + start_line=1, + end_line=2, + children=[ + Shard( + markers=[], + tags=["Preamble"], + start_line=1, + end_line=1, + children=[], + ), + Shard( + markers=["Intro"], + tags=[], + start_line=2, + end_line=2, + children=[], + ), + ], + ), + Shard( + markers=["Title"], + tags=[], + start_line=3, + end_line=7, + children=[ + Shard( + markers=["Chapter1"], + tags=[], + start_line=4, + end_line=4, + children=[], + ), + Shard( + markers=["Chapter2"], + tags=[], + start_line=5, + end_line=7, + children=[], + ), + ], + ), + ], )