#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
nmrstarlib.translator
~~~~~~~~~~~~~~~~~~~~~
This module provides the :class:`~nmrstarlib.translator.Translator` abstract class
and concrete classes: :class:`~nmrstarlib.translator.StarFileToStarFile` for converting between
NMR-STAR/CIF and JSONized NMR-STAR/CIF formats and :class:`~nmrstarlib.translator.StarFileToPeakList`
for converting an NMR-STAR formatted file into a simulated peak list file.
"""
import itertools
from . import nmrstarlib
from . import fileio
from . import plsimulator
class Translator(object):
    """Translator abstract class.

    Holds the input/output paths and formats for a conversion.  Iteration is
    left abstract: :meth:`__iter__` raises :class:`NotImplementedError` and
    must be overridden in a subclass.
    """

    def __init__(self, from_path, to_path, from_format=None, to_format=None):
        """Translator initializer.

        :param str from_path: Path to input file(s).
        :param str to_path: Path to output file(s).
        :param str from_format: Input format.
        :param str to_format: Output format.
        """
        self.from_path = from_path
        self.to_path = to_path
        self.from_format = from_format
        self.to_format = to_format
        # Whatever `fileio.GenericFilePath.is_compressed` reports for each path.
        self.from_path_compression = fileio.GenericFilePath.is_compressed(from_path)
        self.to_path_compression = fileio.GenericFilePath.is_compressed(to_path)

    def __iter__(self):
        """Abstract iterator must be implemented in a subclass.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError()
class StarFileToStarFile(Translator):
    """Translator concrete class that can convert between NMR-STAR/CIF and JSONized NMR-STAR/CIF formats."""

    # File extension associated with each supported format name.
    file_extension = {"json": ".json",
                      "nmrstar": ".str",
                      "cif": ".cif"}

    def __init__(self, from_path, to_path, from_format=None, to_format=None):
        """StarFileToStarFile translator initializer.

        :param str from_path: Path to input file(s).
        :param str to_path: Path to output file(s).
        :param str from_format: Input format: `nmrstar`, `cif`, or `json`.
        :param str to_format: Output format: `nmrstar`, `cif`, or `json`.
        """
        super(StarFileToStarFile, self).__init__(from_path, to_path, from_format, to_format)

    def __iter__(self):
        """Iterator that yields :class:`~nmrstarlib.nmrstarlib.StarFile` instances.

        Every object produced by ``fileio.read_files(self.from_path)`` is
        yielded unchanged, in the order it is read.

        :return: instance of :class:`~nmrstarlib.nmrstarlib.StarFile` object instance.
        :rtype: :class:`~nmrstarlib.nmrstarlib.StarFile`
        """
        for starfile in fileio.read_files(self.from_path):
            yield starfile
