Treasure Data API library for Python
td-client supports the following versions of Python:
- Python 3.5+
- PyPy
You can install the releases from PyPI.
$ pip install td-client
Installing certifi is recommended, as it enables SSL certificate verification.
$ pip install certifi
See also the examples in the Treasure Data Documentation.
The td-client documentation is hosted at https://tdclient.readthedocs.io/, or you can go directly to the API documentation.
For information on the parameters that may be used when reading particular types of data, see File import parameters.
Your Treasure Data API key is read from the environment variable TD_API_KEY if none is given via the apikey= argument passed to tdclient.Client.
The Treasure Data API endpoint https://api.treasuredata.com is used by default. You can override it with the environment variable TD_API_SERVER, which in turn can be overridden via the endpoint= argument passed to tdclient.Client. A list of available Treasure Data sites and their corresponding API endpoints can be found here.
import tdclient

with tdclient.Client() as td:
    for job in td.jobs():
        print(job.job_id)
Running jobs on Treasure Data.
import tdclient

with tdclient.Client() as td:
    job = td.query("sample_datasets", "SELECT COUNT(1) FROM www_access", type="hive")
    job.wait()
    for row in job.result():
        print(repr(row))
td-client-python implements PEP 249, the Python Database API v2.0. You can use td-client-python with external libraries that support the Database API, such as pandas.
import pandas
import tdclient

def on_waiting(cursor):
    print(cursor.job_status())

with tdclient.connect(db="sample_datasets", type="presto", wait_callback=on_waiting) as td:
    data = pandas.read_sql("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol", td)
    print(repr(data))
We offer another package for pandas named pytd with some advanced features. You may prefer it if you need to do more complicated things, such as exporting result data to Treasure Data or printing a job's progress during a long execution.
Importing data into Treasure Data in a streaming manner, similar to how fluentd works.
import sys
import tdclient

with tdclient.Client() as td:
    for file_name in sys.argv[1:]:
        td.import_file("mydb", "mytbl", "csv", file_name)
Warning
Data imported in a streaming manner takes some time to become queryable, because the schema update is executed with a delay.
Importing data into Treasure Data in a batch manner.
import sys
import uuid
import warnings

import tdclient

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format(file_name)
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except Exception:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise RuntimeError("no records have been imported: {}".format(bulk_import.name))
    bulk_import.commit(wait=True)
    bulk_import.delete()
If you want to import data in msgpack format, you can write it as follows:
import io
import time
import uuid
import warnings

import tdclient

t1 = int(time.time())
l1 = [{"a": 1, "b": 2, "time": t1}, {"a": 3, "b": 9, "time": t1}]

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        _bytes = tdclient.util.create_msgpack(l1)
        bulk_import.upload_file("part", "msgpack", io.BytesIO(_bytes))
        bulk_import.freeze()
    except Exception:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    # the rest is the same as in the above example
The td-client package will generally make sensible choices about how to read the columns in CSV and TSV data, but sometimes the user needs to override the default mechanism. This can be done using the optional file import parameters dtypes and converters.
For instance, consider CSV data that starts with the following records:
time,col1,col2,col3
1575454204,a,0001,a;b;c
1575454204,b,0002,d;e;f
If that data is read using the defaults, it will produce values that look like:
1575454204, "a", 1, "a;b;c"
1575454204, "b", 2, "d;e;f"
that is, an integer, a string, an integer and another string.
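The effect is comparable to the following rough sketch, which is only an illustration of the inference described, not td-client's actual implementation:

```python
def guess(value):
    # Try int first, then float, and otherwise keep the string as-is.
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value

row = "1575454204,a,0001,a;b;c".split(",")
print([guess(v) for v in row])  # [1575454204, 'a', 1, 'a;b;c']
```

Note how "0001" loses its leading zeroes once it is parsed as the integer 1.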
If the user wants to keep the leading zeroes in col2, then they can specify the column datatype as string. For instance, using bulk_import.upload_file to read data from input_data:
bulk_import.upload_file(
    "part", "msgpack", input_data,
    dtypes={"col2": "str"},
)
which would produce:
1575454204, "a", "0001", "a;b;c"
1575454204, "b", "0002", "d;e;f"
If they also wanted to treat col3 as a sequence of strings separated by semicolons, then they could specify a function to process col3:
bulk_import.upload_file(
    "part", "msgpack", input_data,
    dtypes={"col2": "str"},
    converters={"col3": lambda x: x.split(";")},
)
which would produce:
1575454204, "a", "0001", ["a", "b", "c"]
1575454204, "b", "0002", ["d", "e", "f"]
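A converter is just an ordinary Python callable applied to each raw field value of the named column. Written as a standalone function for illustration, the semicolon-splitting converter above behaves like this:

```python
def split_semicolons(value):
    # Split a raw CSV field such as "a;b;c" into a list of strings.
    return value.split(";")

print(split_semicolons("a;b;c"))  # ['a', 'b', 'c']
```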
Run the tests.
$ python setup.py test
You can run the tests against all supported Python versions. We recommend installing pyenv to manage multiple Pythons.
$ pyenv shell system
$ for version in $(cat .python-version); do [ -d "$(pyenv root)/versions/${version}" ] || pyenv install "${version}"; done
$ pyenv shell --unset
Install tox.
$ pip install tox
Then, run tox.
$ tox
Create and push a tag to GitHub; creating a Release on GitHub will then publish the new version to PyPI.
If you want to release manually, you can upload with twine.
$ python setup.py bdist_wheel sdist
$ twine upload dist/*
Apache Software License, Version 2.0