feat: also support tags in further blocks

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2025-06-20 16:15:11 +02:00
parent 42262844a6
commit b7ad75f079
2 changed files with 64 additions and 27 deletions

View file

@@ -42,46 +42,56 @@ T = TypeVar("T")
def extract_tags(tokens: list[Token]) -> list[str]:
    """Return the contents of all ``Tag`` tokens in *tokens*, in input order.

    Non-``Tag`` tokens are ignored.
    """
    # A comprehension replaces the original list(map(lambda ...,
    # filter(lambda ..., ...))) chain: same result, far more readable.
    return [token.content for token in tokens if isinstance(token, Tag)]
def extract_markers_and_tags(header: Optional[Token]) -> tuple[list[str], list[str]]:
marker_boundary_check = lambda token: isinstance(token, Tag) or (
isinstance(token, RawText) and re.match(r"^[\s]*$", token.content)
) )
marker_region = takewhile(marker_boundary_check, header.children)
tag_region = dropwhile(marker_boundary_check, header.children)
return extract_tags(marker_region), extract_tags(tag_region)
def extract_markers_and_tags(tokens: list[Token]) -> tuple[list[str], list[str]]:
    """Separate leading marker tags from the remaining (content) tags.

    The marker region is the run of ``Tag`` tokens (and whitespace-only raw
    text between them) at the start of the first token's children; everything
    after it counts as ordinary tags, as do tags found in all later tokens.

    Returns a ``(markers, tags)`` pair of tag-content lists.
    """

    def in_marker_region(token: Token) -> bool:
        # Markers end at the first child that is neither a Tag nor pure
        # whitespace.
        return isinstance(token, Tag) or (
            isinstance(token, RawText) and re.match(r"^[\s]*$", token.content)
        )

    first_children = tokens[0].children
    markers = extract_tags(takewhile(in_marker_region, first_children))
    tags: list[str] = extract_tags(dropwhile(in_marker_region, first_children))
    for later_token in tokens[1:]:
        tags.extend(extract_tags(later_token.children))
    return markers, tags
def has_markers(token: Token) -> bool:
    """True if *token* begins with at least one marker tag."""
    markers, _tags = extract_markers_and_tags([token])
    return bool(markers)
def find_shard_positions(block_tokens: list[BlockToken]) -> list[int]:
    """Indices of the paragraphs that open a new shard (i.e. carry markers)."""
    positions: list[int] = []
    for index, block_token in enumerate(block_tokens):
        # Only marker-bearing paragraphs start a shard; all other block
        # tokens belong to the surrounding content.
        if isinstance(block_token, Paragraph) and has_markers(block_token):
            positions.append(index)
    return positions
T = TypeVar('T') T = TypeVar("T")
def split_at(list_to_be_split: list[T], positions: list[int]): def split_at(list_to_be_split: list[T], positions: list[int]):
positions = sorted(set([0, *positions, len(list_to_be_split)])) positions = sorted(set([0, *positions, len(list_to_be_split)]))
return [ return [list_to_be_split[left:right] for left, right in pairwise(positions)]
list_to_be_split[left : right]
for left, right in pairwise(positions)
]
def to_shard(tokens: list[Token], start_line: int, end_line: int, children: list[Shard] = []) -> Shard:
markers, tags = extract_markers_and_tags(tokens[0]) if len(tokens) > 0 else ([], []) def to_shard(
# TODO: also find tags of children! tokens: list[Token], start_line: int, end_line: int, children: list[Shard] = []
) -> Shard:
markers, tags = extract_markers_and_tags(tokens) if len(tokens) > 0 else ([], [])
return Shard( return Shard(
markers=markers, markers=markers,
@@ -91,9 +101,10 @@ def to_shard(tokens: list[Token], start_line: int, end_line: int, children: list
children=children, children=children,
) )
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile: def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
shard = None shard = None
with TagMarkdownRenderer() as renderer: with TagMarkdownRenderer():
ast = Document(file_content) ast = Document(file_content)
line_count = len(file_content.splitlines()) line_count = len(file_content.splitlines())
@@ -106,11 +117,15 @@ def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
for i in range(len(block_tokens)): for i in range(len(block_tokens)):
token = block_tokens[i] token = block_tokens[i]
if i in shard_starts: if i in shard_starts:
end_line = block_tokens[i + 1].line_number - 1 if i + 1 < len(block_tokens) else line_count end_line = (
block_tokens[i + 1].line_number - 1
if i + 1 < len(block_tokens)
else line_count
)
child_shards.append(to_shard([token], token.line_number, end_line)) child_shards.append(to_shard([token], token.line_number, end_line))
else: else:
own_elements.append(token) own_elements.append(token)
if len(child_shards) == 1 and len(own_elements) == 0: if len(child_shards) == 1 and len(own_elements) == 0:
shard = child_shards[0] shard = child_shards[0]
else: else:

View file

@@ -78,7 +78,7 @@ class TestParseProcess:
) )
def test_parse_split_paragraphs_into_shards(self): def test_parse_split_paragraphs_into_shards(self):
file_text = f"Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2" file_text = "Hello World!\n\n@Tag1 Block 1\n\n@Tag2 Block 2"
assert parse_markdown_file(self.file_name, file_text) == StreamFile( assert parse_markdown_file(self.file_name, file_text) == StreamFile(
filename=self.file_name, filename=self.file_name,
@@ -104,4 +104,26 @@ class TestParseProcess:
), ),
], ],
), ),
) )
def test_parse_split_paragraph_with_inner_tags_at_more_positions(self):
    """Tags outside the marker paragraph are collected on the parent shard."""
    # Tags appear in the intro paragraph and in a trailing block; only the
    # middle paragraph carries a marker and becomes a child shard.
    content = "Hello @Tag1 World!\n\n@Marker Block 1\n\nBlock 2 @Tag2"

    parsed = parse_markdown_file(self.file_name, content)

    expected = StreamFile(
        filename=self.file_name,
        shard=Shard(
            markers=[],
            tags=["Tag1", "Tag2"],
            start_line=1,
            end_line=5,
            children=[
                Shard(
                    markers=["Marker"],
                    tags=[],
                    start_line=3,
                    end_line=3,
                    children=[],
                ),
            ],
        ),
    )
    assert parsed == expected