Source code for supy.util._gap_filler

import pandas as pd
import numpy as np


# locate the first position of period with in-between gaps
def loc_gap(ser_test, freq="1D", pattern="010"):
    rsmp = ser_test.resample(freq)
    ser_TF_10 = rsmp.apply(lambda ser: ser.isna().any()) * 1
    n_gap = ser_TF_10.sum()
    if cal_ratio_info(ser_test) < 0.5:
        raise RuntimeError(
            f"too many gaps (n_gap/n_period={n_gap}/{ser_TF_10.size}) to proceed! please enlarge the series to introduce more valid periods."
        )
    str_TF_10 = ser_TF_10.astype(str).str.cat()
    pos_gap = str_TF_10.find(pattern)
    loc_ser = ser_TF_10.iloc[pos_gap : pos_gap + len(pattern)].index
    return loc_ser


# fill gap with neighbouring days
def fill_gap_one(ser_test, freq="1D", pattern="010"):
    # resample into daily periods
    rsmp = ser_test.resample(freq)
    # locate the gaps according to gap pattern: 0 for NO gap, 1 for gapped
    loc_ser = loc_gap(ser_test, freq, pattern)

    # generator groups
    ser_find = (rsmp.get_group(x) for x in loc_ser)
    if len(loc_ser) == 0:
        return ser_test

    # assign series:
    # ser_prev: series prior to gapped period
    # ser_gap: series with gaps
    # ser_post: series after gapped period
    if pattern == "010":
        ser_prev, ser_gap, ser_post = ser_find
    elif pattern == "01":
        ser_prev, ser_gap = ser_find
        ser_post = pd.Series([])
    elif pattern == "10":
        ser_gap, ser_post = ser_find
        ser_prev = pd.Series([])

    # base series for gap filling
    ser_fill_base = pd.concat([ser_prev, ser_post])
    ser_fill = (
        ser_fill_base.groupby(
            [
                ser_fill_base.index.hour.rename("hr"),
                ser_fill_base.index.minute.rename("min"),
            ]
        )
        .median()
        .reset_index(drop=True)
    )
    try:
        ser_fill.index = ser_gap.index
    except ValueError:
        print(ser_test)
        print(pattern)
        print(ser_fill_base)
        print(ser_gap)

    # calculate rescaling factor with enough values to robustly rescale
    if (pattern == "010") and (ser_gap.count() > len(ser_gap) / 2):
        scale_fill = (ser_fill / ser_gap).median()
        # correct scale_fill for edge cases
        scale_fill = 1 if abs(scale_fill) > 10 else scale_fill
        scale_fill = 1 if abs(scale_fill) < 0.1 else scale_fill
        scale_fill = 1 if np.isnan(scale_fill) else scale_fill
    else:
        scale_fill = 1
    # rescale fill based on median ratio of fill:orig at available timesteps
    ser_fill_gap = ser_fill / scale_fill

    # fill in gaps with rescaled values of the filling data
    ser_gap.loc[ser_gap.isna()] = ser_fill_gap.loc[ser_gap.isna()]
    ser_filled = pd.concat([ser_prev, ser_gap, ser_post])

    # fill the original gapped series
    ser_test_filled = ser_test.copy()
    ser_test_filled.loc[ser_filled.index] = ser_filled
    return ser_test_filled


def cal_ratio_info(ser_to_fill, freq="1d"):
    rsmp = ser_to_fill.resample(freq)
    ser_TF_10 = rsmp.apply(lambda ser: ser.isna().any()) * 1
    return 1 - ser_TF_10.sum() / ser_TF_10.size


