diff --git a/shell/src/main.py b/shell/src/main.py index d7955a55..decdc81e 100644 --- a/shell/src/main.py +++ b/shell/src/main.py @@ -63,11 +63,11 @@ def run(stdin, stdout, stderr, argv): if args.jobs != 1: import crosscat.MultiprocessingEngine as ccme jobs = args.jobs if args.jobs > 0 else None - crosscat = ccme.MultiprocessingEngine(seed=args.seed, cpu_count=jobs) + metamodel = CrosscatMetamodel( + ccme.MultiprocessingEngine, cckwargs={'pool': ccme.Pool(jobs)}) else: import crosscat.LocalEngine as ccle - crosscat = ccle.LocalEngine(seed=args.seed) - metamodel = CrosscatMetamodel(crosscat) + metamodel = CrosscatMetamodel(ccle.LocalEngine) bayeslite.bayesdb_register_metamodel(bdb, metamodel) bdbshell = shell.Shell(bdb, 'crosscat', stdin, stdout, stderr) with hook.set_current_shell(bdbshell): diff --git a/src/__init__.py b/src/__init__.py index 9e02febc..138021d0 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -116,7 +116,7 @@ def bql_quote_name(name): from bayeslite.metamodels.crosscat import CrosscatMetamodel from crosscat.LocalEngine import LocalEngine as CrosscatLocalEngine -bayesdb_builtin_metamodel(CrosscatMetamodel(CrosscatLocalEngine(seed=0))) +bayesdb_builtin_metamodel(CrosscatMetamodel(CrosscatLocalEngine)) import bayeslite.remote import os diff --git a/src/metamodel.py b/src/metamodel.py index c4d21f54..693d3cb8 100644 --- a/src/metamodel.py +++ b/src/metamodel.py @@ -27,7 +27,7 @@ from bayeslite.metamodels.crosscat import CrosscatMetamodel bdb = bayeslite.bayesdb_open(pathname='foo.bdb', builtin_metamodels=False) - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine bayeslite.bayesdb_register_metamodel(bdb, CrosscatMetamodel(cc)) Then you can model a table with Crosscat and query the probable diff --git a/src/metamodels/crosscat.py b/src/metamodels/crosscat.py index 33048e15..5dc84f37 100644 --- a/src/metamodels/crosscat.py +++ b/src/metamodels/crosscat.py @@ -22,6 +22,9 @@ interface for Crosscat. 
""" +# For the EngineTemplate import +from __future__ import absolute_import + import apsw import itertools import json @@ -43,6 +46,9 @@ from bayeslite.util import cursor_value from bayeslite.util import unique +from crosscat.EngineTemplate import EngineTemplate +from crosscat.MultiprocessingEngine import MultiprocessingEngine, Pool + crosscat_schema_1 = ''' INSERT INTO bayesdb_metamodel (name, version) VALUES ('crosscat', 1); @@ -220,15 +226,55 @@ class CrosscatMetamodel(metamodel.IBayesDBMetamodel): Internally, the Crosscat metamodel adds SQL tables to the database with names that begin with ``bayesdb_crosscat_``. + + `crosscat` can be a crosscat engine instance or a constructor callable + which takes a seed. Prefer to pass a constructor, since that way a new + crosscat engine is constructed for each query, and its random seed is taken + from the BayesDB object. The 'crosscat as instance' case is kept for + backwards compatibility. + + If using a crosscat MultiprocessingEngine, the process pool must be passed + as a 'pool' argument. It would not do for a new process pool to be created + every time a new query is made. + + E.g. 
+ + >>> from crosscat import MultiprocessingEngine as mpe + >>> import bayeslite.metamodels.crosscat as cc + >>> from bayeslite import bayesdb_register_metamodel as register_metamodel + >>> from bayeslite import bayesdb_open + >>> pool = mpe.Pool(4) + >>> engine_factory = lambda seed: mpe.MultiprocessingEngine(seed=seed, pool=pool) + >>> metamodel = cc.CrosscatMetamodel(engine_factory) + >>> bdb = bayesdb_open(builtin_metamodels=False) + >>> register_metamodel(bdb, metamodel) + + Since people will often want in particular to create MultiprocessingEngine + factories with a fixed process pool, + crosscat.MultiprocessingEngine.MultiprocessingEngineFactoryFromPool is + provided as a convenience function: + + >>> engine_factory = mpe.MultiprocessingEngineFactoryFromPool(pool) + >>> metamodel = cc.CrosscatMetamodel(engine_factory) + >>> register_metamodel(bdb, metamodel) + + """ def __init__(self, crosscat, subsample=None): - if subsample is None: - subsample = False self._crosscat = crosscat - self._subsample = subsample + self._subsample = False if subsample is None else subsample self._theta_validator = crosscat_theta_validator.Validator() + def _crosscat_engine(self, bdb): + if isinstance(self._crosscat, EngineTemplate): + return self._crosscat + # XXX with so few seeds, the optimistic scenario is that someone is + # going to birthday-paradox themselves. We need an RNG on the crosscat + # side with a bigger seed space. Python's randint can take an arbitrary + # range, here, but crosscat can't take a bigger seed. 
+ seed = bdb._py_prng.randint(0, 2**32-1) + return self._crosscat(seed=seed) + def _crosscat_cache_nocreate(self, bdb): if bdb.cache is None: return None @@ -403,7 +449,7 @@ def _crosscat_get_rows(self, bdb, generator_id, rowids, X_L_list, if len(rows) > 0: # Need to put more stuff into the subsample temporarily T = self._crosscat_data(bdb, generator_id, M_c) - X_L_list, X_D_list, T = self._crosscat.insert( + X_L_list, X_D_list, T = self._crosscat_engine(bdb).insert( M_c=M_c, T=T, X_L_list=X_L_list, @@ -794,7 +840,7 @@ def initialize_models(self, bdb, generator_id, modelnos, model_config): 'row_initialization': 'from_the_prior', } M_c = self._crosscat_metadata(bdb, generator_id) - X_L_list, X_D_list = self._crosscat.initialize( + X_L_list, X_D_list = self._crosscat_engine(bdb).initialize( M_c=M_c, M_r=None, # XXX T=self._crosscat_data(bdb, generator_id, M_c), @@ -811,7 +857,8 @@ def initialize_models(self, bdb, generator_id, modelnos, model_config): for colno1, colno2, dep in crosscat_gen_column_dependencies(bdb, generator_id)] if 0 < len(dep_constraints): - X_L_list, X_D_list = self._crosscat.ensure_col_dep_constraints( + engine = self._crosscat_engine(bdb) + X_L_list, X_D_list = engine.ensure_col_dep_constraints( M_c=M_c, M_r=None, T=self._crosscat_data(bdb, generator_id, M_c), @@ -944,7 +991,8 @@ def analyze_models(self, bdb, generator_id, modelnos=None, iterations=1, iterations_in_ckpt = 0 while True: X_L_list_0 = X_L_list - X_L_list, X_D_list, diagnostics = self._crosscat.analyze( + engine = self._crosscat_engine(bdb) + X_L_list, X_D_list, diagnostics = engine.analyze( M_c=M_c, T=T, do_diagnostics=True, @@ -1056,7 +1104,7 @@ def column_mutual_information(self, bdb, generator_id, modelno, colno0, X_D_list = self._crosscat_latent_data(bdb, generator_id, modelno) cc_colno0 = crosscat_cc_colno(bdb, generator_id, colno0) cc_colno1 = crosscat_cc_colno(bdb, generator_id, colno1) - r = self._crosscat.mutual_information( + r = 
self._crosscat_engine(bdb).mutual_information( M_c=self._crosscat_metadata(bdb, generator_id), X_L_list=X_L_list, X_D_list=X_D_list, @@ -1078,7 +1126,7 @@ def row_similarity(self, bdb, generator_id, modelno, rowid, target_rowid, [given_row_id, target_row_id], X_L_list, X_D_list = \ self._crosscat_get_rows(bdb, generator_id, [rowid, target_rowid], X_L_list, X_D_list) - return self._crosscat.similarity( + return self._crosscat_engine(bdb).similarity( M_c=self._crosscat_metadata(bdb, generator_id), X_L_list=X_L_list, X_D_list=X_D_list, @@ -1100,7 +1148,7 @@ def predict_confidence(self, bdb, generator_id, modelno, colno, rowid, self._crosscat_get_row(bdb, generator_id, rowid, X_L_list, X_D_list) cc_colno = crosscat_cc_colno(bdb, generator_id, colno) - code, confidence = self._crosscat.impute_and_confidence( + code, confidence = self._crosscat_engine(bdb).impute_and_confidence( M_c=M_c, X_L=X_L_list, X_D=X_D_list, @@ -1124,7 +1172,8 @@ def simulate_joint(self, bdb, generator_id, targets, constraints, X_D_list = self._crosscat_latent_data(bdb, generator_id, modelno) Q, Y, X_L_list, X_D_list = self._crosscat_remap_two( bdb, generator_id, X_L_list, X_D_list, targets, constraints) - raw_outputs = self._crosscat.simple_predictive_sample( + engine = self._crosscat_engine(bdb) + raw_outputs = engine.simple_predictive_sample( M_c=M_c, X_L=X_L_list, X_D=X_D_list, @@ -1155,7 +1204,7 @@ def logpdf_joint(self, bdb, generator_id, targets, constraints, X_D_list = self._crosscat_latent_data(bdb, generator_id, modelno) Q, Y, X_L_list, X_D_list = self._crosscat_remap_two( bdb, generator_id, X_L_list, X_D_list, targets, constraints) - r = self._crosscat.predictive_probability_multistate( + r = self._crosscat_engine(bdb).predictive_probability_multistate( M_c=M_c, X_L_list=X_L_list, X_D_list=X_D_list, @@ -1215,7 +1264,7 @@ def insertmany(self, bdb, generator_id, rows): modelnos = [modelno for modelno, _theta_json in models] thetas = [json.loads(theta_json) for _modelno, theta_json in 
models] - X_L_list, X_D_list, T = self._crosscat.insert( + X_L_list, X_D_list, T = self._crosscat_engine(bdb).insert( M_c=M_c, T=T, X_L_list=[theta['X_L'] for theta in thetas], diff --git a/tests/test_codebook.py b/tests/test_codebook.py index 102fcec3..8c60a62e 100644 --- a/tests/test_codebook.py +++ b/tests/test_codebook.py @@ -51,12 +51,12 @@ def test_codebook_value_map(): ANALYZE dummy_cc SIMULATE specifying `city` = `LA` (throws KeyError) ''' - + with bayeslite.bayesdb_open(builtin_metamodels=False) as bdb: - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine ccme = CrosscatMetamodel(cc) bayeslite.bayesdb_register_metamodel(bdb, ccme) - + bayeslite.bayesdb_read_csv(bdb,'dummy', dummy_data, header=True,create=True) diff --git a/tests/test_column_dep.py b/tests/test_column_dep.py index cc0547da..381bb845 100644 --- a/tests/test_column_dep.py +++ b/tests/test_column_dep.py @@ -51,7 +51,7 @@ def test_complex_dependencies__ci_slow(): # Create the database. 
with bayeslite.bayesdb_open(builtin_metamodels=False) as bdb: - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine ccme = CrosscatMetamodel(cc) bayeslite.bayesdb_register_metamodel(bdb, ccme) diff --git a/tests/test_core.py b/tests/test_core.py index 866cdd3c..c7cf85ca 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -42,10 +42,11 @@ def powerset(s): return itertools.chain.from_iterable(combinations) def local_crosscat(): - return crosscat.LocalEngine.LocalEngine(seed=0) + return crosscat.LocalEngine.LocalEngine def multiprocessing_crosscat(): - return crosscat.MultiprocessingEngine.MultiprocessingEngine(seed=0) + return crosscat.MultiprocessingEngine.MultiprocessingEngineFactoryFromPool( + crosscat.MultiprocessingEngine.Pool()) @contextlib.contextmanager def bayesdb(metamodel=None, **kwargs): @@ -284,8 +285,7 @@ def t1_subcat(): columns=['label CATEGORICAL', 'weight CATEGORICAL']) def t1_mp(): - crosscat = multiprocessing_crosscat() - metamodel = CrosscatMetamodel(crosscat) + metamodel = CrosscatMetamodel(multiprocessing_crosscat()) return bayesdb_generator(bayesdb(metamodel=metamodel), 't1', 't1_cc', t1_schema, t1_data, columns=['label CATEGORICAL', 'age NUMERICAL', 'weight NUMERICAL']) @@ -545,20 +545,19 @@ def test_crosscat_constraints(): class FakeEngine(crosscat.LocalEngine.LocalEngine): def predictive_probability_multistate(self, M_c, X_L_list, X_D_list, Y, Q): - self._last_Y = Y + FakeEngine._last_Y = Y sup = super(FakeEngine, self) return sup.simple_predictive_probability_multistate(M_c=M_c, X_L_list=X_L_list, X_D_list=X_D_list, Y=Y, Q=Q) def simple_predictive_sample(self, M_c, X_L, X_D, Y, Q, n): - self._last_Y = Y + FakeEngine._last_Y = Y return super(FakeEngine, self).simple_predictive_sample(M_c=M_c, X_L=X_L, X_D=X_D, Y=Y, Q=Q, n=n) def impute_and_confidence(self, M_c, X_L, X_D, Y, Q, n): - self._last_Y = Y + FakeEngine._last_Y = Y return super(FakeEngine, self).impute_and_confidence(M_c=M_c, X_L=X_L, 
X_D=X_D, Y=Y, Q=Q, n=n) - engine = FakeEngine(seed=0) - mm = CrosscatMetamodel(engine) + mm = CrosscatMetamodel(FakeEngine) with bayesdb(metamodel=mm) as bdb: t1_schema(bdb) t1_data(bdb) @@ -581,12 +580,12 @@ def impute_and_confidence(self, M_c, X_L, X_D, Y, Q, n): bdb.execute('ANALYZE t1_cc FOR 1 ITERATION WAIT') bdb.execute('ESTIMATE PROBABILITY OF age = 8 GIVEN (weight = 16)' ' BY t1_cc').next() - assert engine._last_Y == [(28, 2, 16)] + assert FakeEngine._last_Y == [(28, 2, 16)] bdb.execute("SELECT age FROM t1 WHERE label = 'baz'").next() bdb.execute("INFER age FROM t1_cc WHERE label = 'baz'").next() - assert engine._last_Y == [(3, 0, 1), (3, 2, 32)] + assert FakeEngine._last_Y == [(3, 0, 1), (3, 2, 32)] bdb.execute('SIMULATE weight FROM t1_cc GIVEN age = 8 LIMIT 1').next() - assert engine._last_Y == [(28, 1, 8)] + assert FakeEngine._last_Y == [(28, 1, 8)] def test_bayesdb_generator_fresh_row_id(): with bayesdb_generator(bayesdb(), 't1', 't1_cc', t1_schema, lambda x: 0,\ diff --git a/tests/test_correlation.py b/tests/test_correlation.py index fa41ebdc..befd4f30 100644 --- a/tests/test_correlation.py +++ b/tests/test_correlation.py @@ -22,7 +22,7 @@ def test_correlation(): with bayeslite.bayesdb_open(builtin_metamodels=False) as bdb: - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine ccme = CrosscatMetamodel(cc) bayeslite.bayesdb_register_metamodel(bdb, ccme) bdb.sql_execute('CREATE TABLE u(id, c0, c1, n0, n1, r0, r1)') diff --git a/tests/test_guess.py b/tests/test_guess.py index d33478cf..984a6e1c 100644 --- a/tests/test_guess.py +++ b/tests/test_guess.py @@ -109,7 +109,7 @@ def test_guess_generator(): data = ((chr(c) + chr(d), (c + d) % 2, math.sqrt(c + d)) for c, d in aa_zz) for row in data: bdb.sql_execute('INSERT INTO t (x, y, z) VALUES (?, ?, ?)', row) - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine metamodel = CrosscatMetamodel(cc) bayeslite.bayesdb_register_metamodel(bdb, 
metamodel) with pytest.raises(ValueError): diff --git a/tests/test_subsample.py b/tests/test_subsample.py index 9a3299b8..cac0a7a2 100644 --- a/tests/test_subsample.py +++ b/tests/test_subsample.py @@ -27,7 +27,7 @@ def test_subsample(): with bayeslite.bayesdb_open(builtin_metamodels=False) as bdb: - cc = crosscat.LocalEngine.LocalEngine(seed=0) + cc = crosscat.LocalEngine.LocalEngine metamodel = CrosscatMetamodel(cc) bayeslite.bayesdb_register_metamodel(bdb, metamodel) with open(dha_csv, 'rU') as f: