refactor: split parse into multiple files

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2025-06-21 16:43:40 +02:00
parent 082c13b046
commit dc2a97d3b8
6 changed files with 117 additions and 92 deletions

View file

@@ -0,0 +1,4 @@
from .shard import Shard, StreamFile
from .parse import parse_markdown_file
__all__ = ["Shard", "StreamFile", "parse_markdown_file"]

View file

@@ -0,0 +1,45 @@
import re
from typing import Iterable
from mistletoe.block_token import BlockToken
from mistletoe.span_token import RawText
from mistletoe.token import Token
from .markdown_tag import Tag
def extract_tags(tokens: Iterable[Token]) -> list[str]:
    """Collect the text content of every ``Tag`` token in *tokens*."""
    contents: list[str] = []
    for candidate in tokens:
        if isinstance(candidate, Tag):
            contents.append(candidate.content)
    return contents
def extract_markers_and_tags(block_token: BlockToken) -> tuple[list[str], list[str]]:
    """Split the ``Tag`` children of *block_token* into markers and tags.

    Tags encountered before any meaningful (non-tag, non-whitespace) content
    count as markers; every tag found after that point is an ordinary tag.
    Whitespace-only ``RawText`` children do not end the marker prefix.
    """
    if block_token.children is None:
        return [], []
    in_marker_prefix = True
    marker_tokens: list[Token] = []
    tag_tokens: list[Token] = []
    for child in block_token.children:
        if isinstance(child, Tag):
            (marker_tokens if in_marker_prefix else tag_tokens).append(child)
        elif not (isinstance(child, RawText) and re.match(r"^[\s]*$", child.content)):
            in_marker_prefix = False
    return extract_tags(marker_tokens), extract_tags(tag_tokens)
def has_markers(block_token: BlockToken) -> bool:
    """Return True iff the first meaningful child of *block_token* is a Tag.

    Whitespace-only ``RawText`` children are skipped; any other non-tag child
    (or the absence of children) means the block carries no leading marker.
    """
    for child in block_token.children or []:
        if isinstance(child, Tag):
            return True
        if not (isinstance(child, RawText) and re.match(r"^[\s]*$", child.content)):
            return False
    return False
__all__ = ["extract_tags", "extract_markers_and_tags", "has_markers"]

View file

@@ -0,0 +1,13 @@
from itertools import pairwise
from typing import TypeVar
A = TypeVar("A")
def split_at(list_to_be_split: list[A], positions: list[int]):
positions = sorted(set([0, *positions, len(list_to_be_split)]))
return [list_to_be_split[left:right] for left, right in pairwise(positions)]
__all__ = ["split_at"]

View file

@@ -0,0 +1,20 @@
import re
from mistletoe.markdown_renderer import Fragment, MarkdownRenderer
from mistletoe.span_token import SpanToken
class Tag(SpanToken):
    """Inline span token matching ``@name`` tag markers in markdown text."""
    # The captured text is kept verbatim; no nested span parsing inside a tag.
    parse_inner = False
    # Group 1 (the run of non-whitespace after '@') becomes the token content.
    pattern = re.compile(r"@([^\s]+)")
class TagMarkdownRenderer(MarkdownRenderer):
    """MarkdownRenderer that registers the ``Tag`` span token.

    Entering this renderer as a context manager makes mistletoe's parser
    recognize ``@name`` tags while it is active.
    """
    def __init__(self):
        # Passing Tag registers it as an additional span token with mistletoe.
        super().__init__(Tag)
    def render_tag(self, token: Tag):
        # NOTE(review): mistletoe dispatches to render_<token_name> by naming
        # convention; this re-emits the tag in its original "@content" form.
        yield Fragment("@")
        yield Fragment(token.content)
__all__ = ["Tag", "TagMarkdownRenderer"]

176
src/streamer/parse/parse.py Normal file
View file

