import os
import re
import urllib.parse
from enum import Enum, auto
from pathlib import Path
from typing import Dict, Iterable, List, Optional, TextIO, Tuple, Type, Union
import numpy as np
import yaml
from simdb.uri import URI
[docs]
class InvalidManifest(Exception):
    """Raised when a manifest does not pass validation."""
[docs]
class InvalidAlias(InvalidManifest):
    """Raised when the alias given in the manifest is not valid."""
def _expand_path(path: Path, base_path: Path) -> Path:
    """
    Expand variables and ``~`` in a path and make it absolute.

    ``MANIFEST_DIR`` is set in the process environment to ``base_path`` before
    expansion, so ``$MANIFEST_DIR`` in ``path`` expands to the base directory.

    :param path: The path to expand.
    :param base_path: Directory that relative paths are joined onto.
    :return: ``base_path / path`` for a relative path, otherwise the resolved
        absolute path.
    :raises ValueError: If ``path`` is relative and ``base_path`` is not absolute.
    """
    os.environ["MANIFEST_DIR"] = str(base_path)

    substituted = os.path.expandvars(str(path))
    candidate = Path(substituted).expanduser()
    # Collapse any doubled separators left over after substitution.
    candidate = Path(str(candidate).replace("//", "/"))

    if candidate.is_absolute():
        # Expand any /./ and /../ in absolute path
        return candidate.resolve()

    if not base_path.is_absolute():
        raise ValueError("base_path must be absolute")
    return base_path / candidate
def _to_uri(uri_str: str, base_path: Path) -> Tuple["DataObject.Type", "URI"]:
    """
    Parse a manifest URI string and classify it by scheme.

    :param uri_str: The URI as written in the manifest.
    :param base_path: Directory used to expand relative ``file`` paths.
    :return: A ``(DataObject.Type, URI)`` pair; for ``file`` URIs the returned URI
        carries the expanded path.
    :raises InvalidManifest: If the URI has an authority component, has no scheme,
        is a ``file`` URI without a path, is an ``imas`` URI with neither ``path``
        nor all of ``shot``/``run``/``database`` in its query, or uses any other
        scheme than ``file``, ``imas`` or ``simdb``.
    """
    parsed = URI(uri_str)

    if parsed.authority:
        raise InvalidManifest(f"invalid uri: {uri_str} - path must be absolute")

    scheme = parsed.scheme
    if scheme is None:
        raise InvalidManifest(f"invalid uri: {uri_str} - no scheme provided")

    if scheme == "file":
        if parsed.path is None:
            raise InvalidManifest(f"invalid uri: {uri_str} - no path provided")
        expanded = _expand_path(parsed.path, base_path)
        return DataObject.Type.FILE, URI(parsed, path=expanded)

    if scheme == "imas":
        legacy_keys = ("shot", "run", "database")
        locatable = "path" in parsed.query or all(
            key in parsed.query for key in legacy_keys
        )
        if not locatable:
            raise InvalidManifest(
                f"invalid uri: {uri_str} - no path or (shot, run, database) provided "
                "in IMAS uri"
            )
        return DataObject.Type.IMAS, parsed

    if scheme == "simdb":
        return DataObject.Type.UUID, parsed

    raise InvalidManifest(f"invalid uri: {uri_str}")
[docs]
class DataObject:
    """
    A piece of simulation data referenced from a manifest: a file, an IDS, or an
    already registered object identified by its UUID.

    PATH: file:///<PATH>
    IMAS: imas:<BACKEND>?path=<PATH>
    """

    class Type(Enum):
        """Kinds of data object a manifest URI can resolve to."""

        UNKNOWN = auto()
        UUID = auto()
        FILE = auto()
        IMAS = auto()

    type: Type = Type.UNKNOWN
    uri: Union[URI, None] = None

    def __init__(self, base_path: Path, uri: str) -> None:
        """
        Parse ``uri`` and store the resulting type and URI.

        :param base_path: Directory used to expand relative file paths.
        :param uri: The URI string from the manifest.
        :raises InvalidManifest: If the URI cannot be classified or is empty.
        """
        kind, parsed = _to_uri(uri, base_path)
        self.type = kind
        self.uri = parsed
        if kind == DataObject.Type.UNKNOWN or not parsed:
            raise InvalidManifest("invalid input")

    @property
    def name(self) -> str:
        """String form of the object's URI."""
        return str(self.uri)
