#
# UrbanSim software. Copyright (C) 1998-2004 University of Washington
#
# You can redistribute this program and/or modify it under the terms of the
# GNU General Public License as published by the Free Software Foundation
# (http://www.gnu.org/copyleft/gpl.html).
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the file LICENSE.html for copyright
# and licensing information, and the file ACKNOWLEDGMENTS.html for funding and
# other acknowledgments.
#

import sys

from opus_core.logger import logger
from opus_core.resources import Resources
from opus_core.fork_process import ForkProcess
from opus_core.coefficients import Coefficients
from opus_core.variable_name import VariableName
from opus_core.simulation_state import SimulationState
from opus_core.dataset import DataSet
from opus_core.store.cache_flt_data import CacheFltData
from opus_core.equation_specification import EquationSpecification
from opus_core.services.run_server.storage_creator import StorageCreator
from opus_core.store.mysql_database_server import MysqlDatabaseServer
from urbansim.session_configuration import SessionConfiguration
from urbansim.model_coordinators.model_system import ModelSystem
from opus_core.storage_factory import StorageFactory


class Estimator(object):
    """Drive the estimation of one model and optionally store its results.

    The instance keeps the specification and coefficients found in the model
    system's run namespace after a run, and can write both to an output
    storage.
    """

    def __init__(self, config=None, save_estimation_results=False):
        """Prepare simulation state, session configuration and cache.

        config -- run configuration (dictionary-like). Although it defaults
            to None it is required: it must contain the entry
            'creating_baseyear_cache_configuration'. That entry is updated
            in place with the cache directory when none was given.
        save_estimation_results -- if True, estimate() and reestimate()
            call save_results() after each run.
        """
        # Defined up front so that save_results() can report a missing
        # estimation with its ValueError instead of an AttributeError.
        self.specification = None
        self.coefficients = None

        cache_config = config['creating_baseyear_cache_configuration']
        if 'cache_directory' in cache_config.keys():
            self.simulation_state = SimulationState(new_instance=True)
            self.simulation_state.set_cache_directory(
                cache_config['cache_directory'])
        else:
            base_cache_dir = None
            if 'cache_directory_root' in cache_config.keys():
                base_cache_dir = cache_config['cache_directory_root']
            self.simulation_state = SimulationState(
                new_instance=True, base_cache_dir=base_cache_dir)
            # Record the directory chosen by SimulationState in the config.
            cache_config['cache_directory'] = \
                self.simulation_state.get_cache_directory()

        self.session_configuration = SessionConfiguration(
            config, new_instance=True).get_session_configuration()
        self.config = Resources(config)
        self.save_estimation_results = save_estimation_results
        self.debuglevel = self.config.get("debuglevel", 4)
        self.model_system = ModelSystem()

        # An explicit "model_name" entry wins; otherwise take the first
        # entry of 'models' that is a dictionary marked with "estimate".
        self.model_name = None
        if "model_name" in config.keys():
            self.model_name = config["model_name"]
        else:
            for model in self.config.get('models', []):
                if not isinstance(model, dict):
                    continue
                model_name = list(model.keys())[0]
                actions = model[model_name]
                if actions == "estimate" or (
                        isinstance(actions, list) and "estimate" in actions):
                    self.model_name = model_name
                    break

        if cache_config.get('cache_from_mysql', True):
            ForkProcess().fork_new_process(
                cache_config['cache_mysql_data'], self.config)
        elif not self.simulation_state.cache_directory_exists():
            CacheFltData().run(self.config)

    def estimate(self, out_storage=None):
        """Run the model system and collect the estimation results.

        out_storage -- passed on to save_results() when saving is enabled.
        """
        self.model_system.run(self.config)
        self.extract_coefficients_and_specification()
        if self.save_estimation_results:
            self.save_results(out_storage=out_storage)

    def reestimate(self, specification_module_name, out_storage=None, type=None):
        """Re-run the estimation with a specification read from a module.

        specification_module_name -- dotted name of a module that contains
            a dictionary called 'specification'. The module is imported and
            reloaded on every call, so edits made to it are picked up.
        out_storage -- passed on to save_results() when saving is enabled.
        type -- optional key; if given, specification[type] is used instead
            of the whole dictionary.
        """
        # Import by name instead of exec/eval on a concatenated string: the
        # effect is the same, without executing arbitrary text.
        __import__(specification_module_name)
        spec_module = reload(sys.modules[specification_module_name])
        specification_dict = spec_module.specification
        if type is not None:
            specification_dict = specification_dict[type]
        specification = load_specification_from_variable(specification_dict)

        new_namespace = self.model_system.run_year_namespace
        new_namespace["specification"] = specification
        self.model_system.do_process(new_namespace)
        self.extract_coefficients_and_specification()
        if self.save_estimation_results:
            self.save_results(out_storage=out_storage)

    def save_results(self, out_storage=None, model_name=None):
        """Write specification and coefficients to storage and cache them.

        out_storage -- storage to write to. If not given, a mysql storage is
            built from the 'output_configuration' entry of the configuration.
        model_name -- prefix of the output table names; defaults to the
            model name determined in the constructor.

        Raises ValueError if there is nothing to save or no model name is
        known, and StandardError if no output storage can be determined.
        """
        if self.specification is None or self.coefficients is None:
            raise ValueError("model specification or coefficient is None")
        # NOTE: the check of coefficient validity
        # (self.coefficients.is_invalid()) is disabled; results are saved
        # unconditionally.
        if model_name is None:
            if self.model_name is None:
                raise ValueError("model_name unspecified")
            model_name = self.model_name

        if not out_storage:
            if 'output_configuration' not in self.config:
                raise StandardError("No output_configuration given.")
            output_config = self.config["output_configuration"]
            db_server = MysqlDatabaseServer(output_config)
            output_db = db_server.get_database(output_config.database_name)
            out_storage = StorageFactory().build_storage_for_dataset(
                type='mysql_storage', location=output_db)

        # the original model name of development_project_lcm is too long as
        # a mysql db table name, truncate it
        if model_name.rfind("_development_project_location_choice_model") >= 0:
            model_name = model_name.replace('_project', '')
        specification_table = '%s_specification' % model_name
        coefficients_table = '%s_coefficients' % model_name

        self.specification.write(out_storage=out_storage,
                                 out_table_name=specification_table)
        self.coefficients.write(out_storage=out_storage,
                                out_table_name=coefficients_table)
        self.cache_specification_and_coefficients(
            out_storage, specification_table, coefficients_table)

    def cache_specification_and_coefficients(self, storage, specification_table, coefficients_table):
        """Load both tables from 'storage' as datasets and flush them."""
        for table_name in [specification_table, coefficients_table]:
            dataset = DataSet(in_storage=storage, in_table_name=table_name,
                              id_name=[],
                              debug=self.config.get("debuglevel", 0))
            dataset.load_dataset()
            dataset.flush_dataset()

    def extract_coefficients_and_specification(self):
        """Pick up results from the model system's run namespace.

        Every value of run_year_namespace that is a Coefficients or an
        EquationSpecification instance is stored in self.coefficients or
        self.specification respectively. Attributes for which no such value
        is found keep their previous content.
        """
        namespace = self.model_system.run_year_namespace
        for key in namespace.keys():
            value = namespace[key]
            if isinstance(value, Coefficients):
                self.coefficients = value
            if isinstance(value, EquationSpecification):
                self.specification = value

    def cleanup(self, remove_cache=True):
        """Use this only if you don't want to reestimate."""
        self.simulation_state.remove_singleton(delete_cache=remove_cache)
        self.session_configuration.remove_singleton()


