Source code for simdb.validation.validator

import re
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

import cerberus
import numpy as np
import yaml

from simdb.config import Config, ConfigError
from simdb.database.models.simulation import Simulation

ValidatorBase = cast(Any, cerberus.Validator)


[docs] class TestParameters: pass
[docs] class LoadError(Exception): pass
[docs] class ValidationError(Exception): pass
[docs] class CustomValidator(ValidatorBase): types_mapping = cast(Any, cerberus.Validator).types_mapping.copy() types_mapping["numpy"] = cerberus.TypeDefinition("numpy", (np.ndarray,), ()) def _validate_exists(self, check_exists, field, value): """The rule's arguments are validated against this schema: {'type': ['string'], 'check_with': 'type'}""" if check_exists and not Path(value).exists(): self._error(field, "File must exist") def _validate_min_value(self, min_value, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ if not isinstance(value, np.ndarray): value = value[~np.isnan(value)] if value.size == 0: self._error(field, "Values in numpy array are NaN or empty") self._error(field, "Value is not a numpy array") if min_value is not None and value.min() < min_value: self._error(field, f"Minimum {value.min()} less than {min_value}") def _validate_max_value(self, max_value, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ if not isinstance(value, np.ndarray): value = value[~np.isnan(value)] if value.size == 0: self._error(field, "Values in numpy array are NaN or empty") self._error(field, "Value is not a numpy array") if max_value is not None and value.max() > max_value: self._error(field, f"Maximum {value.max()} greater than {max_value}") def _compare(self, comparison, field, value, comparator: str, message: str): if comparison is None: return if isinstance(value, np.ndarray): value = value[~np.isnan(value)] if value.size == 0: self._error(field, "Values in numpy array are NaN or empty") if not getattr(value, comparator)(comparison).all(): self._error(field, f"Values are not {message} {comparison}") elif isinstance(value, float): if not getattr(value, comparator)(comparison): self._error(field, f"Value is not {message} {comparison}") else: self._error(field, "Value is not a numpy array or a float") def _validate_gt(self, comparison, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ self._compare(comparison, field, value, "__gt__", "greater than") def _validate_ge(self, comparison, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ self._compare(comparison, field, value, "__ge__", "greater than or equal to") def _validate_lt(self, comparison, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ self._compare(comparison, field, value, "__lt__", "less than") def _validate_le(self, comparison, field, value): """The rule's arguments are validated against this schema: {'type': 'float'} """ self._compare(comparison, field, value, "__le__", "less than or equal to") @classmethod def _normalize_coerce_int(cls, value): return int(value) @classmethod def _normalize_coerce_float(cls, value): return float(value) @classmethod def _normalize_coerce_numpy(cls, value): if isinstance(value, np.ndarray): return value elif isinstance(value, str): return np.fromstring(value[1:-1], sep=" ") else: return np.array(value)
def _load_schema(path: Path): if not path.exists(): return [{}] # load schema from file with path.open() as file: try: schema = yaml.load(file, Loader=yaml.SafeLoader) return schema except yaml.YAMLError as err: raise LoadError( f"Failed to read validation schema from file {file}" ) from err
[docs] class Validator: _validator: CustomValidator _section_re = re.compile(r"\S+ \"(\S+)=(\S+)\"")
[docs] @classmethod def validation_schemas( cls, config: Config, simulation: Optional[Simulation], path=None ) -> List[Dict]: root = Path( str( config.get_option( "validation.path", default=str(config.config_directory) ) ) ) paths = [] if path: paths.append(path) else: paths.append(root / "validation-schema.yaml") # Look for config sections like [validation "key=value"] and see if the # simulationhas metadata matching the given test. If matching, adding the # "path" in this section to the paths. if simulation is not None: sections = [ sec for sec in config.sections() if sec.startswith("validation") ] for section in sections: if section == "validation": continue match = cls._section_re.match(section) if match: key = match.group(1) value = match.group(2) for meta in simulation.find_meta(key): if meta.value == value: path = config.get_section(section).get("path", "") if path: paths.append(path) elif section != "validation": raise ConfigError(f"Invalid validation section {section}") schemas = [] for path in paths: schemas.append(_load_schema(path)) return schemas
def __init__(self, schema: Dict): try: self._validator = CustomValidator(schema) self._validator.allow_unknown = True except cerberus.SchemaError as err: raise LoadError("Failed to parse validation schema") from err
[docs] def validate(self, sim: Simulation) -> None: # convert sim to dictionary data = sim.meta_dict() # validate using cerberus if not self._validator.validate(data): raise ValidationError(self._validator.errors)