`/-/open-csv-from-url` endpoint
simonw opened this issue · 6 comments
Need a `/-/open-csv-from-url` endpoint.
Originally posted by @simonw in simonw/datasette-app#107 (comment)
Design:
POST /-/open-csv-from-url
{"url": "https://..."}
Returns 200 {"ok": true, "path": "/temporary/newtable"} if it works, 400 or 500 {"ok": false, "error": "Error message"} if it fails.
Efficiently sucking down a large CSV file over HTTP and writing rows to the database without blocking the server (or blocking the Datasette write queue) is a bit tricky. I'm going to base this implementation on https://github.com/simonw/datasette-import-table/blob/9825cab7c3e1cdd7bcfd0b21dda4a22bda601f88/datasette_import_table/__init__.py#L84-L127
This example looks useful too: https://github.com/mosquito/aiofile#async-csv-dict-reader
And https://www.python-httpx.org/async/#streaming-responses for Response.aiter_lines()
First attempt at that:
class _NeedMoreLines(Exception):
    """Signal from _LineFeed: the parser wants a line that has not arrived yet."""


class _LineFeed:
    """Rewindable, synchronous line source handed to ``csv.DictReader``.

    ``csv`` readers pull their input synchronously, so lines fetched from the
    async source are staged here first. Lines stay staged until a complete
    row (or the header) has been parsed from them, which lets the caller
    rewind and re-parse a record that turned out to span several lines.
    """

    def __init__(self):
        self.lines = []  # staged lines not yet committed to a returned row
        self.pos = 0  # index of the next staged line to give the parser
        self.eof = False  # set once the async source is exhausted

    def __iter__(self):
        return self

    def __next__(self):
        if self.pos < len(self.lines):
            line = self.lines[self.pos]
            self.pos += 1
            return line
        if self.eof:
            raise StopIteration
        # Not StopIteration: csv would treat that as end of data and return a
        # truncated row. A distinct exception aborts this parse attempt.
        raise _NeedMoreLines

    def add(self, line):
        """Stage *line*, making sure it ends with a line terminator.

        Some httpx versions strip line endings from ``aiter_lines()`` output
        while others keep them; csv needs the terminator to split records.
        """
        if not line.endswith(("\n", "\r")):
            line += "\n"
        self.lines.append(line)

    def commit(self):
        """Discard the lines the parser has consumed and return their count."""
        consumed = self.pos
        del self.lines[:consumed]
        self.pos = 0
        return consumed


class AsyncDictReader:
    """Parse CSV rows as dicts from an async iterator of lines.

    Lines are parsed as they arrive (e.g. from httpx
    ``Response.aiter_lines()``), so a large CSV is never held in memory.
    Blank lines are skipped, quoted fields may contain newlines, and lines
    may be given with or without their line terminators.

    Args:
        async_line_iterator: async iterator yielding ``str`` lines (used as-is)
            or ``bytes`` lines (decoded with *encoding* / *errors*).
        **kwargs: ``encoding`` (default ``"utf-8"``) and ``errors`` (default
            ``"replace"``) control bytes decoding; everything else is
            forwarded to ``csv.DictReader``.

    Attributes:
        reader: the underlying ``csv.DictReader``.
        line_num: number of source lines consumed so far.
    """

    def __init__(self, async_line_iterator, **kwargs):
        self.async_line_iterator = async_line_iterator
        self.encoding = kwargs.pop("encoding", "utf-8")
        self.errors = kwargs.pop("errors", "replace")
        self._feed = _LineFeed()
        self.reader = DictReader(self._feed, **kwargs)
        self.line_num = 0
        self._header_done = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._header_done:
            # Consume the header on its own, so that retrying the first row
            # never re-feeds the header line to the parser as data.
            await self._parse(lambda: self.reader.fieldnames)
            self._header_done = True
        return await self._parse(lambda: next(self.reader))

    async def _parse(self, step):
        """Run *step* against the staged lines, pulling more lines as needed."""
        feed = self._feed
        if not feed.lines and not feed.eof:
            await self._fetch_line()
        while True:
            feed.pos = 0  # re-parse the pending record from its first line
            try:
                result = step()
            except _NeedMoreLines:
                await self._fetch_line()
            except StopIteration as e:
                self.line_num += feed.commit()
                raise StopAsyncIteration from e
            else:
                self.line_num += feed.commit()
                return result

    async def _fetch_line(self):
        """Stage one more line from the source, or mark the feed exhausted."""
        try:
            line = await self.async_line_iterator.__anext__()
        except StopAsyncIteration:
            self._feed.eof = True
        else:
            if isinstance(line, (bytes, bytearray)):
                line = line.decode(self.encoding, self.errors)
            self._feed.add(line)
