Utility functions

check_categorical(expression)

Checks whether a given expression is categorical or not.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `expression` | `Expression` | the expression to validate | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | whether the expression can be considered categorical data or not |

Source code in src/sdss_explorer/util/util.py
def check_categorical(expression: vx.Expression) -> bool:
    """Checks whether a given expression is categorical or not.

    Args:
        expression: the expression to validate

    Returns:
        bool: whether the expression can be considered categorical data or not
    """
    return (expression.dtype == "string") | (expression.dtype == "bool")
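
A minimal usage sketch (the dataframe and column values are hypothetical):

```python
import vaex as vx

df = vx.from_arrays(telescope=["apo25m", "lco25m"], snr=[35.1, 12.4])

check_categorical(df["telescope"])  # True: string dtype counts as categorical
check_categorical(df["snr"])        # False: float dtype does not
```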

check_flags(flags, filters)

Converts flags & values to a boolean vaex expression for use as a filter.

Note

Registered as a vaex.expression.Expression method via the register_function decorator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `flags` | `Expression` | bit flags expressions | required |
| `filters` | `Expression` | which filters to apply | required |

Returns: Boolean expression of filters

Source code in src/sdss_explorer/util/filters.py
@vx.register_function(multiprocessing=True)
def check_flags(flags: vx.Expression, filters: vx.Expression) -> vx.Expression:
    """Converts flags & values to boolean vaex expression for use as a filter.

    Note:
        Registered as a `vaex.expression.Expression` method via the `register_function` decorator.

    Args:
        flags: bit flags expressions
        filters: which filters to apply
    Returns:
        Boolean expression of filters
    """
    return np.logical_and(flags, filters).any(axis=1)
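
Since the function is registered with vaex, it is reachable through `df.func`. A sketch of a direct call (the dataframe and bitmask are hypothetical; `sdss5_target_flags` is the flag column used by `filter_carton_mapper` below):

```python
import numpy as np

# hypothetical mask over the 57 flag bytes: byte 0 bit 3, byte 1 bit 2
filters = np.zeros(57, dtype="uint8")
filters[0] = 1 << 3
filters[1] = 1 << 2

# evaluates row-wise, reducing across the flag bytes (axis=1)
mask = df.func.check_flags(df["sdss5_target_flags"], filters)
df_selected = df[mask]
```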

filter_carton_mapper(df, mapping, carton, mapper, combotype='AND', invert=False)

Filters a list of cartons and mappers

Based on code written by Andy Casey for github.com/sdss/semaphore

Source code in src/sdss_explorer/util/filters.py
def filter_carton_mapper(
    df: vx.DataFrame,
    mapping: vx.DataFrame,
    carton: list[str],
    mapper: list[str],
    combotype: str = "AND",
    invert: bool = False,
) -> vx.Expression | None:
    """
    Filters a list of cartons and mappers

    Based on code written by Andy Casey for github.com/sdss/semaphore
    """
    if len(mapper) != 0 or len(carton) != 0:
        # mask
        if len(mapper) == 0:
            mask = mapping["alt_name"].isin(carton).values
        elif len(carton) == 0:
            mask = mapping["mapper"].isin(mapper).values
        else:
            mask = operator_map[combotype](
                mapping["mapper"].isin(mapper).values,
                mapping["alt_name"].isin(carton).values,
            )

        # determine active bits via mask and get flag_number & offset
        # NOTE: hardcoded nbits as 8, and nflags as 57
        bits = np.arange(len(mapping))[mask]
        num, offset = np.divmod(bits, 8)
        setbits = 57 > num  # keep only bits whose flag number fits within the 57 flags

        # construct a bitmask array indexed by flag number
        filters = np.zeros(57, dtype="uint8")
        unique_nums, indices = np.unique(num[setbits], return_inverse=True)
        for i, unique in enumerate(unique_nums):
            offsets = 1 << offset[setbits][indices == i]
            filters[unique] = np.bitwise_or.reduce(offsets)

        cmp_filter = df.func.check_flags(df["sdss5_target_flags"], filters)
    else:
        cmp_filter = None

    if invert and (cmp_filter is not None):
        logger.debug("inverting cmpfilter")
        return ~cmp_filter
    else:
        return cmp_filter
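
To illustrate the bit bookkeeping (a sketch with hypothetical bit positions; in practice `bits` comes from the carton/mapper mask over the mapping table):

```python
import numpy as np

# suppose the mask selects carton bits 3, 10 and 12
bits = np.array([3, 10, 12])
num, offset = np.divmod(bits, 8)  # flag number and bit offset within that flag
# num    -> [0, 1, 1]
# offset -> [3, 2, 4]

filters = np.zeros(57, dtype="uint8")
for n in np.unique(num):
    filters[n] = np.bitwise_or.reduce(1 << offset[num == n])
# filters[0] == 0b00001000, filters[1] == 0b00010100
```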

filter_expression(df, columns, expression, invert=False)

Converts a string expression into a valid boolean filter expression.

Source code in src/sdss_explorer/util/filters.py
def filter_expression(
    df: vx.DataFrame,
    columns: list[str],
    expression: str,
    invert: bool = False,
):
    """Converts expression to valid filter"""
    # first, remove all spaces
    expr = expression.replace(" ", "")
    logger.debug(f"expr: {expr}")
    num_regex = r"^-?[0-9]+(?:\.[0-9]+)?(?:e-?\d+)?$"

    # get expression in parts, saving split via () regex
    subexpressions = re.split(r"(&|\||\)|\()", expr)
    n = 1
    for i, expr in enumerate(subexpressions):
        # saved regex info -> skip w/o enumerating
        if expr in ["", "&", "(", ")", "|"]:
            continue

        # guardrail against arbitrary code execution (ACE) here -- very important
        illegals = ["eval", "exec", "import", "__main__"]
        for illegal in illegals:
            if illegal in expression:
                logger.critical(
                    "this user attempted to use ACE-like expressions!")
                assert False, "Your session and IP has been logged."

        parts = re.split(r"(>=|<=|<|>|==|!=)", expr)
        if len(parts) == 1:
            assert False, f"expression {n} is invalid: no comparator"
        elif len(parts) == 5:
            # first, check that parts 2 & 4 are lt or lte comparators
            assert (
                re.fullmatch(r"<=|<", parts[1]) is not None
                and re.fullmatch(r"<=|<", parts[3]) is not None
            ), (f"expression {n} is invalid: not a proper 3-part inequality (a < col <= b)"
                )

            # check middle
            assert parts[2] in columns, (
                f"expression {n} is invalid: must be comparing a data column (a < col <= b)"
            )

            # check a and b are numeric & that a < b
            assert re.match(num_regex, parts[0]) is not None, (
                f"expression {n} is invalid: must be numeric for numerical data column"
            )
            assert re.match(num_regex, parts[-1]) is not None, (
                f"expression {n} is invalid: must be numeric for numerical data column"
            )
            assert float(parts[0]) < float(parts[-1]), (
                f"expression {n} is invalid: invalid inequality (a > b for a < col < b)"
            )

            # change the expression to valid format
            subexpressions[i] = (
                f"(({parts[0]}{parts[1]}{parts[2]})&({parts[2]}{parts[3]}{parts[4]}))"
            )

        elif len(parts) == 3:
            check = (parts[0] in columns, parts[2] in columns)
            if np.any(check):
                if check[0]:
                    col = parts[0]
                    num = parts[2]
                elif check[1]:
                    col = parts[2]
                    num = parts[0]
                dtype = str(df[col].dtype)
                if "float" in dtype or "int" in dtype:
                    assert re.match(num_regex, num) is not None, (
                        f"expression {n} is invalid: must be numeric for numerical data column"
                    )
            else:
                assert False, f"expression {n} is invalid: one part must be column"
            assert re.match(r">=|<=|<|>|==|!=", parts[1]) is not None, (
                f"expression {n} is invalid: middle is not comparator")

            # change the expression in subexpression
            subexpressions[i] = "(" + expr + ")"
        else:
            assert False, f"expression {n} is invalid: too many comparators"

        # increment the expression counter
        n = n + 1

    # create expression as str
    expr = "(" + "".join(subexpressions) + ")"
    logger.debug(f"expr final: {expr}")

    # set filter corresponding to inverts & exit
    if invert:  # NOTE: df will never be None unless something horrible happens
        logger.debug("inverting expression")
        return ~df[expr]
    else:
        return df[expr]
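
A usage sketch (the column names `teff` and `logg` are hypothetical; they only need to be valid numeric columns of `df`):

```python
columns = ["teff", "logg"]
expr = filter_expression(df, columns, "(4000 < teff <= 6000) & (logg > 2)")
df_sel = df[expr]
```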

filter_flags(df, flags, dataset, invert=False)

Generates a filter for flags

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | dataframe to filter | required |
| `flags` | `list[str]` | list of flags to update | required |
| `dataset` | `str` | specific dataset to filter on, used to check whether `result_flags` is present | required |
| `invert` | `bool` | whether to invert | `False` |

Source code in src/sdss_explorer/util/filters.py
def filter_flags(df: vx.DataFrame,
                 flags: list[str],
                 dataset: str,
                 invert: bool = False) -> vx.Expression | None:
    """
    Generates a filter for flags

    Args:
        df: dataframe to filter
        flags: list of flags to update
        dataset: specific dataset to filter on, used to check whether result_flags is present
        invert: whether to invert
    """
    filters = []
    for flag in flags:
        # Skip iteration if the subset's dataset is 'mwmlite' and the flag is 'purely non-flagged'
        if (dataset == "mwmlite") and (flag == "purely non-flagged"):
            continue
        # boss-only pipeline exceptions for zwarning_flags filtering
        elif np.isin(
                dataset,
            ("spall", "lineforest"),
        ) and (flag == "purely non-flagged"):
            filters.append("zwarning_flags!=0")
            continue
        filters.append(flagList[flag])

    # Determine the final concatenated filter
    if filters:
        # Join the filters with ")&(" and wrap them in outer parentheses
        concat_filter = f"(({')&('.join(filters)}))"
        concat_filter: vx.Expression = df[concat_filter]
        if invert and (concat_filter is not None):
            logger.debug("inverting flagfilter")
            concat_filter = ~concat_filter
    else:
        concat_filter = None
    return concat_filter
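
A sketch of a call (any flag names other than the special-cased "purely non-flagged" must be keys of the module-level `flagList` mapping):

```python
expr = filter_flags(df, flags=["purely non-flagged"], dataset="spall")
if expr is not None:  # None when no filters survive (e.g. "mwmlite" skips this flag)
    df_sel = df[expr]
```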

generate_unique_key(key='')

Generates a unique UUID-based key for a given string.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `str` | a sub-key to prefix with | `''` |

Returns:

| Type | Description |
| --- | --- |
| `str` | A new unique key with a UUID4 appended. |

Source code in src/sdss_explorer/util/util.py
def generate_unique_key(key: str = "") -> str:
    """Generates a unique UUID-based key for given string.

    Args:
        key: a sub-key to prefix with

    Returns:
        A new unique key with a UUID4 appended.
    """

    def make_uuid(*_ignore):
        return str(uuid.uuid4())

    return key + make_uuid()
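
For example (the UUID shown is illustrative):

```python
generate_unique_key("subset-")  # e.g. 'subset-9f1c2d1e-8a4b-4c1e-9f0a-2b7c6d5e4f3a'
```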

setup_logging(log_path='./', log_file='explorerApp.log', console_log_level='DEBUG', file_log_level='INFO')

Configures the logging system with console and file handlers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `log_path` | `str` | log path | `'./'` |
| `log_file` | `str` | log filename | `'explorerApp.log'` |
| `console_log_level` | `str` | log level for console as full uppercase string | `'DEBUG'` |
| `file_log_level` | `str` | log level for file as full uppercase string | `'INFO'` |

Source code in src/sdss_explorer/util/logger.py
def setup_logging(
    log_path: str = "./",
    log_file: str = "explorerApp.log",
    console_log_level: str = "DEBUG",
    file_log_level: str = "INFO",
):
    """
    Configures the logging system with console and file handlers.

    Args:
        log_path: log path
        log_file: log filename
        console_log_level: log level for console as full uppercase string
        file_log_level: log level for file as full uppercase string
    """
    logging_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "standard": {
                "()":
                # logging.Formatter,
                MultiLineFormatter,  # use custom multi-line formatter
                "format":
                "%(asctime)s - %(name)s - %(levelname)s - %(kernel_id)s - %(message)s",  # standard format
            }
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "standard",
                "level": console_log_level,
            },
            "file": {
                "class": "logging.FileHandler",
                "formatter": "standard",
                "level": file_log_level,
                "filename": pathjoin(log_path, log_file),
                "mode": "a",
            },
        },
        "loggers": {
            "dashboard": {
                "handlers": ["console", "file"],
                "level": file_log_level,
                "propagate": False,
            },
            "server": {
                "handlers": ["console", "file"],
                "level": file_log_level,
                "propagate": False,
            },
        },
    }

    # set record factory to set kernel id
    oldfactory = logging.getLogRecordFactory()

    def record_factory(*args, **kwargs):
        record = oldfactory(*args, **kwargs)
        if getattr(record, "kernel_id", None) is None:
            record.kernel_id = get_kernel_id()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.config.dictConfig(logging_config)
    return
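
A minimal setup sketch; only the configured `dashboard` and `server` loggers route to the handlers:

```python
import logging

setup_logging(log_path="/tmp", console_log_level="INFO")

logger = logging.getLogger("dashboard")
logger.info("explorer app started")  # goes to console and /tmp/explorerApp.log
```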

validate_pipeline(df, pipeline)

Validates whether pipeline is valid.

This checks membership of the pipeline in the dataframe.

Note

This method assumes pipeline is a valid column in df.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | dataframe to check whether `pipeline` is valid against | required |
| `pipeline` | `str` | the pipeline to check against | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if a valid pipeline for this dataframe, `False` otherwise |

Source code in src/sdss_explorer/util/util.py
def validate_pipeline(df: vx.DataFrame, pipeline: str) -> bool:
    """Validates whether pipeline is valid.

    This checks membership of the pipeline in the dataframe.

    Note:
        This method assumes `pipeline` is a valid column in `df`.

    Args:
        df: dataframe to check whether `pipeline` is valid against
        pipeline: the pipeline to check against

    Returns:
        `True`: if a valid pipeline for this dataframe
        `False`: if an invalid pipeline for this dataframe
    """
    if df:
        if any(pipeline == c for c in df["pipeline"].unique()):
            return True
    return False
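
A usage sketch (the pipeline name is hypothetical):

```python
if validate_pipeline(df, "astra"):
    subset = df[df["pipeline"] == "astra"]
```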

validate_release(path, release)

Validates whether release is valid.

This traverses the provided path to see whether there is a folder named `release`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | the datapath to check whether `release` is valid against | required |
| `release` | `str` | the release to check against | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if a valid release under this path, `False` otherwise |

Source code in src/sdss_explorer/util/util.py
def validate_release(path: str, release: str) -> bool:
    """Validates whether release is valid.

    This traverses the provided path to see whether there is a folder named `release`.

    Args:
        path: the datapath to check whether `release` is valid against
        release: the release to check against

    Returns:
        `True`: if a valid release under this path
        `False`: if an invalid release under this path
    """
    if path:
        for it in os.scandir(path):
            if it.is_dir() and (it.name == release):
                return True
    return False
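
A usage sketch (path and release name are hypothetical; each release is expected to be a directory directly under `path`):

```python
validate_release("/data/explorer", "dr19")  # True only if /data/explorer/dr19 is a directory
```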