xinali/articles

一个简单多进程且易于使用的传统fuzzer

xinali opened this issue · 1 comments

使用cdb作为进程debugger,效率一般,但可以满足特定需求,目前在写自己的windows debugger

#encoding:utf-8

# ====================================================
# python: 3.5+
# 利用cdb作为debugger的一个简单传统fuzzer
# 执行: python3 xfuzzer.py -h 查看帮助选项
# 产生一个文件: 
#     uniq_crash_type_YYYY-mm-dd.txt  => 所有crash type结果

# type是根据call stack 最后两个调用函数划分
# fuzz_output_post_fix用于标记不同fuzz程序的结果文件

# example:
# "C:\Users\dt\AppData\Local\Programs\Python\Python37\python3.exe" xfuzzer.py -f H:\fuzzing\FuzzPdf\FuzzPdf.exe -o pdf -d H:\pdf_corpus -s H:\poc.pdf
# "C:\Users\dt\AppData\Local\Programs\Python\Python37\python3.exe" xfuzzer.py -f H:\fuzzing\FuzzPdf\FuzzPdf.exe -o pdf_xfuzz -d H:\pdf_handled -c H:\pdf_corpus_mutate -s H:\fuzz_cache\poc.pdf
# ====================================================

import sys
import glob
import subprocess
import os
import datetime
import signal
from time import monotonic as timer
import argparse
from threading import Timer
import time
from  multiprocessing import Process,Pool, Manager
import signal
import shutil
import logging 

kill = lambda process: process.kill()

# 调试器位置
debugger = "C:\\Program Files (x86)\\Windows Kits\\10\Debuggers\\x64\\cdb.exe"
# 结果存储文件
uniq_crash_type_file = 'uniq_crash_type_{}'.format((datetime.datetime.now()).strftime("%Y-%m-%d"))
options = ''


class Logger(object):  
    def __init__(self):  
        """ 
        initial 
        """  
        # log_path = logPath  
        logging.addLevelName(20, "NOTICE:")  
        logging.addLevelName(30, "WARNING:")  
        logging.addLevelName(40, "FATAL:")  
        logging.addLevelName(50, "FATAL:")  
        logging.basicConfig(level=logging.DEBUG,  
                format="%(levelname)s %(asctime)s %(filename)s %(message)s",  
                datefmt="%Y-%m-%d %H:%M:%S",  
                filename='handle.log',  
                filemode="a")  
        console = logging.StreamHandler()  
        console.setLevel(logging.DEBUG)  
        formatter = logging.Formatter("%(levelname)s %(asctime)s %(filename)s %(message)s")  
        console.setFormatter(formatter)  
        logging.getLogger("").addHandler(console)  
  
    def debug(self, msg=""):  
        """ 
        output DEBUG level LOG 
        """  
        logging.debug(str(msg))  
  
    def info(self, msg=""):  
        """ 
        output INFO level LOG 
        """  
        logging.info(str(msg))  
  
    def warning(self, msg=""):  
        """ 
        output WARN level LOG 
        """  
        logging.warning(str(msg))  
  
    def exception(self, msg=""):  
        """ 
        output Exception stack LOG 
        """  
        logging.exception(str(msg))  
  
    def error(self, msg=""):  
        """ 
        output ERROR level LOG 
        """  
        logging.error(str(msg))  
  
    def critical(self, msg=""):  
        """ 
        output FATAL level LOG 
        """  
        logging.critical(str(msg))
logger = Logger()


# 根据调用栈最后两个函数判定类型 (Windows)
def get_crash_type_win32(stdout):
     # 获取到crash type
    i = 0
    stdout_lines = stdout.decode().split('\n')
    crash_type = None
    for crash_data in stdout_lines:
        if "RetAddr" in crash_data:
            break
        i += 1
    
    if i == len(stdout_lines):
        return None
    
    i += 1
    # 得出最后两个调用栈作为划分类型
    if len(stdout_lines) > i and len(stdout_lines[i].split(' ')) >= 3:
        crash_type = stdout_lines[i].split(' ')[2]
    if len(stdout_lines) > i+1 and len(stdout_lines[i+1].split(' ')) >= 3:
        crash_type += ' -> ' + stdout_lines[i+1].split(' ')[2]
    return crash_type


# 将crash_types写入文件
def handle_crash_types(crash_types):
    fp = open(uniq_crash_type_file, 'w')
    for crash_type in crash_types:
        if not crash_type:
            continue
        fp.write(crash_type + ':\n')
        for crash_file in crash_types[crash_type]:
            fp.write("   [+] {}\n".format(crash_file))
    fp.close()


def prepare_data(corpus_dir):
    files = []
    files.extend(glob.glob(corpus_dir+'/*', recursive=True))
    if len(files) == 0:
        return None
    return files


