bccp/nbodykit

Help with parallelising nbodykit code

Opened this issue · 5 comments

Hi,

I’m trying to make this code run in parallel. It's built upon nbodykit and in essence does the following:

  • Generates initial linear density field (d_ic) on the mesh,
  • Computes displacement field on the mesh,
  • Generates uniform catalog of particles and computes d_ic at each particles position,
  • Shifts each particle by the displacement field at its position,
  • Assigns shifted particles to the mesh by weighting each particle by d_ic to obtain the output d1 field.

Ideally I would like to obtain the same output d1 field when I run it on a single core and when I run it on multiple cores. Below is a the core function which computes d1. I then save it doing FieldMesh(d1).save(…) and I get different output fields when I run with: python code.py or with mpirun -np 4 python code.py. The output fields look similar and I'm guessing what happens is that I'm only saving the field from a single core...

def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
    scale_factor = 1/(1+zout)
    Nmesh = delta_ic.Nmesh
    BoxSize = delta_ic.BoxSize[0]
    prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

    disp_f = [get_displacement_from_density_rfield(delta_ic, component=i, Psi_type='Zeldovich', smoothing={'R': Rsmooth,}) for i in range(3)]
    Nptcles_per_dim = int((nbar*BoxSize**3)**(1/3))
    
    pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed)['Position']
    N = pos.shape[0]
    
    dtype = np.dtype([('Position', ('f8', 3)), ('delta_1', 'f8')])
    catalog = np.empty(pos.shape[0], dtype=dtype)
    catalog['Position'][:] = pos[:]
    del pos

    catalog['delta_1'][:] = delta_ic.readout(catalog['Position'], resampler='cic')*prefactor
    catalog['delta_1'][:] -= np.mean(catalog['delta_1'])

    displacement = np.zeros((N, 3))
    for i in range(3):
        displacement[:, i] = disp_f[i].readout(catalog['Position'], resampler='cic')
    displacement *= prefactor
    catalog['Position'][:] = (catalog['Position'] + displacement) % BoxSize
    del displacement, disp_f
    
    catalog = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm)
    d1 = catalog.to_mesh(value='delta_1', compensated=True).to_real_field()     
    return d1

I tried going over the instructions to parallelise, but unfortunately nothing worked so far, so I was just wondering if there is a quick hack to make it work, that would be great. Please let me know if more info is needed.

Thanks and bests,
Andrej

Hi,

Thanks for your reply. Yes, the difference is bigger than the round-off error.

I made some progress in the meanwhile and rewrote my function following this. I paste the whole code I used for testing below. I'm now able to get the same results when running on a single core (serial) vs multiple, but only in the case where I simply assign particles to the mesh after moving them by Zeldovich. However, when I try to do value=delta_1 in to_mesh() function, I again get different results. You can see this in the plot I attach showing these 4 cases: serial vs mpirun, and switching on/off the value keyword. Let me know if this makes sense and if there's a way to get the same results.

Bests,
Andrej

d1_comparison

import numpy as np
from nbodykit.lab import *
# from nbodykit import CurrentMPIComm
from pmesh.pm import ParticleMesh

def get_dlin(seed, Nmesh, BoxSize, Pk):
    pm = ParticleMesh([Nmesh,Nmesh,Nmesh], BoxSize, comm=comm)
    wn = pm.generate_whitenoise(seed)
    dlin = wn.apply(lambda k, v: Pk(sum(ki ** 2 for ki in k)**0.5) ** 0.5 * v / v.BoxSize.prod() ** 0.5)
    return dlin
    
def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
    scale_factor = 1/(1+zout)
    Nmesh = delta_ic.Nmesh
    BoxSize = delta_ic.BoxSize[0]
    prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)

    pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed, comm=comm)['Position'].compute()
    N = pos.shape[0]
    catalog = np.empty(pos.shape[0], dtype=[('Position', ('f8', 3)), ('delta_1', 'f8')])
    displ_catalog = np.empty(pos.shape[0], dtype=[('displ', ('f8',3))])
    
    catalog['Position'][:] = pos[:]

    layout = delta_ic.pm.decompose(pos)

    catalog['delta_1'] = delta_ic.c2r().readout(pos)
    
    def potential_transfer_function(k, v):
        k2 = k.normp(zeromode=1)
        return v / (k2)
    pot_k = delta_ic.apply(potential_transfer_function, out=Ellipsis)
    
    for d in range(3):
        def force_transfer_function(k, v, d=d):
            return k[d] * 1j * v
        force_d = pot_k.apply(force_transfer_function).c2r(out=Ellipsis)
        displ_catalog['displ'][:, d] = force_d.readout(pos, layout=layout, resampler='cic')*prefactor
    
    catalog['Position'] = (catalog['Position'] + displ_catalog['displ']) % BoxSize
    del pos, displ_catalog

    d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(compensated=True)
    # d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(value='delta_1', compensated=True)
    return d1

comm = CurrentMPIComm.get()    
print ('comm', comm, 'comm.rank', comm.rank)
rank = comm.rank

##########################
### General parameters ###
##########################

seed = 5
nbar = 0.01
Nmesh = 64
BoxSize = 100
zout = 1
zic = 127 # TNG initial redshift

print ("Generating HI mock in real-space at output redshift z=%.0f, in a BoxSize L=%.1f using nbar=%.2f (%i particles) on a Nmesh=%i^3 grid with IC seed %i..."%(zout, BoxSize, nbar, int(nbar*BoxSize**3), Nmesh, seed))

# Cosmology
c = cosmology.Planck15
c = c.match(sigma8=0.8159)
Plin_z0 = cosmology.LinearPower(c, 0)
Dic  = c.scale_independent_growth_factor(zic)

# Generate linear overdensity field at zic
print ('Generating initial density field... ')
dlin = get_dlin(seed, Nmesh, BoxSize, Plin_z0)
dlin *= Dic

# Compute shifted fields
print ('Computing shifted fields... ')
d1 = generate_d1(dlin, c, nbar, zic, zout, comm=comm)
d1.save('new_d1_seed_%i_mpi_novalue'%seed, mode='real')

Hi,

Thanks, yes, I also realised that layout was the problem, so now I include it in here for example: delta_1 = delta_ic.c2r().readout(pos, layout=layout, resampler='cic'). I'm finally getting more reasonable results, with some minor differences.

Just to understand better, what would be the best thing to use for the smoothing parameter in layout = delta_ic.pm.decompose(pos, smoothing=?)? And what are the units of the smoothing parameter? Should I use larger values than the cell size?

Bests,
Andrej