Source code for mmtfPyspark.io.mmtfWriter
#!/usr/bin/env python
'''mmtfWriter.py
Encodes and writes MMTF encoded structure data to a Hadoop Sequence File
'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"
from mmtf.api.mmtf_writer import MMTFEncoder
from mmtfPyspark.utils import MmtfStructure
from pyspark.sql import SparkSession
import gzip
import msgpack
import os
import base64
def write_sequence_file(path, structure, compressed=True):
    '''Encodes and writes MMTF encoded structure data to a Hadoop Sequence File

    Parameters
    ----------
    path : str
        Output path passed to ``saveAsHadoopFile``
    structure : RDD
        Pair RDD of (key, structure) tuples to be written; it must
        support ``take``, ``map`` and ``saveAsHadoopFile``
    compressed : bool
        if true, apply gzip compression to each encoded structure
    '''
    # Peek at one element to learn the value type. take(1) is used instead of
    # first() because first() raises ValueError on an empty RDD, while take(1)
    # simply returns an empty list.
    head = structure.take(1)
    if head and isinstance(head[0][1], MmtfStructure):
        structure = structure.map(lambda s: (s[0], s[1].set_alt_loc_list()))

    # Encode every value to an MMTF byte array, keeping the key untouched.
    encoded = structure.map(lambda t: (t[0], _to_byte_array(t[1], compressed)))
    encoded.saveAsHadoopFile(path,
                             "org.apache.hadoop.mapred.SequenceFileOutputFormat",
                             "org.apache.hadoop.io.Text",
                             "org.apache.hadoop.io.BytesWritable")
def write_mmtf_files(path, structure):
    '''Encodes and writes MMTF encoded and gzipped structure data to individual .mmtf.gz files.

    One file named ``<key>.mmtf.gz`` is written per element.

    Parameters
    ----------
    path : str
        Directory to write the files into; it is created if it does not exist
    structure : RDD
        Pair RDD of (key, structure) tuples to be written; it must
        support ``map`` and ``foreach``
    '''
    if path[-1] != "/":
        path = path + "/"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)

    def _write_gzipped(entry):
        # The context manager guarantees the gzip stream is flushed and the
        # file handle closed, instead of relying on garbage collection.
        with gzip.open(path + entry[0] + '.mmtf.gz', mode='wb') as out:
            out.write(entry[1])

    # The byte array is produced uncompressed; gzip.open applies the compression.
    structure.map(lambda s: (s[0], s[1].set_alt_loc_list())) \
        .map(lambda t: (t[0], _to_byte_array(t[1], False))) \
        .foreach(_write_gzipped)
def to_mmtf_base64(structure):
    '''Returns the base64 text form of an MMTF-encoded structure

    Parameters
    ----------
    structure : mmtfStructure
        structure to be encoded

    Returns
    -------
    str
        base64 encoding of the uncompressed MMTF byte array, decoded to a string
    '''
    # Encode without gzip compression, then base64-encode the raw bytes.
    raw = _to_byte_array(structure, compressed=False)
    b64_bytes = base64.b64encode(raw)
    return b64_bytes.decode()
def _to_byte_array(structure, compressed):
    '''Returns an MMTF-encoded byte array with optional gzip compression

    Parameters
    ----------
    structure : MmtfStructure or object accepted by MMTFEncoder.encode_data
        structure to be encoded; an MmtfStructure (or subclass) whose
        ``alt_loc_set`` flag is false is first replaced by the result of
        ``set_alt_loc_list()``
    compressed : bool
        if true, gzip the msgpack-encoded data

    Returns
    -------
    bytes or bytearray
        gzip-compressed ``bytes`` if compressed is true, otherwise the
        msgpack-encoded data as a ``bytearray``
    '''
    # isinstance (rather than an exact type comparison) so that subclasses of
    # MmtfStructure receive the same alt-loc preparation.
    if isinstance(structure, MmtfStructure) and not structure.alt_loc_set:
        structure = structure.set_alt_loc_list()

    packed = msgpack.packb(MMTFEncoder.encode_data(structure), use_bin_type=True)
    byte_array = bytearray(packed)
    if compressed:
        return gzip.compress(byte_array)
    return byte_array