def _reject_specification(message):
    """Log 'message' as an error and raise ValueError with it."""
    logger.log_error(message)
    raise ValueError(message)


def load_specification_from_variable(spec_var):
    """Build an EquationSpecification from a specification dictionary.

    spec_var maps a submodel to its specification, which is one of:

    - a list or tuple whose entries are each a variable name (string), a
      sequence (variable,), a sequence (variable, coefficient), or a
      dictionary whose first key/value pair is variable/coefficient. When
      no coefficient name is given, the alias of the variable is used.
    - a dictionary variable -> coefficient. If it has the key
      "equation_ids", each other value must be a list or tuple holding one
      coefficient per equation id; entries equal to 0 are skipped.

    Submodel specifications of any other type are ignored. The given
    dictionary is not modified.

    Raises ValueError("Wrong specification format for model specification.")
    for any malformed entry; details are written to the error log.
    """
    variables = []
    coefficients = []
    equations = []
    submodels = []
    try:
        for sub_model, submodel_spec in spec_var.items():
            if isinstance(submodel_spec, (tuple, list)):
                for var_coef in submodel_spec:
                    is_sequence = isinstance(var_coef, (tuple, list))
                    if isinstance(var_coef, str):
                        # var_coef is just the long name of a variable
                        variables.append(var_coef)
                        coefficients.append(VariableName(var_coef).alias())
                    elif is_sequence and len(var_coef) == 1:
                        variables.append(var_coef[0])
                        coefficients.append(VariableName(var_coef[0]).alias())
                    elif is_sequence and len(var_coef) == 2:
                        variables.append(var_coef[0])
                        coefficients.append(var_coef[1])
                    elif isinstance(var_coef, dict):
                        variables.append(list(var_coef.keys())[0])
                        coefficients.append(list(var_coef.values())[0])
                    else:
                        _reject_specification(
                            "Wrong specification format for submodel %s variable %s"
                            % (sub_model, var_coef))
                    submodels.append(sub_model)
            elif isinstance(submodel_spec, dict):
                equation_ids = submodel_spec.get("equation_ids")
                for var, coef in submodel_spec.items():
                    if var == "equation_ids":
                        # Not a variable. Skipped rather than deleted so the
                        # caller's dictionary stays intact.
                        continue
                    if not equation_ids:
                        variables.append(var)
                        coefficients.append(coef)
                        # NOTE(review): this discards equation ids collected
                        # from other submodels; kept as in the original -
                        # confirm that mixing both forms is unsupported.
                        equations = []
                        submodels.append(sub_model)
                    elif isinstance(coef, (list, tuple)):
                        for i, value in enumerate(coef):
                            if value != 0:
                                variables.append(var)
                                coefficients.append(value)
                                equations.append(equation_ids[i])
                                submodels.append(sub_model)
                    else:
                        _reject_specification(
                            "Wrong specification format for submodel %s variable %s; \nwith equation_ids provided, coefficients must be a list or tuple of the same length of equation_ids"
                            % (sub_model, var))
    except Exception:
        # Any failure while reading the dictionary is reported to the caller
        # as one uniform error.
        raise ValueError("Wrong specification format for model specification.")

    specification = EquationSpecification(variables=variables,
                                          coefficients=coefficients,
                                          equations=equations,
                                          submodels=submodels)
    return specification


