Source code for flexvalue.config

import toml

from dataclasses import dataclass, field
from typing import List


class FLEXValueException(Exception):
    pass


[docs]@dataclass class FLEXValueConfig: database_type: str host: str = None port: int = None user: str = None password: str = None database: str = None project: str = None elec_load_shape_file: str = None elec_av_costs_file: str = None therms_profiles_file: str = None gas_av_costs_file: str = None project_info_file: str = None metered_load_shape_file: str = None elec_av_costs_table: str = None elec_load_shape_table: str = None therms_profiles_table: str = None gas_av_costs_table: str = None project_info_table: str = None metered_load_shape_table: str = None output_file: str = None output_table: str = None electric_output_table: str = None gas_output_table: str = None aggregation_columns: List[str] = field(default_factory=list) process_elec_load_shape: bool = False process_elec_av_costs: bool = False process_therms_profiles: bool = False process_gas_av_costs: bool = False process_metered_load_shape: bool = False reset_elec_load_shape: bool = False reset_elec_av_costs: bool = False reset_therms_profiles: bool = False reset_gas_av_costs: bool = False elec_components: List[str] = field(default_factory=list) gas_components: List[str] = field(default_factory=list) elec_addl_fields: List[str] = field(default_factory=list) gas_addl_fields: List[str] = field(default_factory=list) separate_output_tables: bool = False use_value_curve_name_for_join: bool = False @staticmethod def from_file(config_file): data = toml.load(config_file) db = data.get("database", dict()) run_info = data.get("run", dict()) return FLEXValueConfig( database_type=db.get("database_type", None), host=db.get("host", None), port=db.get("port", None), user=db.get("user", None), password=db.get("password", None), database=db.get("database", None), project=db.get("project", None), elec_av_costs_table=db.get("elec_av_costs_table", None), elec_load_shape_table=db.get("elec_load_shape_table", None), therms_profiles_table=db.get("therms_profiles_table", None), gas_av_costs_table=db.get("gas_av_costs_table", None), metered_load_shape_table=db.get("metered_load_shape_table", None), project_info_table=db.get("project_info_table", None), elec_load_shape_file=run_info.get("elec_load_shape", None), elec_av_costs_file=run_info.get("elec_av_costs", None), therms_profiles_file=run_info.get("therms_profiles", None), gas_av_costs_file=run_info.get("gas_av_costs", None), project_info_file=run_info.get("project_info", None), metered_load_shape_file=run_info.get("metered_load_shape_file", None), output_file=run_info.get("output_file", None), electric_output_table=run_info.get("electric_output_table", None), gas_output_table=run_info.get("gas_output_table", None), aggregation_columns=run_info.get("aggregation_columns", []), reset_elec_load_shape=run_info.get("reset_elec_load_shape", None), reset_elec_av_costs=run_info.get("reset_elec_av_costs", None), reset_gas_av_costs=run_info.get("reset_gas_av_costs", None), reset_therms_profiles=run_info.get("reset_therms_profiles", None), process_elec_load_shape=run_info.get("process_elec_load_shape", None), process_elec_av_costs=run_info.get("process_elec_av_costs", None), process_therms_profiles=run_info.get("process_therms_profiles", None), process_gas_av_costs=run_info.get("process_gas_av_costs", None), process_metered_load_shape=run_info.get("process_metered_load_shape", None), elec_components=run_info.get("elec_components", []), gas_components=run_info.get("gas_components", []), separate_output_tables=run_info.get("separate_output_tables", None), elec_addl_fields=run_info.get("elec_addl_fields", []), gas_addl_fields=run_info.get("gas_addl_fields", []), use_value_curve_name_for_join=run_info.get( "use_value_curve_name_for_join", None ), ) def validate(self): if not self.database_type: return if self.database_type == "postgresql": if not any( [ self.database_type, self.host, self.port, self.user, self.password, self.database, ] ): raise FLEXValueException( "When using postgresql, you must provide at least of the following values in the config file: host, port, user, password." ) if self.database_type == "bigquery": if not all( [ self.project, self.project_info_table, self.elec_load_shape_table, self.therms_profiles_table, self.elec_av_costs_table, self.gas_av_costs_table, ] ): raise FLEXValueException( "When using bigquery, you must provide all of the following values in the config file: project, project_info_table, elec_load_shape_table, therms_profiles_table, elec_av_costs_table, gas_av_costs_table." ) if self.separate_output_tables == True: if not self.electric_output_table or not self.gas_output_table: raise FLEXValueException( "When you specify separate_output_tables, you must specify both electric_output_table and gas_output_table." ) def float_type(self): if self.database_type == "bigquery": return "FLOAT64" else: return "FLOAT"