refactor!: rename package from streamer to streamd
- Rename src/streamer/ to src/streamd/
- Update all internal imports
- Update pyproject.toml project name and entry point
- Update README branding (Streamer -> Strea.md)
- Switch from pyright to basedpyright
- Bump requires-python to >=3.13
This commit is contained in:
parent
49cd9bcfa0
commit
af2debc19b
23 changed files with 48 additions and 789 deletions
|
|
@ -2,7 +2,7 @@ from __future__ import annotations
|
|||
|
||||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard
|
||||
from streamd.parse.shard import Shard
|
||||
|
||||
|
||||
class LocalizedShard(Shard):
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
from streamer.localize.repository_configuration import (
|
||||
from streamd.localize.repository_configuration import (
|
||||
Dimension,
|
||||
Marker,
|
||||
MarkerPlacement,
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
from enum import StrEnum
|
||||
|
||||
from streamer.localize import RepositoryConfiguration
|
||||
from streamer.localize.repository_configuration import (
|
||||
from streamd.localize import RepositoryConfiguration
|
||||
from streamd.localize.repository_configuration import (
|
||||
Dimension,
|
||||
Marker,
|
||||
MarkerPlacement,
|
||||
|
|
@ -2,9 +2,8 @@ from datetime import datetime
|
|||
from itertools import groupby
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from streamer.localize import LocalizedShard
|
||||
from streamer.query.find import find_shard_by_set_dimension
|
||||
from streamd.localize import LocalizedShard
|
||||
from streamd.query.find import find_shard_by_set_dimension
|
||||
|
||||
from .configuration import TIMESHEET_DIMENSION_NAME, TimesheetPointType
|
||||
from .timecard import SpecialDayType, Timecard, Timesheet
|
||||
|
|
@ -1,126 +0,0 @@
|
|||
import glob
|
||||
import os
|
||||
from datetime import datetime
|
||||
from shutil import move
|
||||
from typing import Annotated, Generator
|
||||
|
||||
import click
|
||||
import typer
|
||||
from rich import print
|
||||
from rich.markdown import Markdown
|
||||
from rich.panel import Panel
|
||||
|
||||
from streamer.localize import (
|
||||
LocalizedShard,
|
||||
RepositoryConfiguration,
|
||||
localize_stream_file,
|
||||
)
|
||||
from streamer.localize.preconfigured_configurations import TaskConfiguration
|
||||
from streamer.parse import parse_markdown_file
|
||||
from streamer.query import find_shard_by_position
|
||||
from streamer.query.find import find_shard_by_set_dimension
|
||||
from streamer.settings import Settings
|
||||
from streamer.timesheet.configuration import BasicTimesheetConfiguration
|
||||
from streamer.timesheet.extract import extract_timesheets
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
|
||||
def all_files(config: RepositoryConfiguration) -> Generator[LocalizedShard]:
    """Yield a localized shard for every ``*.md`` file in the configured base folder.

    Files whose localization yields no shard are skipped silently.
    """
    for file_name in glob.glob(f"{glob.escape(Settings().base_folder)}/*.md"):
        with open(file_name, "r") as file:
            file_content = file.read()
        if shard := localize_stream_file(
            parse_markdown_file(file_name, file_content), config
        ):
            yield shard
|
||||
|
||||
|
||||
@app.command()
def todo() -> None:
    """Print every shard whose "task" dimension is "open" as a rich Markdown panel."""
    all_shards = list(all_files(TaskConfiguration))

    for task_shard in find_shard_by_position(all_shards, "task", "open"):
        with open(task_shard.location["file"], "r") as file:
            file_content = file.read().splitlines()
        # Render only the shard's own line span from its source file.
        print(
            Panel(
                Markdown(
                    "\n".join(
                        file_content[
                            task_shard.start_line - 1 : task_shard.end_line
                        ]
                    )
                ),
                title=f"{task_shard.location['file']}:{task_shard.start_line}",
            )
        )
|
||||
|
||||
|
||||
@app.command()
def edit(number: Annotated[int, typer.Argument()] = 1) -> None:
    """Open the *number*-th shard file in the user's editor.

    A positive *number* counts back from the newest shard (1 = newest);
    a negative *number* indexes from the oldest side.

    Raises:
        ValueError: when ``abs(number)`` is not smaller than the shard count.
    """
    all_shards = list(all_files(TaskConfiguration))
    sorted_shards = sorted(all_shards, key=lambda s: s.moment)

    if abs(number) >= len(sorted_shards):
        raise ValueError("Argument out of range")

    # NOTE(review): number == 0 passes the guard above but yields index
    # len(sorted_shards) below, causing an IndexError — confirm intended range.
    selected_number = number
    if selected_number >= 0:
        selected_number = len(sorted_shards) - selected_number
    else:
        selected_number = -selected_number

    click.edit(None, filename=sorted_shards[selected_number].location["file"])
|
||||
|
||||
|
||||
@app.command()
def timesheet() -> None:
    """Print, per day, a comma-separated list of from/to times of all timecards."""
    shards = list(all_files(BasicTimesheetConfiguration))
    ordered_sheets = sorted(extract_timesheets(shards), key=lambda card: card.date)

    for sheet in ordered_sheets:
        print(sheet.date)
        times = ",".join(
            f"{card.from_time},{card.to_time}" for card in sheet.timecards
        )
        print(times)
|
||||
|
||||
|
||||
@app.command()
def new() -> None:
    """Create a new stream file, open it in the editor, then rename it by its markers."""
    base_directory = Settings().base_folder

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    draft_path = os.path.join(base_directory, f"{timestamp}_wip.md")

    # Seed the draft with a heading prompt before handing it to the editor.
    with open(draft_path, "w") as file:
        _ = file.write("# ")

    click.edit(None, filename=draft_path)

    with open(draft_path, "r") as file:
        parsed_content = parse_markdown_file(draft_path, file.read())

    # If the edited shard carries markers, embed them into the final file name.
    final_file_name = f"{timestamp}.md"
    if parsed_content.shard is not None and len(
        markers := parsed_content.shard.markers
    ):
        final_file_name = f"{timestamp} {' '.join(markers)}.md"

    _ = move(draft_path, os.path.join(base_directory, final_file_name))
    print(f"Saved as [yellow]{final_file_name}")
|
||||
|
||||
|
||||
@app.callback(invoke_without_command=True)
def main(ctx: typer.Context):
    """Fall back to the `new` command when no subcommand was given."""
    if ctx.invoked_subcommand is not None:
        return
    new()
|
||||
|
||||
|
||||
# Allow running the CLI directly as a script.
if __name__ == "__main__":
    app()
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard, StreamFile
|
||||
|
||||
from .extract_datetime import (
|
||||
extract_datetime_from_file_name,
|
||||
extract_datetime_from_marker_list,
|
||||
)
|
||||
from .localized_shard import LocalizedShard
|
||||
from .repository_configuration import RepositoryConfiguration
|
||||
|
||||
|
||||
def localize_shard(
    shard: Shard,
    config: RepositoryConfiguration,
    propagated: dict[str, str],
    moment: datetime,
) -> LocalizedShard:
    """Resolve a shard's markers into a location dict, recursing into children.

    ``propagated`` holds dimension values inherited from ancestors. Values for
    dimensions configured with ``propagate`` flow on to children; the rest stay
    private to this shard and are merged in only after the children were built.
    ``moment`` is the inherited timestamp, possibly overridden by datetime
    markers on this shard.
    """
    position = {**propagated}
    private_position: dict[str, str] = {}

    adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)

    for marker in shard.markers:
        if marker in config.markers:
            marker_definition = config.markers[marker]
            for placement in marker_definition.placements:
                # A placement applies only when all its guard markers are present.
                if placement.if_with <= set(shard.markers):
                    dimension = config.dimensions[placement.dimension]

                    # Placement value defaults to the marker's own name.
                    value = placement.value or marker

                    # First placement wins unless this one explicitly overwrites.
                    if placement.overwrites or (
                        placement.dimension not in position
                        and placement.dimension not in private_position
                    ):
                        if dimension.propagate:
                            position[placement.dimension] = value
                        else:
                            private_position[placement.dimension] = value

    # Children only see propagating dimensions; private ones are added after.
    children = [
        localize_shard(child, config, position, adjusted_moment)
        for child in shard.children
    ]

    position.update(private_position)

    return LocalizedShard(
        **shard.model_dump(exclude={"children"}),
        location=position,
        children=children,
        moment=adjusted_moment,
    )
|
||||
|
||||
|
||||
def localize_stream_file(
    stream_file: StreamFile, config: RepositoryConfiguration
) -> LocalizedShard | None:
    """Localize a parsed stream file, anchored at the date in its file name.

    Raises:
        ValueError: when no date can be extracted or the file has no shard.
    """
    shard_date = extract_datetime_from_file_name(stream_file.file_name)

    if shard_date is None or stream_file.shard is None:
        raise ValueError("Could not extract date")

    initial_location = {"file": stream_file.file_name}
    return localize_shard(stream_file.shard, config, initial_location, shard_date)
|
||||
|
||||
|
||||
__all__ = ["localize_stream_file"]
|
||||
|
|
@ -1,108 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Dimension(BaseModel):
    """A named axis of a shard's location (e.g. a "task" or "project" axis)."""

    # Human-readable name of the dimension.
    display_name: str
    # Optional free-form explanation.
    comment: Optional[str] = None
    # Whether a value set on a shard flows down to its children.
    propagate: bool = False
|
||||
|
||||
|
||||
class MarkerPlacement(BaseModel):
    """Rule mapping a marker to a dimension value on the shard carrying it."""

    # The placement only applies when all of these markers are also present.
    if_with: set[str] = set()
    # Dimension the value is written into.
    dimension: str
    # Value to place; None means "use the marker's own name".
    value: str | None = None
    # Whether this placement may replace an already-set value.
    overwrites: bool = True
|
||||
|
||||
|
||||
class Marker(BaseModel):
    """A recognized marker together with the placements it triggers."""

    # Human-readable name of the marker.
    display_name: str
    # Placement rules evaluated when this marker appears on a shard.
    placements: list[MarkerPlacement] = []
|
||||
|
||||
|
||||
class RepositoryConfiguration(BaseModel):
    """Complete dimension/marker configuration for a stream repository."""

    # All known dimensions, keyed by dimension name.
    dimensions: dict[str, Dimension]
    # All known markers, keyed by marker name.
    markers: dict[str, Marker]
|
||||
|
||||
|
||||
def merge_single_dimension(base: Dimension, second: Dimension) -> Dimension:
    """Overlay *second* onto *base*, field by field.

    ``propagate`` is only taken from *second* when it was explicitly set there
    (checked via pydantic's ``model_fields_set``), so an untouched default in
    *second* does not clobber the base value.
    """
    second_fields_set = getattr(second, "model_fields_set", set())

    return Dimension(
        display_name=second.display_name or base.display_name,
        comment=base.comment if second.comment is None else second.comment,
        propagate=second.propagate
        if "propagate" in second_fields_set
        else base.propagate,
    )
|
||||
|
||||
|
||||
def merge_dimensions(
    base: dict[str, Dimension], second: dict[str, Dimension]
) -> dict[str, Dimension]:
    """Merge two dimension maps; overlapping keys are merged field-wise."""
    result: dict[str, Dimension] = dict(base)
    for name, overlay in second.items():
        existing = result.get(name)
        result[name] = (
            overlay if existing is None else merge_single_dimension(existing, overlay)
        )
    return result
|
||||
|
||||
|
||||
def _placement_identity(p: MarkerPlacement) -> tuple[frozenset[str], str]:
    """Identity key of a placement: its (frozen) guard set plus target dimension."""
    guard = frozenset(p.if_with)
    return guard, p.dimension
|
||||
|
||||
|
||||
def merge_single_marker(base: Marker, second: Marker) -> Marker:
    """Overlay *second* onto *base*, de-duplicating placements by identity.

    Placements keep *base*'s order; a placement from *second* with the same
    (guard set, dimension) identity replaces the base one in place, any other
    is appended at the end.
    """
    merged_display_name = second.display_name or base.display_name

    merged_placements: list[MarkerPlacement] = []
    # Maps placement identity -> index in merged_placements, for replacement.
    seen: dict[tuple[frozenset[str], str], int] = {}

    for placement in base.placements:
        ident = _placement_identity(placement)
        seen[ident] = len(merged_placements)
        merged_placements.append(placement)

    for placement in second.placements:
        ident = _placement_identity(placement)
        if ident in seen:
            # Same identity: the overlay replaces the base placement in place.
            merged_placements[seen[ident]] = placement
        else:
            seen[ident] = len(merged_placements)
            merged_placements.append(placement)

    return Marker(display_name=merged_display_name, placements=merged_placements)
|
||||
|
||||
|
||||
def merge_markers(
    base: dict[str, Marker], second: dict[str, Marker]
) -> dict[str, Marker]:
    """Merge two marker maps; overlapping keys are merged placement-wise."""
    result: dict[str, Marker] = dict(base)
    for name, overlay in second.items():
        existing = result.get(name)
        result[name] = (
            overlay if existing is None else merge_single_marker(existing, overlay)
        )
    return result
|
||||
|
||||
|
||||
def merge_repository_configuration(
    base: RepositoryConfiguration, second: RepositoryConfiguration
) -> RepositoryConfiguration:
    """Overlay *second* onto *base*, merging dimensions and markers key-wise."""
    return RepositoryConfiguration(
        dimensions=merge_dimensions(base.dimensions, second.dimensions),
        markers=merge_markers(base.markers, second.markers),
    )
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Dimension",
|
||||
"Marker",
|
||||
"MarkerPlacement",
|
||||
"RepositoryConfiguration",
|
||||
"merge_repository_configuration",
|
||||
]
|
||||
|
|
@ -1,84 +0,0 @@
|
|||
import re
|
||||
from typing import Iterable
|
||||
from mistletoe.block_token import BlockToken
|
||||
from mistletoe.span_token import Emphasis, RawText, Strikethrough, Strong, Link
|
||||
from mistletoe.token import Token
|
||||
|
||||
from .markdown_tag import Tag
|
||||
|
||||
|
||||
def extract_markers_and_tags_from_single_token(
    token: Token,
    marker_boundary_encountered: bool,
    return_at_first_marker: bool = False,
) -> tuple[list[str], list[str], bool]:
    """Classify one span token's @-annotations into markers or plain tags.

    Tags found before any non-marker content ("the marker boundary") count as
    markers; everything after counts as plain tags. Returns the collected
    markers, the collected tags, and whether the boundary has been crossed.
    """
    result_markers, result_tags = [], []
    result_marker_boundary_encountered = marker_boundary_encountered

    if isinstance(token, Tag):
        if marker_boundary_encountered:
            result_tags.append(token.content)
        else:
            result_markers.append(token.content)
    elif isinstance(token, (Emphasis, Strong, Strikethrough, Link)):
        # Styled spans are transparent: recurse into their children.
        markers, tags, child_marker_boundary_encountered = (
            extract_markers_and_tags_from_tokens(
                token.children or [],
                marker_boundary_encountered,
                return_at_first_marker,
            )
        )
        result_markers.extend(markers)
        result_tags.extend(tags)
        result_marker_boundary_encountered = (
            marker_boundary_encountered or child_marker_boundary_encountered
        )
    elif isinstance(token, RawText) and re.match(r"^[\s]*$", token.content):
        # Whitespace-only text does not cross the marker boundary.
        pass
    else:
        # Any other content ends the leading-marker region.
        result_marker_boundary_encountered = True

    return result_markers, result_tags, result_marker_boundary_encountered
|
||||
|
||||
|
||||
def extract_markers_and_tags_from_tokens(
    tokens: Iterable[Token],
    marker_boundary_encountered: bool,
    return_at_first_marker: bool = False,
) -> tuple[list[str], list[str], bool]:
    """Fold a sequence of span tokens into (markers, tags, boundary-crossed).

    The boundary state is threaded through the children in order, so only
    tags appearing before any non-marker content count as markers. With
    ``return_at_first_marker`` the scan stops as soon as a marker is found.
    """
    result_markers, result_tags = [], []
    result_marker_boundary_encountered = marker_boundary_encountered

    for child in tokens:
        markers, tags, child_marker_boundary_encountered = (
            extract_markers_and_tags_from_single_token(
                child, result_marker_boundary_encountered, return_at_first_marker
            )
        )
        result_markers.extend(markers)
        result_tags.extend(tags)
        result_marker_boundary_encountered = (
            marker_boundary_encountered or child_marker_boundary_encountered
        )

        # Early exit once a marker exists, if the caller only needs presence.
        if len(result_markers) > 0 and return_at_first_marker:
            break

    return result_markers, result_tags, result_marker_boundary_encountered
|
||||
|
||||
|
||||
def extract_markers_and_tags(block_token: BlockToken) -> tuple[list[str], list[str]]:
    """Collect every marker and tag found inside *block_token*."""
    children = block_token.children or []
    markers, tags, _ = extract_markers_and_tags_from_tokens(children, False)
    return markers, tags
|
||||
|
||||
|
||||
def has_markers(block_token: BlockToken) -> bool:
    """Return True when *block_token* contains at least one leading marker."""
    found, _, _ = extract_markers_and_tags_from_tokens(
        block_token.children or [], False, return_at_first_marker=True
    )
    return bool(found)
|
||||
|
||||
|
||||
__all__ = ["extract_markers_and_tags", "has_markers"]
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
import re
|
||||
from mistletoe.markdown_renderer import Fragment, MarkdownRenderer
|
||||
from mistletoe.span_token import SpanToken
|
||||
|
||||
|
||||
class Tag(SpanToken):
    """Span token for ``@tag`` annotations inside markdown text."""

    # The tag's content is taken verbatim; no nested span parsing.
    parse_inner = False
    # "@" followed by a run without whitespace, "*", backtick, "~" or brackets.
    pattern = re.compile(r"@([^\s*\x60~\[\]]+)")
|
||||
|
||||
|
||||
class TagMarkdownRenderer(MarkdownRenderer):
    """Markdown renderer that understands the custom ``Tag`` span token."""

    def __init__(self):
        # Register Tag with mistletoe's tokenizer for this renderer's lifetime.
        super().__init__(Tag)

    def render_tag(self, token: Tag):
        # Round-trip a tag back to its literal "@content" form.
        yield Fragment("@")
        yield Fragment(token.content)
|
||||
|
||||
|
||||
__all__ = ["Tag", "TagMarkdownRenderer"]
|
||||
|
|
@ -1,242 +0,0 @@
|
|||
from collections import Counter
|
||||
|
||||
from mistletoe.block_token import (
|
||||
BlockToken,
|
||||
Document,
|
||||
Heading,
|
||||
List,
|
||||
ListItem,
|
||||
Paragraph,
|
||||
)
|
||||
|
||||
from .extract_tag import extract_markers_and_tags, has_markers
|
||||
from .list import split_at
|
||||
from .markdown_tag import TagMarkdownRenderer
|
||||
from .shard import Shard, StreamFile
|
||||
|
||||
|
||||
def get_line_number(block_token: BlockToken) -> int:
    """Return mistletoe's line number attribute of *block_token*."""
    line: int = block_token.line_number  # type: ignore
    return line
|
||||
|
||||
|
||||
def build_shard(
    start_line: int,
    end_line: int,
    markers: list[str] | None = None,
    tags: list[str] | None = None,
    children: list[Shard] | None = None,
) -> Shard:
    """Build a Shard for the given line span, collapsing trivial wrappers.

    When the new shard would merely wrap a single child covering exactly the
    same line span while carrying no markers or tags of its own, that child
    is returned directly instead of nesting it.

    Defaults are ``None`` rather than ``[]`` so no mutable default list is
    shared between calls (classic Python mutable-default pitfall).
    """
    markers = [] if markers is None else markers
    tags = [] if tags is None else tags
    children = [] if children is None else children

    if (
        len(children) == 1
        and len(tags) == 0
        and len(markers) == 0
        and children[0].start_line == start_line
        and children[0].end_line == end_line
    ):
        return children[0]

    return Shard(
        markers=markers,
        tags=tags,
        children=children,
        start_line=start_line,
        end_line=end_line,
    )
|
||||
|
||||
|
||||
def merge_into_first_shard(
    shards: list[Shard],
    start_line: int,
    end_line: int,
    additional_tags: list[str] | None = None,
) -> Shard:
    """Promote the first shard to span *start_line*..*end_line*, absorbing the rest.

    The remaining shards become the first shard's children and
    *additional_tags* are appended to its tags. The default is ``None``
    rather than ``[]`` so no mutable default list is shared between calls;
    the explicit return annotation was also added.
    """
    additional_tags = [] if additional_tags is None else additional_tags
    return shards[0].model_copy(
        update={
            "start_line": start_line,
            "end_line": end_line,
            "children": shards[1:],
            "tags": shards[0].tags + additional_tags,
        }
    )
|
||||
|
||||
|
||||
def find_paragraph_shard_positions(block_tokens: list[BlockToken]) -> list[int]:
    """Indices of paragraphs that carry markers (i.e. that start a shard)."""
    positions = []
    for position, token in enumerate(block_tokens):
        if isinstance(token, Paragraph) and has_markers(token):
            positions.append(position)
    return positions
|
||||
|
||||
|
||||
def find_headings_by_level(
    block_tokens: list[BlockToken], header_level: int
) -> list[int]:
    """Indices of headings whose level equals *header_level*."""
    matches = []
    for position, token in enumerate(block_tokens):
        if not isinstance(token, Heading):
            continue
        if token.level == header_level:
            matches.append(position)
    return matches
|
||||
|
||||
|
||||
def calculate_heading_level_for_next_split(
    block_tokens: list[BlockToken],
) -> int | None:
    """
    If there is no marker in any heading, then return None.
    If only the first token is a heading with a marker, then return None.
    Otherwise: Return the heading level with the lowest level (h1 < h2), of which there are two or which has a marker (and doesn't stem from first)
    """
    # Heading levels (beyond the first token) that carry their own markers.
    level_of_headings_without_first_with_marker = [
        token.level
        for token in block_tokens[1:]
        if isinstance(token, Heading) and has_markers(token)
    ]

    if len(level_of_headings_without_first_with_marker) == 0:
        return None

    # Count every heading level in the run, including the first token.
    heading_level_counter = Counter(
        [token.level for token in block_tokens if isinstance(token, Heading)]
    )

    # Candidates: levels occurring at least twice, plus marker-bearing levels;
    # the smallest (most significant) candidate wins.
    return min(
        [level for level, count in heading_level_counter.items() if count >= 2]
        + level_of_headings_without_first_with_marker
    )
|
||||
|
||||
|
||||
def parse_single_block_shards(
    block_token: BlockToken, start_line: int, end_line: int
) -> tuple[Shard | None, list[str]]:
    """Parse one block token into an optional shard plus loose tags.

    Lists recurse into their items (each item may become a child shard);
    paragraphs and headings contribute their own markers and tags. Returns
    ``(None, tags)`` when nothing shard-worthy was found, so plain tags still
    bubble up to the enclosing shard.
    """
    markers, tags, children = [], [], []

    if isinstance(block_token, List):
        list_items: list[ListItem] = (  # type: ignore
            list(block_token.children) if block_token.children is not None else []
        )
        for index, list_item in enumerate(list_items):
            list_item_start_line = get_line_number(list_item)
            # An item ends one line before the next item, or at the list's end.
            list_item_end_line = (
                get_line_number(list_items[index + 1]) - 1
                if index + 1 < len(list_items)
                else end_line
            )
            list_item_shard, list_item_tags = parse_multiple_block_shards(
                list_item.children,  # type: ignore
                list_item_start_line,
                list_item_end_line,
            )
            if list_item_shard is not None:
                children.append(list_item_shard)
            tags.extend(list_item_tags)

    elif isinstance(block_token, (Paragraph, Heading)):
        markers, tags = extract_markers_and_tags(block_token)

    # Nothing shard-worthy: pass the collected tags up to the caller.
    if len(markers) == 0 and len(children) == 0:
        return None, tags

    return build_shard(
        start_line, end_line, markers=markers, tags=tags, children=children
    ), []
|
||||
|
||||
|
||||
def parse_multiple_block_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    enforce_shard: bool = False,
) -> tuple[Shard | None, list[str]]:
    """Parse a flat run of block tokens into one shard-tree segment.

    When the run opens with a marker-bearing heading or paragraph, the first
    child shard absorbs the remaining ones as its children; otherwise an
    anonymous wrapper shard is built. ``enforce_shard`` forces a shard to be
    returned even when no children were found.

    NOTE(review): assumes ``block_tokens`` is non-empty — an empty list would
    raise IndexError on ``block_tokens[0]``; confirm callers guarantee this.
    """
    is_first_block_heading = isinstance(block_tokens[0], Heading) and has_markers(
        block_tokens[0]
    )

    paragraph_positions = find_paragraph_shard_positions(block_tokens)
    children, tags = [], []

    is_first_block_only_with_marker = False

    for i, token in enumerate(block_tokens):
        if i in paragraph_positions:
            # NOTE(review): re-assigned on every marker paragraph, so a later
            # marker paragraph resets this to False again — confirm intended.
            is_first_block_only_with_marker = i == 0

        child_start_line = get_line_number(token)
        # A block ends one line before the next block, or at the run's end.
        child_end_line = (
            get_line_number(block_tokens[i + 1]) - 1
            if i + 1 < len(block_tokens)
            else end_line
        )

        child_shard, child_tags = parse_single_block_shards(
            token, child_start_line, child_end_line
        )

        if child_shard is not None:
            children.append(child_shard)
        if len(child_tags) > 0:
            tags.extend(child_tags)

    if len(children) == 0 and not enforce_shard:
        return None, tags
    if is_first_block_heading or is_first_block_only_with_marker:
        return merge_into_first_shard(children, start_line, end_line, tags), []
    else:
        return build_shard(start_line, end_line, tags=tags, children=children), []
|
||||
|
||||
|
||||
def parse_header_shards(
    block_tokens: list[BlockToken],
    start_line: int,
    end_line: int,
    use_first_child_as_header: bool = False,
) -> Shard | None:
    """Recursively split *block_tokens* at the most significant heading level.

    Each heading-delimited section becomes a child shard; sections recurse
    until no further heading split applies, then fall back to flat block
    parsing. ``use_first_child_as_header`` lets the leading section act as
    the header that absorbs its sibling sections.
    """
    if len(block_tokens) == 0:
        return build_shard(start_line, end_line)

    split_at_heading_level = calculate_heading_level_for_next_split(block_tokens)

    # No heading worth splitting on: parse the run as a flat block sequence.
    if split_at_heading_level is None:
        return parse_multiple_block_shards(
            block_tokens, start_line, end_line, enforce_shard=True
        )[0]

    heading_positions = find_headings_by_level(block_tokens, split_at_heading_level)

    block_tokens_split_by_heading = split_at(block_tokens, heading_positions)

    children = []
    for i, child_blocks in enumerate(block_tokens_split_by_heading):
        child_start_line = get_line_number(child_blocks[0])
        # A section ends one line before the next section, or at the span end.
        child_end_line = (
            get_line_number(block_tokens_split_by_heading[i + 1][0]) - 1
            if i + 1 < len(block_tokens_split_by_heading)
            else end_line
        )
        if child_shard := parse_header_shards(
            child_blocks,
            child_start_line,
            child_end_line,
            # Every split but an unheaded leading preamble starts on a heading.
            use_first_child_as_header=i > 0 or 0 in heading_positions,
        ):
            children.append(child_shard)

    if use_first_child_as_header and len(children) > 0:
        return merge_into_first_shard(children, start_line, end_line)
    else:
        return build_shard(start_line, end_line, children=children)
|
||||
|
||||
|
||||
def parse_markdown_file(file_name: str, file_content: str) -> StreamFile:
    """Parse markdown *file_content* into a StreamFile holding a shard tree.

    Starts with a single shard spanning the whole file (at least one line,
    even for empty content) and replaces it with the parsed hierarchy when
    the document produced any block tokens.
    """
    # max() takes the values directly — no throwaway list needed.
    shard = build_shard(1, max(len(file_content.splitlines()), 1))

    # The renderer context registers the Tag span token with mistletoe.
    with TagMarkdownRenderer():
        ast = Document(file_content)

        block_tokens: list[BlockToken] = ast.children  # type: ignore
        if len(block_tokens) > 0:
            if parsed_shard := parse_header_shards(
                block_tokens, shard.start_line, shard.end_line
            ):
                shard = parsed_shard

    return StreamFile(shard=shard, file_name=file_name)
|
||||
|
||||
|
||||
__all__ = ["Shard", "StreamFile", "parse_markdown_file"]
|
||||
|
|
@ -1,35 +0,0 @@
|
|||
from typing import Callable
|
||||
|
||||
from streamer.localize import LocalizedShard
|
||||
|
||||
|
||||
def find_shard(
    shards: list[LocalizedShard], query_function: Callable[[LocalizedShard], bool]
) -> list[LocalizedShard]:
    """Depth-first collect every shard, at any depth, matching *query_function*."""
    matches: list[LocalizedShard] = []

    for candidate in shards:
        if query_function(candidate):
            matches.append(candidate)
        matches.extend(find_shard(candidate.children, query_function))

    return matches
|
||||
|
||||
|
||||
def find_shard_by_position(
    shards: list[LocalizedShard], dimension: str, value: str
) -> list[LocalizedShard]:
    """All shards (recursively) whose location sets *dimension* to *value*."""

    def matches(shard: LocalizedShard) -> bool:
        return dimension in shard.location and shard.location[dimension] == value

    return find_shard(shards, matches)
|
||||
|
||||
|
||||
def find_shard_by_set_dimension(
    shards: list[LocalizedShard], dimension: str
) -> list[LocalizedShard]:
    """All shards (recursively) whose location has any value for *dimension*."""

    def has_dimension(shard: LocalizedShard) -> bool:
        return dimension in shard.location

    return find_shard(shards, has_dimension)
|
||||
|
||||
|
||||
__all__ = ["find_shard_by_position", "find_shard", "find_shard_by_set_dimension"]
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
import os
|
||||
from pydantic_settings import (
|
||||
BaseSettings,
|
||||
PydanticBaseSettingsSource,
|
||||
SettingsConfigDict,
|
||||
YamlConfigSettingsSource,
|
||||
)
|
||||
from xdg_base_dirs import xdg_config_home
|
||||
|
||||
# Path of the user's YAML configuration inside the XDG config directory.
# NOTE(review): still uses the old "streamer" directory name despite this
# commit's rename to "streamd" — confirm whether the config path should move.
SETTINGS_FILE = xdg_config_home() / "streamer" / "config.yaml"
|
||||
|
||||
|
||||
class Settings(BaseSettings):
    """Application settings, loaded from the XDG YAML config file and the env."""

    model_config = SettingsConfigDict(env_file_encoding="utf-8")

    # Folder holding the stream's markdown files; defaults to the CWD.
    base_folder: str = os.getcwd()

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        # Source precedence (highest first): init args, the YAML config file,
        # .env values, process environment, then secret files.
        return (
            init_settings,
            YamlConfigSettingsSource(settings_cls, yaml_file=SETTINGS_FILE),
            dotenv_settings,
            env_settings,
            file_secret_settings,
        )
|
||||
Loading…
Add table
Add a link
Reference in a new issue