feat: add initial support for positioning

Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
Konstantin Fickel 2025-06-22 18:02:42 +02:00
parent 28dc40ebf0
commit 0c61067db0
Signed by: kfickel
GPG key ID: A793722F9933C1A5
9 changed files with 186 additions and 3 deletions

View file

@ -59,7 +59,7 @@ def new() -> None:
parsed_content = parse_markdown_file(prelimary_path, content)
final_file_name = f"{timestamp}.md"
if parsed_content.shard and len(markers := parsed_content.shard.markers):
if len(markers := parsed_content.shard.markers):
final_file_name = f"{timestamp} {' '.join(markers)}.md"
final_path = os.path.join(streamer_directory, final_file_name)

View file

@ -0,0 +1,11 @@
from .localize import localize_stream_file
from .repostory_configuration import RepositoryConfiguration
from .localized_shard import LocalizedShard
__all__ = [
"Dimension",
"Marker",
"RepositoryConfiguration",
"localize_stream_file",
"LocalizedShard",
]

View file

@ -0,0 +1,21 @@
from datetime import datetime
import re
import os
from typing import Optional
def extract_date_from_file_name(file_name: str) -> Optional[datetime]:
FILE_NAME_REGEX = r"^(?P<date>\d{8})(?:-(?P<time>\d{4,6}))?.+.md$"
base_name = os.path.basename(file_name)
match = re.match(FILE_NAME_REGEX, base_name)
if match:
date_str = match.group("date")
time_str = match.group("time") or ""
time_str = time_str.ljust(6, "0")
datetime_str = f"{date_str} {time_str[:2]}:{time_str[2:4]}:{time_str[4:]}"
return datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S")
return None
__all__ = ["extract_date_from_file_name"]

View file

@ -0,0 +1,43 @@
from typing import Optional
from streamer.parse.shard import Shard, StreamFile
from .repostory_configuration import RepositoryConfiguration
from .localized_shard import LocalizedShard
from .extract_datetime import extract_date_from_file_name
def localize_shard(
shard: Shard, config: RepositoryConfiguration, propagated: dict[str, str]
) -> LocalizedShard:
position = {**propagated}
private_position = {}
for marker in shard.markers:
normalized_marker = marker.lower()
if marker_definition := config.markers[normalized_marker]:
dimension_name = marker_definition.dimension
dimension = config.dimensions[marker_definition.dimension]
if dimension.propagate:
position[dimension_name] = normalized_marker
else:
private_position[dimension_name] = normalized_marker
children = [localize_shard(child, config, position) for child in shard.children]
(position.update(private_position),)
return LocalizedShard(
**shard.model_dump(exclude={"children"}), location=position, children=children
)
def localize_stream_file(
stream_file: StreamFile, config: RepositoryConfiguration
) -> Optional[LocalizedShard]:
shard_date = extract_date_from_file_name(stream_file.filename)
return localize_shard(stream_file.shard, config, {"moment": shard_date.isoformat()})
__all__ = ["localize_stream_file"]

View file

@ -0,0 +1,10 @@
from __future__ import annotations
from streamer.parse.shard import Shard
class LocalizedShard(Shard):
location: dict[str, str]
children: list[LocalizedShard]
__all__ = ["LocalizedShard"]

View file

@ -0,0 +1,21 @@
from typing import Optional
from pydantic import BaseModel
class Dimension(BaseModel):
display_name: str
comment: Optional[str] = None
propagate: bool = False
class Marker(BaseModel):
display_name: str
dimension: str
class RepositoryConfiguration(BaseModel):
dimensions: dict[str, Dimension]
markers: dict[str, Marker]
__all__ = ["Dimension", "Marker", "RepositoryConfiguration"]

View file

@ -1,5 +1,4 @@
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
@ -13,7 +12,7 @@ class Shard(BaseModel):
class StreamFile(BaseModel):
filename: str
shard: Optional[Shard] = None
shard: Shard = None
__all__ = ["Shard", "StreamFile"]