"""Module containing tools for reading alignment files in various formats.
Todo:
* FASTA reader?
"""
import csv
import json
import os
import numpy as np
from lmpy import Matrix
from biotaphy.common.sequence import Sequence
# .............................................................................
class AlignmentIOError(Exception):
    """Wrapper class for alignment errors.

    Raised by the readers in this module when alignment data cannot be turned
    into sequences (for example, when rows have inconsistent column counts).
    """
# .............................................................................
def create_sequence_list_from_dict(values_dict):
    """Creates a list of sequences from a dictionary.

    Args:
        values_dict (dict): A dictionary of taxon name keys and a list of
            values for each taxon.

    Note:
        * The dictionary should have structure::

            {
                "{taxon_name}" : [{values}]
            }

    Returns:
        tuple: A list of Sequence objects and None for headers.

    Raises:
        AlignmentIOError: If a dictionary value is not a list.
    """
    sequence_list = []
    for name, values in values_dict.items():
        if not isinstance(values, list):
            raise AlignmentIOError('Values must be a list')
        seq = Sequence(name=name)
        seq.set_cont_values(values)
        sequence_list.append(seq)
    # The dictionary structure carries no column headers, so the second
    # element is always None; it is returned to match the other readers that
    # produce a (sequences, headers) pair.
    return sequence_list, None
# .............................................................................
def get_character_matrix_from_sequences_list(sequences, var_headers=None):
    """Converts a list of sequences into a character matrix.

    Args:
        sequences (:obj:`list` of :obj:`Sequence`): A list of Sequence objects
            to be converted.
        var_headers (:obj:`list` of headers, optional): If provided, uses these
            as variable headers for the columns in the matrix.  Otherwise
            headers of the form "Column {index}" are generated from the
            number of continuous values in the first sequence.

    Returns:
        Matrix: A matrix of sequence data with one row per sequence.  Row
            headers (key '0') are the sequence names and column headers
            (key '1') are the variable headers.
    """
    if var_headers is not None:
        col_headers = var_headers
    elif len(sequences) > 0:
        col_headers = [
            'Column {}'.format(i)
            for i in range(len(sequences[0].cont_values))]
    else:
        # There is no first sequence to infer the width from, so build a
        # zero-column matrix instead of failing on sequences[0].
        col_headers = []

    data = np.zeros((len(sequences), len(col_headers)), dtype=float)
    row_headers = []
    for row_idx, seq in enumerate(sequences):
        row_headers.append(seq.name)
        data[row_idx] = np.array(seq.cont_values)
    return Matrix(data, headers={'0': row_headers, '1': col_headers})
# .............................................................................
def load_alignment_from_filename(filename):
    """Attempts to load an alignment from a file path by guessing schema.

    The reader is selected from the file extension (compared without regard
    to case): ".csv", ".json", ".phylip" or ".tbl".

    Args:
        filename (str): The file location containing the alignment

    Raises:
        RuntimeError: Raised when the method needed to load the alignment
            cannot be determined.

    Returns:
        tuple: Containing a list of sequences and headers.  Headers are None
            when the selected reader does not provide them.
    """
    _, ext = os.path.splitext(filename)
    ext_lower = ext.lower()
    if ext_lower == '.csv':
        load_method = read_csv_alignment_flo
    elif ext_lower == '.json':
        load_method = read_json_alignment_flo
    elif ext_lower == '.phylip':
        load_method = read_phylip_alignment_flo
    elif ext_lower == '.tbl':
        load_method = read_table_alignment_flo
    else:
        raise RuntimeError(
            'Cannot determine load method for {} -- extension {}'.format(
                filename, ext))

    with open(filename) as align_file:
        ret = load_method(align_file)

    # The CSV and JSON readers return a (sequences, headers) tuple, while the
    # phylip and table readers return a bare list of sequences.  Check the
    # type explicitly: blindly unpacking would "succeed" on a bare list that
    # happens to hold exactly two sequences and return one of them as headers.
    if isinstance(ret, tuple):
        sequences, headers = ret
    else:
        sequences, headers = ret, None
    return sequences, headers
