# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Filename: stub_new.py
# Project: generate
# Author: Brian Cherinka
# Created: Tuesday, 23rd February 2021 10:41:09 am
# License: BSD 3-clause "New" or "Revised" License
# Copyright (c) 2021 Brian Cherinka
# Last Modified: Tuesday, 23rd February 2021 10:41:09 am
# Modified By: Brian Cherinka
from __future__ import print_function, division, absolute_import
import abc
import yaml
import os
from jinja2 import Environment, PackageLoader, TemplateNotFound
from typing import Iterator
from pydantic import ValidationError
from ..gitio import Git
from .changelog import yamldiff_selector
from ..models.releases import releases as sdss_releases
from ..models.yaml import YamlModel
from datamodel.generate.filetypes import file_selector, get_filetype, get_filesize, literal
from datamodel import log
[docs]
class BaseStub(abc.ABC):
format = None
cacheable = False
has_template = True
def __init__(self, datamodel = None, use_cache_release: str = None, full_cache: bool = None,
verbose: bool = None, force: bool = None):
self.environment = None
self.template = None
self.output = None
self.datamodel = datamodel
self.verbose = verbose
# cache control attrs
self.use_cache_release = use_cache_release
self.full_cache = full_cache
self.force = force
# content attrs
self._template_input = None
self._cache = None
self.content = None
self._validated_yaml = None
# set up the Jinja 2 template + environment, and the output stub file path
self._set_template()
if self.datamodel:
self._set_output()
# setup a git object
self.git = Git(verbose=self.verbose)
def __repr__(self) -> str:
if self.datamodel:
return (f'<Stub(format="{self.format}", file_species="{self.datamodel.file_species}", '
f'release="{self.datamodel.release}")>')
else:
return f'<Stub(format="{self.format}")>'
[docs]
@classmethod
def from_datamodel(cls, datamodel):
return cls(datamodel=datamodel)
[docs]
def add_datamodel(self, datamodel):
self.datamodel = datamodel
self._set_output()
def _set_template(self) -> None:
""" Set the jina2 environment including filters for content. """
if not self.has_template:
return
loader = PackageLoader("datamodel", "templates")
self.environment = Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
self.template = self.environment.get_template(f'stub.{self.format}')
def _set_datamodel_dir(self) -> None:
""" Set the DATAMODEL_DIR from the environment """
self.datamodel_dir = os.getenv("DATAMODEL_DIR", None)
if not self.datamodel_dir:
raise ValueError("No DATAMODEL_DIR found. Please set a proper environment variable.")
elif not os.path.exists(self.datamodel_dir):
raise IOError(f"No datamodel directory found at {self.datamodel_dir}")
def _set_output(self) -> None:
""" set the yaml file output directory """
if not self.datamodel:
raise AttributeError('Cannot set an output directory without a valid datamodel')
# create the output directory
self._set_datamodel_dir()
data_dir = os.path.join(self.datamodel_dir, "datamodel")
products_dir = os.path.join(data_dir, "products")
directory = os.path.join(products_dir, self.format)
if not os.path.exists(directory):
os.makedirs(directory)
# set the output file path
self.output = os.path.join(directory, f'{self.datamodel.file_species}.{self.format}')
[docs]
def remove_output(self) -> None:
""" Delete the yaml file on disk """
if self.output and os.path.exists(self.output):
os.remove(self.output)
[docs]
def render_content(self, force: bool = None, force_release: str = None) -> None:
""" Populate the yaml template with generated content """
if not self._cache or force:
self._get_cache(force=force, force_release=force_release)
if self.format != 'yaml' and not self.validate_cache():
log.info('yaml cache is not validated!')
return
self._get_content()
@abc.abstractmethod
def _get_content(self):
pass
[docs]
def write(self, force: bool = None, use_cache_release: str = None,
full_cache: bool = None, **kwargs) -> None:
self.use_cache_release = use_cache_release
self.full_cache = full_cache
self.force = force
if not self.output:
raise AttributeError('No output filepath set')
# always re-render the content
force_release = kwargs.get("force_release", None)
self.render_content(force=force, force_release=force_release)
if not self.content:
log.info('No cache content to write out!')
return
with open(self.output, 'w') as f:
f.write(self.content)
[docs]
def update_cache(self, force: bool = None) -> None:
""" Update the in-memory stub cache from the on-disk file """
self._get_cache(force=force)
def _prepare_input(self) -> dict:
""" prepare the initial template input """
template_input = {}
if not self.datamodel:
raise AttributeError('Cannot prepare template input without a valid datamodel')
# check if datamodel file is real when not in design phase
if not self.datamodel.design and not os.path.exists(self.datamodel.file):
raise IOError(f'File {self.datamodel.file} does not exist. Cannot prepare input.')
# create input dictionary for the template
template_input = {
"file_species": self.datamodel.file_species,
'file_template': self.datamodel.template,
"environments": [self.datamodel.env_label],
"surveys": [self.datamodel.survey],
"releases": [self.datamodel.release],
"example": [self.datamodel.real_location],
"location": [self.datamodel.location],
"access": self._get_access_cache(),
"design": self.datamodel.design,
"data_level": self.datamodel.data_level
}
# add additional real info if not in the design phase
if not self.datamodel.design:
template_input.update({
"path": self.datamodel.file,
"filename": os.path.basename(self.datamodel.file),
"filesize": get_filesize(self.datamodel.file),
"filetype": get_filetype(self.datamodel.file)})
return template_input
def _get_access_cache(self) -> dict:
return {"in_sdss_access": self.datamodel.in_sdss_access,
"path_name": self.datamodel.access[self.datamodel.release]['path_name'],
"path_template": self.datamodel.access[self.datamodel.release]['path_template'],
"path_kwargs": self.datamodel.access[self.datamodel.release]['path_kwargs'],
"access_string": self.datamodel.access[self.datamodel.release]['access']}
def _create_cache(self) -> dict:
if not self.cacheable or not self.has_template:
raise ValueError(f'Cannot create a new cache. The {self.format} stub is not '
'cacheable or does not have a valid template to create a cache. '
'Please create a cacheable yaml file first.')
# prepare initial cache input
input = self._prepare_input()
return yaml.load(self.template.render(input), Loader=yaml.FullLoader)
def _read_cache(self, path: str) -> dict:
""" read the raw yaml cache file"""
with open(path) as file:
content = yaml.load(file, Loader=yaml.FullLoader)
return self._check_release_in_cache(content)
def _get_cache(self, force: bool = None, force_release: str = None) -> None:
# check force and "access/md" format
if force and self.format in ('access', 'md'):
force = None
# only cache-able format is yaml - load that content
cached_file = self.output.replace(self.format, 'yaml')
if os.path.exists(cached_file) and (not force or (force and force_release)):
# read existing cache
content = self._read_cache(cached_file)
else:
# create a brand new cache
content = self._create_cache()
# select the correct file object
suffix = content['general']['datatype'] or get_filetype(self.datamodel.location)
file_class = file_selector(suffix)
# raise error if no class found
if not file_class:
raise ValueError(f'No supported file class found for {suffix}.')
# update any design entry
content['general']['design'] = self.datamodel.design
# update the VAC field
content['general']['vac'] = self.datamodel.vac
# update the RSP field
content['general']['recommended_science_product'] = self.datamodel.recommended_science_product
# check the content dictionary has a proper release
if self.datamodel.release not in content['releases']:
content['releases'][self.datamodel.release] = {"template": None, "example": None, "location": None,
"environment": None, "access": {}, "survey": None,
file_class.cache_key: {}}
# set the cache content
self._cache = content
# instantiate the file object
self.selected_file = file_class(self._cache, datamodel=self.datamodel, stub=self)
# if release is the same, copy over entire cache
if self.use_cache_release and self.full_cache:
self.selected_file._use_full_cache()
self._update_cache_changelog()
return
# set the cache with access info
self._update_cache_access()
# check the filetype and generate proper YAML content
self.selected_file._set_cache(force=force)
# update the data level field
if 'data_level' not in content['general']:
#content['general']['data_level'] = "replace me - with the data level of the product, as defined here https://sdss-wiki.atlassian.net/wiki/x/IwDCGw"
content['general']['data_level'] = "0.0.0"
content['general']['data_level'] = self.datamodel.data_level or content['general']['data_level']
# update the cache changelog
self._update_cache_changelog()
# literal-ize any cache notes (see notes on literal in filetypes/par.py)
if 'notes' not in self._cache:
self._cache['notes'] = None
self._cache['notes'] = literal(self._cache['notes'])
# literal-ize any cache regrets (see notes on literal in filetypes/par.py)
if 'regrets' not in self._cache:
self._cache['regrets'] = "I have no regrets!"
self._cache['regrets'] = literal(self._cache['regrets'])
def _check_release_in_cache(self, content: dict) -> dict:
""" updates the yaml.general.releases list with new releases """
# sort the sdss release list
sdss_releases.sort('release_date')
# load and updates the yaml release list
releases = content['general']['releases']
if self.datamodel.release not in releases:
releases.append(self.datamodel.release)
# sort by the sdss release date; work release always is latest
releases.sort(key=lambda x: sdss_releases.list_names().index(x))
content['general']['releases'] = releases
return content
def _update_cache_access(self) -> None:
""" update the cache with access info """
# always updates the cache with latest datamodel
# update the access dictionary in the cache
self._cache['releases'][self.datamodel.release]['access'] = self._get_access_cache()
# update the template/location, environment keywords in the cache
self._cache['releases'][self.datamodel.release]['template'] = self.datamodel.template
self._cache['releases'][self.datamodel.release]['environment'] = self.datamodel.env_label
self._cache['releases'][self.datamodel.release]['survey'] = self.datamodel.survey
# update the general environments sections in cache
self._update_general_section('environments', self.datamodel.env_label)
# update the general surveys sections in cache
self._update_general_section('surveys', self.datamodel.survey)
# update the location/example keyword in the cache
self._cache['releases'][self.datamodel.release]['location'] = self.datamodel.location
self._cache['releases'][self.datamodel.release]['example'] = self.datamodel.real_location
def _update_general_section(self, key: str, value: str) -> None:
""" Updates or adds a value into a new general section """
section = self._cache['general'].get(key, [])
if not section:
log.warning(f'No {key} section found in cache. Adding new section.')
self._cache['general'][key] = section
# update the general sections in the cache
if value not in self._cache['general'][key]:
if isinstance(section, list):
self._cache['general'][key].append(value)
else:
self._cache['general'][key] = value
def _update_cache_changelog(self):
""" Update the changelog in the cache """
# get the correct yamldiff class
suffix = self._cache['general']['datatype'] or get_filetype(self.datamodel.location)
yd_class = yamldiff_selector(suffix)
# return if no class present
if not yd_class:
return
# instantiate compute the changelog and update the cache
yaml_diff = yd_class(self._cache)
release_order = reversed(self._cache['general']['releases'])
changelog = yaml_diff.generate_changelog(release_order, simple=True)
self._cache['changelog']['releases'] = changelog
[docs]
def validate_cache(self):
""" Validate the yaml cache """
if not self._cache:
log.info("No yaml cache to validate!")
return False
# validate the yaml cache
try:
self._validated_yaml = YamlModel.model_validate(self._cache)
except ValidationError as err:
log.error(err)
return False
else:
return True
[docs]
def commit_to_git(self) -> None:
""" Commit the stub to Github """
# create new branch if needed
if self.git.current_branch == 'main':
branch = f'dmgen-fs-{self.datamodel.file_species}'
log.info(f'Creating new working branch {branch}.')
self.git.create_new_branch(branch)
# add and commit the file
self.git.add(path=self.output)
self.git.commit(message=f"committing {self.datamodel.file_species}.{self.format}")
[docs]
def push_to_git(self) -> None:
""" Push changes to Github """
# try a git pull
try:
self.git.pull()
except RuntimeError as err:
log.warning(err)
# try a git push
self.git.push()
[docs]
def remove_from_git(self) -> None:
""" Remove file from the git repo """
# create new branch if needed
if self.git.current_branch == 'main':
branch = f'dmgen-fs-{self.datamodel.file_species}'
log.info(f'Creating new working branch {branch}.')
self.git.create_new_branch(branch)
# try to remove the file
if os.path.exists(self.output):
self.git.rm(self.output)
self.git.commit(message=f"removing file {self.datamodel.file_species}.{self.format}")
[docs]
def remove_release(self, release: str):
""" Remove a release from the datamodel stub """
if not self._cache:
self.update_cache()
# remove the release from the general section attribute
if release in self._cache["general"]["releases"]:
self._cache["general"]["releases"].remove(release)
# remove the release from the releases section
if release in self._cache["releases"]:
self._cache["releases"].pop(release)
# update the changelog
self._update_cache_changelog()
# def workflow(self):
# # create stub with datamodel
# # set the output file to write content
# # read content from yaml file and create cached content
# # for yaml format only, create the cache (has template, creates, uses, replaces cached content)
# # render yaml template with prepared template input
# # validate the yaml content
# # write out the contents into yaml file
# # for markdown format only (has template, only used cached content)
# # read the markdown template with yaml cache
# # validate the yaml content
# # write out the contents with cached content
# # for json format only (no template, only uses cached content)
# # read in yaml cache content
# # validate the yaml content
# # write out json file with cached content
# pass
[docs]
class YamlStub(BaseStub):
format: str = 'yaml'
cacheable: bool = True
def _get_content(self) -> None:
self.content = yaml.dump(self._cache, sort_keys=False)
[docs]
class MdStub(BaseStub):
format: str = 'md'
def _get_content(self, release: str = None, group: str = 'WORK') -> None:
# update the markdown template to a file specific template
try:
self.template = self.environment.get_template(f'md/{self.selected_file.suffix.lower()}.md')
except TemplateNotFound:
log.error(f'Jinja2 markdown template not found for filetype {self.selected_file.suffix.lower()}.'
' Check that a markdown stub for the filetype has been created in templates/md/.')
return
selected_release = self.get_selected_release(release=release, group=group)
data = self._cache['releases'][selected_release].get(self.selected_file.cache_key, {})
self.content = self.template.render(content=self._cache, data=data, filetype=self.selected_file.suffix.lower(),
selected_release=selected_release, cache_key=self.selected_file.cache_key)
[docs]
def get_selected_release(self, release: str = None, group: str = 'WORK') -> str:
""" get the hdu content for a given release """
cached_releases = list(self._cache['releases'].keys())
if len(cached_releases) == 0:
return release or self.datamodel.release
elif len(cached_releases) == 1:
return cached_releases[0]
elif release in cached_releases:
return release
else:
if release and release not in cached_releases:
log.debug(f'Input release {release} unavailable in cache. '
f'Selecting latest release in group {group}')
# TODO - move to separate function
# groups releases and sorts them
import itertools
g = itertools.groupby(sorted(cached_releases, key=lambda x: x[0]), lambda x: x[0])
key = {'D': 'DR', 'I': 'IPL', 'M': 'MPL', 'W': 'WORK'}
rs = {}
for i, gg in g:
rs[key[i]] = sorted(gg, key=lambda x: int(x[2:]) if 'DR' in x else int(x[3:]) if 'PL' in x else x)
# get latest release
# set a fallback group; fallback either to WORK or DR group
altgroup = 'WORK' if group != 'WORK' else 'DR'
if group not in key.values():
raise KeyError(f'group {group} is not a valid release group')
elif group not in rs.keys():
if altgroup not in rs.keys():
log.warning(f'group(s) {group}/{altgroup} not yet a cached release')
group = next(iter(rs.keys()))
log.warning(f'Falling back to group: {group}')
else:
group = altgroup
return rs[group][-1]
[docs]
def render_content(self, force: bool = None, release: str = None, group: str = 'WORK') -> None:
if not self._cache or force:
self._get_cache(force=force)
if self.format != 'yaml' and not self.validate_cache():
log.info('yaml cache is not validated!')
return
self._get_content(release=release, group=group)
[docs]
def write(self, force: bool = None, release: str = None, group: str = 'WORK',
html: bool = None, use_cache_release: str = None,
full_cache: bool = None, **kwargs) -> None:
if not self.output:
raise AttributeError('No output filepath set')
# always re-render the content
self.render_content(force=force, release=release, group=group)
if not self.content:
log.info('No cache content to write out!')
return
with open(self.output, 'w') as f:
f.write(self.content)
[docs]
class JsonStub(BaseStub):
format: str = 'json'
has_template: bool = False
def _get_content(self) -> None:
# uses orjson to dump; see orjson_dumps method in models/yaml.py
# orjson options; indent=2, sort_keys = False (default)
self.content = self._validated_yaml.model_dump_json(by_alias=True, indent=2) if self._validated_yaml else {}
[docs]
class AccessStub(BaseStub):
format: str = 'access'
has_template: bool = False
cacheable: bool = False
def _get_content(self) -> None:
releases = {k: v.get('access', {}) for k,v in self._cache['releases'].items()}
self.content = yaml.dump(releases, sort_keys=False)
[docs]
def stub_iterator(format: str = None) -> Iterator[BaseStub]:
""" Iterator for all stub formats """
for stub in [YamlStub, AccessStub, MdStub, JsonStub]:
if format and format != stub.format:
continue
yield stub