Source code for nmrstarlib.fileio

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
nmrstarlib.fileio
~~~~~~~~~~~~~~~~~

This module provides routines for reading ``NMR-STAR`` and ``CIF`` 
formatted files from different kinds of sources:

   * Single ``NMR-STAR`` or ``CIF`` formatted file on a local machine.
   * Directory containing multiple ``NMR-STAR`` or ``CIF`` formatted files.
   * Compressed zip/tar archive of ``NMR-STAR`` or ``CIF`` formatted files.
   * URL address of ``NMR-STAR`` or ``CIF`` formatted file.
   * ``BMRB ID`` of ``NMR-STAR`` or ``PDB ID`` of ``CIF`` formatted file. 
"""

import os
import io
import sys
import zipfile
import tarfile
import bz2
import gzip
import re

from . import nmrstarlib

if sys.version_info.major == 3:
    from urllib.request import urlopen
    from urllib.parse import urlparse
    from urllib.error import HTTPError
else:
    from urllib2 import urlopen
    from urlparse import urlparse
    from urllib2 import HTTPError


def _generate_filenames(sources):
    """Generate filenames.

    Each source is resolved, in order, as: a directory (walked recursively,
    compressed files skipped), an existing local file, a URL, an all-digit
    identifier (probed against ``nmrstarlib.BMRB_REST`` first, then
    ``nmrstarlib.PDB_REST``), or a four-character identifier (mapped to
    ``nmrstarlib.PDB_REST`` without probing).

    :param tuple sources: Sequence of strings representing path to file(s).
    :return: Path to file(s).
    :rtype: :py:class:`str`
    :raises TypeError: If a source matches none of the supported kinds.
    :raises HTTPError: If an all-digit source is found under neither the BMRB
       nor the PDB address (raised by the second probe).
    """
    for source in sources:
        if os.path.isdir(source):
            for path, dirlist, filelist in os.walk(source):
                for fname in filelist:
                    # os.walk yields bare file names; join with the walked
                    # directory so the reported/yielded path is the real one.
                    fullpath = os.path.join(path, fname)
                    if nmrstarlib.VERBOSE:
                        print("Processing file: {}".format(os.path.abspath(fullpath)))
                    if GenericFilePath.is_compressed(fname):
                        if nmrstarlib.VERBOSE:
                            print("Skipping compressed file: {}".format(os.path.abspath(fullpath)))
                        continue
                    yield fullpath

        elif os.path.isfile(source):
            yield source

        elif GenericFilePath.is_url(source):
            yield source

        elif source.isdigit():
            bmrb_url = nmrstarlib.BMRB_REST + source
            pdb_url = nmrstarlib.PDB_REST + source + ".cif"
            try:
                # Probe only: close the response right away instead of
                # leaking the connection.
                urlopen(bmrb_url).close()
            except HTTPError:
                # Not available from BMRB; an HTTPError from this second
                # probe propagates to the caller.
                urlopen(pdb_url).close()
                yield pdb_url
            else:
                yield bmrb_url

        # Anchored so that only a source that is exactly four word characters
        # qualifies; a longer unknown string (e.g. a mistyped file name) falls
        # through to the TypeError below instead of becoming a bogus URL.
        elif re.match(r"\w{4}\Z", source):
            yield nmrstarlib.PDB_REST + source + ".cif"

        else:
            raise TypeError("Unknown file source.")
def _generate_handles(filenames):
    """Open a sequence of filenames one at time producing file objects.

    The file is closed immediately when proceeding to the next iteration,
    and also when the consumer stops iterating early or this generator is
    closed.

    :param generator filenames: Generator object that yields the path to each
       file, one at a time.
    :return: Pairs of ``(filehandle, source)`` to be processed into a
       :class:`~nmrstarlib.nmrstarlib.StarFile` instance.
    """
    for fname in filenames:
        if nmrstarlib.VERBOSE:
            print("Processing file: {}".format(os.path.abspath(fname)))
        path = GenericFilePath(fname)
        handles = path.open()
        try:
            for filehandle, source in handles:
                try:
                    yield filehandle, source
                finally:
                    # Runs on normal advance and on GeneratorExit alike, so
                    # the handle is not leaked if the consumer bails out.
                    filehandle.close()
        finally:
            # Finalize the inner generator deterministically rather than
            # waiting for garbage collection.
            handles.close()
def read_files(*sources):
    """Lazily turn every given source into a parsed ``StarFile``.

    The sources are expanded into filenames by :func:`_generate_filenames`,
    opened by :func:`_generate_handles`, and each resulting handle is passed
    to ``nmrstarlib.StarFile.read`` together with its source string.

    :param sources: One or more strings representing path to file(s).
    :return: :class:`~nmrstarlib.nmrstarlib.StarFile` instance(s).
    :rtype: :class:`~nmrstarlib.nmrstarlib.StarFile`
    """
    handle_stream = _generate_handles(_generate_filenames(sources))
    for handle, origin in handle_stream:
        yield nmrstarlib.StarFile.read(handle, origin)
class GenericFilePath(object):
    """`GenericFilePath` class knows how to open local files or files over URL."""

    def __init__(self, path):
        """Initialize path.

        :param str path: String representing a path to local file(s) or valid
           URL address of file(s).
        """
        self.path = path

    def open(self):
        """Generator that opens and yields filehandles using appropriate
        facilities: test if path represents a local file or file over URL,
        if file is compressed or not.

        Every yielded handle is closed when the generator advances, is closed,
        or is interrupted by an exception; zip/tar archive objects are closed
        once their members have been yielded.

        :return: Pairs of ``(filehandle, source)``. For zip and tar archives
           one pair is yielded per regular member, with ``source`` being the
           archive path joined to the member name by ``"/"``; otherwise
           ``source`` is the path itself.
        """
        is_url = self.is_url(self.path)
        compression_type = self.is_compressed(self.path)

        if not compression_type:
            filehandle = urlopen(self.path) if is_url else open(self.path, "r")
            try:
                yield filehandle, self.path
            finally:
                filehandle.close()
            return

        if is_url:
            # Remote archives are downloaded fully into memory; from here on
            # ``path`` holds the raw bytes rather than a filesystem path.
            response = urlopen(self.path)
            try:
                path = response.read()
            finally:
                response.close()
        else:
            path = self.path

        if compression_type == "zip":
            ziparchive = zipfile.ZipFile(io.BytesIO(path), "r") if is_url else zipfile.ZipFile(path)
            try:
                for name in ziparchive.infolist():
                    # Entries ending with "/" are directories, not files.
                    if name.filename.endswith("/"):
                        continue
                    filehandle = ziparchive.open(name)
                    try:
                        yield filehandle, self.path + "/" + name.filename
                    finally:
                        filehandle.close()
            finally:
                ziparchive.close()

        elif compression_type in ("tar", "tar.bz2", "tar.gz"):
            tararchive = tarfile.open(fileobj=io.BytesIO(path)) if is_url else tarfile.open(path)
            try:
                for name in tararchive:
                    if not name.isfile():
                        continue
                    filehandle = tararchive.extractfile(name)
                    try:
                        yield filehandle, self.path + "/" + name.name
                    finally:
                        filehandle.close()
            finally:
                tararchive.close()

        elif compression_type == "bz2":
            filehandle = bz2.BZ2File(io.BytesIO(path)) if is_url else bz2.BZ2File(path)
            try:
                yield filehandle, self.path
            finally:
                filehandle.close()

        elif compression_type == "gz":
            filehandle = gzip.open(io.BytesIO(path)) if is_url else gzip.open(path)
            try:
                yield filehandle, self.path
            finally:
                filehandle.close()

    @staticmethod
    def is_compressed(path):
        """Test if path represents compressed file(s).

        The check is a case-sensitive comparison of the path's suffix.

        :param str path: Path to file(s).
        :return: String specifying compression type if compressed, "" otherwise.
        :rtype: :py:class:`str`
        """
        # Order matters: the compound ".tar.gz"/".tar.bz2" suffixes must be
        # tested before the plain ".gz"/".bz2" ones.
        for compression_type in ("zip", "tar.gz", "tar.bz2", "gz", "bz2", "tar"):
            if path.endswith("." + compression_type):
                return compression_type
        return ""

    @staticmethod
    def is_url(path):
        """Test if path represents a valid URL.

        A path counts as a URL only when it parses with a non-empty scheme,
        network location and path component.

        :param str path: Path to file.
        :return: True if path is valid url string, False otherwise.
        :rtype: :py:obj:`True` or :py:obj:`False`
        """
        try:
            parse_result = urlparse(path)
            return all((parse_result.scheme, parse_result.netloc, parse_result.path))
        except ValueError:
            return False