bitcapybara/budmq

common reader and writer io/pool

Closed this issue · 10 comments

use one stream for each producer/consumer to guarantee message order

  • for sync api, use one stream for each producer/consumer, guaranteed order
  • for async api, use one stream for each message, to avoid block, order not guaranteed

draft for async producer optimization:

  • send all producer messages to a Queue
  • if Queue remaining message quantity exceeds a threshold, open a new stream

use a independent stream for ping/pong packets

stream pool(bb8)?

client

let initial_num = 100;
let pool = Vec::with_capacity(initial_num);
for i in 0..initial_num {
    pool.push(conn.open_bidirect_stream())
}

server

while let Some(stream) = conn.accept() {
    let broker_tx = broker_tx.clone();
    tokio::spawn(async move {
         let framed = Framed::new(stream, PacketCodec);
         while let Some(packet) = stream.recv().await {
              broker_tx.send(packet);
         }
    });
}

Put the stream pool functionality in the common package

impl<M> Drop for PooledConnection<M>
where
    M: ManageConnection,
{
    fn drop(&mut self) {
        self.pool.put_back(self.checkout, self.conn.take().unwrap());
    }
}

impl<M> Deref for PooledConnection<M>
where
    M: ManageConnection,
{
    type Target = M::Connection;

    fn deref(&self) -> &M::Connection {
        &self.conn.as_ref().unwrap().conn
    }
}

impl<M> DerefMut for PooledConnection<M>
where
    M: ManageConnection,
{
    fn deref_mut(&mut self) -> &mut M::Connection {
        &mut self.conn.as_mut().unwrap().conn
    }
}
  1. Test the connection before putting it into the connection pool to ensure it is available. Only connections that pass the test can be put into the connection pool.
  2. Regularly check the connections in the connection pool and remove unavailable connections. You can start a thread to regularly check all connections and remove unavailable connections.
  3. Also test the connection after retrieving it from the connection pool and before returning it to the connection pool to ensure it is still available. If unavailable, discard the connection instead of putting it back to the connection pool.
  4. Set the maximum idle time for a connection. When a connection has been idle in the connection pool for longer than that value, test it again before next use to ensure its availability.
  5. The connection pool can set the maximum and minimum number of connections to always keep a certain number of active connections in the pool and avoid recreating connections every time a connection is retrieved.

singlepool: Arc<Mutex<Framed<BidirectionStream, PacketCodec>>

use id in request to avoid wait for response

useHashMap<u64, res_rx> to wait response

done