Boto3 is the name of the Python SDK for AWS. It allows you to directly create, update, and delete AWS resources from your Python scripts. Boto3 makes it easy to integrate your Python application, library, or script with AWS services including Amazon S3, Amazon EC2, Amazon DynamoDB, and more.
Refer to the full documentation here: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
- pip install virtualenv
- python3 -m venv myenv
- cd myenv
- cd Scripts
- ./activate
- pip install boto3
- pip freeze > requirements.txt
- pip install -r requirements.txt
- aws configure
import os
import boto3
# Low-level EC2 client; the terminate call below goes through it.
ec2_client = boto3.client("ec2")
# Resource-style EC2 handle; used below to launch the instance.
ec2 = boto3.resource("ec2")
# Launch request with MinCount == MaxCount == 1, using the given AMI,
# instance type and key pair. The call's result is kept in `instances`.
instances = ec2.create_instances(
ImageId = "ami-0cca134ec43cf708f",
MinCount = 1,
MaxCount = 1,
InstanceType = "t2.micro",
KeyName = "rxhxl-key-pair"
)
# Terminate instances by ID. The string below is a placeholder, not a real
# instance ID; substitute an actual one before running.
response = ec2_client.terminate_instances(
InstanceIds = ['instance id of the ec2 instance']
)
def create_instance(
    image_id="ami-0cca134ec43cf708f",
    instance_type="t2.micro",
    key_name="rxhxl-key-pair",
    count=1,
):
    """Launch EC2 instances through the module-level ``ec2`` resource.

    Called with no arguments, this issues the same request as before: one
    ``t2.micro`` instance from the default AMI with the default key pair.

    Args:
        image_id: AMI ID passed as ``ImageId``.
        instance_type: Value passed as ``InstanceType``.
        key_name: Key pair name passed as ``KeyName``.
        count: Used for both ``MinCount`` and ``MaxCount``.

    Returns:
        Whatever ``ec2.create_instances`` returned (also printed).
    """
    instances = ec2.create_instances(
        ImageId=image_id,
        MinCount=count,
        MaxCount=count,
        InstanceType=instance_type,
        KeyName=key_name,
    )
    print(instances)
    return instances
# Call create_instance() five times in a row.
for _ in range(5):
    create_instance()
import os
import boto3
import json_handler
import json
# Shared launch settings, kept as module-level variables so the functions
# below can read them instead of repeating literals.
key_name = "rxhxl-key-pair"  # key pair name, passed as KeyName
ami_id = "ami-0cca134ec43cf708f"  # AMI ID, passed as ImageId
instance_type = "t2.micro"  # passed as InstanceType
region_name = "ap-south-1"  # NOTE(review): not referenced by the EC2 calls visible here - confirm where the region is applied
#creating ec2_instance
def create_instance():
    """Launch one EC2 instance using the module-level launch settings.

    Reads ``ami_id``, ``instance_type`` and ``key_name`` from module scope and
    calls ``run_instances`` on the module-level ``ec2_client`` with
    ``MinCount`` and ``MaxCount`` both set to 1.

    Returns:
        The ``run_instances`` response, so the caller can inspect what was
        launched.
    """
    response = ec2_client.run_instances(
        ImageId=ami_id,
        MinCount=1,
        MaxCount=1,
        InstanceType=instance_type,
        KeyName=key_name,
    )
    return response
#get ip address of the instance
def get_public_ip(instance_id):
    """Print the ``PublicIpAddress`` of the given EC2 instance.

    Looks the instance up with ``describe_instances`` on the module-level
    ``ec2_client`` and prints one line per instance entry in the response.
    An entry without a ``PublicIpAddress`` key prints ``None``.

    Args:
        instance_id: ID of the instance to look up.
    """
    described = ec2_client.describe_instances(InstanceIds=[instance_id])
    for group in described.get("Reservations"):
        for machine in group['Instances']:
            public_ip = machine.get('PublicIpAddress')
            print(public_ip)
