When use the query_stream method, the error "bytes remaining on stream" occurs.
artorias1024 opened this issue · 1 comments
artorias1024 commented
This is the logic of my code.
async fn create_data_events(&self, captured_tables: HashSet<TableId>) -> core::result::Result<(), MySqlConnectorError> {
let pool = &self.context.pool;
let sender = &self.sender;
let max_concurrent_tasks = 2;
let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));
let mut handlers = Vec::new();
for table_id in captured_tables {
let table = self.context.schema.table_for(&table_id).unwrap().clone();
let connection = pool.get_conn().await?;
let sender_clone = sender.clone();
// Self::export_data(connection, table_id, table, sender.clone()).await?;
let semaphore_clone = semaphore.clone();
let join_handler = tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.unwrap();
Self::export_data(connection, table_id, table, sender_clone).await
});
handlers.push(join_handler);
}
for handle in handlers {
if let Ok(snapshot_result) = handle.await {}
}
Ok(())
}
async fn export_data(
mut connection: Conn,
table_id: TableId,
table: Arc<Table>,
sender: Sender<FlatMessage>,
) -> core::result::Result<(), MySqlConnectorError> {
{
info!("Exporting data from table '{}'", table_id);
let export_start_time = Instant::now();
let mut stream: ResultSetStream<Row, TextProtocol> = connection.query_stream(format!("select * from {}", table_id)).await?;
let mut rows_count = 0;
while let Some(next) = stream.next().await {
match next {
Ok(row) => {
let message = table.generate_snapshot_record(row);
match sender.send(message).await {
Ok(_) => {}
Err(err) => error!("{}", err),
}
}
Err(err) => {
error!("Snapshotting of table {} failed : {:#?}", table_id, err);
}
}
rows_count += 1;
}
drop(stream);
info!(
"Finished exporting {} records for table'{}';total duration {:#?}",
rows_count,
table_id,
export_start_time.elapsed()
);
}
let _ = connection.disconnect().await?;
Ok(())
}
artorias1024 commented
It's very strange that when I set the Semaphore to 1, which means that the connection is not used concurrently, no error occurs.