Source code for supy.util._tmy

from pathlib import Path
from typing import Tuple

import numpy as np
import pandas as pd


#################################################################
# generate TMY dataframe from supy results
# weight class to determine constants for TMY generation
class Const:
    class ConstError(TypeError):
        pass

    class ConstCaseError(ConstError):
        pass

    def __setattr__(self, name, value):
        if name in self.__dict__:
            raise self.ConstError("can't change const %s" % name)
        if not name.isupper():
            raise self.ConstCaseError('const name "%s" is not all uppercase' % name)
        self.__dict__[name] = value


def gen_score_list(length):
    list_score = (np.arange(length) + 0.5) / length
    return list_score


def gen_score_ser(ser_test):
    ser_score = ser_test.sort_values(ascending=True)
    length = ser_score.size
    list_score = (np.arange(length) + 0.5) / length
    ser_score.loc[:] = list_score
    return ser_score


def gen_FS_DF(df_output):
    """generate DataFrame of scores.

    Parameters
    ----------
    df_output

    Returns
    -------
    type
        Description of returned object.

    """
    df_day = pd.pivot_table(
        df_output,
        values=["T2", "U10", "Kdown", "RH2"],
        index=["Year", "Month", "Day"],
        aggfunc=[min, max, np.mean,],
    )
    df_day_all_year = pd.pivot_table(
        df_output,
        values=["T2", "U10", "Kdown", "RH2"],
        index=["Month", "Day"],
        aggfunc=[min, max, np.mean,],
    )

    array_yr_mon = df_day.index.droplevel("Day").to_frame().drop_duplicates().values

    df_fs = pd.DataFrame(
        {
            (yr, mon): (
                df_day.loc[(yr, mon)].apply(gen_score_ser)
                - df_day_all_year.loc[mon].apply(gen_score_ser)
            )
            .abs()
            .mean()
            for yr, mon in array_yr_mon
        }
    )

    return df_fs


def gen_WS_DF(df_met):
    """generate DataFrame of weighted sums of F-score.

    Parameters
    ----------
    df_met : pd.DataFrame
        A dataframe of meterological info that mush include these columns/variables:
        - T2: near surface air temperature at 2 m agl
        - RH2: near surface relative humidity at 2 m agl
        - U10: near surface wind speed at 10 m agl
        - Kdown: incomidng solar radiation
        - Year: calendar year
        - Month: calendar month
        - Day: calendar day

    Returns
    -------
    pd.DataFrame
        Converted dataframe with calculated metrics for TMY generation.

    """
    df_fs = gen_FS_DF(df_met)

    list_index = [
        ("mean", "T2"),
        ("max", "T2"),
        ("min", "T2"),
        ("mean", "U10"),
        ("max", "U10"),
        ("min", "U10"),
        ("mean", "RH2"),
        ("max", "RH2"),
        ("min", "RH2"),
        ("mean", "Kdown"),
    ]

    # generate weights: Sandia method
    const = Const()

    const.T_MEAN = 2 / 24
    const.T_MAX = 1 / 24
    const.T_MIN = 1 / 24
    const.T_RANGE = 0  # todo
    const.RH_MEAN = 2 / 24
    const.RH_MAX = 1 / 24
    const.RH_MIN = 1 / 24
    const.RH_RANGE = 0  # todo
    const.WIND_MEAN = 2 / 24
    const.WIND_MAX = 2 / 24
    const.WIND_MIN = 0
    const.WIND_RANGE = 0  # todo
    const.WIND_DIRECTION = 0  # todo
    const.SOLAR_RADIATION_GLOBAL = 12 / 24
    const.SOLAR_RADIATION_DIRECT = 0  # todo

    list_const = [
        getattr(const, attr)
        for attr in [
            "T_MEAN",
            "T_MAX",
            "T_MIN",
            "WIND_MEAN",
            "WIND_MAX",
            "WIND_MIN",
            "RH_MEAN",
            "RH_MAX",
            "RH_MIN",
            "SOLAR_RADIATION_GLOBAL",
        ]
    ]
    list_ws = [df_fs.loc[idx] * cst for idx, cst in zip(list_index, list_const)]
    df_ws = pd.concat(list_ws, axis=1).sum(axis=1).unstack().dropna()

    return df_ws


