Source code for squid.mave

import numpy as np
import random


[docs] class InSilicoMAVE(): """Module for performing in silico MAVE. Parameters ---------- mut_generator : class Module for performing random mutagenesis. pred_generator : class Module for inferring model predictions. seq_length : int Full length L of input sequence. mut_window : [int, int] Index of start and stop position along sequence to probe; i.e., [start, stop], where start < stop and both entries satisfy 0 <= int <= L. context_agnostic : boole Option for generating global neighborhoods, such that the sequence surrounding a conserved pattern of interest is randomly mutated across the in silico MAVE dataset inter_window : [int, int] or [[int, int], [int, int], ...] Index of start and stop position of each inter-site window, where each window defines the boundaries of the sequence in between two sites of interest (optional, for 'context_agnostic') save_window : [int <= mut_window[0], int >= mut_window[1]] Window used for delimiting sequences that are exported in 'x_mut' array; if used, the 'save_window' interval must be equal to or larger 'mut_window', and if larger, 'save window' must contain the interval 'mut_window' entirely alphabet : list The alphabet used to determine the C characters in the logo such that each entry is a string; e.g., ['A','C','G','T'] for DNA. """ def __init__(self, mut_generator, mut_predictor, seq_length, mut_window=None, context_agnostic=False, inter_window=None, save_window=None, alphabet=['A','C','G','T']): self.mut_generator = mut_generator self.mut_predictor = mut_predictor self.seq_length = seq_length self.mut_window = mut_window if mut_window is not None: self.start_position = mut_window[0] self.stop_position = mut_window[1] else: self.start_position = 0 self.stop_position = seq_length mut_window = [self.start_position, self.stop_position] self.context_agnostic = context_agnostic self.inter_window = inter_window if save_window is not None and mut_window is not None: if (save_window[0] > mut_window[0]) or (save_window[1] < mut_window[1]): save_window = None print("Conflict found in 'save_window' interval, setting to None.") self.save_window = save_window self.alphabet = alphabet
[docs] def generate(self, x, num_sim, seed=None, verbose=1): """Randomly mutate segments in a set of one-hot DNA sequences. Parameters ---------- x : torch.Tensor One-hot sequence (shape: (L,A)). num_sim : int Number of sequences to mutagenize for in silico MAVE. seed : int, optional Sets the random number seed. Returns ------- x_mut : numpy.ndarray Sequences simulated by mut_predictor. y_mut : numpy.ndarray Inferred predictions for sequences (shape: (N,1)). """ if seed: np.random.seed(seed) if verbose: print('') print('Building in silico MAVE...') # generate in silico MAVE based on mutagenesis strategy if self.mut_window is not None: x_window = self.delimit_range(x, self.start_position, self.stop_position) x_mut = self.mut_generator(x_window, num_sim) if self.context_agnostic: x_mut = self.pad_seq_random(x_mut, x, self.start_position, self.stop_position) if self.save_window: print("Variable 'save_window' not yet implemented for 'context_agnostic' mode.") # optional: perform global mutagenesis in between two (or more) sites of interest if self.inter_window is not None: num_inter_windows = sum(isinstance(i, list) for i in self.inter_window) # count number of inter-site windows if num_inter_windows == 0: num_inter_windows += 1 self.inter_window = [self.inter_window] for w in range(num_inter_windows): w_start = self.inter_window[w][0] w_stop = self.inter_window[w][1] x_mut = self.pad_seq_random(x_mut, x, w_start, w_stop, inter=True) else: x_mut = self.pad_seq(x_mut, x, self.start_position, self.stop_position, self.save_window) if self.mut_predictor is None: # skip inference y_mut = None else: # required for surrogate modeling y_mut = self.mut_predictor(x_mut, x, self.save_window) else: x_mut = self.mut_generator(x, num_sim) if self.mut_predictor is None: # skip inference y_mut = None else: # required for surrogate modeling y_mut = self.mut_predictor(x_mut, x, self.save_window) return x_mut, y_mut
[docs] def pad_seq(self, x_mut, x, start_position, stop_position, save_window): """Function to pad mutated sequences on both sides with the original unmutated context. Parameters ---------- x_mut : numpy.ndarray Sequences with randomly mutated segments with length l < L defined by l = stop_position - start_position (shape: (N,l,C)). x : torch.Tensor Batch of one-hot sequences (shape: (L,A)). start_position : int Index of start position along sequence to probe. stop_position : int Index of stop position along sequence to probe. Returns ------- numpy.ndarray Sequences with randomly mutated segments, padded to correct shape with random DNA (shape: (N,L,C)). """ N = x_mut.shape[0] x = x[np.newaxis,:].astype('uint8') #x_start = np.tile(x[:,:start_position,:], (N,1,1)) # high memory use #x_stop = np.tile(x[:,stop_position:,:], (N,1,1)) # high memory use if save_window is None: x_start = np.broadcast_to(x[:,:start_position,:], (N,start_position,x.shape[2])) x_stop = np.broadcast_to(x[:,stop_position:,:], (N,x.shape[1]-stop_position,x.shape[2])) else: x_start = np.broadcast_to(x[:,save_window[0]:start_position,:], (N,start_position-save_window[0],x.shape[2])) x_stop = np.broadcast_to(x[:,stop_position:save_window[1],:], (N,save_window[1]-stop_position,x.shape[2])) return np.concatenate([x_start, x_mut, x_stop], axis=1)
[docs] def pad_seq_random(self, x_mut, x, start_position, stop_position, dinuc=False, inter=False): """Function to pad mutated sequences on both sides with random DNA. Parameters ---------- x_mut : numpy.ndarray Sequences with randomly mutated segments with length l < L defined by l = stop_position - start_position (shape: (N,l,C)). x : torch.Tensor Batch of one-hot sequences (shape: (L,A)). start_position : int Index of start position along sequence to probe. stop_position : int Index of stop position along sequence to probe. dinuc : boole Perform mutagenesis by random shuffle (False) or dinucleotide shuffle (True). inter : boole Pad sequence to the left and right of 'mut_window' (False) or within 'inter_window' (True) Returns ------- numpy.ndarray Sequences with randomly mutated segments, padded to correct shape with random DNA (shape: (N,L,C)). """ x = x.astype('uint8') N = x_mut.shape[0] if dinuc is False: x_shuffle = random_shuffle(x, self.alphabet, num_shufs=N) else: x_shuffle = dinuc_shuffle(x, num_shufs=N) if inter is False: x_padded = np.concatenate([ x_shuffle[:,:start_position,:], x_mut, x_shuffle[:,stop_position:,:]], axis=1) else: x_padded = np.concatenate([x_mut[:,:start_position,:], x_shuffle[:,start_position:stop_position,:], x_mut[:,stop_position:,:]], axis=1) '''else: x_padded = x_shuffle.copy() # Start with shuffled sequence # Insert pattern but preserve the region to randomize x_padded[:, self.start_position:start_position, :] = x_mut[:, :start_position-self.start_position, :] x_padded[:, stop_position+1:self.stop_position, :] = x_mut[:, stop_position-self.start_position+1:, :] ''' return x_padded
[docs] def delimit_range(self, x, start_position, stop_position): """Function to delimit sequence to a specific region. Parameters ---------- x : torch.Tensor Batch of one-hot sequences (shape: (L,A)). start_position : int Index of start position along sequence to probe. stop_position : int Index of stop position along sequence to probe. Returns ------- numpy.ndarray Delimited sequences with length l < L defined by l = stop_position - start_position (shape: (N,l,C)). """ return x[start_position:stop_position,:]
#------------------------------------------------------------------------------------ # Useful shuffle function #------------------------------------------------------------------------------------
[docs] def random_shuffle(seq, alphabet=['A','C','G','T'], num_shufs=None, rng=None): """Creates random shuffles with equiprobability of characters at each position Parameters ---------- seq : ndarray one-hot encoding of sequence num_shufs : int the number of shuffles to create; if unspecified, only one shuffle will be created Returns ------- ndarray ndarray of shuffled versions of 'seq' (shape=(N,L,D)), also one-hot encoded If 'num_shufs' is not specified, then the first dimension of N will not be present (i.e. a single string will be returned, or an LxD array). """ def seq2oh(seq, alphabet=['A','C','G','T']): L = len(seq) one_hot = np.zeros(shape=(L,len(alphabet)), dtype=np.float32) for idx, i in enumerate(seq): for jdx, j in enumerate(alphabet): if i == j: one_hot[idx,jdx] = 1 return one_hot if num_shufs is None: num_shufs = 1 seqs = np.zeros(shape=(num_shufs,seq.shape[0],seq.shape[1])) for seq_idx in range(num_shufs): random_seq = ''.join(random.choices(str(''.join(alphabet)), k=seq.shape[0])) seqs[seq_idx,:,:] = seq2oh(random_seq, alphabet) return seqs
# taken from https://github.com/kundajelab/deeplift/blob/master/deeplift/dinuc_shuffle.py
[docs] def dinuc_shuffle(seq, num_shufs=None, rng=None): """Creates shuffles of the given sequence, in which dinucleotide frequencies are preserved. Parameters ---------- seq : str or ndarray either a string of length L, or an L x D NumPy array of one-hot encodings num_shufs : int the number of shuffles to create, N; if unspecified, only one shuffle will be created `rng`: a NumPy RandomState object, to use for performing shuffles Returns ------- list (if 'seq' is string) List of N strings of length L, each one being a shuffled version of 'seq' ndarray (if 'seq' is ndarray) ndarray of shuffled versions of 'seq' (shape=(N,L,D)), also one-hot encoded If 'num_shufs' is not specified, then the first dimension of N will not be present (i.e. a single string will be returned, or an LxD array). """ def string_to_char_array(seq): """ Converts an ASCII string to a NumPy array of byte-long ASCII codes. e.g. "ACGT" becomes [65, 67, 71, 84]. """ return np.frombuffer(bytearray(seq, "utf8"), dtype=np.int8) def char_array_to_string(arr): """ Converts a NumPy array of byte-long ASCII codes into an ASCII string. e.g. [65, 67, 71, 84] becomes "ACGT". """ return arr.tostring().decode("ascii") def one_hot_to_tokens(one_hot): """ Converts an L x D one-hot encoding into an L-vector of integers in the range [0, D], where the token D is used when the one-hot encoding is all 0. This assumes that the one-hot encoding is well-formed, with at most one 1 in each column (and 0s elsewhere). """ tokens = np.tile(one_hot.shape[1], one_hot.shape[0]) # Vector of all D seq_inds, dim_inds = np.where(one_hot) tokens[seq_inds] = dim_inds return tokens def tokens_to_one_hot(tokens, one_hot_dim): """ Converts an L-vector of integers in the range [0, D] to an L x D one-hot encoding. The value `D` must be provided as `one_hot_dim`. A token of D means the one-hot encoding is all 0s. """ identity = np.identity(one_hot_dim + 1)[:, :-1] # Last row is all 0s return identity[tokens] if type(seq) is str: arr = string_to_char_array(seq) elif type(seq) is np.ndarray and len(seq.shape) == 2: seq_len, one_hot_dim = seq.shape arr = one_hot_to_tokens(seq) else: raise ValueError("Expected string or one-hot encoded array") if not rng: rng = np.random.RandomState() # Get the set of all characters, and a mapping of which positions have which # characters; use `tokens`, which are integer representations of the # original characters chars, tokens = np.unique(arr, return_inverse=True) # For each token, get a list of indices of all the tokens that come after it shuf_next_inds = [] for t in range(len(chars)): mask = tokens[:-1] == t # Excluding last char inds = np.where(mask)[0] shuf_next_inds.append(inds + 1) # Add 1 for next token if type(seq) is str: all_results = [] else: all_results = np.empty( (num_shufs if num_shufs else 1, seq_len, one_hot_dim), dtype=seq.dtype ) for i in range(num_shufs if num_shufs else 1): # Shuffle the next indices for t in range(len(chars)): inds = np.arange(len(shuf_next_inds[t])) inds[:-1] = rng.permutation(len(inds) - 1) # Keep last index same shuf_next_inds[t] = shuf_next_inds[t][inds] counters = [0] * len(chars) # Build the resulting array ind = 0 result = np.empty_like(tokens) result[0] = tokens[ind] for j in range(1, len(tokens)): t = tokens[ind] ind = shuf_next_inds[t][counters[t]] counters[t] += 1 result[j] = tokens[ind] if type(seq) is str: all_results.append(char_array_to_string(chars[result])) else: all_results[i] = tokens_to_one_hot(chars[result], one_hot_dim) return all_results if num_shufs else all_results[0]
if __name__ == "__main__": # Set random seed for reproducibility np.random.seed(42) random.seed(42) # Test setup
[docs] seq_length = 249
alphabet = ['A','C','G','T'] start_pos = seq_length//2 pattern = 'TGANNCA' # Create background sequence with pattern bg_left = random.choices(str(''.join(alphabet)), k=start_pos) bg_right = random.choices(str(''.join(alphabet)), k=seq_length - start_pos - len(pattern)) seq = ''.join(bg_left) + pattern + ''.join(bg_right) x_ref = np.array([[1 if c == base else 0 for base in alphabet] for c in seq], dtype=np.uint8) # Set up pattern window and inter window pattern_window = [start_pos, start_pos+len(pattern)] inter_window = [start_pos+3, start_pos+5] # NN positions in TGANNCA # Set up mutagenizer class RandomMutagenesis: def __init__(self, uniform=False, mut_rate=0, seed=42): self.uniform = uniform self.mut_rate = mut_rate if seed: np.random.seed(seed) def __call__(self, x, num_seqs): return np.tile(x[np.newaxis,:,:], (num_seqs,1,1)) mut_generator = RandomMutagenesis(uniform=False, mut_rate=0, seed=42) num_seqs = 10 # Create MAVE and generate sequences mave = InSilicoMAVE( mut_generator, None, seq_length, mut_window=pattern_window, inter_window=inter_window, alphabet=alphabet, context_agnostic=True ) x_mut, y_mut = mave.generate(x_ref, num_sim=num_seqs) # Print shapes for debugging print(f"x_ref shape: {x_ref.shape}") print(f"x_mut shape: {x_mut.shape}") print(f"\nOriginal pattern: {pattern}") print(f"Generated sequences at pattern position:") for seq in x_mut: pattern_seq = ''.join([alphabet[b] for b in np.argmax(seq[pattern_window[0]:pattern_window[1]], axis=1)]) print(pattern_seq)