"""
Collection of classes and functions to obtain spectral parameters.
"""
from pvlib import iotools
import os
import glob
import pandas as pd
from rex import NSRDBX, Outputs
from pvdeg import humidity
import datetime
import numpy as np
import h5py
import dask.dataframe as dd
import xarray as xr
[docs]
def get(database, id=None, geospatial=False, **kwargs):
"""
Load weather data directly from NSRDB or through any other PVLIB i/o
Load weather data directly from NSRDB or through any other PVLIB i/o
tools function
Parameters:
-----------
database : (str)
'NSRDB' or 'PVGIS'
id : (int or tuple)
If NSRDB, id is the gid for the desired location
If PVGIS, id is a tuple of (latitude, longitude) for the desired location
geospatial : (bool)
If True, initialize weather data via xarray dataset and meta data via
dask dataframe. This is useful for large scale geospatial analyses on
distributed compute systems. Geospaital analyses are only supported for
NSRDB data and locally stored h5 files that follow pvlib conventions.
**kwargs :
Additional keyword arguments to pass to the get_weather function
(see pvlib.iotools.get_psm3 for PVGIS, and get_NSRDB for NSRDB)
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data
meta : (dict)
Dictionary of metadata for the weather data
"""
META_MAP = {"elevation": "altitude", "Local Time Zone": "tz"}
if type(id) is tuple:
location = id
gid = None
lat = location[0]
lon = location[1]
elif type(id) is int:
gid = id
location = None
elif id is None:
if not geospatial:
raise TypeError(
"Specify location via tuple (latitude, longitude), or gid integer."
)
if not geospatial:
# TODO: decide wether to follow NSRDB or pvlib conventions...
# e.g. temp_air vs. air_temperature
# "map variables" will guarantee PVLIB conventions (automatic in coming update) which is "temp_air"
if database == "NSRDB":
weather_df, meta = get_NSRDB(gid=gid, location=location, **kwargs)
elif database == "PVGIS":
URL = "https://re.jrc.ec.europa.eu/api/v5_2/"
weather_df, _, meta, _ = iotools.get_pvgis_tmy(
latitude=lat, longitude=lon, map_variables=True, url=URL, **kwargs
)
meta = meta["location"]
elif database == "PSM3":
weather_df, meta = iotools.get_psm3(latitude=lat, longitude=lon, **kwargs)
elif database == "local":
fp = kwargs.pop("file")
fn, fext = os.path.splitext(fp)
weather_df, meta = read(gid=gid, file_in=fp, file_type=fext[1:], **kwargs)
else:
raise NameError("Weather database not found.")
for key in [*meta.keys()]:
if key in META_MAP.keys():
meta[META_MAP[key]] = meta.pop(key)
if database == "NSRDB" or database == "PSM3":
meta["wind_height"] = 2
meta["Source"] = "NSRDB"
elif database == "PVGIS":
meta["wind_height"] = 10
meta["Source"] = "PVGIS"
else:
meta["wind_height"] = None
# switch weather data headers and metadata to pvlib standard
map_weather(weather_df)
map_meta(meta)
if "relative_humidity" not in weather_df.columns:
print('Column "relative_humidity" not found in DataFrame. Calculating...')
weather_df = humidity._ambient(weather_df)
return weather_df, meta
elif geospatial:
if database == "NSRDB":
weather_ds, meta_df = get_NSRDB(geospatial=geospatial, **kwargs)
meta_df["wind_height"] = 2
elif database == "local":
fp = kwargs.pop("file")
weather_ds, meta_df = ini_h5_geospatial(fp)
else:
raise NameError(f"Geospatial analysis not implemented for {database}.")
return weather_ds, meta_df
[docs]
def read(file_in, file_type, map_variables=True, **kwargs):
"""
Read a locally stored weather file of any PVLIB compatible type
#TODO: add error handling
Parameters:
-----------
file_in : (path)
full file path to the desired weather file
file_type : (str)
type of weather file from list below (verified)
[psm3, tmy3, epw, h5, csv]
"""
META_MAP = {"elevation": "altitude", "Local Time Zone": "tz"}
supported = ["psm3", "tmy3", "epw", "h5", "csv"]
file_type = file_type.upper()
if file_type in ["PSM3", "PSM"]:
weather_df, meta = iotools.read_psm3(filename=file_in, map_variables=True)
elif file_type in ["TMY3", "TMY"]:
weather_df, meta = iotools.read_tmy3(
filename=file_in
) # map variable not worki - check pvlib for map_variables
elif file_type == "EPW":
weather_df, meta = iotools.read_epw(filename=file_in)
elif file_type == "H5":
weather_df, meta = read_h5(file=file_in, **kwargs)
elif file_type == "CSV":
weather_df, meta = csv_read(filename=file_in)
else:
print(f"File-Type not recognized. supported types:\n{supported}")
if not isinstance(meta, dict):
meta = meta.to_dict()
# map meta-names as needed
if map_variables == True:
map_weather(weather_df)
map_meta(meta)
if weather_df.index.tzinfo is None:
tz = "Etc/GMT%+d" % -meta["tz"]
weather_df = weather_df.tz_localize(tz)
return weather_df, meta
[docs]
def csv_read(filename):
"""
Read a locally stored csv weather file. The first line contains the meta data
variable names, and the second line contains the meta data values. This is followed
by the meterological data.
Parameters:
-----------
file_path : (str)
file path and name of h5 file to be read
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data
meta : (dict)
Dictionary of metadata for the weather data
"""
file1 = open(filename, "r")
# get the meta data from the first two lines
metadata_fields = file1.readline().split(",")
metadata_fields[-1] = metadata_fields[-1].strip() # strip trailing newline
metadata_values = file1.readline().split(",")
metadata_values[-1] = metadata_values[-1].strip() # strip trailing newline
meta = dict(zip(metadata_fields, metadata_values))
for (
key
) in meta: # converts everything to a float that is possible to convert to a float
try:
meta[key] = float(meta[key])
except:
pass
# get the column headers
columns = file1.readline().split(",")
columns[-1] = columns[-1].strip() # strip trailing newline
# remove blank columns if they are there
columns = [col for col in columns if col != ""]
dtypes = dict.fromkeys(columns, float) # all floats except datevec
dtypes.update(Year=int, Month=int, Day=int, Hour=int, Minute=int)
dtypes["Cloud Type"] = int
dtypes["Fill Flag"] = int
weather_df = pd.read_csv(
file1,
header=None,
names=columns,
usecols=columns,
dtype=dtypes,
delimiter=",",
lineterminator="\n",
)
try:
dtidx = pd.to_datetime(
weather_df[["Year", "Month", "Day", "Hour", "Minute", "Second"]]
)
except:
try:
dtidx = pd.to_datetime(
weather_df[["Year", "Month", "Day", "Hour", "Minute"]]
)
except:
try:
dtidx = pd.to_datetime(weather_df[["Year", "Month", "Day", "Hour"]])
finally:
dtidx = print(
"Your data file should have columns for Year, Month, Day, and Hour"
)
weather_df.index = pd.DatetimeIndex(dtidx)
file1.close()
return weather_df, meta
[docs]
def map_weather(weather_df):
""" "
This will update the headings for meterological data to standard forms
as outlined in https://github.com/DuraMAT/pv-terms.
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data with modified column headers.
"""
DSET_MAP = {
"year": "Year",
"month": "Month",
"day": "Day",
"hour": "Hour",
"minute": "Minute",
"second": "Second",
"GHI": "ghi",
"DHI": "dhi",
"DNI": "dni",
"Clearsky GHI": "ghi_clear",
"Clearsky DHI": "dhi_clear",
"Clearsky DNI": "dni_clear",
"Solar Zenith Angle": "solar_zenith",
"Temperature": "temp_air",
"air_temperature": "temp_air",
"Relative Humidity": "relative_humidity",
"Dew Point": "dew_point",
"temp_dew": "dew_point",
"Pressure": "pressure",
"Wind Speed": "wind_speed",
"Wind Direction": "wind_direction",
"Surface Albedo": "albedo",
"Precipitable Water": "precipitable_water",
}
for column_name in weather_df.columns:
if column_name in [*DSET_MAP.keys()]:
weather_df.rename(
columns={column_name: DSET_MAP[column_name]}, inplace=True
)
return weather_df
[docs]
def read_h5(gid, file, attributes=None, **_):
"""
Read a locally stored h5 weather file that follows NSRDB conventions.
Parameters:
-----------
file : (str)
file path and name of h5 file to be read
gid : (int)
gid for the desired location
attributes : (list)
List of weather attributes to extract from NSRDB
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data
meta : (dict)
Dictionary of metadata for the weather data
"""
if os.path.dirname(file):
fp = file
else:
fp = os.path.join(os.path.dirname(__file__), os.path.basename(file))
with Outputs(fp, mode="r") as f:
meta = f.meta.loc[gid]
index = f.time_index
dattr = f.attrs
# TODO: put into utilities
if attributes == None:
attributes = list(dattr.keys())
try:
attributes.remove("meta")
attributes.remove("tmy_year_short")
except ValueError:
pass
weather_df = pd.DataFrame(index=index, columns=attributes)
for dset in attributes:
with Outputs(fp, mode="r") as f:
weather_df[dset] = f[dset, :, gid]
return weather_df, meta.to_dict()
[docs]
def ini_h5_geospatial(fps):
"""
initialize an h5 weather file that follows NSRDB conventions for
geospatial analyses.
Parameters:
-----------
file_path : (str)
file path and name of h5 file to be read
gid : (int)
gid for the desired location
attributes : (list)
List of weather attributes to extract from NSRDB
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data
meta : (dict)
Dictionary of metadata for the weather data
"""
dss = []
drop_variables = ["meta", "time_index", "tmy_year", "tmy_year_short", "coordinates"]
for i, fp in enumerate(fps):
hf = h5py.File(fp, "r")
attr = list(hf)
attr_to_read = [elem for elem in attr if elem not in drop_variables]
chunks = []
shapes = []
for var in attr_to_read:
chunks.append(
hf[var].chunks if hf[var].chunks is not None else (np.nan, np.nan)
)
shapes.append(
hf[var].shape if hf[var].shape is not None else (np.nan, np.nan)
)
chunks = min(set(chunks))
shapes = min(set(shapes))
if i == 0:
time_index = pd.to_datetime(hf["time_index"][...].astype(str)).values
meta_df = pd.read_hdf(fp, key="meta")
coords = {"gid": meta_df.index.values, "time": time_index}
coords_len = {"time": time_index.shape[0], "gid": meta_df.shape[0]}
ds = xr.open_dataset(
fp,
engine="h5netcdf",
phony_dims="sort",
chunks={"phony_dim_0": chunks[0], "phony_dim_1": chunks[1]},
drop_variables=drop_variables,
mask_and_scale=False,
decode_cf=True,
)
for var in ds.data_vars:
if hasattr(getattr(ds, var), "psm_scale_factor"):
scale_factor = 1 / ds[var].psm_scale_factor
getattr(ds, var).attrs["scale_factor"] = scale_factor
# TODO: delete
# if tuple(coords_len.values()) == (
# ds.sizes["phony_dim_0"],
# ds.sizes["phony_dim_1"],
# ):
# rename = {"phony_dim_0": "time", "phony_dim_1": "gid"}
# elif tuple(coords_len.values()) == (
# ds.sizes["phony_dim_1"],
# ds.sizes["phony_dim_0"],
# ):
# rename = {"phony_dim_0": "gid", "phony_dim_1": "time"}
# else:
# raise ValueError("Dimensions do not match for {}".format(var))
rename = {}
for (
phony,
length,
) in ds.sizes.items():
if length == coords_len["time"]:
rename[phony] = "time"
elif length == coords_len["gid"]:
rename[phony] = "gid"
ds = ds.rename(rename)
ds = ds.assign_coords(coords)
# TODO: In case re-chunking becomes necessary
# ax0 = list(ds.sizes.keys())[list(ds.sizes.values()).index(shapes[0])]
# ax1 = list(ds.sizes.keys())[list(ds.sizes.values()).index(shapes[1])]
# ds = ds.chunk(chunks={ax0:chunks[0], ax1:chunks[1]})
dss.append(ds)
ds = xr.merge(dss)
ds = xr.decode_cf(ds)
# Rechunk time axis
ds = ds.chunk(chunks={"time": -1, "gid": ds.chunks["gid"]})
weather_ds = ds
return weather_ds, meta_df
[docs]
def get_NSRDB_fnames(satellite, names, NREL_HPC=False, **_):
"""
Get a list of NSRDB files for a given satellite and year
Parameters:
-----------
satellite : (str)
'GOES', 'METEOSAT', 'Himawari', 'SUNY', 'CONUS', 'Americas'
names : (int or str)
PVLIB naming convention year or 'TMY':
If int, year of desired data
If str, 'TMY' or 'TMY3'
NREL_HPC : (bool)
If True, use NREL HPC path
If False, use AWS path
Returns:
--------
nsrdb_fnames : (list)
List of NSRDB files for a given satellite and year
hsds : (bool)
If True, use h5pyd to access NSRDB files
If False, use h5py to access NSRDB files
"""
sat_map = {
"GOES": "full_disc",
"METEOSAT": "meteosat",
"Himawari": "himawari",
"SUNY": "india",
"CONUS": "conus",
"Americas": "current",
}
if NREL_HPC:
hpc_fp = "/datasets/NSRDB/"
hsds = False
else:
hpc_fp = "/nrel/nsrdb/v3/"
hsds = True
if type(names) in [int, float]:
nsrdb_fp = os.path.join(
hpc_fp, sat_map[satellite], "*_{}.h5".format(int(names))
)
nsrdb_fnames = glob.glob(nsrdb_fp)
else:
nsrdb_fp = os.path.join(
hpc_fp, sat_map[satellite], "*_{}*.h5".format(names.lower())
)
nsrdb_fnames = glob.glob(nsrdb_fp)
if len(nsrdb_fnames) == 0:
raise FileNotFoundError(
"Couldn't find NSRDB input files! \nSearched for: '{}'".format(nsrdb_fp)
)
return nsrdb_fnames, hsds
[docs]
def get_NSRDB(
satellite=None,
names="TMY",
NREL_HPC=False,
gid=None,
location=None,
geospatial=False,
attributes=None,
**_,
):
"""
Get NSRDB weather data from different satellites and years.
Provide either gid or location tuple.
Parameters:
-----------
satellite : (str)
'GOES', 'METEOSAT', 'Himawari', 'SUNY', 'CONUS', 'Americas'
names : (int or str)
If int, year of desired data
If str, 'TMY' or 'TMY3'
NREL_HPC : (bool)
If True, use NREL HPC path
If False, use AWS path
gid : (int)
gid for the desired location
location : (tuple)
(latitude, longitude) for the desired location
attributes : (list)
List of weather attributes to extract from NSRDB
Returns:
--------
weather_df : (pd.DataFrame)
DataFrame of weather data
meta : (dict)
Dictionary of metadata for the weather data
"""
DSET_MAP = {"air_temperature": "temp_air", "Relative Humidity": "relative_humidity"}
META_MAP = {"elevation": "altitude", "Local Time Zone": "tz", "timezone": "tz"}
if (
satellite == None
): # TODO: This function is not fully written as of January 3, 2024
satellite, gid = get_satellite(location)
print("the satellite is ", satellite)
if not geospatial:
nsrdb_fnames, hsds = get_NSRDB_fnames(
satellite=satellite, names=names, NREL_HPC=NREL_HPC
)
dattr = {}
for i, file in enumerate(nsrdb_fnames):
with NSRDBX(file, hsds=hsds) as f:
if i == 0:
if gid == None: # TODO: add exception handling
gid = f.lat_lon_gid(location)
meta = f["meta", gid].iloc[0]
index = f.time_index
lattr = f.datasets
for attr in lattr:
dattr[attr] = file
if attributes == None:
attributes = list(dattr.keys())
try:
attributes.remove("meta")
attributes.remove("tmy_year_short")
except ValueError:
pass
weather_df = pd.DataFrame(index=index)
for dset in attributes:
# switch dset names to pvlib standard
if dset in [*DSET_MAP.keys()]:
column_name = DSET_MAP[dset]
else:
column_name = dset
with NSRDBX(dattr[dset], hsds=hsds) as f:
weather_df[column_name] = f[dset, :, gid]
# switch meta key names to pvlib standard
re_idx = []
for key in [*meta.index]:
if key in META_MAP.keys():
re_idx.append(META_MAP[key])
else:
re_idx.append(key)
meta.index = re_idx
return weather_df, meta.to_dict()
elif geospatial:
nsrdb_fnames, hsds = get_NSRDB_fnames(satellite, names, NREL_HPC)
weather_ds, meta_df = ini_h5_geospatial(nsrdb_fnames)
if attributes is not None:
weather_ds = weather_ds[attributes]
for dset in weather_ds.data_vars:
if dset in DSET_MAP.keys():
weather_ds = weather_ds.rename({dset: DSET_MAP[dset]})
for mset in meta_df.columns:
if mset in META_MAP.keys():
meta_df.rename(columns={mset: META_MAP[mset]}, inplace=True)
return weather_ds, meta_df
[docs]
def repeat_annual_time_series(time_series, start_year, n_years):
"""
Repeat a pandas time series dataframe containing annual data.
For example, repeat TMY data by n_years, adding in leap days as necessary.
For now, this function requires 1 or more full years of uniform
interval (non-leap year) data, i.e. length must be a multiple of 8760.
On leap days, all data is set to 0.
TODO: make it possible to have weirder time series, e.g. non uniform intervals.
Include option for synthetic leap day data
Parameters:
-----------
time_series : (pd.DataFrame)
pandas dataframe with DatetimeIndex
time_series : (int)
desired starting year of time_series
n_years : (int)
number of years to repeat time_series
Returns:
--------
new_time_series : (pd.DataFrame)
pandas dataframe repeated n_years
"""
if len(time_series) % 8760 != 0:
raise ValueError("Length of time_series must be a multiple of 8760")
tz = time_series.index.tz
time_series = time_series.tz_localize(
None
) # timezone aware timeseries can cause problems, we'll make it tz-naive for now
time_series.index = time_series.index.map(lambda dt: dt.replace(year=start_year))
start = time_series.index[0]
for year in range(start_year, start_year + n_years):
if year == start_year:
if is_leap_year(year):
this_year = time_series.copy()
this_year.index = time_series.index.map(
lambda dt: dt.replace(year=year)
)
this_year = pd.concat(
[
this_year[: str(year) + "-02-28"],
pd.DataFrame(
0,
index=pd.date_range(
start=datetime.datetime(
year=year, month=2, day=29, minute=start.minute
),
end=datetime.datetime(year=year, month=3, day=1),
freq="H",
),
columns=time_series.columns,
),
this_year[str(year) + "-03-01" :],
]
)
new_time_series = this_year
else:
this_year = time_series.copy()
this_year.index = time_series.index.map(
lambda dt: dt.replace(year=year)
)
new_time_series = this_year
else:
if is_leap_year(year):
this_year = time_series.copy()
this_year.index = time_series.index.map(
lambda dt: dt.replace(year=year)
)
this_year = pd.concat(
[
this_year[: str(year) + "-02-28"],
pd.DataFrame(
0,
index=pd.date_range(
start=datetime.datetime(
year=year, month=2, day=29, minute=start.minute
),
end=datetime.datetime(year=year, month=3, day=1),
freq="H",
),
columns=time_series.columns,
),
this_year[str(year) + "-03-01" :],
]
)
new_time_series = pd.concat([new_time_series, this_year])
else:
this_year = time_series.copy()
this_year.index = time_series.index.map(
lambda dt: dt.replace(year=year)
)
new_time_series = pd.concat([new_time_series, this_year])
new_time_series.index = new_time_series.index.tz_localize(
tz=tz
) # add back in the timezone
return new_time_series
[docs]
def is_leap_year(year):
"""Returns True if year is a leap year"""
if year % 4 != 0:
return False
elif year % 100 != 0:
return True
elif year % 400 != 0:
return False
else:
return True
[docs]
def get_satellite(location):
"""
identify a satellite to use for a given lattitude and longitude. This is to provide default values worldwide, but a more
experienced user may want to specify a specific satellite to get better data.
Provide a location tuple.
Parameters:
-----------
location : (tuple)
(latitude, longitude) for the desired location
Returns:
--------
satellite : (str)
'GOES', 'METEOSAT', 'Himawari', 'SUNY', 'CONUS', 'Americas'
gid : (int)
gid for the desired location
"""
# this is just a placeholder till the actual code gets programmed.
satellite = "PSM3"
# gid = f.lat_lon_gid(lat_lon=location) # I couldn't get this to work
gid = None
return satellite, gid
[docs]
def write(data_df, metadata, savefile="WeatherFile.csv"):
"""
Saves dataframe with weather data and any associated meta data in an *.csv format.
The metadata will be formatted on the first two lines with the first being the descriptor
and the second line being the value. Then the meterological, time and other data series
headers on on the third line with all the subsequent data on the remaining lines. This
format can be read by the PVDeg software.
Parameters
-----------
data_df : pandas.DataFrame
timeseries data.
metdata : dictionary
Dictionary with 'latitude', 'longitude', 'altitude', 'source',
'tz' for timezone, and other meta data.
savefile : str
Name of file to save output as.
Name of file to save output as.
standardSAM : boolean
This checks the dataframe to avoid having a leap day, then averages it
to SAM style (closed to the right),
and fills the years so it starst on YEAR/1/1 0:0 and ends on
YEAR/12/31 23:00.
includeminute ; Bool
For hourly data, if SAM input does not have Minutes, it calculates the
sun position 30 minutes prior to the hour (i.e. 12 timestamp means sun
position at 11:30).
If minutes are included, it will calculate the sun position at the time
of the timestamp (12:00 at 12:00)
Set to true if resolution of data is sub-hourly.
Name of file to save output as.
standardSAM : boolean
This checks the dataframe to avoid having a leap day, then averages it
to SAM style (closed to the right),
and fills the years so it starst on YEAR/1/1 0:0 and ends on
YEAR/12/31 23:00.
includeminute ; Bool
For hourly data, if SAM input does not have Minutes, it calculates the
sun position 30 minutes prior to the hour (i.e. 12 timestamp means sun
position at 11:30).
If minutes are included, it will calculate the sun position at the time
of the timestamp (12:00 at 12:00)
Set to true if resolution of data is sub-hourly.
Returns
-------
Nothing, it just writes the file.
"""
meta_string = (
", ".join(str(key) for key, value in metadata.items())
+ "\n"
+ ", ".join(str(value) for key, value in metadata.items())
)
result_df = pd.concat([data_df], axis=1).reindex()
savedata = result_df.to_string(index=False).split("\n")
savedata.pop(0)
savedata = [",".join(ele.split()) for ele in savedata]
savedata = "\n".join(savedata)
columns = list(
data_df.columns
) # This had to be pulled out separately because spaces can get turned into commas in the header names.
str1 = ""
for ele in columns:
str1 = str1 + ele + ","
savedata = meta_string + "\n" + str1 + "\n" + savedata
file1 = open(savefile, "w")
file1.writelines(savedata)
file1.close()