Module api

Classes to facilitate usage within Python scripts / Notebooks

Build

Bases: Command

Build command object.

Source code in src/pynteny/api.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class Build(Command):
    """Build command object."""

    def __init__(
        self,
        data: Path,
        prepend: bool = False,
        outfile: Path = None,
        logfile: Path = None,
        processes: int = None,
        tempdir: Path = None,
    ):
        """Translate nucleotide assembly file and assign contig and gene location
           info to each identified ORF (using prodigal).

        Args:
            data (Path): _description_
            outfile (Path, optional): path to file containing built database. Defaults to None.
            logfile (Path, optional): path to logfile. Defaults to None.
            processes (int, optional): maximum number of processes. Defaults to all minus one.
            tempdir (Path, optional): path to directory to contain temporary files. Defaults to None.
        """

        self._args = CommandArgs(
            data=Path(data),
            prepend=prepend,
            outfile=Path(outfile) if outfile is not None else outfile,
            logfile=Path(logfile) if logfile is not None else logfile,
            processes=processes,
            tempdir=Path(tempdir) if tempdir is not None else tempdir,
        )

    def run(self) -> None:
        """Run pynteny search"""
        return build_database(self._args)

__init__(data, prepend=False, outfile=None, logfile=None, processes=None, tempdir=None)

Translate nucleotide assembly file and assign contig and gene location info to each identified ORF (using prodigal).

Parameters:
  • data (Path) –

    description

  • outfile (Path, default: None ) –

    path to file containing built database. Defaults to None.

  • logfile (Path, default: None ) –

    path to logfile. Defaults to None.

  • processes (int, default: None ) –

    maximum number of processes. Defaults to all minus one.

  • tempdir (Path, default: None ) –

    path to directory to contain temporary files. Defaults to None.

Source code in src/pynteny/api.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def __init__(
    self,
    data: Path,
    prepend: bool = False,
    outfile: Path = None,
    logfile: Path = None,
    processes: int = None,
    tempdir: Path = None,
):
    """Translate nucleotide assembly file and assign contig and gene location
       info to each identified ORF (using prodigal).

    Args:
        data (Path): _description_
        outfile (Path, optional): path to file containing built database. Defaults to None.
        logfile (Path, optional): path to logfile. Defaults to None.
        processes (int, optional): maximum number of processes. Defaults to all minus one.
        tempdir (Path, optional): path to directory to contain temporary files. Defaults to None.
    """

    self._args = CommandArgs(
        data=Path(data),
        prepend=prepend,
        outfile=Path(outfile) if outfile is not None else outfile,
        logfile=Path(logfile) if logfile is not None else logfile,
        processes=processes,
        tempdir=Path(tempdir) if tempdir is not None else tempdir,
    )

run()

Run pynteny search

Source code in src/pynteny/api.py
207
208
209
def run(self) -> None:
    """Run pynteny search"""
    return build_database(self._args)

Command

Parent class for Pynteny command

Source code in src/pynteny/api.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Command:
    """
    Parent class for Pynteny command

    Args:
        CommandArgs
    """

    def _repr_html_(self):
        """Executed by Jupyter to print Author and version in html"""
        return """
        <table>
            <tr>
                <td><strong>Pynteny version</strong></td><td>{version}</td>
            </tr><tr>
                <td><strong>Author</strong></td><td>{author}</td>
            </tr>
        </table>
        """.format(
            version=__version__, author=__author__
        )

    def update(self, argname: str, value: str) -> None:
        """Update argument value in pynteny command.

        Args:
            argname (str): argument name to be updated.
            value (str): new argument value.
        """
        setattr(self._args, argname, value)

    @staticmethod
    def cite():
        """Display Pynteny citation"""
        args = CommandArgs(version=__version__, author=__author__)
        citation = get_citation(args, silent=True)
        print(citation)

cite() staticmethod

Display Pynteny citation

Source code in src/pynteny/api.py
57
58
59
60
61
62
@staticmethod
def cite():
    """Display Pynteny citation"""
    args = CommandArgs(version=__version__, author=__author__)
    citation = get_citation(args, silent=True)
    print(citation)

update(argname, value)

Update argument value in pynteny command.

Parameters:
  • argname (str) –

    argument name to be updated.

  • value (str) –

    new argument value.

Source code in src/pynteny/api.py
48
49
50
51
52
53
54
55
def update(self, argname: str, value: str) -> None:
    """Update argument value in pynteny command.

    Args:
        argname (str): argument name to be updated.
        value (str): new argument value.
    """
    setattr(self._args, argname, value)

Download

Bases: Command

Download HMM database from NCBI.

Source code in src/pynteny/api.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class Download(Command):
    """Download HMM database from NCBI."""

    def __init__(
        self,
        outdir: Path,
        logfile: Path = None,
        force: bool = False,
        unpack: bool = False,
    ):
        """Download HMM database from NCBI.

        Args:
            outdir (Path): path to ouput directory in which to store downloaded HMMs.
            logfile (Path, optional): path to log file. Defaults to None.
            force (bool, optional): force-download database again if already downloaded.
            unpack (bool, optional): whether to unpack downloaded file. If False, then PGAP's database.
                will be unpacked in each Pynteny session. Defaults to False.
        """

        self._args = CommandArgs(
            outdir=Path(outdir),
            logfile=Path(logfile) if logfile is not None else logfile,
            force=force,
            unpack=unpack,
        )

    def run(self) -> None:
        """Run pynteny download"""
        return download_hmms(self._args)

__init__(outdir, logfile=None, force=False, unpack=False)

Download HMM database from NCBI.

Parameters:
  • outdir (Path) –

    path to ouput directory in which to store downloaded HMMs.

  • logfile (Path, default: None ) –

    path to log file. Defaults to None.

  • force (bool, default: False ) –

    force-download database again if already downloaded.

  • unpack (bool, default: False ) –

    whether to unpack downloaded file. If False, then PGAP's database. will be unpacked in each Pynteny session. Defaults to False.

Source code in src/pynteny/api.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def __init__(
    self,
    outdir: Path,
    logfile: Path = None,
    force: bool = False,
    unpack: bool = False,
):
    """Download HMM database from NCBI.

    Args:
        outdir (Path): path to ouput directory in which to store downloaded HMMs.
        logfile (Path, optional): path to log file. Defaults to None.
        force (bool, optional): force-download database again if already downloaded.
        unpack (bool, optional): whether to unpack downloaded file. If False, then PGAP's database.
            will be unpacked in each Pynteny session. Defaults to False.
    """

    self._args = CommandArgs(
        outdir=Path(outdir),
        logfile=Path(logfile) if logfile is not None else logfile,
        force=force,
        unpack=unpack,
    )

run()

Run pynteny download

Source code in src/pynteny/api.py
239
240
241
def run(self) -> None:
    """Run pynteny download"""
    return download_hmms(self._args)

Search

Bases: Command

Search command object.

Source code in src/pynteny/api.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class Search(Command):
    """Search command object."""

    def __init__(
        self,
        data: Path,
        synteny_struc: str,
        gene_ids: bool = False,
        unordered: bool = False,
        best_hmm_wins: bool = False,
        reuse: bool = False,
        hmm_dir: Path = None,
        hmm_meta: Path = None,
        outdir: Path = None,
        prefix: str = "",
        hmmsearch_args: str = None,
        logfile: Path = None,
        processes: int = None,
    ):
        """Query sequence database for HMM hits arranged in provided synteny structure.

        Args:
            data (Path): path to input labelled database.
            synteny_struc (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specificed by using '>' for sense and '<' for antisense.
            gene_ids (bool, optional): whether gene symbols are used in synteny
                string instead of HMM names. Defaults to False.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to False.
            best_hmm_wins (bool, optional): if True, when the same peptide is hit
                by more than one HMM (paralog cross-hits), keep only the
                highest-scoring HMM for that peptide. Defaults to False.
            reuse (bool, optional): if True then HMMER3 won't be run again for HMMs already
                searched in the same output directory. Defaults to False.
            hmm_dir (Path, optional): path to directory containing input HMM files.
                Defaults to None, in which case the PGAP database is downloaded if not
                already so.
            hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
            outdir (Path, optional): path to output directory. Defaults to None.
            prefix (str, optional): prefix of output file names. Defaults to "".
            hmmsearch_args (str, optional): additional arguments to hmmsearch or hmmscan. Each
                element in the list is a string with additional arguments for each
                input hmm (arranged in the same order), an element can also take a
                value of None to avoid passing additional arguments for a specific
                input hmm. A single string may also be passed, in which case the
                same additional argument is passed to hmmsearch for all input hmms.
                Defaults to None. Defaults to None.
            logfile (Path, optional): path to log file. Defaults to None.
            processes (int, optional): maximum number of threads to be employed.
                Defaults to all minus one.
        """

        self._args = CommandArgs(
            data=Path(data),
            synteny_struc=synteny_struc,
            hmm_dir=Path(hmm_dir) if hmm_dir is not None else hmm_dir,
            hmm_meta=Path(hmm_meta) if hmm_meta is not None else hmm_meta,
            outdir=Path(outdir) if outdir is not None else outdir,
            prefix=prefix,
            hmmsearch_args=hmmsearch_args,
            gene_ids=gene_ids,
            logfile=Path(logfile) if logfile is not None else logfile,
            processes=processes,
            unordered=unordered,
            best_hmm_wins=best_hmm_wins,
            reuse=reuse,
        )

    def parse_genes(self, synteny_struc: str) -> str:
        """Parse gene IDs in synteny structure and find corresponding HMM names
        in provided HMM database.

        Args:
            synteny_struc (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specificed by using '>' for sense and '<' for antisense.

        Returns:
            str: parsed synteny structure in which gene symbols are replaced by
                 corresponding HMM names.
        """
        args = CommandArgs(
            synteny_struc=synteny_struc,
            hmm_meta=self._args.hmm_meta,
            logfile=self._args.logfile,
        )
        return parse_gene_ids(args)

    def run(self) -> SyntenyHits:
        """Run pynteny search"""
        return synteny_search(self._args)

__init__(data, synteny_struc, gene_ids=False, unordered=False, best_hmm_wins=False, reuse=False, hmm_dir=None, hmm_meta=None, outdir=None, prefix='', hmmsearch_args=None, logfile=None, processes=None)

Query sequence database for HMM hits arranged in provided synteny structure.

Parameters:
  • data (Path) –

    path to input labelled database.

  • synteny_struc (str) –

    a str describing the desired synteny structure, structured as follows:

    '>hmm_a N_ab hmm_b bc <hmm_c'

    where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specificed by using '>' for sense and '<' for antisense.

  • gene_ids (bool, default: False ) –

    whether gene symbols are used in synteny string instead of HMM names. Defaults to False.

  • unordered (bool, default: False ) –

    whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order If ordered, the filters would filter collinear rather than syntenic structures. Defaults to False.

  • best_hmm_wins (bool, default: False ) –

    if True, when the same peptide is hit by more than one HMM (paralog cross-hits), keep only the highest-scoring HMM for that peptide. Defaults to False.

  • reuse (bool, default: False ) –

    if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to False.

  • hmm_dir (Path, default: None ) –

    path to directory containing input HMM files. Defaults to None, in which case the PGAP database is downloaded if not already so.

  • hmm_meta (Path, default: None ) –

    path to PGAP's metadata file. Defaults to None.

  • outdir (Path, default: None ) –

    path to output directory. Defaults to None.

  • prefix (str, default: '' ) –

    prefix of output file names. Defaults to "".

  • hmmsearch_args (str, default: None ) –

    additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None. Defaults to None.

  • logfile (Path, default: None ) –

    path to log file. Defaults to None.

  • processes (int, default: None ) –

    maximum number of threads to be employed. Defaults to all minus one.

Source code in src/pynteny/api.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def __init__(
    self,
    data: Path,
    synteny_struc: str,
    gene_ids: bool = False,
    unordered: bool = False,
    best_hmm_wins: bool = False,
    reuse: bool = False,
    hmm_dir: Path = None,
    hmm_meta: Path = None,
    outdir: Path = None,
    prefix: str = "",
    hmmsearch_args: str = None,
    logfile: Path = None,
    processes: int = None,
):
    """Query sequence database for HMM hits arranged in provided synteny structure.

    Args:
        data (Path): path to input labelled database.
        synteny_struc (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.
        gene_ids (bool, optional): whether gene symbols are used in synteny
            string instead of HMM names. Defaults to False.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to False.
        best_hmm_wins (bool, optional): if True, when the same peptide is hit
            by more than one HMM (paralog cross-hits), keep only the
            highest-scoring HMM for that peptide. Defaults to False.
        reuse (bool, optional): if True then HMMER3 won't be run again for HMMs already
            searched in the same output directory. Defaults to False.
        hmm_dir (Path, optional): path to directory containing input HMM files.
            Defaults to None, in which case the PGAP database is downloaded if not
            already so.
        hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
        outdir (Path, optional): path to output directory. Defaults to None.
        prefix (str, optional): prefix of output file names. Defaults to "".
        hmmsearch_args (str, optional): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms.
            Defaults to None. Defaults to None.
        logfile (Path, optional): path to log file. Defaults to None.
        processes (int, optional): maximum number of threads to be employed.
            Defaults to all minus one.
    """

    self._args = CommandArgs(
        data=Path(data),
        synteny_struc=synteny_struc,
        hmm_dir=Path(hmm_dir) if hmm_dir is not None else hmm_dir,
        hmm_meta=Path(hmm_meta) if hmm_meta is not None else hmm_meta,
        outdir=Path(outdir) if outdir is not None else outdir,
        prefix=prefix,
        hmmsearch_args=hmmsearch_args,
        gene_ids=gene_ids,
        logfile=Path(logfile) if logfile is not None else logfile,
        processes=processes,
        unordered=unordered,
        best_hmm_wins=best_hmm_wins,
        reuse=reuse,
    )

parse_genes(synteny_struc)

Parse gene IDs in synteny structure and find corresponding HMM names in provided HMM database.

Parameters:
  • synteny_struc (str) –

    a str describing the desired synteny structure, structured as follows:

    '>hmm_a N_ab hmm_b bc <hmm_c'

    where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specificed by using '>' for sense and '<' for antisense.

Returns:
  • str( str ) –

    parsed synteny structure in which gene symbols are replaced by corresponding HMM names.

Source code in src/pynteny/api.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def parse_genes(self, synteny_struc: str) -> str:
    """Parse gene IDs in synteny structure and find corresponding HMM names
    in provided HMM database.

    Args:
        synteny_struc (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.

    Returns:
        str: parsed synteny structure in which gene symbols are replaced by
             corresponding HMM names.
    """
    args = CommandArgs(
        synteny_struc=synteny_struc,
        hmm_meta=self._args.hmm_meta,
        logfile=self._args.logfile,
    )
    return parse_gene_ids(args)

run()

Run pynteny search

Source code in src/pynteny/api.py
170
171
172
def run(self) -> SyntenyHits:
    """Run pynteny search"""
    return synteny_search(self._args)

Module filter

Tools to filter HMM hits by synteny structure

SyntenyHMMfilter

Tools to search for synteny structures among sets of hmm models

Source code in src/pynteny/filter.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
class SyntenyHMMfilter:
    """Tools to search for synteny structures among sets of hmm models"""

    def __init__(
        self,
        hmm_hits: dict,
        synteny_structure: str,
        unordered: bool = True,
        best_hmm_wins: bool = False,
    ) -> None:
        """Search for contigs that satisfy the given gene synteny structure.

        Args:
            hmm_hits (dict): a dict of pandas DataFrames, as output by
                parseHMMsearchOutput with keys corresponding to hmm names
            synteny_structure (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specificed by using '>' for sense and '<' for antisense.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order. If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to True.
            best_hmm_wins (bool, optional): if True, when the same peptide is hit
                by more than one HMM (e.g. paralogous models that cross-hit),
                keep only the highest-scoring HMM for that peptide. This prevents
                duplicate rows from breaking the rolling-window synteny matcher.
                Defaults to False.
        """
        self._unordered = unordered
        self._best_hmm_wins = best_hmm_wins
        self._hmm_hits = hmm_hits
        self._hmms = list(hmm_hits.keys())
        self._synteny_structure = synteny_structure
        self._contains_hmm_groups = syntenyparser.contains_HMM_groups(
            self._synteny_structure
        )
        self._parsed_structure = syntenyparser.parse_synteny_structure(
            self._synteny_structure
        )
        if self._unordered:
            self._parsed_structure["strands"] = [
                "" for _ in self._parsed_structure["strands"]
            ]
            logger.warning(
                "Disregarding strand location as unordered structured selected"
            )
        self._hmm_order_dict = dict(
            zip(
                self._parsed_structure["hmm_groups"],
                range(len(self._parsed_structure["hmm_groups"])),
            )
        )
        self._n_hmm_groups = len(self._parsed_structure["hmm_groups"])
        self._n_hmms = len(self._hmm_order_dict)
        self._unordered = unordered

    def _in_hmm_group(self, query_hmm_group: str, hmm_group: str) -> bool:
        """check if hmm name patters in query hmm group match hmm group"""
        query_split = query_hmm_group.split("|")
        return all([hmm_pattern in hmm_group for hmm_pattern in query_split])

    def _assign_code_to_HMM(self, hmm_group_name: str) -> int:
        hmm_group_name = str(hmm_group_name)
        code = [
            code
            for hmm_group, code in self._hmm_order_dict.items()
            if self._in_hmm_group(hmm_group_name, hmm_group)
            # if set(hmm_group_name.split("|")).issubset(set(hmm_group.split("|")))
        ]
        if len(code) > 1:
            logger.error(
                f"HMM: {hmm_group_name} found in more than one hmm group in synteny structure"
            )
            sys.exit(1)
        if not code:
            logger.error(f"No code found for HMM: {hmm_group_name}")
            sys.exit(1)
        return code[0]

    def _add_meta_codes_to_HMM_hits(self, all_hit_labels: pd.Dataframe) -> pd.Dataframe:
        """Add numeric codes for each hmm and strand, compute distance between genes"""
        all_hit_labels["gene_pos_diff"] = all_hit_labels.gene_pos.diff()
        all_hit_labels.loc[0, "gene_pos_diff"] = (
            1  # required for rolling (skips first nan)
        )
        all_hit_labels["hmm_code"] = all_hit_labels.hmm.apply(self._assign_code_to_HMM)
        all_hit_labels["strand"] = all_hit_labels.strand.apply(
            lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0)
        )
        return all_hit_labels

    def _merge_hits_by_HMM_group(self, hits: pd.DataFrame):
        """Merge hit sequences within an HMM group representing
        the same gene symbol. Drop duplicated hits within
        each HMM group.
        """
        groups = hits.groupby(["full_label"]).groups
        for group_idxs in groups.values():
            hmm_names = set(hits.loc[group_idxs, "hmm"].values)
            candidate_hmm_group = [
                hmm_group_str
                for hmm_group_str in self._parsed_structure["hmm_groups"]
                if hmm_names.issubset(set(hmm_group_str.split("|")))
            ]
            if candidate_hmm_group:
                hmm_group = "|".join(hmm_names)
            else:
                hmm_group = "discard"
            hits.loc[group_idxs, "hmm"] = hmm_group
        hits = hits[hits.hmm != "discard"].drop_duplicates()
        hits.hmm = hits.hmm.apply(lambda x: str(x))
        return hits

    def get_all_HMM_hits(self) -> pd.DataFrame:
        """Group and preprocess all hit labels into a single dataframe.

        Returns:
            pd.DataFrame: HMMER3 hit labels matching provided HMMs.
        """
        hit_labels = {}
        for hmm, hits in self._hmm_hits.items():
            labels = hits.id.values.tolist()
            if not labels:
                logger.error(f"No records found in database matching HMM: {hmm}")
                sys.exit(1)
            parsed = labelparser.parse_from_list(labels)
            parsed["hmm"] = hmm
            # Carry the HMMER bitscore along (only when needed) so paralog
            # cross-hits on the same peptide can be disambiguated by score.
            # It is dropped again right after deduplication so the default code
            # path keeps exactly the same columns as before.
            if self._best_hmm_wins and "bitscore" in hits.columns:
                parsed["bitscore"] = hits.bitscore.values
            hit_labels[hmm] = parsed
        # Create single dataframe with new column corresponding to HMM and all hits
        # Remove contigs with less hits than the number of hmms in synteny structure (drop if enabling partial searching)
        all_hit_labels = (
            pd.concat(hit_labels.values())
            .groupby("contig")
            .filter(lambda x: len(x) >= self._n_hmm_groups)
            .sort_values(["contig", "gene_pos"], ascending=True)
        )
        all_hit_labels = all_hit_labels.reset_index(drop=True)
        if self._best_hmm_wins:
            # Keep only the highest-scoring HMM per peptide. Without this,
            # paralogous HMMs that cross-hit the same peptide create duplicate
            # rows that interleave with true syntenic neighbours and break the
            # rolling-window matcher (silent false negatives).
            if "bitscore" not in all_hit_labels.columns:
                all_hit_labels["bitscore"] = float("nan")
            all_hit_labels = (
                all_hit_labels.sort_values("bitscore", ascending=False)
                .drop_duplicates(subset=["full_label"], keep="first")
                .sort_values(["contig", "gene_pos"], ascending=True)
                .reset_index(drop=True)
                .drop(columns="bitscore")
            )
        if self._contains_hmm_groups:
            all_hit_labels = self._merge_hits_by_HMM_group(all_hit_labels)
        all_hit_labels = all_hit_labels.reset_index(drop=True)
        return self._add_meta_codes_to_HMM_hits(all_hit_labels)

    def filter_hits_by_synteny_structure(self) -> dict:
        """Search for contigs that satisfy the given gene synteny structure.

        Returns:
            dict: HMMER3 hits separated by contig.
        """
        all_matched_hits = {}
        filters = SyntenyPatternFilters(
            self._synteny_structure, unordered=self._unordered
        )
        all_hit_labels = self.get_all_HMM_hits()
        if all_hit_labels.full_label.duplicated().any():
            n_dup = int(all_hit_labels.full_label.duplicated().sum())
            dup_mask = all_hit_labels.full_label.duplicated(keep=False)
            affected_combinations = (
                all_hit_labels[dup_mask]
                .groupby("full_label")
                .hmm.apply(lambda s: tuple(sorted(set(s))))
                .value_counts()
                .head(5)
                .to_dict()
            )
            logger.warning(
                f"{n_dup} peptide(s) hit by >=2 HMMs (paralog cross-hits). "
                f"Top HMM combinations: {affected_combinations}. "
                "Syntenic patterns adjacent to these peptides may be silently "
                "dropped. Re-run with --best_hmm_wins to keep only each "
                "peptide's highest-scoring HMM."
            )
        if all_hit_labels.full_label.duplicated().sum() == all_hit_labels.shape[0] / 2:
            logger.error(
                (
                    "All input profile HMMs rendered identical sequence hits. "
                    "Inspect HMMER output tables to evaluate hits."
                )
            )
            sys.exit(1)
        contig_names = all_hit_labels.contig.unique()

        for contig in contig_names:
            contig_hits = all_hit_labels[all_hit_labels.contig == contig].reset_index(
                drop=True
            )
            matched_hit_labels = {
                hmm_group: [] for hmm_group in contig_hits.hmm.unique()
            }

            if contig_hits.hmm.nunique() >= self._n_hmms:
                hmm_match = contig_hits.hmm_code.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_hmm_pattern)
                strand_match = contig_hits.strand.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_strand_pattern)
                if self._n_hmm_groups > 1:
                    distance_match = contig_hits.gene_pos_diff.rolling(
                        window=self._n_hmm_groups
                    ).apply(filters.contains_distance_pattern)
                    matched_rows = contig_hits[
                        (hmm_match == 1) & (strand_match == 1) & (distance_match == 1)
                    ]
                else:
                    matched_rows = contig_hits[(hmm_match == 1) & (strand_match == 1)]
                for i in matched_rows.index:
                    matched_hits = contig_hits.iloc[
                        i - (self._n_hmm_groups - 1) : i + 1, :
                    ]
                    for label, hmm in zip(
                        (matched_hits.full_label.values), matched_hits.hmm
                    ):
                        matched_hit_labels[hmm].append(label)

                all_matched_hits[contig] = matched_hit_labels
        return all_matched_hits

