# See More  (web-page link text captured with the listing; not part of the module)

import os
import math
import random
import struct
import hashlib

from . import dictionary


INTERESTING8 = [-128, -1, 0, 1, 16, 32, 64, 100, 127]
INTERESTING16 = [0, 128, 255, 256, 512, 1000, 1024, 4096, 32767, 65535]
INTERESTING32 = [0, 1, 32768, 65535, 65536, 100663045, 2147483647, 4294967295]


# A list of all the mutator classes we have available
mutator_classes = []


def register_mutator(cls):
    """Class decorator: append `cls` to `mutator_classes` and return it unchanged."""
    mutator_classes.append(cls)
    return cls


class Mutator(object):
    """
    Base class for all mutators.

    All mutators are based on this class, and must provide a `mutate` method,
    which performs some form of mutation on the input resource.

    Subclasses can be created which share some properties with others.

    Each mutator has a number of properties which can be used to select
    whether the mutator is of interest to the user or not.

    `name` - describes the mutator
    `types` - provides a set of named types of mutations that the class
              performs. these types can be used to filter out uninteresting
              mutations.
    """
    name = None
    types = set([])

    def __init__(self, corpus):
        self.corpus = corpus

    @staticmethod
    def _rand(n):
        """Return a random integer in [0, n-1]; returns 0 when n is 0 or 1."""
        if n == 1 or n == 0:
            return 0
        return random.randint(0, n-1)

    @classmethod
    def _choose_len(cls, n):
        """
        Pick a length of at least 1, biased towards short lengths.

        90% of the time the result is at most min(8, n), 9% of the time at
        most min(32, n), and 1% of the time anything up to n. Because 1 is
        always added to the random draw, the result is 1 even when n is 0.
        """
        x = cls._rand(100)
        if x < 90:
            return cls._rand(min(8, n)) + 1
        elif x < 99:
            return cls._rand(min(32, n)) + 1
        else:
            return cls._rand(n) + 1

    @staticmethod
    def copy(dst, src, start_dst, start_src, end_dst=None, end_src=None):
        """
        Copy of content from one slice of a source object to a destination object.

        dst and src may be the same object.

        `end_dst` / `end_src` default to the length of the respective object.
        The number of bytes copied is the smaller of the two ranges
        (end_src - start_src and end_dst - start_dst).
        """
        end_src = len(src) if end_src is None else end_src
        end_dst = len(dst) if end_dst is None else end_dst
        byte_to_copy = min(end_src-start_src, end_dst-start_dst)
        # The right-hand slice is materialised before the assignment, so
        # overlapping ranges within the same object are safe.
        dst[start_dst:start_dst+byte_to_copy] = src[start_src:start_src+byte_to_copy]

    def mutate(self, res):
        """
        Function to mutate a given resource into another one.

        @return: new resource, or None if this mutator is not appropriate.
        """
        raise NotImplementedError('mutate not implemented in {}'.format(self.__class__.__name__))


@register_mutator
class MutatorRemoveRange(Mutator):
    """Delete a randomly chosen run of bytes from the resource."""
    name = 'Remove a range of bytes'
    types = set(['byte', 'remove'])

    def mutate(self, res):
        if len(res) < 2:
            # Originally this checked the size of the corpus; we merely check whether the
            # resource is long. If not, we give up.
            return None

        pos0 = self._rand(len(res))
        num_to_remove = self._choose_len(len(res) - pos0)
        # Delete in place and hand back the same buffer, as the other mutators
        # do, instead of shifting the tail and returning a truncated copy
        # (which left the caller's buffer with a stale, duplicated tail).
        del res[pos0:pos0 + num_to_remove]
        return res


@register_mutator
class MutatorInsertBytes(Mutator):
    """Insert between 1 and 10 random bytes at a random position."""
    name = 'Insert a range of random bytes'
    types = set(['byte', 'insert'])

    def mutate(self, res):
        pos = self._rand(len(res) + 1)
        n = self._choose_len(10)
        # Zero-width slice assignment inserts the new bytes and shifts the
        # tail in one step; the random bytes are drawn in insertion order.
        res[pos:pos] = bytearray(self._rand(256) for _ in range(n))
        return res


@register_mutator
class MutatorDuplicateBytes(Mutator):
    """Insert a copy of one run of bytes at another position (the resource grows)."""
    name = 'Duplicate a range of bytes'
    types = set(['byte', 'duplicate'])

    def mutate(self, res):
        if len(res) <= 1:
            return None

        src = self._rand(len(res))
        dst = self._rand(len(res))
        while src == dst:
            dst = self._rand(len(res))
        n = self._choose_len(len(res) - src)
        # The source slice is copied out before the insertion happens, so it
        # does not matter whether `dst` lies before, inside or after it.
        res[dst:dst] = res[src:src+n]
        return res


@register_mutator
class MutatorCopyBytes(Mutator):
    """
    Overwrite one run of bytes with another run from the same resource.

    Unlike MutatorDuplicateBytes this never changes the length: the bytes are
    written over existing content rather than inserted.
    """
    # FIXME: Check how this diffs from DuplicateBytes
    name = 'Copy a range of bytes'
    types = set(['byte', 'copy'])

    def mutate(self, res):
        if len(res) <= 1:
            return None

        src = self._rand(len(res))
        dst = self._rand(len(res))
        while src == dst:
            dst = self._rand(len(res))
        n = self._choose_len(len(res) - src)
        # Note the argument order of `copy`: the bytes starting at offset
        # `dst` are written over the (at most n) bytes starting at `src`.
        self.copy(res, res, src, dst, src+n)
        return res


