A Python implementation of the Table Schema standard.
- Table to work with data tables described by Table Schema
- Schema representing Table Schema
- Field representing Table Schema field
- validate to validate Table Schema
- infer to infer Table Schema from data
- built-in command-line interface to validate and infer schemas
- storage/plugins system to connect tables to different storage backends like SQL Database
The package uses semantic versioning, which means that major versions could include breaking changes. It's highly recommended to specify a tableschema version range in your setup/requirements file, e.g. tableschema>=1.0,<2.0.
$ pip install tableschema
Let's start with a simple example:
from tableschema import Table
# Create table
table = Table('path.csv', schema='schema.json')
# Print schema descriptor
print(table.schema.descriptor)
# Print cast rows in a dict form
for keyed_row in table.iter(keyed=True):
    print(keyed_row)
A table is a core concept in a tabular data world. It represents data with metadata (Table Schema). Let's see how we can use it in practice.
Suppose we have a local CSV file. It could also be inline data or a remote link; all are supported by the Table class (except local files for in-browser usage, of course). But say it's data.csv for now:
city,location
london,"51.50,-0.11"
paris,"48.85,2.30"
rome,N/A
Let's create and read a table instance. We use the Table constructor and the table.read method with the keyed option to get a list of keyed rows:
table = Table('data.csv')
table.headers # ['city', 'location']
table.read(keyed=True)
# [
# {city: 'london', location: '51.50,-0.11'},
# {city: 'paris', location: '48.85,2.30'},
# {city: 'rome', location: 'N/A'},
# ]
As we can see, our locations are just strings. But they should be geopoints. Also, Rome's location is not available, but it's just a string N/A instead of None. First we have to infer Table Schema:
table.infer()
table.schema.descriptor
# { fields:
# [ { name: 'city', type: 'string', format: 'default' },
# { name: 'location', type: 'geopoint', format: 'default' } ],
# missingValues: [ '' ] }
table.read(keyed=True)
# Fails with a data validation error
Let's fix the "not available" location. There is a missingValues property in the Table Schema specification. As a first try we set missingValues to N/A in table.schema.descriptor. The schema descriptor can be changed in-place, but all changes should also be committed using table.schema.commit():
table.schema.descriptor['missingValues'] = 'N/A'
table.schema.commit()
table.schema.valid # false
table.schema.errors
# [<ValidationError: "'N/A' is not of type 'array'">]
As good citizens, we decided to check our schema descriptor's validity. And it's not valid! We should use an array for the missingValues property. Also, don't forget to include "empty string" as a valid missing value:
table.schema.descriptor['missingValues'] = ['', 'N/A']
table.schema.commit()
table.schema.valid # true
All good. It looks like we're ready to read our data again:
table.read(keyed=True)
# [
# {city: 'london', location: [51.50,-0.11]},
# {city: 'paris', location: [48.85,2.30]},
# {city: 'rome', location: None},
# ]
Now we see that:
- locations are arrays with numeric latitude and longitude
- Rome's location is a native Python None
And because there are no errors after reading, we can be sure that our data is valid against our schema. Let's save it:
table.schema.save('schema.json')
table.save('data.csv')
Our data.csv looks the same because it has been stringified back to csv format. But now we have schema.json:
{
    "fields": [
        {
            "name": "city",
            "type": "string",
            "format": "default"
        },
        {
            "name": "location",
            "type": "geopoint",
            "format": "default"
        }
    ],
    "missingValues": [
        "",
        "N/A"
    ]
}
If we decide to improve it even more we could update the schema file and then open it again. But now providing a schema path:
table = Table('data.csv', schema='schema.json')
# Continue the work
As already mentioned, a given schema can be used to validate data (see the Schema section for schema specification details). In default mode invalid data rows immediately trigger an exception in the table.iter()/table.read() methods.
Suppose we have this schema-invalid local file invalid_data.csv:
key,value
zero,0
one,not_an_integer
two,2
We're going to validate the data against the following schema:
table = Table(
    'invalid_data.csv',
    schema={'fields': [{'name': 'key'}, {'name': 'value', 'type': 'integer'}]})
Iterating over the data triggers an exception due to the failed cast of 'not_an_integer' to int:
for row in table.iter():
    print(row)
# Traceback (most recent call last):
# ...
# tableschema.exceptions.CastError: There are 1 cast errors (see exception.errors) for row "3"
Hint: The row number count starts with 1 and also includes header lines.
(Note: You can optionally switch off iter()/read() value casting using the cast parameter, see reference below.)
By providing a custom exception handler (a callable) to those methods, you can handle exceptions at your own discretion, for example to "fail late" and gather a validation report on the whole data:
errors = []
def exc_handler(exc, row_number=None, row_data=None, error_data=None):
    errors.append((exc, row_number, row_data, error_data))
for row in table.iter(exc_handler=exc_handler):
    print(row)
# ['zero', 0]
# ['one', FailedCast('not_an_integer')]
# ['two', 2]
print(errors)
# [(CastError('There are 1 cast errors (see exception.errors) for row "3"',),
# 3,
# OrderedDict([('key', 'one'), ('value', 'not_an_integer')]),
# OrderedDict([('value', 'not_an_integer')]))]
Note that:
- Data rows are yielded even though the data is schema-invalid; this is due to our custom exception handler choosing not to raise exceptions (but rather collect them in the errors list).
- Data field values that can't be cast properly (if the iter()/read() cast parameter is set to True, which is the default) are wrapped into a FailedCast "value holder". This allows for distinguishing uncast values from successfully cast values on the data consumer side. FailedCast instances can only get yielded when custom exception handling is in place.
- The custom exception handler callable must support a function signature as specified in the iter()/read() sections of the Table class API reference.
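The errors list collected by a handler like the one above can be condensed into a per-row report. The following is a minimal sketch using only the standard library. The helper name summarize_errors is hypothetical and not part of tableschema; it only assumes the four-element tuples appended by the example handler.

```python
from collections import OrderedDict


def summarize_errors(errors):
    """Map each failing row number to the names of the offending fields.

    `errors` is a list of (exc, row_number, row_data, error_data) tuples,
    as appended by the example exception handler (hypothetical helper).
    """
    report = {}
    for exc, row_number, row_data, error_data in errors:
        # error_data may be empty or None; fall back to the whole row then.
        fields = list(error_data or row_data or [])
        report.setdefault(row_number, []).extend(fields)
    return report


# Stand-in data shaped like the collected errors. A plain ValueError is used
# instead of a real CastError to keep the sketch self-contained.
sample_errors = [(
    ValueError('cast error'),
    3,
    OrderedDict([('key', 'one'), ('value', 'not_an_integer')]),
    OrderedDict([('value', 'not_an_integer')]),
)]
print(summarize_errors(sample_errors))  # {3: ['value']}
```

A report keyed by row number makes it easy to point users at the exact line of the source file, keeping in mind that the row count starts with 1 and includes header lines.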
A model of a schema with helpful methods for working with the schema and supported data. Schema instances can be initialized with a schema source given as a URL to a JSON file or as a JSON object. The schema is initially validated (see validate below). By default validation errors are stored in schema.errors, but in strict mode they are raised immediately.
Let's create a blank schema. It's not valid because the descriptor.fields property is required by the Table Schema specification:
from tableschema import Schema
schema = Schema()
schema.valid # false
schema.errors
# [<ValidationError: "'fields' is a required property">]
To avoid creating a schema descriptor by hand we will use the schema.infer method to infer the descriptor from given data:
schema.infer([
    ['id', 'age', 'name'],
    ['1','39','Paul'],
    ['2','23','Jimmy'],
    ['3','36','Jane'],
    ['4','28','Judy'],
])
schema.valid # true
schema.descriptor
#{ fields:
# [ { name: 'id', type: 'integer', format: 'default' },
# { name: 'age', type: 'integer', format: 'default' },
# { name: 'name', type: 'string', format: 'default' } ],
# missingValues: [ '' ] }
Now we have an inferred schema and it's valid. We can cast data rows against our schema. We provide a string input which will be cast correspondingly:
schema.cast_row(['5', '66', 'Sam'])
# [ 5, 66, 'Sam' ]
But if we try to provide some other missing value for the age field, the cast will fail because the only valid "missing" value is an empty string. Let's update our schema:
schema.cast_row(['6', 'N/A', 'Walt'])
# Cast error
schema.descriptor['missingValues'] = ['', 'N/A']
schema.commit()
schema.cast_row(['6', 'N/A', 'Walt'])
# [ 6, None, 'Walt' ]
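The effect of missingValues on casting can be pictured with a small standard-library sketch. This is not how tableschema is implemented; cast_cell and the list of per-column casters are hypothetical names used only to illustrate that a value listed as missing becomes None before any type cast is attempted.

```python
def cast_cell(value, caster, missing_values=('',)):
    """Return None for a missing value, otherwise the cast value."""
    if value in missing_values:
        return None
    return caster(value)


row = ['6', 'N/A', 'Walt']
casters = [int, int, str]  # stand-ins for the id, age and name field types

# With only '' treated as missing, 'N/A' reaches int() and fails.
try:
    [cast_cell(value, caster) for value, caster in zip(row, casters)]
except ValueError as error:
    print('cast failed:', error)

# Once 'N/A' is declared as a missing value it becomes None.
print([cast_cell(v, c, ['', 'N/A']) for v, c in zip(row, casters)])
# [6, None, 'Walt']
```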
We can save the schema to a local file, and resume work on it at any time by loading it from that file:
schema.save('schema.json')
schema = Schema('schema.json')
from tableschema import Field
# Init field
field = Field({'name': 'name', 'type': 'number'})
# Cast a value
field.cast_value('12345') # -> 12345
Data values can be cast to native Python objects with a Field instance. Field instances can be initialized with field descriptors. This allows formats and constraints to be defined.
Casting a value will check that the value is of the expected type, is in the correct format, and complies with any constraints imposed by a schema. E.g. a date value (in ISO 8601 format) can be cast with a field of type date. Values that can't be cast, or that don't meet the constraints, raise a CastError exception (see the exceptions listed below).
cli()
Command-line interface
Usage: tableschema [OPTIONS] COMMAND [ARGS]...
Options:
  --help  Show this message and exit.
Commands:
  infer     Infer a schema from data.
  info      Return info on this version of Table Schema
  validate  Validate that a supposed schema is in fact a Table Schema.
Table(self, source, schema=None, strict=False, post_cast=[], storage=None, **options)
Table representation
Arguments
- source (str/list[]): data source, one of:
  - local file (path)
  - remote file (url)
  - array of arrays representing the rows
- schema (any): data schema in all forms supported by the Schema class
- strict (bool): strictness option to pass to the Schema constructor
- post_cast (function[]): list of post cast processors
- storage (None): storage name like sql or bigquery
- options (dict): tabulator or storage's options
Raises
- TableSchemaException: raises on any error
table.hash
Table's SHA256 hash if it's available.
It is available once the table has been read, e.g. using table.read; otherwise None is returned. In the middle of an iteration it returns the hash of the already read contents.
Returns
- str/None: SHA256 hash
table.headers
Table's headers if available.
Returns
- str[]: headers
table.schema
Returns schema class instance if available.
Returns
- Schema: schema
table.size
Table's size in BYTES if it's available.
It is available once the table has been read, e.g. using table.read; otherwise None is returned. In the middle of an iteration it returns the size of the already read contents.
Returns
- int/None: size in BYTES
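A size and a SHA256 hash together form the kind of record that can later be used to check that a file has not changed. The sketch below uses only hashlib. It assumes, as a simplification, that both values are computed over the raw bytes of the source and that the hash is a hex digest; describe_bytes and check_integrity are hypothetical helpers, not tableschema functions.

```python
import hashlib


def describe_bytes(data):
    """Return a {'size': ..., 'hash': ...} dict for raw bytes."""
    return {'size': len(data), 'hash': hashlib.sha256(data).hexdigest()}


def check_integrity(data, integrity):
    """Return the names of the integrity keys that do not match `data`.

    Both keys are optional: a key missing from `integrity` is not checked.
    """
    actual = describe_bytes(data)
    return [key for key in ('size', 'hash')
            if key in integrity and integrity[key] != actual[key]]


content = b'city,location\nlondon,"51.50,-0.11"\n'
expected = describe_bytes(content)

print(check_integrity(content, expected))         # []
print(check_integrity(content + b'x', expected))  # ['size', 'hash']
```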
table.iter(self, keyed=False, extended=False, cast=True, integrity=False, relations=False, foreign_keys_values=False, exc_handler=None)
Iterates through the table data and emits rows cast based on table schema.
Arguments
- keyed (bool): yield keyed rows in the form of {header1: value1, header2: value2} (default is false; the form of rows is [value1, value2])
- extended (bool): yield extended rows in the form of [rowNumber, [header1, header2], [value1, value2]] (default is false; the form of rows is [value1, value2])
- cast (bool): disable data casting if false (default is true)
- integrity (dict): dictionary in the form of {'size': <bytes>, 'hash': '<sha256>'} to check integrity of the table when it's read completely. Both keys are optional.
- relations (dict): dictionary of foreign key references in the form of {resource1: [{field1: value1, field2: value2}, ...], ...}. If provided, foreign key fields will be checked and resolved to one of their references (/!\ one-to-many foreign keys are not completely resolved).
- foreign_keys_values (dict): three-level dictionary of foreign key references optimized to speed up the validation process, in the form of {resource1: {(fk_field1, fk_field2): {(value1, value2): {one_keyedrow}, ... }}}. If not provided but relations is set, it will be created before the validation process by the index_foreign_keys_values method.
- exc_handler (func): optional custom exception handler callable. Can be used to defer raising errors (i.e. "fail late"), e.g. for data validation purposes. Must support the signature below.
Custom exception handler
def exc_handler(exc, row_number=None, row_data=None, error_data=None):
    '''Custom exception handler (example)

    # Arguments:
        exc(Exception):
            Deferred exception instance
        row_number(int):
            Data row number that triggers exception exc
        row_data(OrderedDict):
            Invalid data row source data
        error_data(OrderedDict):
            Data row source data field subset responsible for the error, if
            applicable (e.g. invalid primary or foreign key fields). May be
            identical to row_data.
    '''
    # ...
Raises
- TableSchemaException: base class of any error
- CastError: data cast error
- IntegrityError: integrity checking error
- UniqueKeyError: unique key constraint violation
- UnresolvedFKError: unresolved foreign key reference error
Returns
- Iterator[list]: yields rows
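The three row shapes selected by the keyed and extended options can be pictured with a short standard-library sketch. shape_row is a hypothetical helper, not part of tableschema, and letting keyed take precedence when both flags are set is an arbitrary choice of this sketch rather than documented behaviour.

```python
def shape_row(row_number, headers, values, keyed=False, extended=False):
    """Return one row in the plain, keyed or extended form."""
    if keyed:
        # {header1: value1, header2: value2}
        return dict(zip(headers, values))
    if extended:
        # [rowNumber, [header1, header2], [value1, value2]]
        return [row_number, headers, values]
    # [value1, value2]
    return values


headers = ['city', 'location']
values = ['london', '51.50,-0.11']

print(shape_row(2, headers, values))
print(shape_row(2, headers, values, keyed=True))
print(shape_row(2, headers, values, extended=True))
```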
table.read(self, keyed=False, extended=False, cast=True, limit=None, integrity=False, relations=False, foreign_keys_values=False, exc_handler=None)
Read the whole table and return it as an array of rows.
It has the same API as table.iter except for the following.
Arguments
- limit (int): limit count of rows to read and return
Returns
- list[]: returns rows
table.infer(self, limit=100, confidence=0.75, missing_values=[''])
Infer a schema for the table.
It will infer and set Table Schema to table.schema based on table data.
Arguments
- limit (int): limit rows sample size
- confidence (float): how many casting errors are allowed (as a ratio, between 0 and 1)
- missing_values (str[]): list of missing values (by default [''])
Returns
- dict: Table Schema descriptor
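One way to picture the confidence ratio is as the minimum share of sampled values that must cast cleanly for a type to be accepted. The sketch below is an illustration under that assumption only; it is not the library's inference algorithm, and guess_type, CANDIDATES and the three candidate types are hypothetical.

```python
def _is_integer(value):
    try:
        int(value)
        return True
    except ValueError:
        return False


def _is_number(value):
    try:
        float(value)
        return True
    except ValueError:
        return False


# Candidate types from most to least specific (illustrative subset only).
CANDIDATES = [
    ('integer', _is_integer),
    ('number', _is_number),
    ('string', lambda value: True),
]


def guess_type(values, confidence=0.75):
    """Pick the first candidate whose success ratio reaches `confidence`."""
    for name, test in CANDIDATES:
        passed = sum(1 for value in values if test(value))
        if values and passed / len(values) >= confidence:
            return name
    return 'string'


print(guess_type(['1', '2', '3', 'x']))                  # integer (3 of 4 cast)
print(guess_type(['1', '2', 'x', 'y']))                  # string (only 2 of 4 cast)
print(guess_type(['1', '2', 'x', 'y'], confidence=0.5))  # integer
```

Lowering the ratio makes the guess more tolerant of dirty values, at the price of cast errors when the full data is read later.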
table.save(self, target, storage=None, **options)
Save data source to file locally in CSV format with , (comma) delimiter.
To save schema use table.schema.save().
Arguments
- target (str): saving target (e.g. file path)
- storage (None/str): storage name like sql or bigquery
- options (dict): tabulator or storage options
Raises
- TableSchemaException: raises an error if there is a saving problem
Returns
- True/Storage: returns true or storage instance
table.index_foreign_keys_values(self, relations)
Creates a three-level dictionary of foreign key references.
The references are optimized to speed up the validation process, in the form of {resource1: {(fk_field1, fk_field2): {(value1, value2): {one_keyedrow}, ... }}}.
For each foreign key of the schema it will iterate through the corresponding relations['resource'] to create an index (i.e. a dict) of existing values for the foreign fields and store one keyed row for each value combination.
The optimization relies on indexing the possible values for one foreign key in a hashmap to later speed up resolution.
This method is public to allow creating the index once and applying it to multiple tables sharing the same schema (typically grouped resources in a datapackage).
Notes
- the second key of the output is a tuple of the foreign fields, a proxy identifier of the foreign key
- the same relation resource can be indexed multiple times as a schema can contain more than one foreign key pointing to the same resource
Arguments
- relations (dict): dict of foreign key references in the form of {resource1: [{field1: value1, field2: value2}, ...], ...}. It must contain all resources pointed to in the foreign keys schema definition.
Returns
- dict: returns a three-level dictionary of foreign key references optimized to speed up the validation process, in the form of {resource1: {(fk_field1, fk_field2): {(value1, value2): {one_keyedrow}, ... }}}
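The three-level structure described above can be sketched with plain dictionaries. This is an illustration, not the library's code: index_foreign_keys is a hypothetical function, the foreign keys use the Table Schema foreignKeys layout, and keying the second level by the referenced fields is an assumption of this sketch.

```python
def index_foreign_keys(foreign_keys, relations):
    """Build {resource: {(fields): {(values): one_keyed_row}}}."""
    index = {}
    for foreign_key in foreign_keys:
        reference = foreign_key['reference']
        resource = reference['resource']
        fields = reference['fields']
        if isinstance(fields, str):
            fields = [fields]
        fields = tuple(fields)
        by_values = index.setdefault(resource, {}).setdefault(fields, {})
        for keyed_row in relations[resource]:
            values = tuple(keyed_row[name] for name in fields)
            # Keep the first row seen for each value combination.
            by_values.setdefault(values, keyed_row)
    return index


foreign_keys = [{
    'fields': ['city'],
    'reference': {'resource': 'cities', 'fields': ['name']},
}]
relations = {'cities': [
    {'name': 'london', 'country': 'uk'},
    {'name': 'paris', 'country': 'fr'},
]}

index = index_foreign_keys(foreign_keys, relations)
# Resolving a reference is now a dictionary lookup instead of a scan.
print(index['cities'][('name',)][('paris',)])
# {'name': 'paris', 'country': 'fr'}
```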
Schema(self, descriptor={}, strict=False)
Schema representation
Arguments
- descriptor (str/dict): schema descriptor, one of:
  - local path
  - remote url
  - dictionary
- strict (bool): flag to specify validation behaviour:
  - if false, errors will not be raised but instead collected in schema.errors
  - if true, validation errors are raised immediately
Raises
- TableSchemaException: raises any error that occurs during the process
schema.descriptor
Schema's descriptor
Returns
- dict: descriptor
schema.errors
Validation errors
Always empty in strict mode.
Returns
- Exception[]: validation errors
schema.field_names
Schema's field names
Returns
- str[]: an array of field names
schema.fields
Schema's fields
Returns
- Field[]: an array of field instances
schema.foreign_keys
Schema's foreign keys
Returns
- dict[]: foreign keys
schema.headers
Schema's field names
Returns
- str[]: an array of field names
schema.primary_key
Schema's primary keys
Returns
- str[]: primary keys
schema.valid
Validation status
Always true in strict mode.
Returns
- bool: validation status
schema.get_field(self, name)
Get schema's field by name.
Use schema.update_field if you want to modify the field descriptor.
Arguments
- name (str): schema field name
Returns
- Field/None: Field instance or None if not found
schema.add_field(self, descriptor)
Add new field to schema.
The schema descriptor will be validated with the newly added field descriptor.
Arguments
- descriptor (dict): field descriptor
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- Field/None: added Field instance or None if not added
schema.update_field(self, name, update)
Update existing descriptor field by name.
Arguments
- name (str): schema field name
- update (dict): update to apply to field's descriptor
Returns
- bool: true on success and false if no field is found to be modified
schema.remove_field(self, name)
Remove field by name.
The schema descriptor will be validated after field descriptor removal.
Arguments
- name (str): schema field name
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- Field/None: removed Field instance or None if not found
schema.cast_row(self, row, fail_fast=False, row_number=None, exc_handler=None)
Cast row based on field types and formats.
Arguments
- row (any[]): data row as an array of values
Returns
- any[]: returns cast data row
schema.infer(self, rows, headers=1, confidence=0.75, guesser_cls=None, resolver_cls=None)
Infer and set schema.descriptor based on data sample.
Arguments
- rows (list[]): array of arrays representing rows.
- headers (int/str[]): data sample headers (one of):
  - row number containing headers (rows should contain headers rows)
  - array of headers (rows should NOT contain headers rows)
- confidence (float): how many casting errors are allowed (as a ratio, between 0 and 1)
- guesser_cls (class): you can implement inferring strategies by providing type-guessing and type-resolving classes [experimental]
- resolver_cls (class): you can implement inferring strategies by providing type-guessing and type-resolving classes [experimental]
Returns
- dict: Table Schema descriptor
schema.commit(self, strict=None)
Update schema instance if there are in-place changes in the descriptor.
Example
from tableschema import Schema
descriptor = {'fields': [{'name': 'my_field', 'title': 'My Field', 'type': 'string'}]}
schema = Schema(descriptor)
print(schema.get_field('my_field').descriptor['type']) # string
# Update descriptor by field position
schema.descriptor['fields'][0]['type'] = 'number'
# Update descriptor by field name
schema.update_field('my_field', {'title': 'My Pretty Field'}) # True
# Changes are not committed yet
print(schema.get_field('my_field').descriptor['type']) # string
print(schema.get_field('my_field').descriptor['title']) # My Field
# Commit change
schema.commit()
print(schema.get_field('my_field').descriptor['type']) # number
print(schema.get_field('my_field').descriptor['title']) # My Pretty Field
Arguments
- strict (bool): alter strict mode for further work
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- bool: true on success and false if not modified
schema.save(self, target, ensure_ascii=True)
Save schema descriptor to target destination.
Arguments
- target (str): path where to save a descriptor
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- bool: true on success
Field(self, descriptor, missing_values=[''], schema=None)
Field representation
Arguments
- descriptor (dict): schema field descriptor
- missing_values (str[]): an array of strings representing missing values
Raises
- TableSchemaException: raises any error that occurs during the process
field.constraints
Field constraints
Returns
- dict: dict of field constraints
field.descriptor
Field's descriptor
Returns
- dict: descriptor
field.format
Field format
Returns
- str: field format
field.name
Field name
Returns
- str: field name
field.required
Whether field is required
Returns
- bool: true if required
field.schema
Returns a schema instance if the field belongs to some schema
Returns
- Schema: field's schema
field.type
Field type
Returns
- str: field type
field.cast_value(self, value, constraints=True, preserve_missing_values=False)
Cast given value according to the field type and format.
Arguments
- value (any): value to cast against field
- constraints (bool/str[]): constraints configuration:
  - it could be set to false to disable constraint checks
  - it could be an array of constraints to check, e.g. ['minimum', 'maximum']
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- any: returns cast value
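The constraints option described above either switches checks off or narrows them to a named subset. A minimal standard-library sketch of that selection logic follows. It is not the library's implementation: failed_constraints and CHECKS are hypothetical, and only the minimum and maximum constraints are modelled.

```python
# Illustrative checks for two Table Schema constraints.
CHECKS = {
    'minimum': lambda value, limit: value >= limit,
    'maximum': lambda value, limit: value <= limit,
}


def failed_constraints(value, field_constraints, constraints=True):
    """Return the names of the selected constraints that `value` violates."""
    if not constraints:
        # False (or an empty list) disables constraint checks entirely.
        return []
    failed = []
    for name, limit in field_constraints.items():
        if isinstance(constraints, list) and name not in constraints:
            continue  # this constraint was not selected for checking
        check = CHECKS.get(name)
        if check is not None and not check(value, limit):
            failed.append(name)
    return failed


limits = {'minimum': 1, 'maximum': 10}
print(failed_constraints(15, limits))                           # ['maximum']
print(failed_constraints(15, limits, constraints=['minimum']))  # []
print(failed_constraints(15, limits, constraints=False))        # []
```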
field.test_value(self, value, constraints=True)
Test whether value is compliant to the field.
Arguments
- value (any): value to test against field
- constraints (bool/str[]): constraints configuration
Returns
- bool: returns whether the value is compliant to the field
Storage(self, **options)
Storage factory/interface
For users
Use Storage.connect to instantiate a storage.
For instantiation of concrete storage instances, tableschema.Storage provides a unified factory method connect (which uses the plugin system under the hood):
# pip install tableschema_sql
from tableschema import Storage
storage = Storage.connect('sql', **options)
storage.create('bucket', descriptor)
storage.write('bucket', rows)
storage.read('bucket')
For integrators
The library includes an interface declaration to implement tabular Storage. This interface allows using different data storage systems like SQL with the tableschema.Table class (load/save) as well as on the data package level.
An implementor must follow the tableschema.Storage interface to write their own storage backend. Concrete storage backends could include additional functionality specific to the concrete storage system. See plugins below to learn how to integrate a custom storage plugin into your workflow.
storage.buckets
Return list of storage bucket names.
A bucket is a special term which has almost the same meaning as table. You should consider a bucket as a table stored in the storage.
Raises
- exceptions.StorageError: raises on any error
Returns
- str[]: return list of bucket names
storage.connect(name, **options)
Create tabular storage based on storage name.
This method is static: Storage.connect()
Arguments
- name (str): storage name like sql
- options (dict): concrete storage options
Raises
- StorageError: raises on any error
Returns
- Storage: returns Storage instance
storage.create(self, bucket, descriptor, force=False)
Create one/multiple buckets.
Arguments
- bucket (str/list): bucket name or list of bucket names
- descriptor (dict/dict[]): schema descriptor or list of descriptors
- force (bool): whether to delete and re-create already existing buckets
Raises
- exceptions.StorageError: raises on any error
storage.delete(self, bucket=None, ignore=False)
Delete one/multiple/all buckets.
Arguments
- bucket (str/list/None): bucket name or list of bucket names to delete. If None, all buckets will be deleted
- ignore (bool): don't raise an error on non-existent bucket deletion
Raises
- exceptions.StorageError: raises on any error
storage.describe(self, bucket, descriptor=None)
Get/set bucket's Table Schema descriptor
Arguments
- bucket (str): bucket name
- descriptor (dict/None): schema descriptor to set
Raises
- exceptions.StorageError: raises on any error
Returns
- dict: returns Table Schema descriptor
storage.iter(self, bucket)
Return an iterator of typed values based on the schema of this bucket.
Arguments
- bucket (str): bucket name
Raises
- exceptions.StorageError: raises on any error
Returns
- list[]: yields data rows
storage.read(self, bucket)
Read typed values based on the schema of this bucket.
Arguments
- bucket (str): bucket name
Raises
- exceptions.StorageError: raises on any error
Returns
- list[]: returns data rows
storage.write(self, bucket, rows)
This method writes data rows into storage.
It should store values of unsupported types as strings internally (like csv does).
Arguments
- bucket (str): bucket name
- rows (list[]): data rows to write
Raises
- exceptions.StorageError: raises on any error
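For integrators, the shape of the interface above can be pictured with an in-memory stand-in. This is a minimal sketch, not a real plugin: MemoryStorage is a hypothetical class that only mirrors the method names listed above, it does not subclass tableschema.Storage, it accepts single bucket names only, and it raises RuntimeError where a real backend would raise exceptions.StorageError.

```python
class MemoryStorage:
    """Duck-typed, in-memory stand-in mirroring the Storage method names."""

    def __init__(self):
        self._descriptors = {}
        self._rows = {}

    @property
    def buckets(self):
        return sorted(self._descriptors)

    def create(self, bucket, descriptor, force=False):
        if bucket in self._descriptors and not force:
            raise RuntimeError('Bucket "%s" already exists' % bucket)
        self._descriptors[bucket] = descriptor
        self._rows[bucket] = []

    def delete(self, bucket=None, ignore=False):
        # None means "delete all buckets".
        names = self.buckets if bucket is None else [bucket]
        for name in names:
            if name not in self._descriptors:
                if ignore:
                    continue
                raise RuntimeError('Bucket "%s" does not exist' % name)
            del self._descriptors[name]
            del self._rows[name]

    def describe(self, bucket, descriptor=None):
        if descriptor is not None:
            self._descriptors[bucket] = descriptor
        return self._descriptors[bucket]

    def iter(self, bucket):
        return iter(self._rows[bucket])

    def read(self, bucket):
        return list(self.iter(bucket))

    def write(self, bucket, rows):
        self._rows[bucket].extend(list(row) for row in rows)


storage = MemoryStorage()
storage.create('bucket', {'fields': [{'name': 'id', 'type': 'integer'}]})
storage.write('bucket', [[1], [2]])
print(storage.buckets)         # ['bucket']
print(storage.read('bucket'))  # [[1], [2]]
```

Keeping descriptors and rows in two dictionaries keyed by bucket name keeps describe and read independent, which is roughly the separation a SQL backend gets from table metadata versus table contents.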
validate(descriptor)
Validate descriptor
Arguments
- descriptor (dict): descriptor to validate
Raises
- ValidationError: on validation errors
Returns
- bool: True
infer(source, headers=1, limit=100, confidence=0.75, missing_values=[''], **options)
Infer source schema.
Arguments
- source (any): source as path, url or inline data
- headers (int/str[]): headers rows number or headers list
- limit (int): limit rows sample size
- confidence (float): how many casting errors are allowed (as a ratio, between 0 and 1)
- missing_values (str[]): list of missing values (by default [''])
Raises
- TableSchemaException: raises any error that occurs during the process
Returns
- dict: returns schema descriptor
FailedCast(self, value)
Wrap an original data field value that failed to be properly cast.
FailedCast allows for further processing/yielding of values while still being able to distinguish uncast values on the consuming side.
Delegates attribute access and the basic rich comparison methods to the underlying object. Supports default user-defined classes hashability i.e. is hashable based on object identity (not based on the wrapped value).
Arguments
- value (any): value
DataPackageException(self, message, errors=[])
Base class for all DataPackage/TableSchema exceptions.
If there are multiple errors, they can be read from the exception object:
try:
    # lib action
    pass
except DataPackageException as exception:
    if exception.multiple:
        for error in exception.errors:
            # handle error
            pass
datapackageexception.errors
List of nested errors
Returns
- DataPackageException[]: list of nested errors
datapackageexception.multiple
Whether it's a nested exception
Returns
- bool: whether it's a nested exception
TableSchemaException(self, message, errors=[])
Base class for all TableSchema exceptions.
LoadError(self, message, errors=[])
All loading errors.
ValidationError(self, message, errors=[])
All validation errors.
CastError(self, message, errors=[])
All value cast errors.
IntegrityError(self, message, errors=[])
All integrity errors.
UniqueKeyError(self, message, errors=[])
Unique key constraint violation (CastError subclass)
RelationError(self, message, errors=[])
All relations errors.
UnresolvedFKError(self, message, errors=[])
Unresolved foreign key reference error (RelationError subclass).
StorageError(self, message, errors=[])
All storage errors.
The project follows the Open Knowledge International coding standards.
The recommended way to get started is to create and activate a project virtual environment. To install the package and development dependencies into the active environment:
$ make install
To run tests with linting and coverage:
$ make test
Only breaking and the most important changes are described here. The full changelog and documentation for all released versions can be found in the nicely formatted commit history.
- Added missing_values argument to the infer function (#269)
- Support optional custom exception handling for table.iter/read (#259)
- Added preserve_missing_values parameter to field.cast_value
- Added an ability to check table's integrity while reading
- Implemented the table.size and table.hash properties
- Added table.index_foreign_keys_values and improved foreign key checks performance
- Added field.schema property
- In strict mode raise an exception if there are problems in field construction
- Allow providing custom guesser and resolver to schema infer
- Added schema.update_field method
- Support datetime with no time for date casting
- Support floats like 1.0 for integer casting
- Added the confidence parameter to infer
- The library has been rebased on the Frictionless Data specs v1 - https://frictionlessdata.io/specs/table-schema/