Module pipelines

Preset pipelines to infer reference trees and to place and label environmental sequences

QueryLabeller

Place queries onto reference tree and assign function and taxonomy

Source code in src/metatag/pipelines.py
class QueryLabeller:
    """
    Place queries onto reference tree and assign function and taxonomy
    """

    def __init__(
        self,
        input_query: Path,
        reference_alignment: Path,
        reference_tree: Path,
        reference_labels: Path,
        tree_model: str,
        tree_clusters: Path = None,
        tree_cluster_scores: Path = None,
        tree_cluster_score_threshold: float = None,
        alignment_method: str = "papara",
        output_directory: Path = None,
        maximum_placement_distance: float = 1.0,
        distance_measure: str = "pendant_diameter_ratio",
        minimum_placement_lwr: float = 0.8,
        logfile: Path = None,
    ):
        """Place queries onto reference tree and assign function and taxonomy

        Args:
            input_query (Path): path to query fasta file
            reference_alignment (Path): path to reference alignment in FASTA format
            reference_tree (Path): path to reference tree in Newick format
            reference_labels (Path): path to reference labels file in pickle
                format
            tree_model (str): substitution model to use for tree inference
            tree_clusters (Path, optional): path to tsv file containing tree cluster
                definitions. Defaults to None.
            tree_cluster_scores (Path, optional): path to tsv file containing tree
                cluster scores. Defaults to None.
            tree_cluster_score_threshold (float, optional): minimum tree cluster score
                for placed queries to be counted. Defaults to None.
            alignment_method (str, optional): choose aligner: "papara" or "hmmalign".
                Defaults to "papara".
            output_directory (Path, optional): path to output directory.
                Defaults to None.
            maximum_placement_distance (float, optional): maximum distance of placed
                sequences (distance measure below). Defaults to 1.0.
            distance_measure (str, optional): choose distance measure for placements:
                "pendant_diameter_ratio",
                "pendant_distal_ratio" or "pendant". Defaults to "pendant_diameter_ratio".
            minimum_placement_lwr (float, optional): cutoff value for the LWR of
                placements. Defaults to 0.8.
            logfile (Path, optional): path to logfile. Defaults to None.
        """
        self.input_query = Path(input_query).resolve()
        self.reference_alignment = Path(reference_alignment).resolve()
        self.reference_tree = Path(reference_tree).resolve()
        self.tree_model = (
            Path(tree_model).resolve() if Path(tree_model).is_file() else tree_model
        )
        if tree_clusters is not None:
            self.tree_clusters = Path(tree_clusters).resolve()
        else:
            self.tree_clusters = None
        if tree_cluster_scores is not None:
            self.tree_cluster_scores = Path(tree_cluster_scores).resolve()
        else:
            self.tree_cluster_scores = None
        self.tree_cluster_score_threshold = tree_cluster_score_threshold
        self.alignment_method = alignment_method
        self.reference_labels = reference_labels
        if output_directory is None:
            self.output_directory = self.input_query.parent / "placement_results"
            self.output_directory.mkdir(exist_ok=False)
        else:
            self.output_directory = Path(output_directory).resolve()
        self.maximum_placement_distance = maximum_placement_distance
        self.distance_measure = distance_measure
        self.minimum_placement_lwr = minimum_placement_lwr
        self.out_cleaned_query = Path(
            self.output_directory / "query_cleaned.faa"
        ).resolve()
        self.out_query_labels = self.output_directory / "query_cleaned_id_dict.pickle"
        self._place_outdir = self.output_directory / "placements"
        self._place_outdir.mkdir(parents=True, exist_ok=True)

        self._assign_outdir = self.output_directory / "assignments"
        self._assign_outdir.mkdir(parents=True, exist_ok=True)

        self._count_outdir = self.output_directory / "counts"
        self._count_outdir.mkdir(parents=True, exist_ok=True)

        self._jplace = self._filtered_jplace = self._place_outdir / "epa_result.jplace"
        if (self.maximum_placement_distance is None) and (
            self.minimum_placement_lwr is None
        ):
            self._filtered_jplace = self._place_outdir / "epa_result.jplace"
        elif (self.maximum_placement_distance is None) and (
            self.minimum_placement_lwr is not None
        ):
            self._filtered_jplace = (
                self._place_outdir / "epa_result_lwr_filtered.jplace"
            )
        elif (self.maximum_placement_distance is not None) and (
            self.minimum_placement_lwr is None
        ):
            self._filtered_jplace = (
                self._place_outdir / "epa_result_distance_filtered.jplace"
            )
        else:
            self._filtered_jplace = (
                self._place_outdir / "epa_result_distance_filtered_lwr_filtered.jplace"
            )

        self._out_placements_tree = self._place_outdir / "epa_result.newick"
        self._out_taxtable = self._assign_outdir / "placed_tax_assignments.tsv"
        self._logfile = logfile

    @property
    def place_directory(self) -> Path:
        """Path to directory with placement results."""
        return self._place_outdir

    @property
    def assign_directory(self) -> Path:
        """Path to directory with assignment results."""
        return self._assign_outdir

    @property
    def count_directory(self) -> Path:
        """Path to directory with count results."""
        return self._count_outdir

    @property
    def jplace(self) -> Path:
        """Path to output file with placements in jplace format."""
        return self._filtered_jplace

    @property
    def placements_tree(self) -> Path:
        """Path to output file with placements in Newick format."""
        return self._out_placements_tree

    @property
    def taxtable(self) -> Path:
        """Path to output file with taxonomic assignments."""
        return self._out_taxtable

    @property
    def query_labels(self) -> Path:
        """Path to output file with query labels."""
        return self.out_query_labels

    @property
    def logfile(self) -> Path:
        """Path to logfile."""
        return self._logfile

    def run(self) -> None:
        """Run pipeline to annotate query sequences through evolutionary placement."""
        preprocess_args = CommandArgs(
            data=self.input_query,
            outfile=self.out_cleaned_query,
            translate=True,
            dna=False,
            relabel=True,
            idprefix="query_",
            export_dup=True,
            logfile=self._logfile,
        )
        preprocess.run(preprocess_args)

        place_args = CommandArgs(
            aln=self.reference_alignment,
            tree=self.reference_tree,
            query=self.out_cleaned_query,
            outdir=self._place_outdir,
            aln_method=self.alignment_method,
            tree_model=self.tree_model,
            logfile=self._logfile,
        )
        placesequences.run(place_args)

        assign_args = CommandArgs(
            jplace=self._jplace,
            labels=self.reference_labels,
            query_labels=[self.out_query_labels],
            ref_clusters=self.tree_clusters,
            ref_cluster_scores=self.tree_cluster_scores,
            outgroup=None,
            prefix="placed_tax_",
            outdir=self._assign_outdir,
            max_distance=self.maximum_placement_distance,
            distance_measure=self.distance_measure,
            minimum_lwr=self.minimum_placement_lwr,
            duplicated_query_ids=None,
            taxofile=None,
            logfile=self._logfile,
        )
        labelplacements.run(assign_args)

        count_args = CommandArgs(
            taxtable=self._out_taxtable,
            taxlevels=["genus", "family", "order", "class", "phylum"],
            cluster_ids=None,
            score_threshold=self.tree_cluster_score_threshold,
            outdir=self._count_outdir,
            outprefix=None,
            export_right_queries=True,
            logfile=self._logfile,
        )
        countplacements.run(count_args)

        relabel_args = CommandArgs(
            tree=self._out_placements_tree,
            labels=self.reference_labels + [self.out_query_labels],
            labelprefixes=["ref_", "query_"],
            taxonomy=True,
            aln=None,
            outdir=None,
            taxofile=None,
            logfile=self._logfile,
        )
        relabeltree.run(relabel_args)

assign_directory: Path property

Path to directory with assignment results.

count_directory: Path property

Path to directory with count results.

jplace: Path property

Path to output file with placements in jplace format.

logfile: Path property

Path to logfile.

place_directory: Path property

Path to directory with placement results.

placements_tree: Path property

Path to output file with placements in Newick format.

query_labels: Path property

Path to output file with query labels.

taxtable: Path property

Path to output file with taxonomic assignments.

__init__(input_query, reference_alignment, reference_tree, reference_labels, tree_model, tree_clusters=None, tree_cluster_scores=None, tree_cluster_score_threshold=None, alignment_method='papara', output_directory=None, maximum_placement_distance=1.0, distance_measure='pendant_diameter_ratio', minimum_placement_lwr=0.8, logfile=None)

Place queries onto reference tree and assign function and taxonomy

Parameters:

    input_query (Path): path to query fasta file. Required.
    reference_alignment (Path): path to reference alignment in FASTA format. Required.
    reference_tree (Path): path to reference tree in Newick format. Required.
    reference_labels (Path): path to reference labels file in pickle format. Required.
    tree_model (str): substitution model to use for tree inference. Required.
    tree_clusters (Path, optional): path to tsv file containing tree cluster definitions. Defaults to None.
    tree_cluster_scores (Path, optional): path to tsv file containing tree cluster scores. Defaults to None.
    tree_cluster_score_threshold (float, optional): minimum tree cluster score for placed queries to be counted. Defaults to None.
    alignment_method (str, optional): choose aligner: "papara" or "hmmalign". Defaults to "papara".
    output_directory (Path, optional): path to output directory. Defaults to None.
    maximum_placement_distance (float, optional): maximum distance of placed sequences (see distance measure below). Defaults to 1.0.
    distance_measure (str, optional): choose distance measure for placements: "pendant_diameter_ratio", "pendant_distal_ratio" or "pendant". Defaults to "pendant_diameter_ratio".
    minimum_placement_lwr (float, optional): cutoff value for the LWR of placements. Defaults to 0.8.
    logfile (Path, optional): path to logfile. Defaults to None.

run()

Run pipeline to annotate query sequences through evolutionary placement.

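For orientation, a minimal usage sketch is shown below. File names and the substitution model string are illustrative assumptions; the reference alignment, tree, and labels would typically come from a ReferenceTreeBuilder run (documented further down).

from pathlib import Path

from metatag.pipelines import QueryLabeller

# All file names below are hypothetical placeholders
labeller = QueryLabeller(
    input_query=Path("queries.fasta"),
    reference_alignment=Path("ref_database.faln"),
    reference_tree=Path("ref_database.newick"),
    reference_labels=Path("ref_database_id_dict.pickle"),
    tree_model="LG",  # assumed model string; a path to a model file also works
    alignment_method="papara",
    output_directory=Path("placement_results"),
)
labeller.run()
# Results: labeller.jplace, labeller.placements_tree, labeller.taxtable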

QueryProcessor

Preprocess query sequences: remove duplicates, translate and relabel if needed, prefilter sequences by HMM.

Source code in src/metatag/pipelines.py
class QueryProcessor:
    """
    Preprocess query sequences: remove duplicates, translate
    and relabel if needed, prefilter sequences by HMM.
    """

    def __init__(
        self,
        input_query: Path,
        hmms: list[Path] = None,
        minimum_sequence_length: int = 30,
        maximum_sequence_length: int = None,
        idprefix: str = "query_",
        relabel: bool = False,
        translate: bool = False,
        export_dup: bool = False,
        output_directory: Path = None,
        hmmsearch_args: str = None,
        logfile: Path = None,
    ):
        """Preprocess query sequences

        Args:
            input_query (Path): path to query sequences
            hmms (list[Path], optional): list of paths to HMMs to prefilter query sequences. Defaults to None.
            minimum_sequence_length (int, optional): minimum length for sequences to be kept. Defaults to 30.
            maximum_sequence_length (int, optional): maximum length for sequences to be kept. Defaults to None.
            idprefix (str, optional): prefix for sequence IDs. Defaults to "query_".
            relabel (bool, optional): relabel sequences to short IDs. Defaults to False.
            translate (bool, optional): translate DNA sequences to peptide. Defaults to False.
            export_dup (bool, optional): export duplicated sequences. Defaults to False.
            output_directory (Path, optional): path to output directory. Defaults to None.
            hmmsearch_args (str, optional): additional arguments to hmmsearch as a string. Defaults to None.
            logfile (Path, optional): path to logfile. Defaults to None.
        """
        self.input_query = Path(input_query).resolve()
        if hmms is not None:
            self.hmms = [Path(hmm).resolve() for hmm in hmms]
        else:
            self.hmms = None
        self.hmmsearch_args = hmmsearch_args
        self.minimum_sequence_length = minimum_sequence_length
        self.maximum_sequence_length = maximum_sequence_length
        self.idprefix = idprefix
        self.relabel = relabel
        self.translate = translate
        self.export_dup = export_dup
        if output_directory is None:
            self._output_directory = self.input_query.parent
        else:
            self._output_directory = Path(output_directory).resolve()
        self._logfile = logfile
        self._out_filtered_query = (
            self._output_directory
            / f"{self.input_query.stem}_filtered{self.input_query.suffix}"
        )

    @property
    def filtered_query(self) -> Path:
        """Get path to filtered query."""
        return self._out_filtered_query

    @property
    def output_directory(self) -> Path:
        """Get output directory."""
        return self._output_directory

    @property
    def logfile(self) -> Path:
        """Get path to log file."""
        return self._logfile

    def run(self) -> None:
        """Run pipeline to preprocess query sequences"""
        preprocess_args = CommandArgs(
            data=self.input_query,
            outfile=self._out_filtered_query,
            translate=self.translate,
            dna=False,
            export_dup=self.export_dup,
            relabel=self.relabel,
            idprefix=self.idprefix,
            logfile=self._logfile,
        )
        preprocess.run(preprocess_args)

        if self.hmms is not None:
            database_args = CommandArgs(
                data=self._out_filtered_query,
                outdir=self._output_directory,
                outfile=self._out_filtered_query,
                hmms=self.hmms,
                prefix="",
                relabel=False,
                nocdhit=True,
                noduplicates=False,
                relabel_prefixes=None,
                maxsizes=None,
                minseqlength=self.minimum_sequence_length,
                maxseqlength=self.maximum_sequence_length,
                hmmsearch_args=self.hmmsearch_args,
                logfile=self._logfile,
            )
            makedatabase.run(database_args)

filtered_query: Path property

Get path to filtered query.

logfile: Path property

Get path to log file.

output_directory: Path property

Get output directory.

__init__(input_query, hmms=None, minimum_sequence_length=30, maximum_sequence_length=None, idprefix='query_', relabel=False, translate=False, export_dup=False, output_directory=None, hmmsearch_args=None, logfile=None)

Preprocess query sequences

Parameters:

    input_query (Path): path to query sequences. Required.
    hmms (list[Path], optional): list of paths to HMMs to prefilter query sequences. Defaults to None.
    minimum_sequence_length (int, optional): minimum length for sequences to be kept. Defaults to 30.
    maximum_sequence_length (int, optional): maximum length for sequences to be kept. Defaults to None.
    idprefix (str, optional): prefix for sequence IDs. Defaults to "query_".
    relabel (bool, optional): relabel sequences to short IDs. Defaults to False.
    translate (bool, optional): translate DNA sequences to peptide. Defaults to False.
    export_dup (bool, optional): export duplicated sequences. Defaults to False.
    output_directory (Path, optional): path to output directory. Defaults to None.
    hmmsearch_args (str, optional): additional arguments to hmmsearch as a string. Defaults to None.
    logfile (Path, optional): path to logfile. Defaults to None.

run()

Run pipeline to preprocess query sequences

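A minimal usage sketch (input and HMM file names are hypothetical):

from pathlib import Path

from metatag.pipelines import QueryProcessor

processor = QueryProcessor(
    input_query=Path("env_sequences.fna"),
    hmms=[Path("target_gene.hmm")],  # prefilter queries by this HMM
    minimum_sequence_length=30,
    translate=True,  # input is DNA, translate to peptide
    relabel=True,
    output_directory=Path("preprocessed"),
)
processor.run()
filtered_fasta = processor.filtered_query  # path to the filtered FASTA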

ReferenceTreeBuilder

Reconstruct a reference phylogenetic tree from a sequence database and HMMs

Source code in src/metatag/pipelines.py
class ReferenceTreeBuilder:
    """
    Reconstruct reference phylogenetic tree from sequence database and hmms
    """

    def __init__(
        self,
        input_database: Path,
        hmms: list[Path],
        maximum_hmm_reference_sizes: list[int] = None,
        minimum_sequence_length: int = 30,
        maximum_sequence_length: int = None,
        output_directory: Path = None,
        translate: bool = False,
        relabel: bool = True,
        remove_duplicates: bool = True,
        relabel_prefixes: list[str] = None,
        hmmsearch_args: str = None,
        tree_method: str = "fasttree",
        tree_model: str = "iqtest",
        msa_method: str = "muscle",
        skip_preprocess: bool = False,
        logfile: Path = None,
    ):
        """Reconstruct reference phylogenetic tree from sequence database and hmms

        Args:
            input_database (Path): path to input sequence database in FASTA format
            hmms (list[Path]): list of paths to HMM files
            maximum_hmm_reference_sizes (list[int], optional): list of maximum database size for each HMM. Defaults to None.
            minimum_sequence_length (int, optional): minimum length of sequences in final database. Defaults to 30.
            maximum_sequence_length (int, optional): maximum length of sequences in final database. Defaults to None.
            output_directory (Path, optional): path to output directory. Defaults to None.
            translate (bool, optional): whether to translate input (DNA) sequences. Defaults to False.
            relabel (bool, optional): whether to relabel records in reference database with provisional short labels. Defaults to True.
            remove_duplicates (bool, optional): whether to remove duplicated sequences in database. Defaults to True.
            relabel_prefixes (list[str], optional): list of prefixes to be added to each relabelled record. One for
                each HMM-derived database. Defaults to None.
            hmmsearch_args (str, optional): additional arguments to hmmsearch as a string. Defaults to None.
            tree_method (str, optional): choose tree inference method: "iqtree" or "fasttree". Defaults to "fasttree".
            tree_model (str, optional): choose method to select substitution model: "iqtest", "modeltest",
                or a valid substitution model name (compatible with EPA-ng). Defaults to "iqtest".
            msa_method (str, optional): choose msa method for reference database: "muscle" or "mafft".
                Defaults to "muscle".
            skip_preprocess (bool, optional): whether to skip preprocessing step. Defaults to False.
            logfile (Path, optional): path to logfile. Defaults to None.
        """
        self.input_database = Path(input_database).resolve()
        self.hmms = hmms
        if maximum_hmm_reference_sizes is None:
            self.max_hmm_reference_sizes = [1000 for _ in hmms]
        else:
            self.max_hmm_reference_sizes = maximum_hmm_reference_sizes
        self.minimum_sequence_length = minimum_sequence_length
        self.maximum_sequence_length = maximum_sequence_length
        self.translate = translate
        self.relabel = relabel
        self.remove_duplicates = remove_duplicates
        self.relabel_prefixes = relabel_prefixes
        if isinstance(hmmsearch_args, str) and (not hmmsearch_args.startswith(" ")):
            hmmsearch_args = " " + hmmsearch_args
        self.hmmsearch_args = hmmsearch_args
        self.msa_method = msa_method
        self.tree_method = tree_method
        self.tree_model = tree_model
        self._output_directory = Path(output_directory).resolve()
        self._out_cleaned_database = Path(
            self._output_directory / f"{self.input_database.stem}_cleaned.faa"
        ).resolve()
        self._out_reference_database = self._output_directory / "ref_database.faa"
        self._out_reference_tree = self._output_directory / "ref_database.newick"
        self._out_reference_alignment = self._output_directory / "ref_database.faln"
        self.out_reference_labels = (
            self._output_directory / "ref_database_id_dict.pickle"
        )
        self.out_tree_model = None
        self.skip_preprocess = skip_preprocess
        self._logfile = logfile

    @property
    def output_directory(self) -> Path:
        """Get output directory."""
        return self._output_directory

    @property
    def cleaned_database(self) -> Path:
        """Get path to cleaned database."""
        return self._out_cleaned_database

    @property
    def reference_database(self) -> Path:
        """Get path to reference database."""
        return self._out_reference_database

    @property
    def reference_labels(self) -> Path:
        """Get path to reference labels."""
        return self.out_reference_labels

    @property
    def reference_tree(self) -> Path:
        """Get path to reference tree."""
        return self._out_reference_tree

    @property
    def reference_alignment(self) -> Path:
        """Get path to reference alignment."""
        return self._out_reference_alignment

    @property
    def logfile(self) -> Path:
        """Get path to log file."""
        return self._logfile

    def run(self) -> None:
        """Run pipeline to build reference tree."""
        if not self.skip_preprocess:
            preprocess_args = CommandArgs(
                data=self.input_database,
                outfile=self._out_cleaned_database,
                translate=self.translate,
                dna=self.translate,
                export_dup=self.remove_duplicates,
                relabel=False,
                idprefix=None,
                logfile=self._logfile,
            )
            preprocess.run(preprocess_args)
        if not self._out_cleaned_database.exists():
            logger.error(
                "Please, first run preprocess step by setting 'skip_preprocess=False'."
            )

        database_args = CommandArgs(
            data=self._out_cleaned_database,
            outdir=self._output_directory,
            outfile=None,
            hmms=self.hmms,
            prefix="",
            relabel=self.relabel,
            nocdhit=False,
            noduplicates=False,
            relabel_prefixes=self.relabel_prefixes,
            maxsizes=self.max_hmm_reference_sizes,
            minseqlength=self.minimum_sequence_length,
            maxseqlength=self.maximum_sequence_length,
            hmmsearch_args=self.hmmsearch_args,
            logfile=self._logfile,
        )
        makedatabase.run(database_args)

        tree_args = CommandArgs(
            data=self._out_reference_database,
            outdir=self._output_directory,
            msa_method=self.msa_method,
            tree_model=self.tree_model,
            tree_method=self.tree_method,
            logfile=self._logfile,
        )
        buildtree.run(tree_args)

        relabel_args = CommandArgs(
            tree=self._out_reference_tree,
            labels=[self.out_reference_labels],
            labelprefixes=None,
            taxonomy=True,
            aln=None,
            outdir=None,
            taxofile=None,
            logfile=self._logfile,
        )
        relabeltree.run(relabel_args)

cleaned_database: Path property

Get path to cleaned database.

logfile: Path property

Get path to log file.

output_directory: Path property

Get output directory.

reference_alignment: Path property

Get path to reference alignment.

reference_database: Path property

Get path to reference database.

reference_labels: Path property

Get path to reference labels.

reference_tree: Path property

Get path to reference tree.

__init__(input_database, hmms, maximum_hmm_reference_sizes=None, minimum_sequence_length=30, maximum_sequence_length=None, output_directory=None, translate=False, relabel=True, remove_duplicates=True, relabel_prefixes=None, hmmsearch_args=None, tree_method='fasttree', tree_model='iqtest', msa_method='muscle', skip_preprocess=False, logfile=None)

Reconstruct a reference phylogenetic tree from a sequence database and HMMs

Parameters:

    input_database (Path): path to input sequence database in FASTA format. Required.
    hmms (list[Path]): list of paths to HMM files. Required.
    maximum_hmm_reference_sizes (list[int], optional): list of maximum database size for each HMM. Defaults to None.
    minimum_sequence_length (int, optional): minimum length of sequences in final database. Defaults to 30.
    maximum_sequence_length (int, optional): maximum length of sequences in final database. Defaults to None.
    output_directory (Path, optional): path to output directory. Defaults to None.
    translate (bool, optional): whether to translate input (DNA) sequences. Defaults to False.
    relabel (bool, optional): whether to relabel records in reference database with provisional short labels. Defaults to True.
    remove_duplicates (bool, optional): whether to remove duplicated sequences in database. Defaults to True.
    relabel_prefixes (list[str], optional): list of prefixes to be added to each relabelled record. One for each HMM-derived database. Defaults to None.
    hmmsearch_args (str, optional): additional arguments to hmmsearch as a string. Defaults to None.
    tree_method (str, optional): choose tree inference method: "iqtree" or "fasttree". Defaults to "fasttree".
    tree_model (str, optional): choose method to select substitution model: "iqtest", "modeltest", or a valid substitution model name (compatible with EPA-ng). Defaults to "iqtest".
    msa_method (str, optional): choose msa method for reference database: "muscle" or "mafft". Defaults to "muscle".
    skip_preprocess (bool, optional): whether to skip preprocessing step. Defaults to False.
    logfile (Path, optional): path to logfile. Defaults to None.

run()

Run pipeline to build reference tree.

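A minimal usage sketch (file names are hypothetical); its outputs match the inputs expected by QueryLabeller above:

from pathlib import Path

from metatag.pipelines import ReferenceTreeBuilder

builder = ReferenceTreeBuilder(
    input_database=Path("sequence_database.fasta"),
    hmms=[Path("target_gene.hmm")],
    maximum_hmm_reference_sizes=[1000],
    output_directory=Path("tree_results"),
    tree_method="fasttree",
    tree_model="iqtest",
    msa_method="muscle",
)
builder.run()
# builder.reference_alignment, builder.reference_tree and
# builder.reference_labels feed directly into QueryLabeller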

Module database.preprocessing

Tools to preprocess sequence databases

assert_correct_sequence_format(fasta_file, output_file=None, is_peptide=True)

Filter out (DNA or peptide) sequences containing illegal characters

Parameters:

    fasta_file (Path): path to input FASTA file. Required.
    output_file (Path, optional): path to output file. Defaults to None.
    is_peptide (bool, optional): whether FASTA contains peptide sequences. Defaults to True.
Source code in src/metatag/database/preprocessing.py
def assert_correct_sequence_format(
    fasta_file: Path,
    output_file: Path = None,
    is_peptide: bool = True,
) -> None:
    """Filter out (DNA or peptide) sequences containing illegal characters

    Args:
        fasta_file (Path): path to input FASTA file
        output_file (Path, optional): path to output file. Defaults to None.
        is_peptide (bool, optional): whether FASTA contains peptide sequences.
            Defaults to True.
    """
    fasta_file = Path(fasta_file).resolve()
    dirname = fasta_file.parent
    fname, ext = fasta_file.stem, fasta_file.suffix

    def remove_stop_codon_signals(record_seq: str) -> str:
        return record_seq.replace("*", "")

    if output_file is None:
        output_file = dirname / f"{fname}_modified{ext}"
    else:
        output_file = Path(output_file).resolve()
    if is_peptide:
        is_legit_sequence = is_legit_peptide_sequence
    else:
        is_legit_sequence = is_legit_dna_sequence

    with open(output_file, "w") as outfile, open(fasta_file, "r") as ffile:
        fasta = pyfastx.Fasta(ffile.name, build_index=False, full_name=True)
        for record_name, record_seq in fasta:
            if is_peptide:
                record_seq = remove_stop_codon_signals(record_seq)
            if is_legit_sequence(record_seq):
                outfile.write(f">{record_name}\n{record_seq}\n")

fasta_contains_nucleotide_sequences(fasta_file)

Check whether fasta file contains nucleotide sequences

Parameters:

    fasta_file (Path): path to input FASTA file. Required.

Returns:

    bool: whether FASTA contains nucleotide sequences

Source code in src/metatag/database/preprocessing.py
def fasta_contains_nucleotide_sequences(fasta_file: Path) -> bool:
    """Check whether fasta file contains nucleotide sequences

    Args:
        fasta_file (Path): path to input FASTA file

    Returns:
        bool: whether FASTA contains nucleotide sequences
    """
    ffile = open(fasta_file, "r")
    seq_1 = next(SeqIO.parse(ffile, "fasta"))
    seq = seq_1.seq
    ffile.close()
    return is_legit_dna_sequence(seq)

is_fasta(filename)

Check whether file is of type FASTA

Parameters:

    filename (Path): path to input file. Required.

Returns:

    bool: whether the file is in FASTA format

Source code in src/metatag/database/preprocessing.py
def is_fasta(filename: Path) -> bool:
    """Check whether file is of type FASTA

    Args:
        filename (Path): path to input file

    Returns:
        bool: whether the file is in FASTA format
    """
    filename = Path(filename).resolve()
    if filename.exists():
        with open(filename, "r") as handle:
            fasta = SeqIO.parse(handle, "fasta")
            return any(fasta)
    else:
        return False
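
Combined with fasta_contains_nucleotide_sequences above, this allows a simple guard before building a database (a sketch; the file name is hypothetical):

from metatag.database.preprocessing import (
    fasta_contains_nucleotide_sequences,
    is_fasta,
)

if is_fasta("input.fasta"):
    # True if the first record looks like DNA, i.e. translation is needed
    needs_translation = fasta_contains_nucleotide_sequences("input.fasta")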

is_legit_dna_sequence(record_seq)

Assert that DNA sequence only contains valid symbols

Parameters:

    record_seq (str): record sequence as a string. Required.

Returns:

    bool: whether sequence corresponds to DNA

Source code in src/metatag/database/preprocessing.py
def is_legit_dna_sequence(record_seq: str) -> bool:
    """Assert that DNA sequence only contains valid symbols

    Args:
        record_seq (str): record sequence as a string

    Returns:
        bool: whether sequence corresponds to DNA
    """
    nts = {"A", "G", "T", "C"}
    seq_symbols = {s.upper() for s in record_seq}
    return seq_symbols.issubset(nts)

is_legit_peptide_sequence(record_seq)

Assert that peptide sequence only contains valid symbols

Parameters:

    record_seq (str): record sequence as a string. Required.

Returns:

    bool: whether sequence corresponds to a peptide

Source code in src/metatag/database/preprocessing.py
def is_legit_peptide_sequence(record_seq: str) -> bool:
    """Assert that peptide sequence only contains valid symbols

    Args:
        record_seq (str): record sequence as a string

    Returns:
        bool: whether sequence corresponds to a peptide
    """
    aas = {
        "A",
        "C",
        "D",
        "E",
        "F",
        "G",
        "H",
        "I",
        "K",
        "L",
        "M",
        "N",
        "P",
        "Q",
        "R",
        "S",
        "T",
        "V",
        "W",
        "Y",
    }
    seq_symbols = {s.upper() for s in record_seq}
    return seq_symbols.issubset(aas)
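
Both predicates operate on plain strings and are case-insensitive; for example:

from metatag.database.preprocessing import (
    is_legit_dna_sequence,
    is_legit_peptide_sequence,
)

assert is_legit_dna_sequence("ACGTacgt")
assert not is_legit_dna_sequence("ACGTN")  # "N" is not an accepted symbol
assert is_legit_peptide_sequence("MKTAYIAKQR")
assert not is_legit_peptide_sequence("MKTA*")  # hence "*" is stripped upstream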

merge_fastas(input_fastas_dir, output_fasta=None)

Merge input fasta files into a single fasta

Parameters:

    input_fastas_dir (Path): path to directory containing input FASTA files. Required.
    output_fasta (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/database/preprocessing.py
def merge_fastas(input_fastas_dir: Path, output_fasta: Path = None) -> None:
    """Merge input fasta files into a single fasta

    Args:
        input_fastas_dir (Path): path to directory containing input FASTA files
        output_fasta (Path, optional): path to output file. Defaults to None.
    """
    input_fastas_dir = Path(input_fastas_dir).resolve()
    if output_fasta is None:
        output_fasta = input_fastas_dir / "merged.fasta"
    else:
        output_fasta = Path(output_fasta).resolve()
    file = open(output_fasta, "w", encoding="UTF-8")
    cmd_str = f"printf '%s\\0' * | xargs -0 cat > {output_fasta}"
    terminal_execute(cmd_str, work_dir=input_fastas_dir, suppress_shell_output=False)
    file.close()
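
A usage sketch (the directory name is hypothetical). The function shells out to cat via terminal_execute, so it assumes a Unix-like environment:

from pathlib import Path

from metatag.database.preprocessing import merge_fastas

# Concatenate every file in fasta_chunks/ into one FASTA
merge_fastas(
    input_fastas_dir=Path("fasta_chunks"),
    output_fasta=Path("merged.fasta"),
)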

remove_duplicates_from_fasta(input_fasta, output_fasta=None, export_duplicates=False, duplicates_file=None)

Removes duplicate entries (either by sequence or ID) from fasta.

Parameters:

    input_fasta (Path): path to input FASTA file. Required.
    output_fasta (Path, optional): path to output file. Defaults to None.
    export_duplicates (bool, optional): whether to export a file containing duplicated sequences. Defaults to False.
    duplicates_file (Path, optional): path to duplicates output file. Defaults to None.
Source code in src/metatag/database/preprocessing.py
def remove_duplicates_from_fasta(
    input_fasta: Path,
    output_fasta: Path = None,
    export_duplicates: bool = False,
    duplicates_file: Path = None,
) -> None:
    """Removes duplicate entries (either by sequence or ID) from fasta.

    Args:
        input_fasta (Path): path to input FASTA file
        output_fasta (Path, optional): path to output file. Defaults to None.
        export_duplicates (bool, optional): whether to export a file containing
            duplicated sequences. Defaults to False.
        duplicates_file (Path, optional): path to duplicates output file. Defaults to None.
    """
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, "_noduplicates")
    else:
        output_fasta = Path(output_fasta).resolve()

    if duplicates_file is None:
        duplicates_file = set_default_output_path(
            input_fasta, "_duplicates", extension=".txt"
        )
    wrappers.run_seqkit_nodup(
        input_fasta=input_fasta,
        output_fasta=output_fasta,
        export_duplicates=export_duplicates,
        duplicates_file=duplicates_file,
    )
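
A usage sketch (file names are hypothetical):

from pathlib import Path

from metatag.database.preprocessing import remove_duplicates_from_fasta

remove_duplicates_from_fasta(
    input_fasta=Path("proteins.faa"),
    output_fasta=Path("proteins_noduplicates.faa"),
    export_duplicates=True,  # also write a text file listing duplicates
)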

set_original_record_ids_in_fasta(input_fasta, label_dict=None, output_fasta=None)

Relabel temporary record IDs with their original IDs

Parameters:

    input_fasta (Path): path to input FASTA file. Required.
    label_dict (dict, optional): dictionary mapping short IDs to original labels. Defaults to None.
    output_fasta (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/database/preprocessing.py
def set_original_record_ids_in_fasta(
    input_fasta: Path, label_dict: dict = None, output_fasta: Path = None
) -> None:
    """Relabel temporary record ID by original IDs

    Args:
        input_fasta (Path): path to input FASTA file
        label_dict (dict, optional): dictionary mapping short IDs to original
            labels. Defaults to None.
        output_fasta (Path, optional): path to output file. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, tag="_original_ids")
    else:
        output_fasta = Path(output_fasta).resolve()

    def relabel_records():
        for record in SeqIO.parse(input_fasta, "fasta"):
            if record.name in label_dict.keys():
                name = label_dict[record.name]
                record.name = name
                record.id = name
                record.description = name
            yield record

    SeqIO.write(relabel_records(), output_fasta, "fasta")

set_temp_record_ids_in_fasta(input_fasta, output_dir=None, prefix=None, output_fasta=None, output_dict=None)

Replace record IDs with numbers and store the mapping in a dictionary

Parameters:

    input_fasta (Path): path to input FASTA file. Required.
    output_dir (Path, optional): path to output directory. Defaults to None.
    prefix (str, optional): prefix to record names in output FASTA. Defaults to None.
    output_fasta (Path, optional): path to output FASTA file with short IDs. Defaults to None.
    output_dict (Path, optional): path to output pickle file storing the ID dictionary. Defaults to None.
Source code in src/metatag/database/preprocessing.py
def set_temp_record_ids_in_fasta(
    input_fasta: Path,
    output_dir: Path = None,
    prefix: str = None,
    output_fasta: Path = None,
    output_dict: Path = None,
) -> None:
    """Change record ids for numbers and store then in a dictionary

    Args:
        input_fasta (Path): path to input FASTA file
        output_dir (Path, optional): path to output directory. Defaults to None.
        prefix (str, optional): prefix to record names in output FASTA.
             Defaults to None.
        output_fasta (Path, optional): path to output FASTA file with shortened
            record IDs. Defaults to None.
        output_dict (Path, optional): path to output pickle file storing the ID
            dictionary. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_dir is None:
        output_dir = input_fasta.parent
    else:
        output_dir = Path(output_dir).resolve()
    if prefix is not None:
        prefix_str = prefix
    else:
        prefix_str = ""

    fasta_file = set_default_output_path(
        input_fasta, tag="_short_ids", only_filename=True
    )
    dict_file = set_default_output_path(
        input_fasta, tag="_id_dict", extension=".pickle", only_filename=True
    )
    if output_fasta is None:
        output_fasta = output_dir / fasta_file
    if output_dict is None:
        output_dict = output_dir / dict_file

    id_dict = dict()
    with open(output_fasta, "w", encoding="UTF-8") as outfasta, open(
        input_fasta, "r", encoding="UTF-8"
    ) as ffile:
        fasta = pyfastx.Fasta(ffile.name, build_index=False, full_name=True)
        for n, (record_name, record_seq) in enumerate(fasta):
            new_id = f"{prefix_str}{n}"
            id_dict[new_id] = record_name
            outfasta.write(f">{new_id}\n{record_seq}\n")
    save_to_pickle_file(id_dict, output_dict)
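
A minimal round-trip sketch combining the two relabelling helpers above, assuming the package imports under the metatag namespace shown in the source paths and that the default output names follow the "_short_ids" and "_id_dict" tags used in the code; file names are hypothetical:

from pathlib import Path
import pickle

from metatag.database.preprocessing import (
    set_original_record_ids_in_fasta,
    set_temp_record_ids_in_fasta,
)

# Shorten record IDs to ref_0, ref_1, ... and pickle the ID dictionary
set_temp_record_ids_in_fasta(Path("refs.fasta"), prefix="ref_")

# The pickled dictionary maps short IDs back to original labels
with open("refs_id_dict.pickle", "rb") as handle:
    id_dict = pickle.load(handle)

# Restore the original labels in the shortened FASTA
set_original_record_ids_in_fasta(Path("refs_short_ids.fasta"), label_dict=id_dict)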

write_record_names_to_file(input_fasta, filter_by_tag=None, output_file=None)

Write a txt file containing a list of record IDs in fasta

Parameters:

- input_fasta (Path): path to input FASTA file. Required.
- filter_by_tag (str, optional): pattern to match in record labels; if set, only matching record labels are written. Defaults to None.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/preprocessing.py
def write_record_names_to_file(
    input_fasta: Path, filter_by_tag: str = None, output_file: Path = None
):
    """Write a txt file containing a list of record IDs in fasta

    Args:
        input_fasta (Path): path to input FASTA file
        filter_by_tag (str, optional): set to str containing a pattern to match
            in record labels. In this case, only matched record labels
            are returned. Defaults to None.
        output_file (Path, optional): path to output file. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_file is None:
        output_file = set_default_output_path(input_fasta, extension=".txt")
    else:
        output_file = Path(output_file).resolve()
    if filter_by_tag is not None:
        pattern = filter_by_tag
    else:
        pattern = ">"
    cmd_str = f"grep '{pattern}' {input_fasta} | cut -c 2- > {output_file}"
    terminal_execute(cmd_str, suppress_shell_output=False)
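
Since the function shells out to grep and cut, both must be available. A short sketch with hypothetical paths, listing only records whose labels contain "query_":

from pathlib import Path
from metatag.database.preprocessing import write_record_names_to_file

# Writes matching record labels (minus the leading ">") to query_ids.txt
write_record_names_to_file(
    Path("database.fasta"), filter_by_tag="query_", output_file=Path("query_ids.txt")
)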

Module database.manipulation

Tools to create peptide-specific sequence databases

convert_fasta_aln_to_phylip(input_fasta_aln, output_phylip=None)

Convert alignments in Fasta to Phylip.

Parameters:

- input_fasta_aln (Path): path to input alignment file. Required.
- output_phylip (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/manipulation.py
def convert_fasta_aln_to_phylip(
    input_fasta_aln: Path, output_phylip: Path = None
) -> None:
    """Convert alignments in Fasta to Phylip.

    Args:
        input_fasta_aln (Path): path to input alignment file
        output_phylip (Path, optional): path to output file. Defaults to None.
    """
    input_fasta_aln = Path(input_fasta_aln).resolve()
    if output_phylip is None:
        output_phylip = set_default_output_path(input_fasta_aln, extension=".phylip")
    else:
        output_phylip = Path(output_phylip).resolve()
    with open(input_fasta_aln, "r") as input_handle, open(
        output_phylip, "w"
    ) as output_handle:
        alignments = AlignIO.parse(input_handle, "fasta")
        AlignIO.write(alignments, output_handle, "phylip-relaxed")

convert_phylip_to_fasta_aln(input_phylip, output_file=None)

Convert alignments in Phylip to Fasta format

Parameters:

- input_phylip (Path): path to input alignment file. Required.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/manipulation.py
def convert_phylip_to_fasta_aln(input_phylip: Path, output_file: Path = None) -> None:
    """Convert alignments in Phylip to Fasta format

    Args:
        input_phylip (Path): path to input alignment file
        output_file (Path, optional): path to output file. Defaults to None.
    """
    input_phylip = Path(input_phylip).resolve()
    if output_file is None:
        output_file = set_default_output_path(input_phylip, extension=".faln")
    else:
        output_file = Path(output_file).resolve()
    alignments = AlignIO.parse(input_phylip, "phylip-relaxed")
    AlignIO.write(alignments, output_file, "fasta")

convert_stockholm_to_fasta_aln(input_stockholm, output_fasta=None)

Convert alignment file in Stockholm format to fasta

Parameters:

- input_stockholm (Path): path to input alignment file. Required.
- output_fasta (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/manipulation.py
def convert_stockholm_to_fasta_aln(
    input_stockholm: Path, output_fasta: Path = None
) -> None:
    """Convert alignment file in Stockholm format to fasta

    Args:
        input_stockholm (Path): path to input alignment file
        output_fasta (Path, optional): path to output file. Defaults to None.
    """
    input_stockholm = Path(input_stockholm).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(input_stockholm, extension=".faln")
    else:
        output_fasta = Path(output_fasta).resolve()
    alignments = AlignIO.read(input_stockholm, "stockholm")
    AlignIO.write(alignments, output_fasta, "fasta")
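
The three converters above share the same call pattern; a quick sketch (hypothetical paths) round-trips an alignment through PHYLIP:

from pathlib import Path
from metatag.database.manipulation import (
    convert_fasta_aln_to_phylip,
    convert_phylip_to_fasta_aln,
)

# FASTA alignment -> relaxed PHYLIP ...
convert_fasta_aln_to_phylip(Path("ref.fasta.aln"), output_phylip=Path("ref.phylip"))
# ... and back to FASTA
convert_phylip_to_fasta_aln(Path("ref.phylip"), output_file=Path("ref_roundtrip.faln"))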

filter_fasta_by_hmm(hmm_model, input_fasta, output_fasta=None, hmmer_output=None, method='hmmsearch', additional_args=None)

Generate a protein-specific database by filtering a sequence database down to the sequences corresponding to the protein of interest

Parameters:

- hmm_model (Path): path to HMM model. Required.
- input_fasta (Path): path to input FASTA file. Required.
- output_fasta (Path, optional): path to output file. Defaults to None.
- hmmer_output (Path, optional): path to hmmer output directory. Defaults to None.
- method (str, optional): choose hmmer method: "hmmsearch" or "hmmscan". Defaults to "hmmsearch".
- additional_args (str, optional): additional arguments to hmmsearch/scan as a string. Defaults to None.

Source code in src/metatag/database/manipulation.py
def filter_fasta_by_hmm(
    hmm_model: Path,
    input_fasta: Path,
    output_fasta: Path = None,
    hmmer_output: Path = None,
    method: str = "hmmsearch",
    additional_args: str = None,
) -> None:
    """Generate protein-specific database by filtering
       sequence database to only contain sequences
       corresponding to the protein of interest

    Args:
        hmm_model (Path): path to hmm model
        input_fasta (Path): path to input FASTA file
        output_fasta (Path, optional): path to output file. Defaults to None.
        hmmer_output (Path, optional): path to hmmer output directory. Defaults to None.
        method (str, optional): choose hmmer method: "hmmsearch" or "hmmscan". Defaults to "hmmsearch".
        additional_args (str, optional): additional arguments to hmmsearch/scan as a string. Defaults to None.
    """
    hmm_model = Path(hmm_model).resolve()
    input_fasta = Path(input_fasta).resolve()
    if hmmer_output is None:
        hmmer_output = set_default_output_path(
            input_fasta, tag=f"_{hmm_model.stem}", extension=".txt"
        )
    else:
        hmmer_output = Path(hmmer_output).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(
            input_fasta, tag=f"filtered_{hmm_model.stem}"
        )
    else:
        output_fasta = Path(output_fasta).resolve()

    logger.info("Running Hmmer...")
    wrappers.run_hmmsearch(
        hmm_model=hmm_model,
        input_fasta=input_fasta,
        output_file=hmmer_output,
        method=method,
        additional_args=additional_args,
    )
    logger.info("Parsing Hmmer output file...")
    hmmer_hits = parse_hmmsearch_output(hmmer_output)
    if (hmmer_hits.empty) or (not hmmer_hits.id.values.tolist()):
        logger.error("No records found in database matching provided hmm")
        sys.exit(1)
    logger.info("Filtering Fasta...")
    filter_fasta_by_ids(
        input_fasta, record_ids=hmmer_hits.id.values, output_fasta=output_fasta
    )
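
A usage sketch with hypothetical file names (the TIGRFAM accession is only an example); hmmsearch must be installed, since the call is delegated to wrappers.run_hmmsearch:

from pathlib import Path
from metatag.database.manipulation import filter_fasta_by_hmm

filter_fasta_by_hmm(
    hmm_model=Path("TIGR01287.1.HMM"),
    input_fasta=Path("database.fasta"),
    output_fasta=Path("nifH_database.fasta"),
    additional_args="--cut_nc",  # hmmsearch flag: use the model's noise cutoff
)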

filter_fasta_by_ids(input_fasta, record_ids, output_fasta=None)

Filter records in fasta file matching provided IDs

Parameters:

- input_fasta (Path): path to input FASTA file. Required.
- record_ids (list): list of record IDs to filter FASTA file by. Required.
- output_fasta (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/manipulation.py
def filter_fasta_by_ids(
    input_fasta: Path, record_ids: list, output_fasta: Path = None
) -> None:
    """Filter records in fasta file matching provided IDs

    Args:
        input_fasta (Path): path to input FASTA file
        record_ids (list): list of record IDs to filter FASTA file by
        output_fasta (Path, optional): path to output file. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, "_fitered")
    else:
        output_fasta = Path(output_fasta).resolve()
    index_file = Path(input_fasta.as_posix() + ".fxi").resolve()
    index_file.unlink(missing_ok=True)
    record_ids = set(record_ids)
    with tempfile.NamedTemporaryFile(mode="w+t") as tmp_ids:
        tmp_ids.writelines("\n".join(record_ids))
        tmp_ids.flush()
        tmp_ids_path = tmp_ids.name
        cmd_str = f"seqkit grep -i -f {tmp_ids_path} {input_fasta} -o {output_fasta}"
        terminal_execute(cmd_str, suppress_shell_output=True)
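
Note that the filtering itself is delegated to seqkit grep, so seqkit must be on the PATH. A minimal sketch with hypothetical names:

from pathlib import Path
from metatag.database.manipulation import filter_fasta_by_ids

filter_fasta_by_ids(
    Path("database.fasta"),
    record_ids=["seq_1", "seq_7", "seq_42"],
    output_fasta=Path("database_subset.fasta"),
)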

filter_fasta_by_sequence_length(input_fasta, min_length=None, max_length=None, output_fasta=None)

Filter sequences by length in fasta file

Parameters:

- input_fasta (Path): path to input FASTA file. Required.
- min_length (int, optional): minimum sequence length. Defaults to None.
- max_length (int, optional): maximum sequence length. Defaults to None.
- output_fasta (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/manipulation.py
def filter_fasta_by_sequence_length(
    input_fasta: Path,
    min_length: int = None,
    max_length: int = None,
    output_fasta: Path = None,
) -> None:
    """Filter sequences by length in fasta file

    Args:
        input_fasta (Path): path to input FASTA file
        min_length (int, optional): minimum sequence length. Defaults to None.
        max_length (int, optional): maximum sequence length. Defaults to None.
        output_fasta (Path, optional): path to output file. Defaults to None.
    """
    if (min_length is None) and (max_length is None):
        warnings.warn("Missing boundary values for sequence length")
        return
    input_fasta = Path(input_fasta).resolve()
    with open(input_fasta, "r") as ffile:
        fasta = pyfastx.Fasta(ffile.name)
        record_ids = fasta.keys()
        if min_length is None:
            min_length = 0
        if max_length is not None:
            max_tag = str(max_length)
            record_ids.filter(record_ids >= min_length, record_ids <= max_length)
        else:
            max_tag = ""
            record_ids.filter(record_ids >= min_length)
        if output_fasta is None:
            output_fasta = set_default_output_path(
                input_fasta, f"_length_{min_length}_{max_tag}"
            )
        else:
            output_fasta = Path(output_fasta).resolve()
        if not record_ids:
            logger.error("No records found with given sequence length bounds")
            sys.exit(1)
        with open(output_fasta, "w") as fp:
            for record_id in record_ids:
                record_obj = fasta[record_id]
                fp.write(record_obj.raw)
    Path(input_fasta.as_posix() + ".fxi").unlink(missing_ok=True)
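
A quick sketch with a hypothetical input; per the default-tagging logic above, the length bounds end up in the output file name:

from pathlib import Path
from metatag.database.manipulation import filter_fasta_by_sequence_length

# Keeps sequences of 200-1000 residues; by default writes
# database_length_200_1000.fasta next to the input
filter_fasta_by_sequence_length(Path("database.fasta"), min_length=200, max_length=1000)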

get_fasta_record_ids(fasta_file)

Extract record ids from fasta

Parameters:

- fasta_file (Path): path to input FASTA file. Required.

Returns:

- set: set of record IDs in the FASTA file

Source code in src/metatag/database/manipulation.py
def get_fasta_record_ids(fasta_file: Path) -> set:
    """Extract record ids from fasta

    Args:
        fasta_file (Path): path to input FASTA file

    Returns:
        set: set of record IDs in the FASTA file
    """
    fasta_file = Path(fasta_file).resolve()
    with open(fasta_file, "r") as ffile:
        fasta = pyfastx.Fasta(ffile.name, full_name=True)
        fasta_keys = set(fasta.keys())
    return fasta_keys

parse_hmmsearch_output(hmmer_output)

Parse hmmsearch or hmmscan summary table output file

Parameters:

- hmmer_output (Path): path to hmmsearch or hmmscan summary table output file. Required.

Returns:

- pd.DataFrame: parsed hits with columns id, bias, bitscore and description

Source code in src/metatag/database/manipulation.py
def parse_hmmsearch_output(hmmer_output: Path) -> pd.DataFrame:
    """Parse hmmsearch or hmmscan summary table output file

    Args:
        hmmer_output (Path): path to hmmsearch or hmmscan summary table output file

    Returns:
        pd.DataFrame: parsed hits with columns id, bias, bitscore and description
    """
    attribs = ["id", "bias", "bitscore", "description"]
    hits = defaultdict(list)
    with open(hmmer_output) as handle:
        for queryresult in SearchIO.parse(handle, "hmmer3-tab"):
            for hit in queryresult.hits:
                for attrib in attribs:
                    hits[attrib].append(getattr(hit, attrib))
    return pd.DataFrame.from_dict(hits)
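
The parser reads the tabular summary (Biopython's "hmmer3-tab" format, i.e. the summary table hmmsearch writes with its --tblout option) and returns a pandas DataFrame with one row per hit. A sketch on a hypothetical file:

from pathlib import Path
from metatag.database.manipulation import parse_hmmsearch_output

hits = parse_hmmsearch_output(Path("database_TIGR01287.1.txt"))
print(hits.columns.tolist())  # ['id', 'bias', 'bitscore', 'description']
print(hits.id.values[:5])     # IDs of the first five hits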

split_reference_from_query_alignments(ref_query_msa, ref_ids=None, ref_prefix=None, output_dir=None)

Separate reference sequences from query sequences in msa fasta file

Parameters:

- ref_query_msa (Path): path to input papara/hmmalign alignment file. Required.
- ref_ids (set, optional): IDs of reference sequences. Defaults to None.
- ref_prefix (str, optional): prefix employed by all reference sequences (use instead of ref_ids). Defaults to None.
- output_dir (Path, optional): path to output directory. Defaults to None.

Source code in src/metatag/database/manipulation.py
def split_reference_from_query_alignments(
    ref_query_msa: Path,
    ref_ids: set = None,
    ref_prefix: str = None,
    output_dir: Path = None,
) -> None:
    """Separate reference sequences from query sequences in msa fasta file

    Args:
        ref_query_msa (Path): path to input papara/hmmalign alignment file
        ref_ids (set, optional): IDs of reference sequences. Defaults to None.
        ref_prefix (str, optional): prefix employed by all reference sequences
            (Use instead of ref_ids). Defaults to None.
        output_dir (Path, optional): path to output directory. Defaults to None.
    """
    ref_query_msa = Path(ref_query_msa).resolve()
    if output_dir is None:
        output_dir = ref_query_msa.parent
    else:
        output_dir = Path(output_dir).resolve()
    if (ref_ids is not None) and (ref_prefix is None):

        def is_reference(record_name):
            return record_name in ref_ids

    elif (ref_ids is None) and (ref_prefix is not None):

        def is_reference(record_name):
            return record_name.lower().startswith(ref_prefix.lower())

    else:
        logger.error("Provide either set of ref ids or ref prefix")
        sys.exit(1)
    out_ref_msa = set_default_output_path(ref_query_msa, tag="_ref_fraction")
    out_query_msa = set_default_output_path(ref_query_msa, tag="_query_fraction")

    with open(out_ref_msa, "w") as outref, open(out_query_msa, "w") as outquery, open(
        ref_query_msa, "r"
    ) as msa:
        fasta = pyfastx.Fasta(msa.name, build_index=False, full_name=True)
        for record_name, record_seq in fasta:
            if is_reference(record_name):
                outref.write(f">{record_name}\n{record_seq}\n")
            else:
                outquery.write(f">{record_name}\n{record_seq}\n")

Module database.reduction

Tools to reduce the size of the peptide-specific reference database

Currently based on:

1. CD-HIT
2. Repset: https://onlinelibrary.wiley.com/doi/10.1002/prot.25461

get_representative_set(input_seqs, input_pi, max_size=None, output_file=None)

Runs repset.py to obtain a representative set of size max_size (or smaller if there are fewer sequences than max_size), or an ordered list (by 'representativeness') of representative sequences when max_size is set to None.

Parameters:

- input_seqs (Path): path to input FASTA file containing sequences. Required.
- input_pi (Path): path to input file containing pairwise identity. Required.
- max_size (int, optional): maximum number of sequences in reduced database. Defaults to None.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/database/reduction.py
def get_representative_set(
    input_seqs: Path, input_pi: Path, max_size: int = None, output_file: Path = None
) -> None:
    """Runs repset.py to obtain a representative
       set of size equal to max_size (or smaller if less sequences than max_size)
       or an ordered list (by 'representativeness') of representative sequences
       if max_size set to None.

    Args:
        input_seqs (Path): path to input FASTA file containing sequences
        input_pi (Path): path to input file containing pairwise identity
        max_size (int, optional): maximum number of sequences in reduced database.
            Defaults to None.
        output_file (Path, optional): path to output file. Defaults to None.
    """
    input_seqs = Path(input_seqs).resolve()
    input_pi = Path(input_pi).resolve()
    repset_exe = Path(__file__).parent.parent / "vendor" / "repset_min.py"

    if output_file is None:
        output_file = set_default_output_path(input_seqs, tag="_repset")
    else:
        output_file = Path(output_file).resolve()

    with TemporaryDirectoryPath() as tempdir:
        cmd_str = (
            f"python {repset_exe} --seqs {input_seqs} --pi {input_pi} "
            f"--outdir {tempdir} --size {max_size}"
        )
        terminal_execute(cmd_str, suppress_shell_output=False)

        with open(Path(tempdir) / "repset.txt") as repset:
            rep_ids = [rep_id.strip("\n") for rep_id in repset.readlines()]

    if (max_size is not None) and (max_size < len(rep_ids)):
        rep_ids = rep_ids[:max_size]

    filter_fasta_by_ids(
        input_fasta=input_seqs, record_ids=rep_ids, output_fasta=output_file
    )

reduce_database_redundancy(input_fasta, output_fasta=None, cdhit=True, maxsize=None, cdhit_args=None)

Reduce redundancy of a peptide database. Optionally runs CD-HIT; additional arguments to CD-HIT may be passed as a string (cdhit_args). Then runs repset to obtain a final database no larger (in number of sequences) than the selected maxsize. If maxsize is None, repset is not run.

Parameters:

- input_fasta (Path): path to input FASTA file. Required.
- output_fasta (Path, optional): path to output, reduced fasta. Defaults to None.
- cdhit (bool, optional): whether to use CD-HIT alongside repset. Defaults to True.
- maxsize (int, optional): maximum number of sequences in final database. Defaults to None.
- cdhit_args (str, optional): additional arguments to CD-HIT. Defaults to None.

Source code in src/metatag/database/reduction.py
def reduce_database_redundancy(
    input_fasta: Path,
    output_fasta: Path = None,
    cdhit: bool = True,
    maxsize: int = None,
    cdhit_args: str = None,
) -> None:
    """Reduce redundancy of peptide datatabase.
       Runs cd-hit, if selected, additional arguments to cdhit
       may be passed as a string (cdhit_args).
       Runs repset to obtain a final database size no larger
       (number of sequences) than selected maxsize.
       If maxsize = None, repset is not run.

    Args:
        input_fasta (Path): path to input FASTA file.
        output_fasta (Path, optional): path to output, reduced fasta.
            Defaults to None.
        cdhit (bool, optional): whether to use CD-HIT alongside repset. Defaults to True.
        maxsize (int, optional): maximum number of sequences in final database. Defaults to None.
        cdhit_args (str, optional): additional arguments to CD-HIT. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, tag="_reduced")
    else:
        output_fasta = Path(output_fasta).resolve()
    if (not cdhit) and (maxsize is None):
        logger.info("No reduction algorithm has been selected.")

    with TemporaryFilePath() as tempaln, TemporaryFilePath() as tempfasta, TemporaryFilePath() as tempfasta2, TemporaryFilePath() as tempident:
        if cdhit:
            wrappers.run_cdhit(
                input_fasta=input_fasta,
                output_fasta=tempfasta,
                additional_args=cdhit_args,
            )
            Path(tempfasta.as_posix() + ".clstr").unlink(missing_ok=True)
        else:
            shutil.move(input_fasta, tempfasta)

        if maxsize is not None:
            wrappers.run_mafft(
                input_fasta=tempfasta,
                output_file=tempaln,
                processes=-1,
                parallel=True,
                additional_args="--retree 1 --maxiterate 0 --quiet",
            )

            wrappers.get_percent_identity_from_msa(
                input_msa=tempaln, output_file=tempident
            )

            logger.info("Finding representative sequences for reference database...")
            get_representative_set(
                input_seqs=tempfasta,
                input_pi=tempident,
                max_size=maxsize,
                output_file=tempfasta2,
            )
            shutil.move(tempfasta2, tempfasta)

        shutil.move(tempfasta, output_fasta)
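
A usage sketch with hypothetical paths ("-c 0.9" is CD-HIT's sequence identity threshold). Two behaviours worth noting from the code above: with cdhit=False the input file is moved, not copied, and the repset step only runs when maxsize is set:

from pathlib import Path
from metatag.database.reduction import reduce_database_redundancy

reduce_database_redundancy(
    Path("nifH_database.fasta"),
    output_fasta=Path("nifH_reduced.fasta"),
    cdhit=True,
    maxsize=500,
    cdhit_args="-c 0.9",
)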

Module database.labelparsers

Tools to parse sequence labels from different databases

LabelParser

Source code in src/metatag/database/labelparsers.py
class LabelParser:
    def __init__(self, label: str) -> None:
        """Parse labels to extract genome ID and metadata

        Args:
            label (str): label to be parsed
        """
        self._label = label

    def extract_genome_id(self) -> str:
        """Extract genome ID from sequence label

        Returns:
            str: Genome ID
        """
        mmp_id = self.extract_mmp_id()
        taxid = self.extract_taxid()
        if mmp_id and taxid:
            warnings.warn("Label contains conflicting genome IDs")
            return ""
        elif mmp_id:
            genome_id = mmp_id
        elif taxid:
            genome_id = taxid
        else:
            genome_id = self.extract_oceanmicrobiome_id()
        return genome_id

    def extract_mmp_id(self) -> str:
        """Extract MMP ID from sequence label

        Returns:
            str: MMP ID
        """
        db_entry = re.compile("_MMP\d+")
        try:
            return re.search(db_entry, self._label).group(0).strip("_")
        except Exception:
            return ""

    def extract_taxid(self) -> str:
        """Extract NCBI taxon ID from sequence label

        Returns:
            str: NCBI taxon ID
        """
        db_entry = re.compile("(OX=)(\d+)")
        try:
            return f"taxid_{re.search(db_entry, self._label).group(2)}"
        except Exception:
            return ""

    def extract_oceanmicrobiome_id(self) -> str:
        """Extract OceanMicrobiome ID from sequence label

        Returns:
            str: OceanMicrobiome ID
        """
        return "_".join(self._label.split("_")[:4])

__init__(label)

Parse labels to extract genome ID and metadata

Parameters:

- label (str): label to be parsed. Required.

Source code in src/metatag/database/labelparsers.py
def __init__(self, label: str) -> None:
    """Parse labels to extract genome ID and metadata

    Args:
        label (str): label to be parsed
    """
    self._label = label

extract_genome_id()

Extract genome ID from sequence label

Returns:

- str: genome ID

Source code in src/metatag/database/labelparsers.py
def extract_genome_id(self) -> str:
    """Extract genome ID from sequence label

    Returns:
        str: Genome ID
    """
    mmp_id = self.extract_mmp_id()
    taxid = self.extract_taxid()
    if mmp_id and taxid:
        warnings.warn("Label contains conflicting genome IDs")
        return ""
    elif mmp_id:
        genome_id = mmp_id
    elif taxid:
        genome_id = taxid
    else:
        genome_id = self.extract_oceanmicrobiome_id()
    return genome_id

extract_mmp_id()

Extract MMP ID from sequence label

Returns:

- str: MMP ID

Source code in src/metatag/database/labelparsers.py
def extract_mmp_id(self) -> str:
    """Extract MMP ID from sequence label

    Returns:
        str: MMP ID
    """
    db_entry = re.compile("_MMP\d+")
    try:
        return re.search(db_entry, self._label).group(0).strip("_")
    except Exception:
        return ""

extract_oceanmicrobiome_id()

Extract OceanMicrobiome ID from sequence label

Returns:

- str: OceanMicrobiome ID

Source code in src/metatag/database/labelparsers.py
def extract_oceanmicrobiome_id(self) -> str:
    """Extract OceanMicrobiome ID from sequence label

    Returns:
        str: OceanMicrobiome ID
    """
    return "_".join(self._label.split("_")[:4])

extract_taxid()

Extract NCBI taxon ID from sequence label

Returns:

- str: NCBI taxon ID

Source code in src/metatag/database/labelparsers.py
def extract_taxid(self) -> str:
    """Extract NCBI taxon ID from sequence label

    Returns:
        str: NCBI taxon ID
    """
    db_entry = re.compile("(OX=)(\d+)")
    try:
        return f"taxid_{re.search(db_entry, self._label).group(2)}"
    except Exception:
        return ""

Module alignment

Tools to perform multiple sequence alignments

align_peptides(input_fasta, method='muscle', output_file=None, additional_args=None)

Perform a multiple sequence alignment (MSA) on reference peptide sequences. Output is written in FASTA format (fasta.aln)

Parameters:

- input_fasta (Path): path to fasta file containing reference sequences. Required.
- method (str, optional): choose alignment method: "muscle" or "mafft". Defaults to "muscle".
- output_file (Path, optional): path to output alignment file. Defaults to None.
- additional_args (str, optional): additional arguments to aligner. Defaults to None.

Source code in src/metatag/alignment.py
def align_peptides(
    input_fasta: Path,
    method: str = "muscle",
    output_file: Path = None,
    additional_args: str = None,
):
    """Perform MSA on reference peptide sequences.
       Outputs in format fasta.aln

    Args:
        input_fasta (Path): path to fasta file containing reference sequences
        method (str, optional): Choose alignment method: "muscle" or "mafft". Defaults to "muscle".
        output_file (Path, optional): path to output alignment file. Defaults to None.
        additional_args (str, optional): additional arguments to aligner. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_file is None:
        output_file = set_default_output_path(
            input_fasta, extension=".fasta.aln", only_filename=True
        )
    else:
        output_file = Path(output_file).resolve()
    if method.lower() in "muscle":
        wrappers.run_muscle(
            input_fasta=input_fasta,
            output_file=output_file,
            additional_args=additional_args,
        )
    elif method.lower() in "mafft":
        wrappers.run_mafft(
            input_fasta=input_fasta,
            output_file=output_file,
            additional_args=additional_args,
        )
    else:
        logger.error("Invalid method. Valid methods: muscle or mafft")
        sys.exit(1)
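
A minimal sketch with hypothetical paths; MUSCLE must be installed, since the call is delegated to wrappers.run_muscle:

from pathlib import Path
from metatag.alignment import align_peptides

align_peptides(Path("refs.fasta"), method="muscle", output_file=Path("refs.fasta.aln"))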

align_short_reads_to_reference_msa(ref_msa, query_seqs, method='papara', tree_nwk=None, output_dir=None)

Align short-read query sequences to a reference MSA (FASTA format). Outputs a FASTA MSA of query and reference sequences.

Parameters:

- ref_msa (Path): path to reference MSA in fasta format. Required.
- query_seqs (Path): path to query sequences in fasta format. Required.
- method (str, optional): choose aligner: "hmmalign" or "papara". Defaults to "papara".
- tree_nwk (Path, optional): path to reference tree in newick format. Defaults to None.
- output_dir (Path, optional): path to output directory. Defaults to None.

Source code in src/metatag/alignment.py
def align_short_reads_to_reference_msa(
    ref_msa: Path,
    query_seqs: Path,
    method: str = "papara",
    tree_nwk: Path = None,
    output_dir: Path = None,
) -> None:
    """Align short read query sequences to reference MSA (fasta format).
       Outputs fasta msa alignment between query and reference sequences

    Args:
        ref_msa (Path): path to reference MSA in fasta format
        query_seqs (Path): path to query sequences in fasta format
        method (str, optional): choose aligner: "hmmalign" or "papara". Defaults to "papara".
        tree_nwk (Path, optional): path to reference tree in newick format. Defaults to None.
        output_dir (Path, optional): path to output directory. Defaults to None.
    """
    ref_msa = Path(ref_msa).resolve()
    query_seqs = Path(query_seqs).resolve()
    tree_nwk = Path(tree_nwk).resolve()

    if output_dir is None:
        output_dir = set_default_output_path(ref_msa, only_dirname=True)
    output_hmm = output_dir / set_default_output_path(
        ref_msa, extension=".hmm", only_filename=True
    )
    output_aln_seqs = output_dir / set_default_output_path(
        query_seqs, extension=".faln", only_filename=True
    )

    if method.lower() in "hmmalign":
        with TemporaryFilePath() as temp_file_path:
            wrappers.run_hmmbuild(input_aln=ref_msa, output_hmm=output_hmm)

            wrappers.run_hmmalign(
                input_aln=ref_msa,
                input_hmm=output_hmm,
                input_seqs=query_seqs,
                output_aln_seqs=temp_file_path,
            )
            convert_stockholm_to_fasta_aln(
                input_stockholm=temp_file_path, output_fasta=output_aln_seqs
            )
    elif method.lower() in "papara":
        with TemporaryFilePath() as temp_phy_path, TemporaryFilePath() as temp_aln_path:
            convert_fasta_aln_to_phylip(
                input_fasta_aln=ref_msa, output_phylip=temp_phy_path
            )
            wrappers.run_papara(
                tree_nwk=tree_nwk,
                msa_phy=temp_phy_path,
                query_fasta=query_seqs,
                output_aln=temp_aln_path,
            )
            convert_phylip_to_fasta_aln(
                input_phylip=temp_aln_path, output_file=output_aln_seqs
            )
    else:
        logger.error("Alignment method not implemented")
        sys.exit(1)
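
A sketch for the papara route (hypothetical paths). Note that the code resolves tree_nwk unconditionally, so a reference tree must be supplied even though the parameter defaults to None:

from pathlib import Path
from metatag.alignment import align_short_reads_to_reference_msa

align_short_reads_to_reference_msa(
    ref_msa=Path("ref.fasta.aln"),
    query_seqs=Path("queries.fasta"),
    method="papara",
    tree_nwk=Path("ref_database.newick"),
    output_dir=Path("placement_out"),
)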

Module phylotree

Tools to perform phylogenetic tree reconstructions and query sequence placements onto trees

get_iq_tree_model_from_log_file(iqtree_log)

Parse iqtree log file and return best fit model

If a model was supplied, search for it in the line "Command: iqtree ... -m 'model'". If instead -m TEST or -m MFP was used, continue to the line "Best-fit model: 'model' chosen according to BIC".

Parameters:

- iqtree_log (Path): path to iqtree log file. Required.

Returns:

- str: best substitution model employed by iqtree

Source code in src/metatag/phylotree.py
def get_iq_tree_model_from_log_file(iqtree_log: Path) -> str:
    """Parse iqtree log file and return best fit model

    If model supplied, search model in Command: iqtree ... -m 'model'
    If not, then -m TEST or -m MFP
    If one of those, continue to line:
    Best-fit model: 'model' chosen according to BIC

    Args:
        iqtree_log (Path): path to iqtree log file

    Returns:
        str: best substitution model employed by iqtree
    """
    with open(iqtree_log, "r") as log:
        text = log.read()
        subtext = re.search("(?<=Command: iqtree)(.*)(?=\\n)", text).group(1)
        model = re.search("(?<=-m )(.*)(?= -bb)", subtext).group(1)
        if model.lower() in ["mfp", "test"]:
            model = re.search("(?<=Best-fit model: )(.*)(?= chosen)", text).group(1)
    return model

get_tree_model_from_modeltest_log(modeltest_log, criterion='BIC')

Parse modeltest-ng log file and return best fit model according to selected criterion: BIC, AIC or AICc

Parameters:

- modeltest_log (Path): path to modeltest-ng log file. Required.
- criterion (str, optional): choose best-model criterion. Defaults to "BIC".

Returns:

- str: best substitution model employed by modeltest-ng

Source code in src/metatag/phylotree.py
def get_tree_model_from_modeltest_log(
    modeltest_log: Path, criterion: str = "BIC"
) -> str:
    """Parse modeltest-ng log file and return best fit model
    according to selected criterion: BIC, AIC or AICc

    Args:
        modeltest_log (Path): path to modeltest-ng log file
        criterion (str, optional): choose best-model criterion. Defaults to "BIC".

    Returns:
        str: best substitution model employed by modeltest-ng
    """
    with open(modeltest_log, "r") as log:
        text = log.read()
        model = easy_pattern_matching(
            easy_pattern_matching(
                text, f"Best model according to {criterion}\n", "\nlnL"
            ),
            left_pattern="Model:",
        ).strip()
        return model

infer_tree(ref_aln, output_dir, method='iqtree', substitution_model='modeltest', additional_args=None)

Infer tree from reference msa. Best substitution model selected by default.

Parameters:

- ref_aln (Path): path to reference alignment in FASTA format. Required.
- output_dir (Path): path to output directory. Required.
- method (str, optional): choose tree inference method: "iqtree" or "fasttree". Defaults to "iqtree".
- substitution_model (str, optional): substitution model employed to infer the tree, or path to an iqtree log file containing the model. The name of an algorithm to choose the best substitution model can also be provided; in that case, choose between "iqtest" or "modeltest". Defaults to "modeltest".
- additional_args (str, optional): additional arguments to tree algorithm as a string. Defaults to None.

Source code in src/metatag/phylotree.py
def infer_tree(
    ref_aln: Path,
    output_dir: Path,
    method: str = "iqtree",
    substitution_model: str = "modeltest",
    additional_args: str = None,
) -> None:
    """Infer tree from reference msa. Best substitution model
       selected by default.

    Args:
        ref_aln (Path): path to reference alignment in FASTA format
        output_dir (Path): path to output directory
        method (str, optional): choose tree inference method: "iqtree" or "fasttree". Defaults to "iqtree".
        substitution_model (str, optional): substitution model employed to infer tree or path to iqtree log
            file containing model. The name of an algorithm to choose best substitution model can also be provided.
            In that case, choose between "iqtest" or "modeltest". Defaults to "modeltest".
        additional_args (str, optional): additional arguments to tree algorithm as a string. Defaults to None.
    """
    ref_aln = Path(ref_aln).resolve()
    if output_dir is None:
        output_dir = set_default_output_path(ref_aln, only_dirname=True)
    else:
        output_dir = Path(output_dir).resolve()
    if method.lower() in "iqtree":
        if "modeltest" in substitution_model.lower():
            logger.info("Selecting best subsitution model per modeltest-ng...")
            wrappers.runModelTest(
                input_algns=ref_aln, n_processes=None, output_dir=output_dir
            )
            best_model = get_tree_model_from_modeltest_log(
                modeltest_log=output_dir / "modeltest_result.log",
                criterion="BIC",
            )
            modeltest_starting_tree = output_dir / "modeltest_result.tree"
        elif "iqtest" in substitution_model.lower():
            best_model = "TEST"
            modeltest_starting_tree = None
        else:
            best_model = substitution_model
            modeltest_starting_tree = None

        wrappers.run_iqtree(
            input_algns=ref_aln,
            output_dir=output_dir,
            output_prefix="ref_database",
            keep_recovery_files=True,
            substitution_model=best_model,
            starting_tree=modeltest_starting_tree,
            additional_args=additional_args,
        )
        shutil.move(
            output_dir / "ref_database.contree",
            output_dir / "ref_database.newick",
        )

    elif method.lower() in "fasttree":
        wrappers.run_fasttree(
            input_algns=ref_aln,
            output_file=output_dir / "ref_database.newick",
            additional_args=additional_args,
        )
    else:
        logger.error("Wrong method, enter iqtree or fasttree")
        sys.exit(1)
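
A sketch for the default iqtree/modeltest-ng route (hypothetical paths); per the code above, the final tree ends up in output_dir / "ref_database.newick":

from pathlib import Path
from metatag.phylotree import infer_tree

infer_tree(
    ref_aln=Path("ref.fasta.aln"),
    output_dir=Path("tree_out"),
    method="iqtree",
    substitution_model="modeltest",
)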

relabel_tree(input_newick, label_dict, output_file=None, iTOL=True)

Relabel tree leaves with labels from provided dictionary. If iTOL is set, then labels are checked for iTOL compatibility

Parameters:

- input_newick (Path): path to input tree in newick format. Required.
- label_dict (dict): dictionary with labels to replace; keys are original labels, values new labels. Required.
- output_file (Path, optional): path to output relabelled tree. Defaults to None.
- iTOL (bool, optional): whether to check if label complies with iTOL requirements. Defaults to True.

Source code in src/metatag/phylotree.py
def relabel_tree(
    input_newick: Path, label_dict: dict, output_file: Path = None, iTOL=True
) -> None:
    """Relabel tree leaves with labels from
    provided dictionary. If iTOL is set, then
    labels are checked for iTOL compatibility

    Args:
        input_newick (Path): path to input tree in newick format
        label_dict (dict): dictionary with labels to replace. Keys original labels, values new labels.
        output_file (Path, optional): path to output relabelled tree. Defaults to None.
        iTOL (bool, optional): whether to check if label complies with iTOL requirements. Defaults to True.
    """
    if output_file is None:
        output_file = set_default_output_path(input_newick, tag="_relabel")
    if iTOL:
        sanity_check = sanity_check_for_iTOL
    else:

        def sanity_check(x):
            return x

    with open(input_newick, "r") as file:
        data = file.read()
    with open(output_file, "w") as file:
        for k, v in label_dict.items():
            data = data.replace(k + ":", sanity_check(v))
        file.write(data)
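
A small sketch with hypothetical labels; keys are the current leaf names, values the desired ones:

from pathlib import Path
from metatag.phylotree import relabel_tree

relabel_tree(
    Path("ref_database.newick"),
    label_dict={"0": "ref_0_Nitrosomonas", "1": "ref_1_Nitrosococcus"},
    output_file=Path("ref_database_relabelled.newick"),
)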

sanity_check_for_iTOL(label)

Reformat label to comply with iTOL requirements: white spaces and symbols outside English letters and numbers are replaced with underscores, and double underscores are collapsed

Parameters:

- label (str): tree label as a string. Required.

Returns:

- str: reformatted label

Source code in src/metatag/phylotree.py
def sanity_check_for_iTOL(label: str) -> str:
    """Reformat label to comply with iTOL requirements, remove:
    1. white spaces
    2. double underscores
    3. symbols outside english letters and numbers

    Args:
        label (str): tree label as a string

    Returns:
        str: reformatted label
    """
    legal_chars = re.compile("[^a-zA-Z0-9]")
    itol_label = legal_chars.sub("_", label).replace("__", "_")
    return itol_label
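
For instance, the substitution and collapsing steps transform a hypothetical label as follows:

from metatag.phylotree import sanity_check_for_iTOL

print(sanity_check_for_iTOL("Candidatus Scalindua (strain #1)"))
# -> "Candidatus_Scalindua_strain_1_"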

Module placement

Tools to quantify and assign labels to placed sequences

JplaceParser

Methods to parse jplace files, as specified in https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0031009

Source code in src/metatag/placement.py
class JplaceParser:
    """
    Methods to parse jplace files, as specified in
    https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0031009
    """

    def __init__(self, path_to_jplace: Path) -> None:
        self._path_to_jplace = Path(path_to_jplace).resolve()
        self.jplace = self.get_json_object()
        self._tree_obj = next(
            Phylo.parse(StringIO(self.newickfy_tree(self.jplace["tree"])), "newick")
        )

    def get_json_object(self) -> dict:
        with open(self._path_to_jplace, "r") as JSON:
            return json.load(JSON)

    @property
    def meta(self):
        """
        Print metadata
        """
        return self.jplace["metadata"]

    @property
    def fields(self):
        """
        Print data fields
        """
        return self.jplace["fields"]

    @property
    def placements(self):
        """
        Return placement objects
        """
        return self.jplace["placements"]

    @staticmethod
    def newickfy_tree(tree_str: str) -> str:
        """Remove branch IDs from jplace tree string

        Args:
            tree_str (str): jplace tree string

        Returns:
            str: newick tree string
        """
        subs_tree = re.sub(r"\{(\d+)\}", "", tree_str)
        return subs_tree

    def get_reference_sequences(self) -> list:
        """
        Get list of reference sequences in the placement tree
        """
        return [c.name for c in self._tree_obj.get_terminals()]

    def compute_tree_diameter(self) -> float:
        """
        Find maximum (pairwise) distance between two tips
        (leaves) in the tree
        """
        root = self._tree_obj.root
        max_distance = 0.0
        tips = self._tree_obj.get_terminals()
        for tip in tips:
            self._tree_obj.root_with_outgroup(tip)
            new_max = max(self._tree_obj.depths().values())
            if new_max > max_distance:
                max_distance = new_max
        self._tree_obj.root_with_outgroup(root)
        return max_distance

    def filter_placements_by_minimum_lwr(
        self, minimum_lwr: float, output_file: Path = None
    ) -> None:
        """Filter placements by minimum LWR

        Args:
            minimum_lwr (float): LWR threshold (between 0 and 1)
            output_file (Path, optional): path to output file. Defaults to None.
        """
        if output_file is None:
            output_file = set_default_output_path(
                self._path_to_jplace, tag=f"_min_lwr_{minimum_lwr}"
            )
        else:
            output_file = Path(output_file).resolve()
        jplace = self.get_json_object()

        filtered_placement_objs = []
        for placement_object in jplace["placements"]:
            filtered_placements = []
            for placement in placement_object["p"]:
                edge_num, likelihood, lwr, distal_length, pendant_length = placement
                if lwr >= minimum_lwr:
                    filtered_placements.append(placement)
            if filtered_placements:
                placement_object["p"] = filtered_placements
                filtered_placement_objs.append(placement_object)
        jplace["placements"] = filtered_placement_objs
        with open(output_file, "w") as ofile:
            json.dump(jplace, ofile, indent=2)

    def filter_placements_by_max_pendant_to_tree_diameter_ratio(
        self, max_pendant_ratio: float, output_file: Path = None
    ) -> None:
        """Filter placements by maximum pendant length

        Args:
            max_pendant_ratio (float): cutoff value for pendant  to tree diameter ratio
            output_file (Path, optional): path to output file. Defaults to None.
        """
        if output_file is None:
            output_file = set_default_output_path(
                self._path_to_jplace,
                tag=f"_max_pendant_diameter_ratio_{max_pendant_ratio}",
            )
        else:
            output_file = Path(output_file).resolve()
        tree_diameter = self.compute_tree_diameter()
        logger.info(f"Filtering placements for tree diameter: {tree_diameter}")
        jplace = self.get_json_object()

        filtered_placement_objs = []
        for placement_object in jplace["placements"]:
            filtered_placements = []
            for placement in placement_object["p"]:
                edge_num, likelihood, lwr, distal_length, pendant_length = placement
                if pendant_length / tree_diameter <= max_pendant_ratio:
                    filtered_placements.append(placement)
            if filtered_placements:
                placement_object["p"] = filtered_placements
                filtered_placement_objs.append(placement_object)
        jplace["placements"] = filtered_placement_objs

        with open(output_file, "w") as ofile:
            json.dump(jplace, ofile, indent=2)

    def filter_placements_by_max_pendant_length(
        self, max_pendant_length: float, output_file: Path = None
    ) -> None:
        """Filter placements by maximum pendant length

        Args:
            max_pendant_length (float): cutoff value for pendant length of placements
            output_file (Path, optional): path to output file. Defaults to None.
        """
        if output_file is None:
            output_file = set_default_output_path(
                self._path_to_jplace, tag=f"_max_pendant_{max_pendant_length}"
            )
        else:
            output_file = Path(output_file).resolve()
        jplace = self.get_json_object()

        filtered_placement_objs = []
        for placement_object in jplace["placements"]:
            filtered_placements = []
            for placement in placement_object["p"]:
                edge_num, likelihood, lwr, distal_length, pendant_length = placement
                if pendant_length <= max_pendant_length:
                    filtered_placements.append(placement)
            if filtered_placements:
                placement_object["p"] = filtered_placements
                filtered_placement_objs.append(placement_object)
        jplace["placements"] = filtered_placement_objs

        with open(output_file, "w") as ofile:
            json.dump(jplace, ofile, indent=2)

    def filter_placements_by_max_pendant_to_distal_length_ratio(
        self, max_pendant_ratio: float, output_file: Path = None
    ) -> None:
        """Filter placements by maximum pendant length

        Args:
            max_pendant_ratio (float): cutoff value for the pendant to
                distal length ratio of placements
            output_file (Path, optional): path to output file. Defaults to None.
        """
        if output_file is None:
            output_file = set_default_output_path(
                self._path_to_jplace,
                tag=f"_max_pendant_distal_ratio_{max_pendant_ratio}",
            )
        else:
            output_file = Path(output_file).resolve()
        jplace = self.get_json_object()

        filtered_placement_objs = []
        for placement_object in jplace["placements"]:
            filtered_placements = []
            for placement in placement_object["p"]:
                edge_num, likelihood, lwr, distal_length, pendant_length = placement
                if pendant_length / distal_length <= max_pendant_ratio:
                    filtered_placements.append(placement)
            if filtered_placements:
                placement_object["p"] = filtered_placements
                filtered_placement_objs.append(placement_object)
        jplace["placements"] = filtered_placement_objs

        with open(output_file, "w") as ofile:
            json.dump(jplace, ofile, indent=2)
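
The filters above all unpack each placement record in "p" as [edge_num, likelihood, lwr, distal_length, pendant_length]; note that the parser assumes this field order rather than reading it from the file's "fields" entry. A usage sketch on a hypothetical jplace file:

from pathlib import Path
from metatag.placement import JplaceParser

parser = JplaceParser(Path("epa_result.jplace"))
print(parser.fields)  # field names declared in the jplace file

# Keep placements with LWR >= 0.8; by default the filtered jplace is written
# next to the input with a "_min_lwr_0.8" tag
parser.filter_placements_by_minimum_lwr(minimum_lwr=0.8)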

fields property

Print data fields

meta property

Print metadata

placements property

Return placement objects

compute_tree_diameter()

Find maximum (pairwise) distance between two tips (leaves) in the tree

Source code in src/metatag/placement.py
def compute_tree_diameter(self) -> float:
    """
    Find maximum (pairwise) distance between two tips
    (leaves) in the tree
    """
    root = self._tree_obj.root
    max_distance = 0.0
    tips = self._tree_obj.get_terminals()
    for tip in tips:
        self._tree_obj.root_with_outgroup(tip)
        new_max = max(self._tree_obj.depths().values())
        if new_max > max_distance:
            max_distance = new_max
    self._tree_obj.root_with_outgroup(root)
    return max_distance

filter_placements_by_max_pendant_length(max_pendant_length, output_file=None)

Filter placements by maximum pendant length

Parameters:

- max_pendant_length (float): cutoff value for pendant length of placements. Required.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/placement.py
def filter_placements_by_max_pendant_length(
    self, max_pendant_length: float, output_file: Path = None
) -> None:
    """Filter placements by maximum pendant length

    Args:
        max_pendant_length (float): cutoff value for pendant length of placements
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(
            self._path_to_jplace, tag=f"_max_pendant_{max_pendant_length}"
        )
    else:
        output_file = Path(output_file).resolve()
    jplace = self.get_json_object()

    filtered_placement_objs = []
    for placement_object in jplace["placements"]:
        filtered_placements = []
        for placement in placement_object["p"]:
            edge_num, likelihood, lwr, distal_length, pendant_length = placement
            if pendant_length <= max_pendant_length:
                filtered_placements.append(placement)
        if filtered_placements:
            placement_object["p"] = filtered_placements
            filtered_placement_objs.append(placement_object)
    jplace["placements"] = filtered_placement_objs

    with open(output_file, "w") as ofile:
        json.dump(jplace, ofile, indent=2)

filter_placements_by_max_pendant_to_distal_length_ratio(max_pendant_ratio, output_file=None)

Filter placements by maximum pendant-to-distal-length ratio

Parameters:

- max_pendant_ratio (float): cutoff value for the pendant to distal length ratio of placements. Required.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/placement.py
def filter_placements_by_max_pendant_to_distal_length_ratio(
    self, max_pendant_ratio: float, output_file: Path = None
) -> None:
    """Filter placements by maximum pendant length

    Args:
        max_pendant_ratio (float): cutoff value for the pendant to
            distal length ratio of placements
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(
            self._path_to_jplace,
            tag=f"_max_pendant_distal_ratio_{max_pendant_ratio}",
        )
    else:
        output_file = Path(output_file).resolve()
    jplace = self.get_json_object()

    filtered_placement_objs = []
    for placement_object in jplace["placements"]:
        filtered_placements = []
        for placement in placement_object["p"]:
            edge_num, likelihood, lwr, distal_length, pendant_length = placement
            if pendant_length / distal_length <= max_pendant_ratio:
                filtered_placements.append(placement)
        if filtered_placements:
            placement_object["p"] = filtered_placements
            filtered_placement_objs.append(placement_object)
    jplace["placements"] = filtered_placement_objs

    with open(output_file, "w") as ofile:
        json.dump(jplace, ofile, indent=2)

filter_placements_by_max_pendant_to_tree_diameter_ratio(max_pendant_ratio, output_file=None)

Filter placements by maximum pendant-to-tree-diameter ratio

Parameters:

- max_pendant_ratio (float): cutoff value for pendant to tree diameter ratio. Required.
- output_file (Path, optional): path to output file. Defaults to None.

Source code in src/metatag/placement.py
def filter_placements_by_max_pendant_to_tree_diameter_ratio(
    self, max_pendant_ratio: float, output_file: Path = None
) -> None:
    """Filter placements by maximum pendant length

    Args:
        max_pendant_ratio (float): cutoff value for pendant  to tree diameter ratio
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(
            self._path_to_jplace,
            tag=f"_max_pendant_diameter_ratio_{max_pendant_ratio}",
        )
    else:
        output_file = Path(output_file).resolve()
    tree_diameter = self.compute_tree_diameter()
    logger.info(f"Filtering placements for tree diameter: {tree_diameter}")
    jplace = self.get_json_object()

    filtered_placement_objs = []
    for placement_object in jplace["placements"]:
        filtered_placements = []
        for placement in placement_object["p"]:
            edge_num, likelihood, lwr, distal_length, pendant_length = placement
            if pendant_length / tree_diameter <= max_pendant_ratio:
                filtered_placements.append(placement)
        if filtered_placements:
            placement_object["p"] = filtered_placements
            filtered_placement_objs.append(placement_object)
    jplace["placements"] = filtered_placement_objs

    with open(output_file, "w") as ofile:
        json.dump(jplace, ofile, indent=2)

filter_placements_by_minimum_lwr(minimum_lwr, output_file=None)

Filter placements by minimum LWR

Parameters:

- minimum_lwr (float): LWR threshold (between 0 and 1). Required.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/placement.py
def filter_placements_by_minimum_lwr(
    self, minimum_lwr: float, output_file: Path = None
) -> None:
    """Filter placements by minimum LWR

    Args:
        minimum_lwr (float): LWR threshold (between 0 and 1)
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(
            self._path_to_jplace, tag=f"_min_lwr_{minimum_lwr}"
        )
    else:
        output_file = Path(output_file).resolve()
    jplace = self.get_json_object()

    filtered_placement_objs = []
    for placement_object in jplace["placements"]:
        filtered_placements = []
        for placement in placement_object["p"]:
            edge_num, likelihood, lwr, distal_length, pendant_length = placement
            if lwr >= minimum_lwr:
                filtered_placements.append(placement)
        if filtered_placements:
            placement_object["p"] = filtered_placements
            filtered_placement_objs.append(placement_object)
    jplace["placements"] = filtered_placement_objs
    with open(output_file, "w") as ofile:
        json.dump(jplace, ofile, indent=2)
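
These filter methods all follow the same pattern: load the jplace JSON, drop the placements that fail the cutoff, and write a new jplace file. A minimal usage sketch, assuming JplaceParser is importable from metatag.placement (as it is used elsewhere in this module) and with a hypothetical input path:

from metatag.placement import JplaceParser

# "epa_result.jplace" is a hypothetical EPA-ng output file
parser = JplaceParser("epa_result.jplace")
# Keep only placements with a like weight ratio of at least 0.8
parser.filter_placements_by_minimum_lwr(
    minimum_lwr=0.8,
    output_file="epa_result_min_lwr.jplace",
)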

get_reference_sequences()

Get list of reference sequences in the placement tree

Source code in src/metatag/placement.py
def get_reference_sequences(self) -> list:
    """
    Get list of reference sequences in the placement tree
    """
    return [c.name for c in self._tree_obj.get_terminals()]

newickfy_tree(tree_str) staticmethod

Remove branch IDs from jplace tree string

Parameters:

- tree_str (str): jplace tree string. Required.

Returns:

- str: newick tree string

Source code in src/metatag/placement.py
@staticmethod
def newickfy_tree(tree_str: str) -> str:
    """Remove branch IDs from jplace tree string

    Args:
        tree_str (str): jplace tree string

    Returns:
        str: newick tree string
    """
    subs_tree = re.sub("\{(\d+)\}", "", tree_str)
    return subs_tree
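
Because this is a staticmethod built on a single regular expression, it can be tried in isolation. A small sketch, assuming JplaceParser is importable from metatag.placement:

from metatag.placement import JplaceParser

jplace_tree = "((A:0.1{0},B:0.2{1}):0.05{2});"
JplaceParser.newickfy_tree(jplace_tree)
# -> '((A:0.1,B:0.2):0.05);'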

TaxAssignParser

Parse function and taxonomy placement assignments table

Source code in src/metatag/placement.py
class TaxAssignParser:
    """
    Parse function and taxonomy placement assignments table
    """

    def __init__(self, tax_assign_path: Path):
        """Parse function and taxonomy placement assignments table

        Args:
            tax_assign_path (Path): path to tsv file containing taxonomic assignments
        """
        self._tax_assign = pd.read_csv(tax_assign_path, sep="\t")
        self._tax_assign = self._tax_assign[self._tax_assign.cluster_id != "DISTANT"]
        if "cluster_score" in self._tax_assign.columns:
            self._tax_assign.cluster_score = self._tax_assign.cluster_score.apply(
                lambda x: float(x)
            )
        else:
            self._tax_assign["cluster_score"] = 1.0
        self._tax_assign.cluster_taxopath = self._tax_assign.cluster_taxopath.apply(
            lambda x: "Unspecified" if pd.isna(x) else x
        )
        self._tax_assign.taxopath = self._tax_assign.taxopath.apply(
            lambda x: "Unspecified" if pd.isna(x) else x
        )

    @property
    def taxlevels(self):
        return Taxopath().taxlevels

    def count_hits(
        self,
        cluster_ids: list[str] = None,
        score_threshold: float = None,
        taxopath_type: str = "taxopath",
        path_to_query_list: Path = None,
    ) -> TaxonomyCounter:
        """Count hits within given cluster ids and at specificied taxon level

        Args:
            cluster_ids (list[str], optional): IDs of tree clusters to be included
                in the counting of placements. Defaults to None.
            score_threshold (float, optional): global placement score threshold to filter
                low-quality placements out. Defaults to None.
            taxopath_type (str, optional): 'taxopath' to use gappa-assign taxopath or
                'cluster_taxopath' to use lowest common taxopath of the reference tree cluster.
                Defaults to "taxopath".
            path_to_query_list (Path, optional): if not None, then a tsv is exported to
                defined location containing those queries with correct cluster assignment (
                according to defined 'valid' cluster ids or threshold). Defaults to None.

        Returns:
            TaxonomyCounter: counter summarizing the taxonomic diversity of the filtered placement hits
        """
        if cluster_ids is None:
            cluster_ids = self._tax_assign.cluster_id.unique()
        if score_threshold is None:
            score_threshold = 0.0

        query_hits = self._tax_assign[
            (
                (self._tax_assign.cluster_id.isin(cluster_ids))
                & (self._tax_assign.cluster_score >= score_threshold)
            )
        ]
        if path_to_query_list is not None:
            query_hits.to_csv(path_to_query_list, sep="\t", index=False)
        taxopath_hits = query_hits[taxopath_type].values
        if len(taxopath_hits) == 0:
            logger.error(
                "No placement hits returned for the provided filter parameters"
            )
            sys.exit(1)

        taxlevel_counter = TaxonomyCounter(taxopath_list=taxopath_hits)
        return taxlevel_counter
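
A minimal usage sketch, assuming TaxAssignParser is importable from metatag.placement; the assignment table and output paths are hypothetical:

from metatag.placement import TaxAssignParser

parser = TaxAssignParser("query_assignments.tsv")  # hypothetical assignments tsv
counter = parser.count_hits(score_threshold=0.5, taxopath_type="taxopath")
# Summarize hits at phylum level and export the counts table
counter.get_counts(taxlevel="phylum", output_tsv="phylum_counts.tsv")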

__init__(tax_assign_path)

Parse function and taxonomy placement assignments table

Parameters:

- tax_assign_path (Path): path to tsv file containing taxonomic assignments. Required.
Source code in src/metatag/placement.py
def __init__(self, tax_assign_path: Path):
    """Parse function and taxonomy placement assignments table

    Args:
        tax_assign_path (Path): path to tsv file containing taxonomic assignments
    """
    self._tax_assign = pd.read_csv(tax_assign_path, sep="\t")
    self._tax_assign = self._tax_assign[self._tax_assign.cluster_id != "DISTANT"]
    if "cluster_score" in self._tax_assign.columns:
        self._tax_assign.cluster_score = self._tax_assign.cluster_score.apply(
            lambda x: float(x)
        )
    else:
        self._tax_assign["cluster_score"] = 1.0
    self._tax_assign.cluster_taxopath = self._tax_assign.cluster_taxopath.apply(
        lambda x: "Unspecified" if pd.isna(x) else x
    )
    self._tax_assign.taxopath = self._tax_assign.taxopath.apply(
        lambda x: "Unspecified" if pd.isna(x) else x
    )

count_hits(cluster_ids=None, score_threshold=None, taxopath_type='taxopath', path_to_query_list=None)

Count hits within given cluster ids and at specified taxon level

Parameters:

- cluster_ids (list[str], optional): IDs of tree clusters to be included in the counting of placements. Defaults to None.
- score_threshold (float, optional): global placement score threshold to filter low-quality placements out. Defaults to None.
- taxopath_type (str, optional): 'taxopath' to use the gappa-assign taxopath or 'cluster_taxopath' to use the lowest common taxopath of the reference tree cluster. Defaults to "taxopath".
- path_to_query_list (Path, optional): if not None, a tsv containing the queries with correct cluster assignment (according to the defined 'valid' cluster ids or threshold) is exported to this location. Defaults to None.

Returns:

- TaxonomyCounter: counter summarizing the taxonomic diversity of the filtered placement hits

Source code in src/metatag/placement.py
def count_hits(
    self,
    cluster_ids: list[str] = None,
    score_threshold: float = None,
    taxopath_type: str = "taxopath",
    path_to_query_list: Path = None,
) -> TaxonomyCounter:
    """Count hits within given cluster ids and at specificied taxon level

    Args:
        cluster_ids (list[str], optional): IDs of tree clusters to be included
            in the counting of placements. Defaults to None.
        score_threshold (float, optional): global placement score threshold to filter
            low-quality placements out. Defaults to None.
        taxopath_type (str, optional): 'taxopath' to use gappa-assign taxopath or
            'cluster_taxopath' to use lowest common taxopath of the reference tree cluster.
            Defaults to "taxopath".
        path_to_query_list (Path, optional): if not None, then a tsv is exported to
            defined location containing those queries with correct cluster assignment (
            according to defined 'valid' cluster ids or threshold). Defaults to None.

    Returns:
        TaxonomyCounter: counter summarizing the taxonomic diversity of the filtered placement hits
    """
    if cluster_ids is None:
        cluster_ids = self._tax_assign.cluster_id.unique()
    if score_threshold is None:
        score_threshold = 0.0

    query_hits = self._tax_assign[
        (
            (self._tax_assign.cluster_id.isin(cluster_ids))
            & (self._tax_assign.cluster_score >= score_threshold)
        )
    ]
    if path_to_query_list is not None:
        query_hits.to_csv(path_to_query_list, sep="\t", index=False)
    taxopath_hits = query_hits[taxopath_type].values
    if len(taxopath_hits) == 0:
        logger.error(
            "No placement hits returned for the provided filter parameters"
        )
        sys.exit(1)

    taxlevel_counter = TaxonomyCounter(taxopath_list=taxopath_hits)
    return taxlevel_counter

add_clusters_to_tax_table(in_taxtable, clusters=None, out_taxtable=None)

Add tree cluster info at the beginning of each taxopath according to clusters defined in dictionary 'clusters'

Parameters:

- in_taxtable (Path): path to taxonomy table. Required.
- clusters (dict, optional): dictionary mapping reference sequence IDs to their cluster IDs, as returned by parse_tree_clusters with cluster_as_key=False. If None, all sequences are assumed to belong to the same cluster. Defaults to None.
- out_taxtable (Path, optional): path to output taxonomy table. Defaults to None.
Source code in src/metatag/placement.py
def add_clusters_to_tax_table(
    in_taxtable: Path, clusters: dict = None, out_taxtable: Path = None
) -> None:
    """Add tree cluster info at the beginning of each taxopath
    according to clusters defined in dictionary 'clusters'

    Args:
        in_taxtable (Path): path to taxonomy table
        clusters (dict, optional): dictionary mapping reference sequence IDs to
            their cluster IDs, as returned by parse_tree_clusters with
            cluster_as_key=False. If None, all sequences are assumed to belong
            to the same cluster. Defaults to None.
        out_taxtable (Path, optional): path to output taxonomy table. Defaults to None.
    """
    if out_taxtable is None:
        out_taxtable = set_default_output_path(in_taxtable, tag="_clustered")
    else:
        out_taxtable = Path(out_taxtable).resolve()
    taxtable = pd.read_csv(in_taxtable, sep="\t", header=None, dtype=str)
    # Prepend the cluster ID (or "C0" when no clusters are given) to each taxopath,
    # writing back through .loc so the change persists in the dataframe
    for i, row in taxtable.iterrows():
        cluster_id = str(clusters[row[0]]) if clusters is not None else "C0"
        taxtable.loc[i, 1] = cluster_id + ";" + row[1]
    taxtable.to_csv(out_taxtable, sep="\t", index=False, header=None)
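
A sketch of the expected inputs (file names hypothetical); the clusters dictionary maps each reference ID to its cluster ID, matching how assign_labels_to_placements calls this function:

from metatag.placement import add_clusters_to_tax_table

add_clusters_to_tax_table(
    in_taxtable="gappa_taxonomy.tsv",          # two-column tsv: reference ID, taxopath
    clusters={"ref_1": "C0", "ref_2": "C1"},   # reference ID -> cluster ID
    out_taxtable="gappa_taxonomy_clustered.tsv",
)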

add_duplicates_to_assignment_table(taxtable, query_duplicates, output_file=None)

Add duplicated query IDs to cluster and taxonomic assignment table

Parameters:

- taxtable (Path): path to cluster and taxonomic assignment table. Required.
- query_duplicates (Path): path to query duplicates file as output by seqkit rmdup. Required.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/placement.py
def add_duplicates_to_assignment_table(
    taxtable: Path, query_duplicates: Path, output_file: Path = None
) -> None:
    """Add duplicated query IDs to cluster and taxonomic assignment table

    Args:
        taxtable (Path): path to cluster and taxonomic assignment table
        query_duplicates (Path): path to query duplicates file as output by seqkit rmdup
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(taxtable, tag="_duplicates")
    else:
        output_file = Path(output_file).resolve()

    duplicated_hits = []
    assigns = pd.read_csv(taxtable, sep="\t", index_col=False)
    dup_dict = parse_duplicates_from_seqkit(query_duplicates)
    for query_name, duplicate_string in dup_dict.items():
        duplicates = duplicate_string.split(",")
        # Accumulate hits across all queries instead of resetting the
        # list on every iteration
        for duplicate in duplicates:
            row = assigns[assigns.query_name == query_name].copy()
            if not row.empty:
                row = row.iloc[0, :]
                row.query_name = duplicate.strip()
                duplicated_hits.append(row)
    if duplicated_hits:
        # Each hit is a Series holding one row; assemble them into a
        # dataframe before appending to the assignments table
        assigns = pd.concat(
            [assigns, pd.DataFrame(duplicated_hits)],
            axis=0,
            ignore_index=True,
        )
    assigns.to_csv(output_file, sep="\t", index=False)

add_query_labels_to_assign_table(input_table, query_labels, output_table=None)

Add new column containing actual query labels to query taxonomy/cluster assignment table

Parameters:

- input_table (Path): path to query taxonomy/cluster assignment table. Required.
- query_labels (dict): dictionary with keys equal to query short IDs and values equal to query labels. Required.
- output_table (Path, optional): path to output table. Defaults to None.
Source code in src/metatag/placement.py
def add_query_labels_to_assign_table(
    input_table: Path, query_labels: dict, output_table: Path = None
) -> None:
    """Add new column containing actual query labels to query taxonomy/cluster
    assignment table

    Args:
        input_table (Path): path to query taxonomy/cluster assignment table
        query_labels (dict): dictionary with keys equal to query short IDs
            and values to query labels
        output_table (Path, optional): path to output table. Defaults to None.
    """

    def relabel_query(query_id: str, query_labels: dict) -> str:
        try:
            return query_labels[query_id]
        except Exception:
            return query_id

    if output_table is None:
        output_table = set_default_output_path(input_table, tag="_relabel")
    else:
        output_table = Path(output_table).resolve()
    df = pd.read_csv(input_table, sep="\t")
    df.insert(
        1, "query_name", df.query_id.apply(lambda x: relabel_query(x, query_labels))
    )
    df = df.set_index("query_id")
    df.to_csv(output_table, sep="\t")

assign_labels_to_placements(jplace, ref_labels, query_labels=None, output_dir=None, output_prefix=None, only_best_hit=True, ref_clusters_file=None, ref_cluster_scores_file=None, gappa_additional_args=None, only_unique_cluster=True, taxo_file=None)

Assign taxonomy and/or tree cluster IDs to placed query sequences based on taxonomy assigned to tree reference sequences using gappa examine assign.

Parameters:

- jplace (Path): path to jplace file. Required.
- ref_labels (dict): dictionary containing short IDs as keys and long labels as values for reference sequences. Required.
- query_labels (dict, optional): dictionary containing short IDs as keys and long labels as values for query sequences. Defaults to None.
- output_dir (Path, optional): path to output directory. Defaults to None.
- output_prefix (str, optional): prefix to output files. Defaults to None.
- only_best_hit (bool, optional): only report the taxonomy with the largest LWR per query. Defaults to True.
- ref_clusters_file (Path, optional): path to tsv containing reference cluster definitions. Defaults to None.
- ref_cluster_scores_file (Path, optional): path to tsv containing cluster scores. Defaults to None.
- gappa_additional_args (str, optional): additional arguments to gappa. Defaults to None.
- only_unique_cluster (bool, optional): if True, keep queries placed in multiple locations only when all their placements fall within the same cluster. Defaults to True.
- taxo_file (Path, optional): path to taxonomy database. Defaults to None.
Source code in src/metatag/placement.py
def assign_labels_to_placements(
    jplace: Path,
    ref_labels: dict,
    query_labels: dict = None,
    output_dir: Path = None,
    output_prefix: str = None,
    only_best_hit: bool = True,
    ref_clusters_file: Path = None,
    ref_cluster_scores_file: Path = None,
    gappa_additional_args: str = None,
    only_unique_cluster: bool = True,
    taxo_file: Path = None,
) -> None:
    """Assign taxonomy and/or tree cluster IDs to placed query sequences based on
    taxonomy assigned to tree reference sequences using gappa examine assign.

    Args:
        jplace (Path): path to jplace file
        ref_labels (dict): dictionary containing short IDs as keys and long
            labels as values for reference sequences
        query_labels (dict, optional): dictionary containing short IDs as keys
            and long labels as values for query sequences. Defaults to None.
        output_dir (Path, optional): path to output directory. Defaults to None.
        output_prefix (str, optional): prefix to output files. Defaults to None.
        only_best_hit (bool, optional): only report taxonomy with largest LWR
            per query. Defaults to True.
        ref_clusters_file (Path, optional): path to tsv containing reference
            cluster definitions. Defaults to None.
        ref_cluster_scores_file (Path, optional): path to tsv containing cluster
            scores. Defaults to None.
        gappa_additional_args (str, optional): additional arguments to gappa.
            Defaults to None.
        only_unique_cluster (bool, optional): if True, keep queries placed in
            multiple locations only when all their placements fall within the
            same cluster. Defaults to True.
        taxo_file (Path, optional): path to taxonomy database. Defaults to None.
    """
    if output_dir is None:
        output_dir = set_default_output_path(jplace, only_dirname=True)
    else:
        output_dir = Path(output_dir).resolve()
    if output_prefix is None:
        output_prefix = set_default_output_path(jplace, only_filename=True)

    if ref_clusters_file is not None:
        defined_clusters = True
        ref_clusters = parse_tree_clusters(ref_clusters_file, cluster_as_key=False)
        ref_clusters_as_keys = parse_tree_clusters(
            ref_clusters_file, cluster_as_key=True
        )
    else:
        defined_clusters = False
        ref_clusters = None

    if ref_cluster_scores_file is not None:
        ref_cluster_scores = parse_tree_cluster_quality_scores(ref_cluster_scores_file)
    else:
        ref_cluster_scores = None

    if taxo_file is None:
        taxo_file = package_dir / "data" / "merged_taxonomy.tsv"
    else:
        taxo_file = Path(taxo_file).resolve()

    gappa_assign_out = output_dir / f"{output_prefix}per_query.tsv"
    query_taxo_out = output_dir / f"{output_prefix}assignments.tsv"

    # Remove references that are not in placement tree
    ref_in_jplace_tree = JplaceParser(jplace).get_reference_sequences()
    ref_labels = {k: v for k, v in ref_labels.items() if k in ref_in_jplace_tree}

    with TemporaryFilePath() as temptax:
        taxonomy = TaxonomyAssigner(taxo_file=taxo_file)
        taxonomy.build_gappa_taxonomy_table(ref_labels, output_file=temptax)
        add_clusters_to_tax_table(
            in_taxtable=temptax, clusters=ref_clusters, out_taxtable=temptax
        )
        if defined_clusters:
            clusters_taxopath = taxonomy.assign_lowest_common_taxonomy_to_clusters(
                clusters=ref_clusters_as_keys, label_dict=ref_labels
            )
        else:
            clusters_taxopath = None
        wrappers.run_gappa_assign(
            jplace=jplace,
            taxonomy_file=temptax,
            output_dir=output_dir,
            output_prefix=output_prefix,
            only_best_hit=only_best_hit,
            additional_args=gappa_additional_args,
        )

    with TemporaryFilePath() as tempout, TemporaryFilePath() as tempout2, TemporaryFilePath() as tempout3:
        parse_gappa_assign_table(
            input_table=gappa_assign_out,
            has_cluster_id=True,
            cluster_scores=ref_cluster_scores,
            output_file=tempout,
            clusters_taxopath=clusters_taxopath,
        )

        if only_unique_cluster and defined_clusters:
            filter_non_unique_placement_assignments(
                placed_tax_assignments=tempout, output_file=tempout2
            )
            shutil.move(tempout2, tempout)

        pick_taxopath_with_highest_lwr(
            placed_tax_assignments=tempout, output_file=tempout3
        )

        if query_labels is not None:
            add_query_labels_to_assign_table(
                input_table=tempout3,
                query_labels=query_labels,
                output_table=query_taxo_out,
            )
        else:
            shutil.move(tempout3, query_taxo_out)
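
A minimal end-to-end sketch; all paths and label values below are hypothetical:

from metatag.placement import assign_labels_to_placements

assign_labels_to_placements(
    jplace="epa_result.jplace",
    ref_labels={"ref_1": "GCA_000008085.1_amoA"},  # short ID -> full label
    output_dir="placements",
    output_prefix="run1_",
    ref_clusters_file="clusters.tsv",
    ref_cluster_scores_file="cluster_scores.tsv",
)
# Writes "run1_per_query.tsv" (raw gappa output) and
# "run1_assignments.tsv" (parsed cluster and taxonomy assignments)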

filter_non_unique_placement_assignments(placed_tax_assignments, output_file=None)

Remove queries that were assigned to more than one cluster from placements assignments table

Parameters:

- placed_tax_assignments (Path): path to placed taxonomic assignments table. Required.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/placement.py
def filter_non_unique_placement_assignments(
    placed_tax_assignments: Path, output_file: Path = None
) -> None:
    """Remove queries that were assigned to more than one cluster from
       placements assignments table

    Args:
        placed_tax_assignments (Path): path to placed taxonomic assignments table
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(placed_tax_assignments, tag="_filtered")
    else:
        output_file = Path(output_file).resolve()

    df = pd.read_csv(placed_tax_assignments, sep="\t")
    queries_in_more_than_one_cluster, _ = find_queries_placed_in_several_clusters(
        placed_tax_assignments
    )
    fdf = df[~df.query_id.isin(queries_in_more_than_one_cluster)].set_index("query_id")
    fdf.to_csv(output_file, sep="\t")

find_queries_placed_in_several_clusters(placed_tax_assignments)

Find queries placed in more than one cluster

Parameters:

- placed_tax_assignments (Path): path to placed taxonomic assignments table. Required.

Returns:

- tuple[list, pd.DataFrame]: list of query IDs placed in more than one cluster and dataframe with unique cluster assignments per query

Source code in src/metatag/placement.py
def find_queries_placed_in_several_clusters(
    placed_tax_assignments: Path,
) -> tuple[list, pd.DataFrame]:
    """Find queries placed in more than one cluster

    Args:
        placed_tax_assignments (Path): path to placed taxonomic assignments table

    Returns:
        tuple[list, pd.DataFrame]: list of query IDs placed in more than
            one cluster and dataframe with unique cluster assignments per query
    """
    df = pd.read_csv(placed_tax_assignments, sep="\t", dtype=str)
    dfu = df.groupby("query_id")["cluster_id"].agg(["unique"])

    queries_in_more_than_one_cluster = []
    for i, row in dfu.iterrows():
        if len(row.item()) > 1:
            queries_in_more_than_one_cluster.append(row.name)
    return queries_in_more_than_one_cluster, dfu

parse_duplicates_from_seqkit(query_duplicates)

Parse the duplicates file produced by seqkit rmdup into a dictionary mapping each retained query ID to the comma-separated IDs of its duplicates.

Parameters:

- query_duplicates (Path): path to query duplicates file as output by seqkit rmdup. Required.

Returns:

- dict: dictionary with retained query IDs as keys and comma-separated duplicate IDs as values

Source code in src/metatag/placement.py
def parse_duplicates_from_seqkit(query_duplicates: Path) -> dict:
    """Parse the duplicates file produced by seqkit rmdup into a dictionary
    mapping each retained query ID to the comma-separated IDs of its duplicates.

    Args:
        query_duplicates (Path): path to query duplicates file as output by seqkit rmdup

    Returns:
        dict: dictionary with retained query IDs as keys and comma-separated
            duplicate IDs as values
    """
    df = pd.read_csv(query_duplicates, sep="\t", header=None)
    dup_labels = {
        dup_pair[1]
        .split(",")[0]
        .strip(): ",".join(
            [seq_name.strip() for seq_name in dup_pair[1].split(",")[1:]]
        )
        .strip()
        for dup_pair in df.values
    }
    return dup_labels
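
A sketch of the expected input and output, assuming the duplicates file has the two tab-separated columns written by seqkit rmdup -D (a count and a comma-separated list of identical sequence IDs); the file content below is hypothetical:

# dups.txt content (tab-separated count, then comma-separated IDs):
# 3    query_1, query_7, query_12
from metatag.placement import parse_duplicates_from_seqkit

parse_duplicates_from_seqkit("dups.txt")
# -> {'query_1': 'query_7,query_12'}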

parse_gappa_assign_table(input_table, has_cluster_id=True, cluster_scores=None, clusters_taxopath=None, output_file=None)

Parse gappa assign per query taxonomy assignment result tsv

Parameters:

- input_table (Path): path to gappa assign per query taxonomy assignment result tsv. Required.
- has_cluster_id (bool, optional): set to True if the results table includes reference cluster info in the first element of the taxopath. Defaults to True.
- cluster_scores (dict, optional): dictionary with values set to cluster quality scores. Only used if has_cluster_id is True. Defaults to None.
- clusters_taxopath (dict, optional): dict with keys equal to cluster IDs and values corresponding to the lowest common taxopath for the cluster. Defaults to None.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/placement.py
def parse_gappa_assign_table(
    input_table: Path,
    has_cluster_id: bool = True,
    cluster_scores: dict = None,
    clusters_taxopath: dict = None,
    output_file: Path = None,
) -> None:
    """Parse gappa assign per query taxonomy assignment result tsv

    Args:
        input_table (Path): path to gappa assign per query taxonomy assignment result tsv
        has_cluster_id (bool, optional): set to True if results table includes reference
            cluster info in the first element of taxopath. Defaults to True.
        cluster_scores (dict, optional): dictionary with values set to cluster quality
            scores. It is only used if has_cluster_id = True. Defaults to None.
        clusters_taxopath (dict, optional): dict with keys equal to cluster IDs and values
            corresponding to the lowest common taxopath for the cluster. Defaults to None.
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(input_table, tag="_parsed")
    else:
        output_file = Path(output_file).resolve()
    if (has_cluster_id) and (cluster_scores is not None):
        cluster_str = (
            "cluster_id" + "\t" + "cluster_score" + "\t" + "cluster_taxopath" + "\t"
        )
    elif (has_cluster_id) and (cluster_scores is None):
        cluster_str = "cluster_id" + "\t" + "cluster_taxopath" + "\t"
    elif not has_cluster_id:
        cluster_str = ""
    table = pd.read_csv(input_table, sep="\t")
    with open(output_file, "w") as file:
        lines = []
        header = "query_id" + "\t" + "LWR" + "\t" + cluster_str + "taxopath" + "\n"
        lines.append(header)
        for i, row in table.iterrows():
            if ";" not in row.taxopath:
                row.taxopath = row.taxopath + ";Unspecified"
            if has_cluster_id:
                elems = row.taxopath.split(";")
                cluster_id = elems[0]
                if (clusters_taxopath is not None) and (
                    cluster_id in clusters_taxopath.keys()
                ):
                    cluster_taxopath = clusters_taxopath[cluster_id]
                else:
                    cluster_taxopath = ""
                if not cluster_taxopath:
                    cluster_taxopath = "Unspecified"
                taxopath = cluster_taxopath + "\t" + ";".join(elems[1:])
            else:
                cluster_id, taxopath = "", row.taxopath
            if cluster_scores is not None:
                if cluster_id in cluster_scores.keys():
                    cluster_score = str(cluster_scores[cluster_id])
                else:
                    cluster_score = str(None)
                line = (
                    row["name"]
                    + "\t"
                    + str(row["LWR"])
                    + "\t"
                    + cluster_id
                    + "\t"
                    + cluster_score
                    + "\t"
                    + taxopath
                    + "\n"
                )
            else:
                line = (
                    row["name"]
                    + "\t"
                    + str(row["LWR"])
                    + "\t"
                    + cluster_id
                    + "\t"
                    + taxopath
                    + "\n"
                )
            lines.append(line)
        file.writelines(lines)

parse_tree_cluster_quality_scores(cluster_scores_tsv)

Parse cluster quality scores file into dictionary

Parameters:

- cluster_scores_tsv (Path): path to tsv containing cluster scores. Required.

Returns:

- dict: dict with keys equal to cluster IDs and values equal to scores

Source code in src/metatag/placement.py
def parse_tree_cluster_quality_scores(cluster_scores_tsv: Path) -> dict:
    """Parse cluster quality scores file into dictionary

    Args:
        cluster_scores_tsv (Path): path to tsv containing cluster scores

    Returns:
        dict: dict with keys equal to cluster IDs and values to scores
    """
    df = pd.read_csv(cluster_scores_tsv, sep="\t")
    df["cluster"] = df["cluster"].astype(str)
    return dict(df.values)

parse_tree_clusters(clusters_tsv, cluster_as_key=True)

Parse clusters text file into dictionary

Parameters:

- clusters_tsv (Path): path to tsv containing tree cluster definitions. Required.
- cluster_as_key (bool, optional): if True, dict keys are cluster IDs and values are lists of reference IDs; if False, dict keys are reference IDs and values the cluster to which they belong. Defaults to True.

Returns:

- dict: dictionary of tree clusters

Source code in src/metatag/placement.py
def parse_tree_clusters(clusters_tsv: Path, cluster_as_key: bool = True) -> dict:
    """Parse clusters text file into dictionary

    Args:
        clusters_tsv (Path): path to tsv containing tree cluster definitions
        cluster_as_key (bool, optional): if True, dict keys are cluster IDs and
            values are lists of reference IDs. If False, dict keys are reference
            IDs and values the cluster to which they belong. Defaults to True.

    Returns:
        dict: dictionary of tree clusters
    """
    df = pd.read_csv(clusters_tsv, sep="\t", dtype=str)
    if cluster_as_key:
        cluster_ids = df.cluster.unique()
        return {
            cluster_id: df[df.cluster == cluster_id].id.values.tolist()
            for cluster_id in cluster_ids
        }
    else:
        return dict(df.values)
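
A small sketch of both dictionary orientations, assuming a tsv with columns "id" and "cluster" (file content hypothetical):

# clusters.tsv content (tab-separated):
# id      cluster
# ref_1   C0
# ref_2   C0
# ref_3   C1
from metatag.placement import parse_tree_clusters

parse_tree_clusters("clusters.tsv", cluster_as_key=True)
# -> {'C0': ['ref_1', 'ref_2'], 'C1': ['ref_3']}
parse_tree_clusters("clusters.tsv", cluster_as_key=False)
# -> {'ref_1': 'C0', 'ref_2': 'C0', 'ref_3': 'C1'}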

pick_taxopath_with_highest_lwr(placed_tax_assignments, output_file=None)

Pick the taxopath assignment with the highest LWR for each placed query

Parameters:

- placed_tax_assignments (Path): path to placed taxonomic assignments table. Required.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/placement.py
def pick_taxopath_with_highest_lwr(
    placed_tax_assignments: Path, output_file: Path = None
) -> None:
    """Pick taxopath assigment with higuest LWR for each placed query

    Args:
        placed_tax_assignments (Path): path to placed taxonomic assignments table
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = set_default_output_path(
            placed_tax_assignments, tag="_unique_taxopath"
        )
    else:
        output_file = Path(output_file).resolve()

    df = pd.read_csv(placed_tax_assignments, sep="\t")
    # Keep the maximum value per query, which selects the highest LWR
    df.groupby("query_id").aggregate("max").to_csv(output_file, sep="\t")

place_reads_onto_tree(input_tree, tree_model, ref_aln, query_seqs, aln_method='papara', output_dir=None)

Performs short read placement onto a phylogenetic tree. tree_model is either the model name or the path to the log file produced by iqtree. Workflow example: https://github.com/Pbdas/epa-ng/wiki/Full-Stack-Example

Parameters:

- input_tree (Path): path to input tree. Required.
- tree_model (str): substitution model used for tree inference. Required.
- ref_aln (Path): path to reference alignment. Required.
- query_seqs (Path): path to query sequences. Required.
- aln_method (str, optional): choose either "papara" or "hmmalign". Defaults to "papara".
- output_dir (Path, optional): path to output directory. Defaults to None.
Source code in src/metatag/placement.py
def place_reads_onto_tree(
    input_tree: Path,
    tree_model: str,
    ref_aln: Path,
    query_seqs: Path,
    aln_method: str = "papara",
    output_dir: Path = None,
) -> None:
    """Performs short read placement onto phylogenetic tree
    tree_model: str, either the model name or path to log output by iqtree
    workflow example: https://github.com/Pbdas/epa-ng/wiki/Full-Stack-Example

    Args:
        input_tree (Path): path to input tree
        tree_model (str): substitution model used for tree inference
        ref_aln (Path): path to reference alignment
        query_seqs (Path): path to query sequences
        aln_method (str, optional): choose either "papara" or "hmmalign". Defaults to "papara".
        output_dir (Path, optional): path to output directory. Defaults to None.
    """
    if output_dir is None:
        output_dir = set_default_output_path(query_seqs, only_dirname=True)
    else:
        output_dir = Path(output_dir).resolve()

    if Path(tree_model).is_file():
        tree_model = get_iq_tree_model_from_log_file(tree_model)
        logger.info(f"Running EPA-ng with inferred substitution model: {tree_model}")

    ref_ids = get_fasta_record_ids(ref_aln)
    ref_query_msa = output_dir / set_default_output_path(
        query_seqs, extension=".faln", only_filename=True
    )
    aln_ref_frac = set_default_output_path(
        ref_query_msa, tag="_ref_fraction", extension=".faln"
    )
    aln_query_frac = set_default_output_path(
        ref_query_msa, tag="_query_fraction", extension=".faln"
    )

    align_short_reads_to_reference_msa(
        ref_msa=ref_aln,
        query_seqs=query_seqs,
        method=aln_method,
        tree_nwk=input_tree,
        output_dir=output_dir,
    )

    split_reference_from_query_alignments(
        ref_query_msa=ref_query_msa, ref_ids=ref_ids, output_dir=output_dir
    )

    wrappers.run_epang(
        input_tree=input_tree,
        input_aln_ref=aln_ref_frac,
        input_aln_query=aln_query_frac,
        model=tree_model,
        output_dir=output_dir,
        processes=None,
        additional_args=None,
    )
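
A minimal usage sketch (paths hypothetical); passing the iqtree log file as tree_model lets the function extract the inferred substitution model itself:

from metatag.placement import place_reads_onto_tree

place_reads_onto_tree(
    input_tree="ref_tree.nwk",
    tree_model="ref_tree.log",      # iqtree log; the model name is parsed from it
    ref_aln="ref_alignment.faln",
    query_seqs="query_reads.fasta",
    aln_method="hmmalign",
    output_dir="placement_out",
)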

Module taxonomy

Tools to assign taxonomy to reference and query (placed) sequences

TaxonomyAssigner

Methods to assign taxonomy to reference sequences

Source code in src/metatag/taxonomy.py
class TaxonomyAssigner:
    """
    Methods to assign taxonomy to reference sequences
    """

    def __init__(self, taxo_file: Path):
        self._taxodata = (
            pd.read_csv(taxo_file, sep="\t")
            .drop_duplicates(subset="genome")
            .set_index("genome")
        )
        """Methods to assign taxonomy to reference sequences
        """

    @staticmethod
    def lowest_common_taxonomy(taxopaths: list[str]) -> str:
        """Find lowest common taxonomy among set of taxopaths

        Args:
            taxopaths (list[str]): list of taxopath strings

        Returns:
            str: lowest common taxopath
        """
        data = pd.DataFrame([t.split(";") for t in taxopaths])
        taxlevels = ["d__", "p__", "c__", "o__", "f__", "g__", "s__"]
        lowest_tax = []
        for n, tax_level in enumerate(taxlevels):
            taxa = data.iloc[:, n].drop_duplicates().values
            if len(taxa) == 1:
                lowest_tax.append(taxa[0])
            else:
                break
        return ";".join(lowest_tax)

    def _extract_genome_id_from_label(self, label: str) -> str:
        genome_id = LabelParser(label).extract_genome_id()
        return genome_id

    def assign_taxonomy_to_label(self, label: str) -> str:
        """Assign GTDB taxonomy to label based on genome ID

        Args:
            label (str): reference label containing genome ID

        Returns:
            str: GTDB taxonomy as a string taxopath
        """
        genome_id = self._extract_genome_id_from_label(label)
        if genome_id in self._taxodata.index:
            return self._taxodata.loc[genome_id].item()
        else:
            return "No_taxonomy_found"

    def assign_lowest_common_taxonomy_to_labels(self, labels: list[str]) -> str:
        """Assing taxonomy to set of labels and find lowest common taxonomy
        among them

        Args:
            labels (list[str]): list of reference labels containing genome IDs

        Returns:
            str: lowest common GTDB taxonomy
        """
        taxopaths = [
            taxopath
            for taxopath in map(self.assign_taxonomy_to_label, labels)
            if taxopath != "No_taxonomy_found"
        ]
        try:
            if taxopaths:
                return self.lowest_common_taxonomy(taxopaths)
            else:
                return "Unspecified"
        except Exception:
            return "Unspecified"

    def assign_lowest_common_taxonomy_to_clusters(
        self, clusters: dict, label_dict: dict = None
    ) -> dict:
        """Find lowest possible common taxonomy to reference labels in clusters
        If reference labels do not contain genome IDs, a dictionary, label_dict,
        of reference labels and genome ids (or labels with genome ids) must be passed

        Args:
            clusters (dict): dictionary with keys as cluster IDs and values as lists
                of reference labels in each cluster
            label_dict (dict, optional): dictionary with keys as short IDs and values
                as reference (full) labels. Defaults to None.

        Returns:
            dict: dictionary with keys as cluster IDs and values as lowest common
                taxopath for each cluster
        """
        clusters_taxopath = {}
        for cluster_id, cluster in clusters.items():
            if label_dict is not None:
                cluster_labels = [label_dict[ref_id] for ref_id in cluster]
            else:
                cluster_labels = cluster
            taxopath = self.assign_lowest_common_taxonomy_to_labels(cluster_labels)
            clusters_taxopath[cluster_id] = taxopath
        return clusters_taxopath

    def build_gappa_taxonomy_table(
        self, ref_id_dict: dict, output_file: Path = None
    ) -> None:
        """Build gappa-compatible taxonomy file as specified here:
        https://github.com/lczech/gappa/wiki/Subcommand:-assign
        References without an assigned taxonomy are labeled "Unspecified"

        Args:
            ref_id_dict (dict): dictionary with keys as reference IDs and values as
                reference labels
            output_file (Path, optional): path to output file. Defaults to None.
        """
        if output_file is None:
            output_file = Path().resolve() / "gappa_taxonomy.tsv"
        else:
            output_file = Path(output_file).resolve()

        with open(output_file, "w") as outfile:
            lines = []
            for ref_id, label in ref_id_dict.items():
                taxon_str = self.assign_taxonomy_to_label(label)
                taxon_str = (
                    "Unspecified" if ("No_taxonomy_found" in taxon_str) else taxon_str
                )
                # if taxon_str != 'Unspecified':
                lines.append(f"{ref_id}\t{taxon_str}\n")
            outfile.writelines(lines)

assign_lowest_common_taxonomy_to_clusters(clusters, label_dict=None)

Find the lowest possible common taxonomy of the reference labels in each cluster. If reference labels do not contain genome IDs, a dictionary, label_dict, of reference labels and genome ids (or labels with genome ids) must be passed.

Parameters:

- clusters (dict): dictionary with keys as cluster IDs and values as lists of reference labels in each cluster. Required.
- label_dict (dict, optional): dictionary with keys as short IDs and values as reference (full) labels. Defaults to None.

Returns:

- dict: dictionary with keys as cluster IDs and values as the lowest common taxopath for each cluster

Source code in src/metatag/taxonomy.py
def assign_lowest_common_taxonomy_to_clusters(
    self, clusters: dict, label_dict: dict = None
) -> dict:
    """Find lowest possible common taxonomy to reference labels in clusters
    If reference labels do not contain genome IDs, a dictionary, label_dict,
    of reference labels and genome ids (or labels with genome ids) must be passed

    Args:
        clusters (dict): dictionary with keys as cluster IDs and values as lists
            of reference labels in each cluster
        label_dict (dict, optional): dictionary with keys as short IDs and values
            as reference (full) labels. Defaults to None.

    Returns:
        dict: dictionary with keys as cluster IDs and values as lowest common
            taxopath for each cluster
    """
    clusters_taxopath = {}
    for cluster_id, cluster in clusters.items():
        if label_dict is not None:
            cluster_labels = [label_dict[ref_id] for ref_id in cluster]
        else:
            cluster_labels = cluster
        taxopath = self.assign_lowest_common_taxonomy_to_labels(cluster_labels)
        clusters_taxopath[cluster_id] = taxopath
    return clusters_taxopath

assign_lowest_common_taxonomy_to_labels(labels)

Assign taxonomy to a set of labels and find the lowest common taxonomy among them

Parameters:

- labels (list[str]): list of reference labels containing genome IDs. Required.

Returns:

- str: lowest common GTDB taxonomy

Source code in src/metatag/taxonomy.py
def assign_lowest_common_taxonomy_to_labels(self, labels: list[str]) -> str:
    """Assing taxonomy to set of labels and find lowest common taxonomy
    among them

    Args:
        labels (list[str]): list of reference labels containing genome IDs

    Returns:
        str: lowest common GTDB taxonomy
    """
    taxopaths = [
        taxopath
        for taxopath in map(self.assign_taxonomy_to_label, labels)
        if taxopath != "No_taxonomy_found"
    ]
    try:
        if taxopaths:
            return self.lowest_common_taxonomy(taxopaths)
        else:
            return "Unspecified"
    except Exception:
        return "Unspecified"

assign_taxonomy_to_label(label)

Assign GTDB taxonomy to label based on genome ID

Parameters:

- label (str): reference label containing genome ID. Required.

Returns:

- str: GTDB taxonomy as a string taxopath

Source code in src/metatag/taxonomy.py
def assign_taxonomy_to_label(self, label: str) -> str:
    """Assign GTDB taxonomy to label based on genome ID

    Args:
        label (str): reference label containing genome ID

    Returns:
        str: GTDB taxonomy as a string taxopath
    """
    genome_id = self._extract_genome_id_from_label(label)
    if genome_id in self._taxodata.index:
        return self._taxodata.loc[genome_id].item()
    else:
        return "No_taxonomy_found"

build_gappa_taxonomy_table(ref_id_dict, output_file=None)

Build gappa-compatible taxonomy file as specified here: https://github.com/lczech/gappa/wiki/Subcommand:-assign. References without an assigned taxonomy are labeled "Unspecified".

Parameters:

- ref_id_dict (dict): dictionary with keys as reference IDs and values as reference labels. Required.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/taxonomy.py
def build_gappa_taxonomy_table(
    self, ref_id_dict: dict, output_file: Path = None
) -> None:
    """Build gappa-compatible taxonomy file as specified here:
    https://github.com/lczech/gappa/wiki/Subcommand:-assign
    References without an assigned taxonomy are labeled "Unspecified"

    Args:
        ref_id_dict (dict): dictionary with keys as reference IDs and values as
            reference labels
        output_file (Path, optional): path to output file. Defaults to None.
    """
    if output_file is None:
        output_file = Path().resolve() / "gappa_taxonomy.tsv"
    else:
        output_file = Path(output_file).resolve()

    with open(output_file, "w") as outfile:
        lines = []
        for ref_id, label in ref_id_dict.items():
            taxon_str = self.assign_taxonomy_to_label(label)
            taxon_str = (
                "Unspecified" if ("No_taxonomy_found" in taxon_str) else taxon_str
            )
            # if taxon_str != 'Unspecified':
            lines.append(f"{ref_id}\t{taxon_str}\n")
        outfile.writelines(lines)

lowest_common_taxonomy(taxopaths) staticmethod

Find lowest common taxonomy among set of taxopaths

Parameters:

- taxopaths (list[str]): list of taxopath strings. Required.

Returns:

- str: lowest common taxopath

Source code in src/metatag/taxonomy.py
@staticmethod
def lowest_common_taxonomy(taxopaths: list[str]) -> str:
    """Find lowest common taxonomy among set of taxopaths

    Args:
        taxopaths (list[str]): list of taxopath strings

    Returns:
        str: lowest common taxopath
    """
    data = pd.DataFrame([t.split(";") for t in taxopaths])
    taxlevels = ["d__", "p__", "c__", "o__", "f__", "g__", "s__"]
    lowest_tax = []
    for n, tax_level in enumerate(taxlevels):
        taxa = data.iloc[:, n].drop_duplicates().values
        if len(taxa) == 1:
            lowest_tax.append(taxa[0])
        else:
            break
    return ";".join(lowest_tax)

TaxonomyCounter

Source code in src/metatag/taxonomy.py
class TaxonomyCounter:
    def __init__(self, taxopath_list: list[str]):
        """
        Tools to summarize taxonomical diversity in a list of taxopaths
        """
        taxodicts = [Taxopath(taxopath_str).taxodict for taxopath_str in taxopath_list]
        self._taxohits = pd.DataFrame(taxodicts).applymap(
            lambda x: "Unspecified" if x is None else x
        )

    def get_counts(
        self,
        taxlevel: str = "family",
        output_tsv: Path = None,
        plot_type: str = "bar",
        output_pdf: Path = None,
    ) -> pd.DataFrame:
        """Compute counts and fraction at specified taxonomy levels

        Args:
            taxlevel (str, optional): taxonomy level to perform counts at.
                Defaults to "family".
            output_tsv (Path, optional): path to output file. Defaults to None.
            plot_type (str, optional): choose either "bar" or "pie". Defaults to "bar".
            output_pdf (Path, optional): path to output pdf with figures.
                Defaults to None.

        Returns:
            pd.DataFrame: dataframe with counts and fraction at specified taxlevel
        """
        counts = self._taxohits[taxlevel].value_counts(normalize=False)
        # Merge counts from "Unspecified" and empty tax level, e.g., "f__"
        matched_row = counts.index[counts.index.str.fullmatch("[a-zA-Z]__")]
        if not matched_row.empty:
            empty_tax_level = matched_row.item()
            counts[counts.index == "Unspecified"] = (
                counts[counts.index == "Unspecified"] + counts[empty_tax_level].item()
            )
            counts = counts.drop(labels=empty_tax_level)

        counts.index.name = taxlevel
        df = counts.to_frame(name="counts")
        df["fraction"] = df.counts.apply(lambda x: x / df.counts.sum())
        if output_tsv is not None:
            df.to_csv(output_tsv, sep="\t")
        if output_pdf is not None:
            self.plot_counts(
                df,
                plot_type=plot_type,
                output_pdf=output_pdf,
                title=f"Taxonomy at level: {taxlevel}",
            )
        # Return the counts dataframe, as promised by the docstring
        return df

    def plot_counts(
        self,
        count_data: pd.DataFrame,
        output_pdf: Path,
        plot_type: str = "bar",
        title: str = None,
    ) -> None:
        """Make (and optionally export) barplot ('bar') or pieplot ('pie')
        figure depicting counting results at specified taxonomic level

        Args:
            count_data (pd.DataFrame): dataframe with counts and fraction
                at specified taxonomy level as returned by get_counts()
            plot_type (str, optional): choose between "bar" and "pie".
                Defaults to "bar".
            output_pdf (Path): path to output pdf containing figure.
            title (str, optional): figure title. Defaults to None.
        """
        if title is None:
            title = ""
        if plot_type == "pie":
            count_data.counts.plot.pie(
                figsize=(15, 15), title=title, rotatelabels=True
            ).get_figure().savefig(output_pdf, format="pdf")
        else:
            count_data.counts.plot.bar(
                figsize=(15, 15), title=title, rot=90
            ).get_figure().savefig(output_pdf, format="pdf")
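
A minimal sketch of the counter on a handful of taxopaths, assuming TaxonomyCounter is importable from metatag.taxonomy (output path hypothetical):

from metatag.taxonomy import TaxonomyCounter

counter = TaxonomyCounter(
    taxopath_list=[
        "d__Bacteria;p__Proteobacteria",
        "d__Bacteria;p__Firmicutes",
        "d__Bacteria;p__Proteobacteria",
    ]
)
# Counts at phylum level: Proteobacteria 2 (2/3), Firmicutes 1 (1/3)
counter.get_counts(taxlevel="phylum", output_tsv="phylum_counts.tsv")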

__init__(taxopath_list)

Tools to summarize taxonomical diversity in a list of taxopaths

Source code in src/metatag/taxonomy.py
def __init__(self, taxopath_list: list[str]):
    """
    Tools to summarize taxonomical diversity in a list of taxopaths
    """
    taxodicts = [Taxopath(taxopath_str).taxodict for taxopath_str in taxopath_list]
    self._taxohits = pd.DataFrame(taxodicts).applymap(
        lambda x: "Unspecified" if x is None else x
    )

get_counts(taxlevel='family', output_tsv=None, plot_type='bar', output_pdf=None)

Compute counts and fraction at specified taxonomy levels

Parameters:

- taxlevel (str, optional): taxonomy level to perform counts at. Defaults to "family".
- output_tsv (Path, optional): path to output file. Defaults to None.
- plot_type (str, optional): choose either "bar" or "pie". Defaults to "bar".
- output_pdf (Path, optional): path to output pdf with figures. Defaults to None.

Returns:

- pd.DataFrame: dataframe with counts and fraction at specified taxlevel

Source code in src/metatag/taxonomy.py
def get_counts(
    self,
    taxlevel: str = "family",
    output_tsv: Path = None,
    plot_type: str = "bar",
    output_pdf: Path = None,
) -> pd.DataFrame:
    """Compute counts and fraction at specified taxonomy levels

    Args:
        taxlevel (str, optional): taxonomy level to perform counts at.
            Defaults to "family".
        output_tsv (Path, optional): path to output file. Defaults to None.
        plot_type (str, optional): choose either "bar" or "pie". Defaults to "bar".
        output_pdf (Path, optional): path to output pdf with figures.
            Defaults to None.

    Returns:
        pd.DataFrame: dataframe with counts and fraction at specified taxlevel
    """
    counts = self._taxohits[taxlevel].value_counts(normalize=False)
    # Merge counts from "Unspecified" and empty tax level, e.g., "f__"
    matched_row = counts.index[counts.index.str.fullmatch("[a-zA-Z]__")]
    if not matched_row.empty:
        empty_tax_level = matched_row.item()
        counts[counts.index == "Unspecified"] = (
            counts[counts.index == "Unspecified"] + counts[empty_tax_level].item()
        )
        counts = counts.drop(labels=empty_tax_level)

    counts.index.name = taxlevel
    df = counts.to_frame(name="counts")
    df["fraction"] = df.counts.apply(lambda x: x / df.counts.sum())
    if output_tsv is not None:
        df.to_csv(output_tsv, sep="\t")
    if output_pdf is not None:
        self.plot_counts(
            df,
            plot_type=plot_type,
            output_pdf=output_pdf,
            title=f"Taxonomy at level: {taxlevel}",
        )
    # Return the counts dataframe, as promised by the docstring
    return df

plot_counts(count_data, output_pdf, plot_type='bar', title=None)

Make (and optionally export) barplot ('bar') or pieplot ('pie') figure depicting counting results at specified taxonomic level

Parameters:

- count_data (pd.DataFrame): dataframe with counts and fraction at specified taxonomy level, as returned by get_counts(). Required.
- output_pdf (Path): path to output pdf containing figure. Required.
- plot_type (str, optional): choose between "bar" and "pie". Defaults to "bar".
- title (str, optional): figure title. Defaults to None.
Source code in src/metatag/taxonomy.py, lines 261-289
def plot_counts(
    self,
    count_data: pd.DataFrame,
    output_pdf: Path,
    plot_type: str = "bar",
    title: str = None,
) -> None:
    """Make (and optionally export) barplot ('bar') or pieplot ('pie')
    figure depicting counting results at specified taxonomic level

    Args:
        count_data (pd.DataFrame): dataframe with counts and fraction
            at specified taxonomy level as returned by get_counts()
        plot_type (str, optional): choose between "bar" and "pie".
            Defaults to "bar".
        output_pdf (Path, optional): path to output pdf containing figure.
            Defaults to None.
        title (str, optional): figure title. Defaults to None.
    """
    if title is None:
        title = ""
    if plot_type == "pie":
        count_data.counts.plot.pie(
            figsize=(15, 15), title=title, rotatelabels=True
        ).get_figure().savefig(output_pdf, format="pdf")
    else:
        count_data.counts.plot.bar(
            figsize=(15, 15), title=title, rot=90
        ).get_figure().savefig(output_pdf, format="pdf")

Taxopath

Object to contain taxopath

Source code in src/metatag/taxonomy.py, lines 20-72
class Taxopath:
    """
    Object to contain taxopath
    """

    def __init__(self, taxopath_str: str = None, delimiter: str = ";"):
        """_summary_

        Args:
            taxopath_str (str, optional): taxopath as a string. Defaults to None.
            delimiter (str, optional): taxa delimiter in taxopath. Defaults to ";".
        """
        self._taxopath = taxopath_str
        self._delimiter = delimiter
        self._tax_levels = [
            "domain",
            "phylum",
            "class",
            "order",
            "family",
            "genus",
            "species",
        ]
        self.taxodict = self._dict_from_taxopath()

    def _dict_from_taxopath(self):
        if self._taxopath is None:
            taxolist = []
        else:
            taxolist = [elem.strip() for elem in self._taxopath.split(self._delimiter)]
        taxolist.extend([None for _ in range(len(self._tax_levels) - len(taxolist))])
        return {taxlevel: taxon for taxlevel, taxon in zip(self._tax_levels, taxolist)}

    @classmethod
    def from_dict(cls, taxodict: dict, delimiter: str = ";") -> Taxopath:
        """Instantiate Taxopath object from dict

        Args:
            taxodict (dict): dict of taxonomic levels and taxa
            delimiter (str, optional): delimiter to separate taxon levels.
                Defaults to ";".

        Returns:
            Taxopath: Taxopath object
        """
        taxa = []
        for taxon in taxodict.values():
            if taxon is None:
                break
            else:
                taxa.append(taxon)
        taxostr = delimiter.join(taxa)
        return cls(taxopath_str=taxostr, delimiter=delimiter)
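
A short sketch of how Taxopath round-trips between strings and dicts (example values are illustrative):

from metatag.taxonomy import Taxopath

tp = Taxopath("Bacteria;Proteobacteria;Gammaproteobacteria")
tp.taxodict["class"]   # 'Gammaproteobacteria'
tp.taxodict["family"]  # None: levels beyond the string are padded with None

# Rebuild a Taxopath from a (possibly truncated) dict
tp2 = Taxopath.from_dict({"domain": "Bacteria", "phylum": "Proteobacteria"})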

__init__(taxopath_str=None, delimiter=';')

Initialize Taxopath object from a taxopath string

Parameters:

- taxopath_str (str, optional): taxopath as a string. Defaults to None.
- delimiter (str, optional): taxa delimiter in taxopath. Defaults to ";".
Source code in src/metatag/taxonomy.py, lines 25-43
def __init__(self, taxopath_str: str = None, delimiter: str = ";"):
    """_summary_

    Args:
        taxopath_str (str, optional): taxopath as a string. Defaults to None.
        delimiter (str, optional): taxa delimiter in taxopath. Defaults to ";".
    """
    self._taxopath = taxopath_str
    self._delimiter = delimiter
    self._tax_levels = [
        "domain",
        "phylum",
        "class",
        "order",
        "family",
        "genus",
        "species",
    ]
    self.taxodict = self._dict_from_taxopath()

from_dict(taxodict, delimiter=';') classmethod

Instantiate Taxopath object from dict

Parameters:

- taxodict (dict): dict of taxonomic levels and taxa.
- delimiter (str, optional): delimiter to separate taxon levels. Defaults to ";".

Returns:

- Taxopath: Taxopath object.

Source code in src/metatag/taxonomy.py, lines 53-72
@classmethod
def from_dict(cls, taxodict: dict, delimiter: str = ";") -> Taxopath:
    """Instantiate Taxopath object from dict

    Args:
        taxodict (dict): dict of taxonomic levels and taxa
        delimiter (str, optional): delimiter to separate taxon levels.
            Defaults to ";".

    Returns:
        Taxopath: Taxopath object
    """
    taxa = []
    for taxon in taxodict.values():
        if taxon is None:
            break
        else:
            taxa.append(taxon)
    taxostr = delimiter.join(taxa)
    return cls(taxopath_str=taxostr, delimiter=delimiter)

Module visualization

Tools to visualize phylogenetic trees with and without short read placement.

make_feature_metadata_table(label_dict, output_tsv, original_labels=True)

Construct feature metadata tsv classifying reference and query sequences for empress (https://github.com/biocore/empress/issues/548)

Parameters:

- label_dict (dict): dictionary of sequence short IDs and labels. Reference sequences should be prefixed with "ref_", and query sequences should be prefixed with "query_".
- output_tsv (Path): path to output tsv file with metadata table.
- original_labels (bool, optional): whether to include original long labels in tree. Defaults to True.
Source code in src/metatag/visualization.py, lines 17-38
def make_feature_metadata_table(
    label_dict: dict, output_tsv: Path, original_labels: bool = True
) -> None:
    """Construct feature metadata tsv classifiying reference and
    query sequences for empress
    https://github.com/biocore/empress/issues/548

    Args:
        label_dict (dict): dictionary of sequence short IDs and labels.
            Reference sequences should be prefixed with "ref_", and
            query sequences should be prefixed with "query_".
        output_tsv (Path): path to output tsv file with metadata table.
        original_labels (bool, optional): whether to include original
            long labels in tree. Defaults to True.
    """
    feature_dict = {
        seq_name if original_labels else seq_id: "ref" if "ref_" in seq_id else "query"
        for seq_id, seq_name in label_dict.items()
    }
    df = pd.DataFrame.from_dict(feature_dict, orient="index", columns=["Sequence type"])
    df.index.name = "Feature ID"
    df.to_csv(output_tsv, sep="\t")
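
A minimal sketch of the expected input (IDs and labels are hypothetical):

from pathlib import Path
from metatag.visualization import make_feature_metadata_table

label_dict = {
    "ref_1": "NP_000001.1 reference protein",  # classified as "ref"
    "query_1": "read_0042",                    # classified as "query"
}
make_feature_metadata_table(label_dict, output_tsv=Path("feature_metadata.tsv"))
# The tsv has a "Feature ID" index and a "Sequence type" column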

make_tree_html(input_tree, output_dir=None, feature_metadata=None)

Runs empress tree-plot (empress: https://github.com/biocore/empress)

Parameters:

- input_tree (Path): path to tree in newick format.
- output_dir (Path, optional): path to output directory. Defaults to None.
- feature_metadata (Path, optional): path to feature metadata table as output by make_feature_metadata_table. Defaults to None.

Source code in src/metatag/visualization.py, lines 41-63
def make_tree_html(
    input_tree: Path, output_dir: Path = None, feature_metadata: Path = None
) -> None:
    """Runs empress tree-plot
    empress:  https://github.com/biocore/empress

    Args:
        input_tree (Path): path to tree in newick format
        output_dir (Path, optional): path to output directory. Defaults to None.
        feature_metadata (Path, optional): path to feature metadata
            table as output by make_feature_metadata_table. Defaults to None.
    """
    input_tree = Path(input_tree).resolve()
    if output_dir is None:
        output_dir = input_tree.parent / "empress-plot"
    else:
        output_dir = Path(output_dir).resolve()
    if feature_metadata is not None:
        meta_str = f"-fm {feature_metadata}"
    else:
        meta_str = ""
    cmd_str = f"empress tree-plot -t {input_tree} {meta_str} -o {output_dir}"
    terminal_execute(cmd_str, suppress_shell_output=False)

plot_tree_in_browser(input_tree, output_dir=None, feature_metadata=None)

Runs empress tree-plot and opens generated html in browser (empress: https://github.com/biocore/empress)

Parameters:

- input_tree (Path): path to tree in newick format.
- output_dir (Path, optional): path to output directory. Defaults to None.
- feature_metadata (Path, optional): path to feature metadata table as output by make_feature_metadata_table. Defaults to None.
Source code in src/metatag/visualization.py, lines 66-79
def plot_tree_in_browser(
    input_tree: Path, output_dir: Path = None, feature_metadata: Path = None
) -> None:
    """Runs empress tree-plot and opens generated html in browser
    empress: https://github.com/biocore/empress

    Args:
        input_tree (Path): path to tree in newick format
        output_dir (Path, optional): path to output directory. Defaults to None.
        feature_metadata (Path, optional): path to feature metadata
            table as output by make_feature_metadata_table. Defaults to None.
    """
    make_tree_html(input_tree, output_dir, feature_metadata)
    if output_dir is None:
        output_dir = Path(input_tree).resolve().parent / "empress-plot"
    else:
        output_dir = Path(output_dir).resolve()
    webbrowser.open_new_tab((output_dir / "empress.html").as_posix())
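
A usage sketch, assuming empress is installed and the file names below exist:

from pathlib import Path
from metatag.visualization import plot_tree_in_browser

plot_tree_in_browser(
    input_tree=Path("epa_result.newick"),
    output_dir=Path("empress-plot"),
    feature_metadata=Path("feature_metadata.tsv"),
)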

Module utils

Functions and classes for general purposes

CommandArgs

Base class to hold command line arguments.

Source code in src/metatag/utils.py, lines 27-31
class CommandArgs:
    """Base class to hold command line arguments."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

ConfigParser

Handle MetaTag configuration file.

Source code in src/metatag/utils.py, lines 34-111
class ConfigParser:
    """Handle MetaTag configuration file."""

    def __init__(self, config_file: Path) -> None:
        """Handle MetaTag configuration file."

        Args:
            config_file (Path): _description_
        """
        self._config_file = Path(config_file).resolve()
        self._config = self.get_config()

    @classmethod
    def get_default_config(cls):
        """Initialize ConfigParser with default config file."""
        return cls(cls.initialize_config_file())

    @staticmethod
    def initialize_config_file() -> Path:
        """Initialize empty config file.
        Returns:
            Path: path to generated config file.
        """
        config_file = Path(__file__).parent / "config.json"
        if not config_file.exists():
            config = {
                "input_database": "",
                "hmm_directory": "",
                "max_hmm_reference_size": "",
                "min_sequence_length": "",
                "max_sequence_length": "",
                "output_directory": "",
                "translate": "",
                "relabel": "",
                "remove_duplicates": "",
                "relabel_prefixes": "",
                "hmmsearch_args": "",
                "tree_method": "",
            }
            with open(config_file, "w", encoding="UTF-8") as f:
                json.dump(config, f, indent=4)
        return config_file

    def get_config_path(self) -> Path:
        """Show config file path."""
        return self._config_file

    def get_config(self) -> dict:
        """Load config file.
        Returns:
            dict: dict containing fields and values of config file.
        """
        with open(self._config_file, "r", encoding="UTF-8") as file:
            config = json.loads(file.read())
        return config

    def write_config(self) -> None:
        """Write config dict to file."""
        with open(self._config_file, "w", encoding="UTF-8") as f:
            json.dump(self._config, f, indent=4)

    def update_config(self, key: str, value: str) -> None:
        """Update config file
        Args:
            key (str): config file key name to be updated.
            value (str): new value.
        """
        self._config[key] = value
        self.write_config()

    def get_field(self, key: str) -> str:
        """Get field from config file.
        Args:
            key (str): key name to get the value from.
        Returns:
            str: key value.
        """
        return self._config[key]

__init__(config_file)

Handle MetaTag configuration file.

Parameters:

- config_file (Path): path to configuration file (JSON).
Source code in src/metatag/utils.py, lines 37-44
def __init__(self, config_file: Path) -> None:
    """Handle MetaTag configuration file."

    Args:
        config_file (Path): _description_
    """
    self._config_file = Path(config_file).resolve()
    self._config = self.get_config()

get_config()

Load config file.

Returns:

- dict: dict containing fields and values of config file.

Source code in src/metatag/utils.py, lines 81-88
def get_config(self) -> dict:
    """Load config file.
    Returns:
        dict: dict containing fields and values of config file.
    """
    with open(self._config_file, "r", encoding="UTF-8") as file:
        config = json.loads(file.read())
    return config

get_config_path()

Show config file path.

Source code in src/metatag/utils.py, lines 77-79
def get_config_path(self) -> Path:
    """Show config file path."""
    return self._config_file

get_default_config() classmethod

Initialize ConfigParser with default config file.

Source code in src/metatag/utils.py, lines 46-49
@classmethod
def get_default_config(cls):
    """Initialize ConfigParser with default config file."""
    return cls(cls.initialize_config_file())

get_field(key)

Get field from config file.

Parameters:

- key (str): key name to get the value from.

Returns:

- str: key value.

Source code in src/metatag/utils.py, lines 104-111
def get_field(self, key: str) -> str:
    """Get field from config file.
    Args:
        key (str): key name to get the value from.
    Returns:
        str: key value.
    """
    return self._config[key]

initialize_config_file() staticmethod

Initialize empty config file.

Returns:

- Path: path to generated config file.

Source code in src/metatag/utils.py, lines 51-75
@staticmethod
def initialize_config_file() -> Path:
    """Initialize empty config file.
    Returns:
        Path: path to generated config file.
    """
    config_file = Path(__file__).parent / "config.json"
    if not config_file.exists():
        config = {
            "input_database": "",
            "hmm_directory": "",
            "max_hmm_reference_size": "",
            "min_sequence_length": "",
            "max_sequence_length": "",
            "output_directory": "",
            "translate": "",
            "relabel": "",
            "remove_duplicates": "",
            "relabel_prefixes": "",
            "hmmsearch_args": "",
            "tree_method": "",
        }
        with open(config_file, "w", encoding="UTF-8") as f:
            json.dump(config, f, indent=4)
    return config_file

update_config(key, value)

Update config file

Parameters:

- key (str): config file key name to be updated.
- value (str): new value.
Source code in src/metatag/utils.py, lines 95-102
def update_config(self, key: str, value: str) -> None:
    """Update config file
    Args:
        key (str): config file key name to be updated.
        value (str): new value.
    """
    self._config[key] = value
    self.write_config()

write_config()

Write config dict to file.

Source code in src/metatag/utils.py, lines 90-93
def write_config(self) -> None:
    """Write config dict to file."""
    with open(self._config_file, "w", encoding="UTF-8") as f:
        json.dump(self._config, f, indent=4)

DictMerger

Source code in src/metatag/utils.py, lines 346-414
class DictMerger:
    def __init__(self, dicts: list[dict]) -> None:
        """
        Tools to merge python dictionaries into a single one

        Args:
            dicts: list of dictionaries to be merged
        """
        self._dict_list = dicts

    @classmethod
    def from_pickle_paths(cls, dict_paths: list[Path]) -> DictMerger:
        """Initialize class from list of paths to dictionaries (pickle)

        Args:
            dict_paths (list[Path]): list of paths to pickle files

        Returns:
            DictMerger: DictMerger instance
        """
        dict_paths = [Path(dict_path).resolve() for dict_path in dict_paths]
        dict_list = [
            cls.read_from_pickle_file(dict_path.as_posix().strip())
            for dict_path in dict_paths
        ]
        return cls(dicts=dict_list)

    @staticmethod
    def read_from_pickle_file(path_to_file: Path = "object.pkl"):
        """Load python object from pickle file

        Args:
            path_to_file (Path, optional): path to pickle file.
                Defaults to "object.pkl".

        Returns:
            object: loaded Python object
        """
        with open(path_to_file, "rb") as in_file:
            python_object = pickle.load(in_file)
        return python_object

    def merge(
        self, dict_prefixes: list[str] = None, save_pickle_path: Path = None
    ) -> dict:
        """Merge dictionaries

        Args:
            dict_prefixes (list[str], optional): list of strings containing prefixes
                to be added to values in each dict. Defaults to None.
            save_pickle_path (Path, optional): path to output pickle file.
                Defaults to None.

        Returns:
            dict: merged dictionary
        """
        if dict_prefixes is None:
            dict_prefixes = ["" for _ in range(len(self._dict_list))]

        merged_dict = {
            key: prefix + value
            for prefix, dict_object in zip(dict_prefixes, self._dict_list)
            for (key, value) in dict_object.items()
        }

        if save_pickle_path is not None:
            save_to_pickle_file(merged_dict, save_pickle_path)
        return merged_dict
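
A minimal sketch of merging with per-dictionary prefixes (values are illustrative):

from metatag.utils import DictMerger

merged = DictMerger([{"a": "seq1"}, {"b": "seq2"}]).merge(
    dict_prefixes=["ref_", "query_"]
)
# {'a': 'ref_seq1', 'b': 'query_seq2'}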

__init__(dicts)

Tools to merge python dictionaries into a single one.

Parameters:

- dicts (list[dict]): list of dictionaries to be merged.

Source code in src/metatag/utils.py, lines 347-353
def __init__(self, dicts: list[dict]) -> None:
    """
    Tools to merge python dictionaries into a single one

    Args:
        dicts: list of dictionaries to be merged
    """
    self._dict_list = dicts

from_pickle_paths(dict_paths) classmethod

Initialize class from list of paths to dictionaries (pickle)

Parameters:

- dict_paths (list[Path]): list of paths to pickle files.

Returns:

- DictMerger: DictMerger instance.

Source code in src/metatag/utils.py, lines 355-370
@classmethod
def from_pickle_paths(cls, dict_paths: list[Path]) -> DictMerger:
    """Initialize class from list of paths to dictionaries (pickle)

    Args:
        dict_paths (list[Path]): list of paths to pickle files

    Returns:
        DictMerger: DictMerger instance
    """
    dict_paths = [Path(dict_path).resolve() for dict_path in dict_paths]
    dict_list = [
        cls.read_from_pickle_file(dict_path.as_posix().strip())
        for dict_path in dict_paths
    ]
    return cls(dicts=dict_list)

merge(dict_prefixes=None, save_pickle_path=None)

Merge dictionaries

Parameters:

- dict_prefixes (list[str], optional): list of strings containing prefixes to be added to values in each dict. Defaults to None.
- save_pickle_path (Path, optional): path to output pickle file. Defaults to None.

Returns:

- dict: merged dictionary.

Source code in src/metatag/utils.py, lines 388-414
def merge(
    self, dict_prefixes: list[str] = None, save_pickle_path: Path = None
) -> dict:
    """Merge dictionaries

    Args:
        dict_prefixes (list[str], optional): list of strings containing prefixes
            to be added to values in each dict. Defaults to None.
        save_pickle_path (Path, optional): path to output pickle file.
            Defaults to None.

    Returns:
        dict: merged dictionary
    """
    if dict_prefixes is None:
        dict_prefixes = ["" for _ in range(len(self._dict_list))]

    merged_dict = {
        key: prefix + value
        for prefix, dict_object in zip(dict_prefixes, self._dict_list)
        for (key, value) in dict_object.items()
    }

    if save_pickle_path is not None:
        save_to_pickle_file(merged_dict, save_pickle_path)
    return merged_dict

read_from_pickle_file(path_to_file='object.pkl') staticmethod

Load python object from pickle file

Parameters:

- path_to_file (Path, optional): path to pickle file. Defaults to "object.pkl".

Returns:

- object: loaded Python object.

Source code in src/metatag/utils.py, lines 372-386
@staticmethod
def read_from_pickle_file(path_to_file: Path = "object.pkl"):
    """Load python object from pickle file

    Args:
        path_to_file (Path, optional): path to pickle file.
            Defaults to "object.pkl".

    Returns:
        object: loaded Python object
    """
    with open(path_to_file, "rb") as in_file:
        python_object = pickle.load(in_file)
    return python_object

TemporaryDirectoryPath

Custom context manager to create a temporary directory which is removed when exiting context manager

Source code in src/metatag/utils.py, lines 179-209
class TemporaryDirectoryPath:
    """
    Custom context manager to create a temporary directory
    which is removed when exiting context manager
    """

    def __init__(self, work_dir: Path = None):
        """Custom context manager to create a temporary directory
           which is removed when exiting context manager

        Args:
            work_dir (Path, optional): path to working directory. Defaults to None.
        """
        if work_dir is not None:
            self.work_dir = Path(work_dir).resolve()
        else:
            self.work_dir = work_dir

    def __enter__(self):
        temp_id = "".join(random.choice(string.ascii_lowercase) for i in range(10))
        self.dir_path = (
            self.work_dir / f"temp_{temp_id}/"
            if self.work_dir is not None
            else Path(f"temp_{temp_id}").resolve()
        )
        self.dir_path.mkdir(parents=True, exist_ok=True)
        return self.dir_path

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.dir_path.exists():
            shutil.rmtree(self.dir_path)
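
A usage sketch (the working directory is hypothetical):

from metatag.utils import TemporaryDirectoryPath

with TemporaryDirectoryPath(work_dir="/tmp") as tmp_dir:
    # tmp_dir is a Path like /tmp/temp_abcdefghij, created on entry
    (tmp_dir / "intermediate.fasta").write_text(">seq1\nMKT\n")
# The directory and its contents are removed here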

__init__(work_dir=None)

Custom context manager to create a temporary directory which is removed when exiting context manager

Parameters:

- work_dir (Path, optional): path to working directory. Defaults to None.
Source code in src/metatag/utils.py, lines 185-195
def __init__(self, work_dir: Path = None):
    """Custom context manager to create a temporary directory
       which is removed when exiting context manager

    Args:
        work_dir (Path, optional): path to working directory. Defaults to None.
    """
    if work_dir is not None:
        self.work_dir = Path(work_dir).resolve()
    else:
        self.work_dir = work_dir

TemporaryFilePath

Custom context manager to create a temporary file which is removed when exiting context manager

Source code in src/metatag/utils.py, lines 140-176
class TemporaryFilePath:
    """
    Custom context manager to create a temporary file
    which is removed when exiting context manager
    """

    def __init__(
        self, work_dir: Path = None, extension: str = None, create_file: bool = False
    ):
        """Custom context manager to create a temporary file
           which is removed when exiting context manager

        Args:
            work_dir (Path, optional): path to working directory. Defaults to None.
            extension (str, optional): file extension. Defaults to None.
            create_file (bool, optional): whether to create the (empty) file
                on entering the context manager. Defaults to False.
        """
        if work_dir is not None:
            self.work_dir = Path(work_dir).resolve()
        else:
            self.work_dir = Path().resolve()
        self.extension = extension or ""
        self.create_file = create_file

    def __enter__(self):
        temp_id = "".join(random.choice(string.ascii_lowercase) for i in range(10))
        temp_file_name = f"temp_{temp_id}{self.extension}"
        self.file_path = (
            self.work_dir / temp_file_name if self.work_dir else Path(temp_file_name)
        )
        if self.create_file:
            self.file_path.touch(exist_ok=True)
        return self.file_path

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file_path.unlink(missing_ok=True)
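
A usage sketch:

from metatag.utils import TemporaryFilePath

with TemporaryFilePath(extension=".fasta") as tmp_fasta:
    # tmp_fasta is a Path like ./temp_abcdefghij.fasta; by default only the
    # path is generated, and the code using it is expected to create the file
    tmp_fasta.write_text(">seq1\nMKT\n")
# The file is unlinked on exit (missing_ok=True, so it need not exist)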

__init__(work_dir=None, extension=None, create_file=False)

Custom context manager to create a temporary file which is removed when exiting context manager

Parameters:

- work_dir (Path, optional): path to working directory. Defaults to None.
- extension (str, optional): file extension. Defaults to None.
- create_file (bool, optional): whether to create the (empty) file on entering the context manager. Defaults to False.
Source code in src/metatag/utils.py, lines 146-163
def __init__(
    self, work_dir: Path = None, extension: str = None, create_file: bool = False
):
    """Custom context manager to create a temporary file
       which is removed when exiting context manager

    Args:
        work_dir (Path, optional): path to working directory. Defaults to None.
        extension (str, optional): file extension. Defaults to None.
        create_file (bool, optional): whether to create the (empty) file
            on entering the context manager. Defaults to False.
    """
    if work_dir is not None:
        self.work_dir = Path(work_dir).resolve()
    else:
        self.work_dir = Path().resolve()
    self.extension = extension or ""
    self.create_file = create_file

easy_pattern_matching(text, left_pattern, right_pattern=None)

Straightforward string search between two patterns

Parameters:

- text (str): string to be searched.
- left_pattern (str): left-most border pattern.
- right_pattern (str, optional): right-most border pattern. Defaults to None.

Returns:

- str: text between the two patterns, or all text after left_pattern if right_pattern is None.

Source code in src/metatag/utils.py, lines 323-343
def easy_pattern_matching(
    text: str, left_pattern: str, right_pattern: str = None
) -> str:
    """Just straightforward string searchs between two patterns

    Args:
        text (str): srring to be searched
        left_pattern (str): left most border pattern
        right_pattern (str, optional): right most border pattern. Defaults to None.

    Returns:
        str: _description_
    """
    idx1 = text.find(left_pattern)
    left_subtext = text[idx1 + len(left_pattern) :]
    if right_pattern is not None:
        idx2 = left_subtext.find(right_pattern)
        matched_text = left_subtext[:idx2]
    else:
        matched_text = left_subtext
    return matched_text
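
For example (the input string is illustrative):

from metatag.utils import easy_pattern_matching

easy_pattern_matching("tr|A0A2G2|A0A2G2_HUMAN", left_pattern="tr|", right_pattern="|")
# 'A0A2G2'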

init_logger(args)

Initialize logger object

Parameters:

- args (Union[CommandArgs, ArgumentParser]): arguments object.

Returns:

- logging.Logger: initialized logger object.

Source code in src/metatag/utils.py, lines 120-137
def init_logger(args: Union[CommandArgs, ArgumentParser]) -> logging.Logger:
    """Initialize logger object
    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object
    Returns:
        logging.Logger: initialized logger object
    """
    if args.logfile is None:
        args.logfile = Path(os.devnull)
    elif not Path(args.logfile.parent).exists():
        Path(args.logfile.parent).mkdir(parents=True)
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s: %(message)s",
        handlers=[ClosingFileHandler(args.logfile), logging.StreamHandler(sys.stdout)],
        level=logging.INFO,
    )
    logger = logging.getLogger(__name__)
    return logger

parallelize_over_input_files(callable, input_list, processes=None, **callable_kwargs)

Parallelize callable over a set of input objects using a pool of workers. Inputs in input list are passed to the first argument of the callable. Additional callable named arguments may be passed.

Parameters:

- callable (Callable): function to be called.
- input_list (list): list of input objects to callable.
- processes (int, optional): maximum number of processes. Defaults to None.
Source code in src/metatag/utils.py, lines 303-320
def parallelize_over_input_files(
    callable, input_list: list, processes: int = None, **callable_kwargs
) -> None:
    """Parallelize callable over a set of input objects using a pool
    of workers. Inputs in input list are passed to the first argument
    of the callable. Additional callable named arguments may be passed.

    Args:
        callable (Callable): function to be called.
        input_list (list): list of input objects to callable.
        processes (int, optional): maximum number of processes. Defaults to None.
    """
    if processes is None:
        processes = os.cpu_count() - 1
    p = Pool(processes=processes)
    p.map(partial(callable, **callable_kwargs), input_list)
    p.close()
    p.join()
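
A minimal sketch (process_fasta is a hypothetical worker; with multiprocessing it should be defined at module level so it can be pickled):

from pathlib import Path
from metatag.utils import parallelize_over_input_files

def process_fasta(fasta: Path, minimum_length: int = 50) -> None:
    ...  # e.g., filter out sequences shorter than minimum_length

parallelize_over_input_files(
    process_fasta,
    input_list=[Path("a.fasta"), Path("b.fasta")],
    processes=4,
    minimum_length=100,
)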

read_from_pickle_file(path_to_file='object.pkl')

Load python object from pickle file.

Parameters:

- path_to_file (Path, optional): path to pickle file. Defaults to "object.pkl".

Returns:

- object: loaded Python object.

Source code in src/metatag/utils.py, lines 262-274
def read_from_pickle_file(path_to_file: Path = "object.pkl"):
    """Load python object from pickle file.

    Args:
        path_to_file (Path, optional): path to pickle file. Defaults to "object.pkl".

    Returns:
        object: loaded Python object.
    """
    with open(path_to_file, "rb") as infile:
        python_object = pickle.load(infile)
    return python_object

save_to_pickle_file(python_object, path_to_file='object.pkl')

Save python object to pickle file

Parameters:

- python_object (object): object to be saved.
- path_to_file (Path, optional): path to output pickle file. Defaults to "object.pkl".
Source code in src/metatag/utils.py, lines 250-259
def save_to_pickle_file(python_object: object, path_to_file: Path = "object.pkl"):
    """Save python object to pickle file

    Args:
        python_object (object): object to be saved.
        path_to_file (Path, optional): path to output pickle file.
            Defaults to "object.pkl".
    """
    with open(path_to_file, "wb") as outfile:
        pickle.dump(python_object, outfile)

set_default_output_path(input_path, tag=None, extension=None, only_filename=False, only_basename=False, only_dirname=False)

Utility function to generate a default path to output file or directory based on an input file name and path.

Parameters:

- input_path (Path): path to input file.
- tag (str, optional): text tag to be added to file name. Defaults to None.
- extension (str, optional): replace the input file extension with this one. Defaults to None.
- only_filename (bool, optional): output only default filename. Defaults to False.
- only_basename (bool, optional): output only default basename (no extension). Defaults to False.
- only_dirname (bool, optional): output only path to default output directory. Defaults to False.

Returns:

- Path: a path or name to a default output file.

Source code in src/metatag/utils.py, lines 212-247
def set_default_output_path(
    input_path: Path,
    tag: str = None,
    extension: str = None,
    only_filename: bool = False,
    only_basename: bool = False,
    only_dirname: bool = False,
) -> Path:
    """Utility function to generate a default path to output file
    or directory based on an input file name and path.
    Args:
        input_path (Path): path to input file.
        tag (str, optional): text tag to be added to file name. Defaults to None.
        extension (str, optional): change input file extension with this one. Defaults to None.
        only_filename (bool, optional): output only default filename. Defaults to False.
        only_basename (bool, optional): output only default basename (no extension). Defaults to False.
        only_dirname (bool, optional): output only path to default output directory. Defaults to False.
    Returns:
        Path: a path or name to a default output file.
    """
    input_path = Path(input_path).resolve()
    dirname = input_path.parent
    fname, ext = input_path.stem, input_path.suffix
    if extension is None:
        extension = ext
    if tag is None:
        tag = ""
    default_file = f"{fname}{tag}{extension}"
    if only_basename:
        return Path(fname)
    if only_filename:
        return Path(default_file)
    if only_dirname:
        return dirname
    else:
        return (dirname / default_file).resolve()
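
For example (paths are illustrative):

from pathlib import Path
from metatag.utils import set_default_output_path

p = Path("/data/genes.fasta")
set_default_output_path(p, tag="_cleaned")      # /data/genes_cleaned.fasta
set_default_output_path(p, extension=".hmm")    # /data/genes.hmm
set_default_output_path(p, only_filename=True)  # genes.fasta
set_default_output_path(p, only_dirname=True)   # /data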

terminal_execute(command_str, suppress_shell_output=False, work_dir=None, return_output=False)

Execute given command in terminal through Python.

Parameters:

- command_str (str): terminal command to be executed.
- suppress_shell_output (bool, optional): suppress shell output. Defaults to False.
- work_dir (Path, optional): change working directory. Defaults to None.
- return_output (bool, optional): whether to return execution output. Defaults to False.

Returns:

- subprocess.CompletedProcess: subprocess output.

Source code in src/metatag/utils.py, lines 277-300
def terminal_execute(
    command_str: str,
    suppress_shell_output=False,
    work_dir: Path = None,
    return_output=False,
) -> subprocess.CompletedProcess:
    """Execute given command in terminal through Python.
    Args:
        command_str (str): terminal command to be executed.
        suppress_shell_output (bool, optional): suppress shell output. Defaults to False.
        work_dir (Path, optional): change working directory. Defaults to None.
        return_output (bool, optional): whether to return execution output. Defaults to False.
    Returns:
        subprocess.CompletedProcess: subprocess output.
    """
    if suppress_shell_output:
        command_str = f"{command_str} >/dev/null 2>&1"

    output = subprocess.run(
        command_str, shell=True, cwd=work_dir, capture_output=return_output, stdout=None
    )
    return output

Module wrappers

Simple CLI wrappers to several tools

get_percent_identity_from_msa(input_msa, output_file=None)

Run esl-alipid to compute pairwise percent identity (PI) from an MSA.

Parameters:

- input_msa (Path): path to input MSA file.
- output_file (Path, optional): path to output file. Defaults to None.
Source code in src/metatag/wrappers.py, lines 203-217
def get_percent_identity_from_msa(input_msa: Path, output_file: Path = None) -> None:
    """Run esl-alipid to compute pairwise PI from a MSA.

    Args:
        input_msa (Path): path to input MSA file
        output_file (Path, optional): path to output file.
            Defaults to None.
    """
    input_msa = Path(input_msa).resolve()
    if output_file is None:
        output_file = set_default_output_path(input_msa, tag="_PI", extension=".txt")
    else:
        output_file = Path(output_file).resolve()
    cmd_str = f"esl-alipid {input_msa} > {output_file}"
    terminal_execute(cmd_str, suppress_shell_output=False)

remove_auxiliary_output(output_prefix)

Removes iqtree auxiliary output files

Parameters:

- output_prefix (str): prefix of output files.
Source code in src/metatag/wrappers.py, lines 373-390
def remove_auxiliary_output(output_prefix: str):
    """Removes iqtree auxiliary output files

    Args:
        output_prefix (str): prefix of output files
    """
    exts_to_remove = [
        ".bionj",
        ".ckp.gz",
        ".iqtree",
        ".log",
        ".model.gz",
        ".splits.nex",
        ".mldist",
    ]
    files_to_remove = [Path(output_prefix + ext) for ext in exts_to_remove]
    for file_path in files_to_remove:
        file_path.unlink(missing_ok=True)

run_cdhit(input_fasta, output_fasta=None, additional_args=None)

Simple CLI wrapper to cd-hit to obtain representative sequences. CD-HIT may be used to remove duplicated sequences (keeping one representative) with parameters -c 1 -t 1.

Parameters:

- input_fasta (Path): path to input fasta file.
- output_fasta (Path, optional): path to output file. Defaults to None.
- additional_args (str, optional): additional arguments to cd-hit. Defaults to None.
Source code in src/metatag/wrappers.py, lines 220-241
def run_cdhit(
    input_fasta: Path, output_fasta: Path = None, additional_args: str = None
) -> None:
    """Simple CLI wrapper to cd-hit to obtain representative sequences
    CD-HIT may be used to remove duplicated sequences (keeps one representative)
    with parameters -c 1 -t 1.

    Args:
        input_fasta (Path): path to input fasta file
        output_fasta (Path, optional): path to output file. Defaults to None.
        additional_args (str, optional): additional arguments to cdhit.
            Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, "_cdhit")
    else:
        output_fasta = Path(output_fasta).resolve()
    if additional_args is None:
        additional_args = ""
    cmd_str = f"cd-hit -i {input_fasta} -o {output_fasta} {additional_args}"
    terminal_execute(cmd_str, suppress_shell_output=True)
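
A sketch of the duplicate-removal use case described above (file names are hypothetical):

from pathlib import Path
from metatag.wrappers import run_cdhit

run_cdhit(
    input_fasta=Path("references.fasta"),
    output_fasta=Path("references_nr.fasta"),
    additional_args="-c 1 -t 1",  # keep one representative per identical cluster
)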

run_epang(input_tree, input_aln_ref, input_aln_query, model=None, output_dir=None, processes=None, overwrite_previous_results=True, additional_args=None)

Simple CLI wrapper to EPA-ng (https://github.com/Pbdas/epa-ng). See epa-ng -h for additional parameters. The input tree must be in newick format and both alignments in fasta format; query sequences must be aligned to the reference MSA and have the same length as the reference alignment.

Parameters:

- input_tree (Path): path to tree file in newick format.
- input_aln_ref (Path): path to reference alignment in fasta format.
- input_aln_query (Path): path to query alignment in fasta format.
- model (str, optional): substitution model employed to infer tree. Defaults to None.
- output_dir (Path, optional): path to output directory. Defaults to None.
- processes (int, optional): maximum number of processes. Defaults to None.
- overwrite_previous_results (bool, optional): whether to overwrite result files of a previous run. Defaults to True.
- additional_args (str, optional): additional arguments to epa-ng. Defaults to None.
Source code in src/metatag/wrappers.py, lines 531-585
def run_epang(
    input_tree: Path,
    input_aln_ref: Path,
    input_aln_query: Path,
    model: str = None,
    output_dir: Path = None,
    processes: int = None,
    overwrite_previous_results: bool = True,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to EPA-ng
    See epa-ng -h for additional parameters
    input_tree: newick format
    input_aln: fasta format
    input_aln_query: fasta format (sequences must be alignned to reference
    msa fasta and have the same length as the reference msa alignment)
    epa-ng: https://github.com/Pbdas/epa-ng

    Args:
        input_tree (Path): path to tree file in newick format
        input_aln_ref (Path): path to reference alignment in fasta format
        input_aln_query (Path): path to query alignment in fasta format
        model (str, optional): substitution model employed to infer tree.
            Defaults to None.
        output_dir (Path, optional): path to output directory. Defaults to None.
        processes (int, optional): maximum number of processes. Defaults to None.
        overwrite_previous_results (bool, optional): whether to overwrite result
            files of a previous run. Defaults to True.
        additional_args (str, optional): additional arguments to epa-ng. Defaults to None.
    """
    input_tree = Path(input_tree).resolve()
    input_aln_ref = Path(input_aln_ref).resolve()
    input_aln_query = Path(input_aln_query).resolve()
    if output_dir is None:
        output_dir = input_tree.parent
    else:
        output_dir = Path(output_dir).resolve()
    if model is None:
        model = "GTR+G"
    if processes is None:
        processes = os.cpu_count() - 1
    if overwrite_previous_results:
        overwrite_str = "--redo"
    else:
        overwrite_str = ""
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""

    cmd_str = (
        f"epa-ng --ref-msa {input_aln_ref} --tree {input_tree} --query {input_aln_query} "
        f"--model {model} --threads {processes} --outdir {output_dir} {overwrite_str} {args_str}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)
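
A placement sketch (file names and the model string are hypothetical; the model is typically the one reported by the tree inference step):

from pathlib import Path
from metatag.wrappers import run_epang

run_epang(
    input_tree=Path("ref_tree.newick"),
    input_aln_ref=Path("ref_alignment.fasta"),
    input_aln_query=Path("query_alignment.fasta"),
    model="LG+I+G4",
    output_dir=Path("placements"),
)
# epa-ng writes its placements to epa_result.jplace in the output directory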

run_fasttree(input_algns, output_file=None, nucleotides=False, starting_tree=None, quiet=True, additional_args=None)

Simple CLI wrapper to fasttree. fasttree accepts multiple alignments in fasta or phylip formats. fasttree does not allow specifying a substitution model; the default substitution model for protein sequences is JTT.

Parameters:

- input_algns (Path): path to input fasta file.
- output_file (Path, optional): path to output file. Defaults to None.
- nucleotides (bool, optional): whether data is DNA. Defaults to False.
- starting_tree (Path, optional): path to starting tree to help inference. Defaults to None.
- quiet (bool, optional): suppress fasttree output. Defaults to True.
- additional_args (str, optional): additional arguments to fasttree. Defaults to None.
Source code in src/metatag/wrappers.py, lines 328-370
def run_fasttree(
    input_algns: Path,
    output_file: Path = None,
    nucleotides: bool = False,
    starting_tree: Path = None,
    quiet: bool = True,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to fasttree.
    fasttree accepts multiple alignments in fasta or phylip formats
    It seems that fasttree does not allow inputing subsitution model.
    Default substitution model for protein seqs is JTT

    Args:
        input_algns (Path): path to input fasta file
        output_file (Path, optional): path to output file. Defaults to None.
        nucleotides (bool, optional): whether data is DNA. Defaults to False.
        starting_tree (Path, optional): path to starting tree to help inference.
            Defaults to None.
        quiet (bool, optional): suppress fasttree output. Defaults to True.
        additional_args (str, optional): additional arguments to fasttree.
            Defaults to None.
    """
    input_algns = Path(input_algns).resolve()
    if output_file is None:
        output_file = set_default_output_path(
            input_algns, tag="_fasttree", extension=".newick"
        )
    else:
        output_file = Path(output_file).resolve()
    if nucleotides:
        nt_str = "-gtr -nt"
    else:
        nt_str = ""
    if starting_tree is not None:
        start_t_str = f"-intree {starting_tree}"
    else:
        start_t_str = ""
    if additional_args is None:
        additional_args = ""
    if quiet:
        additional_args += " -quiet"
    cmd_str = f"fasttree {additional_args} {nt_str} {input_algns} {start_t_str} > {output_file}"
    terminal_execute(cmd_str, suppress_shell_output=False)

run_gappa_assign(jplace, taxonomy_file, output_dir=None, output_prefix=None, only_best_hit=True, additional_args=None, delete_output_tree=True)

Use gappa examine assign to assign taxonomy to placed query sequences based on taxonomy assigned to tree reference sequences

The arguments --resolve-missing-paths and --root-outgroup can be added to find missing taxonomic info in labels.

Info: https://github.com/lczech/gappa/wiki/Subcommand:-assign

Parameters:

- jplace (Path): path to jplace file.
- taxonomy_file (Path): path to taxonomy file as required by gappa assign.
- output_dir (Path, optional): path to output directory. Defaults to None.
- output_prefix (str, optional): prefix to be added to output files. Defaults to None.
- only_best_hit (bool, optional): return only best hit (highest LWR). Defaults to True.
- additional_args (str, optional): additional arguments to gappa assign. Defaults to None.
- delete_output_tree (bool, optional): whether to delete gappa assign output tree. Defaults to True.
Source code in src/metatag/wrappers.py, lines 625-680
def run_gappa_assign(
    jplace: Path,
    taxonomy_file: Path,
    output_dir: Path = None,
    output_prefix: str = None,
    only_best_hit: bool = True,
    additional_args: str = None,
    delete_output_tree: bool = True,
) -> None:
    """Use gappa examine assign to assign taxonomy to placed query sequences
    based on taxonomy assigned to tree reference sequences

    The arguments --resolve-missing-paths and --root-outgroup can be
    added to find missing taxonomic info in labels.

    Info: https://github.com/lczech/gappa/wiki/Subcommand:-assign

    Args:
        jplace (Path): path to jplace file
        taxonomy_file (Path): path to taxonomy file as required by gappa assign
        output_dir (Path, optional): path to output directory. Defaults to None.
        output_prefix (str, optional): prefix to be added to output files.
            Defaults to None.
        only_best_hit (bool, optional): return only best hit (highest LWR).
            Defaults to True.
        additional_args (str, optional): additional arguments to gappa assign.
            Defaults to None.
        delete_output_tree (bool, optional): whether to delete gappa assign
            output tree. Defaults to True.
    """
    jplace = Path(jplace).resolve()
    taxonomy_file = Path(taxonomy_file).resolve()
    if output_dir is None:
        output_dir = set_default_output_path(jplace, only_dirname=True)
    else:
        output_dir = Path(output_dir).resolve()
    if output_prefix is None:
        output_prefix = set_default_output_path(jplace, only_filename=True)
    if only_best_hit:
        best_str = "--best-hit"
    else:
        best_str = ""
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""

    cmd_str = (
        f"gappa examine assign --jplace-path {jplace} --taxon-file {taxonomy_file} "
        f"--out-dir {output_dir} --file-prefix {output_prefix} --allow-file-overwriting "
        f"--per-query-results {best_str} {args_str}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)
    outtree = output_dir / f"{output_prefix}labelled_tree.newick"
    if delete_output_tree:
        outtree.unlink(missing_ok=True)

run_gappa_graft(input_jplace, output_dir=None, output_prefix=None, additional_args=None)

Run gappa examine graft to obtain tree with placements in newick format

Parameters:

- input_jplace (Path): path to jplace file.
- output_dir (Path, optional): path to output directory. Defaults to None.
- output_prefix (str, optional): prefix to be added to output files. Defaults to None.
- additional_args (str, optional): additional arguments to gappa. Defaults to None.
Source code in src/metatag/wrappers.py, lines 588-622
def run_gappa_graft(
    input_jplace: Path,
    output_dir: Path = None,
    output_prefix: str = None,
    additional_args: str = None,
) -> None:
    """Run gappa examine graft to obtain tree with placements in
    newick format

    Args:
        input_jplace (Path): path to jplace file
        output_dir (Path, optional): path to output directory. Defaults to None.
        output_prefix (str, optional): prefix to be added to output files.
            Defaults to None.
        additional_args (str, optional): additional arguments to gappa. Defaults to None.
    """
    input_jplace = Path(input_jplace).resolve()
    if output_dir is None:
        outdir_str = ""
    else:
        outdir_str = f"--out-dir {Path(output_dir)}"
    if output_prefix is None:
        output_prefix_str = ""
    else:
        output_prefix_str = f"--file-prefix {output_prefix}"
    if additional_args is None:
        args_str = ""
    else:
        args_str = additional_args

    cmd_str = (
        f"gappa examine graft --jplace-path {input_jplace} "
        f"{outdir_str} {output_prefix_str} {args_str}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)

run_hmmalign(input_hmm, input_aln, input_seqs, output_aln_seqs=None, additional_args=None)

Simple CLI wrapper to hmmalign. Aligns short-read query sequences to a reference MSA.

Parameters:

- input_hmm (Path): path to input hmm.
- input_aln (Path): path to input alignment file.
- input_seqs (Path): path to input sequences to be aligned.
- output_aln_seqs (Path, optional): path to output alignment. Defaults to None.
- additional_args (str, optional): additional arguments to hmmalign. Defaults to None.
Source code in src/metatag/wrappers.py, lines 166-200
def run_hmmalign(
    input_hmm: Path,
    input_aln: Path,
    input_seqs: Path,
    output_aln_seqs: Path = None,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to hmmalign
    Align short read query sequences to reference MSA

    Args:
        input_hmm (Path): path to input hmm
        input_aln (Path): path to input alignment file
        input_seqs (Path): path to input sequences to be aligned
        output_aln_seqs (Path, optional): path to output alignment.
            Defaults to None.
        additional_args (str, optional): additional arguments to hmmalign.
            Defaults to None.
    """
    input_hmm = Path(input_hmm).resolve()
    input_aln = Path(input_aln).resolve()
    input_seqs = Path(input_seqs).resolve()
    if output_aln_seqs is None:
        output_aln_seqs = set_default_output_path(input_seqs, "_hmm", extension=".aln")
    else:
        output_aln_seqs = Path(output_aln_seqs).resolve()
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = (
        f"hmmalign -o {output_aln_seqs} --mapali {input_aln} --trim "
        f"--informat fasta {args_str} {input_hmm} {input_seqs}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)

run_hmmbuild(input_aln, output_hmm=None, additional_args=None)

Simple CLI wrapper to hmmbuild (build HMM profile from MSA file). For additional arguments see hmmbuild -h.

Parameters:

- input_aln (Path): path to input alignment file.
- output_hmm (Path, optional): path to output hmm. Defaults to None.
- additional_args (str, optional): additional arguments to hmmbuild. Defaults to None.
Source code in src/metatag/wrappers.py, lines 141-163
def run_hmmbuild(
    input_aln: Path, output_hmm: Path = None, additional_args: str = None
) -> None:
    """Simple CLI wrapper to hmmbuild (build HMM profile from MSA file)
    additional args: see hmmbuild -h

    Args:
        input_aln (Path): path to input alignment file
        output_hmm (Path, optional): path to output hmm. Defaults to None.
        additional_args (str, optional): additional arguments to hmmbuild.
            Defaults to None.
    """
    input_aln = Path(input_aln).resolve()
    if output_hmm is None:
        output_hmm = set_default_output_path(input_aln, extension=".hmm")
    else:
        output_hmm = Path(output_hmm).resolve()
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = f"hmmbuild {args_str} {output_hmm} {input_aln}"
    terminal_execute(cmd_str, suppress_shell_output=True)
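
A sketch of the typical build-then-align sequence (file names are hypothetical):

from pathlib import Path
from metatag.wrappers import run_hmmbuild, run_hmmalign

run_hmmbuild(input_aln=Path("ref_alignment.fasta"), output_hmm=Path("ref.hmm"))
run_hmmalign(
    input_hmm=Path("ref.hmm"),
    input_aln=Path("ref_alignment.fasta"),
    input_seqs=Path("query_reads.fasta"),
    output_aln_seqs=Path("query_aligned.aln"),
)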

run_hmmsearch(hmm_model, input_fasta, output_file=None, method='hmmsearch', processes=None, additional_args=None)

Simple CLI wrapper to hmmsearch or hmmscan

Parameters:

- hmm_model (Path): path to hmm model.
- input_fasta (Path): path to input fasta file.
- output_file (Path, optional): path to output file. Defaults to None.
- method (str, optional): choose method: "hmmscan" or "hmmsearch". Defaults to "hmmsearch".
- processes (int, optional): maximum number of processes. Defaults to None.
- additional_args (str, optional): additional arguments to hmmsearch/scan. Defaults to None.
Source code in src/metatag/wrappers.py, lines 102-138
def run_hmmsearch(
    hmm_model: Path,
    input_fasta: Path,
    output_file: Path = None,
    method: str = "hmmsearch",
    processes: int = None,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to hmmsearch or hmmscan

    Args:
        hmm_model (Path): path to hmm model
        input_fasta (Path): path to input fasta file
        output_file (Path, optional): path to output file. Defaults to None.
        method (str, optional): choose method: "hmmscan" or "hmmsearch".
            Defaults to "hmmsearch".
        processes (int, optional): maximum number of processes. Defaults to None.
        additional_args (str, optional): additional arguments to hmmsearch/scan.
            Defaults to None.
    """
    hmm_model = Path(hmm_model).resolve()
    input_fasta = Path(input_fasta).resolve()
    if output_file is None:
        output_file = set_default_output_path(input_fasta, "_hmmer_hits", ".txt")
    else:
        output_file = Path(output_file).resolve()
    if processes is None:
        processes = os.cpu_count() - 1
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = (
        f"{method} --tblout {output_file} {args_str} --cpu {processes} "
        f"{hmm_model} {input_fasta}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)

run_iqtree(input_algns, output_dir=None, output_prefix=None, keep_recovery_files=False, nucleotides=False, processes=None, substitution_model='TEST', starting_tree=None, bootstrap_replicates=1000, max_bootstrap_iterations=1000, overwrite_previous_results=True, quiet=True, additional_args=None)

Simple CLI wrapper to iqtree. iqtree accepts multiple alignments in fasta or phylip formats.

additional_args: a string containing additional parameters and parameter values to be passed to iqtree.

Output: iqtree outputs several files.

Reducing computational time via model selection: http://www.iqtree.org/doc/Command-Reference

Parameters:

    input_algns (Path): path to input fasta file [required]
    output_dir (Path, optional): path to output directory. Defaults to None.
    output_prefix (str, optional): prefix to be added to output files. Defaults to None.
    keep_recovery_files (bool, optional): whether to keep recovery files. Defaults to False.
    nucleotides (bool, optional): whether sequence data are DNA. Defaults to False.
    processes (int, optional): maximum number of processes. Defaults to None.
    substitution_model (str, optional): substitution model employed to infer tree. Defaults to "TEST".
    starting_tree (Path, optional): path to tree file with starting tree to help inference. Defaults to None.
    bootstrap_replicates (int, optional): number of bootstrap replicates. Defaults to 1000.
    max_bootstrap_iterations (int, optional): maximum number of bootstrap iterations. Defaults to 1000.
    overwrite_previous_results (bool, optional): whether to overwrite results of a previous run. Defaults to True.
    quiet (bool, optional): whether to suppress iqtree's standard output (appends --quiet). Defaults to True.
    additional_args (str, optional): additional arguments to iqtree. Defaults to None.

Source code in src/metatag/wrappers.py (lines 393-480)

def run_iqtree(
    input_algns: Path,
    output_dir: Path = None,
    output_prefix: str = None,
    keep_recovery_files: bool = False,
    nucleotides: bool = False,
    processes: int = None,
    substitution_model: str = "TEST",
    starting_tree: Path = None,
    bootstrap_replicates: int = 1000,
    max_bootstrap_iterations: int = 1000,
    overwrite_previous_results: bool = True,
    quiet: bool = True,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to iqtree.
    iqtree accepts multiple alignments in fasta or phylip formats.

    additional_args: a string containing additional parameters and
                     parameter values to be passed to iqtree

    output: iqtree outputs several files

    Reducing computational time via model selection:
    http://www.iqtree.org/doc/Command-Reference

    Args:
        input_algns (Path): path to input fasta file
        output_dir (Path, optional): path to output directory. Defaults to None.
        output_prefix (str, optional): prefix to be added to output files.
            Defaults to None.
        keep_recovery_files (bool, optional): whether to keep recovery files.
            Defaults to False.
        nucleotides (bool, optional): whether sequence data are DNA.
            Defaults to False.
        processes (int, optional): maximum number of processes. Defaults to None.
        substitution_model (str, optional): substitution model employed to infer tree.
            Defaults to "TEST".
        starting_tree (Path, optional): path to tree file with starting tree to
            help inference. Defaults to None.
        bootstrap_replicates (int, optional): number of bootstrap replicates.
            Defaults to 1000.
        max_bootstrap_iterations (int, optional): maximum number of bootstrap iterations.
            Defaults to 1000.
        overwrite_previous_results (bool, optional): whether to overwrite results
            of a previous run. Defaults to True.
        quiet (bool, optional): whether to suppress iqtree's standard output.
            Defaults to True.
        additional_args (str, optional): additional arguments to iqtree. Defaults to None.
    """
    input_algns = Path(input_algns).resolve()
    if output_dir is None:
        output_dir = input_algns.parent
    else:
        output_dir = Path(output_dir).resolve()
    if output_prefix is None:
        input_file = input_algns.name
        output_prefix = (output_dir / input_file).as_posix()
        output_prefix_str = f"-pre {output_prefix}"
    else:
        output_prefix = (output_dir / output_prefix).as_posix()
        output_prefix_str = f"-pre {output_prefix}"
    if nucleotides:
        seq_type = "DNA"
    else:
        seq_type = "AA"
    if processes is None:
        processes = "AUTO"
    if starting_tree is not None:
        start_t_str = f"-t {starting_tree}"
    else:
        start_t_str = ""
    if overwrite_previous_results:
        overwrite_str = "-redo"
    else:
        overwrite_str = ""
    if additional_args is None:
        additional_args = ""
    if quiet:
        additional_args += " --quiet"

    cmd_str = (
        f"iqtree -s {input_algns} -st {seq_type} -nt {processes} "
        f"-m {substitution_model} -bb {bootstrap_replicates} -mset raxml "
        f"-nm {max_bootstrap_iterations} "
        f"{output_prefix_str} {start_t_str} {overwrite_str} {additional_args}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)
    if not keep_recovery_files:
        remove_auxiliary_output(output_prefix)
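
A minimal usage sketch (paths and the substitution model are hypothetical; leaving substitution_model as "TEST" would instead run iqtree's model selection, restricted to raxml-compatible models by the hard-coded -mset raxml):

from pathlib import Path

from metatag.wrappers import run_iqtree

# Infer a reference tree from a protein alignment with 1000 ultrafast
# bootstrap replicates; auxiliary files are removed afterwards because
# keep_recovery_files defaults to False.
run_iqtree(
    input_algns=Path("ref_alignment.fasta.aln"),
    output_dir=Path("tree_output"),
    output_prefix="ref_tree",
    substitution_model="LG+G4",
    bootstrap_replicates=1000,
)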

run_mafft(input_fasta, output_file=None, processes=-1, parallel=True, quiet=True, additional_args=None)

Simple CLI wrapper to mafft (MSA)

Manual: https://mafft.cbrc.jp/alignment/software/manual/manual.html

CLI examples:
    mafft --globalpair --thread n in > out
    mafft --localpair --thread n in > out
    mafft --large --globalpair --thread n in > out

Parameters:

    input_fasta (Path): path to input fasta file [required]
    output_file (Path, optional): path to output file. Defaults to None.
    processes (int, optional): maximum number of threads. Defaults to -1.
    parallel (bool, optional): whether to run mafft with multithreading (passes --thread). Defaults to True.
    quiet (bool, optional): whether to suppress mafft's progress output (appends --quiet). Defaults to True.
    additional_args (str, optional): additional arguments to mafft. Defaults to None.

Source code in src/metatag/wrappers.py (lines 244-285)

def run_mafft(
    input_fasta: Path,
    output_file: Path = None,
    processes: int = -1,
    parallel: bool = True,
    quiet: bool = True,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to mafft (MSA)

    Manual: https://mafft.cbrc.jp/alignment/software/manual/manual.html

    CLI examples:
    mafft --globalpair --thread n in > out
    mafft --localpair --thread n in > out
    mafft --large --globalpair --thread n in > out

    Args:
        input_fasta (Path): path to input fasta file
        output_file (Path, optional): path to output file. Defaults to None.
        processes (int, optional): maximum number of threads. Defaults to -1.
        parallel (bool, optional): whether to run mafft with multithreading
            (passes --thread). Defaults to True.
        quiet (bool, optional): whether to suppress mafft's progress output.
            Defaults to True.
        additional_args (str, optional): additional arguments to mafft.
            Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_file is None:
        output_file = set_default_output_path(
            input_fasta, extension=".fasta.aln", only_filename=True
        )
    else:
        output_file = Path(output_file).resolve()
    if parallel:
        thread_str = f"--thread {processes}"
    else:
        thread_str = ""
    if additional_args is None:
        additional_args = ""
    if quiet:
        additional_args += " --quiet"
    cmd_str = f"mafft {thread_str} {additional_args} {input_fasta} > {output_file}"
    terminal_execute(cmd_str, suppress_shell_output=False)
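
A minimal usage sketch (paths are hypothetical; "--globalpair" is one of the standard mafft accuracy options shown in the CLI examples above):

from pathlib import Path

from metatag.wrappers import run_mafft

# Align reference sequences with mafft on 4 threads; --quiet is appended
# automatically because quiet defaults to True.
run_mafft(
    input_fasta=Path("ref_sequences.faa"),
    output_file=Path("ref_alignment.fasta.aln"),
    processes=4,
    additional_args="--globalpair",
)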

run_muscle(input_fasta, output_file=None, maxiters=None, quiet=True, additional_args=None)

Simple CLI wrapper to muscle (MSA)

muscle: https://www.drive5.com/muscle/manual/output_formats.html

Output formats: phylip and fasta.aln

Parameters:

    input_fasta (Path): path to input fasta file [required]
    output_file (Path, optional): path to output file. Defaults to None.
    maxiters (int, optional): maximum number of iterations. Defaults to None (interpreted as 2).
    quiet (bool, optional): whether to suppress muscle's progress output (appends -quiet). Defaults to True.
    additional_args (str, optional): additional arguments to muscle. Defaults to None.

Source code in src/metatag/wrappers.py (lines 288-325)

def run_muscle(
    input_fasta: Path,
    output_file: Path = None,
    maxiters: int = None,
    quiet: bool = True,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to muscle (MSA)
    muscle: https://www.drive5.com/muscle/manual/output_formats.html

    output phylip and fasta.aln

    Args:
        input_fasta (Path): path to input fasta file
        output_file (Path, optional): path to output file. Defaults to None.
        maxiters (int, optional): maximum number of iterations.
            Defaults to None (interpreted as 2).
        quiet (bool, optional): whether to suppress muscle's progress output.
            Defaults to True.
        additional_args (str, optional): additional arguments to muscle. Defaults to None.
    """
    input_fasta = Path(input_fasta).resolve()
    if output_file is None:
        output_file = set_default_output_path(
            input_fasta, extension=".fasta.aln", only_filename=True
        )
    else:
        output_file = Path(output_file).resolve()
    if maxiters is None:
        maxiters = 2
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    if quiet:
        args_str += " -quiet"
    cmd_str = (
        f"muscle -in {input_fasta} -out {output_file} "
        f"-maxiters {maxiters} {args_str}"
    )
    terminal_execute(cmd_str, suppress_shell_output=False)
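
A minimal usage sketch (paths are hypothetical):

from pathlib import Path

from metatag.wrappers import run_muscle

# Align reference sequences with muscle; maxiters=2 makes explicit the value
# the wrapper falls back to when maxiters is None.
run_muscle(
    input_fasta=Path("ref_sequences.faa"),
    output_file=Path("ref_alignment.fasta.aln"),
    maxiters=2,
)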

run_papara(tree_nwk, msa_phy, query_fasta, output_aln=None, additional_args=None)

Simple CLI wrapper to Papara. Output alignment in fasta format.

Run Papara to align query sequences to a reference MSA and tree (required for EPA-ng). Alignment could be done with hmmalign or muscle as well, but these tools don't consider the tree during alignment (would this be a justified improvement over hmmalign?).

There seems to be a problem with enabling multithreading in papara when run as a static executable. It looks like multithreading has to be enabled during compilation (but compilation is currently not working): https://stackoverflow.com/questions/19618926/thread-doesnt-work-with-an-error-enable-multithreading-to-use-stdthread-ope

Parameters:

    tree_nwk (Path): path to tree file in newick format [required]
    msa_phy (Path): path to reference alignment in phylip format [required]
    query_fasta (Path): path to query fasta file [required]
    output_aln (Path, optional): path to output alignment. Defaults to None.
    additional_args (str, optional): additional arguments to papara. Defaults to None.

Source code in src/metatag/wrappers.py (lines 483-528)

def run_papara(
    tree_nwk: Path,
    msa_phy: Path,
    query_fasta: Path,
    output_aln: Path = None,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to Papara. Output alignment in fasta format

    Run Papara to do query alignment to reference MSA and tree (required for EPA-ng)
    Alignment could be done with hmmalign or muscle as well, but these tools don't
    consider the tree during alignment (would this be a justified improvement over hmmalign?)

    There seems to be a problem with enabling multithreading in papara when run as a static
    executable. It looks like it has to be enabled during compilation (but compilation currently not working):
    https://stackoverflow.com/questions/19618926/thread-doesnt-work-with-an-error-enable-multithreading-to-use-stdthread-ope

    Args:
        tree_nwk (Path): path to tree file in newick format
        msa_phy (Path): path to reference alignment in phylip format
        query_fasta (Path): path to query fasta file
        output_aln (Path, optional): path to output alignment. Defaults to None.
        additional_args (str, optional): additional arguments to papara. Defaults to None.
    """
    tree_nwk = Path(tree_nwk).resolve()
    msa_phy = Path(msa_phy).resolve()
    query_fasta = Path(query_fasta).resolve()

    if output_aln is None:
        output_aln = set_default_output_path(
            query_fasta, tag="_ref_aln", extension=".phylip"
        )
    else:
        output_aln = Path(output_aln).resolve()
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = (
        f"{papara_exec} -t {tree_nwk} "
        f"-s {msa_phy} -q {query_fasta} -n phylip "
        f"-r {args_str} -a"
    )
    with tempfile.TemporaryDirectory() as tempdir:
        terminal_execute(cmd_str, suppress_shell_output=True, work_dir=tempdir)
        shutil.move(os.path.join(tempdir, "papara_alignment.phylip"), output_aln)
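
A minimal usage sketch (paths are hypothetical; note that papara expects the reference alignment in phylip format and the reference tree in newick format):

from pathlib import Path

from metatag.wrappers import run_papara

# Align query sequences against the reference alignment and tree with papara,
# as required before placing queries with EPA-ng.
run_papara(
    tree_nwk=Path("ref_tree.nwk"),
    msa_phy=Path("ref_alignment.phylip"),
    query_fasta=Path("query_sequences.faa"),
    output_aln=Path("queries_ref_aln.phylip"),
)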

run_prodigal(input_file, output_prefix=None, output_dir=None, metagenome=False, additional_args=None)

Simple CLI wrapper to prodigal

Parameters:

    input_file (Path): path to input file [required]
    output_prefix (str, optional): prefix to be prepended to output files. Defaults to None.
    output_dir (Path, optional): path to output directory. Defaults to None.
    metagenome (bool, optional): whether original sequences are metagenomic. Defaults to False.
    additional_args (str, optional): additional arguments to prodigal. Defaults to None.

Source code in src/metatag/wrappers.py (lines 59-99)

def run_prodigal(
    input_file: Path,
    output_prefix: str = None,
    output_dir: Path = None,
    metagenome: bool = False,
    additional_args: str = None,
):
    """Simple CLI wrapper to prodigal

    Args:
        input_file (Path): path to input file
        output_prefix (str, optional): prefix to be prepended to output files.
            Defaults to None.
        output_dir (Path, optional): path to output directory. Defaults to None.
        metagenome (bool, optional): whether original sequences are metagenomic.
            Defaults to False.
        additional_args (str, optional): additional arguments to prodigal.
            Defaults to None.
    """
    input_file = Path(input_file).resolve()
    if output_dir is None:
        output_dir = set_default_output_path(input_file, only_dirname=True)
    else:
        output_dir = Path(output_dir).resolve()
    if output_prefix is None:
        output_prefix = set_default_output_path(input_file, only_filename=True)
    if metagenome:
        procedure = "meta"
    else:
        procedure = "single"
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    output_gbk = output_dir / f"{output_prefix}.gbk"
    output_fasta = output_dir / f"{output_prefix}.faa"
    cmd_str = (
        f"prodigal -i {input_file} -o {output_gbk} -p {procedure} "
        f"-a {output_fasta} -q {args_str}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)
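
A minimal usage sketch (paths are hypothetical):

from pathlib import Path

from metatag.wrappers import run_prodigal

# Predict genes on metagenomic contigs; metagenome=True selects prodigal's
# "meta" procedure, writing both a GenBank file (assembly.gbk) and a protein
# fasta file (assembly.faa) to the output directory.
run_prodigal(
    input_file=Path("assembly_contigs.fasta"),
    output_prefix="assembly",
    output_dir=Path("prodigal_output"),
    metagenome=True,
)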

run_seqkit_nodup(input_fasta, output_fasta=None, export_duplicates=False, duplicates_file=None)

Simple CLI wrapper to seqkit rmdup

Parameters:

    input_fasta (Path): path to input fasta file [required]
    output_fasta (Path, optional): path to output file. Defaults to None.
    export_duplicates (bool, optional): whether to export duplicated sequences. Defaults to False.
    duplicates_file (Path, optional): path to output file containing duplicates. Defaults to None.

Source code in src/metatag/wrappers.py (lines 25-56)

def run_seqkit_nodup(
    input_fasta: Path,
    output_fasta: Path = None,
    export_duplicates: bool = False,
    duplicates_file: Path = None,
):
    """Simple CLI wrapper to seqkit rmdup

    Args:
        input_fasta (Path): path to input fasta file
        output_fasta (Path, optional): path to output file. Defaults to None.
        export_duplicates (bool, optional): whether to export duplicated sequences.
            Defaults to False.
        duplicates_file (Path, optional): path to output file containing duplicates.
            Defaults to None.
    """
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, tag="_no_duplicates")
    else:
        output_fasta = Path(output_fasta).resolve()
    if export_duplicates:
        if duplicates_file is None:
            dup_file = set_default_output_path(
                input_fasta, tag="_duplicates", extension=".txt"
            )
        else:
            dup_file = duplicates_file
        dup_str = f"-D {dup_file}"
    else:
        dup_str = ""
    cmd_str = f"seqkit rmdup {input_fasta} --quiet -s {dup_str} -o {output_fasta}"
    terminal_execute(cmd_str, suppress_shell_output=True)
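
A minimal usage sketch (paths are hypothetical):

from pathlib import Path

from metatag.wrappers import run_seqkit_nodup

# Remove duplicated sequences (compared by sequence, -s) from a query fasta
# file and export the list of duplicates to a separate text file (-D).
run_seqkit_nodup(
    input_fasta=Path("query_sequences.faa"),
    output_fasta=Path("queries_no_duplicates.faa"),
    export_duplicates=True,
    duplicates_file=Path("duplicate_list.txt"),
)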