[docs]
class Source(DataObject):
    """A data object used as an input of the simulation."""
[docs]
class Sink(DataObject):
    """A data object produced as an output of the simulation."""
[docs]
class ManifestValidator:
    """
    Common base for manifest section validators.

    Stores the manifest version; ``validate`` does nothing here and is overridden
    by subclasses that have checks to perform.
    """

    version: int

    def __init__(self, version: int):
        """
        :param version: The manifest version this validator is created for.
        """
        self.version = version

    def validate(self, values: Union[List, Dict]) -> None:
        """
        Accept any section content without checking it.

        :param values: The section content from the manifest.
        :return: None
        """
        return None
[docs]
class ListValuesValidator(ManifestValidator):
    """
    Class for the validation of list items in the manifest.

    A valid section is a list whose items are single-entry dictionaries
    (one name/value pair each).
    """

    def __init__(
        self,
        version: int,
        section_name: Optional[str] = None,
        expected_keys: Optional[Iterable] = None,
        required_keys: Optional[Iterable] = None,
    ) -> None:
        """
        :param version: The manifest version being validated.
        :param section_name: Section name used in error messages.
        :param expected_keys: Item names allowed in the section, or None to allow
            any name.
        :param required_keys: Item names that must appear in the section, or None
            if nothing is required.
        """
        self.section_name: Optional[str] = section_name
        self.expected_keys: Optional[Iterable] = expected_keys
        self.required_keys: Optional[Iterable] = required_keys
        super().__init__(version)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Check the shape and the item names of a list section.

        :param values: The section content; ``None`` (empty section) is accepted.
        :return: None
        :raises InvalidManifest: If the section is a dict, an item is not a
            single-entry dict, an item name is not among ``expected_keys``, or a
            name in ``required_keys`` does not appear.
        """
        if values is None:
            return
        if isinstance(values, dict):
            raise InvalidManifest(
                f"badly formatted manifest - {self.section_name} should be provided as "
                "a list"
            )

        seen_names = set()
        for item in values:
            # Exactly one entry is required: an empty dict has no name to inspect.
            if not isinstance(item, dict) or len(item) != 1:
                raise InvalidManifest(
                    f"badly formatted manifest - {self.section_name} values should be "
                    "a name value pair"
                )
            name = next(iter(item))
            # Unknown names are checked against the allowed set, not the required one.
            if self.expected_keys is not None and name not in self.expected_keys:
                raise InvalidManifest(
                    f"unknown {self.section_name} key in manifest: {name}"
                )
            seen_names.add(name)

        if self.required_keys is not None:
            for key in self.required_keys:
                if key not in seen_names:
                    raise InvalidManifest(
                        f"required {self.section_name} key not found in manifest: {key}"
                    )
[docs]
class DictValuesValidator(ManifestValidator):
    """
    Class for the validation of dictionary items in the manifest.
    """

    def __init__(
        self,
        version: int,
        section_name: Optional[str] = None,
        expected_keys: Optional[Iterable] = None,
        required_keys: Optional[Iterable] = None,
    ) -> None:
        """
        :param version: The manifest version being validated.
        :param section_name: Section name used in error messages.
        :param expected_keys: Keys allowed in the section, or None to allow any key.
        :param required_keys: Keys that must be present, or None if nothing is
            required.
        """
        self.section_name: Optional[str] = section_name
        self.expected_keys: Optional[Iterable] = expected_keys
        self.required_keys: Optional[Iterable] = required_keys
        super().__init__(version)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Check the keys of a dictionary section.

        Keys not listed in ``expected_keys`` are rejected, except keys starting with
        ``code<digits>``, whose nested keys must be ``name``, ``repo`` or ``commit``.

        :param values: The section content; ``None`` is treated as an empty dict.
        :return: None
        :raises InvalidManifest: If the section is not a dict, contains an unknown
            key, or lacks a key from ``required_keys``.
        """
        if values is None:
            # An empty YAML section parses to None; it has no keys at all.
            values = {}
        if not isinstance(values, dict):
            raise InvalidManifest(
                f"badly formatted manifest - {self.section_name} should be provided as "
                "a dict"
            )

        if self.expected_keys is not None:
            for key, value in values.items():
                if key in self.expected_keys:
                    continue
                if not (isinstance(key, str) and re.match(r"code[0-9]+", key)):
                    raise InvalidManifest(
                        f"unknown {self.section_name} key in manifest: {key}"
                    )
                for code_key in value or ():
                    if code_key not in ("name", "repo", "commit"):
                        raise InvalidManifest(
                            f"unknown {self.section_name}.{key} key in "
                            f"manifest: {code_key}"
                        )

        if self.required_keys is not None:
            # Enforced regardless of the container type used for expected_keys.
            for key in self.required_keys:
                if key not in values:
                    raise InvalidManifest(
                        f"required {self.section_name} key not found in manifest: {key}"
                    )
