gildas-lormeau/zip.js

Feature: ZipReader and ZipWriter as Streams via .pipeThrough

BlackAsLight opened this issue · 2 comments

Hello, while this project does offer the functionality I'm in need of, it doesn't quite offer it in the way I'd like to consume it, specifically when dealing with streams. I'd very much like to stream in zip files from external services, unzip them in memory, modify the contents and rezip them to save, all without writing any intermediate step to disk (except the final one, if I so choose).

I did come up with two working classes that wrap the existing ZipReader and ZipWriter classes as TransformStream-style objects, so one can place them inside a readable.pipeThrough(), and I've found that they work quite well. I do hope you'd consider adding these classes, or something similar that achieves the same effect.

Wrapper for ZipReader

import { Entry, ZipReader, ZipReaderConstructorOptions } from 'https://deno.land/x/zipjs@v2.7.34/index.js'

export class ZipDecompressStream<T> {
	readable: ReadableStream<Omit<Entry, 'getData'> & { readable?: ReadableStream<Uint8Array> }>
	writable: WritableStream<T>
	constructor (options?: ZipReaderConstructorOptions) {
		const { readable, writable } = new TransformStream<T, T>()
		const gen = new ZipReader(readable, options).getEntriesGenerator()
		this.readable = new ReadableStream({
			async pull(controller) {
				const { done, value } = await gen.next()
				if (done)
					return controller.close()
				const chunk = {
					...value,
					readable: (function () {
						const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>()
						if (value.getData) {
							value.getData(writable)
							return readable
						}
					})()
				}
				delete chunk.getData
				controller.enqueue(chunk)
			}
		})
		this.writable = writable
	}
}

Wrapper for ZipWriter

import { ZipWriter, ZipWriterConstructorOptions } from 'https://deno.land/x/zipjs@v2.7.34/index.js'

export class ZipCompressStream<T> {
	readable: ReadableStream<Uint8Array>
	writable: WritableStream<T>
	constructor (path: string, options?: ZipWriterConstructorOptions) {
		const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>()
		const zipWriter = new ZipWriter(writable, options) // Out-Bound Compressed
		zipWriter.add(path, (() => {
			const { readable, writable } = new TransformStream<T, T>({
				flush() {
					zipWriter.close()
				}
			})
			this.writable = writable // In-Bound Uncompressed
			return readable // In-Bound Uncompressed
		})())
		this.readable = readable // Out-Bound Compressed
	}
}

Example

This working example just streams in a zip file from the web, decompresses it, and then recompresses each entry into its own zip file before saving it to disk.

// Required Setup
import { DOMParser } from 'https://deno.land/x/deno_dom@v0.1.43/deno-dom-wasm.ts'

const url = 'https://politicsandwar.com/data/trades/trades-'
	+ new DOMParser()
		.parseFromString(await (await fetch('https://politicsandwar.com/data/trades/?C=M;O=A')).text(), 'text/html')!
		.querySelector('a[href*="trade"]')!
		.getAttribute('href')!
		.slice(7, -8)
	+ '.csv.zip'

// Working Example
for await (const entry of (await fetch(url)).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable) 
		entry.readable
			.pipeThrough(new ZipCompressStream(entry.filename))
			.pipeTo((await Deno.create(entry.filename + '.zip')).writable)
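
The in-memory modification step mentioned at the start can be slotted into the same pipeline. Here is a hypothetical sketch, where the upper-casing transform just stands in for whatever edit you'd want to make to the text:

for await (const entry of (await fetch(url)).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable)
		entry.readable
			.pipeThrough(new TextDecoderStream())
			.pipeThrough(new TransformStream<string, string>({
				transform(chunk, controller) {
					// any in-memory edit of the text would go here
					controller.enqueue(chunk.toUpperCase())
				}
			}))
			.pipeThrough(new TextEncoderStream())
			.pipeThrough(new ZipCompressStream(entry.filename))
			.pipeTo((await Deno.create(entry.filename + '.zip')).writable)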

It should be noted that with ZipDecompressStream, if the developer doesn't completely consume an entry's readable stream, call .cancel() on it, or use something that calls .cancel() for them (like a for await...of loop), then it will hang around in memory forever, even after it goes out of scope.

import { CsvParseStream } from 'https://jsr.io/@std/csv/0.216.0/csv_parse_stream.ts'

for await (const entry of (await fetch(url)).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable) {
		const readable = entry.readable
			.pipeThrough(new TextDecoderStream())
			.pipeThrough(new CsvParseStream())
		console.log(await readable[ Symbol.asyncIterator ]().next())
	}
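
Following that note, entries that aren't wanted can have their readable cancelled explicitly so they don't linger. A minimal sketch:

for await (const entry of (await fetch(url)).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable) {
		if (entry.filename.endsWith('.csv'))
			await entry.readable.pipeTo((await Deno.create(entry.filename)).writable)
		else
			await entry.readable.cancel() // release entries we don't care about
	}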

Thank you very much for the suggestion. I had thought about it but had set the task aside. What bothered me is that if you set the bufferedWrite option to true when using ZipWriter and cancel the compression of an entry in the ZIP (with the signal option), an AbortError exception is thrown (as expected), but ideally it should not interrupt the creation of the ZIP file. The same kind of issue exists with duplicated filenames in the ZIP. With streams, that kind of recovery is not possible.
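
For reference, that scenario looks roughly like this with the existing API (a rough sketch only, assuming the bufferedWrite and signal options behave as documented):

import { BlobWriter, TextReader, ZipWriter } from 'https://deno.land/x/zipjs@v2.7.34/index.js'

const zipWriter = new ZipWriter(new BlobWriter('application/zip'), { bufferedWrite: true })
const aborter = new AbortController()
// Start adding an entry, then cancel its compression via the signal option
const cancelled = zipWriter.add('cancelled.txt', new TextReader('will be cancelled'), { signal: aborter.signal })
	.catch(error => console.error(error)) // the expected AbortError ends up here
aborter.abort()
await cancelled
// Ideally the rest of the archive can still be produced
await zipWriter.add('kept.txt', new TextReader('still here'))
const zipBlob = await zipWriter.close()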

However, when there is no such need, your implementations can be very useful. I'm willing to integrate the code into the library; do you want me to take care of it, or do you want to make a PR?

I would prefer to make a pull request, as then I get that contributor status, but I'm not 100% sure of all the changes one would need to make. Looking at the repo, I'm guessing the two new files would go under lib/core/, and that index.d.ts would be updated manually with the correct types, but I'm not sure which imports and exports I need to add, or where, to propagate them to the top level.
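
For what it's worth, I imagine the declarations added to index.d.ts would simply mirror the classes above, something like this (the names, placement and generics are just my guesses):

export class ZipDecompressStream<T> {
	readable: ReadableStream<Omit<Entry, 'getData'> & { readable?: ReadableStream<Uint8Array> }>
	writable: WritableStream<T>
	constructor(options?: ZipReaderConstructorOptions)
}

export class ZipCompressStream<T> {
	readable: ReadableStream<Uint8Array>
	writable: WritableStream<T>
	constructor(path: string, options?: ZipWriterConstructorOptions)
}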

On a side note, I also realised that ZipCompressStream was limited to compressing only one document into a zip file, and since zip files do support more than one, I came up with another class that allows it:

import { ZipWriter, ZipWriterCloseOptions, ZipWriterConstructorOptions } from 'https://deno.land/x/zipjs@v2.7.34/index.js'

export class MultiZipCompressStream {
	#readable: ReadableStream<Uint8Array>
	#zipWriter: ZipWriter<unknown>
	constructor (options?: ZipWriterConstructorOptions) {
		const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>()
		this.#readable = readable
		this.#zipWriter = new ZipWriter(writable, options)
	}

	get readable() {
		return this.#readable
	}

	writable<T>(path: string) {
		const { readable, writable } = new TransformStream<T, T>()
		this.#zipWriter.add(path, readable)
		return writable
	}

	close(comment?: Uint8Array, options?: ZipWriterCloseOptions) {
		return this.#zipWriter.close(comment, options)
	}
}
// urls is assumed to be an array of zip file URLs, like the one gathered in the earlier example
const zipper = new MultiZipCompressStream()
zipper.readable.pipeTo((await Deno.create('Archive.zip')).writable)
for (let i = 0; i < Math.min(urls.length, 2); ++i)
	for await (const entry of (await fetch(urls[ i ])).body!.pipeThrough(new ZipDecompressStream()))
		if (entry.readable) {
			entry.readable.pipeTo(zipper.writable(entry.filename))
		}
zipper.close()

Based on these changes, I decided to merge the two into one class and came up with this:

export class ZipCompressStream {
	#readable: ReadableStream<Uint8Array>
	#zipWriter: ZipWriter<unknown>
	constructor (options?: ZipWriterConstructorOptions) {
		const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>()
		this.#readable = readable
		this.#zipWriter = new ZipWriter(writable, options)
	}

	get readable() {
		return this.#readable
	}

	transform<T>(path: string) {
		const { readable, writable } = new TransformStream<T, T>({
			flush: () => {
				this.#zipWriter.close()
			}
		})
		this.#zipWriter.add(path, readable)
		return { readable: this.#readable, writable }
	}

	writable<T>(path: string) {
		const { readable, writable } = new TransformStream<T, T>()
		this.#zipWriter.add(path, readable)
		return writable
	}

	close(comment?: Uint8Array, options?: ZipWriterCloseOptions) {
		return this.#zipWriter.close(comment, options)
	}
}

If zipping multiple files, or really, if using the writable method instead of the transform method, then one must call close once no more entries will be appended, otherwise the end product is corrupt.

// Zipping two files together
const zipper = new ZipCompressStream()
zipper.readable.pipeTo((await Deno.create('Archive.zip')).writable)
for (let i = 0; i < Math.min(urls.length, 2); ++i)
	for await (const entry of (await fetch(urls[ i ])).body!.pipeThrough(new ZipDecompressStream()))
		if (entry.readable)
			entry.readable.pipeTo(zipper.writable(entry.filename))
zipper.close()

// Zipping a single file
for await (const entry of (await fetch(urls[ 2 ])).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable)
		entry.readable
			.pipeThrough(new ZipCompressStream().transform(entry.filename))
			.pipeTo((await Deno.create(entry.filename + '.zip')).writable)

On another side note, calling .return() on an async iterator seems to be a good way to clean up the zip if you want to exit early and aren't inside a for await...of loop:

for await (const entry of (await fetch(url)).body!.pipeThrough(new ZipDecompressStream()))
	if (entry.readable) {
		console.log(entry.filename)
		const iter = entry.readable[ Symbol.asyncIterator ]()
		console.log(await iter.next())
		console.log(await iter.return!())
	}