url = "https://covid-19.datasettes.com/covid/us_census_county_populations_2019.csv?_stream=on&_size=max"


async def print_csv_rows(url=url):
    """Stream the CSV at *url* and print each parsed row as a dict.

    The response is read with httpx streaming, so rows are printed as lines
    arrive instead of after the whole body has been downloaded.

    Raises:
        httpx.HTTPStatusError: if the server responds with a 4xx/5xx status
            (otherwise an HTML error page would be parsed as CSV).
    """
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            async for row in AsyncDictReader(response.aiter_lines()):
                print(row)


if __name__ == "__main__":
    # `async with` is only valid inside a coroutine in a regular module; in a
    # top-level-await REPL (IPython/Jupyter) use `await print_csv_rows()`.
    import asyncio

    asyncio.run(print_csv_rows())

# Simpler version that takes advantage of aiter_lines() having decoded to unicode already:
class _NeedMoreLines(Exception):
    """Signal from _LineFeed: the parser wants a line that has not arrived yet."""


class _LineFeed:
    """Rewindable, synchronous line source handed to ``csv.DictReader``.

    ``csv`` readers pull their input synchronously, so lines fetched from the
    async source are staged here first. Lines stay staged until a complete
    row (or the header) has been parsed from them, which lets the caller
    rewind and re-parse a record that turned out to span several lines.
    """

    def __init__(self):
        self.lines = []  # staged lines not yet committed to a returned row
        self.pos = 0  # index of the next staged line to give the parser
        self.eof = False  # set once the async source is exhausted

    def __iter__(self):
        return self

    def __next__(self):
        if self.pos < len(self.lines):
            line = self.lines[self.pos]
            self.pos += 1
            return line
        if self.eof:
            raise StopIteration
        # Not StopIteration: csv would treat that as end of data and return a
        # truncated row. A distinct exception aborts this parse attempt.
        raise _NeedMoreLines

    def add(self, line):
        """Stage *line*, making sure it ends with a line terminator.

        Some httpx versions strip line endings from ``aiter_lines()`` output
        while others keep them; csv needs the terminator to split records.
        """
        if not line.endswith(("\n", "\r")):
            line += "\n"
        self.lines.append(line)

    def commit(self):
        """Discard the lines the parser has consumed and return their count."""
        consumed = self.pos
        del self.lines[:consumed]
        self.pos = 0
        return consumed


class AsyncDictReader:
    """Parse CSV rows as dicts from an async iterator of text lines.

    Designed for httpx ``Response.aiter_lines()``, which already yields
    decoded ``str`` lines. Lines are parsed as they arrive, so a large CSV is
    never held in memory. Blank lines are skipped, quoted fields may contain
    newlines, and lines may be given with or without their terminators.

    Args:
        async_line_iterator: async iterator yielding ``str`` lines.
        **kwargs: forwarded to ``csv.DictReader`` (``fieldnames``,
            ``delimiter``, ``dialect``, ...).

    Attributes:
        reader: the underlying ``csv.DictReader``.
        line_num: number of source lines consumed so far.
    """

    def __init__(self, async_line_iterator, **kwargs):
        self.async_line_iterator = async_line_iterator
        self._feed = _LineFeed()
        self.reader = DictReader(self._feed, **kwargs)
        self.line_num = 0
        self._header_done = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._header_done:
            # Consume the header on its own, so that retrying the first row
            # never re-feeds the header line to the parser as data.
            await self._parse(lambda: self.reader.fieldnames)
            self._header_done = True
        return await self._parse(lambda: next(self.reader))

    async def _parse(self, step):
        """Run *step* against the staged lines, pulling more lines as needed."""
        feed = self._feed
        if not feed.lines and not feed.eof:
            await self._fetch_line()
        while True:
            feed.pos = 0  # re-parse the pending record from its first line
            try:
                result = step()
            except _NeedMoreLines:
                await self._fetch_line()
            except StopIteration as e:
                self.line_num += feed.commit()
                raise StopAsyncIteration from e
            else:
                self.line_num += feed.commit()
                return result

    async def _fetch_line(self):
        """Stage one more line from the source, or mark the feed exhausted."""
        try:
            line = await self.async_line_iterator.__anext__()
        except StopAsyncIteration:
            self._feed.eof = True
        else:
            self._feed.add(line)