Move from MT to Dist
This issue is to discuss moving IPFitting entirely from multi-threading to distributed. I propose:
- Option to make the LSQ matrix a distributed array
- Parallel assembly via pmap or something similar rather than our threaded implementation
- Tutorial on how to set this up on a cluster
Despite thoughts about bigger rewrites, it might be very useful right now to test this out within the current IPFitting.
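To make the first bullet a bit more concrete, here is a minimal sketch of what allocating the LSQ matrix as a row-distributed array could look like with DistributedArrays.jl; the sizes, the worker count and the row-wise layout are illustrative assumptions, not design decisions:

```julia
using Distributed
addprocs(4)                          # illustrative worker count
@everywhere using DistributedArrays

nrows, ncols = 10_000, 500           # illustrative: total observations × number of basis functions

# allocate Ψ distributed row-wise, one contiguous block of rows per worker
Ψ = dzeros((nrows, ncols), workers(), [nworkers(), 1])

# each worker fills in only the rows it owns
@sync for w in workers()
    @spawnat w begin
        Ψ_part = localpart(Ψ)          # this worker's block of rows
        rows   = localindices(Ψ)[1]    # their global row indices
        Ψ_part .= 0.0                  # placeholder for the real per-row assembly
    end
end
```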
I saw some commented code that seemed like an early effort. Does whoever wrote this want to share their thoughts?
# function LsqDB_dist(basis::IPBasis,
# configs::AbstractVector{Dat})
# # assign indices, count observations and allocate a matrix
# Ψ = _alloc_lsq_matrix_dist(configs, basis, 4)
# irow = 0
# @sync @distributed for i = 1:nworkers()
# # add energy to the lsq system
# irow += 1
# #y[irow] = wE * E / length(at)
# #Ψ_part = localpart(Ψ)
# #@show size(Ψ_part)
# #@show Ψ[irow, :] .= repeat([irow], 36)
# Ψ_part = localpart(Ψ)
# vec(Ψ_part) .= (1:length(Ψ_part)) .+ 1000*myid()
# #Ψ_part .= energy(basis, at.at) / length(at.at)
# # add forces to the lsq system
# # nf = 3*length(at.at)
# # #y[(irow+1):(irow+nf)] = wF * mat(F)[:]
# # Fb = forces(basis, at.at)
# # for ib = 1:length(basis)
# # Ψ[(irow+1):(irow+nf), ib] = mat(Fb[ib])[:]
# # end
# # irow += nf
# end
# return Ψ
# end
@andresrossb it would be really helpful if you could discuss this with us, e.g. share your experience with the nonlinear codes, etc.
My current thought is that we have two issues that are related but not the same:
- system matrix: distributed or local
- assembly distributed
Realistically speaking, I think the assembly will always be distributed. What Andres does in his nonlinear codes is to assemble gradients of the loss for a small subset of the training structures, then send those gradients to the manager process, where the gradients are combined and the step is taken. The new parameters are then sent back to the workers.
Here, I think what we should do is
[STEP 1] Have each worker assemble matrix blocks, then send them to the manager process where they are combined into the big matrix.
[STEP 2] figure out how to combine [STEP 1] with a distributed array. What makes me pause on this is that I don't know how well we can control (a priori or a posteriori) how the array is distributed. And in fact this may not be a good way to think about it anyhow, since the distribution of the assembly might naturally be very different from that of the matrix. This needs more discussion.
For [STEP 1] the task would be to read how the assembly is currently done multi-threaded, then simply replace the multi-threading with distribution over processes and add the communication. Andres should summarize what he did for his use-case. It was a bit of a puzzle until we figured things out and then seemed really straightforward.
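To illustrate the shape of [STEP 1], here is a sketch, not the actual IPFitting internals; assemble_block is a hypothetical helper standing in for the existing per-config row assembly, and configs/basis are assumed to be the usual inputs:

```julia
using Distributed

# hypothetical helper: build the LSQ rows for one chunk of configurations;
# in IPFitting this would wrap whatever the threaded loop currently does per config
@everywhere function assemble_block(configs_chunk, basis)
    Ψ_blk = zeros(length(configs_chunk), length(basis))  # placeholder: one row per config
    # ... fill Ψ_blk with the energy / force / virial rows ...
    return Ψ_blk
end

# one contiguous chunk of configs per worker, farmed out with pmap;
# pmap returns the blocks in input order, so vcat preserves the row ordering
chunk_size = ceil(Int, length(configs) / nworkers())
chunks     = collect(Iterators.partition(configs, chunk_size))
blocks     = pmap(chunk -> assemble_block(chunk, basis), chunks)
Ψ          = reduce(vcat, blocks)
```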
Forgive me if this is naive, but you haven't mentioned the solvers yet. Is that because we expect Julia can apply the existing solver code naturally to a distributed matrix?
Not naive at all. My understanding is that Julia can apply its iterative solvers to distributed arrays without difficulties, and that it has defaults to dispatch to for direct solvers as well. But we haven't tested this, and TBH, we may wish to enable various options that only apply in the distributed setting. Let's call that [STEP 3]?
The main reason I asked is that it seems plausible a solver could prefer the array to be distributed in a particular way, so [STEP 3] would influence [STEP 2]. But it sounds like we should just get something working and then revise as necessary.
You are probably right. All this is entirely outside my domain of expertise and should be done "right".
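For the record, the fallback we are implicitly relying on for [STEP 3] is just gathering the distributed matrix and solving as we do now; anything smarter is exactly the untested part. A sketch, assuming Ψ is a DArray and y is a vector living on the manager:

```julia
using DistributedArrays, LinearAlgebra

# gather Ψ back to the manager and do the usual dense least-squares solve;
# fine as long as the full matrix still fits in the manager's memory
Ψ_local = Array(Ψ)
c = qr(Ψ_local) \ y

# the hope for [STEP 3] is to skip the gather entirely, e.g. hand Ψ to an
# iterative least-squares solver that only needs matrix-vector products;
# we have not verified that such solvers dispatch correctly on a DArray
```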
> For [STEP 1] the task would be to read how the assembly is currently done multi-threaded, then simply replace the multi-threading with distribution over processes and add the communication. Andres should summarize what he did for his use-case. It was a bit of a puzzle until we figured things out and then seemed really straightforward.
For this specific case the idea was to compute gradients using Zygote on each of the workers for a subset of the data. Then simply combine them in the main processor, take a step and then send the new parameters to the workers.
First, this code adds the required libraries to the workers; in this case it was mostly Flux and Zygote.
using Distributed
Distributed.addprocs(80)
@everywhere using Pkg
@everywhere Pkg.activate(pwd())
@everywhere Pkg.instantiate()
Then we assemble the model and divide the data locally, so it happens only once, and then send it using:
@eval @everywhere model=$model
@eval @everywhere x4worker=$x4worker
@eval @everywhere y4worker=$y4worker
Currently we share the whole divided data set among all workers, but I imagine we could simply change it to use a combination of @eval and @spawnat to be more efficient with memory.
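For what it's worth, a sketch of that more memory-friendly variant (using the same x4worker / y4worker split as above; this is not what the code currently does):

```julia
using Distributed

# keep one handle per worker; @spawnat serializes only the variables captured
# below, so worker w receives just its own chunk rather than the full split
data_refs = Dict{Int,Future}()
for (i, w) in enumerate(workers())
    xi, yi = x4worker[i], y4worker[i]
    data_refs[w] = @spawnat w (xi, yi)
end

# work later scheduled on worker w can then do
#   xw, yw = fetch(data_refs[w])
# which is a local lookup on that worker, not a copy over the network
```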
We then have a function to evaluate a gradient on a single data point, and a function that calls it in different workers.
#on a worker
@everywhere function _gl(x, y)
l, back = Zygote.pullback(()->loss(pot,x,y), Flux.params(model))
return(l, back(1))
end
#local
#function that dispatches all workers and returns the loss and gradients
function gradcalc()
futures = Vector{Future}(undef, np)
for i in 1:np
f = @spawnat (i+1) map((x,y) -> _gl(x,y), x4worker[i], y4worker[i])
futures[i] = f
end
fg = fetch.(futures)
#flatten the gradient and mapreduce the loss
tl = 0.0
grds = []
for c1 in fg
for c2 in c1
l, g = c2
tl += l
push!(grds, g)
end
end
#gsum2mat sums gradients (they are Flux's Grad objects so it requires a little effort)
return tl / length(X), gsum2mat(grds) ./ length(X)
end
Once the gradient is computed we simply update the parameters in our main processor as usual, and then send the new parameters to each worker to update the model.
model[1].weight = reshape(w,s1,s2) #update the local model with the new parameters w
deepp = deepcopy(Flux.params(model)) #we deepcopy the parameters of the model
@eval @everywhere deepp=$deepp #we send to all workers the deep copy
@everywhere Flux.loadparams!(model, deepp) #we load this new parameters to the model in each worker
I created a PR with a first attempt at [STEP 1]; it won't run yet, but I think it's a step in the right direction.
I have a version of this working now (for the matrix assembly only). It took a fair bit of effort (much of which was just the learning curve for me), but I used pmap, so the final result is actually reasonably succinct. Do I understand correctly that I should push it to @andresrossb's PR?
CC: @cortner
Depends: did you start from Andres' PR, or start from scratch? If you started from Andres' PR, then you could just push it to his branch.
Would it be relatively easy to also incorporate this into ACEfit?
I did start from Andres' PR, so happy to push it there. I'll also put something in ACEfit, just need to acclimate there.
I realized I'm having this problem. It'll work much of the time but crash randomly. Currently trying to find a solution less ugly than duplicating every single using statement.
So my overall impression is that there is still a fair bit of brittleness in both multi-threading and multi-processing in Julia. We can start another issue where we discuss possible fail-safes? E.g. if a process crashes, catch it and start a new one. But also a more global one: if the „master" process dies, then there should be a file we can load to restart…
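For the assembly path at least, pmap already has hooks that could serve as a first fail-safe; a sketch along the lines of the [STEP 1] snippet above (assemble_block, chunks and basis as defined there):

```julia
using Distributed

# retry a failed chunk a few times (e.g. if its worker died), and log anything
# that still fails instead of letting one bad chunk kill the whole assembly
blocks = pmap(chunk -> assemble_block(chunk, basis), chunks;
              retry_delays = ExponentialBackOff(n = 3),
              on_error = e -> (@warn "chunk failed" exception = e; nothing))
Ψ = reduce(vcat, filter(!isnothing, blocks))
```

Surviving a crash of the manager process itself would still need the file-based checkpoint/restart idea.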
This thread is useful, posting it for future reference. It's really important to activate an environment on all workers, either explicitly with @everywhere (I see Andres did a version of this above) or by setting JULIA_PROJECT beforehand. By contrast, a single activate on the manager process isn't sufficient for consistent results.
https://discourse.julialang.org/t/packages-and-workers/14072
(Something similar holds for using statements, but that seems well known, whereas the activate bit is more subtle.)
I have always thought that people should set the JULIA_PROJECT variable before running any scripts. That seems to be the cleanest approach, and both the ASE and OpenMM calculators assume this.
Interesting, and good to know about the last part.
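Related, a small sketch of the JULIA_PROJECT route done from within Julia itself, so the workers are guaranteed to start in the same project as the manager (the worker count is illustrative):

```julia
using Distributed

# spawn workers with the manager's active project, so every worker resolves
# the same Project.toml / Manifest.toml without per-worker Pkg.activate calls
addprocs(80; exeflags = "--project=$(Base.active_project())")
@everywhere using IPFitting   # now every worker loads consistent package versions
```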
Just pushed to Andres' PR a solution for [STEP 1]. The main difference in the code is a revised pfor loop, which I've stripped down to the bare essentials. I left the old pfor code there under comments.
The main difference for the user is to create the db with dB = LsqDB_dist(...).
Many possible improvements, and I'm still thinking about [STEP 2].
Eventually this should become a topic for the software meetings, I think.