def pick_year(df_output, n=5):
    # root mean square differences
    df_rmsd_mon = cal_rmsd_mon(df_output)

    # WS: weighted FS metric
    df_ws = gen_WS_DF(df_output)

    # years with smallest WS
    year_nsmallest = df_ws.apply(lambda ser: ser.nsmallest(n).index)

    # best candidate years for each month
    year_sel = df_rmsd_mon.apply(lambda ser: ser.loc[year_nsmallest[ser.name]]).idxmin()

    return year_sel


def cal_rmsd_mon(df_output):
    df_day = pd.pivot_table(
        df_output, values="Kdown", index=["Year", "Month", "Day"], aggfunc=[np.mean,]
    )

    df_day_all_year = pd.pivot_table(
        df_output, values="Kdown", index=["Month", "Day"], aggfunc=[np.mean,]
    )

    array_yr_mon = df_day.index.droplevel("Day").to_frame().drop_duplicates().values

    df_rmse = (
        pd.DataFrame(
            {
                (yr, mon): np.sqrt(
                    np.square(df_day.loc[(yr, mon)] - df_day_all_year.loc[mon]).mean()
                )
                for yr, mon in array_yr_mon
            }
        )
        .stack()
        .T.dropna()
    )
    df_rmse.columns = df_rmse.columns.droplevel([0, 1])
    return df_rmse


# headers of standard EPW files
header_EPW = """
Year
Month
Day
Hour
Minute
Data Source and Uncertainty Flags
Dry Bulb Temperature
Dew Point Temperature
Relative Humidity
Atmospheric Station Pressure
Extraterrestrial Horizontal Radiation
Extraterrestrial Direct Normal Radiation
Horizontal Infrared Radiation Intensity
Global Horizontal Radiation
Direct Normal Radiation
Diffuse Horizontal Radiation
Global Horizontal Illuminance
Direct Normal Illuminance
Diffuse Horizontal Illuminance
Zenith Luminance
Wind Direction
Wind Speed
Total Sky Cover
Opaque Sky Cover
Visibility
Ceiling Height
Present Weather Observation
Present Weather Codes
Precipitable Water
Aerosol Optical Depth
Snow Depth
Days Since Last Snowfall
Albedo
Liquid Precipitation Depth
Liquid Precipitation Quantity
    """

# list of variables in EPW
list_var_EPW = header_EPW.split("\n")[1:-1]

# dict: SuPy variables -> EPW standard names
dict_supy_epw = {
    "Kdown": "Global Horizontal Radiation",
    "T2": "Dry Bulb Temperature",
    "RH2": "Relative Humidity",
    "U10": "Wind Speed",
}
dict_epw_supy = {v: k for k, v in dict_supy_epw.items()}


def gen_TMY(df_output):
    """generate TMY (typical meteorological year) from SuPy output.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Output from `run_supy`: longterm (e.g., >10 years) simulation results, otherwise not very useful.

    """

    # calculate weighted score
    df_output_x = df_output.assign(
        Year=lambda df: df.index.year,
        Month=lambda df: df.index.month,
        Day=lambda df: df.index.day,
        Hour=lambda df: df.index.hour,
        Minute=lambda df: df.index.minute,
    )
    # df_ws = gen_WS_DF(df_output_x)

    # select year
    year_sel = pick_year(df_output_x, n=5)

    # convert `0h` to `24h` and take care of `day`: to follow EPW convention
    df_output_x = conv_0to24(df_output_x)

    # generate TMY data
    df_TMY = pd.concat(
        [
            df_output_x.groupby(["Month", "Year"]).get_group(grp)
            for grp in year_sel.items()
        ]
    )

    return df_TMY


def conv_0to24(df_TMY):
    # convert `0h` to `24h` and take care of `day`
    loc_24h = df_TMY.index == df_TMY.index.normalize()
    ser_24h = df_TMY.loc[loc_24h].index - pd.Timedelta("1h")
    df_TMY.loc[loc_24h, "Year"] = ser_24h.year
    df_TMY.loc[loc_24h, "Month"] = ser_24h.month
    df_TMY.loc[loc_24h, "Day"] = ser_24h.day
    df_TMY.loc[loc_24h, "Hour"] = 24
    return df_TMY