@@ -0,0 +1,176 @@
from typing import Optional
from mistletoe import Document
from mistletoe.block_token import Paragraph, BlockToken, Heading
from collections import Counter
from .markdown_tag import TagMarkdownRenderer
from .extract_tag import extract_markers_and_tags, extract_tags, has_markers
from .shard import Shard, StreamFile
from .list import split_at
def get_line_number(block_token: BlockToken) -> int:
    """Return the 1-based source line where *block_token* starts.

    mistletoe sets ``line_number`` on parsed block tokens, but the attribute
    is not part of BlockToken's declared interface — hence the type: ignore.
    """
    return block_token.line_number # type: ignore
def merge_into_first_shard(
    shards: list[Shard],
    start_line: int,
    end_line: int,
    additional_tags: Optional[list[str]] = None,
) -> Shard:
    """Collapse *shards* into one shard rooted at the first element.

    Returns a copy of ``shards[0]`` widened to the given line span, with the
    remaining shards attached as children and *additional_tags* appended to
    its tag list.  Requires a non-empty *shards* list.
    """
    # None sentinel instead of a mutable [] default argument.
    return shards[0].model_copy(
        update={
            "start_line": start_line,
            "end_line": end_line,
            "children": shards[1:],
            "tags": shards[0].tags + (additional_tags or []),
        }
    )
def find_paragraph_shard_positions(block_tokens: list[BlockToken]) -> list[int]:
    """Return the indices of paragraphs that open with marker tags."""
    positions = []
    for position, candidate in enumerate(block_tokens):
        if isinstance(candidate, Paragraph) and has_markers(candidate):
            positions.append(position)
    return positions
def find_headings_by_level(
    block_tokens: list[BlockToken], header_level: int
) -> list[int]:
    """Return the indices of headings whose level equals *header_level*."""
    found = []
    for idx, candidate in enumerate(block_tokens):
        if isinstance(candidate, Heading) and candidate.level == header_level:
            found.append(idx)
    return found
def calculate_heading_level_for_next_split(
    block_tokens: list[BlockToken],
) -> Optional[int]:
    """
    If there is no marker in any heading, then return None.
    If only the first token is a heading with a marker, then return None.
    Otherwise: Return the heading level with the lowest level (h1 < h2), of which there are two or which has a marker (and doesn't stem from first)
    """
    # Levels of marked headings, deliberately skipping the first block.
    marked_levels = [
        heading.level
        for heading in block_tokens[1:]
        if isinstance(heading, Heading) and has_markers(heading)
    ]
    if not marked_levels:
        return None
    # Count how often each heading level occurs anywhere in the block list.
    occurrences = Counter(
        heading.level for heading in block_tokens if isinstance(heading, Heading)
    )
    repeated_levels = [level for level, count in occurrences.items() if count >= 2]
    # marked_levels is non-empty here, so min() always has a candidate.
    return min(repeated_levels + marked_levels)
def parse_single_block_shards(block_token: BlockToken, start_line: int, end_line: int):
    """Build a leaf shard for one block from its markers and tags."""
    marker_list, tag_list = extract_markers_and_tags(block_token)
    return Shard(
        start_line=start_line,
        end_line=end_line,
        markers=marker_list,
        tags=tag_list,
    )
def parse_paragraph_shards(
    block_tokens: list[BlockToken], start_line: int, end_line: int
) -> Optional[Shard]:
    """Shard *block_tokens* at marker-bearing paragraphs.

    Every marked paragraph (and a marked leading heading) becomes a child
    shard spanning from its own line up to the line before the next block.
    Tags found in unmarked blocks are collected onto the parent shard.
    Returns None when no marked block and no loose tag was found at all.
    Requires a non-empty *block_tokens* list.
    """
    is_first_block_heading = isinstance(block_tokens[0], Heading) and has_markers(
        block_tokens[0]
    )
    paragraph_positions = find_paragraph_shard_positions(block_tokens)
    children = []
    added_tags = []
    for i, token in enumerate(block_tokens):
        if i in paragraph_positions:
            # NOTE(review): this resets the flag on every marked paragraph, so
            # the merge below only fires when the *last* marked block is the
            # first block — confirm this is the intended behavior.
            is_first_block_heading = i == 0
        if i in paragraph_positions or (i == 0 and is_first_block_heading):
            child_start_line = get_line_number(token)
            # A child ends just before the next block, or at the parent's end.
            child_end_line = (
                get_line_number(block_tokens[i + 1]) - 1
                if i + 1 < len(block_tokens)
                else end_line
            )
            children.append(
                parse_single_block_shards(token, child_start_line, child_end_line)
            )
        elif token.children:
            added_tags.extend(extract_tags(token.children))
    if len(children) == 0 and len(added_tags) == 0:
        return None
    # (Removed the dead `is_first_block_only_with_marker` flag: it was set to
    # False once, never changed, so it could not affect this condition.)
    if is_first_block_heading:
        return merge_into_first_shard(children, start_line, end_line, added_tags)
    else:
        return Shard(
            start_line=start_line, end_line=end_line, children=children, tags=added_tags
        )