__init__(hmm_hits, synteny_structure, unordered=True, best_hmm_wins=False)

Search for contigs that satisfy the given gene synteny structure.

Parameters:
  • hmm_hits (dict) –

    a dict of pandas DataFrames, as output by parseHMMsearchOutput with keys corresponding to hmm names

  • synteny_structure (str) –

    a str describing the desired synteny structure, structured as follows:

    '>hmm_a N_ab hmm_b bc <hmm_c'

    where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specificed by using '>' for sense and '<' for antisense.

  • unordered (bool, default: True ) –

    whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to True.

  • best_hmm_wins (bool, default: False ) –

    if True, when the same peptide is hit by more than one HMM (e.g. paralogous models that cross-hit), keep only the highest-scoring HMM for that peptide. This prevents duplicate rows from breaking the rolling-window synteny matcher. Defaults to False.

Source code in src/pynteny/filter.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def __init__(
    self,
    hmm_hits: dict,
    synteny_structure: str,
    unordered: bool = True,
    best_hmm_wins: bool = False,
) -> None:
    """Search for contigs that satisfy the given gene synteny structure.

    Args:
        hmm_hits (dict): a dict of pandas DataFrames, as output by
            parseHMMsearchOutput with keys corresponding to hmm names
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order. If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to True.
        best_hmm_wins (bool, optional): if True, when the same peptide is hit
            by more than one HMM (e.g. paralogous models that cross-hit),
            keep only the highest-scoring HMM for that peptide. This prevents
            duplicate rows from breaking the rolling-window synteny matcher.
            Defaults to False.
    """
    self._unordered = unordered
    self._best_hmm_wins = best_hmm_wins
    self._hmm_hits = hmm_hits
    self._hmms = list(hmm_hits.keys())
    self._synteny_structure = synteny_structure
    self._contains_hmm_groups = syntenyparser.contains_HMM_groups(
        self._synteny_structure
    )
    self._parsed_structure = syntenyparser.parse_synteny_structure(
        self._synteny_structure
    )
    if self._unordered:
        self._parsed_structure["strands"] = [
            "" for _ in self._parsed_structure["strands"]
        ]
        logger.warning(
            "Disregarding strand location as unordered structured selected"
        )
    self._hmm_order_dict = dict(
        zip(
            self._parsed_structure["hmm_groups"],
            range(len(self._parsed_structure["hmm_groups"])),
        )
    )
    self._n_hmm_groups = len(self._parsed_structure["hmm_groups"])
    self._n_hmms = len(self._hmm_order_dict)
    self._unordered = unordered

filter_hits_by_synteny_structure()

Search for contigs that satisfy the given gene synteny structure.

Returns:
  • dict( dict ) –

    HMMER3 hits separated by contig.

Source code in src/pynteny/filter.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def filter_hits_by_synteny_structure(self) -> dict:
    """Search for contigs that satisfy the given gene synteny structure.

    Returns:
        dict: HMMER3 hits separated by contig.
    """
    all_matched_hits = {}
    filters = SyntenyPatternFilters(
        self._synteny_structure, unordered=self._unordered
    )
    all_hit_labels = self.get_all_HMM_hits()
    if all_hit_labels.full_label.duplicated().any():
        n_dup = int(all_hit_labels.full_label.duplicated().sum())
        dup_mask = all_hit_labels.full_label.duplicated(keep=False)
        affected_combinations = (
            all_hit_labels[dup_mask]
            .groupby("full_label")
            .hmm.apply(lambda s: tuple(sorted(set(s))))
            .value_counts()
            .head(5)
            .to_dict()
        )
        logger.warning(
            f"{n_dup} peptide(s) hit by >=2 HMMs (paralog cross-hits). "
            f"Top HMM combinations: {affected_combinations}. "
            "Syntenic patterns adjacent to these peptides may be silently "
            "dropped. Re-run with --best_hmm_wins to keep only each "
            "peptide's highest-scoring HMM."
        )
    if all_hit_labels.full_label.duplicated().sum() == all_hit_labels.shape[0] / 2:
        logger.error(
            (
                "All input profile HMMs rendered identical sequence hits. "
                "Inspect HMMER output tables to evaluate hits."
            )
        )
        sys.exit(1)
    contig_names = all_hit_labels.contig.unique()

    for contig in contig_names:
        contig_hits = all_hit_labels[all_hit_labels.contig == contig].reset_index(
            drop=True
        )
        matched_hit_labels = {
            hmm_group: [] for hmm_group in contig_hits.hmm.unique()
        }

        if contig_hits.hmm.nunique() >= self._n_hmms:
            hmm_match = contig_hits.hmm_code.rolling(
                window=self._n_hmm_groups
            ).apply(filters.contains_hmm_pattern)
            strand_match = contig_hits.strand.rolling(
                window=self._n_hmm_groups
            ).apply(filters.contains_strand_pattern)
            if self._n_hmm_groups > 1:
                distance_match = contig_hits.gene_pos_diff.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_distance_pattern)
                matched_rows = contig_hits[
                    (hmm_match == 1) & (strand_match == 1) & (distance_match == 1)
                ]
            else:
                matched_rows = contig_hits[(hmm_match == 1) & (strand_match == 1)]
            for i in matched_rows.index:
                matched_hits = contig_hits.iloc[
                    i - (self._n_hmm_groups - 1) : i + 1, :
                ]
                for label, hmm in zip(
                    (matched_hits.full_label.values), matched_hits.hmm
                ):
                    matched_hit_labels[hmm].append(label)

            all_matched_hits[contig] = matched_hit_labels
    return all_matched_hits

get_all_HMM_hits()

Group and preprocess all hit labels into a single dataframe.

Returns:
  • DataFrame

    pd.DataFrame: HMMER3 hit labels matching provided HMMs.

Source code in src/pynteny/filter.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def get_all_HMM_hits(self) -> pd.DataFrame:
    """Group and preprocess all hit labels into a single dataframe.

    Returns:
        pd.DataFrame: HMMER3 hit labels matching provided HMMs.
    """
    hit_labels = {}
    for hmm, hits in self._hmm_hits.items():
        labels = hits.id.values.tolist()
        if not labels:
            logger.error(f"No records found in database matching HMM: {hmm}")
            sys.exit(1)
        parsed = labelparser.parse_from_list(labels)
        parsed["hmm"] = hmm
        # Carry the HMMER bitscore along (only when needed) so paralog
        # cross-hits on the same peptide can be disambiguated by score.
        # It is dropped again right after deduplication so the default code
        # path keeps exactly the same columns as before.
        if self._best_hmm_wins and "bitscore" in hits.columns:
            parsed["bitscore"] = hits.bitscore.values
        hit_labels[hmm] = parsed
    # Create single dataframe with new column corresponding to HMM and all hits
    # Remove contigs with less hits than the number of hmms in synteny structure (drop if enabling partial searching)
    all_hit_labels = (
        pd.concat(hit_labels.values())
        .groupby("contig")
        .filter(lambda x: len(x) >= self._n_hmm_groups)
        .sort_values(["contig", "gene_pos"], ascending=True)
    )
    all_hit_labels = all_hit_labels.reset_index(drop=True)
    if self._best_hmm_wins:
        # Keep only the highest-scoring HMM per peptide. Without this,
        # paralogous HMMs that cross-hit the same peptide create duplicate
        # rows that interleave with true syntenic neighbours and break the
        # rolling-window matcher (silent false negatives).
        if "bitscore" not in all_hit_labels.columns:
            all_hit_labels["bitscore"] = float("nan")
        all_hit_labels = (
            all_hit_labels.sort_values("bitscore", ascending=False)
            .drop_duplicates(subset=["full_label"], keep="first")
            .sort_values(["contig", "gene_pos"], ascending=True)
            .reset_index(drop=True)
            .drop(columns="bitscore")
        )
    if self._contains_hmm_groups:
        all_hit_labels = self._merge_hits_by_HMM_group(all_hit_labels)
    all_hit_labels = all_hit_labels.reset_index(drop=True)
    return self._add_meta_codes_to_HMM_hits(all_hit_labels)

SyntenyHits

Store and manipulate synteny hits by contig

Source code in src/pynteny/filter.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
class SyntenyHits:
    """Store and manipulate synteny hits by contig"""

    def __init__(self, synteny_hits: pd.DataFrame) -> None:
        """
        Initialize from synteny hits object.
        """
        self._synteny_hits = synteny_hits.drop_duplicates()

    @staticmethod
    def _hits_to_dataframe(hits_by_contig: dict) -> pd.DataFrame:
        """Return synteny hits as a dataframe"""
        data = []
        columns = [
            "contig",
            "gene_id",
            "gene_number",
            "locus",
            "strand",
            "full_label",
            "hmm",
        ]
        for contig, matched_hits in hits_by_contig.items():
            for hmm, labels in matched_hits.items():
                for label in labels:
                    parsed_label = labelparser.parse(label)
                    data.append(
                        [
                            parsed_label["contig"],
                            parsed_label["gene_id"],
                            parsed_label["gene_pos"],
                            parsed_label["locus_pos"],
                            parsed_label["strand"],
                            parsed_label["full_label"],
                            hmm,
                        ]
                    )
        return pd.DataFrame(data, columns=columns).sort_values(
            ["contig", "gene_number"]
        )

    @classmethod
    def from_hits_dict(cls, hits_by_contig: dict) -> SyntenyHits:
        """Initialize SyntenyHits object from hits_by_contig dictionary.

        Args:
            hits_by_contig (dict): HMMER3 hit labels separated by contig name.

        Returns:
            SyntenyHits: initialized object of class SyntenyHits.
        """
        return cls(cls._hits_to_dataframe(hits_by_contig))

    @property
    def hits(self) -> pd.DataFrame:
        """Return synteny hits.

        Returns:
            pd.DataFrame: Synteny hits as dataframe.
        """
        return self._synteny_hits

    def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
        """Add molecular metadata to synteny hits.

        Args:
            hmm_meta (Path): path to PGAP metadata file.

        Returns:
            SyntenyHits: and instance of class SyntenyHits.
        """
        hmm_meta = Path(hmm_meta)
        fields = ["gene_symbol", "label", "product", "ec_number"]
        if all([f in self._synteny_hits.columns for f in fields]):
            return self._synteny_hits
        hmm_db = HMMDatabase(hmm_meta)
        self._synteny_hits[fields] = ""
        for row in self._synteny_hits.itertuples():
            meta_values = [
                [
                    str(v).replace("nan", "")
                    for k, v in hmm_db.get_meta_info_for_HMM(hmm).items()
                    if k != "accession"
                ]
                for hmm in row.hmm.split("|")
            ]
            self._synteny_hits.loc[row.Index, fields] = [
                "|".join(v) for v in zip(*meta_values)
            ]
        return SyntenyHits(self._synteny_hits)

    def write_to_TSV(self, output_tsv: Path) -> None:
        """Write synteny hits to a TSV file.

        Args:
            output_tsv (Path): path to output tsv file.
        """
        output_tsv = Path(output_tsv)
        self._synteny_hits.to_csv(output_tsv, sep="\t", index=False)

    def write_hit_sequences_to_FASTA_files(
        self,
        sequence_database: Path,
        output_dir: Path,
        output_prefix: str = None,
    ) -> None:
        """Write matching sequences to FASTA files.

        Args:
            sequence_database (Path): path to the peptide or nucleotide sequence database
                in which the synteny search was conducted.
            output_dir (Path): path to output directory.
            output_prefix (str, optional): prefix for output files. Defaults to None.
        """
        sequence_database = Path(sequence_database)
        output_dir = Path(output_dir)
        fasta = FASTA(sequence_database)
        hmm_groups = self._synteny_hits.hmm.unique().tolist()

        for hmm_group in hmm_groups:
            record_ids = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].full_label.values.tolist()

            if (
                "gene_symbol" in self._synteny_hits.columns
                and "label" in self._synteny_hits.columns
            ):
                gene_symbol = self._synteny_hits[
                    self._synteny_hits.hmm == hmm_group
                ].gene_symbol.unique()[0]
                gene_label = self._synteny_hits[
                    self._synteny_hits.hmm == hmm_group
                ].label.unique()[0]
                gene_id = (
                    gene_symbol
                    if not pd.isna(gene_symbol)
                    else (gene_label if not pd.isna(gene_label) else "")
                )
            else:
                gene_id = ""
            gene_id_str = gene_id + "_" if gene_id else ""

            output_fasta = (
                output_dir
                / f"{output_prefix}{gene_id_str.replace('|', '_')}{hmm_group.replace('|', '_')}_hits.fasta"
            )
            if record_ids:
                fasta.filter_by_IDs(
                    record_ids=record_ids,
                    output_file=output_fasta,
                    point_to_new_file=False,
                )
            else:
                logger.warning(
                    f"No record matches found in synteny structure for HMM: {hmm_group}"
                )

hits property

Return synteny hits.

Returns:
  • DataFrame

    pd.DataFrame: Synteny hits as dataframe.

__init__(synteny_hits)

Initialize from synteny hits object.

Source code in src/pynteny/filter.py
392
393
394
395
396
def __init__(self, synteny_hits: pd.DataFrame) -> None:
    """
    Initialize from synteny hits object.
    """
    self._synteny_hits = synteny_hits.drop_duplicates()

add_HMM_meta_info_to_hits(hmm_meta)

Add molecular metadata to synteny hits.

Parameters:
  • hmm_meta (Path) –

    path to PGAP metadata file.

Returns:
  • SyntenyHits( SyntenyHits ) –

    and instance of class SyntenyHits.

Source code in src/pynteny/filter.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
    """Add molecular metadata to synteny hits.

    Args:
        hmm_meta (Path): path to PGAP metadata file.

    Returns:
        SyntenyHits: and instance of class SyntenyHits.
    """
    hmm_meta = Path(hmm_meta)
    fields = ["gene_symbol", "label", "product", "ec_number"]
    if all([f in self._synteny_hits.columns for f in fields]):
        return self._synteny_hits
    hmm_db = HMMDatabase(hmm_meta)
    self._synteny_hits[fields] = ""
    for row in self._synteny_hits.itertuples():
        meta_values = [
            [
                str(v).replace("nan", "")
                for k, v in hmm_db.get_meta_info_for_HMM(hmm).items()
                if k != "accession"
            ]
            for hmm in row.hmm.split("|")
        ]
        self._synteny_hits.loc[row.Index, fields] = [
            "|".join(v) for v in zip(*meta_values)
        ]
    return SyntenyHits(self._synteny_hits)

from_hits_dict(hits_by_contig) classmethod

Initialize SyntenyHits object from hits_by_contig dictionary.

Parameters:
  • hits_by_contig (dict) –

    HMMER3 hit labels separated by contig name.

Returns:
  • SyntenyHits( SyntenyHits ) –

    initialized object of class SyntenyHits.

Source code in src/pynteny/filter.py
430
431
432
433
434
435
436
437
438
439
440
@classmethod
def from_hits_dict(cls, hits_by_contig: dict) -> SyntenyHits:
    """Initialize SyntenyHits object from hits_by_contig dictionary.

    Args:
        hits_by_contig (dict): HMMER3 hit labels separated by contig name.

    Returns:
        SyntenyHits: initialized object of class SyntenyHits.
    """
    return cls(cls._hits_to_dataframe(hits_by_contig))

write_hit_sequences_to_FASTA_files(sequence_database, output_dir, output_prefix=None)

Write matching sequences to FASTA files.

Parameters:
  • sequence_database (Path) –

    path to the peptide or nucleotide sequence database in which the synteny search was conducted.

  • output_dir (Path) –

    path to output directory.

  • output_prefix (str, default: None ) –

    prefix for output files. Defaults to None.

Source code in src/pynteny/filter.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
def write_hit_sequences_to_FASTA_files(
    self,
    sequence_database: Path,
    output_dir: Path,
    output_prefix: str = None,
) -> None:
    """Write matching sequences to FASTA files.

    Args:
        sequence_database (Path): path to the peptide or nucleotide sequence database
            in which the synteny search was conducted.
        output_dir (Path): path to output directory.
        output_prefix (str, optional): prefix for output files. Defaults to None.
    """
    sequence_database = Path(sequence_database)
    output_dir = Path(output_dir)
    fasta = FASTA(sequence_database)
    hmm_groups = self._synteny_hits.hmm.unique().tolist()

    for hmm_group in hmm_groups:
        record_ids = self._synteny_hits[
            self._synteny_hits.hmm == hmm_group
        ].full_label.values.tolist()

        if (
            "gene_symbol" in self._synteny_hits.columns
            and "label" in self._synteny_hits.columns
        ):
            gene_symbol = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].gene_symbol.unique()[0]
            gene_label = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].label.unique()[0]
            gene_id = (
                gene_symbol
                if not pd.isna(gene_symbol)
                else (gene_label if not pd.isna(gene_label) else "")
            )
        else:
            gene_id = ""
        gene_id_str = gene_id + "_" if gene_id else ""

        output_fasta = (
            output_dir
            / f"{output_prefix}{gene_id_str.replace('|', '_')}{hmm_group.replace('|', '_')}_hits.fasta"
        )
        if record_ids:
            fasta.filter_by_IDs(
                record_ids=record_ids,
                output_file=output_fasta,
                point_to_new_file=False,
            )
        else:
            logger.warning(
                f"No record matches found in synteny structure for HMM: {hmm_group}"
            )

write_to_TSV(output_tsv)

Write synteny hits to a TSV file.

Parameters:
  • output_tsv (Path) –

    path to output tsv file.

Source code in src/pynteny/filter.py
480
481
482
483
484
485
486
487
def write_to_TSV(self, output_tsv: Path) -> None:
    """Write synteny hits to a TSV file.

    Args:
        output_tsv (Path): path to output tsv file.
    """
    output_tsv = Path(output_tsv)
    self._synteny_hits.to_csv(output_tsv, sep="\t", index=False)

SyntenyPatternFilters

Methods to filter hmm hits in the same contig by synteny structure or collinearity. These filters are inputs to pandas.Dataframe.rolling method.

Source code in src/pynteny/filter.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class SyntenyPatternFilters:
    """Methods to filter hmm hits in the  same contig by synteny structure
    or collinearity. These filters are inputs to pandas.Dataframe.rolling method.
    """

    def __init__(self, synteny_structure: str, unordered: bool = False) -> None:
        """Initialize filter class from synteny structure.

        Args:
            synteny_structure (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specified by using '>' for sense and '<' for antisense.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order. If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to False.
        """
        parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
        if unordered:
            parsed_structure["strands"] = ["" for _ in parsed_structure["strands"]]
        self._hmm_order_dict = dict(
            zip(
                parsed_structure["hmm_groups"],
                range(len(parsed_structure["hmm_groups"])),
            )
        )
        # parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
        # hmm_codes = list(range(len(parsed_structure["hmm_groups"])))
        self.hmm_code_order_pattern = [
            self._hmm_order_dict[hmm_group]
            for hmm_group in parsed_structure["hmm_groups"]
        ]
        if unordered:
            self.hmm_code_order_pattern = sorted(self.hmm_code_order_pattern)

        if unordered:
            max_distance = max(parsed_structure["distances"])
            max_distances = [float("inf")] + [
                max_distance for _ in parsed_structure["distances"]
            ]
        else:
            max_distances = [float("inf")] + parsed_structure["distances"]
        min_distances = [-float("inf")] + [0 for _ in parsed_structure["distances"]]
        self.distance_order_pattern = [
            (min_dist, max_dist + 1)
            for min_dist, max_dist in zip(min_distances, max_distances)
        ]

        self.strand_order_pattern = list(
            map(
                lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0),
                parsed_structure["strands"],
            )
        )
        self._unordered = unordered

    def contains_hmm_pattern(self, data: pd.Series) -> int:
        """Check if series items contain a profile HMM

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        hmm_codes = [int(code) for code in data.values]
        if self._unordered:
            hmm_codes = sorted(hmm_codes)
        return 1 if hmm_codes == self.hmm_code_order_pattern else 0

    def contains_distance_pattern(self, data: pd.Series) -> int:
        """Check if series items satisfy the maximum distance
           between HMM hits.

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        return (
            1
            if all(
                [
                    (data_dist <= max_dist) and (data_dist > min_dist)
                    for data_dist, (min_dist, max_dist) in zip(
                        data.values.tolist(), self.distance_order_pattern
                    )
                ]
            )
            else 0
        )

    def contains_strand_pattern(self, data: pd.Series) -> int:
        """Check if series items satisfy the strand pattern
           between HMM hits.

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        strand_comparisons = []
        for data_strand, pattern_strand in zip(data.values, self.strand_order_pattern):
            if pattern_strand != 0:
                strand_comparisons.append(data_strand == pattern_strand)
            else:
                strand_comparisons.append(True)
        return 1 if all(strand_comparisons) else 0

__init__(synteny_structure, unordered=False)

Initialize filter class from synteny structure.

Parameters:
  • synteny_structure (str) –

    a str describing the desired synteny structure, structured as follows:

    '>hmm_a N_ab hmm_b bc <hmm_c'

    where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

  • unordered (bool, default: False ) –

    whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to False.

Source code in src/pynteny/filter.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, synteny_structure: str, unordered: bool = False) -> None:
    """Initialize filter class from synteny structure.

    Args:
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specified by using '>' for sense and '<' for antisense.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order. If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to False.
    """
    parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
    if unordered:
        parsed_structure["strands"] = ["" for _ in parsed_structure["strands"]]
    self._hmm_order_dict = dict(
        zip(
            parsed_structure["hmm_groups"],
            range(len(parsed_structure["hmm_groups"])),
        )
    )
    # parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
    # hmm_codes = list(range(len(parsed_structure["hmm_groups"])))
    self.hmm_code_order_pattern = [
        self._hmm_order_dict[hmm_group]
        for hmm_group in parsed_structure["hmm_groups"]
    ]
    if unordered:
        self.hmm_code_order_pattern = sorted(self.hmm_code_order_pattern)

    if unordered:
        max_distance = max(parsed_structure["distances"])
        max_distances = [float("inf")] + [
            max_distance for _ in parsed_structure["distances"]
        ]
    else:
        max_distances = [float("inf")] + parsed_structure["distances"]
    min_distances = [-float("inf")] + [0 for _ in parsed_structure["distances"]]
    self.distance_order_pattern = [
        (min_dist, max_dist + 1)
        for min_dist, max_dist in zip(min_distances, max_distances)
    ]

    self.strand_order_pattern = list(
        map(
            lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0),
            parsed_structure["strands"],
        )
    )
    self._unordered = unordered

contains_distance_pattern(data)

Check if series items satisfy the maximum distance between HMM hits.

Parameters:
  • data (Series) –

    a series resulting from calling rolling on a pandas column.

Returns:
  • int( int ) –

    1 for True 0 for False.

Source code in src/pynteny/filter.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def contains_distance_pattern(self, data: pd.Series) -> int:
    """Check if series items satisfy the maximum distance
       between HMM hits.

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    return (
        1
        if all(
            [
                (data_dist <= max_dist) and (data_dist > min_dist)
                for data_dist, (min_dist, max_dist) in zip(
                    data.values.tolist(), self.distance_order_pattern
                )
            ]
        )
        else 0
    )

contains_hmm_pattern(data)

Check if series items contain a profile HMM

Parameters:
  • data (Series) –

    a series resulting from calling rolling on a pandas column.

Returns:
  • int( int ) –

    1 for True 0 for False.

Source code in src/pynteny/filter.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def contains_hmm_pattern(self, data: pd.Series) -> int:
    """Check if series items contain a profile HMM

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    hmm_codes = [int(code) for code in data.values]
    if self._unordered:
        hmm_codes = sorted(hmm_codes)
    return 1 if hmm_codes == self.hmm_code_order_pattern else 0

contains_strand_pattern(data)

Check if series items satisfy the strand pattern between HMM hits.

Parameters:
  • data (Series) –

    a series resulting from calling rolling on a pandas column.

Returns:
  • int( int ) –

    1 for True 0 for False.

Source code in src/pynteny/filter.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def contains_strand_pattern(self, data: pd.Series) -> int:
    """Check if series items satisfy the strand pattern
       between HMM hits.

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    strand_comparisons = []
    for data_strand, pattern_strand in zip(data.values, self.strand_order_pattern):
        if pattern_strand != 0:
            strand_comparisons.append(data_strand == pattern_strand)
        else:
            strand_comparisons.append(True)
    return 1 if all(strand_comparisons) else 0

