Source code for mmtfPyspark.datasets.customReportService
#!/user/bin/env python
"""customReportService.py
This class uses RCSB PDB Tabular Report RESTful webservices to retrieve
metadata and annotations for all current entries in the ProteinDataBank.
References
----------
- List of supported fieldnames: http://www.rcsb.org/pdb/results/reportField.do
- The RCSB Protein Data Bank:redesignedwebsiteandwebservices2011NucleicAcidsRes.39:D392-D401. https://dx.doi.org/10.1093/nar/gkq1021
Examples
--------
Retrieve PubMedCentral, PubMedID, and Depositiondate:
>>> ds = customReportService.get_dataset("pmc","pubmedId","depositionDate")
>>> ds.printSchema()
>>> ds.show(5)
"""
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"
import tempfile
import urllib
from urllib import request
from pyspark.sql import SparkSession
SERVICELOCATIONS = ["https://www.rcsb.org/pdb/rest/customReport",
"https://www1.rcsb.org/pdb/rest/customReport",
"https://www2.rcsb.org/pdb/rest/customReport"]
CURRENT_URL = "?pdbids=*&service=wsfile&format=csv&primaryOnly=1&customReportColumns="
[docs]def get_dataset(columnNames):
"""Returns a dataset with the specified columns for all current PDB entires.
See <a href="https://www.rcsb.org/pdb/results/reportField.do"> for a list
of supported filed names
Parameters
----------
columnNames : str, list
names of columns for the dataset
Returns
-------
dataset
dataset with the specified columns
"""
if type(columnNames) == str:
columnNames = [columnNames]
query = CURRENT_URL + ','.join(columnNames)
return _get_dataset(query, columnNames)
def _get_dataset(query, columnNames):
"""Get dataset using different service locations"""
dataset = None
spark = None
for SERVICELOCATION in SERVICELOCATIONS:
try:
inStream = _post_query(SERVICELOCATION, query)
tmp = tempfile.NamedTemporaryFile(delete=False)
with open(tmp.name, "w") as t:
for l in inStream:
t.writelines(str(l)[2:-3] + '\n')
spark = SparkSession.builder.getOrCreate()
dataset = _read_csv(spark, tmp.name)
except:
continue
break
if dataset is None:
print("ERROR: cannot connect to service location")
return dataset
return _concat_ids(spark, dataset, columnNames)
def _concat_ids(spark, dataset, columnNames):
"""Concatenates structureId and chainId fields into a single key if chainId
field is present
Parameters
----------
spark : :obj:`SparkSession <pyspark.sql.SparkSession>`
dataset : Dataframe
columnNames : list
"""
if "chainId" in dataset.columns:
dataset.createOrReplaceTempView("table")
sql = "SELECT CONCAT(structureId,'.',chainId) as structureChainId," + \
"structureId,chainId,%s" % ','.join(columnNames) + \
" from table"
dataset = spark.sql(sql)
return dataset
def _post_query(service, query):
"""Post PDB Ids and fields in a query string to the RESTful RCSB web service
Parameters
----------
query : str
RESTful query urlopen
service :str
service location
Returns
-------
stream
input stream to response
"""
encodedQuery = urllib.parse.quote(query).encode('utf-8')
url = request.Request(service)
stream = urllib.request.urlopen(url, data=encodedQuery)
return stream
def _read_csv(spark, inputFileName):
'''Reads CSV file into a Spark dataset
Parameters
----------
spark : Spark Context
inputFileName : str
directory path for the input file
'''
dataset = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load(inputFileName) \
.cache()
return dataset