Help with parallelising nbodykit code
Opened this issue · 5 comments
Hi,
I’m trying to make this code run in parallel. It's built upon nbodykit and in essence does the following:
- Generates initial linear density field (d_ic) on the mesh,
- Computes displacement field on the mesh,
- Generates uniform catalog of particles and computes d_ic at each particles position,
- Shifts each particle by the displacement field at its position,
- Assigns shifted particles to the mesh by weighting each particle by d_ic to obtain the output d1 field.
Ideally I would like to obtain the same output d1 field when I run it on a single core and when I run it on multiple cores. Below is a the core function which computes d1. I then save it doing FieldMesh(d1).save(…) and I get different output fields when I run with: python code.py or with mpirun -np 4 python code.py. The output fields look similar and I'm guessing what happens is that I'm only saving the field from a single core...
def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
scale_factor = 1/(1+zout)
Nmesh = delta_ic.Nmesh
BoxSize = delta_ic.BoxSize[0]
prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)
disp_f = [get_displacement_from_density_rfield(delta_ic, component=i, Psi_type='Zeldovich', smoothing={'R': Rsmooth,}) for i in range(3)]
Nptcles_per_dim = int((nbar*BoxSize**3)**(1/3))
pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed)['Position']
N = pos.shape[0]
dtype = np.dtype([('Position', ('f8', 3)), ('delta_1', 'f8')])
catalog = np.empty(pos.shape[0], dtype=dtype)
catalog['Position'][:] = pos[:]
del pos
catalog['delta_1'][:] = delta_ic.readout(catalog['Position'], resampler='cic')*prefactor
catalog['delta_1'][:] -= np.mean(catalog['delta_1'])
displacement = np.zeros((N, 3))
for i in range(3):
displacement[:, i] = disp_f[i].readout(catalog['Position'], resampler='cic')
displacement *= prefactor
catalog['Position'][:] = (catalog['Position'] + displacement) % BoxSize
del displacement, disp_f
catalog = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm)
d1 = catalog.to_mesh(value='delta_1', compensated=True).to_real_field()
return d1
I tried going over the instructions to parallelise, but unfortunately nothing worked so far, so I was just wondering if there is a quick hack to make it work, that would be great. Please let me know if more info is needed.
Thanks and bests,
Andrej
Hi,
Thanks for your reply. Yes, the difference is bigger than the round-off error.
I made some progress in the meanwhile and rewrote my function following this. I paste the whole code I used for testing below. I'm now able to get the same results when running on a single core (serial) vs multiple, but only in the case where I simply assign particles to the mesh after moving them by Zeldovich. However, when I try to do value=delta_1
in to_mesh()
function, I again get different results. You can see this in the plot I attach showing these 4 cases: serial vs mpirun, and switching on/off the value keyword. Let me know if this makes sense and if there's a way to get the same results.
Bests,
Andrej
import numpy as np
from nbodykit.lab import *
# from nbodykit import CurrentMPIComm
from pmesh.pm import ParticleMesh
def get_dlin(seed, Nmesh, BoxSize, Pk):
pm = ParticleMesh([Nmesh,Nmesh,Nmesh], BoxSize, comm=comm)
wn = pm.generate_whitenoise(seed)
dlin = wn.apply(lambda k, v: Pk(sum(ki ** 2 for ki in k)**0.5) ** 0.5 * v / v.BoxSize.prod() ** 0.5)
return dlin
def generate_d1(delta_ic, cosmo, nbar, zic, zout, plot=True, weight=True, Rsmooth=0, seed=1234, Rdelta=0, comm=None):
scale_factor = 1/(1+zout)
Nmesh = delta_ic.Nmesh
BoxSize = delta_ic.BoxSize[0]
prefactor = cosmo.scale_independent_growth_factor(zout)/cosmo.scale_independent_growth_factor(zic)
pos = UniformCatalog(nbar, BoxSize=BoxSize, seed=seed, comm=comm)['Position'].compute()
N = pos.shape[0]
catalog = np.empty(pos.shape[0], dtype=[('Position', ('f8', 3)), ('delta_1', 'f8')])
displ_catalog = np.empty(pos.shape[0], dtype=[('displ', ('f8',3))])
catalog['Position'][:] = pos[:]
layout = delta_ic.pm.decompose(pos)
catalog['delta_1'] = delta_ic.c2r().readout(pos)
def potential_transfer_function(k, v):
k2 = k.normp(zeromode=1)
return v / (k2)
pot_k = delta_ic.apply(potential_transfer_function, out=Ellipsis)
for d in range(3):
def force_transfer_function(k, v, d=d):
return k[d] * 1j * v
force_d = pot_k.apply(force_transfer_function).c2r(out=Ellipsis)
displ_catalog['displ'][:, d] = force_d.readout(pos, layout=layout, resampler='cic')*prefactor
catalog['Position'] = (catalog['Position'] + displ_catalog['displ']) % BoxSize
del pos, displ_catalog
d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(compensated=True)
# d1 = ArrayCatalog(catalog, BoxSize=BoxSize * np.ones(3), Nmesh=Nmesh, comm=comm).to_mesh(value='delta_1', compensated=True)
return d1
comm = CurrentMPIComm.get()
print ('comm', comm, 'comm.rank', comm.rank)
rank = comm.rank
##########################
### General parameters ###
##########################
seed = 5
nbar = 0.01
Nmesh = 64
BoxSize = 100
zout = 1
zic = 127 # TNG initial redshift
print ("Generating HI mock in real-space at output redshift z=%.0f, in a BoxSize L=%.1f using nbar=%.2f (%i particles) on a Nmesh=%i^3 grid with IC seed %i..."%(zout, BoxSize, nbar, int(nbar*BoxSize**3), Nmesh, seed))
# Cosmology
c = cosmology.Planck15
c = c.match(sigma8=0.8159)
Plin_z0 = cosmology.LinearPower(c, 0)
Dic = c.scale_independent_growth_factor(zic)
# Generate linear overdensity field at zic
print ('Generating initial density field... ')
dlin = get_dlin(seed, Nmesh, BoxSize, Plin_z0)
dlin *= Dic
# Compute shifted fields
print ('Computing shifted fields... ')
d1 = generate_d1(dlin, c, nbar, zic, zout, comm=comm)
d1.save('new_d1_seed_%i_mpi_novalue'%seed, mode='real')
Hi,
Thanks, yes, I also realised that layout was the problem, so now I include it in here for example: delta_1 = delta_ic.c2r().readout(pos, layout=layout, resampler='cic').
I'm finally getting more reasonable results, with some minor differences.
Just to understand better, what would be the best thing to use for the smoothing parameter in layout = delta_ic.pm.decompose(pos, smoothing=?)
? And what are the units of the smoothing parameter? Should I use larger values than the cell size?
Bests,
Andrej