common reader and writer io/pool
Closed this issue · 10 comments
bitcapybara commented
use one stream for each producer/consumer to guarantee message order
bitcapybara commented
- for sync api, use one stream for each producer/consumer, guaranteed order
- for async api, use one stream for each message to avoid blocking; order is not guaranteed
bitcapybara commented
draft for async producer optimization:
- send all producer messages to a Queue
- if the number of messages remaining in the Queue exceeds a threshold, open a new stream
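A minimal sketch of the queue-threshold idea above, using only the standard library. `Dispatcher`, `threshold` and `max_streams` are hypothetical names, a "stream" is only a counter here, and the upper bound on streams is an added assumption that the draft does not mention.

```rust
use std::collections::VecDeque;

/// Hypothetical dispatcher: buffers outgoing producer messages and tracks
/// how many streams are open. Real code would hold actual stream handles.
pub struct Dispatcher {
    queue: VecDeque<Vec<u8>>,
    open_streams: usize,
    threshold: usize,   // backlog size that triggers opening a new stream
    max_streams: usize, // assumption: cap so a burst cannot open streams forever
}

impl Dispatcher {
    pub fn new(threshold: usize, max_streams: usize) -> Self {
        Self { queue: VecDeque::new(), open_streams: 1, threshold, max_streams }
    }

    /// Enqueue a message. Returns true if the backlog exceeded the
    /// threshold and a new stream was (notionally) opened.
    pub fn push(&mut self, msg: Vec<u8>) -> bool {
        self.queue.push_back(msg);
        if self.queue.len() > self.threshold && self.open_streams < self.max_streams {
            self.open_streams += 1; // real code would open a stream on the connection here
            return true;
        }
        false
    }

    /// Take the next queued message for an idle stream to send.
    pub fn pop(&mut self) -> Option<Vec<u8>> {
        self.queue.pop_front()
    }

    pub fn open_streams(&self) -> usize {
        self.open_streams
    }
}
```

Since any open stream may take the next message, ordering across streams is not preserved, which matches the async api trade-off noted earlier.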
bitcapybara commented
use an independent stream for ping/pong packets
bitcapybara commented
stream pool(bb8)?
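client
let initial_num = 100;
// Pre-open a fixed number of bidirectional streams to seed the pool.
let mut pool = Vec::with_capacity(initial_num);
for _ in 0..initial_num {
    pool.push(conn.open_bidirect_stream());
}
server
while let Some(stream) = conn.accept() {
    let broker_tx = broker_tx.clone();
    // One task per accepted stream; decoded packets are forwarded to the broker.
    tokio::spawn(async move {
        // `stream` is moved into `framed`, so read from `framed` (needs futures::StreamExt).
        let mut framed = Framed::new(stream, PacketCodec);
        while let Some(Ok(packet)) = framed.next().await {
            // Add `.await` if `broker_tx` is an async sender (e.g. a bounded tokio mpsc).
            broker_tx.send(packet);
        }
    });
}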
bitcapybara commented
Put the stream pool functionality in the common package
bitcapybara commented
impl<M> Drop for PooledConnection<M>
where
    M: ManageConnection,
{
    fn drop(&mut self) {
        self.pool.put_back(self.checkout, self.conn.take().unwrap());
    }
}
impl<M> Deref for PooledConnection<M>
where
    M: ManageConnection,
{
    type Target = M::Connection;
    fn deref(&self) -> &M::Connection {
        &self.conn.as_ref().unwrap().conn
    }
}
impl<M> DerefMut for PooledConnection<M>
where
    M: ManageConnection,
{
    fn deref_mut(&mut self) -> &mut M::Connection {
        &mut self.conn.as_mut().unwrap().conn
    }
}
bitcapybara commented
- Test the connection before putting it into the connection pool to ensure it is available. Only connections that pass the test can be put into the connection pool.
- Regularly check the connections in the connection pool and remove unavailable ones, for example from a background thread that periodically tests every connection.
- Also test the connection after retrieving it from the connection pool and before returning it to the connection pool to ensure it is still available. If unavailable, discard the connection instead of putting it back to the connection pool.
- Set the maximum idle time for a connection. When a connection has been idle in the connection pool for longer than that value, test it again before next use to ensure its availability.
- The connection pool can set the maximum and minimum number of connections to always keep a certain number of active connections in the pool and avoid recreating connections every time a connection is retrieved.
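The checklist above could be sketched roughly as follows, synchronously and with the standard library only. The `Manager` trait is a hypothetical, much smaller stand-in loosely shaped like bb8's `ManageConnection`, and `Pool`, `reap`, `min_idle`, `max_idle` and `max_idle_time` are illustrative names. As a simplification, checkout only re-tests connections that have been idle longer than `max_idle_time`.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical manager: knows how to open and how to test a connection.
pub trait Manager {
    type Conn;
    fn connect(&self) -> Self::Conn;
    fn is_valid(&self, conn: &mut Self::Conn) -> bool;
}

struct Idle<C> {
    conn: C,
    since: Instant, // when the connection was last put into the pool
}

pub struct Pool<M: Manager> {
    manager: M,
    idle: VecDeque<Idle<M::Conn>>,
    max_idle: usize,
    max_idle_time: Duration,
}

impl<M: Manager> Pool<M> {
    /// Pre-open `min_idle` connections, keeping only those that pass the test.
    pub fn new(manager: M, min_idle: usize, max_idle: usize, max_idle_time: Duration) -> Self {
        let mut pool = Self { manager, idle: VecDeque::new(), max_idle, max_idle_time };
        for _ in 0..min_idle {
            let mut conn = pool.manager.connect();
            if pool.manager.is_valid(&mut conn) {
                pool.idle.push_back(Idle { conn, since: Instant::now() });
            }
        }
        pool
    }

    /// Check out a connection; stale ones are re-tested and dropped if broken.
    pub fn get(&mut self) -> M::Conn {
        while let Some(mut idle) = self.idle.pop_front() {
            let fresh = idle.since.elapsed() <= self.max_idle_time;
            if fresh || self.manager.is_valid(&mut idle.conn) {
                return idle.conn;
            }
        }
        self.manager.connect() // pool empty: open a new connection
    }

    /// Return a connection; it is discarded if the pool is full or the test fails.
    pub fn put(&mut self, mut conn: M::Conn) {
        if self.idle.len() < self.max_idle && self.manager.is_valid(&mut conn) {
            self.idle.push_back(Idle { conn, since: Instant::now() });
        }
    }

    /// Periodic sweep: remove every idle connection that fails the test.
    pub fn reap(&mut self) {
        let manager = &self.manager;
        self.idle.retain_mut(|i| manager.is_valid(&mut i.conn));
    }

    pub fn idle_len(&self) -> usize {
        self.idle.len()
    }
}
```

In an async client `reap` would probably run from a timer task rather than a thread, and the pool would sit behind a lock; both are left out here to keep the sketch short.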
bitcapybara commented
singlepool: Arc<Mutex<Framed<BidirectionStream, PacketCodec>>>
bitcapybara commented
use an id in each request to avoid waiting for its response before sending the next one
use a HashMap<u64, res_rx>
to wait for responses
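A rough sketch of the request-id idea above, with `std::sync::mpsc` standing in for whatever async channel the client actually uses. `Pending`, `register` and `complete` are hypothetical names. One assumption about the intent: the map here stores the sending half of each channel, while the caller keeps the `res_rx` and waits on it.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical table of in-flight requests, keyed by request id.
pub struct Pending<R> {
    next_id: u64,
    waiters: HashMap<u64, Sender<R>>,
}

impl<R> Pending<R> {
    pub fn new() -> Self {
        Self { next_id: 0, waiters: HashMap::new() }
    }

    /// Allocate an id for an outgoing request and return the receiver
    /// (`res_rx`) on which its response will be delivered.
    pub fn register(&mut self) -> (u64, Receiver<R>) {
        let id = self.next_id;
        self.next_id = self.next_id.wrapping_add(1);
        let (res_tx, res_rx) = channel();
        self.waiters.insert(id, res_tx);
        (id, res_rx)
    }

    /// Called by the reader side when a response carrying `id` arrives.
    /// Returns false if nobody is waiting for that id (or the waiter is gone).
    pub fn complete(&mut self, id: u64, response: R) -> bool {
        match self.waiters.remove(&id) {
            Some(res_tx) => res_tx.send(response).is_ok(),
            None => false,
        }
    }

    /// Number of requests still waiting for a response.
    pub fn in_flight(&self) -> usize {
        self.waiters.len()
    }
}
```

With this shape a writer can send several requests back to back on the shared stream, and responses may complete in any order because each one is routed by its id.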
bitcapybara commented
done