radical-collaboration/hpc-workflows

Running Lucas's EnTK workflow on Traverse

mturilli opened this issue · 46 comments

Depends on #111

Downloading data from within workflow

One of the future goals of my workflow is to run an inversion for a new
earthquake as soon as an initial solution for its mechanism is available.
For that workflow, it would be extremely convenient if the data download could
happen inside the workflow.

Workflow

The workflow can be split into multiple stages.

  1. Create the database structure for the earthquake and the data to be produced
  2. Download the observed data
  3. Run the simulations (computationally heavy part)
  4. Process the data
  5. Invert the data

We may want to do multiple iterations of stages 3-5, which would
multiply the computational cost by the number of iterations, but we are not
there yet.

Downloading data

Ideally, I would like all of the above stages to be included in a pipeline:
one pipeline for each earthquake, meaning multiple pipelines in a workflow.
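A minimal sketch of the one-pipeline-per-earthquake layout, with stages 3-5 repeated once per iteration. The stage names, the `eq-` prefix, and the placeholder `/bin/echo` executable are hypothetical; only `Pipeline`, `Stage`, `Task`, `add_stages`, and `add_tasks` come from EnTK. The EnTK import is deferred so the plan itself can be built and inspected without EnTK installed. Names use hyphens instead of underscores as a precaution, in case the EnTK version in use restricts characters in object names.

```python
# Hypothetical stage names; only their order follows the five stages listed above.
SETUP_STAGES = ["create-database", "download-data"]
ITERATION_STAGES = ["simulate", "process", "invert"]


def build_plan(eq_ids, n_iterations=1):
    """Return {eq_id: [stage names]}, repeating stages 3-5 once per iteration."""
    plan = {}
    for eq_id in eq_ids:
        stages = list(SETUP_STAGES)
        for it in range(n_iterations):
            stages += [f"{name}-{it}" for name in ITERATION_STAGES]
        plan[eq_id] = stages
    return plan


def to_entk_pipelines(plan, executable="/bin/echo"):
    """Turn the plan into one EnTK Pipeline per earthquake (one Task per Stage).

    radical.entk is imported here so build_plan() works without it installed.
    The placeholder executable only echoes its arguments.
    """
    from radical.entk import Pipeline, Stage, Task

    pipelines = []
    for eq_id, stage_names in plan.items():
        p = Pipeline()
        p.name = f"eq-{eq_id}"
        for name in stage_names:
            s = Stage()
            s.name = name
            t = Task()
            t.name = f"{eq_id}-{name}"
            t.executable = executable
            t.arguments = [eq_id, name]
            s.add_tasks(t)
            p.add_stages(s)
        pipelines.append(p)
    return pipelines
```

The result could then be handed to the AppManager, e.g. `appman.workflow = set(to_entk_pipelines(build_plan(eq_ids)))`.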

This would mean that the earthquake data has to be downloaded in a
Task/Stage. But I know that Tasks are generally sent to the compute
nodes, which usually do not have an internet connection. That means that the task
would somehow have to communicate with the manager to move it to the
head node.

One of the scripts that I run is basically ./downloaddata.sh <eq_id>, and it does
the job. It just can't do it from a compute node. In my case, both the head and
compute nodes have access to the same drives, so I wouldn't have to send data
anywhere.

[This bit might not be too important:] Since I would like this to be done for
many earthquakes, it also has to happen more or less dynamically. It would be
nice if the downloads could be allocated to the different cores on the head
node, so that downloads for different earthquakes can happen in parallel.

Thanks in advance!

Let me know what you think.
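Assuming the downloads run somewhere with internet access (for example, started from a helper on the head node rather than as an EnTK task on a compute node), a thread pool is one simple way to run several `./downloaddata.sh <eq_id>` calls in parallel. `download_all` and its defaults are hypothetical; threads suffice because each worker only waits on its own subprocess.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def download_all(eq_ids, cmd=("./downloaddata.sh",), max_workers=4):
    """Run `cmd <eq_id>` for every earthquake in parallel.

    Returns {eq_id: returncode}. Each thread just waits on one subprocess,
    so up to max_workers downloads proceed at the same time.
    """
    def run_one(eq_id):
        result = subprocess.run([*cmd, eq_id], capture_output=True, text=True)
        return eq_id, result.returncode

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(run_one, eq_ids))


if __name__ == "__main__":
    # Usage on the head node: python download_all.py <eq_id> [<eq_id> ...]
    print(download_all(sys.argv[1:]))
```

A non-zero return code marks an earthquake whose download should be retried before its pipeline is started.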

I T ' S . R U N N I N G

The reason was a misunderstanding of the GPU resource assignment!

For each task, we have to set the CPU requirements to the correct number of MPI tasks, like this:

task.cpu_reqs = {
    'processes': 6, 
    'process_type': 'MPI',
    'threads_per_process': 1, 
    'thread_type' : 'OpenMP'
    }

But the GPU assignment is per MPI task: gpu_reqs['processes'] is the number of GPUs per MPI task. So if I need 6 GPUs for my 6 MPI tasks, the GPU count is 1, since it will assign one GPU to each MPI task:

task.gpu_reqs = {
    'processes': 1,
    'process_type': 'MPI', 
    'threads_per_process': 1, 
    'thread_type' : 'CUDA'
    }

So the above task requires 6 CPUs and 6 GPUs, i.e., 1 GPU per MPI task.
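The arithmetic behind this convention, as a small sketch: total cores are `processes * threads_per_process` from `cpu_reqs`, and total GPUs are the number of MPI ranks times `gpu_reqs['processes']`. The helper name is hypothetical, and the formula restates the behaviour described in this comment rather than RP's source code.

```python
def total_resources(cpu_reqs, gpu_reqs):
    """Return (cores, gpus) for one task under the per-rank GPU convention.

    gpu_reqs["processes"] is read as GPUs per MPI rank, so the total GPU
    count scales with cpu_reqs["processes"] (the number of ranks).
    """
    ranks = cpu_reqs["processes"]
    cores = ranks * cpu_reqs["threads_per_process"]
    gpus = ranks * gpu_reqs["processes"]
    return cores, gpus


cpu_reqs = {"processes": 6, "process_type": "MPI",
            "threads_per_process": 1, "thread_type": "OpenMP"}
gpu_reqs = {"processes": 1, "process_type": "MPI",
            "threads_per_process": 1, "thread_type": "CUDA"}
print(total_resources(cpu_reqs, gpu_reqs))  # (6, 6)
```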

Thanks for solving this!

Compared to our last communication, what did you change to make it work?

I used the above resource dictionaries, that's all! So my specfem task with 6 MPI slices would look like this.

t1 = Task()
t1.pre_exec = [
    # Load necessary modules
    'module purge',
    'module load openmpi/gcc',
    'module load cudatoolkit/10.0',
        
    # Change to your specfem run directory
    'rm -rf /home/lsawade/simple_entk_specfem/specfem_run',
    'mkdir /home/lsawade/simple_entk_specfem/specfem_run',
    'cd /home/lsawade/simple_entk_specfem/specfem_run',
        
    # Create data structure in place
    'ln -s /scratch/gpfs/lsawade/specfem_6/bin .',
    'ln -s /scratch/gpfs/lsawade/specfem_6/DATABASES_MPI .',
    'cp -r /scratch/gpfs/lsawade/specfem_6/OUTPUT_FILES .',
    'mkdir DATA',
    'cp /scratch/gpfs/lsawade/specfem_6/DATA/CMTSOLUTION ./DATA/',
    'cp /scratch/gpfs/lsawade/specfem_6/DATA/STATIONS ./DATA/',
    'cp /scratch/gpfs/lsawade/specfem_6/DATA/Par_file ./DATA/'
    ]
t1.executable = ['./bin/xspecfem3D']
t1.cpu_reqs = {'processes': 6, 'process_type': 'MPI', 'threads_per_process': 1, 'thread_type' : 'OpenMP'}
t1.gpu_reqs = {'processes': 1, 'process_type': 'MPI', 'threads_per_process': 1, 'thread_type' : 'CUDA'}
t1.download_output_data = ['STDOUT', 'STDERR']
t1.name = "SIMULATION"
specfem_stage.add_tasks(t1)

Yes, the naming used for the gpu specs is confusing, and we are planning to change that rather sooner than later.

@mturilli : just making you aware that this popped up again.

I'm encountering a new and very different issue. In my project environment, I have "binaries" installed via setup.py, much like the radical-stack command. I'm trying to run those executables, or rather activate the environment they're in, in the task's pre_exec, but the workflow manager will not bow to my will. I'm probably just misunderstanding something.

What I was thinking would work is the following task setup:

task = Task()
task.name = "MyTask"
task.pre_exec = [
    "module load anaconda3",
    "conda activate gcmt3d"
    ]
task.executable = "<my_in_gcmt3d_installed_bin>"
task.arguments = ["-f", "file", "-p", "param_file"]

But the task looks for the executable in @andre-merzky's ve.rp environment, where it is of course not installed. Are there any workarounds, or am I doing something wrong?

Update

After some further testing, I found that conda can't be found as an executable. This means that the pre_exec list probably has no effect at all. There must be a common workaround, since (I assume) others have used Python environments with EnTK.

Update 2

I tried another thing. I loaded the anaconda3 module and used the gcmt3d environment's Python executable directly, and this seems to work:

task.pre_exec = [
    "module load anaconda3",
    ]
task.executable = "/home/lsawade/.conda/envs/gcmt3d/bin/python"
task.arguments = ['-c', 'import gcmt3d']

This sort of solves my problem, but isn't exactly user-friendly. Since I'm already loading parameter files to feed into the workflow manager, it's easy to add a python-bin entry to my parameter file, which I can then change. But a solution like this (same as the first piece of code above):

task = Task()
task.name = "MyTask"
task.pre_exec = [
    "module load anaconda3",
    "conda activate gcmt3d"
    ]
task.executable = "<my_in_gcmt3d_installed_bin>"
task.arguments = ["-f", "file", "-p", "param_file"]

would be more elegant. I don't know if that's possible though.
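One way to avoid hard-coding every executable, sketched under the assumption that the scripts were installed with setup.py into the same env as the `python-bin` entry from the parameter file: such console scripts typically land in the env's `bin/` directory next to `python`, and their shebang line points at that interpreter, so they can usually be called by absolute path without `conda activate`. `env_executable` and the script name `my-gcmt3d-tool` are hypothetical.

```python
import os


def env_executable(python_bin, script_name):
    """Path of a console script installed into the same env as python_bin."""
    return os.path.join(os.path.dirname(python_bin), script_name)


params = {"python-bin": "/home/lsawade/.conda/envs/gcmt3d/bin/python"}
# "my-gcmt3d-tool" stands in for the real setup.py entry point name
executable = env_executable(params["python-bin"], "my-gcmt3d-tool")
print(executable)
```

The task would then use `task.executable = executable` with the same `pre_exec = ["module load anaconda3"]` as in the working example.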

Hi @lsawade: since the pilot is running in its own environment, it is indeed sometimes tricky to pull up a new virtualenv or conda env for the tasks. But yes, we have some workarounds for that. What surprises me, though, is that conda wouldn't be found after the pre_exec; that is something I need to look into. I am glad you have a workaround...

@lsawade, my suggestion for the conda search is to update $PATH in your pre_exec. For example,
first try this in your bash shell to check the location:

module load anaconda3
dirname $(which conda)

Once you see where it is located, I would add it to $PATH:

task.pre_exec = [
  "module load anaconda3",
  "export PATH=<conda path>:$PATH",
  "conda activate gcmt3d"
]

I assume the behavior differs from what you expected because you may have conda init in your .bashrc, for example, on Summit:

if ...
    . "/sw/summit/python/3.6/anaconda3/5.3.0/etc/profile.d/conda.sh"
else
    export PATH="/sw/summit/python/3.6/anaconda3/5.3.0/bin:$PATH"
fi

So your shell knows where conda is located, whereas EnTK doesn't.

BTW, module show anaconda3 also tells how it updates $PATH.
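Putting this together, here is a sketch that builds such a `pre_exec` list. It assumes `conda_bin_dir` is whatever `dirname $(which conda)` prints after `module load anaconda3`. Sourcing `etc/profile.d/conda.sh` (the same file the Summit `.bashrc` snippet sources) defines the `conda` shell function that `conda activate` needs in a non-interactive shell. The helper name and the example path are hypothetical.

```python
import os


def conda_pre_exec(env_name, conda_bin_dir):
    """pre_exec lines that make `conda activate` work in a non-interactive shell.

    conda_bin_dir is the directory containing the conda executable
    (e.g. .../anaconda3/bin); its parent is the conda base directory.
    """
    bin_dir = conda_bin_dir.rstrip("/")
    conda_base = os.path.dirname(bin_dir)
    return [
        "module load anaconda3",
        f"export PATH={bin_dir}:$PATH",
        # Defines the `conda` shell function that `conda init` normally sets up
        f". {conda_base}/etc/profile.d/conda.sh",
        f"conda activate {env_name}",
    ]


for line in conda_pre_exec("gcmt3d", "/opt/anaconda3/bin"):
    print(line)
```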

@lee212 That sounds like a good solution! I will test that!

I have also encountered an unrelated issue when trying to run multiple simulations simultaneously.

So I tried creating multiple tasks using a for-loop with different run directories. It seems like EnTK is hanging again. See the workflow below:

from radical.entk import Pipeline, Stage, Task, AppManager
import traceback, sys, os


hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', 5672))

if __name__ == '__main__':
    p = Pipeline()

    specfem_stage = Stage()
    
    for i in range(1):
        t = Task()
        t.name = "SIMULATION_%d" % i
        tdir = "/home/lsawade/simple_entk_specfem/specfem_run_%d" % i
        t.pre_exec = [
            # Load necessary modules
            'module purge',
            'module load openmpi/gcc',
            'module load cudatoolkit/10.0',
            
            # Change to your specfem run directory
            'rm -rf %s',
            'mkdir %s',
            'cd %s',
            
            # Create data structure in place
            'ln -s /scratch/gpfs/lsawade/specfem_6/bin .',
            'ln -s /scratch/gpfs/lsawade/specfem_6/DATABASES_MPI .',
            'cp -r /scratch/gpfs/lsawade/specfem_6/OUTPUT_FILES .',
            'mkdir DATA',
            'cp /scratch/gpfs/lsawade/specfem_6/DATA/CMTSOLUTION ./DATA/',
            'cp /scratch/gpfs/lsawade/specfem_6/DATA/STATIONS ./DATA/',
            'cp /scratch/gpfs/lsawade/specfem_6/DATA/Par_file ./DATA/'
        ]
        t.executable = ['./bin/xspecfem3D']
        t.cpu_reqs = {'processes': 6, 'process_type': 'MPI', 'threads_per_process': 1, 'thread_type' : 'OpenMP'}
        t.gpu_reqs = {'processes': 1, 'process_type': 'MPI', 'threads_per_process': 1, 'thread_type' : 'CUDA'}
        t.download_output_data = ['STDOUT', 'STDERR']
        specfem_stage.add_tasks(t)
        
    p.add_stages(specfem_stage)
        
    res_dict = {
        'resource':  'princeton.traverse',  
        'schema'   : 'local',
        'walltime': 30*60,
        'cpus': 48,
        'gpus': 6
    }

    appman = AppManager(hostname=hostname, port=port, resubmit_failed=False)
    appman.resource_desc = res_dict
    appman.workflow = set([p])
    appman.run()        

Note that the for-loop creates just one task. I wanted to try launching multiple tasks, but it wasn't possible because of how many nodes are requested in the final run command in unit.000000.sh:

...
/usr/bin/srun --exclusive --cpu-bind=none --ntasks 6 --cpus-per-task 1 --gpus-per-task 1 --nodelist=/scratch/gpfs/lsawade/radical.pilot.sandbox/re.session.traverse.princeton.edu.lsawade.018375.0003/pilot.0000/unit.000000//unit.000000.nodes --export=ALL,CUDA_VISIBLE_DEVICES="" ./bin/xspecfem3D
...

The unit.000000/STDERR shows the following message:

srun: error: Required nodelist includes more nodes than permitted by max-node count (6 > 3). Eliminating nodes from the nodelist.

So my assumption is that the node count, which should be 2 for the above job, is wrongly set to the number of processes, since I want 6 processes and 6 GPUs?
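A quick sanity check of the expected node count, written as a lower-bound sketch. It assumes 32 cores and 4 GPUs per Traverse node, which are the values the pilot reports for this machine, so 6 ranks with 1 GPU each need at least ceil(6/4) = 2 nodes. `min_nodes` is hypothetical and ignores placement constraints beyond these totals.

```python
import math


def min_nodes(ranks, cores_per_rank=1, gpus_per_rank=1,
              cores_per_node=32, gpus_per_node=4):
    """Lower bound on nodes needed to hold all ranks' cores and GPUs."""
    by_cores = math.ceil(ranks * cores_per_rank / cores_per_node)
    by_gpus = math.ceil(ranks * gpus_per_rank / gpus_per_node)
    return max(by_cores, by_gpus)


print(min_nodes(6))  # 2: six GPUs need two 4-GPU nodes
```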

I am not a Slurm expert, but I guess you may want to swap the two values, processes and threads_per_process, to assign 6 physical cores per task. Also, if you consider the 4 hardware threads per core (SMT4) on Traverse, the number becomes 24 (6*4). It would look like:

        t.cpu_reqs = {
            'processes': 1,
            'threads_per_process': 24,
            ...

This results in srun command like:

...
/usr/bin/srun --exclusive --cpu-bind=none --ntasks 1 --cpus-per-task 24
...

This is actually built by a Pilot launch method from here: https://github.com/radical-cybertools/radical.pilot/blob/devel/src/radical/pilot/agent/launch_method/srun.py#L113
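To make the mapping concrete, the sketch below reproduces how the fields of `cpu_reqs` and `gpu_reqs` show up in the generated commands quoted in this thread: `--ntasks` from `processes`, `--cpus-per-task` from `threads_per_process`, and `--gpus-per-task` from the GPU `processes`. It is reconstructed from those commands only, not from RP's `srun.py`, and `srun_flags` is a hypothetical name.

```python
def srun_flags(cpu_reqs, gpu_reqs=None):
    """Rebuild the srun resource flags seen in the generated unit scripts."""
    flags = ["--ntasks", str(cpu_reqs["processes"]),
             "--cpus-per-task", str(cpu_reqs["threads_per_process"])]
    if gpu_reqs and gpu_reqs.get("processes"):
        flags += ["--gpus-per-task", str(gpu_reqs["processes"])]
    return flags


# Original layout: 6 ranks x 1 thread, 1 GPU per rank
print(" ".join(srun_flags({"processes": 6, "threads_per_process": 1},
                          {"processes": 1})))
# Suggested layout: 1 rank x 24 hardware threads (6 cores with SMT4)
print(" ".join(srun_flags({"processes": 1, "threads_per_process": 24})))
```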

The srun launch method in RP is not very stable yet. We should probably check whether mpirun or mpiexec is available on Traverse and switch...

@lee212 I'm going to check the processes settings. I know that Traverse has 4 hardware threads per core, so that might solve the problem.
But my whole resource set only asks for a small number of cores anyway, so how come EnTK would ask for more nodes? I would understand fewer nodes with the threads argument, just not more. I'll continue to investigate and update you. Thanks!

Just to double check: --exclusive means that the job runs on certain nodes without sharing resources, correct?

@andre-merzky mpirun is available if an MPI module is loaded, e.g., module load openmpi/gcc. It is not available natively.

Update

It seems that I was the issue.
Going over my code again after a few hours, I found these neat little lines:

        . . .
        # Change to your specfem run directory
        'rm -rf %s',
        'mkdir %s',
        'cd %s',
        . . .

I don't usually like those, and EnTK didn't either. One job ran successfully; now I'm trying simultaneous ones.
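For reference, a sketch of the corrected list, in which the run directory is actually substituted into the `rm`/`mkdir`/`cd` lines instead of leaving a literal `%s`. The helper name is hypothetical; the module names and paths are the ones used in this thread. The guard that refuses an empty or root directory is an added precaution, since `rm -rf` runs on whatever it is given.

```python
SPECFEM_SRC = "/scratch/gpfs/lsawade/specfem_6"


def specfem_pre_exec(run_dir, src=SPECFEM_SRC):
    """pre_exec for one SPECFEM run with run_dir substituted into every command."""
    # Refuse paths that would make `rm -rf` catastrophic.
    if not run_dir or run_dir.strip("/") == "":
        raise ValueError("run_dir must be a non-root directory")
    return [
        # Load necessary modules
        "module purge",
        "module load openmpi/gcc",
        "module load cudatoolkit/10.0",
        # Recreate and enter this task's own run directory
        f"rm -rf {run_dir}",
        f"mkdir {run_dir}",
        f"cd {run_dir}",
        # Create data structure in place
        f"ln -s {src}/bin .",
        f"ln -s {src}/DATABASES_MPI .",
        f"cp -r {src}/OUTPUT_FILES .",
        "mkdir DATA",
        *[f"cp {src}/DATA/{name} ./DATA/"
          for name in ("CMTSOLUTION", "STATIONS", "Par_file")],
    ]
```

Inside the for-loop, this would replace the literal list with `t.pre_exec = specfem_pre_exec(tdir)`.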

Update 2

It seems to work 🤦🏻‍♂️

Ugh, those look dangerous... Do you have a backup this time? :-D

Cool!

Now that the workflow works, we need to address #112 (comment). @lsawade, could you show us the commands that you would use to move the data between two tasks?

Hi everyone,

So in my job, I'm requesting 4 nodes, and then for certain tasks I'm asking for a single core. Does EnTK automatically detect that I need one core on one node? I'm asking because I'm getting this error:

srun: Warning: can't run 1 processes on 4 nodes, setting nnodes to 1
srun: error: Unable to create step for job 72408: Requested node configuration is not available

The opposite happens for my specfem job. I would like to run one simulation on 2 nodes, using 3 GPUs on each node (I thought that would make sense), but the resources don't seem to be assigned correctly. I get this error at runtime:

srun: error: Required nodelist includes more nodes than permitted by max-node count (6 > 4). Eliminating nodes from the nodelist.
srun: error: Unable to create step for job 72408: Memory required by task is not available

So it seems like srun tries to distribute the 6 processes that I'm requesting across 6 nodes:

/usr/bin/srun --exclusive --cpu-bind=none --ntasks 6 --cpus-per-task 1 --gpus-per-task 1 --nodelist=/scratch/gpfs/lsawade/radical.pilot.sandbox/re.session.traverse.princeton.edu.lsawade.018382.0001/pilot.0000/unit.000007//unit.000007.nodes --export=ALL,CUDA_VISIBLE_DEVICES="" ./bin/xspecfem3D

where the nodelist is:

traverse-k04g1,traverse-k04g1,traverse-k04g8,traverse-k04g8,traverse-k04g8,traverse-k04g8

So EnTK seems to ask for 6 nodes for my task.

Is there an obvious error that I'm making that triggers this, or is there an easy way to avoid this?

@mturilli, the data download/staging can be put on hold until we get the main workflow running. It has lower priority than the computationally heavy part.

The node list contains 6 node names for the 6 ranks, but you will see that only two individual node names are used ([traverse-k04g1,traverse-k04g8]), so that looks correct.

Having said that: srun is indeed rather fickle with mpi layout requests, we have seen that on other machines. We'll try to switch you to mpirun as launch method.

Ah, I see. So in the JSON below, it's simply asking for individual resources from the two nodes, separately for each process?

But then again, that would mean it's asking for 6 GPUs from traverse-k04g8, right? Is that why srun is giving you trouble?

{'cores_per_node': 32,
 'gpus_per_node': 4,
 'lfs_per_node': {'path': '/tmp', 'size': 0},
 'lm_info': {'cores_per_node': 32},
 'mem_per_node': 0,
 'nodes': [{'core_map': [[2]],
            'gpu_map': [[2]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g1',
            'uid': 'traverse-k04g1'},
           {'core_map': [[3]],
            'gpu_map': [[3]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g1',
            'uid': 'traverse-k04g1'},
           {'core_map': [[0]],
            'gpu_map': [[0]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g8',
            'uid': 'traverse-k04g8'},
           {'core_map': [[1]],
            'gpu_map': [[1]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g8',
            'uid': 'traverse-k04g8'},
           {'core_map': [[2]],
            'gpu_map': [[2]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g8',
            'uid': 'traverse-k04g8'},
           {'core_map': [[3]],
            'gpu_map': [[3]],
            'lfs': {'path': '/tmp', 'size': 0},
            'mem': 0,
            'name': 'traverse-k04g8',
            'uid': 'traverse-k04g8'}]}

I feel like stepping in the dark right now.
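Counting the GPUs per node in this slot description helps answer the question. The sketch below (a hypothetical helper) sums the entries of each slot's `gpu_map`, assuming one inner list of GPU indices per rank as shown in the JSON. Applied to the six slots, it gives 2 GPUs on traverse-k04g1 and 4 on traverse-k04g8, i.e., within the 4 GPUs per node rather than 6 on one node.

```python
from collections import Counter


def gpus_per_node(slots):
    """Sum GPUs per node name over all slot entries."""
    counts = Counter()
    for entry in slots["nodes"]:
        # gpu_map holds one list of GPU indices per rank, e.g. [[2]]
        counts[entry["name"]] += sum(len(rank_gpus) for rank_gpus in entry["gpu_map"])
    return dict(counts)


# The six slot entries from the JSON, trimmed to the fields used here.
slots = {"nodes": [
    {"name": "traverse-k04g1", "gpu_map": [[2]]},
    {"name": "traverse-k04g1", "gpu_map": [[3]]},
    {"name": "traverse-k04g8", "gpu_map": [[0]]},
    {"name": "traverse-k04g8", "gpu_map": [[1]]},
    {"name": "traverse-k04g8", "gpu_map": [[2]]},
    {"name": "traverse-k04g8", "gpu_map": [[3]]},
]}
print(gpus_per_node(slots))  # {'traverse-k04g1': 2, 'traverse-k04g8': 4}
```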

I think I have more or less isolated the issue. When I have three nodes at my disposal, a job that requires only one process fails as well using srun. @andre-merzky, see
radical.pilot.sandbox/re.session.traverse.princeton.edu.lsawade.018383.0006/pilot.0000/unit.000000.
Although the nodelist specifies that only 1 node is necessary for the job, and the .sl file shows just a single task/process, srun (for whatever reason) still tries to use all nodes, giving me the following error:

srun: Warning: can't run 1 processes on 3 nodes, setting nnodes to 1
srun: error: Unable to create step for job 72601: Requested node configuration is not available

Now, in unit.000000.sh, the nodelist specification is as follows:

. . . . /scratch/gpfs/lsawade/radical.pilot.sandbox/re.session.traverse.princeton.edu.lsawade.018383.0006/pilot.0000/unit.000000//unit.000000.nodes . . . .

Note the double slash between unit.000000 and unit.000000.nodes. Could that be causing the error? Is the node list therefore empty, so that srun tries to use all nodes?

Testing with cat says no, but maybe there is something there...
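The `cat` result is what POSIX path resolution predicts: consecutive slashes (apart from a leading `//`) act as a single separator, so the doubled slash alone should not empty the node list. A quick check of the same path with Python's `posixpath.normpath`, which only rewrites the string and does not touch the filesystem:

```python
import posixpath

nodes_file = ("/scratch/gpfs/lsawade/radical.pilot.sandbox/"
              "re.session.traverse.princeton.edu.lsawade.018383.0006/"
              "pilot.0000/unit.000000//unit.000000.nodes")
# normpath collapses the doubled slash into a single separator
print(posixpath.normpath(nodes_file))
```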

@lsawade : what MPI module do you use on Traverse to compile your code?

I use openmpi/gcc

Hi, I am willing to run a test once this is ready, but I wonder if I could have an account on Traverse for testing purposes.

Hi @lee212, we can probably organize an account for you!

Thanks @lsawade , I have access to Traverse now.

@lee212 Great!

@lee212 to run a standard test and report back

@lee212 are there any updates?

Assigning to @mturilli too as this has been labeled "priority high" for a while but we're still stuck.

Hi @shantenujha , I do not have an account on the machine so I am afraid I cannot do much about this ticket.

@lsawade, I wanted to let you know that I have completed a few test runs (mainly with srun) and am now trying to switch to mpirun for the correct mapping. I will post any progress.

@lee212 awesome! Thank you! Let me know if there's anything I can do!

@lsawade, FYI, I created a separate branch for mpirun testing, https://github.com/radical-cybertools/radical.pilot/tree/fix/princeton_traverse. If you would like to give it a try, let me know. However, I have to warn you that this is not a solution, and you may experience the same problem again.

@lsawade, I finished some tests, and GPUs seem to be assigned correctly with mpirun. I wanted to confirm that this is resolved on your side as well. If you can share the scripts you use to run your jobs, I can replicate them and see whether your problems are resolved. Specifically, I am looking at these:

  • "requesting 4 nodes, and then for certain tasks, I'm asking for a single core"
  • "run one simulation on 2 nodes, using 3 GPUs on each Node"

mpirun does not seem to behave like srun regarding parameter settings, e.g., --ntasks and --nodelist on Traverse, which in my view is good for satisfying your job requirements.

BTW, I will summarize my test runs here to describe how it executed differently with mpirun. It will be the same tasks but with different launch commands and you may see what has been changed in the task placement.

@lsawade , can you share your script tested on Traverse? I heard that you're now working on Summit directly but this will be helpful to see the difference between srun and mpirun for the future.

@lee212 I will get you stuff on Thursday if that's ok, I'm a bit crammed with things right now!

Thanks, Lucas. Thursday sounds good!

@lee212 What RMQ port are you using? I got this from Matteo at some point, but it doesn't seem to work:

# Database resource
export RADICAL_PILOT_DBURL="mongodb://specfm:5p3cfm@two.radical-project.org:27017/specfm"

The error I'm getting is the following:

pika.exceptions.ConnectionClosed: Connection to 138.201.86.166:33267 failed: timeout
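`pika` is a RabbitMQ client, so this timeout concerns the RMQ host and port given to the AppManager (`RMQ_HOSTNAME`/`RMQ_PORT` in the earlier workflow script), not the MongoDB URL in `RADICAL_PILOT_DBURL`. A minimal sketch for checking whether that endpoint accepts TCP connections from the machine running EnTK; `port_reachable` is a hypothetical helper.

```python
import socket


def port_reachable(host, port, timeout=5.0):
    """True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections and timeouts
        return False
```

For the error above, `port_reachable("138.201.86.166", 33267)` returning `False` would point to a wrong port or a firewall between the login node and the RMQ server rather than to EnTK itself.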

Just an update: After a chat session with @lee212 , I got specfem to run and I am now in the process of testing and working out the workflow.

Hi,

There are multiple things I would like to bring up in today's meeting:

  • Implementation of #SBATCH --reservation=test on Traverse.
  • Difficulty debugging due to queueing. Sometimes, even for a workflow containing a single Task, it's fairly hard to debug because of overhead and queue time.
  • Is there a dry-run option that maybe just loads modules and checks whether the executables exist? Or can the workflow be launched in an allocated interactive job?
  • Download of data from within the workflow using the head node: #112 (comment). This is something I want to figure out now. The download is launched by a simple executable, request-data -f <some_input_file>, which then downloads data to a certain directory without needing anything else. As long as there is an internet connection, the executable should handle it. Is it possible to run something from within the workflow on the head node (which must have internet)?

Thanks!
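On the dry-run question, here is a sketch of a preflight check that could be run by hand (for example, in an interactive job after the same `module load` lines) to confirm that each task's executable resolves before submitting. Bare names are looked up on `$PATH` with `shutil.which`; paths are checked for an executable file, and relative ones such as `./bin/xspecfem3D` resolve against the current directory. The helper is hypothetical, not an EnTK feature.

```python
import os
import shutil


def missing_executables(executables):
    """Return the entries that do not resolve to an executable file.

    Bare names are looked up on $PATH; anything containing a slash is treated
    as a path, and relative paths resolve against the current directory.
    """
    missing = []
    for exe in executables:
        if os.sep in exe:
            found = os.path.isfile(exe) and os.access(exe, os.X_OK)
        else:
            found = shutil.which(exe) is not None
        if not found:
            missing.append(exe)
    return missing
```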

@lee212, have you updated the Princeton Traverse configuration so that I no longer have to use this resource setup for single-core tasks:

t.cpu_reqs = {
    'processes': 1, 
    'process_type': 'MPI',
    'threads_per_process': 1, 
    'thread_type': 'OpenMP'}

?

@lsawade: the Slurm adaptor in the RCT stack should support reservations. Please specify it as account:reservation in the project field of your resource description. Can you give that a try, please?
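A sketch of that suggestion, assuming the resource description key is `project` as written in this comment (the follow-up question about `project_id` suggests checking the key name against the EnTK version in use). `with_reservation` is hypothetical, `myaccount` is a placeholder for the real Slurm account, and `test` is the reservation from the `#SBATCH --reservation=test` request.

```python
def with_reservation(res_dict, account, reservation):
    """Return a copy of res_dict whose project field reads 'account:reservation'."""
    updated = dict(res_dict)
    updated["project"] = f"{account}:{reservation}"
    return updated


res_dict = {
    "resource": "princeton.traverse",
    "cpus": 48,
    "gpus": 6,
}
# "myaccount" is a placeholder for the real Slurm account name
print(with_reservation(res_dict, "myaccount", "test")["project"])  # myaccount:test
```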

@andre-merzky: Do you mean project_id?