filter_FASTA_by_synteny_structure(synteny_structure, input_fasta, input_hmms, unordered=False, best_hmm_wins=False, hmm_meta=None, hmmer_output_dir=None, reuse_hmmer_results=True, method='hmmsearch', processes=None, additional_args=None)

Generate protein-specific database by filtering sequence database to only contain sequences which satisfy the provided (gene/hmm) structure.

Parameters:
  • synteny_structure (str) –

    a str describing the desired synteny structure, structured as follows:

    '>hmm_a N_ab hmm_b bc <hmm_c'

    where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specificed by using '>' for sense and '<' for antisense.

  • input_fasta (Path) –

    input fasta containing sequence database to be searched.

  • input_hmms (list[Path]) –

    list containing paths to hmms contained in synteny structure.

  • unordered (bool, default: False ) –

    whether HMM hits should follow the exact order displayed in the synteny structure string or not, i.e., whether to search for only synteny (colocation) or collinearity as well (same order). Defaults to False.

  • best_hmm_wins (bool, default: False ) –

    if True, when the same peptide is hit by more than one HMM (paralog cross-hits), keep only the highest-scoring HMM for that peptide before matching the synteny structure. Defaults to False.

  • hmm_meta (Path, default: None ) –

    path to PGAP's metadata file. Defaults to None.

  • hmmer_output_dir (Path, default: None ) –

    output directory to store HMMER3 output files. Defaults to None.

  • reuse_hmmer_results (bool, default: True ) –

    if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to True.

  • method (str, default: 'hmmsearch' ) –

    select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

  • processes (int, default: None ) –

    maximum number of threads to be employed. Defaults to all minus one.

  • additional_args (list[str], default: None ) –

    additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

Returns:
  • SyntenyHits( SyntenyHits ) –

    object of class SyntenyHits containing labels matching synteny structure.

Source code in src/pynteny/filter.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
def filter_FASTA_by_synteny_structure(
    synteny_structure: str,
    input_fasta: Path,
    input_hmms: list[Path],
    unordered: bool = False,
    best_hmm_wins: bool = False,
    hmm_meta: Path = None,
    hmmer_output_dir: Path = None,
    reuse_hmmer_results: bool = True,
    method: str = "hmmsearch",
    processes: int = None,
    additional_args: list[str] = None,
) -> SyntenyHits:
    """Generate protein-specific database by filtering sequence database
       to only contain sequences which satisfy the provided (gene/hmm) structure.

    Args:
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.
        input_fasta (Path): input fasta containing sequence database to be searched.
        input_hmms (list[Path]): list containing paths to hmms contained in synteny structure.
        unordered (bool, optional): whether HMM hits should follow the exact order
            displayed in the synteny structure string or not, i.e., whether
            to search for only synteny (colocation) or collinearity as well
            (same order). Defaults to False.
        best_hmm_wins (bool, optional): if True, when the same peptide is hit by
            more than one HMM (paralog cross-hits), keep only the highest-scoring
            HMM for that peptide before matching the synteny structure. Defaults
            to False.
        hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
        hmmer_output_dir (Path, optional): output directory to store HMMER3 output files. Defaults to None.
        reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run again for HMMs already
            searched in the same output directory. Defaults to True.
        method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.
        processes (int, optional): maximum number of threads to be employed. Defaults to all minus one.
        additional_args (list[str], optional): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

    Returns:
        SyntenyHits: object of class SyntenyHits containing labels matching synteny structure.
    """
    input_fasta = Path(input_fasta)
    if hmmer_output_dir is None:
        hmmer_output_dir = Path(input_fasta.parent) / "hmmer_outputs"
    else:
        hmmer_output_dir = Path(hmmer_output_dir)

    if additional_args is None:
        additional_args = [None for _ in input_hmms]

    if isinstance(additional_args, str):
        logger.warning(f"Repeating hmmsearch arg: '{additional_args}' for all HMMs")
        additional_args = [additional_args for _ in input_hmms]

    elif isinstance(additional_args, list):
        if len(additional_args) == 1:
            logger.warning(
                f"Repeating hmmsearch arg: '{additional_args[0]}' for all HMMs"
            )
            additional_args = [additional_args[0] for _ in input_hmms]

        if (len(additional_args) > 1) and (len(additional_args) < len(input_hmms)):
            logger.error(
                "Provided additional argument strings are less than the number of input hmms."
            )
    else:
        logger.error(
            "Additional arguments must be: 1) a list[str], 2) a str, or 3) None"
        )
        sys.exit(1)
    if not os.path.isdir(hmmer_output_dir):
        os.mkdir(hmmer_output_dir)

    logger.info("Running Hmmer")
    hmmer = HMMER(
        input_hmms=input_hmms,
        input_data=input_fasta,
        hmm_output_dir=hmmer_output_dir,
        processes=processes,
        additional_args=additional_args,
    )
    hmm_hits = hmmer.get_HMMER_tables(
        reuse_hmmer_results=reuse_hmmer_results, method=method
    )
    hmms_without_hits = []
    for hmm_name, hmmer_hits in hmm_hits.items():
        if hmmer_hits.empty:
            hmms_without_hits.append(hmm_name)
    if hmms_without_hits:
        for hmm_name in hmms_without_hits:
            logger.error(f"No records found in database matching HMM: {hmm_name}")
        sys.exit(1)

    logger.info("Filtering results by synteny structure")
    syntenyfilter = SyntenyHMMfilter(
        hmm_hits,
        synteny_structure,
        unordered=unordered,
        best_hmm_wins=best_hmm_wins,
    )
    hits_by_contig = syntenyfilter.filter_hits_by_synteny_structure()
    if hmm_meta is not None:
        return SyntenyHits.from_hits_dict(hits_by_contig).add_HMM_meta_info_to_hits(
            hmm_meta
        )
    else:
        return SyntenyHits.from_hits_dict(hits_by_contig)

Module preprocessing

Tools to preprocess sequence databases

  1. Remove illegal characters from peptide sequences
  2. Remove illegal symbols from file paths
  3. Relabel fasta records and make dictionary with old labels

Database

_Sequence database constructor

Source code in src/pynteny/preprocessing.py
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
class Database:
    """_Sequence database constructor"""

    def __init__(self, data: Path) -> None:
        """Initialize Database object

        Args:
            data (Path): path to either assembly fasta file (or a
                         directory containing assembly fasta files) or
                         a genbank file containing ORF annotations (or a
                         directory containing genbank files)

        Raises:
            FileNotFoundError: if file or directory doesn't exist
        """
        self._data = Path(data)
        if not self._data.exists():
            raise FileNotFoundError(f"{self._data} does not exist")
        if self._data.is_dir():
            self._data_files = [f for f in self._data.iterdir()]
        else:
            self._data_files = [self._data]

    @staticmethod
    def is_fasta(filename: Path) -> bool:
        """Check if file is in fasta format

        Args:
            filename (Path): path to input file

        Returns:
            bool: whether the file is in fasta format
        """
        filename = Path(filename)
        if not filename.exists():
            return False
        # Recent Biopython versions raise (instead of returning no records) when
        # the file is not valid FASTA (e.g. a GenBank file with leading
        # comments), so treat any parse error as "not a FASTA file".
        try:
            return any(SeqIO.parse(str(filename), "fasta"))
        except ValueError:
            return False

    @staticmethod
    def is_gbk(filename: Path) -> bool:
        """Check if file is in genbank format

        Args:
            filename (Path): path to input file

        Returns:
            _bool: whether the file is in genbank format
        """
        filename = Path(filename)
        if not filename.exists():
            return False
        try:
            return any(SeqIO.parse(str(filename), "genbank"))
        except ValueError:
            return False

    def build(
        self,
        seq_prefix: str = None,
        prepend_file_name: bool = False,
        output_file: Path = None,
        processes: int = None,
        tempdir: Path = None,
    ) -> LabelledFASTA:
        """Build database from data files.

        Args:
            seq_prefix (str, optionall): prefix to be added to each sequence in database.
                Defaults to "".
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.
            output_file (Path, optional): path to output file. Defaults to None.
            processes (int, optional): maximum number of threads. Defaults to all minus one.
            tmpdir (Path, optional): path to temporary directory. Defaults to tempfile default.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        if output_file is None:
            output_file = self._data.parent / f"{self._data.stem}_labelled.faa"
        else:
            output_file = Path(output_file)
        if self.is_fasta(self._data_files[0]):
            with tempfile.NamedTemporaryFile() as tmpmerge:
                if self._data.is_dir():
                    assembly_fasta = FASTA.from_FASTA_directory(
                        self._data,
                        merged_fasta=Path(tmpmerge.name),
                        prepend_file_name=prepend_file_name,
                    )
                else:
                    assembly_fasta = FASTA(self._data)
                logger.info("Translating and annotating assembly data.")
                labelled_database = GeneAnnotator(assembly_fasta).annotate(
                    output_file=output_file,
                    processes=processes,
                    tempdir=tempdir,
                )
        elif self.is_gbk(self._data_files[0]):
            logger.info("Parsing GenBank data.")
            labelled_database = LabelledFASTA.from_genbank(
                self._data, output_file=output_file, prepend_file_name=prepend_file_name
            )
        else:
            logging.error(f"{self._data} is not a valid FASTA or genbank file")
            sys.exit(1)
        if seq_prefix is not None:
            labelled_database.add_prefix_to_records(seq_prefix, output_file)
            labelled_database = LabelledFASTA(labelled_database.file_path)
        return labelled_database

__init__(data)

Initialize Database object

Parameters:
  • data (Path) –

    path to either assembly fasta file (or a directory containing assembly fasta files) or a genbank file containing ORF annotations (or a directory containing genbank files)

Raises:
  • FileNotFoundError

    if file or directory doesn't exist

Source code in src/pynteny/preprocessing.py
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
def __init__(self, data: Path) -> None:
    """Initialize Database object

    Args:
        data (Path): path to either assembly fasta file (or a
                     directory containing assembly fasta files) or
                     a genbank file containing ORF annotations (or a
                     directory containing genbank files)

    Raises:
        FileNotFoundError: if file or directory doesn't exist
    """
    self._data = Path(data)
    if not self._data.exists():
        raise FileNotFoundError(f"{self._data} does not exist")
    if self._data.is_dir():
        self._data_files = [f for f in self._data.iterdir()]
    else:
        self._data_files = [self._data]

build(seq_prefix=None, prepend_file_name=False, output_file=None, processes=None, tempdir=None)

Build database from data files.

Parameters:
  • seq_prefix ((str, optionall), default: None ) –

    prefix to be added to each sequence in database. Defaults to "".

  • prepend_file_name (bool, default: False ) –

    whether to add file name as genome ID to each record in the result merged fasta file.

  • output_file (Path, default: None ) –

    path to output file. Defaults to None.

  • processes (int, default: None ) –

    maximum number of threads. Defaults to all minus one.

  • tmpdir (Path) –

    path to temporary directory. Defaults to tempfile default.

Returns:
  • LabelledFASTA( LabelledFASTA ) –

    object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def build(
    self,
    seq_prefix: str = None,
    prepend_file_name: bool = False,
    output_file: Path = None,
    processes: int = None,
    tempdir: Path = None,
) -> LabelledFASTA:
    """Build database from data files.

    Args:
        seq_prefix (str, optionall): prefix to be added to each sequence in database.
            Defaults to "".
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.
        output_file (Path, optional): path to output file. Defaults to None.
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        tmpdir (Path, optional): path to temporary directory. Defaults to tempfile default.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    if output_file is None:
        output_file = self._data.parent / f"{self._data.stem}_labelled.faa"
    else:
        output_file = Path(output_file)
    if self.is_fasta(self._data_files[0]):
        with tempfile.NamedTemporaryFile() as tmpmerge:
            if self._data.is_dir():
                assembly_fasta = FASTA.from_FASTA_directory(
                    self._data,
                    merged_fasta=Path(tmpmerge.name),
                    prepend_file_name=prepend_file_name,
                )
            else:
                assembly_fasta = FASTA(self._data)
            logger.info("Translating and annotating assembly data.")
            labelled_database = GeneAnnotator(assembly_fasta).annotate(
                output_file=output_file,
                processes=processes,
                tempdir=tempdir,
            )
    elif self.is_gbk(self._data_files[0]):
        logger.info("Parsing GenBank data.")
        labelled_database = LabelledFASTA.from_genbank(
            self._data, output_file=output_file, prepend_file_name=prepend_file_name
        )
    else:
        logging.error(f"{self._data} is not a valid FASTA or genbank file")
        sys.exit(1)
    if seq_prefix is not None:
        labelled_database.add_prefix_to_records(seq_prefix, output_file)
        labelled_database = LabelledFASTA(labelled_database.file_path)
    return labelled_database

is_fasta(filename) staticmethod

Check if file is in fasta format

Parameters:
  • filename (Path) –

    path to input file

Returns:
  • bool( bool ) –

    whether the file is in fasta format

Source code in src/pynteny/preprocessing.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
@staticmethod
def is_fasta(filename: Path) -> bool:
    """Check if file is in fasta format

    Args:
        filename (Path): path to input file

    Returns:
        bool: whether the file is in fasta format
    """
    filename = Path(filename)
    if not filename.exists():
        return False
    # Recent Biopython versions raise (instead of returning no records) when
    # the file is not valid FASTA (e.g. a GenBank file with leading
    # comments), so treat any parse error as "not a FASTA file".
    try:
        return any(SeqIO.parse(str(filename), "fasta"))
    except ValueError:
        return False

is_gbk(filename) staticmethod

Check if file is in genbank format

Parameters:
  • filename (Path) –

    path to input file

Returns:
  • _bool( bool ) –

    whether the file is in genbank format

Source code in src/pynteny/preprocessing.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
@staticmethod
def is_gbk(filename: Path) -> bool:
    """Check if file is in genbank format

    Args:
        filename (Path): path to input file

    Returns:
        _bool: whether the file is in genbank format
    """
    filename = Path(filename)
    if not filename.exists():
        return False
    try:
        return any(SeqIO.parse(str(filename), "genbank"))
    except ValueError:
        return False

FASTA

Handle and process fasta files.