@register_mutator
class MutatorBitFlip(Mutator):
    """Flip a single random bit of a single random byte."""
    name = 'Bit flip'
    types = set(['bit', 'replace'])

    def mutate(self, res):
        if len(res) == 0:
            return None

        pos = self._rand(len(res))
        res[pos] ^= 1 << self._rand(8)
        return res


@register_mutator
class MutatorRandomiseByte(Mutator):
    """Replace one random byte with a different value."""
    name = 'Set a byte to a random value.'
    types = set(['byte', 'replace'])

    def mutate(self, res):
        if len(res) == 0:
            return None

        pos = self._rand(len(res))
        # We use rand(255) + 1 so that there is no `^ 0` applied to the byte; it always changes.
        res[pos] ^= self._rand(255) + 1
        return res


@register_mutator
class MutatorSwapBytes(Mutator):
    """Exchange the bytes at two distinct random positions."""
    name = 'Swap 2 bytes'
    types = set(['byte', 'swap'])

    def mutate(self, res):
        if len(res) <= 1:
            return None

        src = self._rand(len(res))
        dst = self._rand(len(res))
        while src == dst:
            dst = self._rand(len(res))
        res[src], res[dst] = res[dst], res[src]
        return res


@register_mutator
class MutatorAddSubByte(Mutator):
    """Add a random value in 0..255 to one random byte, wrapping modulo 256."""
    name = 'Add/subtract from a byte'
    types = set(['byte', 'addsub'])

    def mutate(self, res):
        if len(res) == 0:
            return None

        pos = self._rand(len(res))
        # A draw of 0 leaves the byte unchanged.
        v = self._rand(2**8)
        res[pos] = (res[pos] + v) % 256
        return res


# ---------------------------------------------------------------------------
# NOTE(review): the listing is damaged from here on. The text below is what
# survives of MutatorAddSubShort and of whatever followed it; the repeated
# "else:" branches and the unterminated string show that code is missing, so
# it is kept as a comment rather than guessed at. Restore this section from
# the original source before using the module.
#
# @register_mutator
# class MutatorAddSubShort(Mutator):
#     name = 'Add/subtract from a uint16'
#     types = set(['short', 'addsub'])
#
#     def mutate(self, res):
#         if len(res) < 2:
#             return None
#         pos = self._rand(len(res) - 1)
#         v = self._rand(2**16)
#         if bool(random.getrandbits(1)):
#             v = struct.pack('>H', v)
#         else:
#             v = struct.pack('I', v) else: v = struct.pack('Q', v) else:
#             v = struct.pack('H', v) else: v = struct.pack('I', v) else:
#             v = struct.pack(' everything's fine!
# ---------------------------------------------------------------------------
if mutators_filter is None: return True # First check that the required mutator types are set for f in required_filters: if f not in cls.types: return False # Now remove any that are not allowed for f in negative_filters: if f in cls.types: return False return True # Construct an object for each mutator we can use self.mutators = [cls(self) for cls in mutator_classes if acceptable(cls)] if not self.mutators: raise CorpusError("No mutators are available") def __repr__(self): return "<{}(corpus of {}, {} mutators)>".format(self.__class__.__name__, len(self._inputs), len(self.mutators)) def _add_file(self, path): with open(path, 'rb') as f: self._inputs.append(bytearray(f.read())) @property def length(self): return len(self._inputs) @staticmethod def _rand(n): if n == 1 or n == 0: return 0 return random.randint(0, n-1) # Exp2 generates n with probability 1/2^(n+1). @staticmethod def _rand_exp(): rand_bin = bin(random.randint(0, 2**32-1))[2:] rand_bin = '0'*(32 - len(rand_bin)) + rand_bin count = 0 for i in rand_bin: if i == '0': count +=1 else: break return count def put(self, buf): self._inputs.append(buf) if self._save_corpus: m = hashlib.sha256() m.update(buf) fname = os.path.join(self._dirs[0], m.hexdigest()) with open(fname, 'wb') as f: f.write(buf) def generate_input(self): if not self._seed_run_finished: next_input = self._inputs[self._seed_idx] self._seed_idx += 1 if self._seed_idx >= len(self._inputs): self._seed_run_finished = True return next_input buf = self._inputs[self._rand(len(self._inputs))] return self.mutate(buf) def mutate(self, buf): res = buf[:] nm = self._rand_exp() #print("Start with {}".format(res)) for i in range(nm): # Select a mutator from those we can apply # We'll try up to 20 times, but if we don't find a # suitable mutator after that, we'll just give up. 
for n in range(20): x = self._rand(len(self.mutators)) mutator = self.mutators[x] #print("Mutate with {}".format(mutator.__class__.__name__)) newres = mutator.mutate(res) if newres is not None: break if newres is not None: res = newres if len(res) > self._max_input_size: res = res[:self._max_input_size] return res