Source code for mmtfPyspark.ml.proteinSequenceEncoder

'''proteinSequenceEncoder.py

This class encodes a protein sequence into a feature vector.
The protein sequence must be present in the input data set, the default column
name is "sequence". The default column name for the feature vector is "features".

'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import Word2Vec, Word2VecModel
from mmtfPyspark.ml import sequenceNgrammer


[docs]class ProteinSequenceEncoder(object): ''' This class encodes a protein sequence into a feature vector. The protein sequence must be present in the input data set, the default column name is "sequence". The default column name for the feature vector is "features". Attributes ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [sequence] outputCol : str name of the output column [features] ''' model = None AMINO_ACIDS21 = ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', \ 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'X', 'Y'] properties = { 'A' : [1.28,0.05,1.00,0.31,6.11,0.42,0.23], 'G' : [0.00,0.00,0.00,0.00,6.07,0.13,0.15], 'V' : [3.67,0.14,3.00,1.22,6.02,0.27,0.49], 'L' : [2.59,0.19,4.00,1.70,6.04,0.39,0.31], 'I' : [4.19,0.19,4.00,1.80,6.04,0.30,0.45], 'F' : [2.94,0.29,5.89,1.79,5.67,0.30,0.38], 'Y' : [2.94,0.30,6.47,0.96,5.66,0.25,0.41], 'W' : [3.21,0.41,8.08,2.25,5.94,0.32,0.42], 'T' : [3.03,0.11,2.60,0.26,5.60,0.21,0.36], 'S' : [1.31,0.06,1.60,-0.04,5.70,0.20,0.28], 'R' : [2.34,0.29,6.13,-1.01,10.74,0.36,0.25], 'K' : [1.89,0.22,4.77,-0.99,9.99,0.32,0.27], 'H' : [2.99,0.23,4.66,0.13,7.69,0.27,0.30], 'D' : [1.60,0.11,2.78,-0.77,2.95,0.25,0.20], 'E' : [1.56,0.15,3.78,-0.64,3.09,0.42,0.21], 'N' : [1.60,0.13,2.95,-0.60,6.52,0.21,0.22], 'Q' : [1.56,0.18,3.95,-0.22,5.65,0.36,0.25], 'M' : [2.35,0.22,4.43,1.23,5.71,0.38,0.32], 'P' : [2.67,0.00,2.72,0.72,6.80,0.13,0.34], 'C' : [1.77,0.13,2.43,1.54,6.35,0.17,0.41], 'X' : [0.00,0.00,0.00,0.00,0.00,0.00,0.00], } # Source: https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new blosum62 = { # A R N D C Q E G H I L K M F P S T W Y V 'A' : [ 4,-1,-2,-2, 0,-1,-1, 0,-2,-1,-1,-1,-1,-2,-1, 1, 0,-3,-2, 0], 'R' : [-1, 5, 0,-2,-3, 1, 0,-2, 0,-3,-2, 2,-1,-3,-2,-1,-1,-3,-2,-3], 'N' : [-2, 0, 6, 1,-3, 0, 0, 0, 1,-3,-3, 0,-2,-3,-2, 1, 0,-4,-2,-3], 'D' : [-2,-2, 1, 6,-3, 0, 2,-1,-1,-3,-4,-1,-3,-3,-1, 0,-1,-4,-3,-3], 'C' : [ 0,-3,-3,-3, 9,-3,-4,-3,-3,-1,-1,-3,-1,-2,-3,-1,-1,-2,-2,-1], 'Q' : [-1, 1, 0, 0,-3, 5, 2,-2, 0,-3,-2, 1, 0,-3,-1, 0,-1,-2,-1,-2], 'E' : [-1, 0, 0, 2,-4, 2, 5,-2, 0,-3,-3, 1,-2,-3,-1, 0,-1,-3,-2,-2], 'G' : [ 0,-2, 0,-1,-3,-2,-2, 6,-2,-4,-4,-2,-3,-3,-2, 0,-2,-2,-3,-3], 'H' : [-2, 0, 1,-1,-3, 0, 0,-2, 8,-3,-3,-1,-2,-1,-2,-1,-2,-2, 2,-3], 'I' : [-1,-3,-3,-3,-1,-3,-3,-4,-3, 4, 2,-3, 1, 0,-3,-2,-1,-3,-1, 3], 'L' : [-1,-2,-3,-4,-1,-2,-3,-4,-3, 2, 4,-2, 2, 0,-3,-2,-1,-2,-1, 1], 'K' : [-1, 2, 0,-1,-3, 1, 1,-2,-1,-3,-2, 5,-1,-3,-1, 0,-1,-3,-2,-2], 'M' : [-1,-1,-2,-3,-1, 0,-2,-3,-2, 1, 2,-1, 5, 0,-2,-1,-1,-1,-1, 1], 'F' : [-2,-3,-3,-3,-2,-3,-3,-3,-1, 0, 0,-3, 0, 6,-4,-2,-2, 1, 3,-1], 'P' : [-1,-2,-2,-1,-3,-1,-1,-2,-2,-3,-3,-1,-2,-4, 7,-1,-1,-4,-3,-2], 'S' : [ 1,-1, 1, 0,-1, 0, 0, 0,-1,-2,-2, 0,-1,-2,-1, 4, 1,-3,-2,-2], 'T' : [ 0,-1, 0,-1,-1,-1,-1,-2,-2,-1,-1,-1,-1,-2,-1, 1, 5,-2,-2, 0], 'W' : [-3,-3,-4,-4,-2,-2,-3,-2,-2,-3,-2,-3,-1, 1,-4,-3,-2,11, 2,-3], 'Y' : [-2,-2,-2,-3,-2,-1,-2,-3, 2,-1,-1,-2,-1, 3,-3,-2,-2, 2, 7,-1], 'V' : [ 0,-3,-3,-3,-1,-2,-2,-3,-3, 3, 1,-2, 1,-1,-2,-2, 0,-3,-1, 4], 'X' : [-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4,-4], } def __init__(self, data = None, inputCol = "sequence", outputCol = "features"): self.data = data self.inputCol = inputCol self.outputCol = outputCol
[docs] def one_hot_encode(self, data = None, inputCol = None, outputCol = None): ''' One-hot encodes a protein sequence. The one-hot encoding encodes the 20 natural amino acids, plus X for any other residue for a total of 21 elements per residue. Parameters ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [None] outputCol : str name of the output column [None] ''' # Setting class variables if data is not None: self.data = data if inputCol is not None: self.inputCol = inputCol if outputCol is not None: self.outputCol = outputCol if self.data is None: raise ValueError("Class variable data is not defined, please pass\ in a dataframe into the data parameter") session = SparkSession.builder.getOrCreate() AMINO_ACIDS21 = self.AMINO_ACIDS21 # Encoder function to be passed as User Defined Function (UDF) def _encoder(s): values = [0] * len(AMINO_ACIDS21) * len(s) for i in range(len(s)): if s[i] in AMINO_ACIDS21: index = AMINO_ACIDS21.index(s[i]) else: index = AMINO_ACIDS21.index('X') values[i*len(AMINO_ACIDS21) + index] = 1 return Vectors.dense(values) session.udf.register("encoder", _encoder, VectorUDT()) self.data.createOrReplaceTempView("table") sql = f"SELECT *, encoder({self.inputCol}) AS {self.outputCol} from table" data = session.sql(sql) return data
[docs] def property_encode(self, data = None, inputCol = None, outputCol = None): '''Encodes a protein sequence by 7 physicochemical properties References ---------- Meiler, J., Müller, M., Zeidler, A. et al. J Mol Model (2001) https://link.springer.com/article/10.1007/s008940100038 Parameters ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [None] outputCol : str name of the output column [None] Returns ------- dataset dataset with feature vector appended ''' # Setting class variables if data is not None: self.data = data if inputCol is not None: self.inputCol = inputCol if outputCol is not None: self.outputCol = outputCol if self.data is None: raise ValueError("Class variable data is not defined, please pass\ in a dataframe into the data parameter") session = SparkSession.builder.getOrCreate() properties = self.properties #Encoder function to be passed as User Defined Function (UDF) def _encoder(s): values = [] for i in range(len(s)): if s[i] in properties: values += properties[s[i]] return Vectors.dense(values) session.udf.register("encoder", _encoder, VectorUDT()) self.data.createOrReplaceTempView("table") sql = f"SELECT *, encoder({self.inputCol}) AS {self.outputCol} from table" data = session.sql(sql) return data
[docs] def blosum62_encode(self, data = None, inputCol = None, outputCol = None): '''Encodes a protein sequence by 7 Blosum62 References ---------- Blosum Matrix https://ftp.ncbi.nih.gov/repository/blocks/unix/blosum/BLOSUM/blosum62.blast.new Parameters ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [None] outputCol : str name of the output column [None] Returns ------- dataset dataset with feature vector appended ''' if data is not None: self.data = data if inputCol is not None: self.inputCol = inputCol if outputCol is not None: self.outputCol = outputCol if self.data is None: raise ValueError("Class variable data is not defined, please pass\ in a dataframe into the data parameter") session = SparkSession.builder.getOrCreate() blosum62 = self.blosum62 #Encoder function to be passed as User Defined Function (UDF) def _encoder(s): values = [] for i in range(len(s)): if s[i] in blosum62: values += blosum62[s[i]] return Vectors.dense(values) session.udf.register("encoder", _encoder, VectorUDT()) self.data.createOrReplaceTempView("table") sql = f"SELECT *, encoder({self.inputCol}) AS {self.outputCol} from table" data = session.sql(sql) return data
[docs] def overlapping_ngram_word2vec_encode(self, data = None, inputCol = None, outputCol = None, n = None, windowSize = None, vectorSize = None, fileName = None, sc = None): '''Encodes a protein sequence by converting it into n-grams and then transforming it into a Word2Vec feature vector. If given word2Vec file name, then this function encodes a protein sequence by converting it into n-grams and then transforming it using pre-trained word2Vec model read from that file Parameters ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [None] outputCol : str name of the output column [None] n : int The number of words in an n-gram [None] windowSize : int width of the window used to slide across the squence, context words from -window to window [None] vectorSize :int dimension of the feature vector [None] fileName : str filename of Word2Vec model [None] Returns ------- dataset dataset with features vector added to original dataset ''' if data is not None: self.data = data if inputCol is not None: self.inputCol = inputCol if outputCol is not None: self.outputCol = outputCol if self.data is None: raise ValueError("Class variable data is not defined, please pass\ in a dataframe into the data parameter") # Create n-grams out of the sequence # E.g., 2-gram IDCGH, ... =>[ID, DC, CG, GH, ...] data = sequenceNgrammer.ngram(self.data, n, "ngram") if not (n == None and windowSize == None and vectorSize == None): # Convert n-grams to W2V freature vector # [ID, DC, CG, GH, ...] => [0.1234, 0.2394, ...] word2Vec = Word2Vec() word2Vec.setInputCol("ngram") \ .setOutputCol(self.outputCol) \ .setNumPartitions(8) \ .setWindowSize(windowSize) \ .setVectorSize(vectorSize) \ self.model = word2Vec.fit(data) elif fileName != None and sc != None: reader = Word2VecModel() self.model = reader.load(sc, fileName) print(f"model file : {fileName} \n \ inputCol : {self.model.getInputCol()} \n \ windowSize : {self.model.getWindowSize()} \n \ vectorSize : {self.model.getVectorSize()}") self.model.setOutputCol(self.outputCol) else: raise Exception("Either provide word2Vec file (filename) + SparkContext (sc), \ or number of words(n) + window size(windowSize) \ + vector size (vetorSize), for function parameters") return return self.model.transform(data)
[docs] def shifted_3gram_word2vec_encode(self, data = None, inputCol = None, outputCol = None, windowSize = None, vectorSize = None, fileName = None, sc = None): '''Encodes a protein sequence as three non-overlapping 3-grams, trains a Word2Vec model on the 3-grams, and then averages the three resulting freature vectors. Parameters ---------- data : DataFrame input data to be encoded [None] inputCol : str name of the input column [None] outputCol : str name of the output column [None] windowSize : int width of the window used to slide across the sequence context words from -window to window vectorSize : int dimension of the feature vector [None] fileName : str filename of Word2VecModel [None] sc : SparkContext spark context [None] Returns ------- dataset dataset with features vector added to original dataset References ---------- Asgari E, Mofrad MRK (2015) Continuous Distributed Representation of Biological Sequences for Deep Proteomics and Genomics. PLOS ONE 10(11): e0141287. doi: https://doi.org/10.1371/journal.pone.0141287 ''' if data is not None: self.data = data if inputCol is not None: self.inputCol = inputCol if outputCol is not None: self.outputCol = outputCol if self.data is None: raise ValueError("Class variable data is not defined, please pass\ in a dataframe into the data parameter") # Create n-grams out of the sequence # e.g., 2-gram [IDCGH, ...] => [ID. DC, CG, GH,...] data = sequenceNgrammer.shifted_ngram(self.data, 3, 0, "ngram0") data = sequenceNgrammer.shifted_ngram(data, 3, 1, "ngram1") data = sequenceNgrammer.shifted_ngram(data, 3, 2, "ngram2") if not (windowSize == None and vectorSize == None): ngram0 = data.select("ngram0").withColumnRenamed("ngram0","ngram") ngram1 = data.select("ngram1").withColumnRenamed("ngram1","ngram") ngram2 = data.select("ngram2").withColumnRenamed("ngram2","ngram") ngrams = ngram0.union(ngram1).union(ngram2) # Convert n-grams to W2V feature vector # [I D, D C, C G, G H, ... ] => [0.1234, 0.2394, .. ] word2Vec = Word2Vec() word2Vec.setInputCol("ngram") \ .setOutputCol("feature") \ .setMinCount(10) \ .setNumPartitions(8) \ .setWindowSize(windowSize) \ .setVectorSize(vectorSize) self.model = word2Vec.fit(ngrams) elif fileName != None and sc != None: reader = Word2VecModel() self.model = reader.load(sc, fileName) print(f"model file : {fileName} \n \ inputCol : {self.model.getInputCol()} \n \ windowSize : {self.model.getWindowSize()} \n \ vectorSize : {self.model.getVectorSize()}") else: raise Exception("Either provide word2Vec file (filename) + SparkContext (sc), \ or window size(windowSize) + vector size (vetorSize), \ for function parameters") return #data = data.withColumn("feature0",self.model.transform(data.select('ngram0').withColumnRenamed("ngram0","ngram"))) for i in reversed(range(3)): feature = self.model.transform(data.select('ngram' + str(i)).withColumnRenamed("ngram" + str(i),"ngram")) data = data.join(feature.withColumnRenamed("ngram","ngram" + str(i)), "ngram" + str(i)) data = data.withColumnRenamed("feature", "feature" + str(i)) data = self._average_feature_vectors(data, self.outputCol) data.printSchema() cols = ['structureChainId','sequence','labelQ8','labelQ3','ngram0','ngram1',\ 'ngram2','feature0','feature1','feature2', 'features'] data = data.select(cols) return data
[docs] def get_word2vec_model(self): '''Returns a Word2VecModel created by overlapping_ngram_word2vec_encode() Returns ------- model overlapping Ngram Word2VecModel if available, otherwise None ''' return self.model
def _average_feature_vectors(self, data, outputCol): '''Average the feature vectors Parameters ---------- data : DataFrame input dataframe outputCol : str name of the output column ''' session = SparkSession.builder.getOrCreate() def _averager(v1, v2, v3): f1 = v1.toArray() f2 = v2.toArray() f3 = v3.toArray() length = min(len(f1), len(f2), len(f3)) average = [] for i in range(length): average.append((f1[i] + f2[i] + f3[i])/3.0) return Vectors.dense(average) session.udf.register("averager", _averager, VectorUDT()) data.createOrReplaceTempView("table") sql = f"SELECT *, averager(feature0, feature1, feature2) AS {self.outputCol} from table" data = session.sql(sql) return data