def get_running_instances():
    """Print ID, type, public IP and private IP of every running instance.

    Queries ``describe_instances`` on the module-level ``ec2_client`` with an
    ``instance-state-name == running`` filter and prints one comma-separated
    line per instance. A missing IP field is printed as ``None``.
    """
    running_filter = {
        "Name": "instance-state-name",
        "Values": ["running"],
    }
    response = ec2_client.describe_instances(Filters=[running_filter])
    for reservation in response.get("Reservations", []):
        for instance in reservation["Instances"]:
            instance_id = instance["InstanceId"]
            instance_type = instance["InstanceType"]
            # The IP keys can be absent from an instance entry (for example
            # when no public address is assigned), so read them with .get()
            # rather than indexing, which would raise KeyError.
            public_ip = instance.get("PublicIpAddress")
            private_ip = instance.get("PrivateIpAddress")
            print(f"{instance_id}, {instance_type}, {public_ip}, {private_ip}")
# get_running_instances()
def reboot_instance(instance_id):
    """Call ``reboot_instances`` for one instance ID and print the response.

    Args:
        instance_id: ID of the instance to reboot.
    """
    target_ids = [instance_id]
    result = ec2_client.reboot_instances(InstanceIds=target_ids)
    print(result)
# stop-instance
def stop_instance(instance_id):
    """Call ``stop_instances`` for one instance ID and print the response.

    Args:
        instance_id: ID of the instance to stop.
    """
    target_ids = [instance_id]
    result = ec2_client.stop_instances(InstanceIds=target_ids)
    print(result)
#start instances
def start_instance(instance_id):
    """Call ``start_instances`` for one instance ID and print the response.

    Args:
        instance_id: ID of the instance to start.
    """
    target_ids = [instance_id]
    result = ec2_client.start_instances(InstanceIds=target_ids)
    print(result)
#terminate a single instance
def terminate_instance(instance_id):
    """Call ``terminate_instances`` for one instance ID and print the response.

    Args:
        instance_id: ID of the instance to terminate.
    """
    target_ids = [instance_id]
    result = ec2_client.terminate_instances(InstanceIds=target_ids)
    print(result)
#terminate multiple ec2 instances
def terminate_instances(instance_ids):
    """Terminate several EC2 instances and drop their IDs from ``ec2_data``.

    Prints the ``terminate_instances`` response, removes every ID in
    *instance_ids* from ``ec2_data["ec2_instance_ids"]``, then prints the
    remaining IDs as JSON.

    Args:
        instance_ids: List of instance IDs to terminate.
    """
    result = ec2_client.terminate_instances(InstanceIds=instance_ids)
    print(result)
    # NOTE(review): ec2_data is not defined in the visible code; presumably a
    # module-level dict holding the tracked instance IDs - confirm.
    remaining = [
        known_id
        for known_id in ec2_data["ec2_instance_ids"]
        if known_id not in instance_ids
    ]
    ec2_data["ec2_instance_ids"] = remaining
    print(json.dumps(ec2_data["ec2_instance_ids"]))
import os
import logging
import json
import boto3
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
# Placeholder bucket name; replace it with a real bucket name before running.
bucket_name = "<your bucket name>"
# Region passed to the commented-out create_bucket example further down.
region_name = "ap-south-1"
#create a bucket
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket, optionally with an explicit location constraint.

    Args:
        bucket_name: Name of the bucket to create.
        region: When given, the client is built for this region and the same
            value is sent as the bucket's ``LocationConstraint``. When
            ``None``, a default client is used and no constraint is sent.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    try:
        if region is not None:
            regional_client = boto3.client('s3', region_name=region)
            regional_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region},
            )
        else:
            default_client = boto3.client('s3')
            default_client.create_bucket(Bucket=bucket_name)
    except ClientError as err:
        logging.error(err)
        return False
    else:
        return True
# create_bucket(bucket_name,region_name)
# list s3_buckets
def list_buckets(region=None):
    """Print the name of every bucket returned by ``list_buckets``.

    Args:
        region: Optional region name for the S3 client. ``None`` uses the
            default client configuration.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    # One client is enough: passing region_name=None is the same as omitting
    # it, so there is no need to build a default client and then replace it.
    s3_client = boto3.client('s3', region_name=region)
    try:
        response = s3_client.list_buckets()
    except ClientError as e:
        logging.error(e)
        return False
    print("Existing buckets")
    for bucket in response['Buckets']:
        print(f'{bucket["Name"]}')
    return True
# list_buckets('ap-south-1')
#upload files in s3
def upload_file(file_name, bucket, object_name=None):
    """Upload a local file to an S3 bucket.

    Args:
        file_name: Path of the file to upload.
        bucket: Name of the destination bucket.
        object_name: Object key to store the file under. Defaults to the
            base name of *file_name*.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    key = os.path.basename(file_name) if object_name is None else object_name
    client = boto3.client('s3')
    try:
        client.upload_file(file_name, bucket, key)
    except ClientError as err:
        logging.error(err)
        return False
    return True
