alibaba/EasyParallelLibrary

AttributeError: 'NoneType' object has no attribute 'taskgraph'

Waterpine opened this issue · 3 comments

Hi EPL team,

When I use epl library to train the following code:

import os
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import tensorflow as tf
import epl

def preprocess_image(image):
    # Resize and crop
    width, height = image.size
    if width > height:
        new_width = int(224 * width / height)
        image = image.resize((new_width, 224))
        left = (new_width - 224) / 2
        image = image.crop((left, 0, left + 224, 224))
    else:
        new_height = int(224 * height / width)
        image = image.resize((224, new_height))
        top = (new_height - 224) / 2
        image = image.crop((0, top, 224, top + 224))

    # Normalize pixel values
    image = np.array(image, dtype=np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406])[None, None, :]
    std = np.array([0.229, 0.224, 0.225])[None, None, :]
    image = (image - mean) / std

    return image


def load_and_preprocess_image(path):
    image = Image.open(path).convert('RGB')
    return preprocess_image(image)


train_image_dir = '/users/Master/imagenet/train'
val_image_dir = '/users/Master/imagenet/val'
class_names = sorted(os.listdir(train_image_dir))
num_classes = len(class_names)

train_image_paths = []
train_labels = []
val_image_paths = []
val_labels = []

for label, class_name in enumerate(class_names):
    train_class_dir = os.path.join(train_image_dir, class_name)
    val_class_dir = os.path.join(val_image_dir, class_name)

    for img_name in os.listdir(train_class_dir):
        img_path = os.path.join(train_class_dir, img_name)
        train_image_paths.append(img_path)
        train_labels.append(label)

    for img_name in os.listdir(val_class_dir):
        img_path = os.path.join(val_class_dir, img_name)
        val_image_paths.append(img_path)
        val_labels.append(label)


def load_images_parallel(image_paths, num_workers=16):
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        images = list(executor.map(load_and_preprocess_image, image_paths))
    return np.array(images)


def load_images_chunk(image_paths, labels, batch_size):
    num_batches = int(np.ceil(len(image_paths) / batch_size))
    for i in range(num_batches):
        batch_image_paths = image_paths[i * batch_size:(i + 1) * batch_size]
        batch_labels = labels[i * batch_size:(i + 1) * batch_size]
        batch_images = load_images_parallel(batch_image_paths)
        batch_labels_one_hot = tf.keras.utils.to_categorical(batch_labels, num_classes=num_classes)
        yield batch_images, batch_labels_one_hot


def conv2d_bn(x, filters, kernel_size, strides=1, padding='same', activation=tf.nn.relu, name=None):
    x = tf.layers.conv2d(x, filters, kernel_size, strides=strides, padding=padding, use_bias=False, name=name)
    x = tf.layers.batch_normalization(x, training=True)
    if activation is not None:
        x = activation(x)
    return x


def identity_block(input_tensor, filters, stage, block):
    filters1, filters2, filters3 = filters
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = conv2d_bn(input_tensor, filters1, 1, name=conv_name_base + '2a')
    x = conv2d_bn(x, filters2, 3, name=conv_name_base + '2b')
    x = conv2d_bn(x, filters3, 1, activation=None, name=conv_name_base + '2c')

    x = tf.add(x, input_tensor)
    x = tf.nn.relu(x)
    return x


def conv_block(input_tensor, filters, stage, block, strides=2):
    filters1, filters2, filters3 = filters
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = conv2d_bn(input_tensor, filters1, 1, strides=strides, name=conv_name_base + '2a')
    x = conv2d_bn(x, filters2, 3, name=conv_name_base + '2b')
    x = conv2d_bn(x, filters3, 1, activation=None, name=conv_name_base + '2c')

    shortcut = conv2d_bn(input_tensor, filters3, 1, strides=strides, activation=None, name=conv_name_base + '1')

    x = tf.add(x, shortcut)
    x = tf.nn.relu(x)
    return x


def resnet50(input_tensor, classes):
    x = conv2d_bn(input_tensor, 64, 7, strides=2, name='conv1')
    x = tf.layers.max_pooling2d(x, 3, strides=2, padding='same', name='pool1')

    x = conv_block(x, [64, 64, 256], stage=2, block='a', strides=1)
    x = identity_block(x, [64, 64, 256], stage=2, block='b')
    x = identity_block(x, [64, 64, 256], stage=2, block='c')

    x = conv_block(x, [128, 128, 512], stage=3, block='a')
    x = identity_block(x, [128, 128, 512], stage=3, block='b')
    x = identity_block(x, [128, 128, 512], stage=3, block='c')
    x = identity_block(x, [128, 128, 512], stage=3, block='d')

    x = conv_block(x, [256, 256, 1024], stage=4, block='a')
    x = identity_block(x, [256, 256, 1024], stage=4, block='b')
    x = identity_block(x, [256, 256, 1024], stage=4, block='c')
    x = identity_block(x, [256, 256, 1024], stage=4, block='d')
    x = identity_block(x, [256, 256, 1024], stage=4, block='e')
    x = identity_block(x, [256, 256, 1024], stage=4, block='f')

    x = conv_block(x, [512, 512, 2048], stage=5, block='a')
    x = identity_block(x, [512, 512, 2048], stage=5, block='b')
    x = identity_block(x, [512, 512, 2048], stage=5, block='c')

    x = tf.layers.average_pooling2d(x, 7, strides=1, padding='valid', name='pool5')
    x = tf.layers.flatten(x)
    x = tf.layers.dense(x, classes, activation=None, name='fc1000')

    return x