class StarFileToPeakList(Translator):
    """Translator concrete class that can convert NMR-STAR or JSONized NMR-STAR formatted file
    into peak list file."""

    # File extension associated with each supported output format name.
    file_extension = {"json": ".json",
                      "sparky": ".txt",
                      "autoassign": ".pks"}

    # Absolute tolerance used when checking that split percentages add up to
    # 100; fractions scaled by 100 are rarely exact in binary floating point.
    _PERCENT_SUM_TOLERANCE = 1e-9

    def __init__(self, from_path, to_path, from_format, to_format, spectrum_name, plsplit=None,
                 noise_generator=None, nmrstar_version="3"):
        """StarFileToPeakList initializer.

        :param str from_path: Path to input file(s).
        :param str to_path: Path to output file(s).
        :param str from_format: Input format: `nmrstar` or `json`.
        :param str to_format: Output format: `json`, `sparky`, or `autoassign`
                              (the keys of :attr:`file_extension`).
        :param str spectrum_name: Name of spectrum from which to simulate peak list.
        :param tuple plsplit: How to split peak list in order to account for multiple sources of variance.
                              Either fractions in ``[0, 1]`` or percentages in ``[0, 100]``;
                              defaults to a single 100 % chunk.
        :param noise_generator: Subclasses of :class:`~nmrstarlib.noise.NoiseGenerator` object.
        :type noise_generator: :class:`~nmrstarlib.noise.NoiseGenerator`
        :param str nmrstar_version: Version of NMR-STAR format to use for look up chemical shifts loop.
        :raises ValueError: If split values are out of range, do not sum up to 100 %, or
                            a noise parameter does not have one value per split chunk.
        :raises NotImplementedError: If `spectrum_name` has no known description.
        """
        super(StarFileToPeakList, self).__init__(from_path, to_path, from_format, to_format)

        if plsplit is None:
            plsplit = (100,)

        # Normalize to percentages: values all within [0, 1] are read as fractions.
        if all(0 <= i <= 1 for i in plsplit):
            plsplit = tuple(i * 100 for i in plsplit)
        elif all(0 <= i <= 100 for i in plsplit):
            plsplit = tuple(plsplit)
        else:
            raise ValueError("All percentages must be in interval from 0 to 1 or from 0 % to 100 %.")

        # Compare with a tolerance: e.g. 0.57 * 100 + 0.43 * 100 evaluates to
        # 99.99999999999999, which an exact `== 100` would wrongly reject.
        if abs(sum(plsplit) - 100) > self._PERCENT_SUM_TOLERANCE:
            raise ValueError('"plsplit" percentages must sum up to 100%, current sum is {}%'.format(sum(plsplit)))

        # Each noise parameter must supply exactly one value per split chunk.
        if noise_generator is not None:
            for param_name, param_values in noise_generator.parameters.items():
                if len(param_values) != len(plsplit):
                    raise ValueError('Length of noise values of "{}" parameter must be equal to length of peak list split.'.format(param_name))

        self.plsplit = plsplit
        self.noise_generator = noise_generator
        self.spectrum = self.create_spectrum(spectrum_name)
        self.nmrstar_version = nmrstar_version

    @staticmethod
    def create_spectrum(spectrum_name):
        """Initialize spectrum and peak descriptions.

        :param str spectrum_name: Name of the spectrum from which peak list will be simulated.
        :return: Spectrum object.
        :rtype: :class:`~nmrstarlib.plsimulator.Spectrum`
        :raises NotImplementedError: If `spectrum_name` is not a key of
                                     ``nmrstarlib.SPECTRUM_DESCRIPTIONS``.
        """
        try:
            spectrum_description = nmrstarlib.SPECTRUM_DESCRIPTIONS[spectrum_name]
        except KeyError:
            raise NotImplementedError("Experiment type is not defined.")

        spectrum = plsimulator.Spectrum(spectrum_name, spectrum_description["Labels"],
                                        spectrum_description["MinNumberPeaksPerSpinSystem"],
                                        spectrum_description.get("ResonanceLimit", None))

        for peak_descr in spectrum_description["PeakDescriptions"]:
            spectrum.append(plsimulator.PeakDescription(peak_descr["fraction"], peak_descr["dimensions"]))

        return spectrum

    @staticmethod
    def create_sequence_sites(chain, seq_site_length):
        """Create sequence sites using sequence ids.

        A sliding window of `seq_site_length` consecutive (sorted) sequence
        ids is moved over the chain; only windows for which
        ``SequenceSite.is_sequential()`` is true are kept.

        :param dict chain: Chain object that contains chemical shift values and assignment information.
        :param int seq_site_length: Length of a single sequence site.
        :return: List of sequence sites.
        :rtype: :py:class:`list`
        """
        # Make sure that sequence is sorted by (integer) sequence id.
        seq_ids = sorted(chain, key=int)

        # Zipping the id list against itself shifted by 0..length-1 yields
        # every window of `seq_site_length` neighbouring ids.
        slices = [itertools.islice(seq_ids, i, None) for i in range(seq_site_length)]
        seq_site_ids = list(zip(*slices))

        sequence_sites = []
        for seq_site_id in seq_site_ids:
            seq_site = plsimulator.SequenceSite(chain[seq_id] for seq_id in seq_site_id)
            if seq_site.is_sequential():
                sequence_sites.append(seq_site)
        return sequence_sites

    @staticmethod
    def calculate_intervals(chunk_sizes):
        """Calculate intervals for a given chunk sizes.

        Consecutive half-open ``(start, end)`` index pairs are produced, e.g.
        sizes ``[2, 3, 1]`` give ``((0, 2), (2, 5), (5, 6))``.

        :param list chunk_sizes: List of chunk sizes.
        :return: Tuple of intervals.
        :rtype: :py:class:`tuple`
        """
        intervals = []
        start = 0
        # A running total avoids re-summing a prefix for every chunk.
        for size in chunk_sizes:
            end = start + size
            intervals.append((start, end))
            start = end
        return tuple(intervals)

    def split_by_percent(self, spin_systems_list):
        """Split list of spin systems by specified percentages.

        Chunk sizes are the percentages of ``self.plsplit`` applied to the list
        length and truncated to integers; any items left over by the
        truncation are added to the (first) smallest chunk.

        :param list spin_systems_list: List of spin systems.
        :return: List of spin systems divided into sub-lists corresponding to specified split percentages.
        :rtype: :py:class:`list`
        """
        total = len(spin_systems_list)
        chunk_sizes = [int((percent * total) / 100) for percent in self.plsplit]

        remainder = total - sum(chunk_sizes)
        if remainder > 0:
            chunk_sizes[chunk_sizes.index(min(chunk_sizes))] += remainder

        assert sum(chunk_sizes) == len(spin_systems_list), \
            "sum of chunk sizes must be equal to spin systems list length."

        intervals = self.calculate_intervals(chunk_sizes)
        # Real sub-lists (as documented) rather than one-shot islice iterators.
        return [spin_systems_list[start:end] for start, end in intervals]

    def create_peaklist(self, spectrum, chain, chain_idx, source):
        """Create peak list file.

        For every sequence site a spin system is built from the spectrum's
        peak templates.  A peak is kept only when every dimension of its
        template has a chemical shift.  If a noise generator is set, noise is
        applied to each peak using the index of the split chunk its spin
        system falls into.

        :param spectrum: Spectrum object instance.
        :type spectrum: :class:`~nmrstarlib.plsimulator.Spectrum`
        :param dict chain: Chain object that contains chemical shift values and assignment information.
        :param int chain_idx: Protein chain index.
        :param str source: :class:`~nmrstarlib.nmrstarlib.StarFile` source.
        :return: Peak list object, or ``None`` when no spin system reaches
                 ``spectrum.min_spin_system_peaks`` peaks.
        :rtype: :class:`~nmrstarlib.plsimulator.PeakList`
        """
        sequence_sites = self.create_sequence_sites(chain, spectrum.seq_site_length)

        spin_systems = []
        peaklist = plsimulator.PeakList(spectrum.name, spectrum.labels, source, chain_idx)

        for seq_site in sequence_sites:
            spin_system = plsimulator.SpinSystem()
            for template in spectrum.peak_templates:
                peak = plsimulator.Peak(template.dimension_labels)
                for dim in template:
                    residue = seq_site[dim.position]
                    chemshift = residue.get(dim.label, None)
                    assignment = "{}{}{}".format(residue["AA3Code"], residue["Seq_ID"], dim.label)
                    # Dimensions without a chemical shift are skipped, which
                    # leaves the peak shorter than its template.
                    if chemshift and assignment:
                        peak_dim = plsimulator.Dimension(dim.label, dim.position, assignment, float(chemshift))
                        peak.append(peak_dim)

                # Keep only complete peaks (one dimension per template entry).
                if len(peak) == len(template):
                    spin_system.append(peak)
                    peaklist.append(peak)

            spin_systems.append(spin_system)

        # Also true for an empty list: no usable spin system means no peak list.
        if all(len(i) < spectrum.min_spin_system_peaks for i in spin_systems):
            return None

        if self.noise_generator is not None:
            spin_systems_chunks = self.split_by_percent(spin_systems)
            for split_idx, chunk in enumerate(spin_systems_chunks):
                for spin_system in chunk:
                    for peak in spin_system:
                        peak.apply_noise(self.noise_generator, split_idx)

        return peaklist

    def __iter__(self):
        """Iterator that yields :class:`~nmrstarlib.plsimulator.PeakList` instances.

        One peak list is attempted per chain of every file read from
        ``self.from_path``; chains for which :meth:`create_peaklist` returns a
        falsy result are skipped.

        :return: instance of :class:`~nmrstarlib.plsimulator.PeakList` object instance.
        :rtype: :class:`~nmrstarlib.plsimulator.PeakList`
        """
        for starfile in fileio.read_files(self.from_path):
            chains = starfile.chem_shifts_by_residue(amino_acids_and_atoms=self.spectrum.amino_acids_and_atoms,
                                                     nmrstar_version=self.nmrstar_version)

            for chain_idx, chain in enumerate(chains):
                peaklist = self.create_peaklist(self.spectrum, chain, chain_idx, starfile.source)
                if peaklist:
                    yield peaklist