feat: extract date & time from tags
Signed-off-by: Konstantin Fickel <mail@konstantinfickel.de>
This commit is contained in:
parent
ee91b2e8db
commit
d5b1541436
6 changed files with 246 additions and 22 deletions
|
|
@ -1,9 +1,9 @@
|
|||
from datetime import datetime
|
||||
import re
|
||||
import os
|
||||
import re
|
||||
from datetime import date, datetime, time
|
||||
|
||||
|
||||
def extract_date_from_file_name(file_name: str) -> datetime | None:
|
||||
def extract_datetime_from_file_name(file_name: str) -> datetime | None:
|
||||
FILE_NAME_REGEX = r"^(?P<date>\d{8})(?:-(?P<time>\d{4,6}))?.+.md$"
|
||||
base_name = os.path.basename(file_name)
|
||||
match = re.match(FILE_NAME_REGEX, base_name)
|
||||
|
|
@ -17,4 +17,76 @@ def extract_date_from_file_name(file_name: str) -> datetime | None:
|
|||
return None
|
||||
|
||||
|
||||
__all__ = ["extract_date_from_file_name"]
|
||||
def extract_datetime_from_marker(marker: str) -> datetime | None:
|
||||
"""
|
||||
Extract a datetime from a marker string in the exact format: YYYYMMDDHHMMSS.
|
||||
|
||||
Returns:
|
||||
Parsed datetime if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{14}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%Y%m%d%H%M%S")
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_date_from_marker(marker: str) -> date | None:
|
||||
"""
|
||||
Extract a date from a marker string in the exact format: YYYYMMDD.
|
||||
|
||||
Returns:
|
||||
Parsed date if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{8}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%Y%m%d").date()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_time_from_marker(marker: str) -> time | None: # noqa: F821
|
||||
"""
|
||||
Extract a time from a marker string in the exact format: HHMMSS.
|
||||
|
||||
Returns:
|
||||
Parsed time if the format is fulfilled and values are valid, else None.
|
||||
"""
|
||||
if not re.fullmatch(r"\d{6}", marker or ""):
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(marker, "%H%M%S").time()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def extract_datetime_from_marker_list(markers: list[str], inherited_datetime: datetime):
|
||||
shard_time: time | None = None
|
||||
shard_date: date | None = None
|
||||
|
||||
for marker in markers[::-1]:
|
||||
if parsed_time := extract_time_from_marker(marker):
|
||||
shard_time = parsed_time
|
||||
if parsed_date := extract_date_from_marker(marker):
|
||||
shard_date = parsed_date
|
||||
if parsed_datetime := extract_datetime_from_marker(marker):
|
||||
shard_date = parsed_datetime.date()
|
||||
shard_time = parsed_datetime.time()
|
||||
|
||||
if shard_date and not shard_time:
|
||||
return datetime.combine(shard_date, time(0, 0, 0))
|
||||
|
||||
return datetime.combine(
|
||||
shard_date or inherited_datetime.date(), shard_time or inherited_datetime.time()
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"extract_datetime_from_file_name",
|
||||
"extract_datetime_from_marker",
|
||||
"extract_date_from_marker",
|
||||
"extract_time_from_marker",
|
||||
"extract_datetime_from_marker_list",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,19 +1,29 @@
|
|||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard, StreamFile
|
||||
|
||||
from .repostory_configuration import RepositoryConfiguration
|
||||
from .extract_datetime import (
|
||||
extract_datetime_from_file_name,
|
||||
extract_datetime_from_marker_list,
|
||||
)
|
||||
from .localized_shard import LocalizedShard
|
||||
from .extract_datetime import extract_date_from_file_name
|
||||
from .repostory_configuration import RepositoryConfiguration
|
||||
|
||||
|
||||
def localize_shard(
|
||||
shard: Shard, config: RepositoryConfiguration, propagated: dict[str, str]
|
||||
shard: Shard,
|
||||
config: RepositoryConfiguration,
|
||||
propagated: dict[str, str],
|
||||
moment: datetime,
|
||||
) -> LocalizedShard:
|
||||
position = {**propagated}
|
||||
private_position: dict[str, str] = {}
|
||||
|
||||
adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)
|
||||
|
||||
for marker in shard.markers:
|
||||
normalized_marker = marker.lower()
|
||||
|
||||
if marker_definition := config.markers[normalized_marker]:
|
||||
dimension_name = marker_definition.dimension
|
||||
dimension = config.dimensions[marker_definition.dimension]
|
||||
|
|
@ -23,24 +33,30 @@ def localize_shard(
|
|||
else:
|
||||
private_position[dimension_name] = normalized_marker
|
||||
|
||||
children = [localize_shard(child, config, position) for child in shard.children]
|
||||
children = [
|
||||
localize_shard(child, config, position, adjusted_moment)
|
||||
for child in shard.children
|
||||
]
|
||||
|
||||
position.update(private_position)
|
||||
|
||||
return LocalizedShard(
|
||||
**shard.model_dump(exclude={"children"}), location=position, children=children
|
||||
**shard.model_dump(exclude={"children"}),
|
||||
location=position,
|
||||
children=children,
|
||||
moment=adjusted_moment,
|
||||
)
|
||||
|
||||
|
||||
def localize_stream_file(
|
||||
stream_file: StreamFile, config: RepositoryConfiguration
|
||||
) -> LocalizedShard | None:
|
||||
shard_date = extract_date_from_file_name(stream_file.filename)
|
||||
shard_date = extract_datetime_from_file_name(stream_file.filename)
|
||||
|
||||
if not shard_date or not stream_file.shard:
|
||||
raise ValueError("Could not extract date")
|
||||
|
||||
return localize_shard(stream_file.shard, config, {"moment": shard_date.isoformat()})
|
||||
return localize_shard(stream_file.shard, config, {}, shard_date)
|
||||
|
||||
|
||||
__all__ = ["localize_stream_file"]
|
||||
|
|
|
|||
|
|
@ -1,8 +1,12 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from streamer.parse.shard import Shard
|
||||
|
||||
|
||||
class LocalizedShard(Shard):
|
||||
moment: datetime
|
||||
location: dict[str, str]
|
||||
children: list[LocalizedShard] = [] # pyright: ignore[reportIncompatibleVariableOverride]
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from streamer.parse.shard import Shard, StreamFile
|
||||
|
||||
|
||||
|
|
@ -7,7 +9,7 @@ class ShardWithMarkdown(Shard):
|
|||
|
||||
|
||||
class StreamFileWithMarkdown(StreamFile):
|
||||
shard: ShardWithMarkdown | None = None # pyright: ignore[reportIncompatibleVariableOverride]
|
||||
shard: ShardWithMarkdown | None = None
|
||||
|
||||
|
||||
def attach_markdown_shard(shard: Shard, markdown_text: str) -> ShardWithMarkdown:
|
||||
|
|
@ -15,7 +17,9 @@ def attach_markdown_shard(shard: Shard, markdown_text: str) -> ShardWithMarkdown
|
|||
markdown_content = "\n".join(lines[shard.start_line - 1 : shard.end_line])
|
||||
return ShardWithMarkdown(
|
||||
**shard.model_dump(exclude=["children"]),
|
||||
children=map(lambda child: attach_markdown_shard(child, markdown_text), shard.children),
|
||||
children=[
|
||||
attach_markdown_shard(child, markdown_text) for child in shard.children
|
||||
],
|
||||
markdown_content=markdown_content,
|
||||
)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue