# Source code for mmtfPyspark.datasets.drugBankDataset

#!/usr/bin/env python
'''drugBankDataset.py

This module provides access to DrugBank datasets containing drug structure and
drug target information. These datasets contain identifiers and names for
integration with other data resources.

References
----------
- Drug Bank. https://www.drugbank.ca
- Wishart DS, et al., DrugBank 5.0: a major update to the DrugBank database for 2018.
  Nucleic Acids Res. 2017 Nov 8. https://dx.doi.org/10.1093/nar/gkx1037

'''
# Module authorship/version metadata (picked up by packaging and doc tooling).
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"

import requests
import tempfile
import io
from zipfile import ZipFile
from io import BytesIO
from pyspark.sql import SparkSession

# Valid DrugBank drug-group filters.
# NOTE: the original list was missing a comma after 'ILLICIT', so implicit
# string concatenation silently produced the bogus entry 'ILLICITWITHDRAWN'
# and dropped both 'ILLICIT' and 'WITHDRAWN'.
DRUG_GROUP = ['ALL', 'APPROVED', 'EXPERIMENTAL', 'NUTRACEUTICAL', 'ILLICIT',
              'WITHDRAWN', 'INVESTIGATIONAL']
# Valid DrugBank drug-type filters.
DRUG_TYPE = ['SMALL_MOLECULE', 'BIOTECH']
# Base URL for DrugBank release download endpoints.
BASE_URL = "https://www.drugbank.ca/releases/latest/downloads/"
# Download buffer size in bytes.
BUFFER = 2048











def get_dataset(url, username=None, password=None):
    '''Downloads a DrugBank dataset

    Parameters
    ----------
    url : str
       DrugBank dataset download links
    username : str, optional
       DrugBank username <None>
    password : str, optional
       DrugBank password <None>

    Returns
    -------
    dataset
       DrugBank dataset
    '''
    if username is None and password is None:
        # get input stream to first zip entry
        req = requests.get(url)
    else:
        # TODO dataset that requires authentication
        req = requests.get(url, auth=(username, password))
        if req.text == 'Invalid Email or password.':
            raise ValueError('Invalid Email or password.')

    # Decode and unzip file
    unzipped = _decode_as_zip_input_stream(req.content)

    # save data to a temporary file (Dataset csv reader requires an input
    # file!)
    tempFileName = _save_temp_file(unzipped)

    # load temporary CSV file into a Spark dataset
    dataset = _read_csv(tempFileName)
    dataset = _remove_spaces_from_column_names(dataset)
    return dataset
def _decode_as_zip_input_stream(content): '''Returns an input stream to the first zip file entry Parameters ---------- content : inputStream inputStream content from request Returns ------- inputStream unzipped InputStream ''' zipfile = ZipFile(BytesIO(content)) return [line.decode() for line \ in zipfile.open(zipfile.namelist()[0]).readlines()] def _save_temp_file(unzipped): '''Saves tabular report as a temporary CSV file Parameters ---------- unzipped : list list of unzipped content Returns ------- str path to the tempfile ''' tempFile = tempfile.NamedTemporaryFile(delete=False) with io.open(tempFile.name, "w", encoding='utf-8') as t: t.writelines(unzipped) return tempFile.name def _read_csv(inputFileName): '''Reads CSV file into Spark dataset Parameters ---------- fileName : str name of the input csv fileName Returns ------- dataset a spark dataset ''' spark = SparkSession.builder.getOrCreate() dataset = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(inputFileName) return dataset def _remove_spaces_from_column_names(original): '''Remove spaces from column names to ensure compatibility with parquet Parameters ---------- original : dataset the original dataset Returns ------- dataset dataset with columns renamed ''' for existingName in original.columns: newName = existingName.replace(' ','') # TODO: double check dataset "withColumnRenamed" funciton original = original.withColumnRenamed(existingName,newName) return original