indeedeng/lsmtree

MultiThreaded Put case, iterator count is not correct

davinash opened this issue · 2 comments

I wrote following test
20 threads doing puts in the Store, each threads puts 100 keys.
When I iterate over using iterator I am getting less keys

package com.indeed.lsmtree.core;

import com.indeed.util.serialization.StringSerializer;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;


public class MultiThreadsOp extends TestCase {
  final class PutTask1 implements Runnable {
    private int taskId;
    private Store map;

    public PutTask1(int id, Store map) {
      this.taskId = id;
      this.map = map;
    }

    @Override
    public void run() {
      System.out.println("Task ID : " + this.taskId + " performed by "
          + Thread.currentThread().getName());
      for (int rowIdx = 0; rowIdx < 100; rowIdx++) {
        try {
          final String key = "Key-" + rowIdx + taskId;
          final String val = "Value-" + rowIdx + taskId;
          this.map.put( key, val );
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  @Test
  public void testLSMOpPutGetMultipleThreads() throws IOException {
    try {
      Store map = new StoreBuilder<String, String>(
          new File("/tmp/testLSMOpPutGetMultipleThreads"), new StringSerializer(),
          new StringSerializer()).setMaxVolatileGenerationSize(8 * 1024).setCodec(null)
          .setStorageType(StorageType.INLINE).build();

      final int numOfThreads = 20;
      ExecutorService taskExecutor = Executors.newFixedThreadPool(numOfThreads);
      IntStream.range(0, numOfThreads).forEach(i -> taskExecutor.submit(new PutTask1(i, map)));

      taskExecutor.shutdown();
      try {
        while(!taskExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
          Thread.sleep(60000);
        }
      } catch (InterruptedException e) {
      }


      Iterator iterator = map.iterator();
      int count = 0;
      while ( iterator.hasNext()) {
        count++;
        iterator.next();
      }
      assertEquals(100 * numOfThreads, count);
    } finally {
      FileUtils.deleteDirectory(new File("/tmp/testLSMOpPutGetMultipleThreads"));
    }
  }
}

If I reduce the numOfThreads to say 10, test works fine.

thanks! i will take a look into this.