Source code for mmtfPyspark.datasets.g2sDataset

#!/user/bin/env python
'''g2sDataset.py

This class maps human genetic variation positions to PDB structure positions.
Genomic positions must be specified for the hgvs-grch37 reference genome using
the HGVS sequence variant nomenclature.

References
----------
- HGVS sequence variant nomenclature http://varnomen.hgvs.org/
- G2S Web Services https://g2s.genomenexus.org/
- Juexin Wang, Robert Sheridan, S Onur Sumer, Nikolaus Schultz, Dong Xu, Jianjiong Gao; 
  G2S: a web-service for annotating genomic variants on 3D protein structures (2018) 
  Bioinformatics. https://doi.org/10.1093/bioinformatics/bty047

Examples
--------
>>> variantIds = ["chr7:g.140449098T>C", "chr7:g.140449100T>C"]
>>> ds = g2sDataset.get_position_dataset(variantIds, "3TV4", "A")
>>> ds.show()
+-----------+-------+-----------+------------+-----------+-------------------+
|structureId|chainId|pdbPosition|pdbAminoAcid|  refGenome|        variationId|
+-----------+-------+-----------+------------+-----------+-------------------+
|       3TV4|      A|        661|           N|hgvs-grch37|chr7:g.140449098T>C|
|       3TV4|      A|        660|           N|hgvs-grch37|chr7:g.140449100T>C|
+-----------+-------+-----------+------------+-----------+-------------------+

'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"

from pyspark.sql.functions import col, explode, upper
from pyspark.sql import SparkSession
from pyspark import SparkContext
from io import BytesIO
import requests

REFERENCE_GENOME = "hgvs-grch37"
G2S_REST_URL = "https://g2s.genomenexus.org/api/alignments/"+REFERENCE_GENOME+"/"


[docs]def get_position_dataset(variationIds, structureId = None, chainId = None): '''Downloads PDB residue mappings for a list of genomic variations Parameters ---------- variationIds : list genomic variation ids, e.g. chr7:g.140449103A>C structureId : str specific PDB structure used for mapping [None] chainId : str specific chain used for mapping [None] Returns ------- dataset dataset with PDB mapping information ''' dataset = _get_dataset(variationIds, structureId, chainId) if dataset == None: return None cols = ["structureId","chainId","pdbPosition","pdbAminoAcid","refGenome",\ "variationId"] return dataset.select(cols).distinct()
[docs]def get_full_dataset(variationIds, structureId = None, chainId = None): '''Downloads PDB residue mappings and alignment information for a list of genomic variations Parameters ---------- variationIds : list genomic variation ids, e.g. chr7:g.140449103A>C structureId : str specific PDB structure used for mapping [None] chainId : str specific chain used for mapping [None] Returns ------- dataset dataset with PDB mapping information ''' return _get_dataset(variationIds, structureId, chainId)
def _get_dataset(variationIds, structureId, chainId): '''Downloads PDB residue mappings for a list of genomic variations Parameters ---------- variationIds : list genomic variation ids, e.g. chr7:g.140449103A>C structureId : str specific PDB structure used for mapping chainId : str specific chain used for mapping Returns ------- dataset dataset with PDB mapping information ''' # Get a spark context spark = SparkSession.builder.getOrCreate() sc = spark.sparkContext # Download data in parallel data = sc.parallelize(variationIds) \ .flatMap(lambda m: _get_data(m, structureId, chainId)) # Convert Python Rdd to dataframe dataframe = spark.read.json(data) # Returns None if dataframe is empty if len(dataframe.columns) == 0: print("g2sDataset: no match found") return None dataframe = _standardize_data(dataframe) return _flatten_dataframe(dataframe) def _get_data(variationId, structureId, chainId): '''Downloads PDB residue mappings for a list of genomic variations''' data = [] if structureId is None and chainId is None: url = G2S_REST_URL + variationId + '/residueMapping' elif structureId is not None and chainId is not None: url = G2S_REST_URL + variationId + '/pdb/' + structureId + '_' \ + chainId + '/residueMapping' else: raise Exception("Both structureId and chainId have to be provided") try: req = requests.get(url) except: print(f"WARNING: could not load data for: {variationId}") return data if b"\"error\":\"Not Found\"" in req.content: print(f"WARNING: could not load data for: {variationId}") return data results = [inputStream.decode() for inputStream in BytesIO(req.content).readlines()] if results is not None and len(results[0]) > 100: results = _add_variant_id(results, REFERENCE_GENOME, variationId) data = results return data def _add_variant_id(json, refGenome, variationId): '''Adds reference genome and variationId to each json records Parameters ---------- json : list list of original json strings refGenome reference genome variationId : str variation identifier ''' ids = "\"refGenome\":\"" + refGenome + "\"," + "\"variationId\":\"" \ + variationId + "\"," + "\"alignmentId\"" return [j.replace("\"alignmentId\"", ids) for j in json] def _standardize_data(df): '''Standardize data and column names to be consistent''' return df.withColumn("structureId", upper(col("pdbId"))) \ .withColumnRenamed("chain", "chainId") def _flatten_dataframe(df): return df.withColumn("pdbPosition", explode(col("residueMapping.pdbPosition"))) \ .withColumn("pdbAminoAcid", explode(col("residueMapping.pdbAminoAcid")))