filter

filter_dataframe(uuid, release, datatype, dataset, name='A', expression='', carton='', mapper='', flags='', crossmatch='', cmtype='', combotype='AND', invert=False)

Filters and exports a dataframe based on the input subset parameters.

Will write a file to the scratch disk based on settings.scratch and return its path relative to that directory.
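
A minimal call sketch (the import path follows the source location shown below; the release, dataset, and expression values are illustrative assumptions, not fixed API values):

from uuid import uuid4

from sdss_explorer.server.filter import filter_dataframe

filepath = filter_dataframe(
    uuid=uuid4(),
    release="dr17",  # assumed release name, for illustration only
    datatype="star",
    dataset="aspcap",
    name="A",
    expression="teff < 5000",  # hypothetical column filter
    combotype="AND",
)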

Parameters:

Name        Type   Description                                          Default
uuid        UUID   unique job id                                        required
release     str    data release                                         required
datatype    str    datatype (star or visit)                             required
dataset     str    specific dataset, e.g. aspcap, spall, best           required
name        str    name of subset, used in generating the output file   'A'
expression  str    filter expression                                    ''
carton      str    comma-separated cartons                              ''
mapper      str    comma-separated mappers                              ''
flags       str    comma-separated flags                                ''
crossmatch  str    crossmatch identifier data                           ''
cmtype      str    type of crossmatch identifier                        ''
combotype   str    logical reducer for carton/mapper                    'AND'
invert      bool   whether to invert all filters                        False
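
Note that combotype only controls how the carton and mapper filters are merged inside filter_carton_mapper; the resulting per-category filters (expression, carton/mapper, flags, crossmatch) are always combined with logical AND, via the reduce(operator.__and__, ...) call in the source below. A toy sketch of that reduction, with plain booleans standing in for vaex filter expressions:

import operator
from functools import reduce

filters = [True, False, True]  # stand-ins for per-category boolean filters
combined = reduce(operator.__and__, filters)
print(combined)  # False -- a row must pass every active filter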

Returns:

Type  Description
str   path to the exported parquet file, relative to settings.scratch
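
Callers resolve the returned path against the scratch directory before reading it back; a short sketch (the uuid and timestamp in the path are placeholder values):

import os

filepath = "1b2c3d4e-5f60-7a8b-9c0d-e1f2a3b4c5d6/subset-A-dr17-star-aspcap-2025-01-01_12:00:00.parquet"
disk_path = os.path.join(scratch_dir, filepath)  # scratch_dir is settings.scratch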

Source code in src/sdss_explorer/server/filter.py
def filter_dataframe(
    uuid: UUID,
    release: str,
    datatype: str,
    dataset: str,
    name: str = "A",
    expression: str = "",
    carton: str = "",
    mapper: str = "",
    flags: str = "",
    crossmatch: str = "",
    cmtype: str = "",
    combotype: str = "AND",
    invert: bool = False,
) -> str:
    """Filters and exports dataframe based on input subset parameters.

    Will write a file to the scratch disk based on `settings.scratch`.

    Args:
        uuid: unique job id
        release: data release
        datatype: datatype (star or visit)
        dataset: specific dataset i.e. aspcap, spall, best
        name: name of subset, used in generating output file
        expression: filter expression
        carton: comma-separated cartons
        mapper: comma-separated mappers
        flags: comma-separated flagss
        combotype: logical reducer for carton/mapper
        invert: whether to invert all filters

    Returns:
        None
    """
    logger.debug("starting filter job")
    dff, columns = load_dataframe(release, datatype, dataset)
    if (dff is None) or (columns is None):
        raise Exception("dataframe/columns load failed")
    filters = list()

    # generic unpack; show to console
    logger.debug(f"""requested {release}/{datatype}/{dataset}{uuid}
                 expr:                 {expression} 
                 carton:               {carton} 
                 mapper:               {mapper} 
                 flags:                {flags}
                 crossmatch({cmtype}): {crossmatch[:8]}...
                 combotype:            {combotype}
                 invert:               {invert}
                 """)

    # process list-like data
    if carton:
        carton: list[str] = carton.split(",")
    if mapper:
        mapper: list[str] = mapper.split(",")
    if flags:
        flags: list[str] = flags.split(",")

    # make all filters via utility funcs
    if expression:
        filters.append(
            filter_expression(dff, columns, expression, invert=invert))
    if carton or mapper:
        cmp_filter = filter_carton_mapper(
            dff,
            mappings,
            carton if carton else [],
            mapper if mapper else [],
            combotype=combotype,
            invert=invert,
        )
        filters.append(cmp_filter)
    if flags:
        flagfilter = filter_flags(dff, flags, dataset, invert=invert)
        filters.append(flagfilter)
    if crossmatch:
        crossmatch_filter = filter_crossmatch(dff, crossmatch, cmtype)
        filters.append(crossmatch_filter)

    # concat all and go!
    filters = [f for f in filters if f is not None]
    if filters:
        totalfilter = reduce(operator.__and__, filters)
        dff = dff[totalfilter]
    if len(dff) == 0:
        raise Exception("attempting to export 0 length df")

    # make directory and pass back after successful export
    os.makedirs(os.path.join(settings.scratch, str(uuid)), exist_ok=True)
    current_time = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
    filename = f"subset-{name}-{release}-{datatype}-{dataset}-{current_time}.parquet"
    filepath = os.path.join(str(uuid), filename)
    disk_path = os.path.join(settings.scratch, filepath)

    # extract filtered rows into memory, then export in 60k-row chunks
    dff = dff[columns].extract()
    dff.export_parquet(disk_path, chunk_size=int(60e3))

    # cleanup to free memory slightly
    dff.close()
    del dff
    gc.collect()
    logger.debug("completed filter job, exiting now!")
    return filepath
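
The exported subset can be read back with vaex once the relative path is joined to the scratch directory. A hedged read-back sketch, assuming settings and filter_dataframe are imported as in the source module:

import os
from uuid import uuid4

import vaex

# assumes `settings` and `filter_dataframe` are importable as in the source;
# release/dataset values are illustrative, as in the usage sketch above
filepath = filter_dataframe(uuid=uuid4(), release="dr17", datatype="star", dataset="aspcap")
dff = vaex.open(os.path.join(settings.scratch, filepath))
print(len(dff))  # number of rows in the exported subset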