PDO Bulk Library PDO Bulk Library
Tools for easy use bulk/batch inserts with support ON CONFLICT/ON DUPLICATE KEY/DO NOTHING/INSERT IGNORE, RETURNING sql operators and simple math/concatenation logic.
Contains helper classes for interacting with databases.
Currently supported PDO Postgresql/MySQL.
Installation
Install from composer:
cd /path/to/yourproject
composer require eltaline/bulk
Or install by local file:
cd /path/to/yourproject
wget -O composer.json https://raw.githubusercontent.com/eltaline/bulk/master/composer-local.json
composer install
Requirements
This library requires PHP 7.1 or later.
Package overview
This package contains several helpers:
PgSQL: PSLIns
, PSLInsNth
, PSLInsUpd
, PSLDel
.
MySQL: MSLIns
, MSLInsNth
, MSLInsUpd
, MSLDel
.
These classes built on top of PDO
, allow you to speed up database
rows insertion & deletion by performing multiple(bulk) operations per query, with this API.
Testing table scheme
Create table for testing in PostgreSQL/MySQL.
CREATE TABLE tablename (id integer NOT NULL, name varchar(128), class varchar(128), age integer, height integer, weight integer, PRIMARY KEY(id,name));
Description important fucntions
Flush function is important, and need to be used everywhere at the end of work with queue. Do write last queue buffer and reset queue buffer.
$ins->flush(); // Write last or < queue size, part of data from queue.
Queue functions queue() or queuearray() can be used only separately in one loop.
$ins->queue(); // Collect data values until queue size, then write part of data.
$ins->queuearray(); // Collect data values(from simple array with single query values of data) until queue size, then write part of data.
Description optional functions
Can use optional counters in logic.
$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();
Reset function for buffer and counters, do reset queue buffer and counters.
$ins->reset(); // Resetting only counters
$ins->resetbuf(); // Resetting only buffer
$ins->resetall(); // Resetting buffer and counters
PSLIns & MSLIns
This class takes advantage of the bulk insert to empty/temp tables.
- Implements INSERT ...
- Supported RETURNING operator
To use it, create a PSLIns
or MSLIns
instance with:
- your
PDO
connection object - the number of inserts to perform per bulk query
- the name of your table
- the name of the columns to insert
- optional set empty array(not used for INSERT, but need empty if set RETURNING options)
- optional set empty array(not used for INSERT, but need empty if set RETURNING options)
- optional set RETURNING field or fields or '*'
- optional set PDO fetch mode for RETURNING clause(by default empty)
PSLInsNth & MSLInsNth
This class takes advantage of the bulk insert without duplicate key errors in tables.
- Implements ON CONFLICT DO NOTHING and INSERT IGNORE
- Supported RETURNING operator
To use it, create a PSLInsNth
or MSLInsNth
instance with:
- your
PDO
connection object - the number of inserts to perform per bulk query
- the name of your table
- the name of the columns to insert
- optional set empty array(not used for INSERT, but need empty if set RETURNING options)
- optional set empty array(not used for INSERT, but need empty if set RETURNING options)
- optional set RETURNING field or fields or '*'
- optional set PDO fetch mode for RETURNING clause(by default empty)
PSLInsUpd & MSLInsUpd
This class takes advantage of the bulk insert with simple math logic in tables.
- Implements ON CONFLICT (unqiue/composite key) DO UPDATE SET ... and ON DUPLICATE KEY UPDATE ...
- Supported + - / * math operators (column+column, column-column, column/column , column*column)
- Supported | concatenation operator (column|column|;) where ; is separator
- Supported RETURNING operator
To use it, create a PSLInsUpd
or MSLInsUpd
instance with:
- your
PDO
connection object - the number of inserts to perform per bulk query
- the name of your table
- the name of the columns to insert
- the name of the columns as primary key of table (unique/composite)
- the name of the columns for update or column names with format column+column for update and value addition
- optional set RETURNING field or fields or '*'
- optional set PDO fetch mode for RETURNING clause(by default empty)
PSLDel & MSLDel
This class takes advantage of the bulk delete by value`s of column or columns.
- Implements DELETE ...
- Supported RETURNING operator
To use it, create a PSLDel
or MSLDel
instance with:
- your
PDO
connection object - the number of inserts to perform per bulk query
- the name of your table
- the name of the column or columns for where statement
- optional set empty array(not using now for DELETE, but need empty if set RETURNING options)
- optional set empty array(not using now for DELETE, but need empty if set RETURNING options)
- optional set RETURNING field or fields or '*'
- optional set PDO fetch mode for RETURNING clause(by default empty)
Beginning
include(__DIR__ . '/vendor/autoload.php');
use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;
$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$data = array();
$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];
Transactions
You can also use transaction for speed up bulk query.
$pdo->beginTransaction(); // Before loop with queue() or queuearray();
$pdo->commit(); // After loop with queue() or queuearray();
Additionally use try with catch (Exception $e) for rollback transaction.
$pdo->beginTransaction();
try {
foreach (...) {
$ins->queue(...);
//$ins->queuearray(...);
}
} catch (Exception $e) {
$pdo->rollBack();
throw $ins;
}
}
try {
$ins->flush();
$pdo->commit();
} catch (Exception $e) {
$pdo->rollBack();
throw $ins;
}
Simple Operations
Supported in next classes:
PgSQL: PSLIns
, PSLInsNth
, PSLInsUpd
, PSLDel
.
MySQL: MSLIns
, PSLInsNth
, MSLInsUpd
, MSLDel
.
- Classes PSLInsNth & MSLInsNth implements ON CONFLICT DO NOTHING & INSERT IGNORE clauses.
Format: (connection object, queue size, table, [values columns])
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
Format: (connection object, queue size, table, [values columns])
$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
Format: (connection object, queue size, table, [values columns], [conflict/duplicate columns], [update columns])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
Format: (connection object, queue size, table, [values columns])
$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
Full example:
include(__DIR__ . '/vendor/autoload.php');
use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;
$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$data = array();
$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
//$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
//$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
foreach ($data as $fields) {
$id = $fields[0];
$name = $fields[1];
$class = $fields[2];
$age = $fields[3];
$height = $fields[4];
$weight = $fields[5];
$ins->queue($id, $name, $class, $age, $height, $weight);
//$ins->queuearray($fields);
}
//Bulk Write Complete Operation
$res = $ins->flush();
$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();
//Reset function for counters
$ins->reset();
print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");
Advanced Operations
Supported in next classes:
PgSQL: PSLInsUpd
.
MySQL: MSLInsUpd
.
- Supported math/concatenation operations on fields with previous values.
Format: (connection object, queue size, table, [values columns], [conflict/duplicate columns], [update columns])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']);
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']);
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']);
Full example:
include(__DIR__ . '/vendor/autoload.php');
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\MSLInsUpd;
$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$data = array();
$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
foreach ($data as $fields) {
$id = $fields[0];
$name = $fields[1];
$class = $fields[2];
$age = $fields[3];
$height = $fields[4];
$weight = $fields[5];
$ins->queue($id, $name, $class, $age, $height, $weight);
//$ins->queuearray($fields);
}
//Bulk Write Complete Operation
$res = $ins->flush();
$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();
//Reset function for counters
$ins->reset();
print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");
Returning Operations
Supported in next classes:
PgSQL: PSLIns
, PSLInsNth
, PSLInsUpd
, PSLDel
.
MySQL: MSLIns
, MSLInsNth
, MSLInsUpd
, MSLDel
.
Use one of supported class with RETURNING operator.
- Last argument is PDO fetch mode
- Also can use advanced math/concatenation operations together with returning operations
Supported fetch modes:
PDO::FETCH_BOTH, PDO::FETCH_NUM,
PDO::FETCH_ASSOC, PDO::FETCH_UNIQUE,
PDO::FETCH_GROUP, PDO::FETCH_KEY_PAIR,
PDO::FETCH_GROUP|PDO::FETCH_COLUMN,
PDO::FETCH_OBJ
Format: (connection object, queue size, table, [values columns], [not used], [not used], [returning columns], [fetch mode])
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
Format: (connection object, queue size, table, [values columns], [not used], [not used], [returning columns], [fetch mode])
$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
Format: (connection object, queue size, table, [values columns], [conflict/duplicate columns], [update columns], [returning columns], [fetch mode])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
Format: (connection object, queue size, table, [values columns], [not using now], [not using now], [returning columns], [fetch mode])
$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
Full example:
include(__DIR__ . '/vendor/autoload.php');
use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;
$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$data = array();
$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$part = array(); // Define array for merge $res by RETURNING data of column or columns.
foreach ($data as $fields) {
$id = $fields[0];
$name = $fields[1];
$class = $fields[2];
$age = $fields[3];
$height = $fields[4];
$weight = $fields[5];
$res = $ins->queue($id, $name, $class, $age, $height, $weight);
//$res = $ins->queuearray($fields);
if(!empty($res)) $part[] = $res;
}
//Bulk Write Complete Operation
$res = $ins->flush();
//Reassemble queue+flush part of RETURNING array
if(!empty($res)) $part[] = $res;
$retarray = array();
foreach($part as $k => $v) {
foreach($v as $km => $vm) {
$retarray[]=$vm;
}
}
$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();
//Reset function for counters
$ins->reset();
print_r($retarray);
print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");
RAW Sql
For setting columns in RAW Sql format, You need add suffix :IS_RAW in column names. Example:
$ins = new PSLInsUpd($pdo, 1, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','COALESCE(name, \'test\'):IS_RAW'], ['name']);
/*
Prepared sql:
INSERT INTO users ("id", "name", "class", "age", "height", "weight") VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT ("id", COALESCE(name, 'test')) DO UPDATE name = EXCLUDED."name"
*/
Performance tips
To get the maximum performance out of this library, you should:
- wrap your operations in a transaction
- disable emulation of prepared statements (
PDO::ATTR_EMULATE_PREPARES=false
)
These two tips combined can get you up to 50% more throughput in terms of inserts per second.
Recommendations
When using transactions, I recommend not forget use over this helpers - try and catch with throw and $pdo->rollBack();
.
Limitations
Be careful when raising the number of operations per bulk query, as you might hit these limits.
$ins = new PSL...($pdo, 1000, 'tablename', ['columnname']);
Recommended use this library with 100-1000 queries per bulk query insertions.
- PHP's [memory_limit]
- MySQL's [max_allowed_packet]
- PDO also has a limit of 65535 query parameters per statement,
effectively limiting the number of operations per query to
floor(65535 / number of columns)
.
Maximum 65535 query parameters is allowed. Ex. 65535 / 10 columns ~= 10922 (is max queries per 1 bulk query).