# function to read in EPW file
[docs]def read_epw(path_epw: Path) -> pd.DataFrame: """Read in `epw` file as a DataFrame Parameters ---------- path_epw : Path path to `epw` file Returns ------- df_tmy: pd.DataFrame TMY results of `epw` file """ df_tmy = pd.read_csv(path_epw, skiprows=8, sep=",", header=None) df_tmy.columns = [x.strip() for x in header_EPW.split("\n")[1:-1]] df_tmy["DateTime"] = pd.to_datetime( pd.to_datetime( df_tmy["Year"] * 10000 + df_tmy["Month"] * 100 + df_tmy["Day"], format="%Y%m%d", ) + pd.to_timedelta(df_tmy["Hour"], unit="h") ) df_tmy = df_tmy.set_index("DateTime") return df_tmy
# generate EPW file from `df_TMY`
[docs]def gen_epw( df_output: pd.DataFrame, lat, lon, tz=0, path_epw=Path("./uTMY.epw"), ) -> Tuple[pd.DataFrame, str, Path]: """Generate an `epw` file of uTMY (urbanised Typical Meteorological Year) using SUEWS simulation results Parameters ---------- df_output : pd.DataFrame SUEWS simulation results. path_epw : Path, optional Path to store generated epw file, by default Path('./uTMY.epw'). lat: float Latitude of the site, used for calculating solar angle. lon: float Longitude of the site, used for calculating solar angle. tz: float time zone represented by time difference from UTC+0 (e.g., 8 for UTC+8), by default 0 (i.e., UTC+0) Returns ------- df_epw, text_meta, path_epw: Tuple[pd.DataFrame, str, Path] - df_epw: uTMY result - text_meta: meta-info text - path_epw: path to generated `epw` file """ import atmosp from pathlib import Path import pvlib # select months from representative years df_tmy = gen_TMY(df_output.copy()) # assign timezone info df_tmy.index = df_tmy.index.tz_localize(tz * 3600) # adding necessary variables that can be derive from supy output df_tmy["Dew Point Temperature"] = ( atmosp.calculate( "Td", T=df_tmy["T2"].values + 273.15, qv=df_tmy["Q2"].values, qv_unit="g/kg", RH=df_tmy["RH2"].values, rho=1.23, ) - 273.15 ) df_tmy["Atmospheric Station Pressure"] = atmosp.calculate( "p", T=df_tmy["T2"].values + 273.15, qv=df_tmy["Q2"].values, qv_unit="g/kg", RH=df_tmy["RH2"].values, rho=1.23, ) # processing solar radiation components df_tmy.loc[df_tmy["Kdown"] < 0.001, "Kdown"] = 0 # =================================================================== # relationship of solar radiation components: # GHI = DHI + DNI * cos (Z) # GHI: global horizontal irridiance # DHI: diffuse horizontal irridiance # DNI: direct normal irridiance # cos(Z): cosine of solar zenith angle # transfer simulated Kdown to GHI GHI = df_tmy["Kdown"] # global horizontal radiation df_tmy["Global Horizontal Radiation"] = GHI # solar zenith angle solar_zenith_deg = pvlib.solarposition.get_solarposition( df_tmy.index, lat, lon ).zenith # direct normal radaition # Determine DNI from GHI using the DIRINT modification of the DISC model. DNI = pvlib.irradiance.dirint( ghi=df_tmy["Global Horizontal Radiation"], times=df_tmy.index, solar_zenith=solar_zenith_deg, pressure=df_tmy["Atmospheric Station Pressure"], temp_dew=df_tmy["Dew Point Temperature"], use_delta_kt_prime=True, ).replace(np.nan, 0) df_tmy["Direct Normal Radiation"] = DNI.values # diffuse horizontal radiation # DHI = GHI - DNI * cos (Z) df_tmy["Diffuse Horizontal Radiation"] = GHI - DNI * np.cos( solar_zenith_deg * np.pi / 180 ) # end: solar radiation processing # =================================================================== # horizontal infrared radiation df_tmy["Horizontal Infrared Radiation Intensity"] = df_tmy["Ldown"] # conform column names to EPW standard df_TMY_x = df_tmy.rename(columns=dict_supy_epw) # initialise df_epw for EPW output df_epw = pd.DataFrame(columns=list_var_EPW, index=df_tmy.index) # dict of default values dict_var_dft = { "Data Source and Uncertainty Flags": -9992, "Extraterrestrial Horizontal Radiation": 9999, "Extraterrestrial Direct Normal Radiation": 9999, "Horizontal Infrared Radiation Intensity": 9999, "Direct Normal Radiation": 9999, "Global Horizontal Illuminance": 9999999, "Direct Normal Illuminance": 9999999, "Diffuse Horizontal Illuminance": 9999999, "Zenith Luminance": 9999, "Wind Direction": 999, "Total Sky Cover": 99, "Opaque Sky Cover": 99, "Visibility": 9999, "Ceiling Height": 99999, "Present Weather Observation": 9999, "Present Weather Codes": 9999, "Precipitable Water": 999, "Aerosol Optical Depth": 999, "Snow Depth": 999, "Days Since Last Snowfall": 99, "Albedo": 999, "Liquid Precipitation Depth": 999, "Liquid Precipitation Quantity": 999, } for var in list_var_EPW: try: df_epw[var] = df_TMY_x[var].values except: # print(f'{var} not existing! This variable will be filled with default value {dict_var_dft[var]}') try: df_epw[var] = np.ones(len(df_epw)) * dict_var_dft[var] except: df_epw[var] = np.nan # fill 'Data Source and Uncertainty Flags' df_epw[ "Data Source and Uncertainty Flags" ] = "?9?9?9?9E0?9?9?9*9*9?9*9*9?9*9*9?9?9*9*_*9*9*9*9*9" # df_epw["Global Horizontal Radiation"] = np.ones(len(df_epw)) * 9999 df_epw.index = df_TMY_x.index df_epw = df_epw.sort_values(["Month", "Day", "Hour"], axis=0) # save pure data to a csv for formatting path_epw = Path(path_epw) if not path_epw.parent.exists(): path_epw.parent.mkdir(parents=True) path_epw.touch(exist_ok=True) df_epw.to_csv(path_epw, index=None, header=None) text_data = path_epw.read_text().split("\n") # delete the csv file path_epw.unlink() text_meta = """ LOCATION,Chongqing Shapingba,Chongqing,CHN,CSWD,575160,lat,lon,tz,259.1 DESIGN CONDITIONS,1,Climate Design Data 2009 ASHRAE Handbook,,Heating,1,3.2,4.2,-0.2,3.8,6.5,1.3,4.3,6.2,4.9,7.6,4.3,7.5,1.4,0,Cooling,7,7.4,36.9,25.6,35.5,25.6,34.2,25.4,27.4,32.7,26.9,32.2,26.4,31.6,2.5,110,26.1,22.2,30.2,25.6,21.5,29.8,25.1,20.8,29.3,89.3,32.7,86.9,32.5,84.7,31.7,909,Extremes,5.1,4.3,3.6,35.4,1.1,38.8,1.3,1.6,0.1,40,-0.6,40.9,-1.4,41.8,-2.3,43 TYPICAL/EXTREME PERIODS,6,Summer - Week Nearest Max Temperature For Period,Extreme,7/27,8/ 2,Summer - Week Nearest Average Temperature For Period,Typical,7/ 6,7/12,Winter - Week Nearest Min Temperature For Period,Extreme,12/22,1/ 5,Winter - Week Nearest Average Temperature For Period,Typical,1/13,1/19,Autumn - Week Nearest Average Temperature For Period,Typical,10/13,10/19,Spring - Week Nearest Average Temperature For Period,Typical,4/12,4/18 GROUND TEMPERATURES,3,.5,,,,13.31,10.23,9.39,10.12,14.28,18.95,23.34,26.51,27.44,25.95,22.35,17.82,2,,,,16.09,13.20,11.82,11.77,13.97,17.16,20.59,23.54,25.06,24.77,22.74,19.63,4,,,,17.90,15.65,14.27,13.85,14.66,16.52,18.83,21.09,22.62,22.98,22.11,20.29 HOLIDAYS/DAYLIGHT SAVINGS,No,0,0,0 COMMENTS 1, generated by SuPy COMMENTS 2, none DATA PERIODS,1,1,Data,Sunday,1/1,12/31 """ text_meta = text_meta.split("\n")[1:-1] #change the default latitude, longuitude and timezone from args, which will be used to calculate solar position and received direct solar radiation at each surface in EnergyPlus text_meta[0] = text_meta[0].replace('lat',str(lat)) text_meta[0] = text_meta[0].replace('lon',str(lon)) text_meta[0] = text_meta[0].replace('tz',str(tz)) # lines = [] text_epw = "\n".join(text_meta + text_data) # with open(path_epw, 'r') as f: # for line in f: # lines.append(line) # lines.insert(0, text_meta[1:]) # s = ''.join(lines) # write out the actual EPW file path_epw.write_text(text_epw) # with open(path_epw, "w") as fp: # fp.write(text_epw) return df_epw, text_meta, path_epw