probcomp/bayeslite

Make Loom backend interrupt safe with ctrl+c by clearing broken cached query servery

fsaad opened this issue · 0 comments

fsaad commented

Interrupting a SIMULATE query using ctrl+c causes an IOError: [Errno 32] Broken pipe, presumably due to the cached server breaking as a result of the interrupt.

The IOError can be repaired by removing the offending query server from the cache:

del bdb.backends['loom']._cache[bdb][1]

We should wrap the line server._predict(reader, num_samples, writer, False) in the stack trace below around a try-except block which captures the IOError and resets the server in the event that the pipe is broken.

============
Full stack trace

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in execute(self, string, bindings)
    230             bindings = ()
    231         return self._maybe_trace(
--> 232             self.tracer, self._do_execute, string, bindings)
    233 
    234     def _maybe_trace(self, tracer, meth, string, bindings):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _maybe_trace(self, tracer, meth, string, bindings)
    238         if tracer:
    239             tracer(string, bindings)
--> 240         return meth(string, bindings)
    241 
    242     def _qid(self):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _do_execute(self, string, bindings)
    279         else:
    280             raise ValueError('>1 phrase in string')
--> 281         cursor = bql.execute_phrase(self, phrase, bindings)
    282         return self._empty_cursor if cursor is None else cursor
    283 

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bql.py in execute_phrase(bdb, phrase, bindings)
     92             ifnotexists = 'IF NOT EXISTS ' if phrase.ifnotexists else ''
     93             out.write('CREATE %sTABLE %s%s AS ' % (temp, ifnotexists, qt))
---> 94             compiler.compile_query(bdb, phrase.query, out)
     95             winders, unwinders = out.getwindings()
     96             with compiler.bayesdb_wind(bdb, winders, unwinders):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in compile_query(bdb, query, out)
    212     :param Output out: output accumulator
    213     """
--> 214     _compile_query(bdb, query, BQLCompiler_None(), out)
    215 
    216 def _compile_query(bdb, query, bql_compiler, out):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in _compile_query(bdb, query, bql_compiler, out)
    252         compile_infer_auto(bdb, query, out)
    253     elif isinstance(query, ast.Simulate):
--> 254         compile_simulate(bdb, query, out)
    255     elif isinstance(query, ast.SimulateModels):
    256         compile_simulate_models(bdb, query, bql_compiler, out)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in compile_simulate(bdb, simulate, out)
    716                 bdb, population_id, generator_id, modelnos,
    717                 constraints, colnos, numpredictions=nsamples,
--> 718                 accuracy=simulate.accuracy
    719             ):
    720             out.winder(insert_sql, row)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bqlfn.py in bayesdb_simulate(bdb, population_id, generator_id, modelnos, constraints, colnos, numpredictions, accuracy)
    571     else:
    572         counts = []
--> 573     rowses = map(simulate, generator_ids, backends, counts)
    574     all_rows = [row for rows in rowses for row in rows]
    575     assert all(isinstance(row, (tuple, list)) for row in all_rows)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bqlfn.py in simulate(generator_id, backend, n)
    547         return backend.simulate_joint(
    548             bdb, generator_id, modelnos, rowid, colnos, constraints,
--> 549             num_samples=n, accuracy=accuracy)
    550     generator_ids = _retrieve_generator_ids(bdb, population_id, generator_id)
    551     backends = [

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/backends/loom_backend.py in simulate_joint(self, bdb, generator_id, modelnos, rowid, targets, constraints, num_samples, accuracy)
    803 
    804         # Obtain the prediction.
--> 805         server._predict(reader, num_samples, writer, False)
    806 
    807         # Parse the CSV output.

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/preql.pyc in _predict(self, reader, count, writer, id_offset)
    331                 to_sample,
    332                 conditioning_row,
--> 333                 count)
    334             for sample in samples:
    335                 sample = self.decode_row(sample, header)

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in sample(self, to_sample, conditioning_row, sample_count)
    152         request.sample.to_sample.dense[:] = to_sample
    153         request.sample.sample_count = sample_count
--> 154         self.protobuf_server.send(request)
    155         response = self.protobuf_server.receive()
    156         if response.error:

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in send(self, request)
    334         request_string = request.SerializeToString()
    335         protobuf_stream_write(request_string, self.proc.stdin)
--> 336         self.proc.stdin.flush()
    337 
    338     def receive(self):

IOError: [Errno 32] Broken pipe