vibur/vibur-object-pool

Initialize the initial objects in parallel

sdoeringNew opened this issue · 11 comments

The current source code is pretty straightforward:

        for (int i = 0; i < initialSize; i++) {
            available.offerLast(requireNonNull(poolObjectFactory.create()));
            createdTotal.incrementAndGet();
        }

However, if creating a pooled object takes one second and the initial size is ten, it takes roughly ten seconds to initialize the pool. Creating the objects in parallel could significantly reduce the initialization time.

Why doesn't the compiler complain that addInitialObjects() does not declare a checked exception, even though a Throwable is clearly rethrown? 😕

Hi @sdoeringNew, sorry for my slow reply I'm travelling overseas at the moment.

For your first question: to implement such parallel initialization, the pool constructor would need to accept an additional parameter of type Executor or ExecutorService, and the initialization tasks would then be submitted to this Executor. Given that it is uncommon for the initialization of a single object to take about a second, I feel that such a design would unnecessarily complicate the pool implementation.

I suggest the following workaround: create the initial number of objects (for example, 10 objects) in parallel in an Executor of your choice, add them to a Queue, and pass this Queue to the constructor of your own implementation of PoolObjectFactory, the same PoolObjectFactory that you then pass to the constructor of ConcurrentPool. In your implementation of PoolObjectFactory.create(), just take an object from the Queue and return it. Once the queue is exhausted, i.e., when you create the 11th object (if you need to), you will start creating objects on demand. I think this should work for your case.
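A minimal sketch of this workaround, assuming Java 7 and a simplified one-method `Creator<T>` interface that stands in for the `create()` part of `PoolObjectFactory`. The names `Creator`, `PrecreatedFactory` and `createInParallel` are hypothetical and not part of vibur-object-pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the create() part of PoolObjectFactory.
interface Creator<T> {
    T create();
}

// Returns the pre-created objects first, then falls back to on-demand creation.
class PrecreatedFactory<T> implements Creator<T> {
    private final Queue<T> precreated;
    private final Creator<T> delegate;

    PrecreatedFactory(Queue<T> precreated, Creator<T> delegate) {
        this.precreated = precreated;
        this.delegate = delegate;
    }

    @Override
    public T create() {
        T obj = precreated.poll(); // null once the queue is exhausted
        return obj != null ? obj : delegate.create();
    }

    // Creates `count` objects in parallel on an executor owned by the caller.
    static <T> Queue<T> createInParallel(final Creator<T> creator, int count, ExecutorService executor) {
        List<Callable<T>> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(new Callable<T>() {
                @Override
                public T call() {
                    return creator.create();
                }
            });
        }
        Queue<T> queue = new ConcurrentLinkedQueue<>();
        try {
            for (Future<T> future : executor.invokeAll(tasks)) { // waits for all tasks
                queue.add(future.get()); // add() rejects null, like requireNonNull
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return queue;
    }
}

class PrecreatedFactoryDemo {
    public static void main(String[] args) {
        final AtomicInteger created = new AtomicInteger();
        Creator<Integer> slow = new Creator<Integer>() {
            @Override
            public Integer create() {
                try {
                    Thread.sleep(1000); // simulate a creation that takes about a second
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return created.incrementAndGet();
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // Ten objects in roughly one second instead of ten seconds.
            Queue<Integer> initial = PrecreatedFactory.createInParallel(slow, 10, executor);
            Creator<Integer> factory = new PrecreatedFactory<Integer>(initial, slow);
            System.out.println(initial.size() + " pre-created; next: " + factory.create());
        } finally {
            executor.shutdown();
        }
    }
}
```

The application keeps ownership of the executor (creation and shutdown); once drained, the queue simply stays empty for the lifetime of the factory.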

As to your second question, please see the discussion in #6, which gives the background for why the catch of Throwable was introduced at this place.
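Independently of the background in #6, the reason the compiler accepts `throw t` is Java 7's "more precise rethrow" rule: when the catch parameter is effectively final and the try block can throw no checked exceptions, the compiler treats `throw t` as throwing only `RuntimeException` or `Error`, so no `throws` clause is required. A small illustration, where `create()` and `drainCreated()` are hypothetical stand-ins for the pool's methods:

```java
class PreciseRethrowDemo {
    static int cleanups = 0;

    // Hypothetical stand-in for poolObjectFactory.create(): throws only unchecked exceptions.
    static Object create(boolean fail) {
        if (fail) {
            throw new IllegalStateException("creation failed");
        }
        return new Object();
    }

    // Hypothetical stand-in for the pool's drainCreated().
    static void drainCreated() {
        cleanups++;
    }

    // Compiles without "throws Throwable": the try block can throw no checked exceptions
    // and t is effectively final, so "throw t" can only throw RuntimeException or Error.
    static void addInitialObjects(boolean fail) {
        try {
            create(fail);
        } catch (Throwable t) {
            drainCreated();
            throw t;
        }
    }

    public static void main(String[] args) {
        addInitialObjects(false);
        try {
            addInitialObjects(true);
        } catch (IllegalStateException e) {
            // prints "caught: creation failed, cleanups = 1"
            System.out.println("caught: " + e.getMessage() + ", cleanups = " + cleanups);
        }
    }
}
```

If `t` were reassigned inside the catch block, it would no longer be effectively final, and the method would have to declare `throws Throwable`.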


A resource pool is normally used for resources that take a long time to initialize; that is why they are pooled in the first place. So a second may be a bit long in most cases, but it is not unrealistic.

And no, it does not need an ExecutorService as an additional parameter. As addInitialObjects(..) is only called once, it would suffice to create an ExecutorService within this method. There would be no additional complexity outside this method. If the pool targeted Java 8, it could be done very easily:

    try {
        IntStream.range(0, initialSize).parallel().forEach(i -> {
            available.offerLast(requireNonNull(poolObjectFactory.create()));
            createdTotal.incrementAndGet();
        });
    } catch (Throwable t) { // equivalent to catching "RuntimeException | Error", however, better for Kotlin interoperability
        drainCreated();
        throw t;
    }

As you can see, only one line is different. Unfortunately, the project is still on Java 1.7, so the implementation would be somewhat more complex.

But I tested this implementation, and our application started within five seconds instead of 15 seconds as before. That's quite an impressive performance improvement for a one-line change.
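For comparison, a rough Java 7-compatible sketch of the parallel variant, assuming a simplified model of the pool: a `ConcurrentLinkedDeque` for `available`, an `AtomicInteger` for `createdTotal`, and a hypothetical `drainCreated()` that just clears both. As described in the thread, the `ExecutorService` is created and shut down inside `addInitialObjects()`; the cap of 16 threads is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the pool's fields; not vibur's actual class.
class ParallelInitPool<T> {
    interface ObjectCreator<E> {
        E create();
    }

    private static final int MAX_INIT_THREADS = 16; // arbitrary cap (assumption)

    final Deque<T> available = new ConcurrentLinkedDeque<>();
    final AtomicInteger createdTotal = new AtomicInteger();
    private final ObjectCreator<T> factory;

    ParallelInitPool(ObjectCreator<T> factory, int initialSize) {
        this.factory = factory;
        addInitialObjects(initialSize);
    }

    private void addInitialObjects(int initialSize) {
        if (initialSize <= 0) {
            return;
        }
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < initialSize; i++) {
            tasks.add(new Callable<Void>() {
                @Override
                public Void call() {
                    // Same two statements as the sequential loop, now run on worker threads.
                    available.offerLast(Objects.requireNonNull(factory.create()));
                    createdTotal.incrementAndGet();
                    return null;
                }
            });
        }
        // Short-lived thread pool, created and shut down within this method.
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(initialSize, MAX_INIT_THREADS));
        try {
            for (Future<Void> future : executor.invokeAll(tasks)) { // waits for all tasks
                future.get(); // throws ExecutionException if that task failed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drainCreated(); // best effort: tasks still running may add objects afterwards
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            drainCreated();
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new IllegalStateException(cause);
        } finally {
            executor.shutdown();
        }
    }

    // Stand-in for the pool's drainCreated(); a real pool would also destroy each object.
    private void drainCreated() {
        available.clear();
        createdTotal.set(0);
    }
}
```

Most of the extra code is Java 7 ceremony (anonymous `Callable`s and unwrapping `ExecutionException`). Passing in an injected `ExecutorService` instead would replace the local pool creation and the `shutdown()` call.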

Yes, this project is Java 1.7, and I don't have immediate plans to migrate it to Java 1.8 or newer. Streams' parallel() uses ForkJoinPool.commonPool(), which is Java 1.8. If this were Java 1.8, the commonPool() could be supplied as a parameter to the class constructor, which gives the additional flexibility of using a thread pool other than the commonPool() (this is why I originally suggested it); alternatively, we could refer to the commonPool() implicitly via Streams, or use the commonPool() explicitly. I don't like the idea of creating an ExecutorService in the class constructor, as this is equivalent to creating threads inside the constructor. If I were to implement such a design, I would prefer the thread pool to be supplied as a parameter (i.e., injected), as this pushes the responsibility for the thread pool's creation, management, shutdown, etc., to the application using the object pool.

Please consider the solution I suggested in my first post. It should give you the same performance benefit as the solution you have implemented.

Ok. Seems I get to nowhere with you.

Perhaps another approach then.

Since a ConcurrentCollection is always required to create the pool, what about passing in an already pre-initialized collection? That implementation would be fairly simple, even in Java 1.7.

    public ConcurrentPool(ConcurrentCollection<T> available, PoolObjectFactory<T> poolObjectFactory,
                          int initialSize, int maxSize, boolean fair, Listener<T> listener, boolean preInitialized) {
        forbidIllegalArgument(initialSize < 0);
        forbidIllegalArgument(maxSize < 1 || maxSize < initialSize || maxSize > MAX_ALLOWED_SIZE);

        this.available = requireNonNull(available);
        this.poolObjectFactory = requireNonNull(poolObjectFactory);
        this.listener = listener;

        this.initialSize = initialSize;
        this.maxSize = maxSize;
        this.takeSemaphore = new Semaphore(maxSize, fair);

        if (preInitialized) {
            // The caller has already filled `available` with initialSize objects.
            this.createdTotal = new AtomicInteger(initialSize);
        } else {
            // Empty collection: create the initial objects here, as before.
            this.createdTotal = new AtomicInteger(0);
            addInitialObjects();
        }
    }

This idea makes more sense to me.

However, what exactly is your concern with the approach that I suggested?

My concern is that a Queue that is only used for a short period of time, during initialization, would exist for the whole lifetime of the factory, which is typically from the start of the application until its end.
And it unnecessarily complicates the user's code.

And there is already a Queue in the object pool. Why is there a need for another one? I can't comprehend that. (Hence the alternative approach of using the pool's own, already required queue.)

And faster initialization is a concern for everyone; it's always good to have. So it should be part of the pool, not part of a workaround. Every pool with an initial size > 1 benefits from this improvement.

And the solution is so simple.
Create an ExecutorService in the addInitialObjects() method. Initialize the pooled objects in parallel. Shut the ExecutorService down in the addInitialObjects() method. Don't hassle users with it if they don't need to know. That logic is limited to and encapsulated in 1 (in words: "one") tiny method. Threading is not an evil thing. 👍
All other approaches seem overcomplicated for both the user and the maintainer.

The queue that exists for the lifetime of the factory would actually be empty, which is probably not an issue; however, the approach with the pre-initialized ConcurrentCollection is simpler.

I like the pre-initialized Collection approach, as it is simple to implement, doesn't change the existing class API, and doesn't require any new thread management by the pool. I'll implement and release this idea soon.

I don't like some parts of your comments though. Comments like "seems I get to nowhere with you" are not productive and are actually redundant.

Done in version 25.0; it should be in Maven Central in a couple of hours or so.

Thank you.

There is a minor bug in your implementation: if the size of available is > 0 but < initialSize, addInitialObjects() is not called to create the remaining objects.

Why is there this limitation?

    forbidIllegalArgument(availableSize != 0 && availableSize != initialSize);

Why not simply this?

    public ConcurrentPool(ConcurrentCollection<T> available, PoolObjectFactory<T> poolObjectFactory,
                          int initialSize, int maxSize, boolean fair, Listener<T> listener) {
        forbidIllegalArgument(initialSize < 0);
        forbidIllegalArgument(maxSize < 1 || maxSize < initialSize || maxSize > MAX_ALLOWED_SIZE);
        int availableSize = available.size();
        // Accept anywhere from 0 to initialSize pre-created objects; reject only too many.
        forbidIllegalArgument(availableSize != 0 && availableSize > initialSize);

        [..]

        this.createdTotal = new AtomicInteger(availableSize);
        addInitialObjects();
    }

    private void addInitialObjects() {
        try {
            // Top up from the current size; does nothing if the collection is already full.
            for (int i = available.size(); i < initialSize; i++) {
                available.offerLast(requireNonNull(poolObjectFactory.create()));
                createdTotal.incrementAndGet();
            }
        } catch (Throwable t) { // equivalent to catching "RuntimeException | Error", however, better for Kotlin interoperability
            drainCreated();
            throw t;
        }
    }

I intended it to be the way it is implemented. The javadoc says "it must be an empty collection or a collection pre-initialized with {@code initialSize} objects". I don't see any benefit in the ConcurrentCollection being half-initialized on input. If the application wants to initialize the first half of the collection in parallel and the second half sequentially, it can still do so and then pass the collection as a parameter to ConcurrentPool.
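A simplified model of the contract described here, assuming a `Deque` in place of `ConcurrentCollection` and a hypothetical `ObjectFactory` interface (all names illustrative, not vibur's code): an empty collection makes the pool create `initialSize` objects itself, a full one is adopted as-is, and a partially filled one is rejected.

```java
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the version 25.0 contract; not vibur's actual code.
class PreinitializedPool<T> {
    interface ObjectFactory<E> {
        E create();
    }

    final Deque<T> available;
    final AtomicInteger createdTotal;

    PreinitializedPool(Deque<T> available, ObjectFactory<T> factory, int initialSize) {
        if (initialSize < 0) {
            throw new IllegalArgumentException("initialSize < 0");
        }
        this.available = Objects.requireNonNull(available);
        int availableSize = available.size();
        // Mirrors: forbidIllegalArgument(availableSize != 0 && availableSize != initialSize)
        if (availableSize != 0 && availableSize != initialSize) {
            throw new IllegalArgumentException("expected 0 or " + initialSize + " objects, got " + availableSize);
        }
        this.createdTotal = new AtomicInteger(availableSize);
        if (availableSize == 0) {
            // Empty collection: the pool creates the initial objects itself, sequentially.
            for (int i = 0; i < initialSize; i++) {
                available.offerLast(Objects.requireNonNull(factory.create()));
                createdTotal.incrementAndGet();
            }
        }
        // Otherwise the caller pre-initialized exactly initialSize objects; adopt them as-is.
    }

    public static void main(String[] args) {
        Deque<String> prefilled = new ConcurrentLinkedDeque<String>();
        for (int i = 0; i < 3; i++) {
            prefilled.offerLast("conn-" + i); // e.g. created in parallel by the application
        }
        ObjectFactory<String> onDemand = new ObjectFactory<String>() {
            @Override
            public String create() {
                return "conn-new";
            }
        };
        PreinitializedPool<String> pool = new PreinitializedPool<String>(prefilled, onDemand, 3);
        System.out.println(pool.createdTotal.get()); // prints 3
    }
}
```

How the application fills the collection before calling the constructor (in parallel, sequentially, or a mix of both) is left entirely to it.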