def parse_header_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    use_first_child_as_header: bool = False,
) -> Optional[Shard]:
    """Recursively split *block_tokens* into shards at heading boundaries.

    Chooses the next heading level worth splitting on; when none applies,
    falls back to paragraph-level sharding.  With *use_first_child_as_header*
    the first resulting child absorbs its siblings as children.
    """
    if len(block_tokens) == 0:
        return Shard(start_line=start_line, end_line=end_line)
    split_at_heading_level = calculate_heading_level_for_next_split(block_tokens)
    if split_at_heading_level is None:
        # No heading to split on: shard the segment by marked paragraphs.
        return parse_paragraph_shards(block_tokens, start_line, end_line)
    heading_positions = find_headings_by_level(block_tokens, split_at_heading_level)
    block_tokens_split_by_heading = split_at(block_tokens, heading_positions)
    children = []
    for i, child_blocks in enumerate(block_tokens_split_by_heading):
        child_start_line = get_line_number(child_blocks[0])
        # Each segment ends just before the next segment's first line; the
        # last one runs to this shard's end.
        child_end_line = (
            get_line_number(block_tokens_split_by_heading[i + 1][0]) - 1
            if i + 1 < len(block_tokens_split_by_heading)
            else end_line
        )
        if child_shard := parse_header_shards(
            child_blocks,
            child_start_line,
            child_end_line,
            # Every segment after the first starts at a heading (as does the
            # first, when position 0 is a split point), so it acts as header.
            use_first_child_as_header=i > 0 or 0 in heading_positions,
        ):
            children.append(child_shard)
    if use_first_child_as_header and len(children) > 0:
        return merge_into_first_shard(children, start_line, end_line)
    else:
        return Shard(start_line=start_line, end_line=end_line, children=children)
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
    """Parse a markdown document into a StreamFile of nested shards.

    The root shard always covers the whole file; when the document contains
    block tokens, heading/paragraph parsing may replace it with a richer tree.
    """
    # max(a, 1) instead of max([a, 1]); an empty file still spans one line.
    shard = Shard(start_line=1, end_line=max(len(file_content.splitlines()), 1))
    # Entering the renderer registers the Tag span token with mistletoe.
    with TagMarkdownRenderer():
        ast = Document(file_content)
        block_tokens: list[BlockToken] = ast.children  # type: ignore
        if len(block_tokens) > 0:
            if parsed_shard := parse_header_shards(
                block_tokens, shard.start_line, shard.end_line
            ):
                shard = parsed_shard
    return StreamFile(shard=shard, filename=file_name)
__all__ = ["Shard", "StreamFile", "parse_markdown_file"]

View file

@@ -0,0 +1,19 @@
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class Shard(BaseModel):
    """A contiguous span of a markdown file, possibly containing sub-shards."""
    # Leading "@..." tags that open the shard's first block.
    markers: list[str] = []
    # Remaining tags attached to this shard.
    tags: list[str] = []
    # 1-based, inclusive line span within the source file.
    start_line: int
    end_line: int
    # Nested shards contained inside this span (pydantic copies the default).
    children: list[Shard] = []
class StreamFile(BaseModel):
    """A parsed markdown file: its name plus the root shard tree."""
    filename: str
    # Root shard covering the file; defaults to None when nothing was parsed.
    shard: Optional[Shard] = None
__all__ = ["Shard", "StreamFile"]