def run_model():
    with tf.Session() as sess:
        input_tensor = tf.placeholder(tf.float32, shape=[None, 224, 224, 3], name="input_image")
        labels_tensor = tf.placeholder(tf.float32, shape=[None, num_classes], name="labels")
        learning_rate = 0.001

        logits = resnet50(input_tensor, num_classes)

        loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=labels_tensor))
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
        train_op = optimizer.minimize(loss_op)

        correct_pred = tf.equal(tf.argmax(logits, 1), tf.argmax(labels_tensor, 1))
        accuracy_op = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        sess.run(tf.global_variables_initializer())

        epochs = 10
        batch_size = 64

        for epoch in range(epochs):
            step = 0

            for batch_images, batch_labels_one_hot in load_images_chunk(train_image_paths, train_labels, batch_size):
                _, loss, accuracy = sess.run(
                    [train_op, loss_op, accuracy_op],
                    feed_dict={input_tensor: batch_images, labels_tensor: batch_labels_one_hot}
                )
                print(f"Epoch {epoch + 1}/{epochs}, Step: {step}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")
                step = step + 1

            # Validate the model
            val_accuracy_list = []
            for batch_images, batch_labels_one_hot in load_images_chunk(val_image_paths, val_labels, batch_size):
                accuracy = sess.run(accuracy_op,
                                    feed_dict={input_tensor: batch_images, labels_tensor: batch_labels_one_hot})
                val_accuracy_list.append(accuracy)
            val_accuracy = np.mean(val_accuracy_list)
            print(f"Validation Accuracy: {val_accuracy:.4f}")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    config_json = {}
    epl.init(epl.Config(config_json))
    print(epl.Env.get().cluster.gpu_num_per_worker)
    if epl.Env.get().cluster.gpu_num_per_worker > 1:
        # Avoid NCCL hang.
        os.environ["NCCL_LAUNCH_MODE"] = "GROUP"
    epl.set_default_strategy(epl.replicate(device_count=1))
    run_model()

I am confronted with the following issue:
Traceback (most recent call last):
File "resnet50_split3.py", line 203, in
run_model()
File "resnet50_split3.py", line 164, in run_model
sess.run(tf.global_variables_initializer())
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 453, in run
assign_ops = _init_local_resources(self, fn)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 416, in _init_local_resources
assign_ops = broadcast_variables()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 339, in broadcast_variables
bcast_variables = taskgraph.get_variables(replica_idx)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/ir/taskgraph.py", line 409, in get_variables
if id(var_tensor.taskgraph) != id(self):
AttributeError: 'NoneType' object has no attribute 'taskgraph'

Could you give me a hand when you are free? Thank you very much!

you can use tf.train.MonitoredTrainingSession instead of tf.Session , and global_variables_initializer is not necessary when using MonitoredTrainingSession, you can refer https://github.com/alibaba/FastNN/blob/73b70c633117ccff4f1a270f461bacb96e0fc4ee/resnet/resnet_dp.py#L67

Thanks for your reply! I modify the code as follows:

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import cifar10
import epl
import os


def conv_bn_relu(inputs, filters, kernel_size, stride, training):
    conv = tf.layers.conv2d(inputs, filters, kernel_size, strides=stride, padding='SAME', use_bias=False)
    bn = tf.layers.batch_normalization(conv, training=training)
    relu = tf.nn.relu(bn)
    return relu


def bottleneck_block(inputs, filters, stride, training):
    shortcut = inputs
    out = conv_bn_relu(inputs, filters, 1, 1, training)
    out = conv_bn_relu(out, filters, 3, stride, training)
    out = conv_bn_relu(out, 4 * filters, 1, 1, training)
    if stride != 1 or inputs.get_shape().as_list()[-1] != 4 * filters:
        shortcut = tf.layers.conv2d(inputs, 4 * filters, 1, strides=stride, padding='SAME', use_bias=False)
        shortcut = tf.layers.batch_normalization(shortcut, training=training)
    out = tf.add(out, shortcut)
    return out


def resnet50(inputs, training):
    out = conv_bn_relu(inputs, 64, 3, 1, training)
    out = bottleneck_block(out, 64, 1, training)
    out = bottleneck_block(out, 128, 2, training)
    out = bottleneck_block(out, 256, 2, training)
    out = bottleneck_block(out, 512, 2, training)
    out = tf.layers.average_pooling2d(out, 4, 1)
    out = tf.layers.flatten(out)
    out = tf.layers.dense(out, 10)
    return out