def handle_result(output_queue):
    global uniq_crash_type_file
    uniq_crash_type_file = uniq_crash_type_file+'_{}.txt'.format(options.output_post_fix)

    print ('total output result:' + str(output_queue.qsize()))
    crash_types = {}
    while not output_queue.empty():
        data = output_queue.get()
        crash_type = data[0] # crash/timeout
        input_file = data[1]
        stdout = data[2]

        if crash_type == 'timeout':
            # 添加timeout文件
            if 'timeout' not in crash_types.keys():
                crash_types['timeout'] = [input_file]
            else:
                crash_types['timeout'].append(input_file)
        else:
            crash_type = get_crash_type_win32(stdout)
            if crash_type == None:
                continue
            if crash_type not in crash_types.keys():
                crash_types[crash_type] = [input_file]
            else:
                crash_types[crash_type].append(input_file)

    handle_crash_types(crash_types)


def fuzz(input_queue, output_queue, fuzz_program, specific_file_arg, dst_dir):
    while not input_queue.empty():
        data = input_queue.get()
        index = data[0]
        input_file = data[1]
        start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        logger.info('[start: {}] handling {} {}'.format(start_time, index, input_file)) 

        # 将crash文件数据写入特定文件
        specific_file = None
        if specific_file_arg:
            file_name, file_ext = os.path.splitext(specific_file_arg)
            specific_file = '{}_{}{}'.format(file_name, str(os.getpid()), file_ext)
            with open(input_file, 'rb') as f:
                fp_specific = open(specific_file, 'wb')
                crash_file_data = f.read()
                fp_specific.write(crash_file_data)
                fp_specific.close()
        else:
            specific_file = input_file
        stdout = None
        stderr = None
        crash_type = None
        crash_types = {}
        # 处理windows
        command = [debugger, 
                        "-y", 
                        "srv*c:\symbols*https://msdl.microsoft.com/download/symbols", 
                        "-c", 
                        "g;kp 10;q",
                        fuzz_program,
                        specific_file]
        process_out = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # timeout code by jfs https://stackoverflow.com/questions/36952245/subprocess-timeout-failure
        try:
            stdout, stderr = process_out.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            # os.kill(process_out.pid, signal.SIGINT) # send signal to the process group
            subprocess.call(['taskkill', '/F', '/T', '/PID', str(process_out.pid)]) 
            stdout, stderr = process_out.communicate()
            output_queue.put(('timeout', input_file, None))

        output_queue.put(('crash', input_file, stdout))
        crash_type = get_crash_type_win32(stdout)
        if crash_type != None:
            logger.info(crash_type)
        file_basename = os.path.basename(input_file)
        dst_file = os.path.join(dst_dir, file_basename)
        if os.path.exists(dst_file):
            logger.info(dst_file + ' exists!')
        else:
            shutil.move(input_file, dst_file)


def init_fuzzer():
    """
    Pool worker initializer for keyboard interrupt on Windows
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def main():

    global options
    files = []
    if options.corpus_dir:
        files = prepare_data(options.corpus_dir)
    else:
        logger.error("No corpus directory")
    if not files:
        logger.error('No corpus files')
        exit(1)
    logger.info("total files: " + str(len(files)))

    # 输出已经处理corpus目录
    if not os.path.exists(options.output_directory):
        os.mkdir(options.output_directory)
    
    manager = Manager()
    input_queue = manager.Queue()
    output_queue = manager.Queue()
    count = 0
    for file_name in files:
        input_queue.put((count, file_name))
        count += 1

    # 准备四个进程 
    logger.info('start process...')
    pool = Pool(4, init_fuzzer)
    worker_num = 60
    fuzzer_threads = []
    for i in range(worker_num):
        result = pool.apply_async(fuzz, args=(input_queue,
                                           output_queue, 
                                           options.fuzz_program, 
                                           options.specific_file, 
                                           options.output_directory)
                                )
        fuzzer_threads.append(result)
    
    try:
        [fuzzer.get() for fuzzer in fuzzer_threads]
    except:
        logger.exception("User enter keyboard interupted!")
        pool.terminate()
        pool.join()

    # write result to file
    logger.info('write data to file...')
    handle_result(output_queue)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--fuzz_program", help="fuzz program", required=True)
    parser.add_argument("-c", "--corpus_dir", help="fuzz corpus directory", required=True)
    parser.add_argument("-d", "--output_directory", help="fuzz output directory(absolute path)", required=True)
    parser.add_argument("-o", "--output_post_fix", help="output file post fix")
    parser.add_argument("-s", "--specific_file", help="write crash file to a specific file")
    options = parser.parse_args()
    main()

功能过于简单,已放弃,目前已经用rust重写,自定义debugger/mutator,丰富变异算法