# .............................................................................
def read_csv_alignment_flo(csv_flo):
    """Reads a CSV file-like object and return a list of sequences and headers.

    Each non-blank line is split on commas.  The first field is the sequence
    name and the remaining fields are converted to floats.  If the first row
    appears to be a header row, its fields after the first are returned as
    the headers.

    Args:
        csv_flo (file-like): A seekable file-like object with CSV alignment
            data.

    Returns:
        tuple: A list of Sequence objects and headers (None if the data has
            no header row).  Empty input produces an empty list and None.

    Raises:
        AlignmentIOError: If the number of columns is inconsistent across the
            sequences.
    """
    # The sniffer decides whether the first row is a header by comparing it
    # with the rows that follow, so it must be handed more than one line.
    # Collect the first row plus up to twenty more non-blank rows.
    max_sample_rows = 21
    sample_lines = []
    while len(sample_lines) < max_sample_rows:
        line = csv_flo.readline()
        if not line:
            break
        if line.strip():
            sample_lines.append(line)
    csv_flo.seek(0)

    if not sample_lines:
        # Nothing to parse (empty or whitespace-only input)
        return [], None

    has_header = csv.Sniffer().has_header(''.join(sample_lines))

    headers = None
    sequence_list = []
    num_parts = None
    for line in csv_flo:
        stripped = line.strip()
        if not stripped:
            # Blank lines carry no data and should not count as a row
            continue
        parts = stripped.split(',')
        if num_parts is None:
            num_parts = len(parts)
        elif len(parts) != num_parts:
            raise AlignmentIOError('Number of columns is inconsistent')

        if has_header and headers is None:
            # First row: everything after the name column is a header
            headers = parts[1:]
        else:
            seq = Sequence(name=parts[0])
            seq.set_cont_values([float(val) for val in parts[1:]])
            sequence_list.append(seq)
    return sequence_list, headers
# .............................................................................
def read_json_alignment_flo(json_flo):
    """Read a JSON file-like object and return a list of sequences and headers.

    Args:
        json_flo (file-like): A file-like object with JSON alignment data.

    Note:
        * File should have structure::

            {
                "headers" : [{header_names}],
                "values" : [
                    {
                        "name" : "{taxon_name}",
                        "values" : [{values}]
                    }
                ]
            }

        * The "headers" key is optional; "values" is required.

    Returns:
        tuple: A list of Sequence objects and headers (None if the document
            has no "headers" key).

    Raises:
        AlignmentIOError: If headers are provided but they are not a list.
    """
    json_vals = json.load(json_flo)

    # Test for key presence rather than using .get() so that an explicit
    # non-list value (including null) is still rejected.
    if 'headers' in json_vals:
        headers = json_vals['headers']
        if not isinstance(headers, list):
            raise AlignmentIOError(
                'If headers are provided, they must be a list')
    else:
        headers = None

    sequence_list = []
    for val_dict in json_vals['values']:
        seq = Sequence(name=val_dict['name'])
        seq.set_cont_values([float(v) for v in val_dict['values']])
        sequence_list.append(seq)
    return sequence_list, headers
# .............................................................................
def read_phylip_alignment_flo(phylip_flo):
    """Reads a phylip alignment file-like object and return the sequences.

    Args:
        phylip_flo (file-like): The phylip file-like object.

    Note:
        * We assume that the phylip files are extended and not strict (in terms
          of how many characters for taxon names).
        * The phylip file is in the format::

            numoftaxa numofsites
            seqlabel sequence
            seqlabel sequence

        * Each row is split on whitespace and must consist of the label
          followed by exactly `numofsites` tokens.
        * Blank lines after the first line are ignored.

    Returns:
        A list of Sequence objects.

    Raises:
        AlignmentIOError: If there is a problem creating sequences.
    """
    seqlist = []
    # First line is number of taxa and number of sites
    header_line = phylip_flo.readline()
    num_taxa, num_sites = [int(v) for v in header_line.strip().split()]

    cnt = 0
    for line in phylip_flo:
        parts = line.strip().split()
        if not parts:
            # Blank lines (such as trailing newlines) are not taxa rows
            continue
        cnt += 1
        if len(parts) != num_sites + 1:
            # Incorrect number of sites
            raise AlignmentIOError(
                'Incorrect number of sites for row: {}'.format(cnt))
        seqlist.append(Sequence(name=parts[0], seq=parts[1:]))

    if cnt != num_taxa:  # pragma: no cover
        raise AlignmentIOError(
            'Incorrect number of taxa. Reported {} but found {}'.format(
                num_taxa, cnt))
    return seqlist
# .............................................................................
def read_table_alignment_flo(table_flo):
    """Reads a table from a file-like object.

    Each data line holds a name and a run of values separated from the name
    by a tab; the values themselves are separated by single spaces and are
    converted to floats.  Lines of two characters or fewer (counting the line
    ending) are skipped.

    Args:
        table_flo (file-like): A file-like object containing table data.

    Returns:
        A list of Sequence objects.

    Raises:
        AlignmentIOError: If there is a problem creating sequences.
    """
    seqlist = []
    for line in table_flo:
        if len(line) <= 2:
            # Too short to hold a name and a value; treat as a blank line
            continue
        try:
            fields = line.strip().split("\t")
            name = fields[0].strip()
            values = [float(val) for val in fields[1].strip().split(" ")]
            tseq = Sequence(name=name)
            tseq.set_cont_values(values)
        except Exception as err:
            # Re-raise as the module's error type, keeping the original
            # exception attached as the cause for debugging.
            raise AlignmentIOError(str(err)) from err
        seqlist.append(tseq)
    return seqlist