Source code for mmtfPyspark.datasets.myVariantDataset
#!/usr/bin/env python
'''myVariantDataset.py
This class queries and retrieves missense variations using the MyVariant.info
web services for a list of UniProt ids.
References
----------
- For more information: http://myvariant.info
- Query syntax: http://myvariant.info/docs/
- Xin J, Mark A, Afrasiabi C, Tsueng G, Juchler M, Gopal N, Stupp GS, Putman TE, Ainscough BJ, Griffith OL, Torkamani A, Whetzel PL, Mungall CJ, Mooney SD, Su AI, Wu C (2016) High-performance web services for querying gene and variant annotation. Genome Biology 17(1):1-7. https://doi.org/10.1186/s13059-016-0953-9
Examples
--------
Get all missense variations for a list of Uniprot Ids:
>>> uniprotIds = ['P15056'] # BRAF
>>> ds = MyVariantDataset.get_variations(uniprotIds)
>>> ds.show()
Return missense variations that match a query
>>> uniprotIds = ['P15056'] # BRAF
>>> query = ("clinvar.rcv.clinical_significance:pathogenic"
...          + " OR clinvar.rcv.clinical_significance:likely pathogenic")
>>> ds = MyVariantDataset.get_variations(uniprotIds, query)
>>> ds.show()
+-------------------+---------+
| variationId|uniprotId|
+-------------------+---------+
|chr7:g.140454006G>T| P15056|
|chr7:g.140453153A>T| P15056|
|chr7:g.140477853C>A| P15056|
+-------------------+---------+
'''
# Module metadata: authorship, maintainer contact, version, and release status.
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from io import BytesIO
import requests
# Base URL of the initial query request; the query expression is appended
# directly after "q=".
MYVARIANT_QUERY_URL = "http://myvariant.info/v1/query?q="
# Base URL for retrieving follow-up pages; the scroll id taken from the
# previous response is appended directly after "scroll_id=".
MYVARIANT_SCROLL_URL = "http://myvariant.info/v1/query?scroll_id="
def get_variations(uniprotIds, query = ''):
    '''Returns a dataset of missense variations for a list of Uniprot Ids and a
    MyVariant.info query.

    References
    ----------
    query syntax http://myvariant.info/docs/

    Examples
    --------
    >>> uniprotIds = ['P15056'] # BRAF
    >>> query = ("clinvar.rcv.clinical_significance:pathogenic"
    ...          + " OR clinvar.rcv.clinical_significance:likely pathogenic")
    >>> ds = MyVariantDataset.get_variations(uniprotIds, query)

    Parameters
    ----------
    uniprotIds : list
       list of Uniprot Ids
    query : str
       MyVariant.info query string ['']

    Returns
    -------
    dataset
       dataset with the columns variationId and uniprotId, or None if no
       data are found
    '''
    # Get (or create) the Spark session and its context
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # Download data in parallel: every Uniprot Id yields zero or more
    # JSON response strings
    data = sc.parallelize(uniprotIds).flatMap(lambda x: _get_data(x, query))

    # Convert the RDD of JSON strings to a dataframe
    dataframe = spark.read.json(data)

    # Return None if the dataset contains no results
    if 'hits' not in dataframe.columns:
        print("MyVariantDataset: no match found")
        return None

    return _flatten_dataframe(dataframe)
def _get_data(uniprotId, query):
    '''Downloads the missense variation records for one Uniprot Id from
    MyVariant.info.

    A maximum of 1000 records are returned per request, so additional
    records are retrieved iteratively using the scroll id contained in each
    response.

    Parameters
    ----------
    uniprotId : str
       Uniprot Id to retrieve variations for
    query : str
       additional MyVariant.info query expression; when empty, only the
       missense/Uniprot filter is applied

    Returns
    -------
    list
       JSON response strings, each tagged with the Uniprot Id. The list is
       empty if the first request fails or returns no hits, and holds only
       the pages retrieved so far if a follow-up request fails.
    '''
    data = []

    # Create url
    url = MYVARIANT_QUERY_URL \
        + "snpeff.ann.effect:missense_variant AND dbnsfp.uniprot.acc:" \
        + uniprotId
    # Truthiness test instead of an identity comparison against a literal
    if query:
        url += f" AND ({query})"
    url += "&fields=_id&fetch_all=true"
    url = url.replace(" ", "%20")

    # Open url
    try:
        req = requests.get(url)
    except requests.exceptions.RequestException:
        print(f"WARNING: Could not load data for {uniprotId}")
        return data

    # Read results
    results = _read_results(BytesIO(req.content))

    # An empty hits list on the first page means there is nothing to page
    # through
    if "[]" in results:
        return data

    results = _add_uniprot_id(results, uniprotId)
    data.append(results)
    scrollId = _get_scroll_id(results)

    while scrollId is not None:
        url = MYVARIANT_SCROLL_URL + scrollId
        try:
            req = requests.get(url)
        except requests.exceptions.RequestException:
            print(f"WARNING: Could not load data for {uniprotId}")
            # Stop paging and keep what was retrieved; retrying the same
            # request unconditionally would never terminate while the
            # service is unreachable
            break

        results = _read_results(BytesIO(req.content))

        # The service signals the end of the result set with this message
        if "No results to return" in results:
            break

        results = _add_uniprot_id(results, uniprotId)
        data.append(results)
        # Advance to the scroll id of the page just received
        scrollId = _get_scroll_id(results)

    return data
def _get_scroll_id(results):
'''Get the scroll id from results
'''
if "_scroll_id" in results:
return results.split("\"")[3]
return None
def _read_results(inputStream):
'''Converts data read from input stream into a single line of text
Parameters
----------
inputStream
input stream
Returns
-------
str
response
'''
sb = ''
for line in inputStream:
sb += line.decode()
return sb
def _add_uniprot_id(line, uniprotId):
ids = "\"uniprotId\":" + "\"" + uniprotId + "\"," + "\"hits\"";
return line.replace("\"hits\"", ids);
#ids = f"\"uniprotIds\":\"{uniprotId}\",\n \"_id\""
#return line.replace("\"_id\"", ids)
def _flatten_dataframe(df):
    '''Turns the nested query results into one row per variation

    Parameters
    ----------
    df
       dataframe with a "hits" column, whose "_id" field is exploded, and a
       "uniprotId" column

    Returns
    -------
    dataset
       dataframe with the columns variationId and uniprotId
    '''
    # explode emits one row for every variation id found in hits
    variation_ids = explode(df.hits._id)
    exploded = df.withColumn("variationId", variation_ids)
    return exploded.select(col("variationId"), col("uniprotId"))