Source code for mmtfPyspark.io.mmtfReader

#!/usr/bin/env python
'''mmtfReader.py: Methods for reading and downloading structures in MMTF file
formats. The data are returned as a PythonRDD with the structure id (e.g. PDB ID)
as the key and the structural data as the value.

Supported operations and file formats:
- Read directory of MMTF-Hadoop sequence files in full and reduced representation
- Download MMTF full and reduced representations using web service (mmtf.rcsb.org)
- Read directory of MMTF files (.mmtf, .mmtf.gz)

'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"

import os
import msgpack
import gzip
from mmtfPyspark.utils import MmtfStructure
from mmtf.api import default_api
from os import path, walk
from pyspark.sql import SparkSession
import urllib

text = "org.apache.hadoop.io.Text"
byteWritable = "org.apache.hadoop.io.BytesWritable"


def read_full_sequence_file(pdbId=None, fraction=None, seed=123):
    '''Reads an MMTF-Hadoop Sequence file using the default file location.
    The default file location is determined by
    :func:`get_mmtf_full_path() <mmtfPyspark.io.mmtfReader.get_mmtf_full_path>`

    To download mmtf files: https://mmtf.rcsb.org/download.html

    Parameters
    ----------
    pdbId : list, optional
       List of structures to read
    fraction : float, optional
       fraction of structures to read
    seed : int, optional
       random seed
    '''
    return read_sequence_file(get_mmtf_full_path(), pdbId, fraction, seed)

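# Usage sketch for read_full_sequence_file (an illustration, not part of the
# original module): it assumes the MMTF_FULL environment variable points at a
# local copy of the full MMTF-Hadoop sequence file downloaded from
# https://mmtf.rcsb.org/download.html and that a local SparkSession can be
# created (the reader calls SparkSession.builder.getOrCreate() itself).
#
#   from mmtfPyspark.io import mmtfReader
#   pdb = mmtfReader.read_full_sequence_file(fraction=0.1, seed=123)
#   print(pdb.count())
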
def read_reduced_sequence_file(pdbId=None, fraction=None, seed=123):
    '''Reads an MMTF-Hadoop Sequence file using the default file location.
    The default file location is determined by
    :func:`get_mmtf_reduced_path() <mmtfPyspark.io.mmtfReader.get_mmtf_reduced_path>`

    To download mmtf files: https://mmtf.rcsb.org/download.html

    Parameters
    ----------
    pdbId : list, optional
       List of structures to read
    fraction : float, optional
       fraction of structures to read
    seed : int, optional
       random seed
    '''
    return read_sequence_file(get_mmtf_reduced_path(), pdbId, fraction, seed)

def read_sequence_file(path, pdbId=None, fraction=None, seed=123):
    '''Reads an MMTF-Hadoop Sequence file. Can read all files from the path,
    randomly sample a fraction, or read a subset based on an input list.
    See https://mmtf.rcsb.org/download.html for file download information.

    Parameters
    ----------
    path : str
       path to the file directory
    pdbId : list, optional
       List of structures to read
    fraction : float, optional
       fraction of structures to read
    seed : int, optional
       random seed

    Raises
    ------
    Exception
       file path does not exist
    '''
    if not os.path.exists(path):
        raise Exception("file path does not exist")

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    infiles = sc.sequenceFile(path, text, byteWritable)

    # Read in all structures from a directory
    if pdbId is None and fraction is None:
        return infiles.map(_call_sequence_file)

    # Read in a specified list of pdbIds
    elif pdbId is not None and fraction is None:
        pdbIdSet = set(pdbId)
        return infiles.filter(lambda t: str(t[0]) in pdbIdSet).map(_call_sequence_file)

    # Read in a random fraction of structures from a directory
    elif pdbId is None and fraction is not None:
        return infiles.sample(False, fraction, seed).map(_call_sequence_file)

    else:
        raise Exception("Inappropriate combination of parameters")

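# Usage sketch for read_sequence_file (illustrative only): read a specific
# subset of entries from a local MMTF-Hadoop sequence file. The directory path
# and PDB IDs below are placeholders.
#
#   from mmtfPyspark.io import mmtfReader
#   pdb = mmtfReader.read_sequence_file("/path/to/full", pdbId=["1AQ1", "4HHB"])
#   print(pdb.keys().collect())
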
def read_mmtf_files(path):
    '''Reads PDB entries from a directory of MMTF files (.mmtf or .mmtf.gz).

    Parameters
    ----------
    path : str
       Path to MMTF files

    Returns
    -------
    data
       structure data as key/value pairs
    '''
    if not os.path.exists(path):
        raise Exception("file path does not exist")

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    return sc.parallelize(_get_files(path)).map(_call_mmtf).filter(lambda t: t is not None)

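# Usage sketch for read_mmtf_files (illustrative only): read every .mmtf and
# .mmtf.gz file found under a local directory. The directory below is a
# placeholder.
#
#   from mmtfPyspark.io import mmtfReader
#   pdb = mmtfReader.read_mmtf_files("/path/to/mmtf_files")
#   print(pdb.count())
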
def download_mmtf_files(pdbIds, reduced=False):
    '''Downloads and reads the specified PDB entries using
    `MMTF web services <https://mmtf.rcsb.org/download.html>`_ in either full
    or reduced format.

    Parameters
    ----------
    pdbIds : list
       List of PDB IDs to download
    reduced : bool
       flag to indicate reduced or full file format

    Returns
    -------
    data
       structure data as key/value pairs
    '''
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    return sc.parallelize(set(pdbIds)) \
             .map(lambda t: _get_structure(t, reduced)) \
             .filter(lambda t: t is not None)

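# Usage sketch for download_mmtf_files (illustrative only): fetch a few entries
# from the MMTF web services, which requires network access to mmtf.rcsb.org;
# reduced=True selects the reduced representation.
#
#   from mmtfPyspark.io import mmtfReader
#   pdb = mmtfReader.download_mmtf_files(["1AQ1", "4HHB"], reduced=True)
#   print(pdb.keys().collect())
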
def download_full_mmtf_files(pdbIds):
    '''Downloads and reads the specified PDB entries in full MMTF format using
    `MMTF web services <https://mmtf.rcsb.org/download.html>`_.

    Parameters
    ----------
    pdbIds : list
       List of PDB IDs to download

    Returns
    -------
    data
       structure data as key/value pairs
    '''
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    return sc.parallelize(set(pdbIds)) \
             .map(lambda t: _get_structure(t, False)) \
             .filter(lambda t: t is not None)

def download_reduced_mmtf_files(pdbIds):
    '''Downloads and reads the specified PDB entries in reduced MMTF format using
    `MMTF web services <https://mmtf.rcsb.org/download.html>`_.

    Parameters
    ----------
    pdbIds : list
       List of PDB IDs to download

    Returns
    -------
    data
       structure data as key/value pairs
    '''
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    return sc.parallelize(set(pdbIds)) \
             .map(lambda t: _get_structure(t, True)) \
             .filter(lambda t: t is not None)

def _get_structure(pdbId, reduced):
    '''Downloads and decodes a single structure for the given PDB ID.

    Parameters
    ----------
    pdbId : str
       PDB ID of the structure to download
    reduced : bool
       flag to indicate reduced or full file format

    Returns
    -------
    tuple
       pdbId and decoder
    '''
    try:
        unpack = default_api.get_raw_data_from_url(pdbId, reduced)
        decoder = MmtfStructure(unpack)
        return (pdbId, decoder)
    except urllib.error.HTTPError:
        print(f"ERROR: {pdbId} is not a valid pdbId")


def _call_sequence_file(t):
    '''Call function for Hadoop sequence files'''
    # TODO: check if all sequence files are gzipped
    data = default_api.ungzip_data(t[1])
    unpack = msgpack.unpackb(data.read(), raw=False)
    decoder = MmtfStructure(unpack)
    return (str(t[0]), decoder)


def _call_mmtf(f):
    '''Call function for MMTF files'''
    if ".mmtf.gz" in f:
        name = f.split('/')[-1].split('.')[0].upper()
        data = gzip.open(f, 'rb')
        unpack = msgpack.unpack(data, raw=False)
        decoder = MmtfStructure(unpack)
        return (name, decoder)

    elif ".mmtf" in f:
        name = f.split('/')[-1].split('.')[0].upper()
        unpack = msgpack.unpack(open(f, "rb"), raw=False)
        decoder = MmtfStructure(unpack)
        return (name, decoder)


def _get_files(user_path):
    '''Gets the list of files from a path.

    Parameters
    ----------
    user_path : str
       File path

    Returns
    -------
    list
       files in path
    '''
    files = []
    for dirpath, dirnames, filenames in walk(user_path):
        for f in filenames:
            if path.isdir(f):
                files += _get_files(f)
            else:
                files.append(dirpath + '/' + f)
    return files

def get_mmtf_full_path():
    '''Returns the path to the full MMTF-Hadoop sequence file. It looks for
    the environment variable "MMTF_FULL"; if it is not set, an
    EnvironmentError is raised.

    Returns
    -------
    str
       path to the mmtf_full directory
    '''
    if 'MMTF_FULL' in os.environ:
        print(f"Hadoop Sequence file path: MMTF_FULL={os.environ.get('MMTF_FULL')}")
        return os.environ.get("MMTF_FULL")
    else:
        raise EnvironmentError("Environment variable 'MMTF_FULL' not set")

def get_mmtf_reduced_path():
    '''Returns the path to the reduced MMTF-Hadoop sequence file. It looks for
    the environment variable "MMTF_REDUCED"; if it is not set, an
    EnvironmentError is raised.

    Returns
    -------
    str
       path to the mmtf_reduced directory
    '''
    if 'MMTF_REDUCED' in os.environ:
        print(f"Hadoop Sequence file path: MMTF_REDUCED={os.environ.get('MMTF_REDUCED')}")
        return os.environ.get("MMTF_REDUCED")
    else:
        raise EnvironmentError("Environment variable 'MMTF_REDUCED' not set")
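
# Usage sketch for the path helpers (illustrative only): the environment
# variables are typically set in the shell (e.g. export MMTF_REDUCED=...), but
# they can also be set from Python before calling the readers. The path below
# is a placeholder.
#
#   import os
#   from mmtfPyspark.io import mmtfReader
#   os.environ["MMTF_REDUCED"] = "/path/to/reduced"
#   reduced_path = mmtfReader.get_mmtf_reduced_path()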