Make Loom backend interrupt safe with ctrl+c by clearing broken cached query servery
fsaad opened this issue · 0 comments
fsaad commented
Interrupting a SIMULATE
query using ctrl+c
causes an IOError: [Errno 32] Broken pipe
, presumably due to the cached server breaking as a result of the interrupt.
The IOError
can be repaired by removing the offending query server from the cache:
del bdb.backends['loom']._cache[bdb][1]
We should wrap the line server._predict(reader, num_samples, writer, False)
in the stack trace below around a try-except
block which captures the IOError and resets the server in the event that the pipe is broken.
============
Full stack trace
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in execute(self, string, bindings)
230 bindings = ()
231 return self._maybe_trace(
--> 232 self.tracer, self._do_execute, string, bindings)
233
234 def _maybe_trace(self, tracer, meth, string, bindings):
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _maybe_trace(self, tracer, meth, string, bindings)
238 if tracer:
239 tracer(string, bindings)
--> 240 return meth(string, bindings)
241
242 def _qid(self):
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _do_execute(self, string, bindings)
279 else:
280 raise ValueError('>1 phrase in string')
--> 281 cursor = bql.execute_phrase(self, phrase, bindings)
282 return self._empty_cursor if cursor is None else cursor
283
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bql.py in execute_phrase(bdb, phrase, bindings)
92 ifnotexists = 'IF NOT EXISTS ' if phrase.ifnotexists else ''
93 out.write('CREATE %sTABLE %s%s AS ' % (temp, ifnotexists, qt))
---> 94 compiler.compile_query(bdb, phrase.query, out)
95 winders, unwinders = out.getwindings()
96 with compiler.bayesdb_wind(bdb, winders, unwinders):
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in compile_query(bdb, query, out)
212 :param Output out: output accumulator
213 """
--> 214 _compile_query(bdb, query, BQLCompiler_None(), out)
215
216 def _compile_query(bdb, query, bql_compiler, out):
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in _compile_query(bdb, query, bql_compiler, out)
252 compile_infer_auto(bdb, query, out)
253 elif isinstance(query, ast.Simulate):
--> 254 compile_simulate(bdb, query, out)
255 elif isinstance(query, ast.SimulateModels):
256 compile_simulate_models(bdb, query, bql_compiler, out)
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/compiler.py in compile_simulate(bdb, simulate, out)
716 bdb, population_id, generator_id, modelnos,
717 constraints, colnos, numpredictions=nsamples,
--> 718 accuracy=simulate.accuracy
719 ):
720 out.winder(insert_sql, row)
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bqlfn.py in bayesdb_simulate(bdb, population_id, generator_id, modelnos, constraints, colnos, numpredictions, accuracy)
571 else:
572 counts = []
--> 573 rowses = map(simulate, generator_ids, backends, counts)
574 all_rows = [row for rows in rowses for row in rows]
575 assert all(isinstance(row, (tuple, list)) for row in all_rows)
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bqlfn.py in simulate(generator_id, backend, n)
547 return backend.simulate_joint(
548 bdb, generator_id, modelnos, rowid, colnos, constraints,
--> 549 num_samples=n, accuracy=accuracy)
550 generator_ids = _retrieve_generator_ids(bdb, population_id, generator_id)
551 backends = [
/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/backends/loom_backend.py in simulate_joint(self, bdb, generator_id, modelnos, rowid, targets, constraints, num_samples, accuracy)
803
804 # Obtain the prediction.
--> 805 server._predict(reader, num_samples, writer, False)
806
807 # Parse the CSV output.
/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/preql.pyc in _predict(self, reader, count, writer, id_offset)
331 to_sample,
332 conditioning_row,
--> 333 count)
334 for sample in samples:
335 sample = self.decode_row(sample, header)
/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in sample(self, to_sample, conditioning_row, sample_count)
152 request.sample.to_sample.dense[:] = to_sample
153 request.sample.sample_count = sample_count
--> 154 self.protobuf_server.send(request)
155 response = self.protobuf_server.receive()
156 if response.error:
/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in send(self, request)
334 request_string = request.SerializeToString()
335 protobuf_stream_write(request_string, self.proc.stdin)
--> 336 self.proc.stdin.flush()
337
338 def receive(self):
IOError: [Errno 32] Broken pipe