oracle/oci-python-sdk

Potential Memory Leak while bulk-copying objects

ncantile opened this issue · 8 comments

Hello team, I have to duplicate a bucket of roughly 20TB to a destination bucket in the same region. I installed this package on an OL8 Virtual Machine and I'm using the object_storage_bulk_copy.py script with instance principal authentication to do so.
The script works, but after roughly 30 minutes it consumes all 16GB of RAM on my VM and the process freezes the machine.
Several questions come to mind:

  1. What are the minimum requirements to run that script?
  2. Changing the number of workers (from 50 to 25) doesn't seem to solve the issue. Does it have any impact?
  3. I cannot use the prefix flag, as all the objects are in a single directory and I have to copy the whole bucket anyway.

Do you have any suggestions on this topic? How can I improve the performance of the bulk copy?
Thanks

Hi, the bulk copy script uses multiple threads, pickle, and in-memory structures to track the status of the operation (it is not a memory leak). To reduce memory usage, you can change the script to run serially (without threads), which will take longer to run.
Alternatively, create a VM with more memory.

Many thanks for the reply. Do you have any suggestions on how to disable multithreading?

You need to change the main loop.
In general, it should look like the code below:

    # Sketch of the serial main loop: page through the bucket listing and
    # handle each object in turn instead of handing it to worker threads.
    while True:
        # Fetch one page of the listing; `start` carries the continuation
        # value returned by the previous page (None on the first call).
        response = object_storage_client.list_objects(ns, bucket, start=next_starts_with, prefix=source_prefix, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        next_starts_with = response.data.next_start_with

        for object_ in response.data.objects:
            # Skip objects outside the include prefix.
            if source_prefix and not object_.name.startswith(source_prefix):
                continue
            # Skip objects that match the exclude prefix.
            if source_prefix_exclude and object_.name.startswith(source_prefix_exclude):
                continue

            # copy_object....
            

        # No continuation value means the last page has been processed.
        if not next_starts_with:
            break

I am happy to write it later today or tomorrow, but if you have a lot of files, it can take time to copy.

I'll get back to you when I've had time to test your suggestions, thanks again!

Here is the new serial script; I will push it next week.
If you are happy with it, please close the ticket.

# coding: utf-8
# Copyright (c) 2016, 2023, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

##########################################################################
# object_storage_copy.py
#
# @author: Adi Z
#
# Supports Python 3
#
# DISCLAIMER – This is not an official Oracle application. It is not supported by Oracle Support. It should NOT be used for utilization calculation purposes.
##########################################################################
# Info:
#    Copy object storage bucket to other bucket with single thread
#
##########################################################################
# Application Command line parameters
#
#   -c config_file - Config file to use (default=~/.oci/config)
#   -t profile     - Profile (section) in config file, DEFAULT as default
#   -p proxy       - Set Proxy (i.e. www-proxy-server.com:80)
#   -ip            - Use Instance Principals for Authentication
#   -dt            - Use Instance Principals with delegation token for cloud shell
#   -sb source_bucket
#   -sr source_region
#   -sn source_namespace
#   -sp source_prefix_include
#   -se source_prefix_exclude
#   -db destination_bucket
#   -dr destination_region
#   -dn destination_namespace
#   -ig ignore_check_exist (listed here, but not defined by the argument parser below)
##########################################################################

import oci
import argparse
import datetime
import sys
import os
import copy


##########################################################################
# Pre Main
##########################################################################

# Get Command Line Parser
def _build_parser():
    """Create the argument parser with every option this script accepts."""
    # (flag, dest, help text, is_switch) in the order the options are registered.
    # Switches are boolean store_true flags defaulting to False; every other
    # option takes a string value defaulting to "".
    option_table = (
        ('-t', 'config_profile', 'Config file section to use (tenancy profile)', False),
        ('-p', 'proxy', 'Set Proxy (i.e. www-proxy-server.com:80) ', False),
        ('-ip', 'is_instance_principals', 'Use Instance Principals for Authentication', True),
        ('-dt', 'is_delegation_token', 'Use Delegation Token for Authentication', True),
        ('-c', 'config_file', "Config File (default=~/.oci/config)", False),
        ('-sb', 'source_bucket', 'Source Bucket Name', False),
        ('-sr', 'source_region', 'Source Region (Default current connection)', False),
        ('-sn', 'source_namespace', 'Source Namespace (Default current connection)', False),
        ('-sp', 'source_prefix_include', 'Source Prefix Include', False),
        ('-se', 'source_prefix_exclude', 'Source Prefix Exclude', False),
        ('-db', 'destination_bucket', 'Destination Bucket Name', False),
        ('-dr', 'destination_region', 'Destination Region', False),
        ('-dn', 'destination_namespace', 'Destination Namespace (Default current connection)', False),
    )

    built = argparse.ArgumentParser()
    for flag, dest, help_text, is_switch in option_table:
        if is_switch:
            built.add_argument(flag, action='store_true', default=False, dest=dest, help=help_text)
        else:
            built.add_argument(flag, default="", dest=dest, help=help_text)
    return built


parser = _build_parser()
cmd = parser.parse_args()

# No arguments at all: show the usage text and stop.
if len(sys.argv) <= 1:
    parser.print_help()
    raise SystemExit

# Both bucket names are mandatory for a copy.
if not (cmd.source_bucket and cmd.destination_bucket):
    print("Source and Destination buckets parameters are required !!!\n")
    parser.print_help()
    raise SystemExit

# Global container (not referenced by the functions visible in this script)
data = {}

# Object storage clients, created later by connect_to_object_storage()
object_storage_client = object_storage_client_dest = None

# Source settings taken from the command line
source_namespace = cmd.source_namespace
source_region = cmd.source_region
source_bucket = cmd.source_bucket
source_prefix = cmd.source_prefix_include
source_prefix_exclude = cmd.source_prefix_exclude

# Destination settings taken from the command line
destination_namespace = cmd.destination_namespace
destination_region = cmd.destination_region
destination_bucket = cmd.destination_bucket

# Fall back to the SDK defaults when no config file / profile was given
config_file = cmd.config_file or oci.config.DEFAULT_LOCATION
config_profile = cmd.config_profile or oci.config.DEFAULT_PROFILE


##########################################################################
# Create signer for Authentication
# Input - config_file, config_profile and is_instance_principals and is_delegation_token
# Output - config and signer objects
##########################################################################
def create_signer(config_file, config_profile, is_instance_principals, is_delegation_token):
    """Build the OCI config and request signer for the chosen authentication mode.

    The modes are checked in this order: instance principals, delegation
    token, then config file.

    Args:
        config_file: Config file path for config file authentication;
            oci.config.DEFAULT_LOCATION is used when it is empty.
        config_profile: Profile name for config file authentication;
            oci.config.DEFAULT_PROFILE is used when it is empty.
        is_instance_principals: Truthy to authenticate with instance principals.
        is_delegation_token: Truthy to authenticate with a delegation token.
            The config file and profile are then read from the
            OCI_CONFIG_FILE and OCI_CONFIG_PROFILE environment variables.

    Returns:
        A (config, signer) tuple.

    Raises:
        SystemExit: If the instance principals signer cannot be created, the
            delegation token environment variables are missing, or a KeyError
            occurs while setting up the delegation token signer.
    """
    # -----------------------------
    # Instance Principals
    # -----------------------------
    if is_instance_principals:
        try:
            ip_signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
            ip_config = {'region': ip_signer.region, 'tenancy': ip_signer.tenancy_id}
        except Exception:
            print_header("Error obtaining instance principals certificate, aborting")
            raise SystemExit
        return ip_config, ip_signer

    # -----------------------------
    # Delegation Token
    # -----------------------------
    if is_delegation_token:
        try:
            # both environment variables must be present
            env_file = os.environ.get('OCI_CONFIG_FILE')
            env_section = os.environ.get('OCI_CONFIG_PROFILE')
            if None in (env_file, env_section):
                print("*** OCI_CONFIG_FILE and OCI_CONFIG_PROFILE env variables not found, abort. ***")
                print("")
                raise SystemExit

            token_config = oci.config.from_file(env_file, env_section)

            # the config names the file that holds the delegation token
            with open(token_config["delegation_token_file"], 'r') as token_file:
                token = token_file.read().strip()

            token_signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(delegation_token=token)

        except KeyError:
            print("* Key Error obtaining delegation_token_file")
            raise SystemExit

        return token_config, token_signer

    # -----------------------------
    # config file authentication
    # -----------------------------
    location = config_file or oci.config.DEFAULT_LOCATION
    profile = config_profile or oci.config.DEFAULT_PROFILE
    file_config = oci.config.from_file(location, profile)

    file_signer = oci.signer.Signer(
        tenancy=file_config["tenancy"],
        user=file_config["user"],
        fingerprint=file_config["fingerprint"],
        private_key_file_location=file_config.get("key_file"),
        pass_phrase=oci.config.get_config_value_or_default(file_config, "pass_phrase"),
        private_key_content=file_config.get("key_content")
    )
    return file_config, file_signer


##############################################################################
# get time
##############################################################################
def get_time(full=False):
    """Return the current local time as a formatted string.

    Args:
        full: When truthy, include the date ("YYYY-MM-DD HH:MM:SS");
            otherwise return the time only ("HH:MM:SS").
    """
    pattern = "%Y-%m-%d %H:%M:%S" if full else "%H:%M:%S"
    return datetime.datetime.now().strftime(pattern)


##########################################################################
# Print header centered
##########################################################################
def print_header(name):
    """Print *name* centered inside a 90-character wide box of '#' characters.

    A blank line is printed before the box.
    """
    width = 90
    border = '#' * width

    print("")
    print(border)
    # two columns are taken by the '#' on each side of the title
    print("#{}#".format(name.center(width - 2, " ")))
    print(border)


##########################################################################
# Print Info
##########################################################################
def print_command_info():
    """Print the run banner: author, start time, command line and copy settings.

    The source/destination namespace, region, bucket and prefix values are
    read from the module-level variables.
    """
    print_header("Running Object Storage Bulk Copy")
    print("Written by Adi Z, Oct 2023")
    print("Starts at        : " + get_time(True))
    print("Command Line     : " + ' '.join(sys.argv[1:]))

    # label / value pairs, printed in this order
    settings = (
        ("Source Namespace : ", source_namespace),
        ("Source Region    : ", source_region),
        ("Source Bucket    : ", source_bucket),
        ("Source Prefix    : ", source_prefix),
        ("Dest   Namespace : ", destination_namespace),
        ("Dest   Region    : ", destination_region),
        ("Dest   Bucket    : ", destination_bucket),
    )
    for label, value in settings:
        print(label + value)


##############################################################################
# copy_objects_main
##############################################################################
def copy_objects_main(ns, bucket):
    """Copy every matching object of one bucket to the destination bucket, one at a time.

    The bucket is listed page by page and each object is copied under the
    same name to the destination described by the module-level
    destination_namespace, destination_region and destination_bucket.

    Args:
        ns: Namespace of the source bucket. Used for both listing and copying.
        bucket: Name of the source bucket. Used for both listing and copying.

    Returns:
        The number of objects for which copy_object() was called.

    Any exception raised by the listing or copy calls propagates to the caller.

    NOTE(review): the service presumably handles each copy as an asynchronous
    request, so the count would reflect submitted copies rather than
    completed ones - confirm against the OCI documentation.
    """
    count = 0
    next_starts_with = None
    print("")

    while True:
        # One page of the listing; `start` is the continuation value from the previous page.
        response = object_storage_client.list_objects(ns, bucket, start=next_starts_with, prefix=source_prefix, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        next_starts_with = response.data.next_start_with

        for object_ in response.data.objects:
            object_name = object_.name

            if source_prefix and not object_name.startswith(source_prefix):
                continue
            if source_prefix_exclude and object_name.startswith(source_prefix_exclude):
                continue

            # Copy from the same namespace/bucket that was listed (the function
            # arguments), so listing and copying can never refer to different buckets.
            copy_object(ns, bucket, object_name, destination_namespace, destination_region, destination_bucket, object_name)
            count += 1
            print((str(count) + ".").ljust(5) + object_name)

        # No continuation value: the last page has been processed.
        if not next_starts_with:
            break

    print("")
    return count


##############################################################################
# copy_object
##############################################################################
def copy_object(src_ns, src_b, src_o, dst_ns, dst_r, dst_b, dst_o):
    """Request the copy of a single object to the destination bucket.

    Args:
        src_ns: Source namespace.
        src_b: Source bucket name.
        src_o: Source object name.
        dst_ns: Destination namespace.
        dst_r: Destination region.
        dst_b: Destination bucket name.
        dst_o: Destination object name.

    Returns:
        The response returned by the object storage client's copy_object call.
    """
    copy_request = oci.object_storage.models.copy_object_details.CopyObjectDetails()
    copy_request.source_object_name = src_o
    copy_request.destination_namespace = dst_ns
    copy_request.destination_region = dst_r
    copy_request.destination_bucket = dst_b
    copy_request.destination_object_name = dst_o

    # Use the SDK default retry strategy, like every other service call in this
    # script, so one transient error does not abort a long-running bulk copy.
    return object_storage_client.copy_object(src_ns, src_b, copy_request, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)


##############################################################################
# connect to object storage
##############################################################################
def connect_to_object_storage():
    """Create the source and destination object storage clients.

    Resolves the module-level source_region, destination_region,
    source_namespace and destination_namespace when they were not given on
    the command line, and stores the clients in object_storage_client and
    object_storage_client_dest.

    Raises:
        SystemExit: If connecting to the source or the destination fails
            (the error is printed first), or if create_signer() aborts.
    """

    # global parameters (all of them are assigned below)
    global source_region
    global destination_region
    global source_namespace
    global destination_namespace
    global object_storage_client
    global object_storage_client_dest

    print_header("Connecting to Object Storage")

    # get signer
    config, signer = create_signer(cmd.config_file, cmd.config_profile, cmd.is_instance_principals, cmd.is_delegation_token)
    config_destination = copy.deepcopy(config)

    # source region: take it from the config unless it was given explicitly
    if not source_region:
        source_region = config['region']
    else:
        config['region'] = source_region

    # destination region: defaults to the source region resolved above
    if not destination_region:
        destination_region = config['region']

    # Always point the destination config at the resolved destination region.
    # Previously the default case left config_destination on the config file
    # region, which differs from destination_region when only -sr was given.
    config_destination['region'] = destination_region

    try:
        # connect to source region
        print("\nConnecting to Object Storage Service for source region - " + source_region)
        object_storage_client = oci.object_storage.ObjectStorageClient(config, signer=signer)
        if cmd.proxy:
            object_storage_client.base_client.session.proxies = {'https': cmd.proxy}

        # retrieve namespace from object storage
        if not source_namespace:
            source_namespace = object_storage_client.get_namespace(retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY).data
        print("Succeed.")

    except Exception as e:
        print("\nError connecting to object storage at source region - " + str(e))
        raise SystemExit

    try:
        # connect to destination object storage
        print("\nConnecting to Object Storage Service for destination region - " + destination_region)
        object_storage_client_dest = oci.object_storage.ObjectStorageClient(config_destination, signer=signer)
        if cmd.proxy:
            object_storage_client_dest.base_client.session.proxies = {'https': cmd.proxy}

        # retrieve namespace from object storage
        if not destination_namespace:
            destination_namespace = object_storage_client_dest.get_namespace(retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY).data
        print("Succeed.")

    except Exception as e:
        print("\nError connecting to object storage at destination region - " + str(e))
        raise SystemExit


##############################################################################
# main
##############################################################################
def main():
    """Run the copy: connect, print the run banner, copy, then print the summary."""

    # set up the clients and resolve regions / namespaces
    connect_to_object_storage()

    # show what is about to run
    print_command_info()

    print_header("Start Processing")

    copied = copy_objects_main(source_namespace, source_bucket)

    print_header("Copied {} files".format(copied))
    print_header("Copy Completed at " + get_time())


##############################################################################
# Execute
##############################################################################
# Call main() only when the file is executed as a script
if __name__ == '__main__':
    main()