# fill gaps iteratively
def fill_gap_all_x(ser_to_fill: pd.Series, freq="1D", limit_fill=1) -> pd.Series:
    """Fill all gaps in a time series using data from neighbouring divisions of 'freq'

    Parameters
    ----------
    ser_to_fill : pd.Series
        Time series to gap-fill
    freq : str, optional
        Frequency to identify gapped divisions, by default '1D'
    limit_fill: int, optional
        Maximum number of consecutive NaNs to fill.
        Any number less than one means no pre-gap-filling interpolation will be done.

    Returns
    -------
    ser_test_filled: pd.Series
        Gap-filled time series.

    Patterns
    --------
    010: missing data in division between others with no missing data
    01:  missing data in division after one with no missing data
    10:  division with missing data before one with no missing data
    """

    if limit_fill > 0:
        ser_test_filled = ser_to_fill.copy().interpolate(limit=limit_fill)
    else:
        ser_test_filled = ser_to_fill.copy()

    ptn_list = ["010", "01", "10"]

    # if ratio_info > 0.5:
    n_freq = ser_test_filled.resample(freq).size().size
    if n_freq > 4:
        n_chunk = int(pd.Timedelta(freq) / ser_test_filled.index.freq)
        n_freq = int(ser_test_filled.size / n_chunk)
        if n_chunk * n_freq < ser_test_filled.size:
            ser_residual = ser_test_filled.iloc[n_chunk * n_freq :]
        else:
            ser_residual = pd.Series([])
        ser_test_filled = ser_test_filled.iloc[: n_chunk * n_freq]
        # ser_test_filled
        # n_freq
        if n_freq > 4:
            n_sep = int(n_freq / 2) * n_chunk
            #     offset = pd.Timedelta(freq) * n_sep
            #     ind_sep1 = ser_test_filled.index[0] + offset
            #     ind_sep2 = ser_test_filled.index[1] + offset
            ser_test_filled1 = ser_test_filled.iloc[:n_sep]
            ser_test_filled2 = ser_test_filled.iloc[n_sep:]
            rinfo1 = cal_ratio_info(ser_test_filled1)
            rinfo2 = cal_ratio_info(ser_test_filled2)
            rinfo_thresh = 0.2
            while (not (rinfo1 >= rinfo_thresh and rinfo2 >= rinfo_thresh)) and (
                n_sep < n_freq
            ):
                # print(rinfo1, rinfo2, n_sep)
                if rinfo1 < rinfo_thresh:
                    n_sep += n_chunk
                if rinfo2 < rinfo_thresh:
                    n_sep -= n_chunk
                ser_test_filled1 = ser_test_filled.iloc[:n_sep]
                ser_test_filled2 = ser_test_filled.iloc[n_sep:]
                rinfo1 = cal_ratio_info(ser_test_filled1)
                rinfo2 = cal_ratio_info(ser_test_filled2)

        # print(rinfo1, rinfo2, ser_test_filled1.size, ser_test_filled2.size)
        ser_test_filled1 = fill_gap_all_x(ser_test_filled1)
        ser_test_filled2 = fill_gap_all_x(ser_test_filled2)
        # print(ser_test_filled1.size)
        # print(ser_test_filled2.size)
        # print()
        ser_test_filled = pd.concat([ser_test_filled1, ser_test_filled2, ser_residual])
    else:
        while ser_test_filled.isna().any():
            # print("here")
            # try to different gap patterns and fill gaps
            try:
                ptn_gap = next(
                    ptn
                    for ptn in ptn_list
                    if len(loc_gap(ser_test_filled, freq, ptn)) == len(ptn)
                )
                # print(ser_test_filled,ptn_gap)
                ser_test_filled = fill_gap_one(ser_test_filled, freq, ptn_gap)
            except StopIteration:
                pass
            except RuntimeError as e:
                # print("cannot proceed so can only return the original gapped series!")
                return ser_test_filled

    return ser_test_filled


[docs]def fill_gap_all( ser_to_fill: pd.Series, freq="1D", limit_fill=1, thresh_ratio=0.8, ) -> pd.Series: """Fill all gaps in a time series using data from neighbouring divisions of 'freq' Parameters ---------- ser_to_fill : pd.Series Time series to gap-fill freq : str, optional Frequency to identify gapped divisions, by default '1D' limit_fill: int, optional Maximum number of consecutive NaNs to fill. Any number less than one means no pre-gap-filling interpolation will be done. Returns ------- ser_test_filled: pd.Series Gap-filled time series. Patterns -------- 010: missing data in division between others with no missing data 01: missing data in division after one with no missing data 10: division with missing data before one with no missing data """ ratio_info = cal_ratio_info(ser_to_fill) ser_test_filled = norm_ser_dt(ser_to_fill) if ratio_info < thresh_ratio: raise RuntimeError( f"input series is too gapped (valid data ratio = {ratio_info:.2%}) to proceed with the gap-filling method." ) else: # normalise into a series with complete diurnal cycles ser_test_filled = fill_gap_all_x(ser_test_filled, freq, limit_fill) # i = 0 while ser_test_filled.isna().any(): # i += 1 ser_test_filled = fill_gap_all_x(ser_test_filled, freq, limit_fill) ser_gap = ser_test_filled.groupby(ser_test_filled.index.year).apply( lambda x: x.isna().sum() ) # print(i, ser_gap) ser_test_filled = ser_test_filled.loc[ser_to_fill.index] return ser_test_filled
def norm_ser_dt(ser_dt): """ normalise a time series into a time series with complete diurnal cycles. Parameters ---------- ser_dt : pandas.Series datetime-indexed time series Returns ------- pandas.Series time series with complete diurnal cycles; missing values may be inserted as np.nan. """ freq = ser_dt.index.freq dt_start, dt_end = ser_dt.index.normalize()[[0, -1]] dt_end = dt_end + pd.Timedelta("1d") dt_end idx_norm = pd.date_range(dt_start, dt_end, freq=freq)[:-1] ser_norm = ser_dt.reindex(idx_norm) return ser_norm