"""Attach a location mapping and a moment to parsed shards."""

from datetime import datetime

from streamer.parse.shard import Shard, StreamFile

from .extract_datetime import (
    extract_datetime_from_file_name,
    extract_datetime_from_marker_list,
)
from .localized_shard import LocalizedShard
from .repository_configuration import RepositoryConfiguration


def localize_shard(
    shard: Shard,
    config: RepositoryConfiguration,
    propagated: dict[str, str],
    moment: datetime,
) -> LocalizedShard:
    """Recursively turn *shard* and its children into ``LocalizedShard`` objects.

    The location of a shard starts as a copy of *propagated*. Every marker on
    the shard that is defined in ``config.markers`` contributes its
    placements: a placement applies when its ``if_with`` collection is a
    subset of the shard's markers, and it writes ``placement.value`` (or the
    marker itself when that value is falsy) under ``placement.dimension``.
    An already-set dimension is only replaced when ``placement.overwrites``
    is truthy.

    Dimensions whose configuration has ``propagate`` set are handed down to
    the children; all other dimensions are applied to this shard only.

    Args:
        shard: The parsed shard to localize.
        config: Marker and dimension definitions.
        propagated: Location entries inherited from the parent; not mutated.
        moment: Moment inherited from the parent, passed to
            ``extract_datetime_from_marker_list`` together with the shard's
            markers to obtain this shard's own moment.

    Returns:
        A ``LocalizedShard`` built from the shard's dumped fields (without
        ``children``) plus the computed location, children and moment.

    Raises:
        KeyError: If an applicable placement names a dimension that is
            missing from ``config.dimensions``.
    """
    location = {**propagated}
    # Entries for non-propagating dimensions; merged in only after the
    # children have been localized so that they never see them.
    private_location: dict[str, str] = {}

    adjusted_moment: datetime = extract_datetime_from_marker_list(shard.markers, moment)

    # Built once per shard instead of once per placement.
    # NOTE(review): the subset test below assumes ``if_with`` is a set.
    shard_markers = set(shard.markers)

    for marker in shard.markers:
        if marker not in config.markers:
            continue
        for placement in config.markers[marker].placements:
            if not (placement.if_with <= shard_markers):
                continue
            # Looked up before the overwrite check so an unknown dimension
            # still fails loudly even when the placement would be skipped.
            dimension = config.dimensions[placement.dimension]
            already_placed = (
                placement.dimension in location
                or placement.dimension in private_location
            )
            if already_placed and not placement.overwrites:
                continue
            target = location if dimension.propagate else private_location
            target[placement.dimension] = placement.value or marker

    # Children inherit only the propagating part of the location.
    children = [
        localize_shard(child, config, location, adjusted_moment)
        for child in shard.children
    ]

    location.update(private_location)

    return LocalizedShard(
        **shard.model_dump(exclude={"children"}),
        location=location,
        children=children,
        moment=adjusted_moment,
    )


def localize_stream_file(
    stream_file: StreamFile, config: RepositoryConfiguration
) -> LocalizedShard | None:
    """Localize the root shard of *stream_file*.

    The initial moment comes from ``extract_datetime_from_file_name`` applied
    to the file name, and the initial location is ``{"file": <file name>}``.

    Args:
        stream_file: The parsed stream file.
        config: Marker and dimension definitions.

    Returns:
        The localized root shard. The annotation allows ``None``, but this
        implementation raises instead of returning ``None``.

    Raises:
        ValueError: If no date can be extracted from the file name, or if
            the stream file has no shard.
    """
    shard_date = extract_datetime_from_file_name(stream_file.file_name)

    # Checked first so the message is unchanged when both date and shard are
    # missing.
    if not shard_date:
        raise ValueError("Could not extract date")
    if not stream_file.shard:
        raise ValueError(f"Stream file {stream_file.file_name!r} has no shard")

    return localize_shard(
        stream_file.shard, config, {"file": stream_file.file_name}, shard_date
    )


__all__ = ["localize_stream_file"]