# upload_file('./file.txt',bucket_name)
# upload file object
def upload_file_object(file_name, bucket, object_name=None):
    """Upload a local file to S3 by streaming it from an open file object.

    The file is opened in binary mode and handed to ``upload_fileobj``.

    Args:
        file_name: Path of the file to upload.
        bucket: Name of the destination bucket.
        object_name: Object key to store the file under. Defaults to the
            base name of *file_name*.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged). Errors from opening the file are not caught here.
    """
    key = object_name
    if key is None:
        key = os.path.basename(file_name)
    client = boto3.client('s3')
    try:
        with open(file_name, "rb") as source:
            client.upload_fileobj(source, bucket, key)
    except ClientError as err:
        logging.error(err)
        return False
    return True
# delete empty buckets
def delete_empty_bucket(bucket):
    """Delete the S3 bucket named *bucket* and print the API response.

    Args:
        bucket: Name of the bucket to delete.
    """
    s3_client = boto3.client('s3')
    # Delete the bucket the caller asked for, not the module-level
    # ``bucket_name`` placeholder.
    response = s3_client.delete_bucket(Bucket=bucket)
    print(response)
# delete not_empty buckets
def delete_non_empty_bucket(bucket):
    """Delete every object in *bucket*, then delete the bucket itself.

    Args:
        bucket: Name of the bucket to empty and delete.
    """
    # ``Bucket`` objects come from the boto3 S3 *resource* interface; the
    # low-level client returned by boto3.client('s3') has no ``Bucket`` method.
    s3_resource = boto3.resource('s3')
    target_bucket = s3_resource.Bucket(bucket)
    # NOTE(review): this removes current objects only; on a bucket with
    # versioning enabled, older object versions presumably remain - confirm.
    target_bucket.objects.all().delete()
    target_bucket.meta.client.delete_bucket(Bucket=bucket)
# delete object
def delete_object(bucket, object_name):
    """Delete one object from an S3 bucket and print the API response.

    Args:
        bucket: Name of the bucket holding the object.
        object_name: Key of the object to delete.
    """
    client = boto3.client('s3')
    result = client.delete_object(Bucket=bucket, Key=object_name)
    print(result)
#downloaded files
def download_file(file_name, bucket, object_name):
    """Download an S3 object to a local file.

    Args:
        file_name: Local path to save the object to.
        bucket: Name of the source bucket.
        object_name: Key of the object to download.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    client = boto3.client('s3')
    try:
        client.download_file(bucket, object_name, file_name)
    except ClientError as err:
        logging.error(err)
        return False
    else:
        return True
# Example call with placeholder arguments (save path, bucket name, object
# key); replace all three with real values before running.
download_file("<the location and file name where to save>","bucket name","name of the object")
def download_file_object(file_name, bucket, object_name):
    """Download an S3 object by streaming it into an open local file.

    The local file is opened for binary writing before the download starts
    and is handed to ``download_fileobj``.

    Args:
        file_name: Local path to write the object to.
        bucket: Name of the source bucket.
        object_name: Key of the object to download.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    client = boto3.client('s3')
    try:
        with open(file_name, "wb") as sink:
            client.download_fileobj(bucket, object_name, sink)
    except ClientError as err:
        logging.error(err)
        return False
    else:
        return True
