simonw/datasette-app-support

`/-/open-csv-from-url` endpoint

simonw opened this issue · 6 comments

Need a /-/open-csv-from-url endpoint.

Originally posted by @simonw in simonw/datasette-app#107 (comment)

Design:

POST /-/open-csv-from-url
{"url": "https://..."}

Returns 200 {"ok": true, "path": "/temporary/newtable"} on success. On failure it returns 400 or 500 with {"ok": false, "error": "Error message"}.

Efficiently sucking down a large CSV file over HTTP and writing rows to the database without blocking the server (or blocking the Datasette write queue) is a bit tricky. I'm going to base this implementation on https://github.com/simonw/datasette-import-table/blob/9825cab7c3e1cdd7bcfd0b21dda4a22bda601f88/datasette_import_table/__init__.py#L84-L127

First attempt at that:

class AsyncDictReader:
    """Asynchronously yield CSV rows as dicts from an async iterator of lines.

    A synchronous ``csv.DictReader`` is attached to an in-memory byte buffer
    (through an ``io.TextIOWrapper``). Every ``__anext__`` call pulls lines
    from ``async_line_iterator``, writes them into the buffer and lets the
    ``DictReader`` parse them, so nothing blocks while waiting for input.

    Args:
        async_line_iterator: Async iterator yielding ``str`` lines. Lines are
            written to the parser unchanged, one after another, so each line
            needs to carry its own line terminator.
        **kwargs: ``encoding`` (default ``"utf-8"``) and ``errors`` (default
            ``"replace"``) configure the internal text layer; all remaining
            keyword arguments are forwarded to ``csv.DictReader``.

    Iteration ends when the underlying iterator is exhausted or yields an
    empty string. Blank lines are skipped, as ``csv.DictReader`` does.

    NOTE: input is handed to the parser one physical line at a time, so a
    quoted field that spans several lines is not reassembled into one row.
    """

    def __init__(self, async_line_iterator, **kwargs):
        # Local import: the surrounding snippet has no import block to extend.
        import codecs

        self.async_line_iterator = async_line_iterator
        self.buffer = io.BytesIO()
        text_layer = io.TextIOWrapper(
            self.buffer,
            encoding=kwargs.pop("encoding", "utf-8"),
            errors=kwargs.pop("errors", "replace"),
        )
        # Incoming lines are already text. They must be encoded with the very
        # codec the wrapper decodes with, otherwise non-ASCII characters are
        # corrupted (or cannot be encoded at all). An incremental encoder is
        # used so codecs with a BOM only emit it once, mirroring the
        # wrapper's incremental decoder.
        self._encoder = codecs.getincrementalencoder(text_layer.encoding)(
            text_layer.errors
        )
        self.reader = DictReader(text_layer, **kwargs)
        self.line_num = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next parsed row, raising ``StopAsyncIteration`` at the end."""
        if self.line_num == 0:
            # Nothing parsed yet: the first line is the header row.
            header = await self.async_line_iterator.__anext__()
            if not header:
                raise StopAsyncIteration
            self.buffer.write(self._encoder.encode(header))

        while True:
            line = await self.async_line_iterator.__anext__()
            if not line:
                # An empty string is treated as the end-of-input marker.
                raise StopAsyncIteration

            self.buffer.write(self._encoder.encode(line))
            self.buffer.seek(0)

            try:
                result = next(self.reader)
            except StopIteration:
                # The buffer held only a blank line; DictReader skipped it
                # and ran out of data. Fetch another line instead of ending
                # the whole iteration.
                result = None

            # Empty the buffer so the next line starts from a clean slate.
            self.buffer.seek(0)
            self.buffer.truncate(0)
            self.line_num = self.reader.line_num

            if result is not None:
                return result

url = "https://covid-19.datasettes.com/covid/us_census_county_populations_2019.csv?_stream=on&_size=max"
async with httpx.AsyncClient() as client:
    async with client.stream('GET', url) as response:
        async for row in AsyncDictReader(response.aiter_lines()):
            print(row)

Simpler version that takes advantage of aiter_lines() having decoded to unicode already:

class AsyncDictReader:
    """Asynchronously yield CSV rows as dicts from an async iterator of lines.

    A synchronous ``csv.DictReader`` is attached to an in-memory
    ``io.StringIO``. Every ``__anext__`` call pulls lines from
    ``async_line_iterator``, writes them into the buffer and lets the
    ``DictReader`` parse them, so nothing blocks while waiting for input.

    Args:
        async_line_iterator: Async iterator yielding ``str`` lines. Lines are
            written to the parser unchanged, one after another, so each line
            needs to carry its own line terminator.

    Iteration ends when the underlying iterator is exhausted or yields an
    empty string. Blank lines are skipped, as ``csv.DictReader`` does.

    NOTE: input is handed to the parser one physical line at a time, so a
    quoted field that spans several lines is not reassembled into one row.
    """

    def __init__(self, async_line_iterator):
        self.async_line_iterator = async_line_iterator
        self.buffer = io.StringIO()
        self.reader = DictReader(self.buffer)
        self.line_num = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next parsed row, raising ``StopAsyncIteration`` at the end."""
        if self.line_num == 0:
            # Nothing parsed yet: the first line is the header row.
            header = await self.async_line_iterator.__anext__()
            if not header:
                raise StopAsyncIteration
            self.buffer.write(header)

        while True:
            line = await self.async_line_iterator.__anext__()
            if not line:
                # An empty string is treated as the end-of-input marker.
                raise StopAsyncIteration

            self.buffer.write(line)
            self.buffer.seek(0)

            try:
                result = next(self.reader)
            except StopIteration:
                # The buffer held only a blank line; DictReader skipped it
                # and ran out of data. Fetch another line instead of ending
                # the whole iteration.
                result = None

            # Empty the buffer so the next line starts from a clean slate.
            self.buffer.seek(0)
            self.buffer.truncate(0)
            self.line_num = self.reader.line_num

            if result is not None:
                return result