Source code in src/pynteny/preprocessing.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class FASTA:
    """Handle and process fasta files."""

    def __init__(self, input_file: Path) -> None:
        """Initialize FASTA object.

        Args:
            input_file (Path): path to input fasta file.
        """
        self._input_file = Path(input_file)

    @property
    def file_path(self) -> Path:
        """Set new path to fasta file

        Args:
            new_path (Path, optional): path to fasta file. Defaults to None.

        Returns:
            Path: path to fasta file.
        """
        return self._input_file

    @file_path.setter
    def file_path(self, new_path: Path) -> None:
        self._input_file = Path(new_path)

    @classmethod
    def from_FASTA_directory(
        cls, input_dir: Path, merged_fasta: Path = None, prepend_file_name: bool = False
    ) -> FASTA:
        """Initialize FASTA class from directory of fasta files.

        Args:
            input_dir (Path): path to input directory.
            merged_fasta (Path, optional): path to output merged fasta. Defaults to None.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.

        Returns:
            FASTA: an initialized instance of class FASTA.
        """
        input_dir = Path(input_dir)
        if merged_fasta is None:
            merged_fasta = input_dir / "merged_database.fasta"
        else:
            merged_fasta = Path(merged_fasta)
        FASTAmerger(input_dir).merge(merged_fasta, prepend_file_name)
        return cls(merged_fasta)

    def remove_duplicates(
        self,
        output_file: Path = None,
        export_duplicates: bool = False,
        point_to_new_file: bool = True,
    ) -> None:
        """Removes duplicate entries (either by sequence or ID) from fasta.

        Args:
            output_file (Path, optional): path to output fasta file. Defaults to None.
            export_duplicates (bool, optional): whether duplicated records are exported to a file. Defaults to False.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.

        Yields:
            None: None
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_noduplicates{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        wrappers.run_seqkit_nodup(
            input_fasta=self._input_file,
            output_fasta=output_file,
            export_duplicates=export_duplicates,
        )
        if point_to_new_file:
            self.file_path = output_file

    def remove_corrupted_sequences(
        self,
        output_file: Path = None,
        is_peptide: bool = True,
        keep_stop_codon: bool = False,
        point_to_new_file: bool = True,
    ) -> None:
        """Filter out (DNA or peptide) sequences containing illegal characters.

        Args:
            output_file (Path, optional): path to output fasta file. Defaults to None.
            is_peptide (bool, optional): select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.
            keep_stop_codon (bool, optional): whether to keep the stop codon in the peptide sequence. Defaults to False.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        dirname = self._input_file.parent
        fname, ext = self._input_file.stem, self._input_file.suffix
        if output_file is None:
            output_file = Path(dirname) / f"{fname}_modified{ext}"
        else:
            output_file = Path(output_file)
        if is_peptide:
            isLegitSequence = is_legit_peptide_sequence
        else:
            isLegitSequence = is_legit_DNA_sequence

        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                if is_peptide and (not keep_stop_codon):
                    record_seq = remove_stop_sodon_signals(record_seq)
                if isLegitSequence(record_seq):
                    outfile.write(f">{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

    def filter_by_IDs(
        self, record_ids: list, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Filter records in fasta file matching provided IDs.

        Args:
            record_ids (list): list of record IDs to keep of original fasta file.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_filtered{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        wanted_ids = {record_id.lower() for record_id in record_ids}
        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                record_id = record_name.split()[0]
                if record_id.lower() in wanted_ids:
                    outfile.write(f">{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

    def split_by_contigs(self, output_dir: Path = None) -> None:
        """Split large fasta file into several ones containing one contig each.

        Args:
            output_dir (Path, optional): _description_. Defaults to None.
        """
        if output_dir is None:
            output_dir = (
                Path(self._input_file.parent) / "split_" + self._input_file.name
            )
        else:
            output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        contigs = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        for contig_name, seq in contigs:
            output_file = (
                output_dir / f"{contig_name.split(' ')[0]}{self._input_file.suffix}"
            )
            with open(output_file, "w+", encoding="UTF-8") as outfile:
                outfile.write(f">{contig_name}\n")
                outfile.write(seq + "\n")

    def filter_by_minimum_length(
        self, min_length: int, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Filter records in fasta file by minimum length.

        Args:
            min_length (int): minimal length of sequences to be kept in filtered fasta file.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_minlength{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                if len(record_seq) >= min_length:
                    outfile.write(f">{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

    def add_prefix_to_records(
        self, prefix: str, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Add prefix to sequence records in FASTA

        Args:
            prefix (str): prefix to be added.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_prefixed{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        prefix = prefix.strip("_")
        # Write to a temporary file first and then move it into place. pyfastx
        # reads the input lazily, so writing directly to output_file would
        # truncate the source whenever output_file == self.file_path.
        with tempfile.NamedTemporaryFile(
            mode="w+t",
            encoding="UTF-8",
            dir=output_file.parent,
            suffix=output_file.suffix,
            delete=False,
        ) as outfile:
            tmp_path = Path(outfile.name)
            for record_name, record_seq in fasta:
                outfile.write(f">{prefix}_{record_name}\n{record_seq}\n")
        shutil.move(tmp_path.as_posix(), output_file.as_posix())
        if point_to_new_file:
            self.file_path = output_file

file_path property writable

Set new path to fasta file

Parameters:
  • new_path (Path) –

    path to fasta file. Defaults to None.

Returns:
  • Path( Path ) –

    path to fasta file.

__init__(input_file)

Initialize FASTA object.

Parameters:
  • input_file (Path) –

    path to input fasta file.

Source code in src/pynteny/preprocessing.py
150
151
152
153
154
155
156
def __init__(self, input_file: Path) -> None:
    """Initialize FASTA object.

    Args:
        input_file (Path): path to input fasta file.
    """
    self._input_file = Path(input_file)

add_prefix_to_records(prefix, output_file=None, point_to_new_file=True)

Add prefix to sequence records in FASTA

Parameters:
  • prefix (str) –

    prefix to be added.

  • output_file (Path, default: None ) –

    path to output filtered fasta file. Defaults to None.

  • point_to_new_file (bool, default: True ) –

    whether FASTA object should point to the newly generated file. Defaults to True.

Source code in src/pynteny/preprocessing.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def add_prefix_to_records(
    self, prefix: str, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Add prefix to sequence records in FASTA

    Args:
        prefix (str): prefix to be added.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_prefixed{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    prefix = prefix.strip("_")
    # Write to a temporary file first and then move it into place. pyfastx
    # reads the input lazily, so writing directly to output_file would
    # truncate the source whenever output_file == self.file_path.
    with tempfile.NamedTemporaryFile(
        mode="w+t",
        encoding="UTF-8",
        dir=output_file.parent,
        suffix=output_file.suffix,
        delete=False,
    ) as outfile:
        tmp_path = Path(outfile.name)
        for record_name, record_seq in fasta:
            outfile.write(f">{prefix}_{record_name}\n{record_seq}\n")
    shutil.move(tmp_path.as_posix(), output_file.as_posix())
    if point_to_new_file:
        self.file_path = output_file

filter_by_IDs(record_ids, output_file=None, point_to_new_file=True)

Filter records in fasta file matching provided IDs.

Parameters:
  • record_ids (list) –

    list of record IDs to keep of original fasta file.

  • output_file (Path, default: None ) –

    path to output filtered fasta file. Defaults to None.

  • point_to_new_file (bool, default: True ) –

    whether FASTA object should point to the newly generated file. Defaults to True.

Source code in src/pynteny/preprocessing.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def filter_by_IDs(
    self, record_ids: list, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Filter records in fasta file matching provided IDs.

    Args:
        record_ids (list): list of record IDs to keep of original fasta file.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_filtered{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    wanted_ids = {record_id.lower() for record_id in record_ids}
    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            record_id = record_name.split()[0]
            if record_id.lower() in wanted_ids:
                outfile.write(f">{record_name}\n{record_seq}\n")
    if point_to_new_file:
        self.file_path = output_file

filter_by_minimum_length(min_length, output_file=None, point_to_new_file=True)

Filter records in fasta file by minimum length.

Parameters:
  • min_length (int) –

    minimal length of sequences to be kept in filtered fasta file.

  • output_file (Path, default: None ) –

    path to output filtered fasta file. Defaults to None.

  • point_to_new_file (bool, default: True ) –

    whether FASTA object should point to the newly generated file. Defaults to True.

Source code in src/pynteny/preprocessing.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def filter_by_minimum_length(
    self, min_length: int, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Filter records in fasta file by minimum length.

    Args:
        min_length (int): minimal length of sequences to be kept in filtered fasta file.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_minlength{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            if len(record_seq) >= min_length:
                outfile.write(f">{record_name}\n{record_seq}\n")
    if point_to_new_file:
        self.file_path = output_file

from_FASTA_directory(input_dir, merged_fasta=None, prepend_file_name=False) classmethod

Initialize FASTA class from directory of fasta files.

Parameters:
  • input_dir (Path) –

    path to input directory.

  • merged_fasta (Path, default: None ) –

    path to output merged fasta. Defaults to None.

  • prepend_file_name (bool, default: False ) –

    whether to add file name as genome ID to each record in the result merged fasta file.

Returns:
  • FASTA( FASTA ) –

    an initialized instance of class FASTA.

Source code in src/pynteny/preprocessing.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@classmethod
def from_FASTA_directory(
    cls, input_dir: Path, merged_fasta: Path = None, prepend_file_name: bool = False
) -> FASTA:
    """Initialize FASTA class from directory of fasta files.

    Args:
        input_dir (Path): path to input directory.
        merged_fasta (Path, optional): path to output merged fasta. Defaults to None.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.

    Returns:
        FASTA: an initialized instance of class FASTA.
    """
    input_dir = Path(input_dir)
    if merged_fasta is None:
        merged_fasta = input_dir / "merged_database.fasta"
    else:
        merged_fasta = Path(merged_fasta)
    FASTAmerger(input_dir).merge(merged_fasta, prepend_file_name)
    return cls(merged_fasta)

remove_corrupted_sequences(output_file=None, is_peptide=True, keep_stop_codon=False, point_to_new_file=True)

Filter out (DNA or peptide) sequences containing illegal characters.

Parameters:
  • output_file (Path, default: None ) –

    path to output fasta file. Defaults to None.

  • is_peptide (bool, default: True ) –

    select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.

  • keep_stop_codon (bool, default: False ) –

    whether to keep the stop codon in the peptide sequence. Defaults to False.

  • point_to_new_file (bool, default: True ) –

    whether FASTA object should point to the newly generated file. Defaults to True.

Source code in src/pynteny/preprocessing.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def remove_corrupted_sequences(
    self,
    output_file: Path = None,
    is_peptide: bool = True,
    keep_stop_codon: bool = False,
    point_to_new_file: bool = True,
) -> None:
    """Filter out (DNA or peptide) sequences containing illegal characters.

    Args:
        output_file (Path, optional): path to output fasta file. Defaults to None.
        is_peptide (bool, optional): select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.
        keep_stop_codon (bool, optional): whether to keep the stop codon in the peptide sequence. Defaults to False.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    dirname = self._input_file.parent
    fname, ext = self._input_file.stem, self._input_file.suffix
    if output_file is None:
        output_file = Path(dirname) / f"{fname}_modified{ext}"
    else:
        output_file = Path(output_file)
    if is_peptide:
        isLegitSequence = is_legit_peptide_sequence
    else:
        isLegitSequence = is_legit_DNA_sequence

    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            if is_peptide and (not keep_stop_codon):
                record_seq = remove_stop_sodon_signals(record_seq)
            if isLegitSequence(record_seq):
                outfile.write(f">{record_name}\n{record_seq}\n")
    if point_to_new_file:
        self.file_path = output_file

remove_duplicates(output_file=None, export_duplicates=False, point_to_new_file=True)

Removes duplicate entries (either by sequence or ID) from fasta.

Parameters:
  • output_file (Path, default: None ) –

    path to output fasta file. Defaults to None.

  • export_duplicates (bool, default: False ) –

    whether duplicated records are exported to a file. Defaults to False.

  • point_to_new_file (bool, default: True ) –

    whether FASTA object should point to the newly generated file. Defaults to True.

Yields:
  • None( None ) –

    None

Source code in src/pynteny/preprocessing.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def remove_duplicates(
    self,
    output_file: Path = None,
    export_duplicates: bool = False,
    point_to_new_file: bool = True,
) -> None:
    """Removes duplicate entries (either by sequence or ID) from fasta.

    Args:
        output_file (Path, optional): path to output fasta file. Defaults to None.
        export_duplicates (bool, optional): whether duplicated records are exported to a file. Defaults to False.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.

    Yields:
        None: None
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_noduplicates{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    wrappers.run_seqkit_nodup(
        input_fasta=self._input_file,
        output_fasta=output_file,
        export_duplicates=export_duplicates,
    )
    if point_to_new_file:
        self.file_path = output_file

split_by_contigs(output_dir=None)

Split large fasta file into several ones containing one contig each.

Parameters:
  • output_dir (Path, default: None ) –

    description. Defaults to None.

Source code in src/pynteny/preprocessing.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def split_by_contigs(self, output_dir: Path = None) -> None:
    """Split large fasta file into several ones containing one contig each.

    Args:
        output_dir (Path, optional): _description_. Defaults to None.
    """
    if output_dir is None:
        output_dir = (
            Path(self._input_file.parent) / "split_" + self._input_file.name
        )
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    contigs = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    for contig_name, seq in contigs:
        output_file = (
            output_dir / f"{contig_name.split(' ')[0]}{self._input_file.suffix}"
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            outfile.write(f">{contig_name}\n")
            outfile.write(seq + "\n")

FASTAmerger

Source code in src/pynteny/preprocessing.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class FASTAmerger:
    def __init__(self, input_dir: Path):
        self.input_dir = Path(input_dir)

    def prepend_filename_to_record_names(self, output_dir: Path) -> None:
        """Prepend file name to each record label in fasta file

        Args:
            output_dir (Path): _description_
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        for file in self.input_dir.iterdir():
            fasta = pyfastx.Fasta(file.as_posix(), build_index=False, full_name=True)
            new_file_content = [
                f">{file.stem}_{record_name}\n{record_seq}\n"
                for record_name, record_seq in fasta
            ]
            with open(output_dir / file.name, "w+", encoding="UTF-8") as outfile:
                outfile.writelines(new_file_content)

    def merge(self, output_file: Path = None, prepend_file_name: bool = False) -> None:
        """Merge input fasta files into a one (multi)fasta file.

        Args:
            output_file (Path, optional): path to ouput merged fasta file. Defaults to None.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.

        """
        if output_file is None:
            output_file = self.input_dir / "merged.fasta"
        else:
            output_file = Path(output_file)
        logger.info("Merging FASTA files in input directory")

        def _concatenate(source_dir: Path) -> None:
            source_dir = Path(source_dir)
            with open(output_file, "wb") as outfile:
                for file in sorted(source_dir.iterdir()):
                    if file.is_file():
                        with open(file, "rb") as infile:
                            shutil.copyfileobj(infile, outfile)

        if prepend_file_name:
            with tempfile.TemporaryDirectory() as tempdir:
                self.prepend_filename_to_record_names(output_dir=tempdir)
                _concatenate(tempdir)
        else:
            _concatenate(self.input_dir)
        logging.shutdown()

merge(output_file=None, prepend_file_name=False)

Merge input fasta files into a one (multi)fasta file.

Parameters:
  • output_file (Path, default: None ) –

    path to ouput merged fasta file. Defaults to None.

  • prepend_file_name (bool, default: False ) –

    whether to add file name as genome ID to each record in the result merged fasta file.

Source code in src/pynteny/preprocessing.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def merge(self, output_file: Path = None, prepend_file_name: bool = False) -> None:
    """Merge input fasta files into a one (multi)fasta file.

    Args:
        output_file (Path, optional): path to ouput merged fasta file. Defaults to None.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.

    """
    if output_file is None:
        output_file = self.input_dir / "merged.fasta"
    else:
        output_file = Path(output_file)
    logger.info("Merging FASTA files in input directory")

    def _concatenate(source_dir: Path) -> None:
        source_dir = Path(source_dir)
        with open(output_file, "wb") as outfile:
            for file in sorted(source_dir.iterdir()):
                if file.is_file():
                    with open(file, "rb") as infile:
                        shutil.copyfileobj(infile, outfile)

    if prepend_file_name:
        with tempfile.TemporaryDirectory() as tempdir:
            self.prepend_filename_to_record_names(output_dir=tempdir)
            _concatenate(tempdir)
    else:
        _concatenate(self.input_dir)
    logging.shutdown()

prepend_filename_to_record_names(output_dir)

Prepend file name to each record label in fasta file

Parameters:
  • output_dir (Path) –

    description

Source code in src/pynteny/preprocessing.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def prepend_filename_to_record_names(self, output_dir: Path) -> None:
    """Prepend file name to each record label in fasta file

    Args:
        output_dir (Path): _description_
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    for file in self.input_dir.iterdir():
        fasta = pyfastx.Fasta(file.as_posix(), build_index=False, full_name=True)
        new_file_content = [
            f">{file.stem}_{record_name}\n{record_seq}\n"
            for record_name, record_seq in fasta
        ]
        with open(output_dir / file.name, "w+", encoding="UTF-8") as outfile:
            outfile.writelines(new_file_content)

GeneAnnotator

Run prodigal on assembly, predict ORFs and extract location info

Source code in src/pynteny/preprocessing.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
class GeneAnnotator:
    """Run prodigal on assembly, predict ORFs and extract location info"""

    def __init__(self, assembly: FASTA) -> None:
        """Initialize GeneAnnotator

        Args:
            assembly (FASTA): fasta object containing assembled nucleotide sequences
        """
        self._assembly = assembly

    def annotate(
        self,
        processes: int = None,
        metagenome: bool = True,
        output_file: Path = None,
        prodigal_args: str = None,
        tempdir: Path = None,
    ) -> LabelledFASTA:
        """Run prodigal on assembly and export single fasta file with peptide ORFs predictions

        Args:
            processes (int, optional): maximum number of threads. Defaults to all minus one.
            metagenome (bool, optional): whether assembled sequences correspond to metagenomic data. Defaults to True.
            output_file (Path, optional): path to output fasta file. Defaults to None.
            prodigal_args (str, optional): additional arguments to be passed to prodigal CLI. Defaults to None.
            tempdir (Path, optional): path to temporary directory. Defaults to tempfile default.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        if processes is None:
            processes = os.cpu_count() - 1
        if output_file is None:
            output_file = (
                self._assembly.file_path.parent
                / f"{self._assembly.file_path.stem}_annotated.faa"
            )
        else:
            output_file = Path(output_file)
        if prodigal_args:
            logger.warning(
                "prodigal_args are ignored when predicting genes with pyrodigal: "
                f"'{prodigal_args}'"
            )
        if tempdir is None:
            tempdir = Path(tempfile.gettempdir())
        else:
            tempdir = Path(tempdir).resolve()

        logger.info("Predicting genes with pyrodigal")
        assembly_path = self._assembly.file_path
        gene_finder = pyrodigal.GeneFinder(meta=metagenome)
        if not metagenome:
            training_seqs = [
                seq.encode()
                for _, seq in pyfastx.Fasta(
                    assembly_path.as_posix(), build_index=False, full_name=True
                )
            ]
            gene_finder.train(*training_seqs)

        contigs = pyfastx.Fasta(
            assembly_path.as_posix(), build_index=False, full_name=True
        )

        # pyrodigal releases the GIL during find_genes, so we can predict genes
        # for several contigs concurrently while keeping input order for the
        # output (and thus reproducible gene labels).
        def _find(record):
            record_name, record_seq = record
            seq_id = record_name.split()[0]
            return seq_id, gene_finder.find_genes(record_seq.encode())

        with tempfile.NamedTemporaryFile(
            mode="w+t", dir=tempdir, suffix=".faa"
        ) as temp_fasta:
            with ThreadPoolExecutor(max_workers=max(processes, 1)) as executor:
                for seq_id, genes in executor.map(_find, contigs):
                    genes.write_translations(temp_fasta, sequence_id=seq_id)
            temp_fasta.flush()
            logging.shutdown()
            return LabelledFASTA.from_prodigal_output(
                Path(temp_fasta.name), output_file
            )

__init__(assembly)

Initialize GeneAnnotator

Parameters:
  • assembly (FASTA) –

    fasta object containing assembled nucleotide sequences

Source code in src/pynteny/preprocessing.py
537
538
539
540
541
542
543
def __init__(self, assembly: FASTA) -> None:
    """Initialize GeneAnnotator

    Args:
        assembly (FASTA): fasta object containing assembled nucleotide sequences
    """
    self._assembly = assembly

annotate(processes=None, metagenome=True, output_file=None, prodigal_args=None, tempdir=None)

Run prodigal on assembly and export single fasta file with peptide ORFs predictions

Parameters:
  • processes (int, default: None ) –

    maximum number of threads. Defaults to all minus one.

  • metagenome (bool, default: True ) –

    whether assembled sequences correspond to metagenomic data. Defaults to True.

  • output_file (Path, default: None ) –

    path to output fasta file. Defaults to None.

  • prodigal_args (str, default: None ) –

    additional arguments to be passed to prodigal CLI. Defaults to None.

  • tempdir (Path, default: None ) –

    path to temporary directory. Defaults to tempfile default.

Returns:
  • LabelledFASTA( LabelledFASTA ) –

    object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
def annotate(
    self,
    processes: int = None,
    metagenome: bool = True,
    output_file: Path = None,
    prodigal_args: str = None,
    tempdir: Path = None,
) -> LabelledFASTA:
    """Run prodigal on assembly and export single fasta file with peptide ORFs predictions

    Args:
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        metagenome (bool, optional): whether assembled sequences correspond to metagenomic data. Defaults to True.
        output_file (Path, optional): path to output fasta file. Defaults to None.
        prodigal_args (str, optional): additional arguments to be passed to prodigal CLI. Defaults to None.
        tempdir (Path, optional): path to temporary directory. Defaults to tempfile default.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    if processes is None:
        processes = os.cpu_count() - 1
    if output_file is None:
        output_file = (
            self._assembly.file_path.parent
            / f"{self._assembly.file_path.stem}_annotated.faa"
        )
    else:
        output_file = Path(output_file)
    if prodigal_args:
        logger.warning(
            "prodigal_args are ignored when predicting genes with pyrodigal: "
            f"'{prodigal_args}'"
        )
    if tempdir is None:
        tempdir = Path(tempfile.gettempdir())
    else:
        tempdir = Path(tempdir).resolve()

    logger.info("Predicting genes with pyrodigal")
    assembly_path = self._assembly.file_path
    gene_finder = pyrodigal.GeneFinder(meta=metagenome)
    if not metagenome:
        training_seqs = [
            seq.encode()
            for _, seq in pyfastx.Fasta(
                assembly_path.as_posix(), build_index=False, full_name=True
            )
        ]
        gene_finder.train(*training_seqs)

    contigs = pyfastx.Fasta(
        assembly_path.as_posix(), build_index=False, full_name=True
    )

    # pyrodigal releases the GIL during find_genes, so we can predict genes
    # for several contigs concurrently while keeping input order for the
    # output (and thus reproducible gene labels).
    def _find(record):
        record_name, record_seq = record
        seq_id = record_name.split()[0]
        return seq_id, gene_finder.find_genes(record_seq.encode())

    with tempfile.NamedTemporaryFile(
        mode="w+t", dir=tempdir, suffix=".faa"
    ) as temp_fasta:
        with ThreadPoolExecutor(max_workers=max(processes, 1)) as executor:
            for seq_id, genes in executor.map(_find, contigs):
                genes.write_translations(temp_fasta, sequence_id=seq_id)
        temp_fasta.flush()
        logging.shutdown()
        return LabelledFASTA.from_prodigal_output(
            Path(temp_fasta.name), output_file
        )

LabelledFASTA

Bases: FASTA

Tools to add and parse FASTA with positional info on record tags

Source code in src/pynteny/preprocessing.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
class LabelledFASTA(FASTA):
    """Tools to add and parse FASTA with positional info on record tags"""

    @classmethod
    def from_prodigal_output(
        cls, prodigal_faa: Path, output_file: Path = None
    ) -> LabelledFASTA:
        """Instantiate class from prodigal output file.
        Extract positional gene info from prodigal output and export to fasta file.

        Args:
            prodigal_faa (Path): path to prodigal output file containing peptide sequences
            output_file (Path, optional): path to output labelled fasta file. Defaults to None.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        number_prodigal_record_fields = 9
        prodigal_faa = Path(prodigal_faa)
        if output_file is None:
            output_file = prodigal_faa.parent / f"{prodigal_faa.stem}_longlabels.fasta"
        else:
            output_file = Path(output_file)
        data = pyfastx.Fasta(prodigal_faa.as_posix(), build_index=False, full_name=True)
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in data:
                name_list = record_name.split(" ")
                if len(name_list) < number_prodigal_record_fields:
                    logger.error(
                        f"Invalid prodigal header format for record: {record_name}"
                    )
                    sys.exit(1)
                contig = "_".join(name_list[0].split("_")[:-1])
                gene_number = name_list[0].split("_")[-1]
                start, end = name_list[2], name_list[4]
                strand = (
                    "pos"
                    if name_list[6] == "1"
                    else ("neg" if name_list[6] == "-1" else "")
                )
                header = f">{contig}_{gene_number}__{contig}_{gene_number}_{start}_{end}_{strand}"
                outfile.write(header + "\n")
                outfile.write(record_seq + "\n")
        logging.shutdown()
        return cls(output_file)

    @classmethod
    def from_genbank(
        cls,
        gbk_data: Path,
        output_file: Path = None,
        prefix: str = None,
        nucleotide: bool = False,
        prepend_file_name: bool = False,
    ) -> LabelledFASTA:
        """Assign gene positional info, such as contig, gene number and loci
        to each record in genbank database and return LabelledFASTA object.

        Args:
            gbk_data (Path): path to file or directory contanining genbank files
            output_file (Path, optional): path to output labelled fasta file. Defaults to None.
            prefix (str, optional): prefix for output file. Defaults to None.
            nucleotide (bool, optional): whether records corresponds to nucleotide sequences instead of peptides. Defaults to False.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.
        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        gbk_data = Path(gbk_data)
        if gbk_data.is_dir():
            gbk_files = [gbk_data / f for f in gbk_data.iterdir()]
        else:
            gbk_files = [gbk_data]

        if prepend_file_name:

            def gbk_name(gbk_file):
                return gbk_file.stem

        else:

            def gbk_name(gbk_file):
                return

        gbk_contigs = [
            (gbk_name(gbk_file), contig)
            for gbk_file in gbk_files
            for contig in SeqIO.parse(gbk_file, "genbank")
        ]

        if output_file is None:
            output_file = Path(gbk_files[0].parent) / f"{prefix}sequence_database.fasta"
        else:
            output_file = Path(output_file)

        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for gbk_file_name, gbk_contig in gbk_contigs:
                gene_counter = 0
                for feature in gbk_contig.features:
                    if "cds" in feature.type.lower():
                        gene_counter = cls.write_record(
                            gbk_contig,
                            feature,
                            outfile,
                            gene_counter,
                            gbk_file_name,
                            nucleotide,
                        )
        return cls(output_file)

    @staticmethod
    def get_label_str(
        gbk_contig: SeqRecord, feature: SeqFeature, gene_counter: int
    ) -> str:
        name = feature.qualifiers["locus_tag"][0].replace("_", ".")
        start, end, strand = (
            str(feature.location.start),
            str(feature.location.end),
            feature.location.strand,
        )
        start = start.replace(">", "").replace("<", "")
        end = end.replace(">", "").replace("<", "")
        strand_sense = "neg" if strand == -1 else ("pos" if strand == 1 else "")
        return f">{name}__{gbk_contig.name.replace('_', '')}_{gene_counter}_{start}_{end}_{strand_sense}\n"

    @staticmethod
    def write_record(
        gbk_contig: SeqRecord,
        feature: SeqFeature,
        output_file: TextIO,
        gene_counter: int,
        prefix: str = None,
        nucleotide: bool = False,
    ) -> int:
        header = LabelledFASTA.get_label_str(gbk_contig, feature, gene_counter)
        if prefix is not None:
            header = f"{prefix}_{header}"
        if (not nucleotide) and ("translation" in feature.qualifiers):
            sequence = feature.qualifiers["translation"][0]
        elif nucleotide:
            sequence = str(feature.extract(gbk_contig).seq)
        else:
            return gene_counter
        output_file.write(header)
        output_file.write(sequence + "\n")
        gene_counter += 1
        return gene_counter

from_genbank(gbk_data, output_file=None, prefix=None, nucleotide=False, prepend_file_name=False) classmethod

Assign gene positional info, such as contig, gene number and loci to each record in genbank database and return LabelledFASTA object.

Parameters:
  • gbk_data (Path) –

    path to file or directory contanining genbank files

  • output_file (Path, default: None ) –

    path to output labelled fasta file. Defaults to None.

  • prefix (str, default: None ) –

    prefix for output file. Defaults to None.

  • nucleotide (bool, default: False ) –

    whether records corresponds to nucleotide sequences instead of peptides. Defaults to False.

  • prepend_file_name (bool, default: False ) –

    whether to add file name as genome ID to each record in the result merged fasta file.

Returns: LabelledFASTA: object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
@classmethod
def from_genbank(
    cls,
    gbk_data: Path,
    output_file: Path = None,
    prefix: str = None,
    nucleotide: bool = False,
    prepend_file_name: bool = False,
) -> LabelledFASTA:
    """Assign gene positional info, such as contig, gene number and loci
    to each record in genbank database and return LabelledFASTA object.

    Args:
        gbk_data (Path): path to file or directory contanining genbank files
        output_file (Path, optional): path to output labelled fasta file. Defaults to None.
        prefix (str, optional): prefix for output file. Defaults to None.
        nucleotide (bool, optional): whether records corresponds to nucleotide sequences instead of peptides. Defaults to False.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.
    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    gbk_data = Path(gbk_data)
    if gbk_data.is_dir():
        gbk_files = [gbk_data / f for f in gbk_data.iterdir()]
    else:
        gbk_files = [gbk_data]

    if prepend_file_name:

        def gbk_name(gbk_file):
            return gbk_file.stem

    else:

        def gbk_name(gbk_file):
            return

    gbk_contigs = [
        (gbk_name(gbk_file), contig)
        for gbk_file in gbk_files
        for contig in SeqIO.parse(gbk_file, "genbank")
    ]

    if output_file is None:
        output_file = Path(gbk_files[0].parent) / f"{prefix}sequence_database.fasta"
    else:
        output_file = Path(output_file)

    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for gbk_file_name, gbk_contig in gbk_contigs:
            gene_counter = 0
            for feature in gbk_contig.features:
                if "cds" in feature.type.lower():
                    gene_counter = cls.write_record(
                        gbk_contig,
                        feature,
                        outfile,
                        gene_counter,
                        gbk_file_name,
                        nucleotide,
                    )
    return cls(output_file)

from_prodigal_output(prodigal_faa, output_file=None) classmethod

Instantiate class from prodigal output file. Extract positional gene info from prodigal output and export to fasta file.

Parameters:
  • prodigal_faa (Path) –

    path to prodigal output file containing peptide sequences

  • output_file (Path, default: None ) –

    path to output labelled fasta file. Defaults to None.

Returns:
  • LabelledFASTA( LabelledFASTA ) –

    object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@classmethod
def from_prodigal_output(
    cls, prodigal_faa: Path, output_file: Path = None
) -> LabelledFASTA:
    """Instantiate class from prodigal output file.
    Extract positional gene info from prodigal output and export to fasta file.

    Args:
        prodigal_faa (Path): path to prodigal output file containing peptide sequences
        output_file (Path, optional): path to output labelled fasta file. Defaults to None.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    number_prodigal_record_fields = 9
    prodigal_faa = Path(prodigal_faa)
    if output_file is None:
        output_file = prodigal_faa.parent / f"{prodigal_faa.stem}_longlabels.fasta"
    else:
        output_file = Path(output_file)
    data = pyfastx.Fasta(prodigal_faa.as_posix(), build_index=False, full_name=True)
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in data:
            name_list = record_name.split(" ")
            if len(name_list) < number_prodigal_record_fields:
                logger.error(
                    f"Invalid prodigal header format for record: {record_name}"
                )
                sys.exit(1)
            contig = "_".join(name_list[0].split("_")[:-1])
            gene_number = name_list[0].split("_")[-1]
            start, end = name_list[2], name_list[4]
            strand = (
                "pos"
                if name_list[6] == "1"
                else ("neg" if name_list[6] == "-1" else "")
            )
            header = f">{contig}_{gene_number}__{contig}_{gene_number}_{start}_{end}_{strand}"
            outfile.write(header + "\n")
            outfile.write(record_seq + "\n")
    logging.shutdown()
    return cls(output_file)

is_legit_DNA_sequence(record_seq)

Assert that DNA sequence only contains valid symbols.

Parameters:
  • record_seq (str) –

    nucleotide sequence.

Returns:
  • bool( bool ) –

    whether nucleotide sequence only contains legit symbols.

Source code in src/pynteny/preprocessing.py
80
81
82
83
84
85
86
87
88
89
90
91
def is_legit_DNA_sequence(record_seq: str) -> bool:
    """Assert that DNA sequence only contains valid symbols.

    Args:
        record_seq (str): nucleotide sequence.

    Returns:
        bool: whether nucleotide sequence only contains legit symbols.
    """
    nts = {"A", "G", "T", "C", "N"}
    seq_symbols = {s.upper() for s in record_seq}
    return seq_symbols.issubset(nts)

is_legit_peptide_sequence(record_seq)

Assert that peptide sequence only contains valid symbols.

Parameters:
  • record_seq (str) –

    peptide sequence.

Returns:
  • bool( bool ) –

    whether peptide sequence only contains legit symbols.

Source code in src/pynteny/preprocessing.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def is_legit_peptide_sequence(record_seq: str) -> bool:
    """Assert that peptide sequence only contains valid symbols.

    Args:
        record_seq (str): peptide sequence.

    Returns:
        bool: whether peptide sequence only contains legit symbols.
    """
    aas = {
        "A",
        "C",
        "D",
        "E",
        "F",
        "G",
        "H",
        "I",
        "K",
        "L",
        "M",
        "N",
        "P",
        "Q",
        "R",
        "S",
        "T",
        "V",
        "W",
        "Y",
        "*",
    }
    seq_symbols = {s.upper() for s in record_seq}
    return seq_symbols.issubset(aas)

remove_stop_sodon_signals(record_seq)

Remove stop codon signals from peptide sequence

Parameters:
  • record_seq (str) –

    peptide sequence.

Returns:
  • str( str ) –

    a peptide sequence without stop codon symbols.

Source code in src/pynteny/preprocessing.py
32
33
34
35
36
37
38
39
40
41
def remove_stop_sodon_signals(record_seq: str) -> str:
    """Remove stop codon signals from peptide sequence

    Args:
        record_seq (str): peptide sequence.

    Returns:
        str: a peptide sequence without stop codon symbols.
    """
    return record_seq.replace("*", "")

Module hmm

Tools to parse Hmmer output and PGAP (HMM) database

HMMDatabase

Base class for HMM databases

Source code in src/pynteny/hmm.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class HMMDatabase:
    """Base class for HMM databases"""

    def __init__(
        self,
        metadata_file: Path,
        metadata_columns: list[str] = None,
    ) -> None:
        """Initialize class HMMDatabase

        Args:
            metadata_file (Path): path to metadata file.
            metadata_columns (list[str], optional): list of metadata columns to be
                                                    used. Defaults to None (all columns).
        """
        self._metadata_file = Path(metadata_file)
        self._meta = pd.read_csv(
            self._metadata_file, sep="\t", usecols=metadata_columns
        )
        if "#ncbi_accession" in self._meta.columns:
            self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"})

    @property
    def meta(self) -> pd.DataFrame:
        """Return PFAM metadata

        Returns:
            pd.DataFrame: PFAM metadata
        """
        return self._meta

    @property
    def meta_file(self) -> Path:
        """Return path to  metadata file

        Returns:
            Path: metadata file
        """
        return self._metadata_file

    def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
        """Try to retrieve HMM by its gene symbol, more
           than one HMM may map to a single gene symbol

        Args:
            gene_symbol (str): gene symbol to be searched for HMM.

        Returns:
            list[str]: list of HMM names matching gene symbol.
        """
        meta = self._meta  # .dropna(subset=["gene_symbol", "label"], axis=0)
        return meta[((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))][
            "accession"
        ].values.tolist()

    def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str:
        """Get HMMs corresponding to gene symbol in PGAP metadata.
           If more than one HMM matching gene symbol, return a HMM group

        Args:
            gene_symbol (str): gene symbol to be searched for HMM.

        Returns:
            str: string of HMM names (group separated by |)
        """
        hmms = self.get_HMM_names_by_gene_symbol(gene_symbol)
        if not hmms:
            logger.error(f"No HMM found for gene {gene_symbol}")
            sys.exit(1)
        if len(hmms) == 1:
            return hmms.pop()
        else:
            return "|".join(hmms)

    def get_HMM_gene_ID(self, hmm_name: str) -> list[str]:
        """Get gene symbols matching given hmm.

        Args:
            hmm_name (str): query HMM name.

        Returns:
            list[str]: list of gene symbols matching given HMM.
        """
        meta = self._meta.dropna(subset=["accession"], axis=0)
        return meta[meta["accession"] == hmm_name]["gene_symbol"].values.tolist()

    def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
        """Get meta info for given hmm.

        Args:
            hmm_name (str): query HMM name.

        Returns:
            dict: metadata of provided HMM.
        """
        meta = self._meta.dropna(subset=["accession"], axis=0).fillna("")
        metadata = {
            k: list(v.values())[0] if list(v.values())[0] else "undef"
            for k, v in meta[meta["accession"] == hmm_name].to_dict().items()
        }
        if not metadata:
            logger.warning(f"No metadata for HMM: {hmm_name}")
        return metadata

meta property

Return PFAM metadata

Returns:
  • DataFrame

    pd.DataFrame: PFAM metadata

meta_file property

Return path to metadata file

Returns:
  • Path( Path ) –

    metadata file

__init__(metadata_file, metadata_columns=None)

Initialize class HMMDatabase

Parameters:
  • metadata_file (Path) –

    path to metadata file.

  • metadata_columns (list[str], default: None ) –

    list of metadata columns to be used. Defaults to None (all columns).

Source code in src/pynteny/hmm.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    metadata_file: Path,
    metadata_columns: list[str] = None,
) -> None:
    """Initialize class HMMDatabase

    Args:
        metadata_file (Path): path to metadata file.
        metadata_columns (list[str], optional): list of metadata columns to be
                                                used. Defaults to None (all columns).
    """
    self._metadata_file = Path(metadata_file)
    self._meta = pd.read_csv(
        self._metadata_file, sep="\t", usecols=metadata_columns
    )
    if "#ncbi_accession" in self._meta.columns:
        self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"})

get_HMM_gene_ID(hmm_name)

Get gene symbols matching given hmm.

Parameters:
  • hmm_name (str) –

    query HMM name.

Returns:
  • list[str]

    list[str]: list of gene symbols matching given HMM.

Source code in src/pynteny/hmm.py
204
205
206
207
208
209
210
211
212
213
214
def get_HMM_gene_ID(self, hmm_name: str) -> list[str]:
    """Get gene symbols matching given hmm.

    Args:
        hmm_name (str): query HMM name.

    Returns:
        list[str]: list of gene symbols matching given HMM.
    """
    meta = self._meta.dropna(subset=["accession"], axis=0)
    return meta[meta["accession"] == hmm_name]["gene_symbol"].values.tolist()

get_HMM_group_for_gene_symbol(gene_symbol)

Get HMMs corresponding to gene symbol in PGAP metadata. If more than one HMM matching gene symbol, return a HMM group

Parameters:
  • gene_symbol (str) –

    gene symbol to be searched for HMM.

Returns:
  • str( str ) –

    string of HMM names (group separated by |)

Source code in src/pynteny/hmm.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str:
    """Get HMMs corresponding to gene symbol in PGAP metadata.
       If more than one HMM matching gene symbol, return a HMM group

    Args:
        gene_symbol (str): gene symbol to be searched for HMM.

    Returns:
        str: string of HMM names (group separated by |)
    """
    hmms = self.get_HMM_names_by_gene_symbol(gene_symbol)
    if not hmms:
        logger.error(f"No HMM found for gene {gene_symbol}")
        sys.exit(1)
    if len(hmms) == 1:
        return hmms.pop()
    else:
        return "|".join(hmms)

get_HMM_names_by_gene_symbol(gene_symbol)

Try to retrieve HMM by its gene symbol, more than one HMM may map to a single gene symbol

Parameters:
  • gene_symbol (str) –

    gene symbol to be searched for HMM.

Returns:
  • list[str]

    list[str]: list of HMM names matching gene symbol.

Source code in src/pynteny/hmm.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
    """Try to retrieve HMM by its gene symbol, more
       than one HMM may map to a single gene symbol

    Args:
        gene_symbol (str): gene symbol to be searched for HMM.

    Returns:
        list[str]: list of HMM names matching gene symbol.
    """
    meta = self._meta  # .dropna(subset=["gene_symbol", "label"], axis=0)
    return meta[((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))][
        "accession"
    ].values.tolist()

get_meta_info_for_HMM(hmm_name)

Get meta info for given hmm.

Parameters:
  • hmm_name (str) –

    query HMM name.

Returns:
  • dict( dict ) –

    metadata of provided HMM.

Source code in src/pynteny/hmm.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
    """Get meta info for given hmm.

    Args:
        hmm_name (str): query HMM name.

    Returns:
        dict: metadata of provided HMM.
    """
    meta = self._meta.dropna(subset=["accession"], axis=0).fillna("")
    metadata = {
        k: list(v.values())[0] if list(v.values())[0] else "undef"
        for k, v in meta[meta["accession"] == hmm_name].to_dict().items()
    }
    if not metadata:
        logger.warning(f"No metadata for HMM: {hmm_name}")
    return metadata

HMMER

Run Hmmer on multiple hmms and parse output

Source code in src/pynteny/hmm.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class HMMER:
    """Run Hmmer on multiple hmms and parse output"""

    def __init__(
        self,
        input_hmms: list[Path],
        hmm_output_dir: Path,
        input_data: Path,
        additional_args: list[str] = None,
        processes: int = None,
    ):
        """Initialize class HMMER

        Args:
            input_hmms (list[Path]): list of paths to input HMM files.
            hmm_output_dir (Path): path to output directory to HMMER output files.
            input_data (Path): path to input fasta file with sequence database.
            additional_args (list[str]): additional arguments to hmmsearch or hmmscan. Each
                element in the list is a string with additional arguments for each
                input hmm (arranged in the same order), an element can also take a
                value of None to avoid passing additional arguments for a specific
                input hmm. A single string may also be passed, in which case the
                same additional argument is passed to hmmsearch for all input hmms.
                Defaults to None.
            processes (int, optional): maximum number of threads to be employed.
                Defaults to all minus one.
        """
        if additional_args is None:
            additional_args = [None for _ in range(len(input_hmms))]
        self._hmmer_output_dir = Path(hmm_output_dir)
        self._input_hmms = [Path(hmm) for hmm in input_hmms]
        self._input_fasta = Path(input_data)
        self._additional_args = additional_args
        self._processes = processes

    @property
    def hmm_names(self) -> list[str]:
        """Get file names of input HMMs

        Returns:
            list[str]: list of file names.
        """
        return [hmm_path.stem for hmm_path in self._input_hmms]

    @staticmethod
    def parse_HMM_search_output(hmmer_output: Path) -> pd.DataFrame:
        """Parse hmmsearch or hmmscan summary table output file.

        Args:
            hmmer_output (str): path to HMMER output file.

        Returns:
            pd.DataFrame: a dataframe containing parsed HMMER output.
        """
        hmmer_output = Path(hmmer_output)
        attribs = ["id", "bias", "bitscore", "description"]
        hits = defaultdict(list)
        with open(hmmer_output, encoding="UTF-8") as handle:
            for queryresult in SearchIO.parse(handle, "hmmer3-tab"):
                for hit in queryresult.hits:
                    for attrib in attribs:
                        hits[attrib].append(getattr(hit, attrib))
        return pd.DataFrame.from_dict(hits)

    def get_HMMER_tables(
        self, reuse_hmmer_results: bool = True, method: str = "hmmsearch"
    ) -> dict[pd.DataFrame]:
        """Run hmmer for given hmm list

        Args:
            reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run
                again for HMMs already searched in the same output directory. Defaults to True.
            method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

        Returns:
            dict[pd.DataFrame]: dict of HMMER hits as pandas dataframes.
        """
        hmm_hits = {}
        for hmm_model, add_args in zip(self._input_hmms, self._additional_args):
            hmm_name = hmm_model.stem
            hmmer_output = Path(self._hmmer_output_dir) / f"hmmer_output_{hmm_name}.txt"

            if not (reuse_hmmer_results and os.path.isfile(hmmer_output)):
                wrappers.run_HMM_search(
                    hmm_model=hmm_model,
                    input_fasta=self._input_fasta,
                    output_file=hmmer_output,
                    method=method,
                    processes=self._processes,
                    additional_args=add_args,
                )
            elif reuse_hmmer_results and os.path.isfile(hmmer_output):
                logger.info(f"Reusing Hmmer results for HMM: {hmm_name}")
            hmm_hits[hmm_name] = HMMER.parse_HMM_search_output(hmmer_output)
        return hmm_hits

hmm_names property

Get file names of input HMMs

Returns:
  • list[str]

    list[str]: list of file names.

__init__(input_hmms, hmm_output_dir, input_data, additional_args=None, processes=None)

Initialize class HMMER

Parameters:
  • input_hmms (list[Path]) –

    list of paths to input HMM files.

  • hmm_output_dir (Path) –

    path to output directory to HMMER output files.

  • input_data (Path) –

    path to input fasta file with sequence database.

  • additional_args (list[str], default: None ) –

    additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

  • processes (int, default: None ) –

    maximum number of threads to be employed. Defaults to all minus one.

Source code in src/pynteny/hmm.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    input_hmms: list[Path],
    hmm_output_dir: Path,
    input_data: Path,
    additional_args: list[str] = None,
    processes: int = None,
):
    """Initialize class HMMER

    Args:
        input_hmms (list[Path]): list of paths to input HMM files.
        hmm_output_dir (Path): path to output directory to HMMER output files.
        input_data (Path): path to input fasta file with sequence database.
        additional_args (list[str]): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms.
            Defaults to None.
        processes (int, optional): maximum number of threads to be employed.
            Defaults to all minus one.
    """
    if additional_args is None:
        additional_args = [None for _ in range(len(input_hmms))]
    self._hmmer_output_dir = Path(hmm_output_dir)
    self._input_hmms = [Path(hmm) for hmm in input_hmms]
    self._input_fasta = Path(input_data)
    self._additional_args = additional_args
    self._processes = processes

get_HMMER_tables(reuse_hmmer_results=True, method='hmmsearch')

Run hmmer for given hmm list

Parameters:
  • reuse_hmmer_results (bool, default: True ) –

    if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to True.

  • method (str, default: 'hmmsearch' ) –

    select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

Returns:
  • dict[DataFrame]

    dict[pd.DataFrame]: dict of HMMER hits as pandas dataframes.

Source code in src/pynteny/hmm.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def get_HMMER_tables(
    self, reuse_hmmer_results: bool = True, method: str = "hmmsearch"
) -> dict[pd.DataFrame]:
    """Run hmmer for given hmm list

    Args:
        reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run
            again for HMMs already searched in the same output directory. Defaults to True.
        method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

    Returns:
        dict[pd.DataFrame]: dict of HMMER hits as pandas dataframes.
    """
    hmm_hits = {}
    for hmm_model, add_args in zip(self._input_hmms, self._additional_args):
        hmm_name = hmm_model.stem
        hmmer_output = Path(self._hmmer_output_dir) / f"hmmer_output_{hmm_name}.txt"

        if not (reuse_hmmer_results and os.path.isfile(hmmer_output)):
            wrappers.run_HMM_search(
                hmm_model=hmm_model,
                input_fasta=self._input_fasta,
                output_file=hmmer_output,
                method=method,
                processes=self._processes,
                additional_args=add_args,
            )
        elif reuse_hmmer_results and os.path.isfile(hmmer_output):
            logger.info(f"Reusing Hmmer results for HMM: {hmm_name}")
        hmm_hits[hmm_name] = HMMER.parse_HMM_search_output(hmmer_output)
    return hmm_hits

parse_HMM_search_output(hmmer_output) staticmethod

Parse hmmsearch or hmmscan summary table output file.

Parameters:
  • hmmer_output (str) –

    path to HMMER output file.

Returns:
  • DataFrame

    pd.DataFrame: a dataframe containing parsed HMMER output.

Source code in src/pynteny/hmm.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@staticmethod
def parse_HMM_search_output(hmmer_output: Path) -> pd.DataFrame:
    """Parse hmmsearch or hmmscan summary table output file.

    Args:
        hmmer_output (str): path to HMMER output file.

    Returns:
        pd.DataFrame: a dataframe containing parsed HMMER output.
    """
    hmmer_output = Path(hmmer_output)
    attribs = ["id", "bias", "bitscore", "description"]
    hits = defaultdict(list)
    with open(hmmer_output, encoding="UTF-8") as handle:
        for queryresult in SearchIO.parse(handle, "hmmer3-tab"):
            for hit in queryresult.hits:
                for attrib in attribs:
                    hits[attrib].append(getattr(hit, attrib))
    return pd.DataFrame.from_dict(hits)

PFAM

Bases: HMMDatabase

Tools to preprocess the PFAM-A hmm database

Source code in src/pynteny/hmm.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
class PFAM(HMMDatabase):
    """Tools to preprocess the PFAM-A hmm database"""

    @classmethod
    def from_gz_file(
        cls, hmm_gz_file: Path, hmm_outdir: Path = None, meta_outfile: Path = None
    ) -> PFAM:
        """Initialize PFAM class from hmm file

        Args:
            hmm_gz_file (Path): path to hmm gz file.
            hmm_outdir (Path): path to output directory to store individual HMM files.
            meta_outfile (Path, optional): path to metadata output file. Defaults to None.
        """
        hmm_gz_file = Path(hmm_gz_file)
        if hmm_outdir is not None:
            hmm_outdir = Path(hmm_outdir)
        else:
            hmm_outdir = hmm_gz_file.parent / "pfam_hmms"
        if meta_outfile is not None:
            meta_outfile = Path(meta_outfile)
        else:
            meta_outfile = hmm_outdir / f"{hmm_gz_file.stem}_meta.tsv"
        with tempfile.NamedTemporaryFile() as temp:
            extract_gz_file(hmm_gz_file, temp.name)
            split_hmms(temp.name, hmm_outdir)
            cls.construct_meta_file(hmm_outdir, meta_outfile)
        return cls(meta_outfile)

    def construct_meta_file(
        self, hmm_database: Path, meta_outfile: Path = None
    ) -> None:
        """Construct metadata file from individual HMM files.

        Args:
            hmm_database (Path): path to HMM database.
            meta_outfile (Path): path to metadata file.
        """
        logger.info("Constructing metadata file for PFAM-A database")
        if meta_outfile is None:
            meta_outfile = hmm_database / "PFAM_meta.tsv"
        else:
            meta_outfile = Path(meta_outfile)
        hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"]
        for hmm_file in hmm_database.glob("*.hmm"):
            with open(hmm_file, "r") as f:
                hmm_text = f.read()

            acc_code = [
                entry for entry in hmm_text.split("\n") if entry.startswith("ACC")
            ]
            name = [entry for entry in hmm_text.split("\n") if entry.startswith("NAME")]
            description = [
                entry for entry in hmm_text.split("\n") if entry.startswith("DESC")
            ]
            length = [
                entry for entry in hmm_text.split("\n") if entry.startswith("LENG")
            ]
            name = name[0].split()[1] if name else "Unspecified"
            description = description[0].split()[1] if description else "Unspecified"
            length = length[0].split()[1] if length else "Unspecified"

            if acc_code:
                acc_name = acc_code[0].split()[1]
                hmm_meta_lines.append(f"{acc_name}\t{name}\t{description}\t{length}\n")
        with open(meta_outfile, "w+") as f:
            f.writelines(hmm_meta_lines)
        self._metadata_file = meta_outfile
        self._meta = pd.read_csv(meta_outfile, sep="\t")

construct_meta_file(hmm_database, meta_outfile=None)

Construct metadata file from individual HMM files.

Parameters:
  • hmm_database (Path) –

    path to HMM database.

  • meta_outfile (Path, default: None ) –

    path to metadata file.

Source code in src/pynteny/hmm.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def construct_meta_file(
    self, hmm_database: Path, meta_outfile: Path = None
) -> None:
    """Construct metadata file from individual HMM files.

    Args:
        hmm_database (Path): path to HMM database.
        meta_outfile (Path): path to metadata file.
    """
    logger.info("Constructing metadata file for PFAM-A database")
    if meta_outfile is None:
        meta_outfile = hmm_database / "PFAM_meta.tsv"
    else:
        meta_outfile = Path(meta_outfile)
    hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"]
    for hmm_file in hmm_database.glob("*.hmm"):
        with open(hmm_file, "r") as f:
            hmm_text = f.read()

        acc_code = [
            entry for entry in hmm_text.split("\n") if entry.startswith("ACC")
        ]
        name = [entry for entry in hmm_text.split("\n") if entry.startswith("NAME")]
        description = [
            entry for entry in hmm_text.split("\n") if entry.startswith("DESC")
        ]
        length = [
            entry for entry in hmm_text.split("\n") if entry.startswith("LENG")
        ]
        name = name[0].split()[1] if name else "Unspecified"
        description = description[0].split()[1] if description else "Unspecified"
        length = length[0].split()[1] if length else "Unspecified"

        if acc_code:
            acc_name = acc_code[0].split()[1]
            hmm_meta_lines.append(f"{acc_name}\t{name}\t{description}\t{length}\n")
    with open(meta_outfile, "w+") as f:
        f.writelines(hmm_meta_lines)
    self._metadata_file = meta_outfile
    self._meta = pd.read_csv(meta_outfile, sep="\t")

from_gz_file(hmm_gz_file, hmm_outdir=None, meta_outfile=None) classmethod

Initialize PFAM class from hmm file

Parameters:
  • hmm_gz_file (Path) –

    path to hmm gz file.

  • hmm_outdir (Path, default: None ) –

    path to output directory to store individual HMM files.

  • meta_outfile (Path, default: None ) –

    path to metadata output file. Defaults to None.

Source code in src/pynteny/hmm.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@classmethod
def from_gz_file(
    cls, hmm_gz_file: Path, hmm_outdir: Path = None, meta_outfile: Path = None
) -> PFAM:
    """Initialize PFAM class from hmm file

    Args:
        hmm_gz_file (Path): path to hmm gz file.
        hmm_outdir (Path): path to output directory to store individual HMM files.
        meta_outfile (Path, optional): path to metadata output file. Defaults to None.
    """
    hmm_gz_file = Path(hmm_gz_file)
    if hmm_outdir is not None:
        hmm_outdir = Path(hmm_outdir)
    else:
        hmm_outdir = hmm_gz_file.parent / "pfam_hmms"
    if meta_outfile is not None:
        meta_outfile = Path(meta_outfile)
    else:
        meta_outfile = hmm_outdir / f"{hmm_gz_file.stem}_meta.tsv"
    with tempfile.NamedTemporaryFile() as temp:
        extract_gz_file(hmm_gz_file, temp.name)
        split_hmms(temp.name, hmm_outdir)
        cls.construct_meta_file(hmm_outdir, meta_outfile)
    return cls(meta_outfile)

PGAP

Bases: HMMDatabase

Tools to parse PGAP hmm database metadata

Source code in src/pynteny/hmm.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
class PGAP(HMMDatabase):
    """Tools to parse PGAP hmm database metadata"""

    def __init__(self, *args, **kwargs):
        """Initialize class PGAP"""
        super().__init__(*args, **kwargs)

    def remove_missing_HMMs_from_metadata(
        self, hmm_database: Path, meta_outfile: Path = None
    ) -> None:
        """Remove HMMs from metadata that are not in HMM directory

        Args:
            hmm_database (Path): path to HMM database.
            hmm_dir (Path): path to directory containing PGAP database.
            meta_outfile (Path, optional): path to output file. Defaults to None.
        """
        logger.info("Removing missing HMMs from PGAP metadata")
        hmm_dir = hmm_database
        if meta_outfile is None:
            meta_outfile = (
                self._metadata_file.parent
                / f"{self._metadata_file.stem}_preprocessed.tsv"
            )
        else:
            meta_outfile = Path(meta_outfile)
        if is_tar_file(hmm_dir):
            hmm_file_names = [
                Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
            ]
        else:
            hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()]
        not_found = set()
        for i, row in self._meta.iterrows():
            if row["accession"].strip() not in hmm_file_names:
                not_found.add(i)
        self._meta = self._meta.drop(not_found)
        self._metadata_file = meta_outfile
        self._meta.to_csv(meta_outfile, sep="\t", index=False)

__init__(*args, **kwargs)

Initialize class PGAP

Source code in src/pynteny/hmm.py
238
239
240
def __init__(self, *args, **kwargs):
    """Initialize class PGAP"""
    super().__init__(*args, **kwargs)

remove_missing_HMMs_from_metadata(hmm_database, meta_outfile=None)

Remove HMMs from metadata that are not in HMM directory

Parameters:
  • hmm_database (Path) –

    path to HMM database.

  • hmm_dir (Path) –

    path to directory containing PGAP database.

  • meta_outfile (Path, default: None ) –

    path to output file. Defaults to None.

Source code in src/pynteny/hmm.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def remove_missing_HMMs_from_metadata(
    self, hmm_database: Path, meta_outfile: Path = None
) -> None:
    """Remove HMMs from metadata that are not in HMM directory

    Args:
        hmm_database (Path): path to HMM database.
        hmm_dir (Path): path to directory containing PGAP database.
        meta_outfile (Path, optional): path to output file. Defaults to None.
    """
    logger.info("Removing missing HMMs from PGAP metadata")
    hmm_dir = hmm_database
    if meta_outfile is None:
        meta_outfile = (
            self._metadata_file.parent
            / f"{self._metadata_file.stem}_preprocessed.tsv"
        )
    else:
        meta_outfile = Path(meta_outfile)
    if is_tar_file(hmm_dir):
        hmm_file_names = [
            Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
        ]
    else:
        hmm_file_names = [hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()]
    not_found = set()
    for i, row in self._meta.iterrows():
        if row["accession"].strip() not in hmm_file_names:
            not_found.add(i)
    self._meta = self._meta.drop(not_found)
    self._metadata_file = meta_outfile
    self._meta.to_csv(meta_outfile, sep="\t", index=False)

download_pfam(download_dir, unpack=False)

Download PFAM database

Parameters:
  • unpack (bool, default: False ) –

    if True then PFAM database will be extracted

Source code in src/pynteny/hmm.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def download_pfam(download_dir: Path, unpack: bool = False) -> Path:
    """Download PFAM database

    Args:
        unpack (bool, optional): if True then PFAM database will be extracted
    """
    if download_dir.exists():
        logger.warning(
            f"{download_dir} already exists. Downloader may overwrite files."
        )
    PFAM_file = download_dir / "Pfam-A.gz"
    logger.info("Downloading PFAM-A hmm database")
    url = "https://ftp.ebi.ac.uk/pub/databases/Pfam/current_release/Pfam-A.hmm.gz"
    download_file(url, PFAM_file)
    if unpack:
        destination_path = download_dir / "pfam_hmms"
        extract_to_directory(PFAM_file, destination_dir=destination_path)
        return destination_path
    else:
        return PFAM_file

download_pgap(download_dir, unpack=False)

Download PGAP database

Parameters:
  • download_dir (Path) –

    path to output directory.

  • unpack (bool, default: False ) –

    if True then PGAP database will be extracted

Source code in src/pynteny/hmm.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def download_pgap(download_dir: Path, unpack: bool = False) -> tuple[Path, Path]:
    """Download PGAP database

    Args:
        download_dir (Path): path to output directory.
        unpack (bool, optional): if True then PGAP database will be extracted
    """
    if download_dir.exists():
        logger.warning(
            f"{download_dir} already exists. Downloader may overwrite files."
        )

    data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz"
    meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv"
    PGAP_file = download_dir / "hmm_PGAP.HMM.tgz"
    meta_file = download_dir / "hmm_PGAP.tsv"
    download_file(data_url, PGAP_file)
    download_file(meta_url, meta_file)
    if unpack:
        destination_path = download_dir / "pgap_hmms"
        extract_to_directory(PGAP_file, destination_dir=destination_path)
        return destination_path, meta_file
    else:
        return PGAP_file, meta_file

Module labelparser

Tools to parse record labels to extract coded info

parse(label)

Parse sequence labels to obtain contig and locus info

Parameters:
  • label (str) –

    sequence label

Returns:
  • dict( dict ) –

    dictionary with parsed label info.

Source code in src/pynteny/parsers/labelparser.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def parse(label: str) -> dict:
    """Parse sequence labels to obtain contig and locus info

    Args:
        label (str): sequence label

    Returns:
        dict: dictionary with parsed label info.
    """
    parsed_dict = {
        "full_label": label,
        "gene_id": "",
        "contig": "",
        "gene_pos": None,
        "locus_pos": None,
        "strand": "",
    }
    try:
        entry = label.split("__")[0]
        meta = label.split("__")[1]
        strand = meta.split("_")[-1]
        locus_pos = tuple([int(pos) for pos in meta.split("_")[-3:-1]])
        gene_pos = int(meta.split("_")[-4])
        contig = "_".join(meta.split("_")[:-4])

        parsed_dict["gene_id"] = entry
        parsed_dict["contig"] = contig
        parsed_dict["gene_pos"] = gene_pos
        parsed_dict["locus_pos"] = locus_pos
        parsed_dict["strand"] = strand
    except Exception:
        pass
    return parsed_dict

parse_from_list(labels)

Parse labels in list of labels and return DataFrame.

Parameters:
  • labels (list) –

    list of labels as stringgs.

Returns:
  • DataFrame

    pd.DataFrame: Dataframe containing parsed information from labels.

Source code in src/pynteny/parsers/labelparser.py
52
53
54
55
56
57
58
59
60
61
def parse_from_list(labels: list[str]) -> pd.DataFrame:
    """Parse labels in list of labels and return DataFrame.

    Args:
        labels (list, optional): list of labels as stringgs.

    Returns:
        pd.DataFrame: Dataframe containing parsed information from labels.
    """
    return pd.DataFrame([parse(label) for label in labels])

Module syntenyparser

Tools to parse synteny structure strings

contains_HMM_groups(synteny_structure)

Check whether structure contains groups of gene-equivalent HMMs.

Source code in src/pynteny/parsers/syntenyparser.py
26
27
28
def contains_HMM_groups(synteny_structure: str) -> bool:
    """Check whether structure contains groups of gene-equivalent HMMs."""
    return "|" in synteny_structure

get_HMM_groups_in_structure(synteny_structure)

Get hmm names employed in synteny structure, if more than one hmm for the same gene, return a list with all of them.

Source code in src/pynteny/parsers/syntenyparser.py
73
74
75
76
77
78
79
80
81
82
83
def get_HMM_groups_in_structure(synteny_structure: str) -> list[str]:
    """Get hmm names employed in synteny structure,
    if more than one hmm for the same gene, return
    a list with all of them.
    """
    links = synteny_structure.replace("(", "").replace(")", "").strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    hmm_groups = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()]
    return hmm_groups

get_all_HMMs_in_structure(synteny_structure)

Get hmm names employed in synteny structure, if more than one hmm for the same gene, return a list with all of them.

Source code in src/pynteny/parsers/syntenyparser.py
 96
 97
 98
 99
100
101
102
103
def get_all_HMMs_in_structure(synteny_structure: str) -> list[str]:
    """Get hmm names employed in synteny structure,
    if more than one hmm for the same gene, return
    a list with all of them.
    """
    hmm_groups = get_HMM_groups_in_structure(synteny_structure)
    hmm_names = [hmm for hmm_group in hmm_groups for hmm in hmm_group.split("|")]
    return hmm_names

get_gene_symbols_in_structure(synteny_structure)

Retrieve gene symbols contained in synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
86
87
88
89
90
91
92
93
def get_gene_symbols_in_structure(synteny_structure: str) -> list[str]:
    """Retrieve gene symbols contained in synteny structure."""
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    gene_symbols = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()]
    return gene_symbols

get_maximum_distances_in_structure(synteny_structure)

Get maximum gene distances in synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
129
130
131
132
133
134
135
def get_maximum_distances_in_structure(synteny_structure: str) -> list[int]:
    """Get maximum gene distances in synteny structure."""
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    return [int(dist) for dist in links if dist.isdigit()]

get_strands_in_structure(synteny_structure, parsed_symbol=True)

Get strand sense list in structure.

Parameters:
  • synteny_structure (str) –

    a synteny structure.

  • parsed_symbol (bool, default: True ) –

    if True, strand info '>' is parsed as 'pos' and '<' as 'neg'. Defaults to True.

Returns:
  • list[str]

    list[str]: parsed synteny structure as a list of tuples containing HMM name and strand info for each HMM group.

Source code in src/pynteny/parsers/syntenyparser.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_strands_in_structure(
    synteny_structure: str, parsed_symbol: bool = True
) -> list[str]:
    """Get strand sense list in structure.

    Args:
        synteny_structure (str): a synteny structure.
        parsed_symbol (bool, optional): if True, strand info '>' is parsed
            as 'pos' and '<' as 'neg'. Defaults to True.

    Returns:
        list[str]: parsed synteny structure as a list of tuples containing
            HMM name and strand info for each HMM group.
    """
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    return [
        split_strand_from_locus(h, parsed_symbol)[0] for h in links if not h.isdigit()
    ]

is_valid_structure(synteny_structure)

Validate synteny structure format.

Source code in src/pynteny/parsers/syntenyparser.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def is_valid_structure(synteny_structure: str) -> bool:
    """Validate synteny structure format."""
    synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip()
    parsed_struc = parse_synteny_structure(synteny_structure)
    right_type = all(
        (
            is_right_list_nested_type(parsed_struc["hmm_groups"], str),
            is_right_list_nested_type(parsed_struc["strands"], str),
            is_right_list_nested_type(parsed_struc["distances"], int),
        )
    )
    right_format = len(parsed_struc["hmm_groups"]) == len(
        parsed_struc["strands"]
    ) and len(parsed_struc["hmm_groups"]) == (len(parsed_struc["distances"]) + 1)
    return False if (not right_type or not right_format) else True

parse_genes_in_synteny_structure(synteny_structure, hmm_meta)

Convert gene-based synteny structure into a HMM-based one. If a gene symbol matches more than one HMM, return a HMM group like: (HMM1 | HMM2 | ...).

Parameters:
  • synteny_structure (str) –

    a string like the following:

    hmm_a n_ab <hmm_b n_bc hmm_c

    where '>' indicates a hmm target located on the positive strand, '<' a target located on the negative strand, and n_ab cooresponds to the maximum number of genes separating matched gene a and b. Multiple hmms may be employed (limited by computational capabilities). No order symbol in a hmm indicates that results should be independent of strand location.

  • hmm_meta (Path) –

    path to PGAP's metadata file.

Returns:
  • tuple[str, dict]

    tuple[str,dict]: parsed synteny structure where gene symbols are replaced by HMM names.

Source code in src/pynteny/parsers/syntenyparser.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def parse_genes_in_synteny_structure(
    synteny_structure: str, hmm_meta: Path
) -> tuple[str, dict]:
    """Convert gene-based synteny structure into a HMM-based one.
    If a gene symbol matches more than one HMM, return a HMM group
    like: (HMM1 | HMM2 | ...).

    Args:
        synteny_structure (str): a string like the following:
            >hmm_a n_ab <hmm_b n_bc hmm_c

            where '>' indicates a hmm target located on the positive strand,
            '<' a target located on the negative strand, and n_ab cooresponds
            to the maximum number of genes separating matched gene a and b.
            Multiple hmms may be employed (limited by computational capabilities).
            No order symbol in a hmm indicates that results should be independent
            of strand location.
        hmm_meta (Path): path to PGAP's metadata file.

    Returns:
        tuple[str,dict]: parsed synteny structure where gene symbols are replaced
            by HMM names.
    """
    pgap = PGAP(hmm_meta)
    gene_symbols = get_gene_symbols_in_structure(synteny_structure)
    strand_locs = get_strands_in_structure(synteny_structure, parsed_symbol=False)
    gene_dists = get_maximum_distances_in_structure(synteny_structure)
    hmm_groups = {
        gene_symbol: pgap.get_HMM_group_for_gene_symbol(gene_symbol)
        for gene_symbol in gene_symbols
    }
    unmatched_genes = [
        gene_id for gene_id, hmm_group in hmm_groups.items() if not hmm_group
    ]
    if unmatched_genes:
        logger.error(
            f"These genes did not get a HMM match in database: {unmatched_genes}"
        )
        sys.exit(1)

    hmm_synteny_struc = ""

    for strand, dist, hmm_group in zip(
        strand_locs, [""] + gene_dists, hmm_groups.values()
    ):
        if "|" in hmm_group:
            hmm_group = f"({hmm_group})"
        hmm_synteny_struc += f"{dist} {strand}{hmm_group} "

    return hmm_synteny_struc.strip(), hmm_groups

parse_synteny_structure(synteny_structure)

Parse synteny structure string.

Parameters:
  • synteny_structure (str) –

    a string like the following:

    hmm_a n_ab <hmm_b n_bc hmm_c

    where '>' indicates a hmm target located on the positive strand, '<' a target located on the negative strand, and n_ab cooresponds to the maximum number of genes separating matched gene a and b. Multiple hmms may be employed (limited by computational capabilities). No order symbol in a hmm indicates that results should be independent of strand location.

Returns:
  • dict( dict ) –

    parsed synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def parse_synteny_structure(synteny_structure: str) -> dict:
    """Parse synteny structure string.

    Args:
        synteny_structure (str): a string like the following:
            >hmm_a n_ab <hmm_b n_bc hmm_c

            where '>' indicates a hmm target located on the positive strand,
            '<' a target located on the negative strand, and n_ab cooresponds
            to the maximum number of genes separating matched gene a and b.
            Multiple hmms may be employed (limited by computational capabilities).
            No order symbol in a hmm indicates that results should be independent
            of strand location.

    Returns:
        dict: parsed synteny structure.
    """
    max_dists = get_maximum_distances_in_structure(synteny_structure)
    hmm_groups = get_HMM_groups_in_structure(synteny_structure)
    strands = get_strands_in_structure(synteny_structure)
    return {"hmm_groups": hmm_groups, "strands": strands, "distances": max_dists}

reformat_synteny_structure(synteny_structure)

Remove illegal symbols and extra white space.

Source code in src/pynteny/parsers/syntenyparser.py
20
21
22
23
def reformat_synteny_structure(synteny_structure: str) -> str:
    """Remove illegal symbols and extra white space."""
    synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip()
    return synteny_structure

split_strand_from_locus(locus_str, parsed_symbol=True)

Split strand info from locus tag / HMM model.

Parameters:
  • locus_str (str) –

    a substring of a synteny structure containing a gene symbol / HMM name and strand info.

  • parsed_symbol (bool, default: True ) –

    if True, strand info '>' is parsed as 'pos' and '<' as 'neg'. Defaults to True.

Returns:
  • tuple[str]

    tuple[str]: tuple with parsed strand info and gene symbol / HMM name.

Source code in src/pynteny/parsers/syntenyparser.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def split_strand_from_locus(locus_str: str, parsed_symbol: bool = True) -> tuple[str]:
    """Split strand info from locus tag / HMM model.

    Args:
        locus_str (str): a substring of a synteny structure containing
            a gene symbol / HMM name and strand info.
        parsed_symbol (bool, optional): if True, strand info '>' is parsed
            as 'pos' and '<' as 'neg'. Defaults to True.

    Returns:
        tuple[str]: tuple with parsed strand info and gene symbol / HMM name.
    """
    locus_str = locus_str.strip()
    if locus_str[0] == "<" or locus_str[0] == ">":
        sense = locus_str[0]
        locus_str = locus_str[1:]
        if parsed_symbol:
            strand = "pos" if sense == ">" else "neg"
        else:
            strand = sense
    else:
        strand = ""
    return (strand, locus_str)

Module cli

Pynteny

Main command based on: https://selvakumar-arumugapandian.medium.com/ \ command-line-subcommands-with-pythons-argparse-4dbac80f7110

Source code in src/pynteny/cli.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Pynteny:
    """Main command based on:
    https://selvakumar-arumugapandian.medium.com/  \\
    command-line-subcommands-with-pythons-argparse-4dbac80f7110
    """

    def __init__(self, subcommand: str, subcommand_args: list[str]):
        """Initialize main command

        Args:
            subcommand (str): subcommand name
            subcommand_args (list[str]): list of subcommand arguments and values.
        """
        self._subcommand = subcommand
        if "--hmmsearch_args" in subcommand_args:
            hmm_arg_idx = subcommand_args.index("--hmmsearch_args") + 1
            subcommand_args[hmm_arg_idx] = " " + subcommand_args[hmm_arg_idx]
        self._subcommand_args = subcommand_args
        parser = argparse.ArgumentParser(
            description=(self._printLogo()),
            usage=("pynteny <subcommand> [-h] [args] \n"),
            epilog=(self._generate_cool_quotes()),
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser._positionals.title = "subcommands"
        parser.add_argument(
            help=("search \n" "build \n" "parse \n" "download \n" "cite \n"),
            dest="subcommand",
            metavar="",
        )
        parser.add_argument(
            "-v",
            "--version",
            help="show version and exit",
            action="version",
            version=__version__,
        )
        if len(sys.argv) < 2:
            parser.print_help()
            sys.exit(1)
        args = parser.parse_args(self._subcommand)
        input_subcommand = getattr(args, "subcommand")
        ConfigParser.initialize_config_file()
        self._call_subcommand(subcommand_name=input_subcommand)

    def _printLogo(self):
        print(
            (
                """
    ____              __
   / __ \__  ______  / /____  ____  __  __
  / /_/ / / / / __ \/ __/ _ \/ __ \/ / / /
 / ____/ /_/ / / / / /_/  __/ / / / /_/ /
/_/    \__, /_/ /_/\__/\___/_/ /_/\__, /
      /____/                     /____/

"""
                f"Synteny-based Hmmer searches made easy, v{__version__}\n"
                "Semidán Robaina Estévez (srobaina@ull.edu.es), 2022\n"
                " \n"
            )
        )

    def _generate_cool_quotes(self):
        quotes = [
            "May the force be with you (Yoda)",
            "This looks like a job for a computer (AI)",
            "This is such a wonderful day to do bioinformatics (SR)",
            "One does not simply walk into Mordor (J.R.R. Tolkien)",
            "Damn, looks like a rainy day, let's do bioiformatics! (SR)",
        ]
        return f"{random.choice(quotes)}\n"

    def _call_subcommand(self, subcommand_name: str) -> None:
        subcommand = getattr(self, subcommand_name)
        subcommand()

    def search(self):
        """Call search subcommand."""
        parser = SubcommandParser.search()
        args = parser.parse_args(self._subcommand_args)
        sub.synteny_search(args)

    def build(self):
        """Call build subcommand."""
        parser = SubcommandParser.build()
        args = parser.parse_args(self._subcommand_args)
        sub.build_database(args)

    def parse(self):
        """Call parse subcommand."""
        parser = SubcommandParser.parse()
        args = parser.parse_args(self._subcommand_args)
        sub.parse_gene_ids(args)

    def download(self):
        """Call download subcommand."""
        parser = SubcommandParser.download()
        args = parser.parse_args(self._subcommand_args)
        sub.download_hmms(args)

    def cite(self):
        """Print pynteny's citation string"""
        parser = SubcommandParser.cite()
        args = parser.parse_args(self._subcommand_args)
        args.version = __version__
        sub.get_citation(args)

__init__(subcommand, subcommand_args)

Initialize main command

Parameters:
  • subcommand (str) –

    subcommand name

  • subcommand_args (list[str]) –

    list of subcommand arguments and values.

Source code in src/pynteny/cli.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(self, subcommand: str, subcommand_args: list[str]):
    """Initialize main command

    Args:
        subcommand (str): subcommand name
        subcommand_args (list[str]): list of subcommand arguments and values.
    """
    self._subcommand = subcommand
    if "--hmmsearch_args" in subcommand_args:
        hmm_arg_idx = subcommand_args.index("--hmmsearch_args") + 1
        subcommand_args[hmm_arg_idx] = " " + subcommand_args[hmm_arg_idx]
    self._subcommand_args = subcommand_args
    parser = argparse.ArgumentParser(
        description=(self._printLogo()),
        usage=("pynteny <subcommand> [-h] [args] \n"),
        epilog=(self._generate_cool_quotes()),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser._positionals.title = "subcommands"
    parser.add_argument(
        help=("search \n" "build \n" "parse \n" "download \n" "cite \n"),
        dest="subcommand",
        metavar="",
    )
    parser.add_argument(
        "-v",
        "--version",
        help="show version and exit",
        action="version",
        version=__version__,
    )
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit(1)
    args = parser.parse_args(self._subcommand)
    input_subcommand = getattr(args, "subcommand")
    ConfigParser.initialize_config_file()
    self._call_subcommand(subcommand_name=input_subcommand)

build()

Call build subcommand.

Source code in src/pynteny/cli.py
104
105
106
107
108
def build(self):
    """Call build subcommand."""
    parser = SubcommandParser.build()
    args = parser.parse_args(self._subcommand_args)
    sub.build_database(args)

cite()

Print pynteny's citation string

Source code in src/pynteny/cli.py
122
123
124
125
126
127
def cite(self):
    """Print pynteny's citation string"""
    parser = SubcommandParser.cite()
    args = parser.parse_args(self._subcommand_args)
    args.version = __version__
    sub.get_citation(args)

download()

Call download subcommand.

Source code in src/pynteny/cli.py
116
117
118
119
120
def download(self):
    """Call download subcommand."""
    parser = SubcommandParser.download()
    args = parser.parse_args(self._subcommand_args)
    sub.download_hmms(args)

parse()

Call parse subcommand.

Source code in src/pynteny/cli.py
110
111
112
113
114
def parse(self):
    """Call parse subcommand."""
    parser = SubcommandParser.parse()
    args = parser.parse_args(self._subcommand_args)
    sub.parse_gene_ids(args)

search()

Call search subcommand.

Source code in src/pynteny/cli.py
 98
 99
100
101
102
def search(self):
    """Call search subcommand."""
    parser = SubcommandParser.search()
    args = parser.parse_args(self._subcommand_args)
    sub.synteny_search(args)

SubcommandParser

Argparse parsers for Pynteny's subcommands

Source code in src/pynteny/cli.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
class SubcommandParser:
    """Argparse parsers for Pynteny's subcommands"""

    @staticmethod
    def get_help_str(subcommand: str) -> str:
        """Get help string for subcommand.

        Args:
            subcommand (str): subcommand name.

        Returns:
            str: help string.
        """
        parser = getattr(SubcommandParser, subcommand)()
        with tempfile.NamedTemporaryFile(mode="w+") as file:
            parser.print_help(file)
            file.flush()
            with open(file.name, encoding="UTF-8") as help_file:
                help_str = help_file.read()
        return help_str

    @staticmethod
    def search() -> argparse.ArgumentParser:
        """Parser for the search subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Query sequence database for HMM hits arranged in provided synteny structure."
            ),
            usage=("pynteny search [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-s",
            "--synteny_struc",
            metavar="",
            dest="synteny_struc",
            type=str,
            required=True,
            help=(
                "string displaying hmm structure to search for, such as: \n"
                " \n"
                "'>hmm_a n_ab <hmm_b n_bc hmm_c'\n"
                " \n"
                "where '>' indicates a hmm target located on the positive strand, \n"
                "'<' a target located on the negative strand, and n_ab cooresponds \n"
                "to the maximum number of genes separating matched genes a and b. \n"
                "Multiple hmms may be employed. \n"
                "No order symbol in a hmm indicates that results should be independent \n"
                "of strand location. "
            ),
        )
        required.add_argument(
            "-i",
            "--data",
            dest="data",
            metavar="",
            type=Path,
            required=True,
            help=(
                "path to fasta file containing peptide database. \n"
                "Record labels must follow the format specified in docs \n"
                "(see section: General Usage). Pynteny build subcommand exports \n"
                "the generated database in the correct format"
            ),
        )
        optional.add_argument(
            "-d",
            "--hmm_dir",
            dest="hmm_dir",
            type=Path,
            metavar="",
            required=False,
            default=None,
            help=(
                "path to directory containing hmm (i.e, tigrfam or pfam) models. \n"
                "IMPORTANT: the directory must contain one hmm per file, and the file \n"
                "name must coincide with the hmm name that will be displayed in the synteny structure. \n"
                "The directory can contain more hmm models than used in the synteny structure. \n"
                "It may also be the path to a compressed (tar, tar.gz, tgz) directory. \n"
                "If not provided, hmm models (PGAP database) will be downloaded from the NCBI. \n"
                "(if not already downloaded)"
            ),
        )
        optional.add_argument(
            "-o",
            "--outdir",
            dest="outdir",
            type=Path,
            metavar="",
            help="path to output directory",
            default=None,
        )
        optional.add_argument(
            "-x",
            "--prefix",
            dest="prefix",
            type=str,
            metavar="",
            default="",
            help="prefix to be added to output files",
        )
        optional.add_argument(
            "-p",
            "--processes",
            dest="processes",
            type=int,
            metavar="",
            default=None,
            help="maximum number of processes available to HMMER. Defaults to all but one.",
        )
        optional.add_argument(
            "-a",
            "--hmmsearch_args",
            dest="hmmsearch_args",
            type=str,
            metavar="",
            default=None,
            required=False,
            help=(
                "list of comma-separated additional arguments to hmmsearch for each input hmm. \n"
                "A single argument may be provided, in which case the same additional argument \n"
                "is employed in all hmms."
            ),
        )
        optional.add_argument(
            "-g",
            "--gene_ids",
            dest="gene_ids",
            default=False,
            action="store_true",
            help=(
                "use gene symbols in synteny structure instead of HMM names. \n"
                "If set, a path to the hmm database metadata file must be provided \n"
                "in argument '--hmm_meta'"
            ),
        )
        optional.add_argument(
            "-u",
            "--unordered",
            dest="unordered",
            default=False,
            action="store_true",
            help=(
                "whether the HMMs should be arranged in the exact same order displayed \n"
                "in the synteny_structure or in  any order. If ordered, the filters will \n"
                "filter collinear rather than syntenic structures. \n"
                "If more than two HMMs are employed, the largest maximum distance among any \n"
                "pair is considered to run the search."
            ),
        )
        optional.add_argument(
            "-b",
            "--best_hmm_wins",
            dest="best_hmm_wins",
            default=False,
            action="store_true",
            help=(
                "if the same peptide is hit by more than one HMM (e.g. paralogous \n"
                "models that cross-hit), keep only the highest-scoring HMM for that \n"
                "peptide. Use this when your HMM panel contains close paralogs \n"
                "(e.g. nitrogenase nifD/nifK, nifE/nifN) to avoid silently dropping \n"
                "genuine syntenic blocks. Disabled by default."
            ),
        )
        optional.add_argument(
            "-r",
            "--reuse",
            dest="reuse",
            default=False,
            action="store_true",
            help=(
                "reuse hmmsearch result table in following synteny searches. \n"
                "Do not delete hmmer_outputs subdirectory for this option to work."
            ),
        )
        optional.add_argument(
            "-m",
            "--hmm_meta",
            dest="hmm_meta",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to hmm database metadata file",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def build() -> argparse.ArgumentParser:
        """Parser for the build subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Translate nucleotide assembly file and assign contig and gene location info \n"
                "to each identified ORF (using prodigal). Label predicted ORFs according to \n"
                "positional info and export a fasta file containing predicted and translated ORFs. \n"
                "Alternatively, extract peptide sequences from GenBank file containing ORF annotations \n"
                "and write labelled peptide sequences to a fasta file."
            ),
            usage=("pynteny build [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-i",
            "--data",
            dest="data",
            type=Path,
            required=True,
            metavar="",
            help=(
                "path to assembly input nucleotide data or annotated GenBank file. \n"
                "It can be a single file or a directory of files (either of FASTA or GeneBank format). \n"
                "If a directory, file name can be prepended to the label of each translated peptide \n"
                "originally coming from that file by default (i.e., to track the genome of origin) \n"
                "with the flag '--prepend-filename.'"
            ),
        )
        optional.add_argument(
            "-x",
            "--prepend-filename",
            dest="prepend",
            default=False,
            action="store_true",
            help=(
                "whether to prepend file name to peptide sequences when a directory \n"
                "with multiple fasta or genbank files is used as input."
            ),
        )
        optional.add_argument(
            "-o",
            "--outfile",
            dest="outfile",
            type=Path,
            metavar="",
            default=None,
            help=(
                "path to output (labelled peptide database) file. Defaults to \n"
                "file in directory of input data."
            ),
        )
        optional.add_argument(
            "-p",
            "--processes",
            dest="processes",
            type=int,
            metavar="",
            required=False,
            default=None,
            help=("set maximum number of processes. Defaults to all but one."),
        )
        optional.add_argument(
            "-t",
            "--tempdir",
            dest="tempdir",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to directory to store temporary files. Defaults to system temp directory.",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def parse() -> argparse.ArgumentParser:
        """Parser for the parse subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Translate synteny structure with gene symbols into one with\n"
                "HMM groups, according to provided HMM database."
            ),
            usage=("pynteny parse [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-s",
            "--synteny_struc",
            dest="synteny_struc",
            type=str,
            required=True,
            metavar="",
            help=("synteny structure containing gene symbols instead of HMMs"),
        )
        optional.add_argument(
            "-m",
            "--hmm_meta",
            dest="hmm_meta",
            type=Path,
            metavar="",
            required=False,
            help=(
                "path to hmm database metadata file. If already donwloaded with \n"
                "pynteny downloaded, hmm meta file is retrieved from default location."
            ),
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def download() -> argparse.ArgumentParser:
        """Parser for the download subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=("Download HMM database from NCBI."),
            epilog="  \n",
            usage=("pynteny download [-h] [args] \n"),
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-o",
            "--outdir",
            dest="outdir",
            type=Path,
            metavar="",
            required=False,
            default=None,
            help=(
                "path to directory where to download HMM database. \n"
                "Note, if database is relocated after download, Pynteny won't be able to find it automatically, \n"
                "You can either: delete and download again in the new location or manually indicate database and \n"
                "meta file location in Pynteny search."
            ),
        )
        optional.add_argument(
            "-u",
            "--unpack",
            dest="unpack",
            default=False,
            action="store_true",
            help="unpack originally compressed database files",
        )
        optional.add_argument(
            "-f",
            "--force",
            dest="force",
            default=False,
            action="store_true",
            help="force-download database again if already downloaded",
        )
        optional.add_argument(
            "-pgap",
            "--pgap",
            dest="pgap",
            default=False,
            action="store_true",
            help="download PGAP database (default)",
        )
        optional.add_argument(
            "-pfam",
            "--pfam",
            dest="pfam",
            default=False,
            action="store_true",
            help="download PFAM database",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def cite() -> argparse.ArgumentParser:
        """Parser for the cite subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=("Print pynteny's citation string."),
            epilog="  \n",
            usage=("pynteny cite [-h] \n"),
            formatter_class=argparse.RawTextHelpFormatter,
        )
        optional = parser._action_groups.pop()
        parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)
        return parser

build() staticmethod

Parser for the build subcommand.

Returns:
  • ArgumentParser

    argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@staticmethod
def build() -> argparse.ArgumentParser:
    """Parser for the build subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Translate nucleotide assembly file and assign contig and gene location info \n"
            "to each identified ORF (using prodigal). Label predicted ORFs according to \n"
            "positional info and export a fasta file containing predicted and translated ORFs. \n"
            "Alternatively, extract peptide sequences from GenBank file containing ORF annotations \n"
            "and write labelled peptide sequences to a fasta file."
        ),
        usage=("pynteny build [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-i",
        "--data",
        dest="data",
        type=Path,
        required=True,
        metavar="",
        help=(
            "path to assembly input nucleotide data or annotated GenBank file. \n"
            "It can be a single file or a directory of files (either of FASTA or GeneBank format). \n"
            "If a directory, file name can be prepended to the label of each translated peptide \n"
            "originally coming from that file by default (i.e., to track the genome of origin) \n"
            "with the flag '--prepend-filename.'"
        ),
    )
    optional.add_argument(
        "-x",
        "--prepend-filename",
        dest="prepend",
        default=False,
        action="store_true",
        help=(
            "whether to prepend file name to peptide sequences when a directory \n"
            "with multiple fasta or genbank files is used as input."
        ),
    )
    optional.add_argument(
        "-o",
        "--outfile",
        dest="outfile",
        type=Path,
        metavar="",
        default=None,
        help=(
            "path to output (labelled peptide database) file. Defaults to \n"
            "file in directory of input data."
        ),
    )
    optional.add_argument(
        "-p",
        "--processes",
        dest="processes",
        type=int,
        metavar="",
        required=False,
        default=None,
        help=("set maximum number of processes. Defaults to all but one."),
    )
    optional.add_argument(
        "-t",
        "--tempdir",
        dest="tempdir",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to directory to store temporary files. Defaults to system temp directory.",
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

cite() staticmethod

Parser for the cite subcommand.

Returns:
  • ArgumentParser

    argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
@staticmethod
def cite() -> argparse.ArgumentParser:
    """Parser for the cite subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=("Print pynteny's citation string."),
        epilog="  \n",
        usage=("pynteny cite [-h] \n"),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    optional = parser._action_groups.pop()
    parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    return parser

download() staticmethod

Parser for the download subcommand.

Returns:
  • ArgumentParser

    argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
@staticmethod
def download() -> argparse.ArgumentParser:
    """Parser for the download subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=("Download HMM database from NCBI."),
        epilog="  \n",
        usage=("pynteny download [-h] [args] \n"),
        formatter_class=argparse.RawTextHelpFormatter,
    )

    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-o",
        "--outdir",
        dest="outdir",
        type=Path,
        metavar="",
        required=False,
        default=None,
        help=(
            "path to directory where to download HMM database. \n"
            "Note, if database is relocated after download, Pynteny won't be able to find it automatically, \n"
            "You can either: delete and download again in the new location or manually indicate database and \n"
            "meta file location in Pynteny search."
        ),
    )
    optional.add_argument(
        "-u",
        "--unpack",
        dest="unpack",
        default=False,
        action="store_true",
        help="unpack originally compressed database files",
    )
    optional.add_argument(
        "-f",
        "--force",
        dest="force",
        default=False,
        action="store_true",
        help="force-download database again if already downloaded",
    )
    optional.add_argument(
        "-pgap",
        "--pgap",
        dest="pgap",
        default=False,
        action="store_true",
        help="download PGAP database (default)",
    )
    optional.add_argument(
        "-pfam",
        "--pfam",
        dest="pfam",
        default=False,
        action="store_true",
        help="download PFAM database",
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

get_help_str(subcommand) staticmethod

Get help string for subcommand.

Parameters:
  • subcommand (str) –

    subcommand name.

Returns:
  • str( str ) –

    help string.

Source code in src/pynteny/cli.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@staticmethod
def get_help_str(subcommand: str) -> str:
    """Get help string for subcommand.

    Args:
        subcommand (str): subcommand name.

    Returns:
        str: help string.
    """
    parser = getattr(SubcommandParser, subcommand)()
    with tempfile.NamedTemporaryFile(mode="w+") as file:
        parser.print_help(file)
        file.flush()
        with open(file.name, encoding="UTF-8") as help_file:
            help_str = help_file.read()
    return help_str

parse() staticmethod

Parser for the parse subcommand.

Returns:
  • ArgumentParser

    argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
@staticmethod
def parse() -> argparse.ArgumentParser:
    """Parser for the parse subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Translate synteny structure with gene symbols into one with\n"
            "HMM groups, according to provided HMM database."
        ),
        usage=("pynteny parse [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-s",
        "--synteny_struc",
        dest="synteny_struc",
        type=str,
        required=True,
        metavar="",
        help=("synteny structure containing gene symbols instead of HMMs"),
    )
    optional.add_argument(
        "-m",
        "--hmm_meta",
        dest="hmm_meta",
        type=Path,
        metavar="",
        required=False,
        help=(
            "path to hmm database metadata file. If already donwloaded with \n"
            "pynteny downloaded, hmm meta file is retrieved from default location."
        ),
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

search() staticmethod

Parser for the search subcommand.

Returns:
  • ArgumentParser

    argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
@staticmethod
def search() -> argparse.ArgumentParser:
    """Parser for the search subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Query sequence database for HMM hits arranged in provided synteny structure."
        ),
        usage=("pynteny search [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-s",
        "--synteny_struc",
        metavar="",
        dest="synteny_struc",
        type=str,
        required=True,
        help=(
            "string displaying hmm structure to search for, such as: \n"
            " \n"
            "'>hmm_a n_ab <hmm_b n_bc hmm_c'\n"
            " \n"
            "where '>' indicates a hmm target located on the positive strand, \n"
            "'<' a target located on the negative strand, and n_ab cooresponds \n"
            "to the maximum number of genes separating matched genes a and b. \n"
            "Multiple hmms may be employed. \n"
            "No order symbol in a hmm indicates that results should be independent \n"
            "of strand location. "
        ),
    )
    required.add_argument(
        "-i",
        "--data",
        dest="data",
        metavar="",
        type=Path,
        required=True,
        help=(
            "path to fasta file containing peptide database. \n"
            "Record labels must follow the format specified in docs \n"
            "(see section: General Usage). Pynteny build subcommand exports \n"
            "the generated database in the correct format"
        ),
    )
    optional.add_argument(
        "-d",
        "--hmm_dir",
        dest="hmm_dir",
        type=Path,
        metavar="",
        required=False,
        default=None,
        help=(
            "path to directory containing hmm (i.e, tigrfam or pfam) models. \n"
            "IMPORTANT: the directory must contain one hmm per file, and the file \n"
            "name must coincide with the hmm name that will be displayed in the synteny structure. \n"
            "The directory can contain more hmm models than used in the synteny structure. \n"
            "It may also be the path to a compressed (tar, tar.gz, tgz) directory. \n"
            "If not provided, hmm models (PGAP database) will be downloaded from the NCBI. \n"
            "(if not already downloaded)"
        ),
    )
    optional.add_argument(
        "-o",
        "--outdir",
        dest="outdir",
        type=Path,
        metavar="",
        help="path to output directory",
        default=None,
    )
    optional.add_argument(
        "-x",
        "--prefix",
        dest="prefix",
        type=str,
        metavar="",
        default="",
        help="prefix to be added to output files",
    )
    optional.add_argument(
        "-p",
        "--processes",
        dest="processes",
        type=int,
        metavar="",
        default=None,
        help="maximum number of processes available to HMMER. Defaults to all but one.",
    )
    optional.add_argument(
        "-a",
        "--hmmsearch_args",
        dest="hmmsearch_args",
        type=str,
        metavar="",
        default=None,
        required=False,
        help=(
            "list of comma-separated additional arguments to hmmsearch for each input hmm. \n"
            "A single argument may be provided, in which case the same additional argument \n"
            "is employed in all hmms."
        ),
    )
    optional.add_argument(
        "-g",
        "--gene_ids",
        dest="gene_ids",
        default=False,
        action="store_true",
        help=(
            "use gene symbols in synteny structure instead of HMM names. \n"
            "If set, a path to the hmm database metadata file must be provided \n"
            "in argument '--hmm_meta'"
        ),
    )
    optional.add_argument(
        "-u",
        "--unordered",
        dest="unordered",
        default=False,
        action="store_true",
        help=(
            "whether the HMMs should be arranged in the exact same order displayed \n"
            "in the synteny_structure or in  any order. If ordered, the filters will \n"
            "filter collinear rather than syntenic structures. \n"
            "If more than two HMMs are employed, the largest maximum distance among any \n"
            "pair is considered to run the search."
        ),
    )
    optional.add_argument(
        "-b",
        "--best_hmm_wins",
        dest="best_hmm_wins",
        default=False,
        action="store_true",
        help=(
            "if the same peptide is hit by more than one HMM (e.g. paralogous \n"
            "models that cross-hit), keep only the highest-scoring HMM for that \n"
            "peptide. Use this when your HMM panel contains close paralogs \n"
            "(e.g. nitrogenase nifD/nifK, nifE/nifN) to avoid silently dropping \n"
            "genuine syntenic blocks. Disabled by default."
        ),
    )
    optional.add_argument(
        "-r",
        "--reuse",
        dest="reuse",
        default=False,
        action="store_true",
        help=(
            "reuse hmmsearch result table in following synteny searches. \n"
            "Do not delete hmmer_outputs subdirectory for this option to work."
        ),
    )
    optional.add_argument(
        "-m",
        "--hmm_meta",
        dest="hmm_meta",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to hmm database metadata file",
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

Module subcommands

Functions containing CLI subcommands

build_database(args)

Build annotated peptide database from input assembly or GenBank data.

Parameters:
  • args (Union[CommandArgs, ArgumentParser]) –

    arguments object.

Source code in src/pynteny/subcommands.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def build_database(args: Union[CommandArgs, ArgumentParser]) -> None:
    """Build annotated peptide database from input assembly
    or GenBank data.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.
    """
    logger = init_logger(args)
    logger.info("Building annotated peptide database")
    if args.processes is None:
        args.processes = os.cpu_count() - 1
    if args.data.is_dir() and args.prepend:
        prepend_file_name = True
    else:
        prepend_file_name = False

    database = Database(args.data)
    database.build(
        output_file=args.outfile,
        prepend_file_name=prepend_file_name,
        processes=args.processes,
        tempdir=args.tempdir,
    )
    logger.info("Database built successfully!")
    logging.shutdown()

download_hmms(args)

Download HMM (PGAP) database from NCBI.

Parameters:
  • args (Union[CommandArgs, ArgumentParser]) –

    arguments object.

Source code in src/pynteny/subcommands.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None:
    """Download HMM (PGAP) database from NCBI.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.
    """
    logger = init_logger(args)
    config = ConfigParser.get_default_config()
    if (config.get_field("PGAP_data_downloaded")) and (args.pgap) and (not args.force):
        logger.info("PGAP HMM database already downloaded. Skipping download")
    elif (
        (config.get_field("PFAM_data_downloaded")) and (args.pfam) and (not args.force)
    ):
        logger.info("PFAM HMM database already downloaded. Skipping download")
    elif (
        (config.get_field("PGAP_data_downloaded"))
        and (args.pgap)
        and (config.get_field("PFAM_data_downloaded"))
        and (args.pfam)
        and (not args.force)
    ):
        logger.info("HMM databases already downloaded. Skipping download")
        sys.exit(1)
    if args.outdir is None:
        logger.error("Please, provide directory in which to download HMM databases.")
        sys.exit(1)
    else:
        download_dir = Path(args.outdir).absolute()
    if not download_dir.exists():
        download_dir.mkdir(parents=True, exist_ok=True)

    config.update_config("database_dir", download_dir.as_posix())

    if args.pgap:
        logger.info("Downloading PGAP database")
        try:
            PGAP_path, PGAP_meta_file = download_pgap(download_dir, unpack=args.unpack)
            PGAP(PGAP_meta_file).remove_missing_HMMs_from_metadata(
                PGAP_path, PGAP_meta_file
            )
            config.update_config("unpack_PGAP_database", args.unpack)
            logger.info("PGAP database downloaded successfully\n")
            config.update_config("PGAP_data_downloaded", True)
            config.update_config("PGAP_database", PGAP_path.as_posix())
            config.update_config("PGAP_meta_file", PGAP_meta_file.as_posix())
        except Exception:
            logger.exception(
                "Failed to download PGAP database. Please check your internet connection."
            )
            sys.exit(1)

    if args.pfam:
        logger.info("Downloading PFAM-A database")
        try:
            PFAM_meta_file = download_dir / "hmm_PFAM.tsv"
            PFAM_path = download_dir / "PFAM_hmms"
            PFAM_gz_file = download_pfam(download_dir, unpack=True)
            PFAM.from_gz_file(
                PFAM_gz_file,
                hmm_outdir=PFAM_path,
                meta_outfile=PFAM_meta_file,
            )
            config.update_config("unpack_PFAM_database", True)
            logger.info("PFAM database downloaded successfully\n")
            config.update_config("PFAM_data_downloaded", True)
            config.update_config("PFAM_database", PFAM_path.as_posix())
            config.update_config("PFAM_meta_file", PFAM_meta_file.as_posix())
        except Exception:
            logger.exception(
                "Failed to download PFAM-A database. Please check your internet connection."
            )
            sys.exit(1)
    logging.shutdown()

get_citation(args, silent=False)

Get Pynteny citation string.

Parameters:
  • args (ArgumentParser) –

    arguments object.

  • silent (bool, default: False ) –

    do not print to terminal. Defaults to False.

Returns:
  • str( str ) –

    Pyntey citation text.

Source code in src/pynteny/subcommands.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def get_citation(args: Union[CommandArgs, ArgumentParser], silent: bool = False) -> str:
    """Get Pynteny citation string.

    Args:
        args (argparse.ArgumentParser): arguments object.
        silent (bool, optional): do not print to terminal.
            Defaults to False.

    Returns:
        str: Pyntey citation text.
    """
    citation = (
        "Semidán Robaina Estévez (2022). Pynteny: synteny-aware hmm searches made easy"
        f"(Version {args.version}). Zenodo. https://doi.org/10.5281/zenodo.7048685"
    )
    if not silent:
        print("If you use this software, please cite it as below: ")
        print(citation)
    return citation

init_logger(args)

Initialize logger object

Parameters:
  • args (Union[CommandArgs, ArgumentParser]) –

    arguments object

Returns:
  • Logger

    logging.Logger: initialized logger object

Source code in src/pynteny/subcommands.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def init_logger(args: Union[CommandArgs, ArgumentParser]) -> logging.Logger:
    """Initialize logger object

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object

    Returns:
        logging.Logger: initialized logger object
    """
    if args.logfile is None:
        args.logfile = Path(os.devnull)
    elif not Path(args.logfile.parent).exists():
        Path(args.logfile.parent).mkdir(parents=True)
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s: %(message)s",
        handlers=[logging.FileHandler(args.logfile), logging.StreamHandler(sys.stdout)],
        level=logging.NOTSET,
    )
    logger = logging.getLogger(__name__)
    return logger

parse_gene_ids(args)

Convert gene symbols to hmm names.

Parameters:
  • args (Union[CommandArgs, ArgumentParser]) –

    arguments object.

Returns:
  • str( str ) –

    synteny structure where gene symbols are replaced by HMM names.

Source code in src/pynteny/subcommands.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def parse_gene_ids(args: Union[CommandArgs, ArgumentParser]) -> str:
    """Convert gene symbols to hmm names.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.

    Returns:
        str: synteny structure where gene symbols are replaced
            by HMM names.
    """
    logger = init_logger(args)
    config = ConfigParser.get_default_config()
    if args.hmm_meta is None:
        if not (
            config.get_field("PGAP_data_downloaded")
            or config.get_field("PFAM_data_downloaded")
        ):
            logger.error(
                "Please download hmm database meta file or provide path to existing one first."
            )
            sys.exit(1)
        else:
            args.hmm_meta = Path(config.get_field("PGAP_meta_file"))
    (
        gene_synteny_struc,
        gene_to_hmm_group,
    ) = syntenyparser.parse_genes_in_synteny_structure(
        synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta
    )
    logger.info(
        f'Translated \n "{args.synteny_struc}" \n to \n "{gene_synteny_struc}" \n according to provided HMM database metadata'
    )
    logging.shutdown()
    return gene_synteny_struc

Search peptide database by synteny structure containing HMMs.

Parameters:
  • args (Union[CommandArgs, ArgumentParser]) –

    arguments object.

Returns:
  • SyntenyHits( SyntenyHits ) –

    instance of SyntenyHits.

Source code in src/pynteny/subcommands.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits:
    """Search peptide database by synteny structure containing HMMs.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.

    Returns:
        SyntenyHits: instance of SyntenyHits.
    """
    logger = init_logger(args)
    if not Path(args.data).exists():
        logger.error("Sequence data file does not exist")
        sys.exit(1)

    config = ConfigParser.get_default_config()
    args.synteny_struc = syntenyparser.reformat_synteny_structure(args.synteny_struc)
    if not syntenyparser.is_valid_structure(args.synteny_struc):
        logger.error(
            (
                f"Invalid synteny structure format: {args.synteny_struc}. "
                "Execute pynteny search --help to see the right format."
            )
        )
        sys.exit(1)
    if args.hmm_dir is None:
        if not (
            config.get_field("PGAP_data_downloaded")
            or config.get_field("PFAM_data_downloaded")
        ):
            logger.warning(
                "HMM database not found. Downloading PGAP database from NCBI"
            )
            down_args = CommandArgs(unpack=True, outdir=None, logfile=None)
            download_hmms(down_args)
        else:
            args.hmm_dir = Path(config.get_field("PGAP_database"))
    if args.gene_ids:
        if args.hmm_meta is None:
            if not (
                config.get_field("PGAP_data_downloaded")
                or config.get_field("PFAM_data_downloaded")
            ):
                logger.error(
                    "Please download hmm database first or provide path to hmm metadata file."
                )
                sys.exit(1)
            else:
                args.hmm_meta = Path(config.get_field("PGAP_meta_file"))
        logger.info("Finding matching HMMs for gene symbols")
        (
            gene_synteny_struc,
            gene_to_hmm_group,
        ) = syntenyparser.parse_genes_in_synteny_structure(
            synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta
        )
        args.synteny_struc = gene_synteny_struc
        logger.info(
            f"Found the following HMMs in database for given structure:\n{gene_synteny_struc}"
        )

    temp_hmm_dir = Path(args.hmm_dir.parent) / "temp_hmm_dir"
    if is_tar_file(args.hmm_dir) or is_gz_file(args.hmm_dir):
        extract_to_directory(args.hmm_dir, temp_hmm_dir)
        hmm_dir = temp_hmm_dir
    else:
        hmm_dir = args.hmm_dir

    hmm_names = syntenyparser.get_all_HMMs_in_structure(args.synteny_struc)
    unique_hmm_names = np.unique(hmm_names)
    input_hmms = [
        file
        for file in hmm_dir.iterdir()
        if any([hmm_name in file.as_posix() for hmm_name in hmm_names])
    ]
    if len(input_hmms) < len(unique_hmm_names):
        logger.error(
            "Not all HMMs in synteny structure found in HMM directory. "
            "Remember to include '--gene_ids' option if you want to search by gene symbols."
        )
        sys.exit(1)
    if args.outdir is None:
        args.outdir = Path(args.data.parent)
    if not args.outdir.exists():
        args.outdir.mkdir(parents=True, exist_ok=True)
    if args.hmmsearch_args is None:
        hmmsearch_args = ",".join(["None" for _ in input_hmms])
    else:
        hmmsearch_args = args.hmmsearch_args
    hmmsearch_args = list(map(lambda x: x.strip(), hmmsearch_args.split(",")))
    hmmsearch_args = list(map(lambda x: None if x == "None" else x, hmmsearch_args))
    hmmer_output_dir = os.path.join(args.outdir, "hmmer_outputs/")
    synteny_table = args.outdir / f"{args.prefix}synteny_matched.tsv"

    logger.info("Searching database by synteny structure")
    synteny_hits = filter_FASTA_by_synteny_structure(
        synteny_structure=args.synteny_struc,
        unordered=args.unordered,
        best_hmm_wins=getattr(args, "best_hmm_wins", False),
        input_fasta=args.data,
        input_hmms=input_hmms,
        hmm_meta=args.hmm_meta,
        hmmer_output_dir=hmmer_output_dir,
        reuse_hmmer_results=args.reuse,
        method="hmmsearch",
        processes=args.processes,
        additional_args=hmmsearch_args,
    )
    synteny_hits.write_to_TSV(synteny_table)
    logger.info("Writing matching sequences to FASTA files")
    synteny_hits.write_hit_sequences_to_FASTA_files(
        sequence_database=args.data, output_dir=args.outdir, output_prefix=args.prefix
    )
    if temp_hmm_dir.exists():
        shutil.rmtree(temp_hmm_dir)
    logger.info("Finished!")
    logging.shutdown()
    return synteny_hits

Module utils

Functions and classes for general purposes

CommandArgs

Base class to hold command line arguments.

Source code in src/pynteny/utils.py
27
28
29
30
31
class CommandArgs:
    """Base class to hold command line arguments."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

ConfigParser

Handle Pynteny configuration file.

Source code in src/pynteny/utils.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class ConfigParser:
    """Handle Pynteny configuration file."""

    def __init__(self, config_file: Path) -> None:
        self._config_file = Path(config_file)
        self._config = self.get_config()

    @classmethod
    def get_default_config(cls):
        """Initialize ConfigParser with default config file."""
        return cls(cls.initialize_config_file())

    @staticmethod
    def initialize_config_file() -> Path:
        """Initialize empty config file.

        Returns:
            Path: path to generated config file.
        """
        config_file = Path(__file__).parent / "config.json"
        if not config_file.exists():
            config = {
                "database_dir": "",
                "upack_PGAP_database": False,
                "upack_PFAM_database": False,
                "PGAP_data_downloaded": False,
                "PFAM_data_downloaded": False,
                "PGAP_database": "",
                "PGAP_meta_file": "",
                "PFAM_database": "",
                "PFAM_meta_file": "",
            }
            with open(config_file, "w", encoding="UTF-8") as f:
                json.dump(config, f, indent=4)
        return config_file

    def get_config_path(self) -> Path:
        """Show config file path."""
        return self._config_file

    def get_config(self) -> dict:
        """Load config file.

        Returns:
            dict: dict containing fields and values of config file.
        """
        with open(self._config_file, "r", encoding="UTF-8") as file:
            config = json.loads(file.read())
        return config

    def write_config(self) -> None:
        """Write config dict to file."""
        with open(self._config_file, "w", encoding="UTF-8") as f:
            json.dump(self._config, f, indent=4)

    def update_config(self, key: str, value: str) -> None:
        """Update config file

        Args:
            key (str): config file key name to be updated.
            value (str): new value.
        """
        self._config[key] = value
        self.write_config()

    def get_field(self, key: str) -> str:
        """Get field from config file.

        Args:
            key (str): key name to get the value from.

        Returns:
            str: key value.
        """
        return self._config[key]

get_config()

Load config file.

Returns:
  • dict( dict ) –

    dict containing fields and values of config file.

Source code in src/pynteny/utils.py
74
75
76
77
78
79
80
81
82
def get_config(self) -> dict:
    """Load config file.

    Returns:
        dict: dict containing fields and values of config file.
    """
    with open(self._config_file, "r", encoding="UTF-8") as file:
        config = json.loads(file.read())
    return config

get_config_path()

Show config file path.

Source code in src/pynteny/utils.py
70
71
72
def get_config_path(self) -> Path:
    """Show config file path."""
    return self._config_file

get_default_config() classmethod

Initialize ConfigParser with default config file.

Source code in src/pynteny/utils.py
41
42
43
44
@classmethod
def get_default_config(cls):
    """Initialize ConfigParser with default config file."""
    return cls(cls.initialize_config_file())

get_field(key)

Get field from config file.

Parameters:
  • key (str) –

    key name to get the value from.

Returns:
  • str( str ) –

    key value.

Source code in src/pynteny/utils.py
 99
100
101
102
103
104
105
106
107
108
def get_field(self, key: str) -> str:
    """Get field from config file.

    Args:
        key (str): key name to get the value from.

    Returns:
        str: key value.
    """
    return self._config[key]

initialize_config_file() staticmethod

Initialize empty config file.

Returns:
  • Path( Path ) –

    path to generated config file.

Source code in src/pynteny/utils.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@staticmethod
def initialize_config_file() -> Path:
    """Initialize empty config file.

    Returns:
        Path: path to generated config file.
    """
    config_file = Path(__file__).parent / "config.json"
    if not config_file.exists():
        config = {
            "database_dir": "",
            "upack_PGAP_database": False,
            "upack_PFAM_database": False,
            "PGAP_data_downloaded": False,
            "PFAM_data_downloaded": False,
            "PGAP_database": "",
            "PGAP_meta_file": "",
            "PFAM_database": "",
            "PFAM_meta_file": "",
        }
        with open(config_file, "w", encoding="UTF-8") as f:
            json.dump(config, f, indent=4)
    return config_file

update_config(key, value)

Update config file

Parameters:
  • key (str) –

    config file key name to be updated.

  • value (str) –

    new value.

Source code in src/pynteny/utils.py
89
90
91
92
93
94
95
96
97
def update_config(self, key: str, value: str) -> None:
    """Update config file

    Args:
        key (str): config file key name to be updated.
        value (str): new value.
    """
    self._config[key] = value
    self.write_config()

write_config()

Write config dict to file.

Source code in src/pynteny/utils.py
84
85
86
87
def write_config(self) -> None:
    """Write config dict to file."""
    with open(self._config_file, "w", encoding="UTF-8") as f:
        json.dump(self._config, f, indent=4)

download_file(url, output_file)

Download file from url

Parameters:
  • url (str) –

    url where file to be downloaded

  • output_file (Path) –

    path to downloaded file

Source code in src/pynteny/utils.py
204
205
206
207
208
209
210
211
212
213
214
215
216
def download_file(url: str, output_file: Path) -> None:
    """Download file from url

    Args:
        url (str): url where file to be downloaded
        output_file (Path): path to downloaded file
    """
    output_file = Path(output_file)
    with requests.get(url, stream=True) as r:
        total_length = int(r.headers.get("Content-Length"))
        with tqdm.wrapattr(r.raw, "read", total=total_length, desc="") as raw:
            with open(output_file, "wb") as output:
                shutil.copyfileobj(raw, output)

extract_gz_file(gz_file, dest_dir=None)

Extract gz files to dest_dir.

Parameters:
  • gz_file (Path) –

    path to gz file.

  • dest_dir (Path, default: None ) –

    path to destination directory to store the uncompressed file. Defaults to None.

Source code in src/pynteny/utils.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def extract_gz_file(gz_file: Path, dest_dir: Path = None) -> None:
    """Extract gz files to dest_dir.

    Args:
        gz_file (Path): path to gz file.
        dest_dir (Path, optional): path to destination directory
            to store the uncompressed file. Defaults to None.
    """
    gz_file = Path(gz_file)
    if dest_dir is None:
        dest_dir = "."
    if gz_file.as_posix().endswith("gz"):
        gz = tarfile.open(gz_file, "r:gz")
        gz.extractall(path=dest_dir)
        gz.close()
    else:
        logger.error("Input is not a gz file")
        sys.exit(1)

extract_tar_file(tar_file, dest_dir=None)

Extract tar or tar.gz files to dest_dir.

Parameters:
  • tar_file (Path) –

    path to tar file.

  • dest_dir (Path, default: None ) –

    path to destination directory to store the uncompressed file. Defaults to None.

Source code in src/pynteny/utils.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None:
    """Extract tar or tar.gz files to dest_dir.

    Args:
        tar_file (Path): path to tar file.
        dest_dir (Path, optional): path to destination directory
            to store the uncompressed file. Defaults to None.
    """
    tar_file = Path(tar_file)
    if dest_dir is None:
        dest_dir = "."
    if (tar_file.as_posix().endswith("tar.gz")) or (
        tar_file.as_posix().endswith("tgz")
    ):
        tar = tarfile.open(tar_file, "r:gz")
        tar.extractall(path=dest_dir)
        tar.close()
    elif tar_file.as_posix().endswith("tar"):
        tar = tarfile.open(tar_file, "r:")
        tar.extractall(path=dest_dir)
        tar.close()
    else:
        logger.error("Input is not a tar file")
        sys.exit(1)

extract_to_directory(file, destination_dir)

Extract hmm database (tar.gz) to downlaod directory

Parameters:
  • hmm_file (Path) –

    path to compressed database.

Source code in src/pynteny/utils.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def extract_to_directory(file: Path, destination_dir: Path) -> None:
    """Extract hmm database (tar.gz) to downlaod directory

    Args:
        hmm_file (Path): path to compressed database.
    """
    file = Path(file)
    if (not is_tar_file(file)) and (not is_gz_file(file)):
        logger.warning(f"{file} is not a tar or gz file. Skipping extraction")
        return
    else:
        logger.info("Extracting files to target directory")
        if is_tar_file(file):
            extract_tar_file(file, destination_dir)
        elif is_gz_file(file):
            extract_gz_file(file, destination_dir)
        flatten_directory(destination_dir)
        os.remove(file)
        logger.info("Database unpacked successfully")

flatten_directory(directory)

Flatten directory, i.e. remove all subdirectories and copy all files to the top level directory.

Parameters:
  • directory (Path) –

    path to directory.

Source code in src/pynteny/utils.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def flatten_directory(directory: Path) -> None:
    """Flatten directory, i.e. remove all subdirectories and
    copy all files to the top level directory.

    Args:
        directory (Path): path to directory.
    """
    directory = Path(directory)
    directory = directory.as_posix()
    for dirpath, _, filenames in os.walk(directory, topdown=False):
        for filename in filenames:
            i = 0
            source = os.path.join(dirpath, filename)
            target = os.path.join(directory, filename)
            while os.path.exists(target):
                i += 1
                file_parts = os.path.splitext(os.path.basename(filename))

                target = os.path.join(
                    directory,
                    file_parts[0] + "_" + str(i) + file_parts[1],
                )
            shutil.move(source, target)
        if dirpath != directory:
            os.rmdir(dirpath)

is_gz_file(gz_file)

Check whether file is gz-compressed.

Parameters:
  • gz_file (Path) –

    path to file.

Returns:
  • bool( bool ) –

    whether file is compressed or not.

Source code in src/pynteny/utils.py
232
233
234
235
236
237
238
239
240
241
242
def is_gz_file(gz_file: Path) -> bool:
    """Check whether file is gz-compressed.

    Args:
        gz_file (Path): path to file.

    Returns:
        bool: whether file is compressed or not.
    """
    gz_file = Path(gz_file)
    return gz_file.is_file() and gz_file.as_posix().endswith("gz")

is_right_list_nested_type(list_object, inner_type)

Check if all elements in list are of the same type.

Parameters:
  • list_object (list) –

    list containing elements.

  • inner_type (type) –

    type to be checked.

Returns:
  • bool( bool ) –

    whether list contains elements of the same specified type.

Source code in src/pynteny/utils.py
354
355
356
357
358
359
360
361
362
363
364
def is_right_list_nested_type(list_object: list, inner_type: type) -> bool:
    """Check if all elements in list are of the same type.

    Args:
        list_object (list): list containing elements.
        inner_type (type): type to be checked.

    Returns:
        bool: whether list contains elements of the same specified type.
    """
    return all(isinstance(x, inner_type) for x in list_object)

is_tar_file(tar_file)

Check whether file is tar-compressed.

Parameters:
  • tar_file (Path) –

    path to file.

Returns:
  • bool( bool ) –

    whether file is compressed or not.

Source code in src/pynteny/utils.py
219
220
221
222
223
224
225
226
227
228
229
def is_tar_file(tar_file: Path) -> bool:
    """Check whether file is tar-compressed.

    Args:
        tar_file (Path): path to file.

    Returns:
        bool: whether file is compressed or not.
    """
    tar_file = Path(tar_file)
    return tar_file.is_file() and tarfile.is_tarfile(tar_file.as_posix())

list_tar_dir(tar_dir)

List files within tar or tar.gz directory.

Parameters:
  • tar_dir (Path) –

    path to directory containing tar files.

Returns:
  • list( list ) –

    list of tar files.

Source code in src/pynteny/utils.py
312
313
314
315
316
317
318
319
320
321
322
323
324
def list_tar_dir(tar_dir: Path) -> list:
    """List files within tar or tar.gz directory.

    Args:
        tar_dir (Path): path to directory containing tar files.

    Returns:
        list: list of tar files.
    """
    tar_dir = Path(tar_dir)
    with tarfile.open(tar_dir, "r") as tar_obj:
        files = tar_obj.getnames()
    return files

parallelize_over_input_files(callable, input_list, processes=None, max_tasks_per_child=10, **callable_kwargs)

Parallelize callable over a set of input objects using a pool of workers. Inputs in input list are passed to the first argument of the callable. Additional callable named arguments may be passed.

Parameters:
  • callable (_type_) –

    function to be run.

  • input_list (list) –

    list of inputs to callable.

  • n_processes (int) –

    maximum number of threads. Defaults to all minus one.

  • max_tasks_per_child (int, default: 10 ) –

    maximum number of tasks per child process before is reset. Defaults to 10.

Source code in src/pynteny/utils.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def parallelize_over_input_files(
    callable,
    input_list: list,
    processes: int = None,
    max_tasks_per_child=10,
    **callable_kwargs,
) -> None:
    """Parallelize callable over a set of input objects using a pool
    of workers. Inputs in input list are passed to the first argument
    of the callable. Additional callable named arguments may be passed.

    Args:
        callable (_type_): function to be run.
        input_list (list): list of inputs to callable.
        n_processes (int, optional): maximum number of threads.
            Defaults to all minus one.
        max_tasks_per_child (int, optional): maximum number of tasks per child
            process before is reset. Defaults to 10.
    """
    if processes is None:
        processes = os.cpu_count - 1
    p = Pool(processes, maxtasksperchild=max_tasks_per_child)
    p.map(partial(callable, **callable_kwargs), input_list)
    p.close()
    p.join()

set_default_output_path(input_path, tag=None, extension=None, only_filename=False, only_basename=False, only_dirname=False)

Utility function to generate a default path to output file or directory based on an input file name and path.

Parameters:
  • input_path (Path) –

    path to input file.

  • tag (str, default: None ) –

    text tag to be added to file name. Defaults to None.

  • extension (str, default: None ) –

    change input file extension with this one. Defaults to None.

  • only_filename (bool, default: False ) –

    output only default filename. Defaults to False.

  • only_basename (bool, default: False ) –

    output only default basename (no extension). Defaults to False.

  • only_dirname (bool, default: False ) –

    output only path to default output directory. Defaults to False.

Returns:
  • Path( Path ) –

    a path or name to a default output file.

Source code in src/pynteny/utils.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def set_default_output_path(
    input_path: Path,
    tag: str = None,
    extension: str = None,
    only_filename: bool = False,
    only_basename: bool = False,
    only_dirname: bool = False,
) -> Path:
    """Utility function to generate a default path to output file
    or directory based on an input file name and path.

    Args:
        input_path (Path): path to input file.
        tag (str, optional): text tag to be added to file name. Defaults to None.
        extension (str, optional): change input file extension with this one. Defaults to None.
        only_filename (bool, optional): output only default filename. Defaults to False.
        only_basename (bool, optional): output only default basename (no extension). Defaults to False.
        only_dirname (bool, optional): output only path to default output directory. Defaults to False.

    Returns:
        Path: a path or name to a default output file.
    """
    input_path = Path(input_path)
    dirname = input_path.parent
    fname, ext = input_path.stem, input_path.suffix
    if extension is None:
        extension = ext
    if tag is None:
        tag = ""
    default_file = f"{fname}{tag}{extension}"
    if only_basename:
        return Path(fname)
    if only_filename:
        return Path(default_file)
    if only_dirname:
        return Path(dirname)
    else:
        return Path(os.path.abspath(os.path.join(dirname, default_file)))

split_hmms(hmm_file, output_dir)

Split PFAM-A hmm database into individual HMM files and extract metadata file.

Parameters:
  • output_dir (Path) –

    path to output directory.

Source code in src/pynteny/utils.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def split_hmms(hmm_file: Path, output_dir: Path) -> None:
    """Split PFAM-A hmm database into individual HMM files
        and extract metadata file.

    Args:
        output_dir (Path): path to output directory.
    """
    output_dir = Path(output_dir)
    if not output_dir.exists():
        output_dir.mkdir(parents=True)
    with open(hmm_file, "r") as f:
        hmm_text = f.read()
    hmm_texts = hmm_text.split("//")
    for text in hmm_texts:
        acc_code = [entry for entry in text.split("\n") if entry.startswith("ACC")]
        if acc_code:
            acc_name = acc_code[0].split()[1]
            with open(output_dir / f"{acc_name}.hmm", "w+") as f:
                f.write(text)

terminal_execute(command_str, suppress_shell_output=False, work_dir=None, return_output=False)

Execute given command in terminal through Python.

Parameters:
  • command_str (str) –

    terminal command to be executed.

  • suppress_shell_output (bool, default: False ) –

    suppress shell output. Defaults to False.

  • work_dir (Path, default: None ) –

    change working directory. Defaults to None.

  • return_output (bool, default: False ) –

    whether to return execution output. Defaults to False.

Returns:
  • STDOUT

    subprocess.STDOUT: subprocess output.

Source code in src/pynteny/utils.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def terminal_execute(
    command_str: str,
    suppress_shell_output=False,
    work_dir: Path = None,
    return_output=False,
) -> subprocess.STDOUT:
    """Execute given command in terminal through Python.

    Args:
        command_str (str): terminal command to be executed.
        suppress_shell_output (bool, optional): suppress shell output. Defaults to False.
        work_dir (Path, optional): change working directory. Defaults to None.
        return_output (bool, optional): whether to return execution output. Defaults to False.

    Returns:
        subprocess.STDOUT: subprocess output.
    """
    if suppress_shell_output:
        suppress_code = ">/dev/null 2>&1"
        command_str = f"{command_str} {suppress_code}"
    output = subprocess.run(
        command_str, shell=True, cwd=work_dir, capture_output=return_output
    )
    return output

Module wrappers

In-process replacements for the external CLI tools that Pynteny used to shell out to (HMMER, Prodigal, seqkit). These now rely on the pure-Python, pip-installable packages pyhmmer, pyrodigal and pyfastx so that Pynteny can be installed without conda.

Run an HMM search with pyhmmer and write the results as an HMMER3 tabular (--tblout) output file.

In-process replacement for the hmmsearch/hmmscan CLI tools. The on-disk format is unchanged (standard HMMER3 tabular output), so downstream parsing and result reuse keep working as before.

Parameters:
  • hmm_model (Path) –

    path to profile HMM to be used.

  • input_fasta (Path) –

    path to fasta containing the sequence database to be searched.

  • output_file (Path, default: None ) –

    path to HMMER tabular output file. Defaults to None.

  • method (str, default: 'hmmsearch' ) –

    either 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

  • processes (int, default: None ) –

    maximum number of threads. Defaults to all minus one.

  • additional_args (str, default: None ) –

    a CLI-style string with additional arguments for hmmsearch/hmmscan. Defaults to None.

Source code in src/pynteny/wrappers.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def run_HMM_search(
    hmm_model: Path,
    input_fasta: Path,
    output_file: Path = None,
    method: str = "hmmsearch",
    processes: int = None,
    additional_args: str = None,
) -> None:
    """Run an HMM search with pyhmmer and write the results as an HMMER3
    tabular (``--tblout``) output file.

    In-process replacement for the ``hmmsearch``/``hmmscan`` CLI tools. The
    on-disk format is unchanged (standard HMMER3 tabular output), so downstream
    parsing and result reuse keep working as before.

    Args:
        hmm_model (Path): path to profile HMM to be used.
        input_fasta (Path): path to fasta containing the sequence database to be searched.
        output_file (Path, optional): path to HMMER tabular output file. Defaults to None.
        method (str, optional): either 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        additional_args (str, optional): a CLI-style string with additional
            arguments for hmmsearch/hmmscan. Defaults to None.
    """
    hmm_model = Path(hmm_model)
    input_fasta = Path(input_fasta)
    if processes is None:
        processes = os.cpu_count() - 1
    if output_file is None:
        output_file = set_default_output_path(input_fasta, "_hmmer_hits", ".txt")
    else:
        output_file = Path(output_file)

    options = _parse_hmmsearch_args(additional_args)

    with pyhmmer.easel.SequenceFile(input_fasta.as_posix(), digital=True) as seq_file:
        sequences = seq_file.read_block()
    with pyhmmer.plan7.HMMFile(hmm_model.as_posix()) as hmm_file:
        hmms = list(hmm_file)

    search = pyhmmer.hmmscan if method == "hmmscan" else pyhmmer.hmmsearch
    if method == "hmmscan":
        # hmmscan queries sequences against the HMM database
        all_hits = list(search(sequences, hmms, cpus=processes, **options))
    else:
        all_hits = list(search(hmms, sequences, cpus=processes, **options))

    with open(output_file, "wb") as fh:
        for i, hits in enumerate(all_hits):
            hits.write(fh, format="targets", header=(i == 0))

run_seqkit_nodup(input_fasta, output_fasta=None, export_duplicates=False)

Remove duplicated sequences from a fasta file (by sequence content).

Pure-Python replacement for seqkit rmdup -s.

Parameters:
  • input_fasta (Path) –

    path to input fasta.

  • output_fasta (Path, default: None ) –

    path to output fasta. Defaults to None.

  • export_duplicates (bool, default: False ) –

    whether to export a file containing the IDs of duplicated sequences. Defaults to False.

Source code in src/pynteny/wrappers.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def run_seqkit_nodup(
    input_fasta: Path, output_fasta: Path = None, export_duplicates: bool = False
):
    """Remove duplicated sequences from a fasta file (by sequence content).

    Pure-Python replacement for ``seqkit rmdup -s``.

    Args:
        input_fasta (Path): path to input fasta.
        output_fasta (Path, optional): path to output fasta. Defaults to None.
        export_duplicates (bool, optional): whether to export a file containing
            the IDs of duplicated sequences. Defaults to False.
    """
    input_fasta = Path(input_fasta)
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, tag="_no_duplicates")
    else:
        output_fasta = Path(output_fasta)

    seen_sequences = set()
    duplicated_ids = []
    fasta = pyfastx.Fasta(input_fasta.as_posix(), build_index=False, full_name=True)
    with open(output_fasta, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            seq_key = record_seq.upper()
            if seq_key in seen_sequences:
                duplicated_ids.append(record_name.split()[0])
                continue
            seen_sequences.add(seq_key)
            outfile.write(f">{record_name}\n{record_seq}\n")

    if export_duplicates:
        dup_file = set_default_output_path(
            input_fasta, tag="_duplicates", extension=".txt"
        )
        with open(dup_file, "w+", encoding="UTF-8") as f:
            f.write("\n".join(duplicated_ids))