# download_file("<the location and file name where to save>","bucket name","name of the object")
def upload_file_multipart(file_name, bucket, object_name=None):
    """Upload a file to S3 with a 5 GiB multipart threshold.

    The upload goes through ``upload_file`` with a ``TransferConfig`` whose
    ``multipart_threshold`` is ``5 * 1024 ** 3`` bytes.

    Args:
        file_name: Path of the file to upload.
        bucket: Name of the destination bucket.
        object_name: Object key to store the file under. Defaults to the
            base name of *file_name*.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    bytes_per_gib = 1024 ** 3
    transfer_settings = TransferConfig(multipart_threshold=5 * bytes_per_gib)
    key = os.path.basename(file_name) if object_name is None else object_name
    client = boto3.client("s3")
    try:
        client.upload_file(file_name, bucket, key, Config=transfer_settings)
    except ClientError as err:
        logging.error(err)
        return False
    return True
#download file concurrently
def download_file_concurrently(file_name, bucket, object_name):
    """Download an S3 object using a transfer config with ``max_concurrency=20``.

    Args:
        file_name: Local path to save the object to.
        bucket: Name of the source bucket.
        object_name: Key of the object to download.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (the
        error is logged).
    """
    transfer_settings = TransferConfig(max_concurrency=20)
    client = boto3.client('s3')
    try:
        client.download_file(
            bucket,
            object_name,
            file_name,
            Config=transfer_settings,
        )
    except ClientError as err:
        logging.error(err)
        return False
    else:
        return True
#create a pre-signed url
def create_presigned_url(bucket, object_name, expiration=3600):
    """Generate a presigned URL for a ``get_object`` request on an S3 object.

    Args:
        bucket: Name of the bucket holding the object.
        object_name: Key of the object.
        expiration: Value passed as ``ExpiresIn``; defaults to 3600.

    Returns:
        The value returned by ``generate_presigned_url``, or ``None`` if a
        ``ClientError`` was raised (the error is logged).
    """
    client = boto3.client('s3')
    try:
        url = client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket, 'Key': object_name},
            ExpiresIn=expiration,
        )
    except ClientError as err:
        logging.error(err)
        return None
    else:
        return url
# Example: build a presigned URL for 'file.txt' in the bucket named by
# bucket_name (default expiration) and print it; prints None on failure.
responseObject = create_presigned_url(bucket_name,'file.txt')
print(responseObject)
import json
import boto3
from botocore.exceptions import ClientError
#create IAM user
def create_iam_user(user_name):
    """Create an IAM user named *user_name*.

    Args:
        user_name: Name of the user to create.

    Returns:
        The ``create_user`` response on success, or ``None`` when the call
        raised ``ClientError`` (a message is printed in that case).
    """
    try:
        iam_client = boto3.client('iam')
        response = iam_client.create_user(UserName=user_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            print("Object already exist")
        else:
            print("unexpected error")
        # No response exists on the error path; return None explicitly
        # instead of falling through to an unassigned local.
        return None
    return response
# responseObject = create_iam_user('GTR')
# print(responseObject)
#list all users
def list_all_users():
    """Print the name of every IAM user, walking all ``list_users`` pages.

    A ``ClientError`` is caught and reported with a printed message.
    """
    try:
        client = boto3.client('iam')
        pages = client.get_paginator('list_users').paginate()
        for page in pages:
            for entry in page['Users']:
                print("User name :", entry['UserName'])
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print('Object already exists')
        else:
            print('Unexpected error: %s' % err)
# list_all_users()
#function to update iam user
def update_iam_user(existing_user_name, new_user_name):
    """Rename an IAM user via ``update_user``.

    A ``ClientError`` is caught and reported with a printed message.

    Args:
        existing_user_name: Current name of the user.
        new_user_name: Name to rename the user to.
    """
    try:
        client = boto3.client('iam')
        client.update_user(
            UserName=existing_user_name,
            NewUserName=new_user_name,
        )
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print('Object already exists')
        else:
            print('Unexpected error: %s' % err)
# update_iam_user('GTR','Supraa')
#delete iam user
def delete_iam_user(existing_user_name):
    """Delete an IAM user via ``delete_user``.

    A ``ClientError`` is caught and reported with a printed message.

    Args:
        existing_user_name: Name of the user to delete.
    """
    try:
        client = boto3.client('iam')
        client.delete_user(UserName=existing_user_name)
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print('Object already exists')
        else:
            print('Unexpected error: %s' % err)
# Example call: runs at module level, so the IAM user named 'Supraa' is
# deleted every time this code is executed.
delete_iam_user('Supraa')
#function create iam policy
def create_iam_policy(policy_name, policy_json):
    """Create an IAM policy from a JSON-serializable policy document.

    Args:
        policy_name: Name of the policy to create.
        policy_json: Policy document; serialized with ``json.dumps`` before
            it is sent.

    Returns:
        ``True`` on success; ``False`` if a ``ClientError`` was raised (a
        message is printed).
    """
    try:
        client = boto3.client('iam')
        client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_json),
        )
    except ClientError as err:
        # Both error branches report and then fail the same way.
        if err.response['Error']['Code'] == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)
        return False
    return True
# Example policy document: a single "Allow" statement covering every EC2
# action ("ec2:*") on every resource ("*").
custom_policy_json = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"ec2:*"
],
"Resource": "*"
}]
}
#attach policy with the user
def attach_custom_iam_policy_with_user(policy_name, user_name):
    """Attach a policy from the caller's own account to an IAM user.

    The policy ARN is built from the account ID reported by STS
    ``get_caller_identity`` plus *policy_name*. A ``ClientError`` is caught
    and reported with a printed message.

    Args:
        policy_name: Name of the policy to attach.
        user_name: Name of the user to attach it to.
    """
    try:
        identity = boto3.client('sts').get_caller_identity()
        account_id = identity['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{policy_name}'
        boto3.client('iam').attach_user_policy(
            UserName=user_name,
            PolicyArn=policy_arn,
        )
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)
def attach_managed_iam_policy_with_user(policy_name, user_name):
    """Attach a policy under the ``arn:aws:iam::aws:policy/`` prefix to a user.

    The ARN is built from the fixed ``aws`` prefix and *policy_name*, so no
    account ID lookup (and therefore no STS client) is needed. A
    ``ClientError`` is caught and reported with a printed message.

    Args:
        policy_name: Name of the policy to attach.
        user_name: Name of the user to attach it to.
    """
    try:
        policy_arn = f'arn:aws:iam::aws:policy/{policy_name}'
        iam_client = boto3.client('iam')
        iam_client.attach_user_policy(
            UserName=user_name,
            PolicyArn=policy_arn
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % e)
def detach_custom_iam_policy_with_user(policy_name, user_name):
    """Detach a policy from the caller's own account from an IAM user.

    The policy ARN is built from the account ID reported by STS
    ``get_caller_identity`` plus *policy_name*. A ``ClientError`` is caught
    and reported with a printed message.

    Args:
        policy_name: Name of the policy to detach.
        user_name: Name of the user to detach it from.
    """
    try:
        identity = boto3.client('sts').get_caller_identity()
        account_id = identity['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{policy_name}'
        boto3.client('iam').detach_user_policy(
            UserName=user_name,
            PolicyArn=policy_arn,
        )
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)
#detach_custom_iam_policy_with_user("test_policy_1_by_sandip","rahul_policy_test")
def detach_managed_iam_policy_with_user(policy_name, user_name):
    """Detach a policy under the ``arn:aws:iam::aws:policy/`` prefix from a user.

    The ARN is built from the fixed ``aws`` prefix and *policy_name*, so no
    account ID lookup (and therefore no STS client) is needed. A
    ``ClientError`` is caught and reported with a printed message.

    Args:
        policy_name: Name of the policy to detach.
        user_name: Name of the user to detach it from.
    """
    try:
        policy_arn = f'arn:aws:iam::aws:policy/{policy_name}'
        iam_client = boto3.client('iam')
        iam_client.detach_user_policy(
            UserName=user_name,
            PolicyArn=policy_arn
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % e)
#detach_managed_iam_policy_with_user("AdministratorAccess", "rahul_policy_test")
def add_policy_to_role(role_name, policy_arn):
    """Attach the policy identified by *policy_arn* to an IAM role.

    A ``ClientError`` is caught and reported with a printed message.

    Args:
        role_name: Name of the role to attach the policy to.
        policy_arn: Full ARN of the policy.
    """
    try:
        client = boto3.client('iam')
        client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)
def attach_custom_iam_policy_with_role(policy_name, role_name):
    """Attach a policy from the caller's own account to an IAM role.

    The policy ARN is built from the account ID reported by STS
    ``get_caller_identity`` plus *policy_name*. A ``ClientError`` is caught
    and reported with a printed message.

    Args:
        policy_name: Name of the policy to attach.
        role_name: Name of the role to attach it to.
    """
    try:
        identity = boto3.client('sts').get_caller_identity()
        account_id = identity['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{policy_name}'
        boto3.client('iam').attach_role_policy(
            RoleName=role_name,
            PolicyArn=policy_arn,
        )
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)
def create_role(role_name, trust_document):
    """Create an IAM role with the given trust (assume-role) policy document.

    A ``ClientError`` is caught and reported with a printed message.

    Args:
        role_name: Name of the role to create.
        trust_document: Trust policy; serialized with ``json.dumps`` and sent
            as ``AssumeRolePolicyDocument``.
    """
    try:
        client = boto3.client('iam')
        client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_document),
        )
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code == 'EntityAlreadyExists':
            print("Object already exists")
        else:
            print("Unexpected error: %s" % err)