[docs]
class DataObjectValidator(ListValuesValidator):
    """
    Validator for the manifest data objects (inputs or outputs).
    """

    def __init__(self, version: int, section_name: str) -> None:
        """
        :param version: The manifest version; selects the allowed item names.
        :param section_name: Section name used in error messages.
        :raises KeyError: If ``version`` is negative.
        """
        if version == 0:
            expected_keys = ("uuid", "path", "imas")
        elif version > 0:
            expected_keys = ("uri",)
        else:
            raise KeyError("Invalid version.")
        super().__init__(version, section_name, expected_keys)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Run the list checks, then for versions above 0 check every item's URI.

        :param values: The section content; ``None`` is accepted.
        :return: None
        :raises InvalidManifest: If the list checks fail, an item has no ``uri``
            key, a URI scheme is not ``file`` or ``imas``, or a URI is repeated.
        """
        super().validate(values)
        # URI checks only apply to versions above 0.
        if values is None or not self.version > 0:
            return

        seen_uris = set()
        for value in values:
            # Report a missing key as a manifest error rather than a bare KeyError.
            if "uri" not in value:
                raise InvalidManifest(
                    f"required {self.section_name} key not found in manifest: uri"
                )
            uri = URI(value["uri"])
            if uri.scheme not in ("file", "imas"):
                raise InvalidManifest(f"unknown uri scheme: {uri.scheme}")
            uri_text = str(uri)
            if uri_text in seen_uris:
                raise InvalidManifest(
                    f"Duplicate URI found in {self.section_name}: {uri}"
                )
            seen_uris.add(uri_text)
[docs]
class OutputsValidator(DataObjectValidator):
    """
    Data object validator bound to the ``outputs`` section of the manifest.
    """

    def __init__(self, version: int) -> None:
        """
        :param version: The manifest version being validated.
        """
        section = "outputs"
        super().__init__(version, section)
[docs]
class VersionValidator(ManifestValidator):
    """
    Validator for manifest version.
    """

    def __init__(self, version: int):
        """
        :param version: The manifest version being validated.
        """
        super().__init__(version)

    def validate(self, values: Union[List, Dict]) -> None:
        """
        Check that the version entry is an integer.

        :param values: The value of the version section.
        :return: None
        :raises InvalidManifest: If the value is not an integer (booleans are
            rejected too).
        """
        # bool is a subclass of int, so a YAML `true`/`false` must be excluded
        # explicitly.
        if isinstance(values, bool) or not isinstance(values, int):
            raise InvalidManifest("version must be an integer")
[docs]
class AliasValidator(ManifestValidator):
    """
    Checks the simulation alias given in the manifest.
    """

    def __init__(self, version: int):
        """
        :param version: The manifest version being validated.
        """
        super().__init__(version)

    def validate(self, values: Union[List, Dict]) -> None:
        """
        Require the alias to be a string that URL-quoting leaves unchanged.

        :param values: The alias value from the manifest.
        :return: None
        :raises InvalidManifest: If the alias is not a string.
        :raises InvalidAlias: If quoting the alias would alter it.
        """
        if not isinstance(values, str):
            raise InvalidManifest("alias must be a string")
        quoted = urllib.parse.quote(values)
        if quoted == values:
            return
        raise InvalidAlias(f"illegal characters in alias: {values}")
[docs]
class DescriptionValidator(ManifestValidator):
    """
    Validator for the simulation description; adds no checks of its own.
    """
[docs]
class ResponsibleValidator(ManifestValidator):
    """
    Validator for the simulation responsible; adds no checks of its own.
    """
[docs]
def ndarray_constructor(
    loader: yaml.SafeLoader, node: yaml.nodes.MappingNode
) -> np.ndarray:
    """
    Build a numpy array from a YAML mapping node.

    The mapping must contain ``data``; an optional ``dtype`` entry is passed to
    ``np.array`` as the dtype.

    :param loader: The YAML loader constructing the document.
    :param node: The mapping node to convert.
    :return: The array built from the node's ``data``.
    """
    fields = loader.construct_mapping(node, deep=True)
    payload = fields["data"]
    dtype = fields.get("dtype")
    return np.array(payload, dtype)
[docs]
def get_loader() -> Type[yaml.SafeLoader]:
    """
    Return ``yaml.SafeLoader`` with the ``!ndarray`` tag constructor registered.

    :return: The ``yaml.SafeLoader`` class.
    """
    # NOTE(review): the constructor is added to yaml.SafeLoader itself, not to a
    # private subclass, so the registration looks process-wide - confirm intended.
    yaml.SafeLoader.add_constructor("!ndarray", ndarray_constructor)
    return yaml.SafeLoader
[docs]
class WorkflowValidator(DictValuesValidator):
    """
    Dictionary validator configured for the ``workflow`` section of the manifest.
    """

    def __init__(self, version: int) -> None:
        """
        Select the allowed and required workflow keys for the manifest version.

        :param version: The manifest version; only 0 and 1 are supported.
        :raises KeyError: If ``version`` is neither 0 nor 1.
        """
        if version not in (0, 1):
            raise KeyError("Invalid version.")

        if version == 0:
            allowed = ("name", "git", "repo", "commit", "codes")
            mandatory = ("name", "commit", "codes")
        else:
            allowed = (
                "name",
                "developer",
                "date",
                "repo",
                "commit",
                "codes",
                "branch",
            )
            mandatory = ("name", "repo", "commit", "branch")

        super().__init__(version, "workflow", allowed, mandatory)
def _update_dict(old: Dict, new: Optional[Dict]) -> None:
    """
    Merge ``new`` into ``old`` in place, accumulating values for repeated keys.

    A key absent from ``old`` is simply set. If the key already holds a list the
    new value is appended to it; otherwise the existing and new values are
    combined into a two-element list.

    :param old: The dictionary to update.
    :param new: The entries to merge in; ``None`` or an empty dict (e.g. the
        result of loading an empty YAML file) leaves ``old`` unchanged.
    :return: None
    """
    if not new:
        return
    for key, value in new.items():
        if key not in old:
            old[key] = value
        elif isinstance(old[key], list):
            old[key].append(value)
        else:
            old[key] = [old[key], value]
[docs]
class Manifest:
    """
    Class to handle reading, writing & validation of simulation manifest files.

    The parsed YAML document is held in ``_data``, the path it was loaded from in
    ``_path`` and collected metadata in ``_metadata``.
    """

    def __init__(self) -> None:
        self._data: Union[Dict, List, None] = None
        self._path: Path = Path()
        self._metadata: Dict = {}

    @property
    def metadata(self) -> Dict:
        """Metadata collected for this manifest."""
        return self._metadata

    @classmethod
    def from_template(cls) -> "Manifest":
        """
        Create an empty manifest from a template file.

        The template is ``template.yaml`` in the directory of this module.

        :return: A new manifest object.
        """
        manifest = cls()
        dir_path = Path(__file__).resolve().parent
        manifest.load(dir_path / "template.yaml")
        return manifest

    @property
    def inputs(self) -> Iterable[Source]:
        """
        The data sources listed in the ``inputs`` section.

        The last component of a file URI's path is used as a glob pattern, and one
        ``Source`` is returned per matching file.

        :raises InvalidManifest: If a file URI matches no files.
        """
        sources: List[Source] = []
        base_path = self._path.absolute().parent
        if not isinstance(self._data, dict):
            return sources
        for entry in self._data.get("inputs") or []:
            source = Source(base_path, entry["uri"])
            if source.type != DataObject.Type.FILE:
                sources.append(source)
                continue
            if source.uri and source.uri.path:
                source_path = Path(source.uri.path)
                names = [str(p) for p in source_path.parent.glob(source_path.name)]
                if not names:
                    raise InvalidManifest(
                        f"No files found matching path {source.uri.path}"
                    )
                sources.extend(Source(base_path, "file://" + name) for name in names)
        return sources

    @property
    def outputs(self) -> Iterable[Sink]:
        """
        The data sinks listed in the ``outputs`` section.

        The last component of a file URI's path is used as a glob pattern, and one
        ``Sink`` is returned per matching file; a pattern with no match
        contributes nothing. A missing section yields an empty list.
        """
        sinks: List[Sink] = []
        base_path = self._path.absolute().parent
        if not isinstance(self._data, dict):
            return sinks
        # .get() keeps this consistent with `inputs` when the section is absent.
        for entry in self._data.get("outputs") or []:
            sink = Sink(base_path, entry["uri"])
            if sink.type != DataObject.Type.FILE:
                sinks.append(sink)
                continue
            if sink.uri and sink.uri.path:
                sink_path = Path(sink.uri.path)
                names = [str(p) for p in sink_path.parent.glob(sink_path.name)]
                sinks.extend(Sink(base_path, "file://" + name) for name in names)
        return sinks

    @property
    def alias(self) -> Optional[str]:
        """The ``alias`` entry of the manifest, or None if absent."""
        if isinstance(self._data, dict):
            return self._data.get("alias", None)
        return None

    @property
    def responsible_name(self) -> Optional[str]:
        """The ``responsible_name`` entry of the manifest, or None if absent."""
        if isinstance(self._data, dict):
            return self._data.get("responsible_name", None)
        return None

    @property
    def version(self) -> int:
        """The ``version`` entry (default 2), or 0 if the data is not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("version", 2)
        return 0

    @property
    def manifest_version(self) -> int:
        """The ``manifest_version`` entry (default 2), or 0 if not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("manifest_version", 2)
        return 0

    def _load_metadata(self, root_path: Path, path: Path) -> None:
        """
        Merge the contents of a YAML metadata file into the manifest metadata.

        :param root_path: Path whose parent directory anchors a relative ``path``.
        :param path: The metadata file to read.
        :raises InvalidManifest: If the file is not valid YAML.
        """
        try:
            if not path.is_absolute():
                path = root_path.absolute().parent / path
            with path.open() as metadata_file:
                loaded = yaml.load(metadata_file, Loader=get_loader())
        except yaml.YAMLError as err:
            raise InvalidManifest(f"failed to read metadata file {path}") from err
        # An empty file loads as None; there is nothing to merge in that case.
        if loaded is not None:
            _update_dict(self._metadata, loaded)

    def _convert_version(self) -> None:
        """Convert version 0 data to the version 1 layout in place."""
        if isinstance(self._data, dict) and self.version == 0:
            self._convert_metadata()
            self._data["inputs"] = self._convert_files(self._data["inputs"])
            self._data["outputs"] = self._convert_files(self._data["outputs"])
            self._data["version"] = 1

    def _convert_metadata(self) -> None:
        """
        Move ``description`` and ``workflow`` from the data into the metadata and
        rewrite the workflow entry (``git`` becomes ``repo``, ``codes`` is
        flattened to the list of its items' values).
        """
        if not isinstance(self._data, dict):
            return
        for item in ("description", "workflow"):
            if item in self._data:
                self._metadata[item] = self._data[item]
                del self._data[item]
        if "workflow" in self._metadata:
            workflow = self._metadata["workflow"]
            if "git" in workflow:
                workflow["repo"] = workflow["git"]
                del workflow["git"]
            if "codes" in workflow:
                new_codes = []
                for code in workflow["codes"]:
                    new_codes.extend(code.values())
                workflow["codes"] = new_codes

    @classmethod
    def _convert_files(cls, files: List[Dict[str, str]]) -> List[Dict[str, "URI"]]:
        """
        Convert version 0 file entries into ``{"uri": URI}`` entries.

        :param files: Entries keyed by ``uuid``, ``path`` or ``imas``.
        :return: One ``{"uri": URI}`` dict per key/value pair, with the scheme
            chosen from the key.
        """
        scheme_map = {
            "uuid": "simdb",
            "path": "file",
            "imas": "imas",
        }
        return [
            {"uri": URI(scheme=scheme_map[key], path=value)}
            for file in files
            for key, value in file.items()
        ]

    def load(self, file_path: Path) -> None:
        """
        Load a manifest from the given file.

        :param file_path: Path to the file read.
        :return: None
        :raises InvalidManifest: If the file is not valid YAML.
        """
        self._path = file_path
        with file_path.open() as file:
            try:
                self._data = yaml.load(file, Loader=get_loader())
            except yaml.YAMLError as err:
                raise InvalidManifest("badly formatted manifest") from err
        if isinstance(self._data, dict) and "metadata" in self._data:
            # An empty `metadata:` section parses to None; store an empty list.
            self._metadata["metadata"] = self._data["metadata"] or []

    def save(self, out_file: TextIO) -> None:
        """
        Save the manifest to the given file.

        :param out_file: The output text stream to write the manifest to.
        :return: None
        """
        yaml.dump(self._data, out_file, default_flow_style=False)

    def validate(self) -> None:
        """
        Validate the manifest object.

        :return: None
        :raises InvalidManifest: If nothing was loaded, the top level is not a
            mapping, the manifest version is unknown, an unknown section is
            present, a required section is missing, or a section validator fails.
        """
        if self._data is None:
            raise InvalidManifest("failed to read manifest")
        if isinstance(self._data, list):
            raise InvalidManifest(
                "badly formatted manifest - top level sections must be keys not a list"
            )
        if not isinstance(self._data, dict):
            raise InvalidManifest(
                "badly formatted manifest - top level must be a mapping of sections"
            )
        if "manifest_version" not in self._data:
            print("warning: no version given in manifest, assuming version 2.")

        # Validators are keyed on the `manifest_version` section, the same key
        # that is checked and required in this method.
        version = self.manifest_version
        if version == 2:
            section_validators = {
                "manifest_version": VersionValidator(version),
                "alias": AliasValidator(version),
                "inputs": InputsValidator(version),
                "outputs": OutputsValidator(version),
                "metadata": MetaDataValidator(version),
                "responsible_name": ResponsibleValidator(version),
            }
        else:
            raise InvalidManifest(f"Unknown manifest version {version}.")

        for section in self._data:
            if section not in section_validators:
                raise InvalidManifest(f"Unknown manifest section found {section}.")

        required_sections = ("manifest_version", "outputs", "inputs")
        for section in required_sections:
            if section not in self._data:
                raise InvalidManifest(
                    f"Required manifest section '{section}' not found."
                )

        for name, values in self._data.items():
            section_validators[name].validate(values)

        self._convert_version()