def get_specification_for_estimation(specification_dict=None, specification_storage=None, specification_table = None):
    """Return the specification to be used in an estimation.

    If specification_dict is given, it is converted by
    load_specification_from_variable(). Otherwise the specification is
    obtained from prepare_specification_and_coefficients(), called with
    specification_storage and specification_table.
    """
    if specification_dict is not None:
        return load_specification_from_variable(specification_dict)
    from opus_core.choice_model import prepare_specification_and_coefficients
    (specification, dummy) = prepare_specification_and_coefficients(
        specification_storage, specification_table)
    return specification


def update_controller_by_specification_from_module(run_configuration, model_name, specification_module):
    """Make the controller of 'model_name' take its specification from a module.

    The controller imports 'specification' from specification_module as
    'spec' and passes it to prepare_for_estimate as specification_dict, with
    specification_storage set to "None". run_configuration is modified in
    place and returned.
    """
    controller = run_configuration["models_configuration"][model_name]["controller"]
    controller["import"][specification_module] = "specification as spec"
    estimate_arguments = controller["prepare_for_estimate"]["arguments"]
    estimate_arguments["specification_dict"] = "spec"
    estimate_arguments["specification_storage"] = "None"
    run_configuration["models_configuration"][model_name]["controller"].merge(controller)
    return run_configuration