API assistance?
Opened this issue · 3 comments
I'm trying to use the unblob API for what I think should be a fairly straightforward task but I'm having some difficulties and hoping to get some help. I haven't found many examples of API usage so I'm hoping this issue might also help other users get started with the API from the code I have and learn from my mistakes.
My goal here is to use unblob to do a recursive extraction of a blob but to produce a clean copy of each extraction without any sub-extractions (i.e., have no _extract
files within any of my output directories). Instead I want each of the extractions to be stored one directory deep within an output directory (e.g., output/extraction1, output/extraction2). I've previously implemented something like this by just running unblob then parsing the generated outputs looking for files named *_extract
but I think it should be much cleaner to do this with the API.
I've written the code below which successfully logs a lot of information about the extraction process and almost gets me what I want, but I find that extraction files sometimes still end up in my output so I suspect I'm doing something wrong or missing something obvious here.
I have 3 specific questions, but any advice or guidance would be much appreciated! Thanks
- Am I missing a much easier way to do this with the API?
- Are
blob_id
values supposed to be unique per blob? It seems like the same blob_id will show up with distinct paths for example if a blob is carved into 2 files, both the base blob and the 2 generated files will have the same blob_id. Am I just misunderstanding this interface? - Are there examples of API usage somewhere?
Example usage after installing unblob dependencies, unblob itself, and saving the below script as extracator.py
wget https://dlcdnets.asus.com/pub/ASUS/wireless/RT-N66U_C1/FW_RT_N66U_C1_300438510000.zip
python3 extractor.py FW_RT_N66U_C1_300438510000.zip output
In the generated output directory I see one of the extracted directories contains two _extract
directories
$ ls output/extracted/*
output/extracted/00dd3c22e77b9382941bf19afb8370b1897535fb.unblob:
Firmware_Release
output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob:
part0 part0_extract part1 part1_extract
output/extracted/a0f5794c735f16dd4a7b12042ef23e1d95dcb134.unblob:
bin cifs1 cifs2 dev etc home jffs lib media mmc mnt opt proc rom root sbin sys sysroot tmp usr var www
output/extracted/e008abdad83a71b718083b949b91dafaf071b3c5.unblob:
lzma.uncompressed
It's almost right, but the part0_extract
and part1_extract
directories within output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob
shouldn't be there!
import os
import sys
import multiprocessing
import shutil
import hashlib
import ipdb
import subprocess
from pathlib import Path
from typing import Dict, List
from unblob.processing import ExtractionConfig, process_file
from unblob.logging import configure_logger
from unblob import report
def sha1sum_file(filename):
sha1 = hashlib.sha1() # XXX: if minimum python version >= 3.9, pass usedforsecurity=False
buf = bytearray(128*1024)
bufview = memoryview(buf)
with open(filename, 'rb', buffering=0) as f:
for n in iter(lambda : f.readinto(bufview), 0):
sha1.update(bufview[:n])
return sha1.hexdigest()
def extract(filesystem: Path, outdir: Path):
'''
Given a filesystem blob, extract it into outdir. Return a dictionary
mapping a unique ID for each extracted blob to (ID of source file, source file, extracted directory)
Note that one ID may be an empty string (e.g., for the input file).
'''
configure_logger(0, outdir, Path('unblob.log'))
unblob_results = process_file(
ExtractionConfig(extract_root=outdir / 'unblob.root',
entropy_depth=0,
max_depth=3,
verbose=False,
keep_extracted_chunks=True),
filesystem)
known_tasks = {} # task_id -> [task_file, subtask.path, [files_in_subtask_path]]
for task_result in unblob_results.results:
task_file = task_result.task.path
task_id = task_result.task.blob_id
for subtask in task_result.subtasks:
if subtask.blob_id not in known_tasks:
# XXX: We'll see the same subtask.blob_id for each time we extract more data from a blob. E.g., we could have
# blob_id=1 for foo.zip which has a subtask.path of foo.zip_extracted. Then we'll see blob_id=1 for foo.zip_extracted/nextfile
# because the nextfile is derived from foo.zip.
known_tasks[subtask.blob_id] = [task_file, subtask.path, [subtask.path]]
#print(f"New blob_id in subtask with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")
else:
known_tasks[subtask.blob_id][2].append(subtask.path)
#print(f"Duplicate blob_id with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")
return known_tasks
def package(known_tasks, output_dir):
'''
We want to create a clean set of directories in outdir.
For each extraction we want to create
outdir/[sha1sum_of_blob].[unblob]/
'''
# Create results directories. output/blobs/<sha1sum> and output/extracted/<sha1sum>.<extractor>
blob_dir = output_dir / 'blobs'
blob_dir.mkdir()
extracted_dir = output_dir / 'extracted'
extracted_dir.mkdir()
# Copy each blob to the blob directory
for task_id, info in known_tasks.items():
input_hash = sha1sum_file(info[0])
shutil.copy(info[0], blob_dir / input_hash)
# Copy each extraction to the extracted directory
# Identify all extraction directories that were created (so we don't copy later)
extraction_dirs = set()
for task_id, info in known_tasks.items():
extraction_dirs.add(info[1])
for task_id, info in known_tasks.items():
input_hash = sha1sum_file(info[0])
out_dir = extracted_dir / f"{input_hash}.unblob"
# We want to copy everything from info[1] to out_dir but exclude any directories in extraction_dirs
# First copy everything - we'll use CP over shutil to handle weird files
subprocess.check_output(['cp', '-r', info[1], out_dir])
# Now for each file in extraction_dirs (except info[1]), delete
for extraction_dir in extraction_dirs:
if extraction_dir == info[1]:
continue
# Rewrite path to be relative to out_dir
try:
relative_path = Path(extraction_dir).relative_to(info[1])
except ValueError:
# Not a subpath, skip
continue
# Ensure output is not-empty and within the output directory
if not len(str(out_dir / relative_path)) or not str(out_dir / relative_path).startswith(str(out_dir)):
print("XXX skipping rm of unsafe path", out_dir / relative_path)
continue
subprocess.check_output(['rm', '-rf', str(out_dir / relative_path)])
def extract_and_package(firmware, output_dir):
# Initial extraction
known_tasks = extract(firmware, output_dir)
# Log extraction results
for task_id, info in known_tasks.items():
print(f"\nID {task_id}:")
print(f"\tPath: {info[0]} {sha1sum_file(info[0])}")
print(f"\tExtraction directory {info[1]}")
print(f"\tExtracted children:")
for subtask_path in info[2]:
print(f"\t - {subtask_path}")
package(known_tasks, output_dir)
if __name__ == '__main__':
firmware = Path(sys.argv[1])
if not firmware.exists():
print(f"File {firmware} does not exist")
os.exit(1)
output_dir = Path(sys.argv[2])
if output_dir.exists():
print(f"Output directory {output_dir} already exists. Removing it.")
shutil.rmtree(output_dir)
output_dir.mkdir(parents=True)
extract_and_package(firmware, output_dir)
We'll have a look at it this week. Thanks for bringing this up, it's a good opportunity to improve our documentation.
- Am I missing a much easier way to do this with the API ?
Issue is on us, not you. part0
has chunk id 14692:1
but it does not show up in the JSON report structure because it's made of two different chunks with chunk ids 14693:1
and 14693:2
. The report is therefore missing a TaskResult
holding a task with path /tmp/out/FW_RT_N66U_C1_300438510000.zip_extract/Firmware_Release/RT-N66U_C1_3.0.0.4_385_10000-gd8ccd3c.trx_extract/part0_extract
. Since it's not there, it's not in known_tasks
and your package()
function is blind.
This issue is documented at #554, I'll see if we can prioritize it.
On another note, one improvement you can make is to use the HashReport
returned by unblob to get the file SHA1 instead of recomputing it. unblob computes MD5, SHA1, and SHA256 of processed files.
Something along those lines. I leave the hash_report usage to you.
--- extractor.py 2024-06-17 09:10:51.035069581 +0200
+++ extractor.py 2024-06-17 08:57:03.218147812 +0200
@@ -8,6 +8,7 @@
from pathlib import Path
from typing import Dict, List
from unblob.processing import ExtractionConfig, process_file
+from unblob.report import HashReport
from unblob.logging import configure_logger
from unblob import report
@@ -40,7 +41,7 @@
for task_result in unblob_results.results:
task_file = task_result.task.path
task_id = task_result.task.blob_id
-
+ hash_report = [report for report in task_result.reports if isinstance(report, HashReport)]
for subtask in task_result.subtasks:
if subtask.blob_id not in known_tasks:
# XXX: We'll see the same subtask.blob_id for each time we extract more data from a blob. E.g., we could have
- Are blob_id values supposed to be unique per blob? It seems like the same blob_id will show up with distinct paths for example if a blob is carved into 2 files, both the base blob and the 2 generated files will have the same blob_id. Am I just misunderstanding this interface ?
You can think of blob_id
as a kind of parent_id
, indicating to which blob a chunk belongs to. You can use the id
field from ChunkReport
to have a unique identifier for files that are extracted. With the file you're extracting, you get a ZIP file with chunk id 14688:1
that contains a trx file with id 14691:1
and blob_id 14688:1
. The trx file contains part0 holding two chunks with id 14693:1
and 14693:2
.
- Are there examples of API usage somewhere ?
Not at the moment. We have some auto-generated documentation at https://unblob.org/api/ but it's clearly insufficient.
Thanks so much! Really appreciate the guidance.