def run_model():
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()

    X_train, X_test = X_train.astype(np.float32) / 255.0, X_test.astype(np.float32) / 255.0
    y_train, y_test = y_train.astype(np.int32), y_test.astype(np.int32)

    images = tf.placeholder(tf.float32, shape=(None, 32, 32, 3), name='images')
    labels = tf.placeholder(tf.int32, shape=(None), name='labels')
    is_training = tf.placeholder(tf.bool, name='is_training')

    logits = resnet50(images, is_training)
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    global_step = tf.train.get_or_create_global_step()
    optimizer = tf.train.AdamOptimizer(0.001)
    train_op = optimizer.minimize(loss, global_step=global_step)

    batch_size = 128
    n_epochs = 100
    hooks = [tf.train.StopAtStepHook(last_step=n_epochs * len(X_train) // batch_size)]

    def get_batch(data, labels, batch_size):
        idx = np.random.choice(np.arange(len(data)), batch_size, replace=False)
        return data[idx], labels[idx].flatten()

    with tf.train.MonitoredTrainingSession(hooks=hooks) as sess:
        while not sess.should_stop():
            batch_images, batch_labels = get_batch(X_train, y_train, batch_size)
            _, train_loss, step = sess.run(
                [train_op, loss, global_step],
                feed_dict={images: batch_images, labels: batch_labels, is_training: True}
            )
            if step % 100 == 0:
                print(f"Step {step}, Loss: {train_loss:.4f}")


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    config_json = {}
    epl.init(epl.Config(config_json))
    print(epl.Env.get().cluster.gpu_num_per_worker)
    if epl.Env.get().cluster.gpu_num_per_worker > 1:
        # Avoid NCCL hang.
        os.environ["NCCL_LAUNCH_MODE"] = "GROUP"
    epl.set_default_strategy(epl.replicate(device_count=1))
    run_model()

However, I am confronted with the following issue:

Traceback (most recent call last):
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1365, in _do_call
return fn(*args)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1350, in _run_fn
target_list, run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1443, in _call_tf_sessionrun
run_metadata)
tensorflow.python.framework.errors_impl.InvalidArgumentError: From /job:worker/replica:0/task:0:
You must feed a value for placeholder tensor 'EPL_REPLICA_1/labels' with dtype int32
[[{{node EPL_REPLICA_1/labels}}]]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "resnet50_split4.py", line 89, in
run_model()
File "resnet50_split4.py", line 74, in run_model
feed_dict={images: batch_images, labels: batch_labels, is_training: True}
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 754, in run
run_metadata=run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1259, in run
run_metadata=run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1360, in run
raise six.reraise(*original_exc_info)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/six.py", line 719, in reraise
raise value
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1345, in run
return self._sess.run(*args, **kwargs)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1418, in run
run_metadata=run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1176, in run
return self._sess.run(*args, **kwargs)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 464, in run
outputs = fn(self, actual_fetches, feed_dict, options, run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 956, in run
run_metadata_ptr)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1180, in _run
feed_dict_tensor, options, run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1359, in _do_run
run_metadata)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/client/session.py", line 1384, in _do_call
raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.InvalidArgumentError: From /job:worker/replica:0/task:0:
You must feed a value for placeholder tensor 'EPL_REPLICA_1/labels' with dtype int32
[[node EPL_REPLICA_1/labels (defined at /users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py:1748) ]]

Original stack trace for 'EPL_REPLICA_1/labels':
File "resnet50_split4.py", line 89, in
run_model()
File "resnet50_split4.py", line 69, in run_model
with tf.train.MonitoredTrainingSession(hooks=hooks) as sess:
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 584, in MonitoredTrainingSession
stop_grace_period_secs=stop_grace_period_secs)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1014, in init
stop_grace_period_secs=stop_grace_period_secs)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 319, in init
res = fn(self, *args, **kwargs)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 725, in init
self._sess = _RecoverableSession(self._coordinated_creator)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1207, in init
_WrappedSession.init(self, self._create_session())
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 1212, in _create_session
return self._sess_creator.create_session()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 878, in create_session
self.tf_sess = self._session_creator.create_session()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 638, in create_session
self._scaffold.finalize()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 273, in finalize
fn(self)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/training/monitored_session.py", line 239, in finalize
ops.get_default_graph().finalize()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/hooks.py", line 261, in finalize
Parallel.get().do_parallelism()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/parallel.py", line 223, in do_parallelism
self.transformer.replicas_clone()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/graph_editor.py", line 427, in replicas_clone
self._forward_clone()
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/graph_editor.py", line 343, in _forward_clone
target_device)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/epl/parallel/ops.py", line 237, in node_clone_for_replicas
op_def=op_def)
File "/users/Master/anaconda3/envs/py37/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py", line 1748, in init
self._traceback = tf_stack.extract_stack()

Could you give me a hand? Thank you very much!

you should replace get_batch with tf.data.Dataset