Adding a Custom Mutator to AFL++

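This post records the process of adding a custom mutator to the AFL++ framework.

Custom mutators can currently be written in C/C++, Python, and Rust. For the sake of fast development and convenience, this post focuses on implementing a custom mutator in Python.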

APIs

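Below are all of the Python APIs. The docstring of each function summarizes what the API does.

Almost every function is [optional] and can be left out if it is not needed, so the smallest possible Python custom_mutator is an empty file.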

def init(seed):
    '''
    [optional]
    This method is called when AFL++ starts up and is used to seed RNG and set up buffers and state.
    '''
    pass

def fuzz_count(buf):
    '''
    [optional]
    When a queue entry is selected to be fuzzed, afl-fuzz selects the number of fuzzing attempts with this input based on a few factors. If the custom mutator instead wants to decide itself how often it is called for a specific queue entry, use this function. This function is most useful if AFL_CUSTOM_MUTATOR_ONLY is not used.
    '''
    return cnt

def splice_optout():
    '''
    [optional]
    If this function is present, no splicing target is passed to the fuzz function. This saves time if splicing data is not needed by the custom fuzzing function. This function is never called, just needs to be present to activate.
    '''
    pass

def fuzz(buf, add_buf, max_size):
    '''
    [optional]
    This method performs your custom mutations on a given input. The add_buf is the contents of another queue item that can be used for splicing - or anything else - and can also be ignored. If you are not using this additional data then define splice_optout (see above). This function is optional. Returning a length of 0 is valid and is interpreted as skipping this one mutation result. For non-Python: the returned output buffer is under your memory management!
    '''
    return mutated_out
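
As an illustration of the init and fuzz callbacks described above, here is a minimal sketch of a bit-flipping mutator. The mutation strategy (one bit flip plus an occasional splice of the second buffer) and the module-level `_rng` are choices made for this example only; they are not taken from the AFL++ sources.

```python
import random

# Private RNG for this module (a name chosen for this sketch).
_rng = random.Random()


def init(seed):
    # Seed the RNG with the value passed to init so runs are reproducible.
    _rng.seed(seed)


def fuzz(buf, add_buf, max_size):
    # Work on a copy so the buffer handed in by the caller stays untouched.
    out = bytearray(buf)
    if out:
        # Flip one random bit in one random byte.
        pos = _rng.randrange(len(out))
        out[pos] ^= 1 << _rng.randrange(8)
    else:
        # Empty input: start from a single random byte instead.
        out.append(_rng.randrange(256))
    # Half of the time, append the tail of the splicing buffer (if any).
    if add_buf and _rng.random() < 0.5:
        cut = _rng.randrange(len(add_buf))
        out += add_buf[cut:]
    # Never return more than max_size bytes.
    return out[:max_size]
```

If the splicing buffer is not needed, the second `if` can be dropped and `splice_optout` defined as shown earlier.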

def describe(max_description_length):
    '''
    [optional]
    When this function is called, it shall describe the current test case, generated by the last mutation. This will be called, for example, to name the written test case file after a crash occurred. Using it can help to reproduce crashing mutations.
    '''
    return "description_of_current_mutation"

def post_process(buf):
    '''
    [optional]
    In some cases, the format of the mutated data returned from the custom mutator is not suitable for directly executing the target with this input. For example, when using libprotobuf-mutator, the data returned is in a protobuf format which corresponds to a given grammar. In order to execute the target, the protobuf data must be converted to the plain-text format expected by the target. In such scenarios, the user can define the post_process function. This function then transforms the data into the format expected by the API before the target is executed.

    This can return any python object that implements the buffer protocol and supports PyBUF_SIMPLE. These include bytes, bytearray, etc.

    You can decide in the post_process mutator to not send the mutated data to the target, e.g. if it is too short, too corrupted, etc. If so, return a NULL buffer and zero length (or a 0 length string in Python).
    '''
    return out_buf
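
A small sketch of post_process may make the two behaviours above concrete: transforming the data, and skipping an input by returning a zero-length buffer. The wire format used here (a 4-byte little-endian length header in front of the payload) and the `MIN_PAYLOAD` threshold are hypothetical and exist only for this example.

```python
import struct

# Hypothetical threshold: shorter inputs are not worth executing.
MIN_PAYLOAD = 4


def post_process(buf):
    payload = bytes(buf)
    if len(payload) < MIN_PAYLOAD:
        # A zero-length result means "do not send this input to the target".
        return b""
    # Prepend the payload length as a 4-byte little-endian integer.
    return struct.pack("<I", len(payload)) + payload
```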

def init_trim(buf):
    '''
    [optional]
    This method is called at the start of each trimming operation and receives the initial buffer. It should return the number of iteration steps possible on this input (e.g., if your input has n elements and you want to remove them one by one, return n, if you do a binary search, return log(n), and so on).

    If your trimming algorithm does not make it easy to determine the number of (remaining) steps (esp. while running), then you can alternatively return 1 here and always return 0 in post_trim until you are finished and no steps remain. In that case, returning 1 in post_trim will end the trimming routine. The whole current index/max iterations stuff is only used to show progress.
    '''
    return cnt

def trim():
    '''
    [optional]
    This method is called for each trimming operation. It doesn’t have any arguments because there is already the initial buffer from init_trim and we can memorize the current state in the data variables. This can also save reparsing steps for each iteration. It should return the trimmed input buffer.
    '''
    return out_buf

def post_trim(success):
    '''
    [optional]
    This method is called after each trim operation to inform you if your trimming step was successful or not (in terms of coverage). If you receive a failure here, you should reset your input to the last known good state. In any case, this method must return the next trim iteration index (from 0 to the maximum amount of steps you returned in init_trim).
    '''
    return next_index

def havoc_mutation(buf, max_size):
    '''
    [optional]
    Performs a single custom mutation on a given input. This mutation is stacked with other mutations in havoc.
    '''
    return mutated_out

def havoc_mutation_probability():
    '''
    [optional]
    Returns the probability that havoc_mutation is called in havoc. By default, it is 6%.
    '''
    return probability # int in [0, 100]
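
The two havoc callbacks above can be sketched as follows. The choice of "interesting" byte values and the 20% probability are arbitrary picks for this example, not defaults of AFL++.

```python
import random

# Boundary values chosen for this sketch.
INTERESTING_BYTES = (0x00, 0x7F, 0x80, 0xFF)


def havoc_mutation(buf, max_size):
    # Work on a copy of the input.
    out = bytearray(buf)
    if not out:
        return out
    # Overwrite one random byte with a boundary value.
    out[random.randrange(len(out))] = random.choice(INTERESTING_BYTES)
    return out[:max_size]


def havoc_mutation_probability():
    # Ask for this mutation in roughly 20% of havoc steps (int in [0, 100]).
    return 20
```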

def queue_get(filename):
    '''
    [optional]
    This method determines whether AFL++ should fuzz the current queue entry or not. The decision applies to all defined custom mutators as well as all of the built-in AFL++ mutators. Return True to fuzz the entry, False to skip it.
    '''
    return True

def fuzz_send(buf):
    '''
    [optional]
    This method can be used if you want to send data to the target yourself, e.g. via IPC. This replaces some usage of utils/afl_proxy but requires that you start the target with afl-fuzz.
    Setting AFL_CUSTOM_MUTATOR_LATE_SEND will call the afl_custom_fuzz_send() function after the target has been restarted.
    '''
    pass

def queue_new_entry(filename_new_queue, filename_orig_queue):
    '''
    [optional]
    This method is called after adding a new test case to the queue. If the contents of the file were changed, return True, False otherwise.
    '''
    return False

def introspection():
    '''
    [optional]
    This method is called after a new queue entry, crash or timeout is discovered if compiled with INTROSPECTION. The custom mutator can then return a string (const char *) that reports the exact mutations used.
    '''
    return string

def deinit():  # optional for Python
    '''
    [optional]
    The last method to be called, deinitializing the state.
    '''
    pass

Usage

Make sure python3-config is on $PATH so that Python support is included automatically at build time.

which python3-config
# /usr/bin/python3-config

make clean && make -j source-only

To run, enable the Python module by setting two environment variables: PYTHONPATH and AFL_PYTHON_MODULE.

# set the path where to find module
export PYTHONPATH=`dirname /path/to/AFLplusplus/custom_mutators/examples/example.py`
# set the module name without ".py" suffix
export AFL_PYTHON_MODULE=example
afl-fuzz -i <path_to_seeds> -o <path_to_out> -- /path/to/program
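
When afl-fuzz is launched from a script, the same two variables can be derived from the path of the mutator file. The sketch below assumes a helper named `mutator_env`, which is made up for this example and is not part of AFL++.

```python
import os
from pathlib import PurePosixPath


def mutator_env(module_file, base_env=None):
    """Return a copy of the environment with the two mutator variables set."""
    path = PurePosixPath(module_file)
    env = dict(os.environ if base_env is None else base_env)
    # Directory that contains the module.
    env["PYTHONPATH"] = str(path.parent)
    # Module name without the ".py" suffix.
    env["AFL_PYTHON_MODULE"] = path.stem
    return env
```

The resulting dictionary can be passed as the `env` argument of `subprocess.run` together with the afl-fuzz command line.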

Debug

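If a bug in the mutator crashes the fuzzer, enable core dumps with the command below, then use gdb to inspect the stack frames at the time of the crash. (Because the fuzzer calls into a Python module, gdb alone cannot step through the Python code line by line, but it can show the stack frames of the fuzzer itself.)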

ulimit -c unlimited
gdb ../afl-fuzz core.xxx 

# or 
gdb --args ./afl-fuzz -m none -i <path_to_seeds> -o <path_to_out> -- ./target
> b (custom_mutators.c:)fuzz
> r
> bt
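
Since gdb cannot step through the Python side, it can help to exercise the mutator in a plain Python process first, where ordinary tracebacks and pdb are available. The harness below is a sketch. `smoke_test` is a hypothetical helper, and it only imitates the call signatures from the API listing above, not the real scheduling done by afl-fuzz.

```python
def smoke_test(mutator, data, rounds=100, max_size=1024):
    """Call init, fuzz and deinit on a mutator module, if they are defined."""
    if hasattr(mutator, "init"):
        mutator.init(0)
    outputs = []
    if hasattr(mutator, "fuzz"):
        for _ in range(rounds):
            out = mutator.fuzz(bytearray(data), bytearray(data), max_size)
            # memoryview() raises TypeError if the result does not
            # implement the buffer protocol.
            size = memoryview(out).nbytes
            if size > max_size:
                raise ValueError(
                    f"fuzz returned {size} bytes, max_size is {max_size}"
                )
            outputs.append(bytes(out))
    if hasattr(mutator, "deinit"):
        mutator.deinit()
    return outputs
```

For example, with the module directory on PYTHONPATH, `smoke_test(importlib.import_module("example"), b"seed input")` runs the callbacks of example.py and surfaces any Python exception directly.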

If you suspect that the fuzzer itself is at fault, disable the custom_mutator first and then run the fuzzing again.

unset PYTHONPATH
unset AFL_PYTHON_MODULE

If you suspect a bug at the interface between C and Python, set the optimization level in GNUmakefile to 0.

override CFLAGS += -g -Wno-pointer-sign -Wno-variadic-macros -Wall -Wextra -Wno-pointer-arith \
            -fPIC -O0 -I include/ -I include/cJSON -DAFL_PATH=\"$(HELPER_PATH)\"  \
            -DBIN_PATH=\"$(BIN_PATH)\" -DDOC_PATH=\"$(DOC_PATH)\"

Then rebuild, and when debugging with gdb set breakpoints on functions such as load_custom_mutator_py.

❯ gdb --args ../afl-fuzz -m none -i ../../aflpp_benchmarks/zlib/seeds -o ../../aflpp_benchmarks/zlib/out -- ../../aflpp_benchmarks/zlib/zlib_uncompress_fuzzer

GNU gdb (Ubuntu 12.1-0ubuntu1~22.04.2) 12.1
Copyright (C) 2022 Free Software Foundation, Inc.
...
Reading symbols from ../afl-fuzz...
------- tip of the day (disable with set show-tips off) -------
Use $base("heap") to get the start address of a [heap] memory page
pwndbg> b load_custom_mutator_py
Breakpoint 1 at 0x4abe2: file src/afl-fuzz-python.c, line 434.
pwndbg> r
...

Then step through the code to find the exact source line where the crash occurs.

References

[1] https://aflplus.plus/docs/custom_mutators/

[2] https://github.com/AFLplusplus/AFLplusplus/blob/stable/src/afl-fuzz-mutators.c
