mlverse/torch

dataloader issue when using parallel workers

Opened this issue · 2 comments

code:
ministdsta<-minist_dataset(xarray2,label=dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=200,shuffle=T,
num_workers = 2,pin_memory=T,
worker_globals=list(xarray2,dfminist2,ministdsta)
)
result:
Warning messages:
1: Datasets used with parallel dataloader (num_workers > 0) shouldn't have fields containing tensors as they can't be correctly passed to the wroker subprocesses.
  • A field named 'y' exists.
2: Datasets used with parallel dataloader (num_workers > 0) shouldn't have fields containing tensors as they can't be correctly passed to the wroker subprocesses.
  • A field named 'x' exists.
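
For context, warnings like these are typically triggered when the dataset stores tensors in its fields during initialize(). The definition of minist_dataset is not shown above, but a minimal sketch of the pattern that produces these warnings (field and argument names are illustrative) would look like this:

minist_dataset<-dataset(
  initialize=function(xarray,label){
    #storing tensors as fields is what triggers the warnings about 'x' and 'y',
    #because tensor fields cannot be passed to the worker subprocesses
    self$x<-torch_tensor(xarray)
    self$y<-torch_tensor(label,dtype=torch_long())
  },
  .getitem=function(index){
    list(x=self$x[index,]$reshape(c(1,28,28)),y=self$y[index])
  },
  .length=function(){self$y$size(1)}
)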

And then, when running the training loop:
code:
torch_manual_seed(1)

for(epoch in 1:100) {
  coro::loop(for(b in ministdlta) {
    b1<-b[[1]]
    b2<-b[[2]]
  })
  print(epoch)
}

result:
Error in self$.pop_task():
! Error when getting dataset item.
Caused by error:
! in callr subprocess.
Caused by error:
! object '.socket_con' not found
Run rlang::last_trace() to see where the error occurred.

rlang::last_trace()
<error/runtime_error>
Error in self$.pop_task():
! Error when getting dataset item.
Caused by error:
! in callr subprocess.
Caused by error:
! object '.socket_con' not found


Backtrace:
    x
 1. +-coro::loop(...)
 2. | \-rlang::eval_bare(loop, env)
 3. \-coro (local) <fn>()
 4.   +-coro::is_exhausted(elt <<- iterator())
 5.   +-elt <<- iterator()
 6.   | \-rlang::env_poke(env, lhs, value, inherit = TRUE, create = FALSE)
 7.   \-torch (local) iterator()
 8.     \-torch::dataloader_next(iter, coro::exhausted())
 9.       \-iter$.next()
10.         \-self$.next_data()
11.           \-self$.pop_task()

Run rlang::last_trace(drop = FALSE) to see 1 hidden frame.

Sorry, I can now see why the parallel dataloader I wrote reported an error.
Here is a working example, in which I'll explain how to write a parallel dataloader correctly.

library(torch)
dfminist2<-read.csv("mnist_train.csv",header=F)
dfminist2[,1]<-dfminist2[,1]+1
dfminist2<-as.matrix(dfminist2)

minist_dataset<-dataset(
  initialize=function(xarray,label){
    #Note that the data passed to initialize() should be an array, matrix, or vector.
    #Do not convert these fields to tensors in initialize();
    #if you do, you will get an obscure error when using the parallel dataloader.
    self$x<-xarray
    self$y<-label
  },
  .getitem=function(index){
    #Extract and convert the data inside .getitem().
    x_re<-torch_tensor(self$x[index,])$reshape(c(1,28,28))
    y_re<-torch_tensor(self$y[index],dtype=torch_long())
    list(x=x_re,y=y_re)
  },
  .length=function(){length(self$y)}
)

ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,
  num_workers=2
  #The number of worker processes is set here.
  #If the data conversion work is simple, as in this example, there is no need
  #for multiple workers; otherwise this can be much slower than a single process.

  #If your dataset uses other data, functions, or packages,
  #you need to make them available by name inside the R worker processes,
  #much like exporting variables with the parallel package:
  #,worker_init_fn = c("myfun1","myfun2")
  #,worker_globals = c("mydataname1","mydataname2")
  #,worker_packages = c("torchvision","otherpackagename")
)
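
As a concrete illustration of the commented worker_globals line above: the argument is documented to take either a character vector of object names or a named list of objects, so an unnamed list of objects (as in the failing call at the top of this issue) may not be handled correctly. A minimal sketch, assuming a hypothetical lookup table lookup_tab that .getitem reads from the global environment, and that the dataset code also calls functions from torchvision:

ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,
  num_workers=2,
  #copy lookup_tab to each worker by name so .getitem can find it in the subprocess
  worker_globals=c("lookup_tab"),
  #load any packages the dataset code needs inside each worker
  worker_packages=c("torchvision")
)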

#During training, note that the dataset, dataloader, and optimizer should be created inside the epoch loop.
for(epoch in 1:100){

  #Create the dataset, dataloader, and optimizer here
  ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
  ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,num_workers=6)
  optimizer<-optim_adamw(model$parameters,lr=0.001)

  #model and device are assumed to be defined elsewhere
  model$train()
  coro::loop(for(b in ministdlta){
    optimizer$zero_grad()
    output<-model(b[[1]]$to(device = device))
    loss<-nnf_multilabel_soft_margin_loss(output,b[[2]]$to(device = device))
    loss$backward()
    optimizer$step()
  })

  #After each epoch, delete loss, output, b, optimizer, the dataset, and the dataloader,
  #and manually free CPU and GPU memory
  rm(list=c("loss","output","b","optimizer","ministdsta","ministdlta"))
  gc()
  cuda_empty_cache()
}

#In this way, you can use the parallel dataloader normally, especially when training the network on a GPU:
#it can greatly increase GPU utilization and shorten the data loading time.

#The parallel dataloader above can still throw an index error when batch_size is large. In practice, although I cannot fix the underlying code, the following approach largely prevents this kind of error from occurring.

#code
minist_dataset<-dataset(
  initialize=function(xarray,label){
    #Note that the data passed to initialize() should be an array, matrix, or vector.
    #Do not convert these fields to tensors in initialize();
    #if you do, you will get an obscure error when using the parallel dataloader.
    self$x<-xarray
    self$y<-label
  },
  .getitem=function(index){
    #Make the dataset single-threaded internally; this does not affect torch's
    #multi-threading in the main process, and it reduces the probability of index errors.
    torch_set_num_threads(1)
    #Extract and convert the data inside .getitem().
    x_re<-torch_tensor(self$x[index,])$reshape(c(1,28,28))
    y_re<-torch_tensor(self$y[index],dtype=torch_long())
    list(x=x_re,y=y_re)
  },
  .length=function(){length(self$y)}
)

ministdsta<-minist_dataset(dfminist2[,-1],dfminist2[,1])
ministdlta<-dataloader(ministdsta,batch_size=100,shuffle=T,
  num_workers=2
  #The number of worker processes is set here.
  #If the data conversion work is simple, as in this example, there is no need
  #for multiple workers; otherwise this can be much slower than a single process.

  #If your dataset uses other data, functions, or packages,
  #you need to make them available by name inside the R worker processes,
  #much like exporting variables with the parallel package:
  #,worker_init_fn = c("myfun1","myfun2")
  #,worker_globals = c("mydataname1","mydataname2")
  #,worker_packages = c("torchvision","otherpackagename")
)

for(epoch in 1:100){
  #Separate the dataloader's multi-processing from the network training:
  #pull all of the batch data out of the dataloader before the formal training starts.
  mydstr<-minist_dataset(dfminist2[,-1],dfminist2[,1])
  mydltr<-dataloader(mydstr,batch_size=100,shuffle=T,num_workers=6)
  repeat{#The key is a repeat loop with tryCatch to catch possible dataloader errors.
    batch_list<-tryCatch({coro::collect(mydltr)},error=function(e){})
    #If there was no error, break out of the loop; if the dataloader errored, collect again.
    if(!is.null(batch_list)){break}
  }
  optimizer<-optim_adamw(model$parameters,lr=0.001)
  model$train()
  train_losses<-c()
  pre<-c()
  true<-c()
  for(b in batch_list){#In the formal training, loop directly over batch_list, not over the dataloader.
    optimizer$zero_grad()
    output<-model(b[[1]]$to(device = device))
    loss<-nnf_multilabel_soft_margin_loss(output,b[[2]]$to(device = device))
    loss$backward()
    optimizer$step()
    train_losses<-c(train_losses,loss$item())
    pred<-torch_max(output$to(device="cpu"),dim=2)[[2]]
    pre<-c(pre,as.numeric(pred))
    true<-c(true,as.numeric(torch_max(b[[2]],dim=2)[[2]]))
  }
  train_auc<-mean(true==pre)
  rm(list=c("loss","output","b","optimizer","mydstr","mydltr","batch_list"))
  gc()
  cuda_empty_cache()
}

#In this way, the parallel dataloader can be used normally.