Module api

Classes to facilitate usage within Python scripts / Notebooks

Build

Bases: Command

Build command object.

Source code in src/pynteny/api.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class Build(Command):
    """Build command object."""

    def __init__(
        self,
        data: Path,
        prepend: bool = False,
        outfile: Path = None,
        logfile: Path = None,
        processes: int = None,
        tempdir: Path = None,
    ):
        """Translate nucleotide assembly file and assign contig and gene location
           info to each identified ORF (using prodigal).

        Args:
            data (Path): path to the input nucleotide assembly file to be built
                into a labelled database.
            prepend (bool, optional): presumably whether to prepend contig/gene
                location info to sequence labels — TODO confirm against
                build_database. Defaults to False.
            outfile (Path, optional): path to file containing built database. Defaults to None.
            logfile (Path, optional): path to logfile. Defaults to None.
            processes (int, optional): maximum number of processes. Defaults to all minus one.
            tempdir (Path, optional): path to directory to contain temporary files. Defaults to None.
        """

        # Normalize every path-like argument to pathlib.Path, passing None through.
        self._args = CommandArgs(
            data=Path(data),
            prepend=prepend,
            outfile=Path(outfile) if outfile is not None else outfile,
            logfile=Path(logfile) if logfile is not None else logfile,
            processes=processes,
            tempdir=Path(tempdir) if tempdir is not None else tempdir,
        )

    def run(self) -> None:
        """Run pynteny build (constructs the database via build_database)."""
        return build_database(self._args)

__init__(data, prepend=False, outfile=None, logfile=None, processes=None, tempdir=None)

Translate nucleotide assembly file and assign contig and gene location info to each identified ORF (using prodigal).

Parameters:

Name Type Description Default
data Path

description

required
outfile Path

path to file containing built database. Defaults to None.

None
logfile Path

path to logfile. Defaults to None.

None
processes int

maximum number of processes. Defaults to all minus one.

None
tempdir Path

path to directory to contain temporary files. Defaults to None.

None
Source code in src/pynteny/api.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def __init__(
    self,
    data: Path,
    prepend: bool = False,
    outfile: Path = None,
    logfile: Path = None,
    processes: int = None,
    tempdir: Path = None,
):
    """Translate nucleotide assembly file and assign contig and gene location
       info to each identified ORF (using prodigal).

    Args:
        data (Path): path to the input nucleotide assembly file to be built
            into a labelled database.
        prepend (bool, optional): presumably whether to prepend contig/gene
            location info to sequence labels — TODO confirm against
            build_database. Defaults to False.
        outfile (Path, optional): path to file containing built database. Defaults to None.
        logfile (Path, optional): path to logfile. Defaults to None.
        processes (int, optional): maximum number of processes. Defaults to all minus one.
        tempdir (Path, optional): path to directory to contain temporary files. Defaults to None.
    """

    # Normalize every path-like argument to pathlib.Path, passing None through.
    self._args = CommandArgs(
        data=Path(data),
        prepend=prepend,
        outfile=Path(outfile) if outfile is not None else outfile,
        logfile=Path(logfile) if logfile is not None else logfile,
        processes=processes,
        tempdir=Path(tempdir) if tempdir is not None else tempdir,
    )

run()

Run pynteny build

Source code in src/pynteny/api.py
202
203
204
def run(self) -> None:
    """Run pynteny build (constructs the database via build_database)."""
    return build_database(self._args)

Command

Parent class for Pynteny command

Source code in src/pynteny/api.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Command:
    """
    Parent class for Pynteny command

    Args:
        CommandArgs
    """

    def _repr_html_(self):
        """Render Pynteny version and author as an HTML table (called by Jupyter)."""
        template = """
        <table>
            <tr>
                <td><strong>Pynteny version</strong></td><td>{version}</td>
            </tr><tr>
                <td><strong>Author</strong></td><td>{author}</td>
            </tr>
        </table>
        """
        return template.format(version=__version__, author=__author__)

    def update(self, argname: str, value: str) -> None:
        """Assign a new value to one argument of this pynteny command.

        Args:
            argname (str): name of the argument to be updated.
            value (str): value to assign to the argument.
        """
        setattr(self._args, argname, value)

    @staticmethod
    def cite():
        """Display Pynteny citation"""
        citation_args = CommandArgs(version=__version__, author=__author__)
        print(get_citation(citation_args, silent=True))

cite() staticmethod

Display Pynteny citation

Source code in src/pynteny/api.py
57
58
59
60
61
62
@staticmethod
def cite():
    """Display Pynteny citation"""
    citation_args = CommandArgs(version=__version__, author=__author__)
    print(get_citation(citation_args, silent=True))

update(argname, value)

Update argument value in pynteny command.

Parameters:

Name Type Description Default
argname str

argument name to be updated.

required
value str

new argument value.

required
Source code in src/pynteny/api.py
48
49
50
51
52
53
54
55
def update(self, argname: str, value: str) -> None:
    """Assign a new value to one argument of this pynteny command.

    Args:
        argname (str): name of the argument to be updated.
        value (str): value to assign to the argument.
    """
    setattr(self._args, argname, value)

Download

Bases: Command

Download HMM database from NCBI.

Source code in src/pynteny/api.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class Download(Command):
    """Download HMM database from NCBI."""

    def __init__(
        self,
        outdir: Path,
        logfile: Path = None,
        force: bool = False,
        unpack: bool = False,
    ):
        """Download HMM database from NCBI.

        Args:
            outdir (Path): path to output directory in which to store downloaded HMMs.
            logfile (Path, optional): path to log file. Defaults to None.
            force (bool, optional): force-download database again if already downloaded.
                Defaults to False.
            unpack (bool, optional): whether to unpack downloaded file. If False, then
                PGAP's database will be unpacked in each Pynteny session. Defaults to False.
        """

        # Normalize paths; logfile may stay None.
        self._args = CommandArgs(
            outdir=Path(outdir),
            logfile=Path(logfile) if logfile is not None else logfile,
            force=force,
            unpack=unpack,
        )

    def run(self) -> None:
        """Run pynteny download"""
        return download_hmms(self._args)

__init__(outdir, logfile=None, force=False, unpack=False)

Download HMM database from NCBI.

Parameters:

Name Type Description Default
outdir Path

path to output directory in which to store downloaded HMMs.

required
logfile Path

path to log file. Defaults to None.

None
force bool

force-download database again if already downloaded.

False
unpack bool

whether to unpack downloaded file. If False, then PGAP's database will be unpacked in each Pynteny session. Defaults to False.

False
Source code in src/pynteny/api.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def __init__(
    self,
    outdir: Path,
    logfile: Path = None,
    force: bool = False,
    unpack: bool = False,
):
    """Download HMM database from NCBI.

    Args:
        outdir (Path): path to output directory in which to store downloaded HMMs.
        logfile (Path, optional): path to log file. Defaults to None.
        force (bool, optional): force-download database again if already downloaded.
            Defaults to False.
        unpack (bool, optional): whether to unpack downloaded file. If False, then
            PGAP's database will be unpacked in each Pynteny session. Defaults to False.
    """

    # Normalize paths; logfile may stay None.
    self._args = CommandArgs(
        outdir=Path(outdir),
        logfile=Path(logfile) if logfile is not None else logfile,
        force=force,
        unpack=unpack,
    )

run()

Run pynteny download

Source code in src/pynteny/api.py
234
235
236
def run(self) -> None:
    """Run pynteny download (delegates to download_hmms with the stored arguments)."""
    return download_hmms(self._args)

Search

Bases: Command

Search command object.

Source code in src/pynteny/api.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class Search(Command):
    """Search command object."""

    def __init__(
        self,
        data: Path,
        synteny_struc: str,
        gene_ids: bool = False,
        unordered: bool = False,
        reuse: bool = False,
        hmm_dir: Path = None,
        hmm_meta: Path = None,
        outdir: Path = None,
        prefix: str = "",
        hmmsearch_args: str = None,
        logfile: Path = None,
        processes: int = None,
    ):
        """Query sequence database for HMM hits arranged in provided synteny structure.

        Args:
            data (Path): path to input labelled database.
            synteny_struc (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specified by using '>' for sense and '<' for antisense.
            gene_ids (bool, optional): whether gene symbols are used in synteny
                string instead of HMM names. Defaults to False.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order. If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to False.
            reuse (bool, optional): if True then HMMER3 won't be run again for HMMs already
                searched in the same output directory. Defaults to False.
            hmm_dir (Path, optional): path to directory containing input HMM files.
                Defaults to None, in which case the PGAP database is downloaded if not
                already so.
            hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
            outdir (Path, optional): path to output directory. Defaults to None.
            prefix (str, optional): prefix of output file names. Defaults to "".
            hmmsearch_args (str, optional): additional arguments to hmmsearch or hmmscan. Each
                element in the list is a string with additional arguments for each
                input hmm (arranged in the same order), an element can also take a
                value of None to avoid passing additional arguments for a specific
                input hmm. A single string may also be passed, in which case the
                same additional argument is passed to hmmsearch for all input hmms.
                Defaults to None.
            logfile (Path, optional): path to log file. Defaults to None.
            processes (int, optional): maximum number of threads to be employed.
                Defaults to all minus one.
        """

        # Paths are normalized to pathlib.Path; None values are passed through.
        self._args = CommandArgs(
            data=Path(data),
            synteny_struc=synteny_struc,
            hmm_dir=Path(hmm_dir) if hmm_dir is not None else hmm_dir,
            hmm_meta=Path(hmm_meta) if hmm_meta is not None else hmm_meta,
            outdir=Path(outdir) if outdir is not None else outdir,
            prefix=prefix,
            hmmsearch_args=hmmsearch_args,
            gene_ids=gene_ids,
            logfile=Path(logfile) if logfile is not None else logfile,
            processes=processes,
            unordered=unordered,
            reuse=reuse,
        )

    def parse_genes(self, synteny_struc: str) -> str:
        """Parse gene IDs in synteny structure and find corresponding HMM names
        in provided HMM database.

        Args:
            synteny_struc (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specified by using '>' for sense and '<' for antisense.

        Returns:
            str: parsed synteny structure in which gene symbols are replaced by
                 corresponding HMM names.
        """
        args = CommandArgs(
            synteny_struc=synteny_struc,
            hmm_meta=self._args.hmm_meta,
            logfile=self._args.logfile,
        )
        return parse_gene_ids(args)

    def run(self) -> SyntenyHits:
        """Run pynteny search.

        Returns:
            SyntenyHits: hits matching the provided synteny structure.
        """
        return synteny_search(self._args)

__init__(data, synteny_struc, gene_ids=False, unordered=False, reuse=False, hmm_dir=None, hmm_meta=None, outdir=None, prefix='', hmmsearch_args=None, logfile=None, processes=None)

Query sequence database for HMM hits arranged in provided synteny structure.

Parameters:

Name Type Description Default
data Path

path to input labelled database.

required
synteny_struc str

a str describing the desired synteny structure, structured as follows:

'>hmm_a N_ab hmm_b bc <hmm_c'

where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

required
gene_ids bool

whether gene symbols are used in synteny string instead of HMM names. Defaults to False.

False
unordered bool

whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to False.

False
reuse bool

if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to False.

False
hmm_dir Path

path to directory containing input HMM files. Defaults to None, in which case the PGAP database is downloaded if not already so.

None
hmm_meta Path

path to PGAP's metadata file. Defaults to None.

None
outdir Path

path to output directory. Defaults to None.

None
prefix str

prefix of output file names. Defaults to "".

''
hmmsearch_args str

additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

None
logfile Path

path to log file. Defaults to None.

None
processes int

maximum number of threads to be employed. Defaults to all minus one.

None
Source code in src/pynteny/api.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def __init__(
    self,
    data: Path,
    synteny_struc: str,
    gene_ids: bool = False,
    unordered: bool = False,
    reuse: bool = False,
    hmm_dir: Path = None,
    hmm_meta: Path = None,
    outdir: Path = None,
    prefix: str = "",
    hmmsearch_args: str = None,
    logfile: Path = None,
    processes: int = None,
):
    """Query sequence database for HMM hits arranged in provided synteny structure.

    Args:
        data (Path): path to input labelled database.
        synteny_struc (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specified by using '>' for sense and '<' for antisense.
        gene_ids (bool, optional): whether gene symbols are used in synteny
            string instead of HMM names. Defaults to False.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order. If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to False.
        reuse (bool, optional): if True then HMMER3 won't be run again for HMMs already
            searched in the same output directory. Defaults to False.
        hmm_dir (Path, optional): path to directory containing input HMM files.
            Defaults to None, in which case the PGAP database is downloaded if not
            already so.
        hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
        outdir (Path, optional): path to output directory. Defaults to None.
        prefix (str, optional): prefix of output file names. Defaults to "".
        hmmsearch_args (str, optional): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms.
            Defaults to None.
        logfile (Path, optional): path to log file. Defaults to None.
        processes (int, optional): maximum number of threads to be employed.
            Defaults to all minus one.
    """

    # Paths are normalized to pathlib.Path; None values are passed through.
    self._args = CommandArgs(
        data=Path(data),
        synteny_struc=synteny_struc,
        hmm_dir=Path(hmm_dir) if hmm_dir is not None else hmm_dir,
        hmm_meta=Path(hmm_meta) if hmm_meta is not None else hmm_meta,
        outdir=Path(outdir) if outdir is not None else outdir,
        prefix=prefix,
        hmmsearch_args=hmmsearch_args,
        gene_ids=gene_ids,
        logfile=Path(logfile) if logfile is not None else logfile,
        processes=processes,
        unordered=unordered,
        reuse=reuse,
    )

parse_genes(synteny_struc)

Parse gene IDs in synteny structure and find corresponding HMM names in provided HMM database.

Parameters:

Name Type Description Default
synteny_struc str

a str describing the desired synteny structure, structured as follows:

'>hmm_a N_ab hmm_b bc <hmm_c'

where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

required

Returns:

Name Type Description
str str

parsed synteny structure in which gene symbols are replaced by corresponding HMM names.

Source code in src/pynteny/api.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def parse_genes(self, synteny_struc: str) -> str:
    """Parse gene IDs in synteny structure and find corresponding HMM names
    in provided HMM database.

    Args:
        synteny_struc (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specified by using '>' for sense and '<' for antisense.

    Returns:
        str: parsed synteny structure in which gene symbols are replaced by
             corresponding HMM names.
    """
    args = CommandArgs(
        synteny_struc=synteny_struc,
        hmm_meta=self._args.hmm_meta,
        logfile=self._args.logfile,
    )
    return parse_gene_ids(args)

run()

Run pynteny search

Source code in src/pynteny/api.py
165
166
167
def run(self) -> SyntenyHits:
    """Run pynteny search.

    Returns:
        SyntenyHits: hits matching the provided synteny structure.
    """
    return synteny_search(self._args)

Module filter

Tools to filter HMM hits by synteny structure

SyntenyHMMfilter

Tools to search for synteny structures among sets of hmm models

Source code in src/pynteny/filter.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
class SyntenyHMMfilter:
    """Tools to search for synteny structures among sets of hmm models"""

    def __init__(
        self, hmm_hits: dict, synteny_structure: str, unordered: bool = True
    ) -> None:
        """Search for contigs that satisfy the given gene synteny structure.

        Args:
            hmm_hits (dict): a dict of pandas DataFrames, as output by
                parseHMMsearchOutput with keys corresponding to hmm names
            synteny_structure (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specified by using '>' for sense and '<' for antisense.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order. If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to True.
        """
        self._unordered = unordered
        self._hmm_hits = hmm_hits
        self._hmms = list(hmm_hits.keys())
        self._synteny_structure = synteny_structure
        self._contains_hmm_groups = syntenyparser.contains_HMM_groups(
            self._synteny_structure
        )
        self._parsed_structure = syntenyparser.parse_synteny_structure(
            self._synteny_structure
        )
        if self._unordered:
            # Strand constraints only make sense for ordered (collinear) searches,
            # so blank them out when order is ignored.
            self._parsed_structure["strands"] = [
                "" for _ in self._parsed_structure["strands"]
            ]
            logger.warning(
                "Disregarding strand location as unordered structured selected"
            )
        # Numeric code assigned to each HMM group: its position in the structure.
        self._hmm_order_dict = dict(
            zip(
                self._parsed_structure["hmm_groups"],
                range(len(self._parsed_structure["hmm_groups"])),
            )
        )
        self._n_hmm_groups = len(self._parsed_structure["hmm_groups"])
        self._n_hmms = len(self._hmms)

    def _in_hmm_group(self, query_hmm_group: str, hmm_group: str) -> bool:
        """Return True if every '|'-separated hmm name pattern in query_hmm_group
        occurs (as a substring) in hmm_group."""
        query_split = query_hmm_group.split("|")
        return all(hmm_pattern in hmm_group for hmm_pattern in query_split)

    def _assign_code_to_HMM(self, hmm_group_name: str) -> int:
        """Return the numeric code of the synteny-structure HMM group matching
        hmm_group_name.

        Exits with an error if the name matches zero or more than one group.
        """
        hmm_group_name = str(hmm_group_name)
        code = [
            code
            for hmm_group, code in self._hmm_order_dict.items()
            if self._in_hmm_group(hmm_group_name, hmm_group)
        ]
        if len(code) > 1:
            logger.error(
                f"HMM: {hmm_group_name} found in more than one hmm group in synteny structure"
            )
            sys.exit(1)
        if not code:
            logger.error(f"No code found for HMM: {hmm_group_name}")
            sys.exit(1)
        return code[0]

    def _add_meta_codes_to_HMM_hits(self, all_hit_labels: pd.DataFrame) -> pd.DataFrame:
        """Add numeric codes for each hmm and strand, compute distance between genes.

        Note: annotation fixed from the non-existent `pd.Dataframe` to
        `pd.DataFrame` (the former raises AttributeError when annotations
        are evaluated eagerly).
        """
        all_hit_labels["gene_pos_diff"] = all_hit_labels.gene_pos.diff()
        # First diff is NaN; set it to 1 so rolling windows are not skipped.
        all_hit_labels.loc[0, "gene_pos_diff"] = 1
        all_hit_labels["hmm_code"] = all_hit_labels.hmm.apply(self._assign_code_to_HMM)
        all_hit_labels["strand"] = all_hit_labels.strand.apply(
            lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0)
        )
        return all_hit_labels

    def _merge_hits_by_HMM_group(self, hits: pd.DataFrame):
        """Merge hit sequences within an HMM group representing
        the same gene symbol. Drop duplicated hits within
        each HMM group.
        """
        groups = hits.groupby(["full_label"]).groups
        for group_idxs in groups.values():
            hmm_names = set(hits.loc[group_idxs, "hmm"].values)
            candidate_hmm_group = [
                hmm_group_str
                for hmm_group_str in self._parsed_structure["hmm_groups"]
                if hmm_names.issubset(set(hmm_group_str.split("|")))
            ]
            if candidate_hmm_group:
                # NOTE(review): joining a set yields an arbitrary member order;
                # presumably only group membership (not order) matters downstream
                # since _in_hmm_group checks substrings — confirm.
                hmm_group = "|".join(hmm_names)
            else:
                hmm_group = "discard"
            hits.loc[group_idxs, "hmm"] = hmm_group
        hits = hits[hits.hmm != "discard"].drop_duplicates()
        hits.hmm = hits.hmm.apply(lambda x: str(x))
        return hits

    def get_all_HMM_hits(self) -> pd.DataFrame:
        """Group and preprocess all hit labels into a single dataframe.

        Returns:
            pd.DataFrame: HMMER3 hit labels matching provided HMMs.
        """
        hit_labels = {}
        for hmm, hits in self._hmm_hits.items():
            labels = hits.id.values.tolist()
            if not labels:
                logger.error(f"No records found in database matching HMM: {hmm}")
                sys.exit(1)
            hit_labels[hmm] = labelparser.parse_from_list(labels)
            hit_labels[hmm]["hmm"] = hmm
        # Create single dataframe with new column corresponding to HMM and all hits
        # Remove contigs with less hits than the number of hmms in synteny structure (drop if enabling partial searching)
        all_hit_labels = (
            pd.concat(hit_labels.values())
            .groupby("contig")
            .filter(lambda x: len(x) >= self._n_hmm_groups)
            .sort_values(["contig", "gene_pos"], ascending=True)
        )
        all_hit_labels = all_hit_labels.reset_index(drop=True)
        if self._contains_hmm_groups:
            all_hit_labels = self._merge_hits_by_HMM_group(all_hit_labels)
        all_hit_labels = all_hit_labels.reset_index(drop=True)
        return self._add_meta_codes_to_HMM_hits(all_hit_labels)

    def filter_hits_by_synteny_structure(self) -> dict:
        """Search for contigs that satisfy the given gene synteny structure.

        Returns:
            dict: HMMER3 hits separated by contig.
        """
        all_matched_hits = {}
        filters = SyntenyPatternFilters(
            self._synteny_structure, unordered=self._unordered
        )
        all_hit_labels = self.get_all_HMM_hits()
        if all_hit_labels.full_label.duplicated().any():
            logger.warning(
                "At least two different HMMs produced identical sequence hits"
            )
        if all_hit_labels.full_label.duplicated().sum() == all_hit_labels.shape[0] / 2:
            logger.error(
                (
                    "All input profile HMMs rendered identical sequence hits. "
                    "Inspect HMMER output tables to evaluate hits."
                )
            )
            sys.exit(1)
        contig_names = all_hit_labels.contig.unique()

        for contig in contig_names:
            contig_hits = all_hit_labels[all_hit_labels.contig == contig].reset_index(
                drop=True
            )
            matched_hit_labels = {
                hmm_group: [] for hmm_group in contig_hits.hmm.unique()
            }

            if contig_hits.hmm.nunique() >= self._n_hmms:
                # Slide a window of one structure-length over the contig's hits and
                # test hmm identity, strand, and inter-gene distance patterns.
                hmm_match = contig_hits.hmm_code.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_hmm_pattern)
                strand_match = contig_hits.strand.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_strand_pattern)
                if self._n_hmm_groups > 1:
                    distance_match = contig_hits.gene_pos_diff.rolling(
                        window=self._n_hmm_groups
                    ).apply(filters.contains_distance_pattern)
                    matched_rows = contig_hits[
                        (hmm_match == 1) & (strand_match == 1) & (distance_match == 1)
                    ]
                else:
                    matched_rows = contig_hits[(hmm_match == 1) & (strand_match == 1)]
                for i in matched_rows.index:
                    # Collect the full window of hits ending at the matched row.
                    matched_hits = contig_hits.iloc[
                        i - (self._n_hmm_groups - 1) : i + 1, :
                    ]
                    for label, hmm in zip(
                        (matched_hits.full_label.values), matched_hits.hmm
                    ):
                        matched_hit_labels[hmm].append(label)

                all_matched_hits[contig] = matched_hit_labels
        return all_matched_hits

__init__(hmm_hits, synteny_structure, unordered=True)

Search for contigs that satisfy the given gene synteny structure.

Parameters:

Name Type Description Default
hmm_hits dict

a dict of pandas DataFrames, as output by parseHMMsearchOutput with keys corresponding to hmm names

required
synteny_structure str

a str describing the desired synteny structure, structured as follows:

'>hmm_a N_ab hmm_b bc <hmm_c'

where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

required
unordered bool

whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to True.

True
Source code in src/pynteny/filter.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def __init__(
    self, hmm_hits: dict, synteny_structure: str, unordered: bool = True
) -> None:
    """Search for contigs that satisfy the given gene synteny structure.

    Args:
        hmm_hits (dict): a dict of pandas DataFrames, as output by
            parseHMMsearchOutput with keys corresponding to hmm names
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order. If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to True.
    """
    self._unordered = unordered
    self._hmm_hits = hmm_hits
    self._hmms = list(hmm_hits.keys())
    self._synteny_structure = synteny_structure
    self._contains_hmm_groups = syntenyparser.contains_HMM_groups(
        self._synteny_structure
    )
    self._parsed_structure = syntenyparser.parse_synteny_structure(
        self._synteny_structure
    )
    if self._unordered:
        self._parsed_structure["strands"] = [
            "" for _ in self._parsed_structure["strands"]
        ]
        logger.warning(
            "Disregarding strand location as unordered structured selected"
        )
    self._hmm_order_dict = dict(
        zip(
            self._parsed_structure["hmm_groups"],
            range(len(self._parsed_structure["hmm_groups"])),
        )
    )
    self._n_hmm_groups = len(self._parsed_structure["hmm_groups"])
    self._n_hmms = len(self._hmms)  # len(self._hmm_order_dict)
    self._unordered = unordered

filter_hits_by_synteny_structure()

Search for contigs that satisfy the given gene synteny structure.

Returns:

Name Type Description
dict dict

HMMER3 hits separated by contig.

Source code in src/pynteny/filter.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def filter_hits_by_synteny_structure(self) -> dict:
    """Search for contigs that satisfy the given gene synteny structure.

    Returns:
        dict: HMMER3 hits separated by contig.
    """
    all_matched_hits = {}
    filters = SyntenyPatternFilters(
        self._synteny_structure, unordered=self._unordered
    )
    all_hit_labels = self.get_all_HMM_hits()
    if all_hit_labels.full_label.duplicated().any():
        logger.warning(
            "At least two different HMMs produced identical sequence hits"
        )
    if all_hit_labels.full_label.duplicated().sum() == all_hit_labels.shape[0] / 2:
        logger.error(
            (
                "All input profile HMMs rendered identical sequence hits. "
                "Inspect HMMER output tables to evaluate hits."
            )
        )
        sys.exit(1)
    contig_names = all_hit_labels.contig.unique()

    for contig in contig_names:
        contig_hits = all_hit_labels[all_hit_labels.contig == contig].reset_index(
            drop=True
        )
        matched_hit_labels = {
            hmm_group: [] for hmm_group in contig_hits.hmm.unique()
        }

        if contig_hits.hmm.nunique() >= self._n_hmms:
            hmm_match = contig_hits.hmm_code.rolling(
                window=self._n_hmm_groups
            ).apply(filters.contains_hmm_pattern)
            strand_match = contig_hits.strand.rolling(
                window=self._n_hmm_groups
            ).apply(filters.contains_strand_pattern)
            if self._n_hmm_groups > 1:
                distance_match = contig_hits.gene_pos_diff.rolling(
                    window=self._n_hmm_groups
                ).apply(filters.contains_distance_pattern)
                matched_rows = contig_hits[
                    (hmm_match == 1) & (strand_match == 1) & (distance_match == 1)
                ]
            else:
                matched_rows = contig_hits[(hmm_match == 1) & (strand_match == 1)]
            for i in matched_rows.index:
                matched_hits = contig_hits.iloc[
                    i - (self._n_hmm_groups - 1) : i + 1, :
                ]
                for label, hmm in zip(
                    (matched_hits.full_label.values), matched_hits.hmm
                ):
                    matched_hit_labels[hmm].append(label)

            all_matched_hits[contig] = matched_hit_labels
    return all_matched_hits

get_all_HMM_hits()

Group and preprocess all hit labels into a single dataframe.

Returns:

Type Description
pd.DataFrame

pd.DataFrame: HMMER3 hit labels matching provided HMMs.

Source code in src/pynteny/filter.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def get_all_HMM_hits(self) -> pd.DataFrame:
    """Group and preprocess all hit labels into a single dataframe.

    Returns:
        pd.DataFrame: HMMER3 hit labels matching provided HMMs.
    """
    hit_labels = {}
    for hmm, hits in self._hmm_hits.items():
        labels = hits.id.values.tolist()
        if not labels:
            logger.error(f"No records found in database matching HMM: {hmm}")
            sys.exit(1)
        hit_labels[hmm] = labelparser.parse_from_list(labels)
        hit_labels[hmm]["hmm"] = hmm
    # Create single dataframe with new column corresponding to HMM and all hits
    # Remove contigs with less hits than the number of hmms in synteny structure (drop if enabling partial searching)
    all_hit_labels = (
        pd.concat(hit_labels.values())
        .groupby("contig")
        .filter(lambda x: len(x) >= self._n_hmm_groups)
        .sort_values(["contig", "gene_pos"], ascending=True)
    )
    all_hit_labels = all_hit_labels.reset_index(drop=True)
    if self._contains_hmm_groups:
        all_hit_labels = self._merge_hits_by_HMM_group(all_hit_labels)
    all_hit_labels = all_hit_labels.reset_index(drop=True)
    return self._add_meta_codes_to_HMM_hits(all_hit_labels)

SyntenyHits

Store and manipulate synteny hits by contig

Source code in src/pynteny/filter.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
class SyntenyHits:
    """Store and manipulate synteny hits by contig"""

    def __init__(self, synteny_hits: pd.DataFrame) -> None:
        """
        Initialize from synteny hits object.
        """
        self._synteny_hits = synteny_hits.drop_duplicates()

    @staticmethod
    def _hits_to_dataframe(hits_by_contig: dict) -> pd.DataFrame:
        """Return synteny hits as a dataframe"""
        data = []
        columns = [
            "contig",
            "gene_id",
            "gene_number",
            "locus",
            "strand",
            "full_label",
            "hmm",
        ]
        for contig, matched_hits in hits_by_contig.items():
            for hmm, labels in matched_hits.items():
                for label in labels:
                    parsed_label = labelparser.parse(label)
                    data.append(
                        [
                            parsed_label["contig"],
                            parsed_label["gene_id"],
                            parsed_label["gene_pos"],
                            parsed_label["locus_pos"],
                            parsed_label["strand"],
                            parsed_label["full_label"],
                            hmm,
                        ]
                    )
        return pd.DataFrame(data, columns=columns).sort_values(
            ["contig", "gene_number"]
        )

    @classmethod
    def from_hits_dict(cls, hits_by_contig: dict) -> SyntenyHits:
        """Initialize SyntenyHits object from hits_by_contig dictionary.

        Args:
            hits_by_contig (dict): HMMER3 hit labels separated by contig name.

        Returns:
            SyntenyHits: initialized object of class SyntenyHits.
        """
        return cls(cls._hits_to_dataframe(hits_by_contig))

    @property
    def hits(self) -> pd.DataFrame:
        """Return synteny hits.

        Returns:
            pd.DataFrame: Synteny hits as dataframe.
        """
        return self._synteny_hits

    def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
        """Add molecular metadata to synteny hits.

        Args:
            hmm_meta (Path): path to PGAP metadata file.

        Returns:
            SyntenyHits: and instance of class SyntenyHits.
        """
        hmm_meta = Path(hmm_meta)
        fields = ["gene_symbol", "label", "product", "ec_number"]
        if all([f in self._synteny_hits.columns for f in fields]):
            return self._synteny_hits
        pgap = PGAP(hmm_meta)
        self._synteny_hits[fields] = ""
        for row in self._synteny_hits.itertuples():
            meta_values = [
                [
                    str(v).replace("nan", "")
                    for k, v in pgap.get_meta_info_for_HMM(hmm).items()
                    if k != "#ncbi_accession"
                ]
                for hmm in row.hmm.split("|")
            ]
            self._synteny_hits.loc[row.Index, fields] = [
                "|".join(v) for v in zip(*meta_values)
            ]
        return SyntenyHits(self._synteny_hits)

    def write_to_TSV(self, output_tsv: Path) -> None:
        """Write synteny hits to a TSV file.

        Args:
            output_tsv (Path): path to output tsv file.
        """
        output_tsv = Path(output_tsv)
        self._synteny_hits.to_csv(output_tsv, sep="\t", index=False)

    def write_hit_sequences_to_FASTA_files(
        self,
        sequence_database: Path,
        output_dir: Path,
        output_prefix: str = None,
    ) -> None:
        """Write matching sequences to FASTA files.

        Args:
            sequence_database (Path): path to the peptide or nucleotide sequence database
                in which the synteny search was conducted.
            output_dir (Path): path to output directory.
            output_prefix (str, optional): prefix for output files. Defaults to None.
        """
        sequence_database = Path(sequence_database)
        output_dir = Path(output_dir)
        fasta = FASTA(sequence_database)
        hmm_groups = self._synteny_hits.hmm.unique().tolist()

        for hmm_group in hmm_groups:
            record_ids = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].full_label.values.tolist()

            if (
                "gene_symbol" in self._synteny_hits.columns
                and "label" in self._synteny_hits.columns
            ):
                gene_symbol = self._synteny_hits[
                    self._synteny_hits.hmm == hmm_group
                ].gene_symbol.unique()[0]
                gene_label = self._synteny_hits[
                    self._synteny_hits.hmm == hmm_group
                ].label.unique()[0]
                gene_id = (
                    gene_symbol
                    if not pd.isna(gene_symbol)
                    else (gene_label if not pd.isna(gene_label) else "")
                )
            else:
                gene_id = ""
            gene_id_str = gene_id + "_" if gene_id else ""

            output_fasta = (
                output_dir
                / f"{output_prefix}{gene_id_str.replace('|', '_')}{hmm_group.replace('|', '_')}_hits.fasta"
            )
            if record_ids:
                fasta.filter_by_IDs(
                    record_ids=record_ids,
                    output_file=output_fasta,
                    point_to_new_file=False,
                )
            else:
                logger.warning(
                    f"No record matches found in synteny structure for HMM: {hmm_group}"
                )

hits: pd.DataFrame property

Return synteny hits.

Returns:

Type Description
pd.DataFrame

pd.DataFrame: Synteny hits as dataframe.

__init__(synteny_hits)

Initialize from synteny hits object.

Source code in src/pynteny/filter.py
347
348
349
350
351
def __init__(self, synteny_hits: pd.DataFrame) -> None:
    """
    Initialize from synteny hits object.
    """
    self._synteny_hits = synteny_hits.drop_duplicates()

add_HMM_meta_info_to_hits(hmm_meta)

Add molecular metadata to synteny hits.

Parameters:

Name Type Description Default
hmm_meta Path

path to PGAP metadata file.

required

Returns:

Name Type Description
SyntenyHits SyntenyHits

an instance of class SyntenyHits.

Source code in src/pynteny/filter.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
    """Add molecular metadata to synteny hits.

    Args:
        hmm_meta (Path): path to PGAP metadata file.

    Returns:
        SyntenyHits: and instance of class SyntenyHits.
    """
    hmm_meta = Path(hmm_meta)
    fields = ["gene_symbol", "label", "product", "ec_number"]
    if all([f in self._synteny_hits.columns for f in fields]):
        return self._synteny_hits
    pgap = PGAP(hmm_meta)
    self._synteny_hits[fields] = ""
    for row in self._synteny_hits.itertuples():
        meta_values = [
            [
                str(v).replace("nan", "")
                for k, v in pgap.get_meta_info_for_HMM(hmm).items()
                if k != "#ncbi_accession"
            ]
            for hmm in row.hmm.split("|")
        ]
        self._synteny_hits.loc[row.Index, fields] = [
            "|".join(v) for v in zip(*meta_values)
        ]
    return SyntenyHits(self._synteny_hits)

from_hits_dict(hits_by_contig) classmethod

Initialize SyntenyHits object from hits_by_contig dictionary.

Parameters:

Name Type Description Default
hits_by_contig dict

HMMER3 hit labels separated by contig name.

required

Returns:

Name Type Description
SyntenyHits SyntenyHits

initialized object of class SyntenyHits.

Source code in src/pynteny/filter.py
385
386
387
388
389
390
391
392
393
394
395
@classmethod
def from_hits_dict(cls, hits_by_contig: dict) -> SyntenyHits:
    """Initialize SyntenyHits object from hits_by_contig dictionary.

    Args:
        hits_by_contig (dict): HMMER3 hit labels separated by contig name.

    Returns:
        SyntenyHits: initialized object of class SyntenyHits.
    """
    return cls(cls._hits_to_dataframe(hits_by_contig))

write_hit_sequences_to_FASTA_files(sequence_database, output_dir, output_prefix=None)

Write matching sequences to FASTA files.

Parameters:

Name Type Description Default
sequence_database Path

path to the peptide or nucleotide sequence database in which the synteny search was conducted.

required
output_dir Path

path to output directory.

required
output_prefix str

prefix for output files. Defaults to None.

None
Source code in src/pynteny/filter.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def write_hit_sequences_to_FASTA_files(
    self,
    sequence_database: Path,
    output_dir: Path,
    output_prefix: str = None,
) -> None:
    """Write matching sequences to FASTA files.

    Args:
        sequence_database (Path): path to the peptide or nucleotide sequence database
            in which the synteny search was conducted.
        output_dir (Path): path to output directory.
        output_prefix (str, optional): prefix for output files. Defaults to None.
    """
    sequence_database = Path(sequence_database)
    output_dir = Path(output_dir)
    fasta = FASTA(sequence_database)
    hmm_groups = self._synteny_hits.hmm.unique().tolist()

    for hmm_group in hmm_groups:
        record_ids = self._synteny_hits[
            self._synteny_hits.hmm == hmm_group
        ].full_label.values.tolist()

        if (
            "gene_symbol" in self._synteny_hits.columns
            and "label" in self._synteny_hits.columns
        ):
            gene_symbol = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].gene_symbol.unique()[0]
            gene_label = self._synteny_hits[
                self._synteny_hits.hmm == hmm_group
            ].label.unique()[0]
            gene_id = (
                gene_symbol
                if not pd.isna(gene_symbol)
                else (gene_label if not pd.isna(gene_label) else "")
            )
        else:
            gene_id = ""
        gene_id_str = gene_id + "_" if gene_id else ""

        output_fasta = (
            output_dir
            / f"{output_prefix}{gene_id_str.replace('|', '_')}{hmm_group.replace('|', '_')}_hits.fasta"
        )
        if record_ids:
            fasta.filter_by_IDs(
                record_ids=record_ids,
                output_file=output_fasta,
                point_to_new_file=False,
            )
        else:
            logger.warning(
                f"No record matches found in synteny structure for HMM: {hmm_group}"
            )

write_to_TSV(output_tsv)

Write synteny hits to a TSV file.

Parameters:

Name Type Description Default
output_tsv Path

path to output tsv file.

required
Source code in src/pynteny/filter.py
435
436
437
438
439
440
441
442
def write_to_TSV(self, output_tsv: Path) -> None:
    """Write synteny hits to a TSV file.

    Args:
        output_tsv (Path): path to output tsv file.
    """
    output_tsv = Path(output_tsv)
    self._synteny_hits.to_csv(output_tsv, sep="\t", index=False)

SyntenyPatternFilters

Methods to filter hmm hits in the same contig by synteny structure or collinearity. These filters are inputs to pandas.Dataframe.rolling method.

Source code in src/pynteny/filter.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class SyntenyPatternFilters:
    """Methods to filter hmm hits in the  same contig by synteny structure
    or collinearity. These filters are inputs to pandas.Dataframe.rolling method.
    """

    def __init__(self, synteny_structure: str, unordered: bool = False) -> None:
        """Initialize filter class from synteny structure.

        Args:
            synteny_structure (str): a str describing the desired synteny structure,
                structured as follows:

                '>hmm_a N_ab hmm_b bc <hmm_c'

                where N_ab corresponds to the maximum number of genes separating
                gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
                to the name of the hmm as provided in the keys of hmm_hits.
                More than two hmms can be concatenated. Strand location may be
                specified by using '>' for sense and '<' for antisense.
            unordered (bool, optional): whether the HMMs should be arranged in the
                exact same order displayed in the synteny_structure or in
                any order. If ordered, the filters would filter collinear rather
                than syntenic structures. Defaults to False.
        """
        parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
        if unordered:
            parsed_structure["strands"] = ["" for _ in parsed_structure["strands"]]
        self._hmm_order_dict = dict(
            zip(
                parsed_structure["hmm_groups"],
                range(len(parsed_structure["hmm_groups"])),
            )
        )
        # parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
        # hmm_codes = list(range(len(parsed_structure["hmm_groups"])))
        self.hmm_code_order_pattern = [
            self._hmm_order_dict[hmm_group]
            for hmm_group in parsed_structure["hmm_groups"]
        ]
        if unordered:
            self.hmm_code_order_pattern = sorted(self.hmm_code_order_pattern)

        if unordered:
            max_distance = max(parsed_structure["distances"])
            max_distances = [float("inf")] + [
                max_distance for _ in parsed_structure["distances"]
            ]
        else:
            max_distances = [float("inf")] + parsed_structure["distances"]
        min_distances = [-float("inf")] + [0 for _ in parsed_structure["distances"]]
        self.distance_order_pattern = [
            (min_dist, max_dist + 1)
            for min_dist, max_dist in zip(min_distances, max_distances)
        ]

        self.strand_order_pattern = list(
            map(
                lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0),
                parsed_structure["strands"],
            )
        )
        self._unordered = unordered

    def contains_hmm_pattern(self, data: pd.Series) -> int:
        """Check if series items contain a profile HMM

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        hmm_codes = [int(code) for code in data.values]
        if self._unordered:
            hmm_codes = sorted(hmm_codes)
        return 1 if hmm_codes == self.hmm_code_order_pattern else 0

    def contains_distance_pattern(self, data: pd.Series) -> int:
        """Check if series items satisfy the maximum distance
           between HMM hits.

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        return (
            1
            if all(
                [
                    (data_dist <= max_dist) and (data_dist > min_dist)
                    for data_dist, (min_dist, max_dist) in zip(
                        data.values.tolist(), self.distance_order_pattern
                    )
                ]
            )
            else 0
        )

    def contains_strand_pattern(self, data: pd.Series) -> int:
        """Check if series items satisfy the strand pattern
           between HMM hits.

        Args:
            data (pd.Series): a series resulting from calling rolling on a pandas column.

        Returns:
            int: 1 for True 0 for False.
        """
        strand_comparisons = []
        for data_strand, pattern_strand in zip(data.values, self.strand_order_pattern):
            if pattern_strand != 0:
                strand_comparisons.append(data_strand == pattern_strand)
            else:
                strand_comparisons.append(True)
        return 1 if all(strand_comparisons) else 0

__init__(synteny_structure, unordered=False)

Initialize filter class from synteny structure.

Parameters:

Name Type Description Default
synteny_structure str

a str describing the desired synteny structure, structured as follows:

'>hmm_a N_ab hmm_b bc <hmm_c'

where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

required
unordered bool

whether the HMMs should be arranged in the exact same order displayed in the synteny_structure or in any order. If ordered, the filters would filter collinear rather than syntenic structures. Defaults to False.

False
Source code in src/pynteny/filter.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, synteny_structure: str, unordered: bool = False) -> None:
    """Initialize filter class from synteny structure.

    Args:
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specified by using '>' for sense and '<' for antisense.
        unordered (bool, optional): whether the HMMs should be arranged in the
            exact same order displayed in the synteny_structure or in
            any order. If ordered, the filters would filter collinear rather
            than syntenic structures. Defaults to False.
    """
    parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
    if unordered:
        parsed_structure["strands"] = ["" for _ in parsed_structure["strands"]]
    self._hmm_order_dict = dict(
        zip(
            parsed_structure["hmm_groups"],
            range(len(parsed_structure["hmm_groups"])),
        )
    )
    # parsed_structure = syntenyparser.parse_synteny_structure(synteny_structure)
    # hmm_codes = list(range(len(parsed_structure["hmm_groups"])))
    self.hmm_code_order_pattern = [
        self._hmm_order_dict[hmm_group]
        for hmm_group in parsed_structure["hmm_groups"]
    ]
    if unordered:
        self.hmm_code_order_pattern = sorted(self.hmm_code_order_pattern)

    if unordered:
        max_distance = max(parsed_structure["distances"])
        max_distances = [float("inf")] + [
            max_distance for _ in parsed_structure["distances"]
        ]
    else:
        max_distances = [float("inf")] + parsed_structure["distances"]
    min_distances = [-float("inf")] + [0 for _ in parsed_structure["distances"]]
    self.distance_order_pattern = [
        (min_dist, max_dist + 1)
        for min_dist, max_dist in zip(min_distances, max_distances)
    ]

    self.strand_order_pattern = list(
        map(
            lambda strand: -1 if strand == "neg" else (1 if strand == "pos" else 0),
            parsed_structure["strands"],
        )
    )
    self._unordered = unordered

contains_distance_pattern(data)

Check if series items satisfy the maximum distance between HMM hits.

Parameters:

Name Type Description Default
data pd.Series

a series resulting from calling rolling on a pandas column.

required

Returns:

Name Type Description
int int

1 for True 0 for False.

Source code in src/pynteny/filter.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def contains_distance_pattern(self, data: pd.Series) -> int:
    """Check if series items satisfy the maximum distance
       between HMM hits.

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    return (
        1
        if all(
            [
                (data_dist <= max_dist) and (data_dist > min_dist)
                for data_dist, (min_dist, max_dist) in zip(
                    data.values.tolist(), self.distance_order_pattern
                )
            ]
        )
        else 0
    )

contains_hmm_pattern(data)

Check if series items contain a profile HMM

Parameters:

Name Type Description Default
data pd.Series

a series resulting from calling rolling on a pandas column.

required

Returns:

Name Type Description
int int

1 for True 0 for False.

Source code in src/pynteny/filter.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def contains_hmm_pattern(self, data: pd.Series) -> int:
    """Check if series items contain a profile HMM

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    hmm_codes = [int(code) for code in data.values]
    if self._unordered:
        hmm_codes = sorted(hmm_codes)
    return 1 if hmm_codes == self.hmm_code_order_pattern else 0

contains_strand_pattern(data)

Check if series items satisfy the strand pattern between HMM hits.

Parameters:

Name Type Description Default
data pd.Series

a series resulting from calling rolling on a pandas column.

required

Returns:

Name Type Description
int int

1 for True 0 for False.

Source code in src/pynteny/filter.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def contains_strand_pattern(self, data: pd.Series) -> int:
    """Check if series items satisfy the strand pattern
       between HMM hits.

    Args:
        data (pd.Series): a series resulting from calling rolling on a pandas column.

    Returns:
        int: 1 for True 0 for False.
    """
    strand_comparisons = []
    for data_strand, pattern_strand in zip(data.values, self.strand_order_pattern):
        if pattern_strand != 0:
            strand_comparisons.append(data_strand == pattern_strand)
        else:
            strand_comparisons.append(True)
    return 1 if all(strand_comparisons) else 0

filter_FASTA_by_synteny_structure(synteny_structure, input_fasta, input_hmms, unordered=False, hmm_meta=None, hmmer_output_dir=None, reuse_hmmer_results=True, method='hmmsearch', processes=None, additional_args=None)

Generate protein-specific database by filtering sequence database to only contain sequences which satisfy the provided (gene/hmm) structure.

Parameters:

Name Type Description Default
synteny_structure str

a str describing the desired synteny structure, structured as follows:

'>hmm_a N_ab hmm_b N_bc <hmm_c'

where N_ab corresponds to the maximum number of genes separating gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds to the name of the hmm as provided in the keys of hmm_hits. More than two hmms can be concatenated. Strand location may be specified by using '>' for sense and '<' for antisense.

required
input_fasta Path

input fasta containing sequence database to be searched.

required
input_hmms list[Path]

list containing paths to hmms contained in synteny structure.

required
unordered bool

whether HMM hits should follow the exact order displayed in the synteny structure string or not, i.e., whether to search for only synteny (colocation) or collinearity as well (same order). Defaults to False.

False
hmm_meta Path

path to PGAP's metadata file. Defaults to None.

None
hmmer_output_dir Path

output directory to store HMMER3 output files. Defaults to None.

None
reuse_hmmer_results bool

if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to True.

True
method str

select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

'hmmsearch'
processes int

maximum number of threads to be employed. Defaults to all minus one.

None
additional_args list[str]

additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

None

Returns:

Name Type Description
SyntenyHits SyntenyHits

object of class SyntenyHits containing labels matching synteny structure.

Source code in src/pynteny/filter.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
def filter_FASTA_by_synteny_structure(
    synteny_structure: str,
    input_fasta: Path,
    input_hmms: list[Path],
    unordered: bool = False,
    hmm_meta: Path = None,
    hmmer_output_dir: Path = None,
    reuse_hmmer_results: bool = True,
    method: str = "hmmsearch",
    processes: int = None,
    additional_args: list[str] = None,
) -> SyntenyHits:
    """Generate protein-specific database by filtering sequence database
       to only contain sequences which satisfy the provided (gene/hmm) structure.

    Args:
        synteny_structure (str): a str describing the desired synteny structure,
            structured as follows:

            '>hmm_a N_ab hmm_b bc <hmm_c'

            where N_ab corresponds to the maximum number of genes separating
            gene found by hmm_a and gene found by hmm_b, and hmm_ corresponds
            to the name of the hmm as provided in the keys of hmm_hits.
            More than two hmms can be concatenated. Strand location may be
            specificed by using '>' for sense and '<' for antisense.
        input_fasta (Path): input fasta containing sequence database to be searched.
        input_hmms (list[Path]): list containing paths to hmms contained in synteny structure.
        unordered (bool, optional): whether HMM hits should follow the exact order
            displayed in the synteny structure string or not, i.e., whether
            to search for only synteny (colocation) or collinearity as well
            (same order). Defaults to False.
        hmm_meta (Path, optional): path to PGAP's metadata file. Defaults to None.
        hmmer_output_dir (Path, optional): output directory to store HMMER3 output files. Defaults to None.
        reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run again for HMMs already
            searched in the same output directory. Defaults to True.
        method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.
        processes (int, optional): maximum number of threads to be employed. Defaults to all minus one.
        additional_args (list[str], optional): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

    Returns:
        SyntenyHits: object of class SyntenyHits containing labels matching synteny structure.
    """
    input_fasta = Path(input_fasta)
    if hmmer_output_dir is None:
        hmmer_output_dir = Path(input_fasta.parent) / "hmmer_outputs"
    else:
        hmmer_output_dir = Path(hmmer_output_dir)

    if additional_args is None:
        additional_args = [None for _ in input_hmms]

    if isinstance(additional_args, str):
        logger.warning(f"Repeating hmmsearch arg: '{additional_args}' for all HMMs")
        additional_args = [additional_args for _ in input_hmms]

    elif isinstance(additional_args, list):
        if len(additional_args) == 1:
            logger.warning(
                f"Repeating hmmsearch arg: '{additional_args[0]}' for all HMMs"
            )
            additional_args = [additional_args[0] for _ in input_hmms]

        if (len(additional_args) > 1) and (len(additional_args) < len(input_hmms)):
            logger.error(
                "Provided additional argument strings are less than the number of input hmms."
            )
    else:
        logger.error(
            "Additional arguments must be: 1) a list[str], 2) a str, or 3) None"
        )
        sys.exit(1)
    if not os.path.isdir(hmmer_output_dir):
        os.mkdir(hmmer_output_dir)

    logger.info("Running Hmmer")
    hmmer = HMMER(
        input_hmms=input_hmms,
        input_data=input_fasta,
        hmm_output_dir=hmmer_output_dir,
        processes=processes,
        additional_args=additional_args,
    )
    hmm_hits = hmmer.get_HMMER_tables(
        reuse_hmmer_results=reuse_hmmer_results, method=method
    )
    hmms_without_hits = []
    for hmm_name, hmmer_hits in hmm_hits.items():
        if hmmer_hits.empty:
            hmms_without_hits.append(hmm_name)
    if hmms_without_hits:
        for hmm_name in hmms_without_hits:
            logger.error(f"No records found in database matching HMM: {hmm_name}")
        sys.exit(1)

    logger.info("Filtering results by synteny structure")
    syntenyfilter = SyntenyHMMfilter(hmm_hits, synteny_structure, unordered=unordered)
    hits_by_contig = syntenyfilter.filter_hits_by_synteny_structure()
    if hmm_meta is not None:
        return SyntenyHits.from_hits_dict(hits_by_contig).add_HMM_meta_info_to_hits(
            hmm_meta
        )
    else:
        return SyntenyHits.from_hits_dict(hits_by_contig)

Module preprocessing

Tools to preprocess sequence databases

  1. Remove illegal characters from peptide sequences
  2. Remove illegal symbols from file paths
  3. Relabel fasta records and make dictionary with old labels

Database

Sequence database constructor

Source code in src/pynteny/preprocessing.py
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
class Database:
    """_Sequence database constructor"""

    def __init__(self, data: Path) -> None:
        """Initialize Database object

        Args:
            data (Path): path to either assembly fasta file (or a
                         directory containing assembly fasta files) or
                         a genbank file containing ORF annotations (or a
                         directory containing genbank files)

        Raises:
            FileNotFoundError: if file or directory doesn't exist
        """
        self._data = Path(data)
        if not self._data.exists():
            raise FileNotFoundError(f"{self._data} does not exist")
        if self._data.is_dir():
            self._data_files = [f for f in self._data.iterdir()]
        else:
            self._data_files = [self._data]

    @staticmethod
    def is_fasta(filename: Path) -> bool:
        """Check if file is in fasta format

        Args:
            filename (Path): path to input file

        Returns:
            bool: whether the file is in fasta format
        """
        filename = Path(filename)
        if filename.exists():
            fasta = list(SeqIO.parse(str(filename), "fasta"))
            return any(fasta)
        else:
            return False

    @staticmethod
    def is_gbk(filename: Path) -> bool:
        """Check if file is in genbank format

        Args:
            filename (Path): path to input file

        Returns:
            _bool: whether the file is in genbank format
        """
        filename = Path(filename)
        if filename.exists():
            gbk = list(SeqIO.parse(str(filename), "genbank"))
            return any(gbk)
        else:
            return False

    def build(
        self,
        seq_prefix: str = None,
        prepend_file_name: bool = False,
        output_file: Path = None,
        processes: int = None,
        tempdir: Path = None,
    ) -> LabelledFASTA:
        """Build database from data files.

        Args:
            seq_prefix (str, optionall): prefix to be added to each sequence in database.
                Defaults to "".
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.
            output_file (Path, optional): path to output file. Defaults to None.
            processes (int, optional): maximum number of threads. Defaults to all minus one.
            tmpdir (Path, optional): path to temporary directory. Defaults to tempfile default.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        if output_file is None:
            output_file = self._data.parent / f"{self._data.stem}_labelled.faa"
        else:
            output_file = Path(output_file)
        if self.is_fasta(self._data_files[0]):
            with tempfile.NamedTemporaryFile() as tmpmerge:
                if self._data.is_dir():
                    assembly_fasta = FASTA.from_FASTA_directory(
                        self._data,
                        merged_fasta=Path(tmpmerge.name),
                        prepend_file_name=prepend_file_name,
                    )
                else:
                    assembly_fasta = FASTA(self._data)
                logger.info("Translating and annotating assembly data.")
                labelled_database = GeneAnnotator(assembly_fasta).annotate(
                    output_file=output_file,
                    processes=processes,
                    tempdir=tempdir,
                )
        elif self.is_gbk(self._data_files[0]):
            logger.info("Parsing GenBank data.")
            labelled_database = LabelledFASTA.from_genbank(
                self._data, output_file=output_file, prepend_file_name=prepend_file_name
            )
        else:
            logging.error(f"{self._data} is not a valid FASTA or genbank file")
            sys.exit(1)
        if seq_prefix is not None:
            labelled_database.add_prefix_to_records(seq_prefix, output_file)
            labelled_database = LabelledFASTA(labelled_database.file_path)
        return labelled_database

__init__(data)

Initialize Database object

Parameters:

Name Type Description Default
data Path

path to either assembly fasta file (or a directory containing assembly fasta files) or a genbank file containing ORF annotations (or a directory containing genbank files)

required

Raises:

Type Description
FileNotFoundError

if file or directory doesn't exist

Source code in src/pynteny/preprocessing.py
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
def __init__(self, data: Path) -> None:
    """Initialize Database object

    Args:
        data (Path): path to either assembly fasta file (or a
                     directory containing assembly fasta files) or
                     a genbank file containing ORF annotations (or a
                     directory containing genbank files)

    Raises:
        FileNotFoundError: if file or directory doesn't exist
    """
    self._data = Path(data)
    if not self._data.exists():
        raise FileNotFoundError(f"{self._data} does not exist")
    if self._data.is_dir():
        self._data_files = [f for f in self._data.iterdir()]
    else:
        self._data_files = [self._data]

build(seq_prefix=None, prepend_file_name=False, output_file=None, processes=None, tempdir=None)

Build database from data files.

Parameters:

Name Type Description Default
seq_prefix str, optional

prefix to be added to each sequence in database. Defaults to "".

None
prepend_file_name bool

whether to add file name as genome ID to each record in the resulting merged fasta file.

False
output_file Path

path to output file. Defaults to None.

None
processes int

maximum number of threads. Defaults to all minus one.

None
tempdir Path

path to temporary directory. Defaults to tempfile default.

None

Returns:

Name Type Description
LabelledFASTA LabelledFASTA

object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
def build(
    self,
    seq_prefix: str = None,
    prepend_file_name: bool = False,
    output_file: Path = None,
    processes: int = None,
    tempdir: Path = None,
) -> LabelledFASTA:
    """Build database from data files.

    Args:
        seq_prefix (str, optionall): prefix to be added to each sequence in database.
            Defaults to "".
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.
        output_file (Path, optional): path to output file. Defaults to None.
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        tmpdir (Path, optional): path to temporary directory. Defaults to tempfile default.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    if output_file is None:
        output_file = self._data.parent / f"{self._data.stem}_labelled.faa"
    else:
        output_file = Path(output_file)
    if self.is_fasta(self._data_files[0]):
        with tempfile.NamedTemporaryFile() as tmpmerge:
            if self._data.is_dir():
                assembly_fasta = FASTA.from_FASTA_directory(
                    self._data,
                    merged_fasta=Path(tmpmerge.name),
                    prepend_file_name=prepend_file_name,
                )
            else:
                assembly_fasta = FASTA(self._data)
            logger.info("Translating and annotating assembly data.")
            labelled_database = GeneAnnotator(assembly_fasta).annotate(
                output_file=output_file,
                processes=processes,
                tempdir=tempdir,
            )
    elif self.is_gbk(self._data_files[0]):
        logger.info("Parsing GenBank data.")
        labelled_database = LabelledFASTA.from_genbank(
            self._data, output_file=output_file, prepend_file_name=prepend_file_name
        )
    else:
        logging.error(f"{self._data} is not a valid FASTA or genbank file")
        sys.exit(1)
    if seq_prefix is not None:
        labelled_database.add_prefix_to_records(seq_prefix, output_file)
        labelled_database = LabelledFASTA(labelled_database.file_path)
    return labelled_database

is_fasta(filename) staticmethod

Check if file is in fasta format

Parameters:

Name Type Description Default
filename Path

path to input file

required

Returns:

Name Type Description
bool bool

whether the file is in fasta format

Source code in src/pynteny/preprocessing.py
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
@staticmethod
def is_fasta(filename: Path) -> bool:
    """Check if file is in fasta format

    Args:
        filename (Path): path to input file

    Returns:
        bool: whether the file is in fasta format
    """
    filename = Path(filename)
    if filename.exists():
        fasta = list(SeqIO.parse(str(filename), "fasta"))
        return any(fasta)
    else:
        return False

is_gbk(filename) staticmethod

Check if file is in genbank format

Parameters:

Name Type Description Default
filename Path

path to input file

required

Returns:

Name Type Description
bool bool

whether the file is in genbank format

Source code in src/pynteny/preprocessing.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
@staticmethod
def is_gbk(filename: Path) -> bool:
    """Check if file is in genbank format

    Args:
        filename (Path): path to input file

    Returns:
        _bool: whether the file is in genbank format
    """
    filename = Path(filename)
    if filename.exists():
        gbk = list(SeqIO.parse(str(filename), "genbank"))
        return any(gbk)
    else:
        return False

FASTA

Handle and process fasta files.

Source code in src/pynteny/preprocessing.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
class FASTA:
    """Handle and process fasta files."""

    def __init__(self, input_file: Path) -> None:
        """Initialize FASTA object.

        Args:
            input_file (Path): path to input fasta file.
        """
        self._input_file = Path(input_file)

    @property
    def file_path(self) -> Path:
        """Set new path to fasta file

        Args:
            new_path (Path, optional): path to fasta file. Defaults to None.

        Returns:
            Path: path to fasta file.
        """
        return self._input_file

    @file_path.setter
    def file_path(self, new_path: Path) -> None:
        self._input_file = Path(new_path)

    @classmethod
    def from_FASTA_directory(
        cls, input_dir: Path, merged_fasta: Path = None, prepend_file_name: bool = False
    ) -> FASTA:
        """Initialize FASTA class from directory of fasta files.

        Args:
            input_dir (Path): path to input directory.
            merged_fasta (Path, optional): path to output merged fasta. Defaults to None.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.

        Returns:
            FASTA: an initialized instance of class FASTA.
        """
        input_dir = Path(input_dir)
        if merged_fasta is None:
            merged_fasta = input_dir / "merged_database.fasta"
        else:
            merged_fasta = Path(merged_fasta)
        FASTAmerger(input_dir).merge(merged_fasta, prepend_file_name)
        return cls(merged_fasta)

    def remove_duplicates(
        self,
        output_file: Path = None,
        export_duplicates: bool = False,
        point_to_new_file: bool = True,
    ) -> None:
        """Removes duplicate entries (either by sequence or ID) from fasta.

        Args:
            output_file (Path, optional): path to output fasta file. Defaults to None.
            export_duplicates (bool, optional): whether duplicated records are exported to a file. Defaults to False.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.

        Yields:
            None: None
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_noduplicates{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        wrappers.run_seqkit_nodup(
            input_fasta=self._input_file,
            output_fasta=output_file,
            export_duplicates=export_duplicates,
        )
        if point_to_new_file:
            self.file_path = output_file

    def remove_corrupted_sequences(
        self,
        output_file: Path = None,
        is_peptide: bool = True,
        keep_stop_codon: bool = False,
        point_to_new_file: bool = True,
    ) -> None:
        """Filter out (DNA or peptide) sequences containing illegal characters.

        Args:
            output_file (Path, optional): path to output fasta file. Defaults to None.
            is_peptide (bool, optional): select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.
            keep_stop_codon (bool, optional): whether to keep the stop codon in the peptide sequence. Defaults to False.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        dirname = self._input_file.parent
        fname, ext = self._input_file.stem, self._input_file.suffix
        if output_file is None:
            output_file = Path(dirname) / f"{fname}_modified{ext}"
        else:
            output_file = Path(output_file)
        if is_peptide:
            isLegitSequence = is_legit_peptide_sequence
        else:
            isLegitSequence = is_legit_DNA_sequence

        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                if is_peptide and (not keep_stop_codon):
                    record_seq = remove_stop_sodon_signals(record_seq)
                if isLegitSequence(record_seq):
                    outfile.write(f">{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

    def filter_by_IDs(
        self, record_ids: list, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Filter records in fasta file matching provided IDs.

        Args:
            record_ids (list): list of record IDs to keep of original fasta file.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_filtered{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        with tempfile.NamedTemporaryFile(mode="w+t") as tmp_ids:
            tmp_ids.writelines("\n".join(record_ids))
            tmp_ids.flush()
            tmp_ids_path = tmp_ids.name
            cmd_str = (
                f"seqkit grep -i -f {tmp_ids_path} {self._input_file} -o {output_file}"
            )
            utils.terminal_execute(cmd_str, suppress_shell_output=True)
        if point_to_new_file:
            self.file_path = output_file

    def split_by_contigs(self, output_dir: Path = None) -> None:
        """Split large fasta file into several ones containing one contig each.

        Args:
            output_dir (Path, optional): _description_. Defaults to None.
        """
        if output_dir is None:
            output_dir = (
                Path(self._input_file.parent) / "split_" + self._input_file.name
            )
        else:
            output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        contigs = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        for contig_name, seq in contigs:
            output_file = (
                output_dir / f"{contig_name.split(' ')[0]}{self._input_file.suffix}"
            )
            with open(output_file, "w+", encoding="UTF-8") as outfile:
                outfile.write(f">{contig_name}\n")
                outfile.write(seq + "\n")

    def filter_by_minimum_length(
        self, min_length: int, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Filter records in fasta file by minimum length.

        Args:
            min_length (int): minimal length of sequences to be kept in filtered fasta file.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_minlength{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                if len(record_seq) >= min_length:
                    outfile.write(f">{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

    def add_prefix_to_records(
        self, prefix: str, output_file: Path = None, point_to_new_file: bool = True
    ) -> None:
        """Add prefix to sequence records in FASTA

        Args:
            prefix (str): prefix to be added.
            output_file (Path, optional): path to output filtered fasta file. Defaults to None.
            point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
        """
        if output_file is None:
            output_file = (
                Path(self._input_file.parent)
                / f"{self._input_file.stem}_prefixed{self._input_file.suffix}"
            )
        else:
            output_file = Path(output_file)
        fasta = pyfastx.Fasta(
            self.file_path.as_posix(), build_index=False, full_name=True
        )
        prefix = prefix.strip("_")
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in fasta:
                outfile.write(f">{prefix}_{record_name}\n{record_seq}\n")
        if point_to_new_file:
            self.file_path = output_file

file_path: Path property writable

Set new path to fasta file

Parameters:

Name Type Description Default
new_path Path

path to fasta file. Defaults to None.

required

Returns:

Name Type Description
Path Path

path to fasta file.

__init__(input_file)

Initialize FASTA object.

Parameters:

Name Type Description Default
input_file Path

path to input fasta file.

required
Source code in src/pynteny/preprocessing.py
140
141
142
143
144
145
146
def __init__(self, input_file: Path) -> None:
    """Initialize FASTA object.

    Args:
        input_file (Path): path to input fasta file.
    """
    self._input_file = Path(input_file)

add_prefix_to_records(prefix, output_file=None, point_to_new_file=True)

Add prefix to sequence records in FASTA

Parameters:

Name Type Description Default
prefix str

prefix to be added.

required
output_file Path

path to output filtered fasta file. Defaults to None.

None
point_to_new_file bool

whether FASTA object should point to the newly generated file. Defaults to True.

True
Source code in src/pynteny/preprocessing.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
def add_prefix_to_records(
    self, prefix: str, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Add prefix to sequence records in FASTA

    Args:
        prefix (str): prefix to be added.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_prefixed{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    prefix = prefix.strip("_")
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            outfile.write(f">{prefix}_{record_name}\n{record_seq}\n")
    if point_to_new_file:
        self.file_path = output_file

filter_by_IDs(record_ids, output_file=None, point_to_new_file=True)

Filter records in fasta file matching provided IDs.

Parameters:

Name Type Description Default
record_ids list

list of record IDs to keep of original fasta file.

required
output_file Path

path to output filtered fasta file. Defaults to None.

None
point_to_new_file bool

whether FASTA object should point to the newly generated file. Defaults to True.

True
Source code in src/pynteny/preprocessing.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def filter_by_IDs(
    self, record_ids: list, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Keep only the fasta records whose IDs appear in the provided list.

    Args:
        record_ids (list): list of record IDs to keep of original fasta file.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    source = self._input_file
    if output_file is None:
        output_file = Path(source.parent) / f"{source.stem}_filtered{source.suffix}"
    else:
        output_file = Path(output_file)
    # seqkit reads the wanted IDs from a file, so stage them in a temp file
    with tempfile.NamedTemporaryFile(mode="w+t") as tmp_ids:
        tmp_ids.writelines("\n".join(record_ids))
        tmp_ids.flush()
        cmd_str = (
            f"seqkit grep -i -f {tmp_ids.name} {self._input_file} -o {output_file}"
        )
        utils.terminal_execute(cmd_str, suppress_shell_output=True)
    if point_to_new_file:
        self.file_path = output_file

filter_by_minimum_length(min_length, output_file=None, point_to_new_file=True)

Filter records in fasta file by minimum length.

Parameters:

Name Type Description Default
min_length int

minimal length of sequences to be kept in filtered fasta file.

required
output_file Path

path to output filtered fasta file. Defaults to None.

None
point_to_new_file bool

whether FASTA object should point to the newly generated file. Defaults to True.

True
Source code in src/pynteny/preprocessing.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def filter_by_minimum_length(
    self, min_length: int, output_file: Path = None, point_to_new_file: bool = True
) -> None:
    """Drop records shorter than min_length from the fasta file.

    Args:
        min_length (int): minimal length of sequences to be kept in filtered fasta file.
        output_file (Path, optional): path to output filtered fasta file. Defaults to None.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    source = self._input_file
    if output_file is None:
        output_file = Path(source.parent) / f"{source.stem}_minlength{source.suffix}"
    else:
        output_file = Path(output_file)
    records = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        outfile.writelines(
            f">{name}\n{seq}\n" for name, seq in records if len(seq) >= min_length
        )
    if point_to_new_file:
        self.file_path = output_file

from_FASTA_directory(input_dir, merged_fasta=None, prepend_file_name=False) classmethod

Initialize FASTA class from directory of fasta files.

Parameters:

Name Type Description Default
input_dir Path

path to input directory.

required
merged_fasta Path

path to output merged fasta. Defaults to None.

None
prepend_file_name bool

whether to add file name as genome ID to each record in the result merged fasta file.

False

Returns:

Name Type Description
FASTA FASTA

an initialized instance of class FASTA.

Source code in src/pynteny/preprocessing.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@classmethod
def from_FASTA_directory(
    cls, input_dir: Path, merged_fasta: Path = None, prepend_file_name: bool = False
) -> FASTA:
    """Initialize FASTA class from directory of fasta files.

    Args:
        input_dir (Path): path to input directory.
        merged_fasta (Path, optional): path to output merged fasta. Defaults to None.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.

    Returns:
        FASTA: an initialized instance of class FASTA.
    """
    input_dir = Path(input_dir)
    merged_fasta = (
        input_dir / "merged_database.fasta"
        if merged_fasta is None
        else Path(merged_fasta)
    )
    # concatenate all files in input_dir into a single fasta, then wrap it
    FASTAmerger(input_dir).merge(merged_fasta, prepend_file_name)
    return cls(merged_fasta)

remove_corrupted_sequences(output_file=None, is_peptide=True, keep_stop_codon=False, point_to_new_file=True)

Filter out (DNA or peptide) sequences containing illegal characters.

Parameters:

Name Type Description Default
output_file Path

path to output fasta file. Defaults to None.

None
is_peptide bool

select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.

True
keep_stop_codon bool

whether to keep the stop codon in the peptide sequence. Defaults to False.

False
point_to_new_file bool

whether FASTA object should point to the newly generated file. Defaults to True.

True
Source code in src/pynteny/preprocessing.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def remove_corrupted_sequences(
    self,
    output_file: Path = None,
    is_peptide: bool = True,
    keep_stop_codon: bool = False,
    point_to_new_file: bool = True,
) -> None:
    """Filter out (DNA or peptide) sequences containing illegal characters.

    Args:
        output_file (Path, optional): path to output fasta file. Defaults to None.
        is_peptide (bool, optional): select if input is a peptide sequence, otherwise taken as nucleotide. Defaults to True.
        keep_stop_codon (bool, optional): whether to keep the stop codon in the peptide sequence. Defaults to False.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    dirname = self._input_file.parent
    fname, ext = self._input_file.stem, self._input_file.suffix
    if output_file is None:
        output_file = Path(dirname) / f"{fname}_modified{ext}"
    else:
        output_file = Path(output_file)
    # pick the alphabet validator matching the declared sequence type
    if is_peptide:
        isLegitSequence = is_legit_peptide_sequence
    else:
        isLegitSequence = is_legit_DNA_sequence

    fasta = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in fasta:
            # strip stop-codon markers before validation, unless asked to keep them
            # NOTE(review): helper name 'remove_stop_sodon_signals' looks like a typo
            # for 'codon' — it is defined elsewhere, so renaming here would break the call
            if is_peptide and (not keep_stop_codon):
                record_seq = remove_stop_sodon_signals(record_seq)
            if isLegitSequence(record_seq):
                outfile.write(f">{record_name}\n{record_seq}\n")
    if point_to_new_file:
        self.file_path = output_file

remove_duplicates(output_file=None, export_duplicates=False, point_to_new_file=True)

Removes duplicate entries (either by sequence or ID) from fasta.

Parameters:

Name Type Description Default
output_file Path

path to output fasta file. Defaults to None.

None
export_duplicates bool

whether duplicated records are exported to a file. Defaults to False.

False
point_to_new_file bool

whether FASTA object should point to the newly generated file. Defaults to True.

True

Yields:

Name Type Description
None None

None

Source code in src/pynteny/preprocessing.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def remove_duplicates(
    self,
    output_file: Path = None,
    export_duplicates: bool = False,
    point_to_new_file: bool = True,
) -> None:
    """Remove duplicate entries (either by sequence or ID) from fasta.

    Deduplication is delegated to seqkit via the project wrapper
    wrappers.run_seqkit_nodup.

    Args:
        output_file (Path, optional): path to output fasta file. Defaults to None.
        export_duplicates (bool, optional): whether duplicated records are exported to a file. Defaults to False.
        point_to_new_file (bool, optional): whether FASTA object should point to the newly generated file. Defaults to True.
    """
    if output_file is None:
        output_file = (
            Path(self._input_file.parent)
            / f"{self._input_file.stem}_noduplicates{self._input_file.suffix}"
        )
    else:
        output_file = Path(output_file)
    wrappers.run_seqkit_nodup(
        input_fasta=self._input_file,
        output_fasta=output_file,
        export_duplicates=export_duplicates,
    )
    if point_to_new_file:
        self.file_path = output_file

split_by_contigs(output_dir=None)

Split large fasta file into several ones containing one contig each.

Parameters:

Name Type Description Default
output_dir Path

path to output directory. Defaults to None.

None
Source code in src/pynteny/preprocessing.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def split_by_contigs(self, output_dir: Path = None) -> None:
    """Split large fasta file into several ones containing one contig each.

    Args:
        output_dir (Path, optional): path to output directory. Defaults to a
            'split_<input name>' directory next to the input file.
    """
    if output_dir is None:
        # BUGFIX: the previous expression, Path(parent) / "split_" + name,
        # evaluated as (Path / "split_") + name and raised TypeError, because
        # '/' binds tighter than '+' and Path does not support '+' with str.
        output_dir = Path(self._input_file.parent) / f"split_{self._input_file.name}"
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    contigs = pyfastx.Fasta(
        self.file_path.as_posix(), build_index=False, full_name=True
    )
    for contig_name, seq in contigs:
        # use the contig ID (first whitespace-separated token of the header)
        # as the per-contig file name
        output_file = (
            output_dir / f"{contig_name.split(' ')[0]}{self._input_file.suffix}"
        )
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            outfile.write(f">{contig_name}\n")
            outfile.write(seq + "\n")

FASTAmerger

Source code in src/pynteny/preprocessing.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class FASTAmerger:
    """Merge a directory of fasta files into a single (multi)fasta file."""

    def __init__(self, input_dir: Path):
        """Initialize FASTAmerger.

        Args:
            input_dir (Path): directory containing the fasta files to merge.
        """
        self.input_dir = Path(input_dir)

    def prepend_filename_to_record_names(self, output_dir: Path) -> None:
        """Prepend file name to each record label in fasta file

        Args:
            output_dir (Path): directory where the relabelled fasta files are written.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        for file in self.input_dir.iterdir():
            fasta = pyfastx.Fasta(file.as_posix(), build_index=False, full_name=True)
            # rebuild each record as ">file-stem_record-name"
            new_file_content = [
                f">{file.stem}_{record_name}\n{record_seq}\n"
                for record_name, record_seq in fasta
            ]
            with open(output_dir / file.name, "w+", encoding="UTF-8") as outfile:
                outfile.writelines(new_file_content)

    def merge(self, output_file: Path = None, prepend_file_name: bool = False) -> None:
        """Merge input fasta files into one (multi)fasta file.

        Args:
            output_file (Path, optional): path to output merged fasta file. Defaults to None.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.

        """
        if output_file is None:
            output_file = self.input_dir / "merged.fasta"
        else:
            output_file = Path(output_file)
        logger.info("Merging FASTA files in input directory")
        # NUL-delimited names keep the shell pipeline safe for file names with spaces.
        # NOTE(review): if output_file resolves inside the glob's working directory,
        # the output could be picked up by '*' — confirm callers avoid that layout.
        cmd_str = f"printf '%s\\0' * | xargs -0 cat > {output_file}"
        if prepend_file_name:
            # relabel into a temp dir first, then concatenate the relabelled copies
            with tempfile.TemporaryDirectory() as tempdir:
                self.prepend_filename_to_record_names(output_dir=tempdir)
                utils.terminal_execute(cmd_str, work_dir=tempdir)
        else:
            utils.terminal_execute(cmd_str, work_dir=self.input_dir)
        logging.shutdown()

merge(output_file=None, prepend_file_name=False)

Merge input fasta files into one (multi)fasta file.

Parameters:

Name Type Description Default
output_file Path

path to output merged fasta file. Defaults to None.

None
prepend_file_name bool

whether to add file name as genome ID to each record in the result merged fasta file.

False
Source code in src/pynteny/preprocessing.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def merge(self, output_file: Path = None, prepend_file_name: bool = False) -> None:
    """Merge input fasta files into one (multi)fasta file.

    Args:
        output_file (Path, optional): path to output merged fasta file. Defaults to None.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.

    """
    if output_file is None:
        output_file = self.input_dir / "merged.fasta"
    else:
        output_file = Path(output_file)
    logger.info("Merging FASTA files in input directory")
    # NUL-delimited names keep the shell pipeline safe for file names with spaces
    cmd_str = f"printf '%s\\0' * | xargs -0 cat > {output_file}"
    if prepend_file_name:
        # relabel into a temp dir first, then concatenate the relabelled copies
        with tempfile.TemporaryDirectory() as tempdir:
            self.prepend_filename_to_record_names(output_dir=tempdir)
            utils.terminal_execute(cmd_str, work_dir=tempdir)
    else:
        utils.terminal_execute(cmd_str, work_dir=self.input_dir)
    logging.shutdown()

prepend_filename_to_record_names(output_dir)

Prepend file name to each record label in fasta file

Parameters:

Name Type Description Default
output_dir Path

path to output directory where relabelled fasta files are written.

required
Source code in src/pynteny/preprocessing.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def prepend_filename_to_record_names(self, output_dir: Path) -> None:
    """Prepend file name to each record label in fasta file

    Args:
        output_dir (Path): _description_
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    for file in self.input_dir.iterdir():
        fasta = pyfastx.Fasta(file.as_posix(), build_index=False, full_name=True)
        new_file_content = [
            f">{file.stem}_{record_name}\n{record_seq}\n"
            for record_name, record_seq in fasta
        ]
        with open(output_dir / file.name, "w+", encoding="UTF-8") as outfile:
            outfile.writelines(new_file_content)

GeneAnnotator

Run prodigal on assembly, predict ORFs and extract location info

Source code in src/pynteny/preprocessing.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
class GeneAnnotator:
    """Run prodigal on assembly, predict ORFs and extract location info"""

    def __init__(self, assembly: FASTA) -> None:
        """Initialize GeneAnnotator

        Args:
            assembly (FASTA): fasta object containing assembled nucleotide sequences
        """
        self._assembly = assembly

    def annotate(
        self,
        processes: int = None,
        metagenome: bool = True,
        output_file: Path = None,
        prodigal_args: str = None,
        tempdir: Path = None,
    ) -> LabelledFASTA:
        """Run prodigal on assembly and export single fasta file with peptide ORFs predictions

        Args:
            processes (int, optional): maximum number of threads. Defaults to all minus one.
            metagenome (bool, optional): whether assembled sequences correspond to metagenomic data. Defaults to True.
            output_file (Path, optional): path to output fasta file. Defaults to None.
            prodigal_args (str, optional): additional arguments to be passed to prodigal CLI. Defaults to None.
            tempdir (Path, optional): path to temporary directory. Defaults to tempfile default.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        if processes is None:
            processes = os.cpu_count() - 1
        if output_file is None:
            output_file = (
                self._assembly.file_path.parent
                / f"{self._assembly.file_path.stem}_annotated.faa"
            )
        else:
            output_file = Path(output_file)
        if tempdir is None:
            tempdir = Path(tempfile.gettempdir())
        else:
            tempdir = Path(tempdir).resolve()
        # two scratch dirs (per-contig inputs, per-contig prodigal outputs) plus a
        # temp file for the merged, unlabelled prodigal fasta
        with tempfile.TemporaryDirectory(
            dir=tempdir
        ) as contigs_dir, tempfile.TemporaryDirectory(
            dir=tempdir
        ) as prodigal_dir, tempfile.NamedTemporaryFile() as temp_fasta:
            contigs_dir = Path(contigs_dir)
            prodigal_dir = Path(prodigal_dir)
            logger.info("Running prodigal on assembly data")
            # split assembly into one file per contig so prodigal runs can be
            # parallelized across contigs
            self._assembly.split_by_contigs(contigs_dir)
            utils.parallelize_over_input_files(
                wrappers.run_prodigal,
                input_list=list(contigs_dir.iterdir()),
                processes=processes,
                output_dir=prodigal_dir,
                output_format="fasta",
                metagenome=metagenome,
                additional_args=prodigal_args,
            )
            logging.shutdown()
            # merge per-contig outputs, then relabel records with positional info
            FASTAmerger(prodigal_dir).merge(
                Path(temp_fasta.name), prepend_file_name=False
            )
            return LabelledFASTA.from_prodigal_output(
                Path(temp_fasta.name), output_file
            )

__init__(assembly)

Initialize GeneAnnotator

Parameters:

Name Type Description Default
assembly FASTA

fasta object containing assembled nucleotide sequences

required
Source code in src/pynteny/preprocessing.py
515
516
517
518
519
520
521
def __init__(self, assembly: FASTA) -> None:
    """Initialize GeneAnnotator

    Args:
        assembly (FASTA): fasta object containing assembled nucleotide sequences
    """
    # keep a handle to the assembly; annotate() splits it by contig for prodigal
    self._assembly = assembly

annotate(processes=None, metagenome=True, output_file=None, prodigal_args=None, tempdir=None)

Run prodigal on assembly and export single fasta file with peptide ORFs predictions

Parameters:

Name Type Description Default
processes int

maximum number of threads. Defaults to all minus one.

None
metagenome bool

whether assembled sequences correspond to metagenomic data. Defaults to True.

True
output_file Path

path to output fasta file. Defaults to None.

None
prodigal_args str

additional arguments to be passed to prodigal CLI. Defaults to None.

None
tempdir Path

path to temporary directory. Defaults to tempfile default.

None

Returns:

Name Type Description
LabelledFASTA LabelledFASTA

object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
def annotate(
    self,
    processes: int = None,
    metagenome: bool = True,
    output_file: Path = None,
    prodigal_args: str = None,
    tempdir: Path = None,
) -> LabelledFASTA:
    """Run prodigal on assembly and export single fasta file with peptide ORFs predictions

    Args:
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        metagenome (bool, optional): whether assembled sequences correspond to metagenomic data. Defaults to True.
        output_file (Path, optional): path to output fasta file. Defaults to None.
        prodigal_args (str, optional): additional arguments to be passed to prodigal CLI. Defaults to None.
        tempdir (Path, optional): path to temporary directory. Defaults to tempfile default.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    if processes is None:
        processes = os.cpu_count() - 1
    if output_file is None:
        output_file = (
            self._assembly.file_path.parent
            / f"{self._assembly.file_path.stem}_annotated.faa"
        )
    else:
        output_file = Path(output_file)
    if tempdir is None:
        tempdir = Path(tempfile.gettempdir())
    else:
        tempdir = Path(tempdir).resolve()
    # two scratch dirs (per-contig inputs, per-contig prodigal outputs) plus a
    # temp file for the merged, unlabelled prodigal fasta
    with tempfile.TemporaryDirectory(
        dir=tempdir
    ) as contigs_dir, tempfile.TemporaryDirectory(
        dir=tempdir
    ) as prodigal_dir, tempfile.NamedTemporaryFile() as temp_fasta:
        contigs_dir = Path(contigs_dir)
        prodigal_dir = Path(prodigal_dir)
        logger.info("Running prodigal on assembly data")
        # split assembly into one file per contig so prodigal runs can be
        # parallelized across contigs
        self._assembly.split_by_contigs(contigs_dir)
        utils.parallelize_over_input_files(
            wrappers.run_prodigal,
            input_list=list(contigs_dir.iterdir()),
            processes=processes,
            output_dir=prodigal_dir,
            output_format="fasta",
            metagenome=metagenome,
            additional_args=prodigal_args,
        )
        logging.shutdown()
        # merge per-contig outputs, then relabel records with positional info
        FASTAmerger(prodigal_dir).merge(
            Path(temp_fasta.name), prepend_file_name=False
        )
        return LabelledFASTA.from_prodigal_output(
            Path(temp_fasta.name), output_file
        )

LabelledFASTA

Bases: FASTA

Tools to add and parse FASTA with positional info on record tags

Source code in src/pynteny/preprocessing.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class LabelledFASTA(FASTA):
    """Tools to add and parse FASTA with positional info on record tags"""

    @classmethod
    def from_prodigal_output(
        cls, prodigal_faa: Path, output_file: Path = None
    ) -> LabelledFASTA:
        """Instantiate class from prodigal output file.
        Extract positional gene info from prodigal output and export to fasta file.

        Args:
            prodigal_faa (Path): path to prodigal output file containing peptide sequences
            output_file (Path, optional): path to output labelled fasta file. Defaults to None.

        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        # prodigal full headers look like:
        # "<contig>_<gene#> # <start> # <end> # <strand> # <attributes>"
        number_prodigal_record_fields = 9
        prodigal_faa = Path(prodigal_faa)
        if output_file is None:
            output_file = prodigal_faa.parent / f"{prodigal_faa.stem}_longlabels.fasta"
        else:
            output_file = Path(output_file)
        data = pyfastx.Fasta(prodigal_faa.as_posix(), build_index=False, full_name=True)
        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for record_name, record_seq in data:
                name_list = record_name.split(" ")
                if len(name_list) < number_prodigal_record_fields:
                    logger.error(
                        f"Invalid prodigal header format for record: {record_name}"
                    )
                    sys.exit(1)
                # the contig ID is everything before the trailing gene number
                contig = "_".join(name_list[0].split("_")[:-1])
                gene_number = name_list[0].split("_")[-1]
                start, end = name_list[2], name_list[4]
                # encode strand as "pos"/"neg"; empty string for anything else
                strand = (
                    "pos"
                    if name_list[6] == "1"
                    else ("neg" if name_list[6] == "-1" else "")
                )
                header = f">{contig}_{gene_number}__{contig}_{gene_number}_{start}_{end}_{strand}"
                outfile.write(header + "\n")
                outfile.write(record_seq + "\n")
        logging.shutdown()
        return cls(output_file)

    @classmethod
    def from_genbank(
        cls,
        gbk_data: Path,
        output_file: Path = None,
        prefix: str = None,
        nucleotide: bool = False,
        prepend_file_name: bool = False,
    ) -> LabelledFASTA:
        """Assign gene positional info, such as contig, gene number and loci
        to each record in genbank database and return LabelledFASTA object.

        Args:
            gbk_data (Path): path to file or directory containing genbank files
            output_file (Path, optional): path to output labelled fasta file. Defaults to None.
            prefix (str, optional): prefix for output file. Defaults to None.
            nucleotide (bool, optional): whether records correspond to nucleotide sequences instead of peptides. Defaults to False.
            prepend_file_name (bool, optional): whether to add file name as genome ID to
                each record in the result merged fasta file.
        Returns:
            LabelledFASTA: object containing the labelled peptide database.
        """
        gbk_data = Path(gbk_data)
        if gbk_data.is_dir():
            # NOTE(review): iterdir() already yields paths prefixed with gbk_data;
            # joining again duplicates the directory for relative inputs — confirm
            # callers pass absolute paths
            gbk_files = [gbk_data / f for f in gbk_data.iterdir()]
        else:
            gbk_files = [gbk_data]

        # gbk_name supplies the per-record file-name prefix, or None to disable it
        if prepend_file_name:

            def gbk_name(gbk_file):
                return gbk_file.stem

        else:

            def gbk_name(gbk_file):
                return

        gbk_contigs = [
            (gbk_name(gbk_file), contig)
            for gbk_file in gbk_files
            for contig in SeqIO.parse(gbk_file, "genbank")
        ]

        if output_file is None:
            output_file = Path(gbk_files[0].parent) / f"{prefix}sequence_database.fasta"
        else:
            output_file = Path(output_file)

        with open(output_file, "w+", encoding="UTF-8") as outfile:
            for gbk_file_name, gbk_contig in gbk_contigs:
                gene_counter = 0
                for feature in gbk_contig.features:
                    # matches "CDS" features case-insensitively
                    if "cds" in feature.type.lower():
                        gene_counter = cls.write_record(
                            gbk_contig,
                            feature,
                            outfile,
                            gene_counter,
                            gbk_file_name,
                            nucleotide,
                        )
        return cls(output_file)

    @staticmethod
    def get_label_str(
        gbk_contig: SeqRecord, feature: SeqFeature, gene_counter: int
    ) -> str:
        """Build the labelled fasta header
        ">locus-tag__contig_gene#_start_end_strand" for a CDS feature.
        """
        # underscores in the locus tag would break downstream label parsing
        name = feature.qualifiers["locus_tag"][0].replace("_", ".")
        start, end, strand = (
            str(feature.location.start),
            str(feature.location.end),
            feature.location.strand,
        )
        # strip genbank fuzzy-location markers ('<', '>') from coordinates
        start = start.replace(">", "").replace("<", "")
        end = end.replace(">", "").replace("<", "")
        strand_sense = "neg" if strand == -1 else ("pos" if strand == 1 else "")
        return f">{name}__{gbk_contig.name.replace('_', '')}_{gene_counter}_{start}_{end}_{strand_sense}\n"

    @staticmethod
    def write_record(
        gbk_contig: SeqRecord,
        feature: SeqFeature,
        output_file: TextIO,
        gene_counter: int,
        prefix: str = None,
        nucleotide: bool = False,
    ) -> int:
        """Write one labelled record for a CDS feature and return the updated
        gene counter. Peptide records lacking a 'translation' qualifier are
        skipped and the counter is returned unchanged.
        """
        header = LabelledFASTA.get_label_str(gbk_contig, feature, gene_counter)
        if prefix is not None:
            header = f"{prefix}_{header}"
        if (not nucleotide) and ("translation" in feature.qualifiers):
            sequence = feature.qualifiers["translation"][0]
        elif nucleotide:
            sequence = str(feature.extract(gbk_contig).seq)
        else:
            return gene_counter
        output_file.write(header)
        output_file.write(sequence + "\n")
        gene_counter += 1
        return gene_counter

from_genbank(gbk_data, output_file=None, prefix=None, nucleotide=False, prepend_file_name=False) classmethod

Assign gene positional info, such as contig, gene number and loci to each record in genbank database and return LabelledFASTA object.

Parameters:

Name Type Description Default
gbk_data Path

path to file or directory containing genbank files

required
output_file Path

path to output labelled fasta file. Defaults to None.

None
prefix str

prefix for output file. Defaults to None.

None
nucleotide bool

whether records correspond to nucleotide sequences instead of peptides. Defaults to False.

False
prepend_file_name bool

whether to add file name as genome ID to each record in the result merged fasta file.

False

Returns:

Name Type Description
LabelledFASTA LabelledFASTA

object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
@classmethod
def from_genbank(
    cls,
    gbk_data: Path,
    output_file: Path = None,
    prefix: str = None,
    nucleotide: bool = False,
    prepend_file_name: bool = False,
) -> LabelledFASTA:
    """Assign gene positional info, such as contig, gene number and loci
    to each record in genbank database and return LabelledFASTA object.

    Args:
        gbk_data (Path): path to file or directory containing genbank files
        output_file (Path, optional): path to output labelled fasta file. Defaults to None.
        prefix (str, optional): prefix for output file. Defaults to None.
        nucleotide (bool, optional): whether records correspond to nucleotide sequences instead of peptides. Defaults to False.
        prepend_file_name (bool, optional): whether to add file name as genome ID to
            each record in the result merged fasta file.
    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    gbk_data = Path(gbk_data)
    if gbk_data.is_dir():
        # NOTE(review): iterdir() already yields paths prefixed with gbk_data;
        # joining again duplicates the directory for relative inputs — confirm
        # callers pass absolute paths
        gbk_files = [gbk_data / f for f in gbk_data.iterdir()]
    else:
        gbk_files = [gbk_data]

    # gbk_name supplies the per-record file-name prefix, or None to disable it
    if prepend_file_name:

        def gbk_name(gbk_file):
            return gbk_file.stem

    else:

        def gbk_name(gbk_file):
            return

    gbk_contigs = [
        (gbk_name(gbk_file), contig)
        for gbk_file in gbk_files
        for contig in SeqIO.parse(gbk_file, "genbank")
    ]

    if output_file is None:
        output_file = Path(gbk_files[0].parent) / f"{prefix}sequence_database.fasta"
    else:
        output_file = Path(output_file)

    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for gbk_file_name, gbk_contig in gbk_contigs:
            gene_counter = 0
            for feature in gbk_contig.features:
                # matches "CDS" features case-insensitively
                if "cds" in feature.type.lower():
                    gene_counter = cls.write_record(
                        gbk_contig,
                        feature,
                        outfile,
                        gene_counter,
                        gbk_file_name,
                        nucleotide,
                    )
    return cls(output_file)

from_prodigal_output(prodigal_faa, output_file=None) classmethod

Instantiate class from prodigal output file. Extract positional gene info from prodigal output and export to fasta file.

Parameters:

Name Type Description Default
prodigal_faa Path

path to prodigal output file containing peptide sequences

required
output_file Path

path to output labelled fasta file. Defaults to None.

None

Returns:

Name Type Description
LabelledFASTA LabelledFASTA

object containing the labelled peptide database.

Source code in src/pynteny/preprocessing.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
@classmethod
def from_prodigal_output(
    cls, prodigal_faa: Path, output_file: Path = None
) -> LabelledFASTA:
    """Instantiate class from prodigal output file.
    Extract positional gene info from prodigal output and export to fasta file.

    Args:
        prodigal_faa (Path): path to prodigal output file containing peptide sequences
        output_file (Path, optional): path to output labelled fasta file. Defaults to None.

    Returns:
        LabelledFASTA: object containing the labelled peptide database.
    """
    # prodigal full headers look like:
    # "<contig>_<gene#> # <start> # <end> # <strand> # <attributes>"
    number_prodigal_record_fields = 9
    prodigal_faa = Path(prodigal_faa)
    if output_file is None:
        output_file = prodigal_faa.parent / f"{prodigal_faa.stem}_longlabels.fasta"
    else:
        output_file = Path(output_file)
    data = pyfastx.Fasta(prodigal_faa.as_posix(), build_index=False, full_name=True)
    with open(output_file, "w+", encoding="UTF-8") as outfile:
        for record_name, record_seq in data:
            name_list = record_name.split(" ")
            if len(name_list) < number_prodigal_record_fields:
                logger.error(
                    f"Invalid prodigal header format for record: {record_name}"
                )
                sys.exit(1)
            # the contig ID is everything before the trailing gene number
            contig = "_".join(name_list[0].split("_")[:-1])
            gene_number = name_list[0].split("_")[-1]
            start, end = name_list[2], name_list[4]
            # encode strand as "pos"/"neg"; empty string for anything else
            strand = (
                "pos"
                if name_list[6] == "1"
                else ("neg" if name_list[6] == "-1" else "")
            )
            header = f">{contig}_{gene_number}__{contig}_{gene_number}_{start}_{end}_{strand}"
            outfile.write(header + "\n")
            outfile.write(record_seq + "\n")
    logging.shutdown()
    return cls(output_file)

is_legit_DNA_sequence(record_seq)

Assert that DNA sequence only contains valid symbols.

Parameters:

Name Type Description Default
record_seq str

nucleotide sequence.

required

Returns:

Name Type Description
bool bool

whether nucleotide sequence only contains legit symbols.

Source code in src/pynteny/preprocessing.py
78
79
80
81
82
83
84
85
86
87
88
89
def is_legit_DNA_sequence(record_seq: str) -> bool:
    """Check that a DNA sequence contains only valid nucleotide symbols.

    Args:
        record_seq (str): nucleotide sequence.

    Returns:
        bool: whether nucleotide sequence only contains legit symbols.
    """
    valid_nucleotides = {"A", "G", "T", "C", "N"}
    # Case-insensitive check; an empty sequence is trivially valid
    return all(symbol.upper() in valid_nucleotides for symbol in record_seq)

is_legit_peptide_sequence(record_seq)

Assert that peptide sequence only contains valid symbols.

Parameters:

Name Type Description Default
record_seq str

peptide sequence.

required

Returns:

Name Type Description
bool bool

whether peptide sequence only contains legit symbols.

Source code in src/pynteny/preprocessing.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def is_legit_peptide_sequence(record_seq: str) -> bool:
    """Check that a peptide sequence contains only valid amino-acid symbols.

    Args:
        record_seq (str): peptide sequence.

    Returns:
        bool: whether peptide sequence only contains legit symbols.
    """
    # The 20 standard amino acids plus '*' (stop codon signal)
    valid_symbols = set("ACDEFGHIKLMNPQRSTVWY*")
    # Case-insensitive check; an empty sequence is trivially valid
    return all(char.upper() in valid_symbols for char in record_seq)

remove_stop_sodon_signals(record_seq)

Remove stop codon signals from peptide sequence

Parameters:

Name Type Description Default
record_seq str

peptide sequence.

required

Returns:

Name Type Description
str str

a peptide sequence without stop codon symbols.

Source code in src/pynteny/preprocessing.py
30
31
32
33
34
35
36
37
38
39
def remove_stop_sodon_signals(record_seq: str) -> str:
    """Remove stop codon signals ('*') from a peptide sequence.

    Args:
        record_seq (str): peptide sequence.

    Returns:
        str: a peptide sequence without stop codon symbols.
    """
    return "".join(char for char in record_seq if char != "*")

Module hmm

Tools to parse Hmmer output and PGAP (HMM) database

HMMER

Run Hmmer on multiple hmms and parse output

Source code in src/pynteny/hmm.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class HMMER:
    """Run Hmmer on multiple hmms and parse output"""

    def __init__(
        self,
        input_hmms: list[Path],
        hmm_output_dir: Path,
        input_data: Path,
        additional_args: list[str] = None,
        processes: int = None,
    ):
        """Initialize class HMMER

        Args:
            input_hmms (list[Path]): list of paths to input HMM files.
            hmm_output_dir (Path): path to output directory to HMMER output files.
            input_data (Path): path to input fasta file with sequence database.
            additional_args (list[str]): additional arguments to hmmsearch or hmmscan. Each
                element in the list is a string with additional arguments for each
                input hmm (arranged in the same order), an element can also take a
                value of None to avoid passing additional arguments for a specific
                input hmm. A single string may also be passed, in which case the
                same additional argument is passed to hmmsearch for all input hmms.
                Defaults to None.
            processes (int, optional): maximum number of threads to be employed.
                Defaults to all minus one.
        """
        # Default: one None (no extra flags) slot per input HMM
        if additional_args is None:
            additional_args = [None for _ in range(len(input_hmms))]
        self._hmmer_output_dir = Path(hmm_output_dir)
        self._input_hmms = [Path(hmm) for hmm in input_hmms]
        self._input_fasta = Path(input_data)
        self._additional_args = additional_args
        self._processes = processes

    @property
    def hmm_names(self) -> list[str]:
        """Get file names of input HMMs

        Returns:
            list[str]: list of file names.
        """
        # File stem (name without extension) identifies each HMM
        return [hmm_path.stem for hmm_path in self._input_hmms]

    @staticmethod
    def parse_HMM_search_output(hmmer_output: Path) -> pd.DataFrame:
        """Parse hmmsearch or hmmscan summary table output file.

        Args:
            hmmer_output (str): path to HMMER output file.

        Returns:
            pd.DataFrame: a dataframe containing parsed HMMER output.
        """
        hmmer_output = Path(hmmer_output)
        # Hit attributes exported to the dataframe (one row per hit)
        attribs = ["id", "bias", "bitscore", "description"]
        hits = defaultdict(list)
        with open(hmmer_output, encoding="UTF-8") as handle:
            for queryresult in SearchIO.parse(handle, "hmmer3-tab"):
                for hit in queryresult.hits:
                    for attrib in attribs:
                        hits[attrib].append(getattr(hit, attrib))
        return pd.DataFrame.from_dict(hits)

    def get_HMMER_tables(
        self, reuse_hmmer_results: bool = True, method: str = "hmmsearch"
    ) -> dict[str, pd.DataFrame]:
        """Run hmmer for given hmm list

        Args:
            reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run
                again for HMMs already searched in the same output directory. Defaults to True.
            method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

        Returns:
            dict[str, pd.DataFrame]: dict of HMMER hits as pandas dataframes, keyed by HMM name.
        """
        hmm_hits = {}
        for hmm_model, add_args in zip(self._input_hmms, self._additional_args):
            hmm_name = hmm_model.stem
            hmmer_output = Path(self._hmmer_output_dir) / f"hmmer_output_{hmm_name}.txt"

            # Only run HMMER when no previous output exists or reuse is disabled
            if not (reuse_hmmer_results and os.path.isfile(hmmer_output)):
                wrappers.run_HMM_search(
                    hmm_model=hmm_model,
                    input_fasta=self._input_fasta,
                    output_file=hmmer_output,
                    method=method,
                    processes=self._processes,
                    additional_args=add_args,
                )
            elif reuse_hmmer_results and os.path.isfile(hmmer_output):
                logger.info(f"Reusing Hmmer results for HMM: {hmm_name}")
            hmm_hits[hmm_name] = HMMER.parse_HMM_search_output(hmmer_output)
        return hmm_hits

hmm_names: list[str] property

Get file names of input HMMs

Returns:

Type Description
list[str]

list[str]: list of file names.

__init__(input_hmms, hmm_output_dir, input_data, additional_args=None, processes=None)

Initialize class HMMER

Parameters:

Name Type Description Default
input_hmms list[Path]

list of paths to input HMM files.

required
hmm_output_dir Path

path to output directory to HMMER output files.

required
input_data Path

path to input fasta file with sequence database.

required
additional_args list[str]

additional arguments to hmmsearch or hmmscan. Each element in the list is a string with additional arguments for each input hmm (arranged in the same order), an element can also take a value of None to avoid passing additional arguments for a specific input hmm. A single string may also be passed, in which case the same additional argument is passed to hmmsearch for all input hmms. Defaults to None.

None
processes int

maximum number of threads to be employed. Defaults to all minus one.

None
Source code in src/pynteny/hmm.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    input_hmms: list[Path],
    hmm_output_dir: Path,
    input_data: Path,
    additional_args: list[str] = None,
    processes: int = None,
):
    """Initialize class HMMER

    Args:
        input_hmms (list[Path]): list of paths to input HMM files.
        hmm_output_dir (Path): path to output directory to HMMER output files.
        input_data (Path): path to input fasta file with sequence database.
        additional_args (list[str]): additional arguments to hmmsearch or hmmscan. Each
            element in the list is a string with additional arguments for each
            input hmm (arranged in the same order), an element can also take a
            value of None to avoid passing additional arguments for a specific
            input hmm. A single string may also be passed, in which case the
            same additional argument is passed to hmmsearch for all input hmms.
            Defaults to None.
        processes (int, optional): maximum number of threads to be employed.
            Defaults to all minus one.
    """
    # Default: one None (no extra flags) slot per input HMM
    if additional_args is None:
        additional_args = [None for _ in range(len(input_hmms))]
    self._hmmer_output_dir = Path(hmm_output_dir)
    self._input_hmms = [Path(hmm) for hmm in input_hmms]
    self._input_fasta = Path(input_data)
    self._additional_args = additional_args
    self._processes = processes

get_HMMER_tables(reuse_hmmer_results=True, method='hmmsearch')

Run hmmer for given hmm list

Parameters:

Name Type Description Default
reuse_hmmer_results bool

if True then HMMER3 won't be run again for HMMs already searched in the same output directory. Defaults to True.

True
method str

select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

'hmmsearch'

Returns:

Type Description
dict[pd.DataFrame]

dict[pd.DataFrame]: dict of HMMER hits as pandas dataframes.

Source code in src/pynteny/hmm.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def get_HMMER_tables(
    self, reuse_hmmer_results: bool = True, method: str = "hmmsearch"
) -> dict[str, pd.DataFrame]:
    """Run hmmer for given hmm list

    Args:
        reuse_hmmer_results (bool, optional): if True then HMMER3 won't be run
            again for HMMs already searched in the same output directory. Defaults to True.
        method (str, optional): select between 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

    Returns:
        dict[str, pd.DataFrame]: dict of HMMER hits as pandas dataframes, keyed by HMM name.
    """
    hmm_hits = {}
    for hmm_model, add_args in zip(self._input_hmms, self._additional_args):
        hmm_name = hmm_model.stem
        hmmer_output = Path(self._hmmer_output_dir) / f"hmmer_output_{hmm_name}.txt"

        # Only run HMMER when no previous output exists or reuse is disabled
        if not (reuse_hmmer_results and os.path.isfile(hmmer_output)):
            wrappers.run_HMM_search(
                hmm_model=hmm_model,
                input_fasta=self._input_fasta,
                output_file=hmmer_output,
                method=method,
                processes=self._processes,
                additional_args=add_args,
            )
        elif reuse_hmmer_results and os.path.isfile(hmmer_output):
            logger.info(f"Reusing Hmmer results for HMM: {hmm_name}")
        hmm_hits[hmm_name] = HMMER.parse_HMM_search_output(hmmer_output)
    return hmm_hits

parse_HMM_search_output(hmmer_output) staticmethod

Parse hmmsearch or hmmscan summary table output file.

Parameters:

Name Type Description Default
hmmer_output str

path to HMMER output file.

required

Returns:

Type Description
pd.DataFrame

pd.DataFrame: a dataframe containing parsed HMMER output.

Source code in src/pynteny/hmm.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@staticmethod
def parse_HMM_search_output(hmmer_output: Path) -> pd.DataFrame:
    """Parse hmmsearch or hmmscan summary table output file.

    Args:
        hmmer_output (str): path to HMMER output file.

    Returns:
        pd.DataFrame: a dataframe containing parsed HMMER output.
    """
    # Hit attributes exported to the dataframe (one row per hit)
    selected_attributes = ("id", "bias", "bitscore", "description")
    parsed_hits = defaultdict(list)
    with open(Path(hmmer_output), encoding="UTF-8") as handle:
        for query_result in SearchIO.parse(handle, "hmmer3-tab"):
            for hit in query_result.hits:
                for attribute in selected_attributes:
                    parsed_hits[attribute].append(getattr(hit, attribute))
    return pd.DataFrame.from_dict(parsed_hits)

PGAP

Tools to parse PGAP hmm database metadata

Source code in src/pynteny/hmm.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class PGAP:
    """Tools to parse PGAP hmm database metadata"""

    def __init__(self, meta_file: Path):
        """Initialize class PGAP

        Args:
            meta_file (Path): path to PGAP's metadata file.
        """
        meta_file = Path(meta_file)
        # Load only the metadata columns used by this class
        meta = pd.read_csv(
            str(meta_file),
            sep="\t",
            usecols=[
                "#ncbi_accession",
                "gene_symbol",
                "label",
                "product_name",
                "ec_numbers",
            ],
        )
        self._meta = meta
        self._meta_file = meta_file

    @staticmethod
    def extract_PGAP_to_directory(pgap_tar: Path, output_dir: Path) -> None:
        """Extract PGAP hmm database (tar.gz) to given directory

        Args:
            pgap_tar (Path): path to compressed PGAP database.
            output_dir (Path): path to output directory.
        """
        pgap_tar = Path(pgap_tar)
        if not is_tar_file(pgap_tar):
            logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction")
            sys.exit(1)
        logger.info("Extracting hmm files to target directory")
        output_dir = Path(output_dir)
        # Start from a clean destination so stale HMM files do not linger
        if output_dir.exists():
            shutil.rmtree(output_dir)
        extract_tar_file(tar_file=pgap_tar, dest_dir=output_dir)
        flatten_directory(output_dir)

    def remove_missing_HMMs_from_metadata(
        self, hmm_dir: Path, output_file: Path = None
    ) -> None:
        """Remove HMMs from metadata that are not in HMM directory

        Args:
            hmm_dir (Path): path to directory (or tar archive) containing PGAP database.
            output_file (Path, optional): path to output file. Defaults to None.
        """
        hmm_dir = Path(hmm_dir)
        if output_file is None:
            # Fix: derive the default path from the metadata *file* path;
            # self._meta is a DataFrame and has no .parent / .stem, which
            # previously raised AttributeError here.
            output_file = (
                self._meta_file.parent / f"{self._meta_file.stem}_missing_hmms.tsv"
            )
        else:
            output_file = Path(output_file)
        # Collect available HMM file stems (set for O(1) membership tests)
        if is_tar_file(hmm_dir):
            hmm_file_names = {
                Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
            }
        else:
            hmm_file_names = {hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()}
        not_found = set()
        for i, row in self._meta.iterrows():
            if row["#ncbi_accession"].strip() not in hmm_file_names:
                not_found.add(i)
        self._meta = self._meta.drop(not_found)
        self._meta.to_csv(output_file, sep="\t", index=False)

    def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
        """Try to retrieve HMM by its gene symbol, more
           than one HMM may map to a single gene symbol

        Args:
            gene_symbol (str): gene symbol to be searched for HMM.

        Returns:
            list[str]: list of HMM names matching gene symbol.
        """
        meta = self._meta
        # A symbol may appear either in "gene_symbol" or in "label"
        return meta[
            ((meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol))
        ]["#ncbi_accession"].values.tolist()

    def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str:
        """Get HMMs corresponding to gene symbol in PGAP metadata.
           If more than one HMM matching gene symbol, return a HMM group

        Args:
            gene_symbol (str): gene symbol to be searched for HMM.

        Returns:
            str: string of HMM names (group separated by |)
        """
        hmms = self.get_HMM_names_by_gene_symbol(gene_symbol)
        if not hmms:
            logger.error(f"No HMM found for gene {gene_symbol}")
            sys.exit(1)
        if len(hmms) == 1:
            return hmms.pop()
        else:
            return "|".join(hmms)

    def get_HMM_gene_ID(self, hmm_name: str) -> list[str]:
        """Get gene symbols matching given hmm.

        Args:
            hmm_name (str): query HMM name.

        Returns:
            list[str]: list of gene symbols matching given HMM.
        """
        meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0)
        return meta[meta["#ncbi_accession"] == hmm_name]["gene_symbol"].values.tolist()

    def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
        """Get meta info for given hmm.

        Args:
            hmm_name (str): query HMM name.

        Returns:
            dict: metadata of provided HMM.
        """
        # fillna("") replaces NaN cells with empty strings, equivalent to the
        # element-wise lambda but without the deprecated DataFrame.applymap
        meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0).fillna("")
        # to_dict() yields {column: {index: value}}; keep the first (only)
        # matching row's value per column, or "undef" for empty values
        metadata = {
            k: list(v.values())[0] if list(v.values())[0] else "undef"
            for k, v in meta[meta["#ncbi_accession"] == hmm_name].to_dict().items()
        }
        if not metadata:
            logger.warning(f"No metadata for HMM: {hmm_name}")
        return metadata

__init__(meta_file)

Initialize class PGAP

Parameters:

Name Type Description Default
meta_file Path

path to PGAP's metadata file.

required
Source code in src/pynteny/hmm.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def __init__(self, meta_file: Path):
    """Initialize class PGAP

    Args:
        meta_file (Path): path to PGAP's metadata file.
    """
    meta_file = Path(meta_file)
    # Load only the metadata columns used by this class
    meta = pd.read_csv(
        str(meta_file),
        sep="\t",
        usecols=[
            "#ncbi_accession",
            "gene_symbol",
            "label",
            "product_name",
            "ec_numbers",
        ],
    )
    # meta = meta[
    #     ["#ncbi_accession", "gene_symbol", "label", "product_name", "ec_numbers"]
    # ]
    self._meta = meta
    self._meta_file = meta_file

extract_PGAP_to_directory(pgap_tar, output_dir) staticmethod

Extract PGAP hmm database (tar.gz) to given directory

Parameters:

Name Type Description Default
pgap_tar Path

path to compressed PGAP database.

required
output_dir Path

path to output directory.

required
Source code in src/pynteny/hmm.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@staticmethod
def extract_PGAP_to_directory(pgap_tar: Path, output_dir: Path) -> None:
    """Extract PGAP hmm database (tar.gz) to given directory

    Args:
        pgap_tar (Path): path to compressed PGAP database.
        output_dir (Path): path to output directory.
    """
    pgap_tar = Path(pgap_tar)
    if not is_tar_file(pgap_tar):
        # NOTE(review): the message says "Skipping" but this exits the program
        logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction")
        sys.exit(1)
    logger.info("Extracting hmm files to target directory")
    output_dir = Path(output_dir)
    # Start from a clean destination so stale HMM files do not linger
    if output_dir.exists():
        shutil.rmtree(output_dir)
    extract_tar_file(tar_file=pgap_tar, dest_dir=output_dir)
    flatten_directory(output_dir)

get_HMM_gene_ID(hmm_name)

Get gene symbols matching given hmm.

Parameters:

Name Type Description Default
hmm_name str

query HMM name.

required

Returns:

Type Description
list[str]

list[str]: list of gene symbols matching given HMM.

Source code in src/pynteny/hmm.py
235
236
237
238
239
240
241
242
243
244
245
def get_HMM_gene_ID(self, hmm_name: str) -> list[str]:
    """Get gene symbols matching given hmm.

    Args:
        hmm_name (str): query HMM name.

    Returns:
        list[str]: list of gene symbols matching given HMM.
    """
    # Rows without an accession cannot be matched, so drop them first
    with_accession = self._meta.dropna(subset=["#ncbi_accession"], axis=0)
    matching_rows = with_accession[with_accession["#ncbi_accession"] == hmm_name]
    return matching_rows["gene_symbol"].values.tolist()

get_HMM_group_for_gene_symbol(gene_symbol)

Get HMMs corresponding to gene symbol in PGAP metadata. If more than one HMM matching gene symbol, return a HMM group

Parameters:

Name Type Description Default
gene_symbol str

gene symbol to be searched for HMM.

required

Returns:

Name Type Description
str str

string of HMM names (group separated by |)

Source code in src/pynteny/hmm.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def get_HMM_group_for_gene_symbol(self, gene_symbol: str) -> str:
    """Get HMMs corresponding to gene symbol in PGAP metadata.
       If more than one HMM matching gene symbol, return a HMM group

    Args:
        gene_symbol (str): gene symbol to be searched for HMM.

    Returns:
        str: string of HMM names (group separated by |)
    """
    matching_hmms = self.get_HMM_names_by_gene_symbol(gene_symbol)
    if not matching_hmms:
        logger.error(f"No HMM found for gene {gene_symbol}")
        sys.exit(1)
    # A single HMM is returned as-is; several form a "|"-separated group
    return matching_hmms[0] if len(matching_hmms) == 1 else "|".join(matching_hmms)

get_HMM_names_by_gene_symbol(gene_symbol)

Try to retrieve HMM by its gene symbol, more than one HMM may map to a single gene symbol

Parameters:

Name Type Description Default
gene_symbol str

gene symbol to be searched for HMM.

required

Returns:

Type Description
list[str]

list[str]: list of HMM names matching gene symbol.

Source code in src/pynteny/hmm.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
    """Try to retrieve HMM by its gene symbol, more
       than one HMM may map to a single gene symbol

    Args:
        gene_symbol (str): gene symbol to be searched for HMM.

    Returns:
        list[str]: list of HMM names matching gene symbol.
    """
    meta = self._meta
    # A symbol may appear either in "gene_symbol" or in "label"
    symbol_or_label = (meta.gene_symbol == gene_symbol) | (meta.label == gene_symbol)
    return meta.loc[symbol_or_label, "#ncbi_accession"].values.tolist()

get_meta_info_for_HMM(hmm_name)

Get meta info for given hmm.

Parameters:

Name Type Description Default
hmm_name str

query HMM name.

required

Returns:

Name Type Description
dict dict

metadata of provided HMM.

Source code in src/pynteny/hmm.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def get_meta_info_for_HMM(self, hmm_name: str) -> dict:
    """Get meta info for given hmm.

    Args:
        hmm_name (str): query HMM name.

    Returns:
        dict: metadata of provided HMM.
    """
    # Replace NaN cells with "" so the truthiness check below works.
    # NOTE(review): DataFrame.applymap is deprecated in recent pandas;
    # .fillna("") would be equivalent here — consider switching.
    meta = self._meta.dropna(subset=["#ncbi_accession"], axis=0).applymap(
        lambda x: x if not pd.isna(x) else ""
    )
    # to_dict() yields {column: {index: value}}; keep the first (only)
    # matching row's value per column, or "undef" for empty values
    metadata = {
        k: list(v.values())[0] if list(v.values())[0] else "undef"
        for k, v in meta[meta["#ncbi_accession"] == hmm_name].to_dict().items()
    }
    if not metadata:
        logger.warning(f"No metadata for HMM: {hmm_name}")
    return metadata

remove_missing_HMMs_from_metadata(hmm_dir, output_file=None)

Remove HMMs from metadata that are not in HMM directory

Parameters:

Name Type Description Default
hmm_dir Path

path to directory containing PGAP database.

required
output_file Path

path to output file. Defaults to None.

None
Source code in src/pynteny/hmm.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def remove_missing_HMMs_from_metadata(
    self, hmm_dir: Path, output_file: Path = None
) -> None:
    """Remove HMMs from metadata that are not in HMM directory

    Args:
        hmm_dir (Path): path to directory (or tar archive) containing PGAP database.
        output_file (Path, optional): path to output file. Defaults to None.
    """
    hmm_dir = Path(hmm_dir)
    if output_file is None:
        # Fix: derive the default path from the metadata *file* path;
        # self._meta is a DataFrame and has no .parent / .stem, which
        # previously raised AttributeError here.
        output_file = (
            self._meta_file.parent / f"{self._meta_file.stem}_missing_hmms.tsv"
        )
    else:
        output_file = Path(output_file)
    # Collect available HMM file stems (set for O(1) membership tests)
    if is_tar_file(hmm_dir):
        hmm_file_names = {
            Path(hmm_file).stem.strip() for hmm_file in list_tar_dir(hmm_dir)
        }
    else:
        hmm_file_names = {hmm_file.stem.strip() for hmm_file in hmm_dir.iterdir()}
    not_found = set()
    for i, row in self._meta.iterrows():
        if row["#ncbi_accession"].strip() not in hmm_file_names:
            not_found.add(i)
    self._meta = self._meta.drop(not_found)
    self._meta.to_csv(output_file, sep="\t", index=False)

Module labelparser

Tools to parse record labels to extract coded info

parse(label)

Parse sequence labels to obtain contig and locus info

Parameters:

Name Type Description Default
label str

sequence label

required

Returns:

Name Type Description
dict dict

dictionary with parsed label info.

Source code in src/pynteny/parsers/labelparser.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def parse(label: str) -> dict:
    """Parse sequence labels to obtain contig and locus info

    Args:
        label (str): sequence label

    Returns:
        dict: dictionary with parsed label info.
    """
    parsed_dict = {
        "full_label": label,
        "gene_id": "",
        "contig": "",
        "gene_pos": None,
        "locus_pos": None,
        "strand": "",
    }
    try:
        # Labels look like "<gene_id>__<contig>_<gene_pos>_<start>_<end>_<strand>"
        gene_id, meta = label.split("__")[:2]
        meta_fields = meta.split("_")
        # Compute every field before mutating the dict so a malformed
        # label leaves all defaults untouched
        strand = meta_fields[-1]
        locus_pos = tuple(int(pos) for pos in meta_fields[-3:-1])
        gene_pos = int(meta_fields[-4])
        contig = "_".join(meta_fields[:-4])
        parsed_dict.update(
            gene_id=gene_id,
            contig=contig,
            gene_pos=gene_pos,
            locus_pos=locus_pos,
            strand=strand,
        )
    except Exception:
        # Best-effort parsing: unparseable labels keep default fields
        pass
    return parsed_dict

parse_from_list(labels)

Parse labels in list of labels and return DataFrame.

Parameters:

Name Type Description Default
labels list

list of labels as strings.

required

Returns:

Type Description
pd.DataFrame

pd.DataFrame: Dataframe containing parsed information from labels.

Source code in src/pynteny/parsers/labelparser.py
52
53
54
55
56
57
58
59
60
61
def parse_from_list(labels: list[str]) -> pd.DataFrame:
    """Parse labels in list of labels and return DataFrame.

    Args:
        labels (list[str]): list of labels as strings.

    Returns:
        pd.DataFrame: Dataframe containing parsed information from labels.
    """
    # One row per label; columns come from parse()'s result dict
    return pd.DataFrame([parse(label) for label in labels])

Module syntenyparser

Tools to parse synteny structure strings

contains_HMM_groups(synteny_structure)

Check whether structure contains groups of gene-equivalent HMMs.

Source code in src/pynteny/parsers/syntenyparser.py
26
27
28
def contains_HMM_groups(synteny_structure: str) -> bool:
    """Check whether structure contains groups of gene-equivalent HMMs."""
    # Groups are encoded with the "|" separator
    return synteny_structure.count("|") > 0

get_HMM_groups_in_structure(synteny_structure)

Get hmm names employed in synteny structure, if more than one hmm for the same gene, return a list with all of them.

Source code in src/pynteny/parsers/syntenyparser.py
73
74
75
76
77
78
79
80
81
82
83
def get_HMM_groups_in_structure(synteny_structure: str) -> list[str]:
    """Get hmm names employed in synteny structure,
    if more than one hmm for the same gene, return
    a list with all of them.
    """
    # Drop parentheses, then split into whitespace-separated tokens
    links = synteny_structure.replace("(", "").replace(")", "").strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    # Numeric tokens encode gene distances, not HMM groups
    hmm_groups = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()]
    return hmm_groups

get_all_HMMs_in_structure(synteny_structure)

Get hmm names employed in synteny structure, if more than one hmm for the same gene, return a list with all of them.

Source code in src/pynteny/parsers/syntenyparser.py
 96
 97
 98
 99
100
101
102
103
def get_all_HMMs_in_structure(synteny_structure: str) -> list[str]:
    """Get hmm names employed in synteny structure,
    if more than one hmm for the same gene, return
    a list with all of them.
    """
    # Flatten every "|"-separated group into individual HMM names
    return [
        hmm_name
        for group in get_HMM_groups_in_structure(synteny_structure)
        for hmm_name in group.split("|")
    ]

get_gene_symbols_in_structure(synteny_structure)

Retrieve gene symbols contained in synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
86
87
88
89
90
91
92
93
def get_gene_symbols_in_structure(synteny_structure: str) -> list[str]:
    """Retrieve gene symbols contained in synteny structure."""
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    # Numeric tokens encode distances; the rest carry strand + symbol
    gene_symbols = [split_strand_from_locus(h)[1] for h in links if not h.isdigit()]
    return gene_symbols

get_maximum_distances_in_structure(synteny_structure)

Get maximum gene distances in synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
129
130
131
132
133
134
135
def get_maximum_distances_in_structure(synteny_structure: str) -> list[int]:
    """Get maximum gene distances in synteny structure."""
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    return [int(dist) for dist in links if dist.isdigit()]

get_strands_in_structure(synteny_structure, parsed_symbol=True)

Get strand sense list in structure.

Parameters:

Name Type Description Default
synteny_structure str

a synteny structure.

required
parsed_symbol bool

if True, strand info '>' is parsed as 'pos' and '<' as 'neg'. Defaults to True.

True

Returns:

Type Description
list[str]

list[str]: list of strand sense values, one per HMM group ('pos'/'neg' when parsed_symbol is True, otherwise '>'/'<' or '' when no strand symbol is given).

Source code in src/pynteny/parsers/syntenyparser.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get_strands_in_structure(
    synteny_structure: str, parsed_symbol: bool = True
) -> list[str]:
    """Get strand sense list in structure.

    Args:
        synteny_structure (str): a synteny structure.
        parsed_symbol (bool, optional): if True, strand info '>' is parsed
            as 'pos' and '<' as 'neg'. Defaults to True.

    Returns:
        list[str]: parsed synteny structure as a list of tuples containing
            HMM name and strand info for each HMM group.
    """
    links = synteny_structure.strip().split()
    if not links:
        logger.error("Invalid format for synteny structure")
        sys.exit(1)
    return [
        split_strand_from_locus(h, parsed_symbol)[0] for h in links if not h.isdigit()
    ]

is_valid_structure(synteny_structure)

Validate synteny structure format.

Source code in src/pynteny/parsers/syntenyparser.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def is_valid_structure(synteny_structure: str) -> bool:
    """Validate synteny structure format."""
    synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip()
    parsed_struc = parse_synteny_structure(synteny_structure)
    right_type = all(
        (
            is_right_list_nested_type(parsed_struc["hmm_groups"], str),
            is_right_list_nested_type(parsed_struc["strands"], str),
            is_right_list_nested_type(parsed_struc["distances"], int),
        )
    )
    right_format = len(parsed_struc["hmm_groups"]) == len(
        parsed_struc["strands"]
    ) and len(parsed_struc["hmm_groups"]) == (len(parsed_struc["distances"]) + 1)
    return False if (not right_type or not right_format) else True

parse_genes_in_synteny_structure(synteny_structure, hmm_meta)

Convert gene-based synteny structure into a HMM-based one. If a gene symbol matches more than one HMM, return a HMM group like: (HMM1 | HMM2 | ...).

Parameters:

Name Type Description Default
synteny_structure str

a string like the following:

>hmm_a n_ab <hmm_b n_bc hmm_c

where '>' indicates a hmm target located on the positive strand, '<' a target located on the negative strand, and n_ab corresponds to the maximum number of genes separating matched gene a and b. Multiple hmms may be employed (limited by computational capabilities). No order symbol in a hmm indicates that results should be independent of strand location.

required
hmm_meta Path

path to PGAP's metadata file.

required

Returns:

Type Description
tuple[str, dict]

tuple[str,dict]: parsed synteny structure where gene symbols are replaced by HMM names.

Source code in src/pynteny/parsers/syntenyparser.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def parse_genes_in_synteny_structure(
    synteny_structure: str, hmm_meta: Path
) -> tuple[str, dict]:
    """Convert gene-based synteny structure into a HMM-based one.
    If a gene symbol matches more than one HMM, return a HMM group
    like: (HMM1 | HMM2 | ...).

    Args:
        synteny_structure (str): a string like the following:
            >hmm_a n_ab <hmm_b n_bc hmm_c

            where '>' indicates a hmm target located on the positive strand,
            '<' a target located on the negative strand, and n_ab cooresponds
            to the maximum number of genes separating matched gene a and b.
            Multiple hmms may be employed (limited by computational capabilities).
            No order symbol in a hmm indicates that results should be independent
            of strand location.
        hmm_meta (Path): path to PGAP's metadata file.

    Returns:
        tuple[str,dict]: parsed synteny structure where gene symbols are replaced
            by HMM names.
    """
    pgap = PGAP(Path(hmm_meta))
    gene_symbols = get_gene_symbols_in_structure(synteny_structure)
    strand_locs = get_strands_in_structure(synteny_structure, parsed_symbol=False)
    gene_dists = get_maximum_distances_in_structure(synteny_structure)
    hmm_groups = {
        gene_symbol: pgap.get_HMM_group_for_gene_symbol(gene_symbol)
        for gene_symbol in gene_symbols
    }
    unmatched_genes = [
        gene_id for gene_id, hmm_group in hmm_groups.items() if not hmm_group
    ]
    if unmatched_genes:
        logger.error(
            f"These genes did not get a HMM match in database: {unmatched_genes}"
        )
        sys.exit(1)

    hmm_synteny_struc = ""

    for strand, dist, hmm_group in zip(
        strand_locs, [""] + gene_dists, hmm_groups.values()
    ):
        if "|" in hmm_group:
            hmm_group = f"({hmm_group})"
        hmm_synteny_struc += f"{dist} {strand}{hmm_group} "

    return hmm_synteny_struc.strip(), hmm_groups

parse_synteny_structure(synteny_structure)

Parse synteny structure string.

Parameters:

Name Type Description Default
synteny_structure str

a string like the following:

>hmm_a n_ab <hmm_b n_bc hmm_c

where '>' indicates a hmm target located on the positive strand, '<' a target located on the negative strand, and n_ab corresponds to the maximum number of genes separating matched gene a and b. Multiple hmms may be employed (limited by computational capabilities). No order symbol in a hmm indicates that results should be independent of strand location.

required

Returns:

Name Type Description
dict dict

parsed synteny structure.

Source code in src/pynteny/parsers/syntenyparser.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def parse_synteny_structure(synteny_structure: str) -> dict:
    """Parse synteny structure string.

    Args:
        synteny_structure (str): a string like the following:
            >hmm_a n_ab <hmm_b n_bc hmm_c

            where '>' indicates a hmm target located on the positive strand,
            '<' a target located on the negative strand, and n_ab cooresponds
            to the maximum number of genes separating matched gene a and b.
            Multiple hmms may be employed (limited by computational capabilities).
            No order symbol in a hmm indicates that results should be independent
            of strand location.

    Returns:
        dict: parsed synteny structure.
    """
    max_dists = get_maximum_distances_in_structure(synteny_structure)
    hmm_groups = get_HMM_groups_in_structure(synteny_structure)
    strands = get_strands_in_structure(synteny_structure)
    return {"hmm_groups": hmm_groups, "strands": strands, "distances": max_dists}

reformat_synteny_structure(synteny_structure)

Remove illegal symbols and extra white space.

Source code in src/pynteny/parsers/syntenyparser.py
20
21
22
23
def reformat_synteny_structure(synteny_structure: str) -> str:
    """Remove illegal symbols and extra white space."""
    synteny_structure = synteny_structure.replace(" |", "|").replace("| ", "|").strip()
    return synteny_structure

split_strand_from_locus(locus_str, parsed_symbol=True)

Split strand info from locus tag / HMM model.

Parameters:

Name Type Description Default
locus_str str

a substring of a synteny structure containing a gene symbol / HMM name and strand info.

required
parsed_symbol bool

if True, strand info '>' is parsed as 'pos' and '<' as 'neg'. Defaults to True.

True

Returns:

Type Description
tuple[str]

tuple[str]: tuple with parsed strand info and gene symbol / HMM name.

Source code in src/pynteny/parsers/syntenyparser.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def split_strand_from_locus(locus_str: str, parsed_symbol: bool = True) -> tuple[str]:
    """Split strand info from locus tag / HMM model.

    Args:
        locus_str (str): a substring of a synteny structure containing
            a gene symbol / HMM name and strand info.
        parsed_symbol (bool, optional): if True, strand info '>' is parsed
            as 'pos' and '<' as 'neg'. Defaults to True.

    Returns:
        tuple[str]: tuple with parsed strand info and gene symbol / HMM name.
    """
    locus_str = locus_str.strip()
    if locus_str[0] == "<" or locus_str[0] == ">":
        sense = locus_str[0]
        locus_str = locus_str[1:]
        if parsed_symbol:
            strand = "pos" if sense == ">" else "neg"
        else:
            strand = sense
    else:
        strand = ""
    return (strand, locus_str)

Module cli

Pynteny

Main command based on: https://selvakumar-arumugapandian.medium.com/command-line-subcommands-with-pythons-argparse-4dbac80f7110

Source code in src/pynteny/cli.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Pynteny:
    """Main command based on:
    https://selvakumar-arumugapandian.medium.com/  \\
    command-line-subcommands-with-pythons-argparse-4dbac80f7110
    """

    def __init__(self, subcommand: str, subcommand_args: list[str]):
        """Initialize main command

        Args:
            subcommand (str): subcommand name
            subcommand_args (list[str]): list of subcommand arguments and values.
        """
        self._subcommand = subcommand
        if "--hmmsearch_args" in subcommand_args:
            hmm_arg_idx = subcommand_args.index("--hmmsearch_args") + 1
            subcommand_args[hmm_arg_idx] = " " + subcommand_args[hmm_arg_idx]
        self._subcommand_args = subcommand_args
        parser = argparse.ArgumentParser(
            description=(self._printLogo()),
            usage=("pynteny <subcommand> [-h] [args] \n"),
            epilog=(self._generate_cool_quotes()),
            formatter_class=argparse.RawTextHelpFormatter,
        )
        parser._positionals.title = "subcommands"
        parser.add_argument(
            help=("search \n" "build \n" "parse \n" "download \n" "cite \n"),
            dest="subcommand",
            metavar="",
        )
        parser.add_argument(
            "-v",
            "--version",
            help="show version and exit",
            action="version",
            version=__version__,
        )
        if len(sys.argv) < 2:
            parser.print_help()
            sys.exit(1)
        args = parser.parse_args(self._subcommand)
        input_subcommand = getattr(args, "subcommand")
        ConfigParser.initialize_config_file()
        self._call_subcommand(subcommand_name=input_subcommand)

    def _printLogo(self):
        print(
            (
                """
    ____              __
   / __ \__  ______  / /____  ____  __  __
  / /_/ / / / / __ \/ __/ _ \/ __ \/ / / /
 / ____/ /_/ / / / / /_/  __/ / / / /_/ /
/_/    \__, /_/ /_/\__/\___/_/ /_/\__, /
      /____/                     /____/

"""
                f"Synteny-based Hmmer searches made easy, v{__version__}\n"
                "Semidán Robaina Estévez (srobaina@ull.edu.es), 2022\n"
                " \n"
            )
        )

    def _generate_cool_quotes(self):
        quotes = [
            "May the force be with you (Yoda)",
            "This looks like a job for a computer (AI)",
            "This is such a wonderful day to do bioinformatics (SR)",
            "One does not simply walk into Mordor (J.R.R. Tolkien)",
            "Damn, looks like a rainy day, let's do bioiformatics! (SR)",
        ]
        return f"{random.choice(quotes)}\n"

    def _call_subcommand(self, subcommand_name: str) -> None:
        subcommand = getattr(self, subcommand_name)
        subcommand()

    def search(self):
        """Call search subcommand."""
        parser = SubcommandParser.search()
        args = parser.parse_args(self._subcommand_args)
        sub.synteny_search(args)

    def build(self):
        """Call build subcommand."""
        parser = SubcommandParser.build()
        args = parser.parse_args(self._subcommand_args)
        sub.build_database(args)

    def parse(self):
        """Call parse subcommand."""
        parser = SubcommandParser.parse()
        args = parser.parse_args(self._subcommand_args)
        sub.parse_gene_ids(args)

    def download(self):
        """Call download subcommand."""
        parser = SubcommandParser.download()
        args = parser.parse_args(self._subcommand_args)
        sub.download_hmms(args)

    def cite(self):
        """Print pynteny's citation string"""
        parser = SubcommandParser.cite()
        args = parser.parse_args(self._subcommand_args)
        args.version = __version__
        sub.get_citation(args)

__init__(subcommand, subcommand_args)

Initialize main command

Parameters:

Name Type Description Default
subcommand str

subcommand name

required
subcommand_args list[str]

list of subcommand arguments and values.

required
Source code in src/pynteny/cli.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(self, subcommand: str, subcommand_args: list[str]):
    """Initialize main command

    Args:
        subcommand (str): subcommand name
        subcommand_args (list[str]): list of subcommand arguments and values.
    """
    self._subcommand = subcommand
    if "--hmmsearch_args" in subcommand_args:
        hmm_arg_idx = subcommand_args.index("--hmmsearch_args") + 1
        subcommand_args[hmm_arg_idx] = " " + subcommand_args[hmm_arg_idx]
    self._subcommand_args = subcommand_args
    parser = argparse.ArgumentParser(
        description=(self._printLogo()),
        usage=("pynteny <subcommand> [-h] [args] \n"),
        epilog=(self._generate_cool_quotes()),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser._positionals.title = "subcommands"
    parser.add_argument(
        help=("search \n" "build \n" "parse \n" "download \n" "cite \n"),
        dest="subcommand",
        metavar="",
    )
    parser.add_argument(
        "-v",
        "--version",
        help="show version and exit",
        action="version",
        version=__version__,
    )
    if len(sys.argv) < 2:
        parser.print_help()
        sys.exit(1)
    args = parser.parse_args(self._subcommand)
    input_subcommand = getattr(args, "subcommand")
    ConfigParser.initialize_config_file()
    self._call_subcommand(subcommand_name=input_subcommand)

build()

Call build subcommand.

Source code in src/pynteny/cli.py
104
105
106
107
108
def build(self):
    """Call build subcommand."""
    parser = SubcommandParser.build()
    args = parser.parse_args(self._subcommand_args)
    sub.build_database(args)

cite()

Print pynteny's citation string

Source code in src/pynteny/cli.py
122
123
124
125
126
127
def cite(self):
    """Print pynteny's citation string"""
    parser = SubcommandParser.cite()
    args = parser.parse_args(self._subcommand_args)
    args.version = __version__
    sub.get_citation(args)

download()

Call download subcommand.

Source code in src/pynteny/cli.py
116
117
118
119
120
def download(self):
    """Call download subcommand."""
    parser = SubcommandParser.download()
    args = parser.parse_args(self._subcommand_args)
    sub.download_hmms(args)

parse()

Call parse subcommand.

Source code in src/pynteny/cli.py
110
111
112
113
114
def parse(self):
    """Call parse subcommand."""
    parser = SubcommandParser.parse()
    args = parser.parse_args(self._subcommand_args)
    sub.parse_gene_ids(args)

search()

Call search subcommand.

Source code in src/pynteny/cli.py
 98
 99
100
101
102
def search(self):
    """Call search subcommand."""
    parser = SubcommandParser.search()
    args = parser.parse_args(self._subcommand_args)
    sub.synteny_search(args)

SubcommandParser

Argparse parsers for Pynteny's subcommands

Source code in src/pynteny/cli.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
class SubcommandParser:
    """Argparse parsers for Pynteny's subcommands"""

    @staticmethod
    def get_help_str(subcommand: str) -> str:
        """Get help string for subcommand.

        Args:
            subcommand (str): subcommand name.

        Returns:
            str: help string.
        """
        parser = getattr(SubcommandParser, subcommand)()
        with tempfile.NamedTemporaryFile(mode="w+") as file:
            parser.print_help(file)
            file.flush()
            with open(file.name, encoding="UTF-8") as help_file:
                help_str = help_file.read()
        return help_str

    @staticmethod
    def search() -> argparse.ArgumentParser:
        """Parser for the search subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Query sequence database for HMM hits arranged in provided synteny structure."
            ),
            usage=("pynteny search [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-s",
            "--synteny_struc",
            metavar="",
            dest="synteny_struc",
            type=str,
            required=True,
            help=(
                "string displaying hmm structure to search for, such as: \n"
                " \n"
                "'>hmm_a n_ab <hmm_b n_bc hmm_c'\n"
                " \n"
                "where '>' indicates a hmm target located on the positive strand, \n"
                "'<' a target located on the negative strand, and n_ab cooresponds \n"
                "to the maximum number of genes separating matched genes a and b. \n"
                "Multiple hmms may be employed. \n"
                "No order symbol in a hmm indicates that results should be independent \n"
                "of strand location. "
            ),
        )
        required.add_argument(
            "-i",
            "--data",
            dest="data",
            metavar="",
            type=Path,
            required=True,
            help=(
                "path to fasta file containing peptide database. \n"
                "Record labels must follow the format specified in docs \n"
                "(see section: General Usage). Pynteny build subcommand exports \n"
                "the generated database in the correct format"
            ),
        )
        optional.add_argument(
            "-d",
            "--hmm_dir",
            dest="hmm_dir",
            type=Path,
            metavar="",
            required=False,
            default=None,
            help=(
                "path to directory containing hmm (i.e, tigrfam or pfam) models. \n"
                "IMPORTANT: the directory must contain one hmm per file, and the file \n"
                "name must coincide with the hmm name that will be displayed in the synteny structure. \n"
                "The directory can contain more hmm models than used in the synteny structure. \n"
                "It may also be the path to a compressed (tar, tar.gz, tgz) directory. \n"
                "If not provided, hmm models (PGAP database) will be downloaded from the NCBI. \n"
                "(if not already downloaded)"
            ),
        )
        optional.add_argument(
            "-o",
            "--outdir",
            dest="outdir",
            type=Path,
            metavar="",
            help="path to output directory",
            default=None,
        )
        optional.add_argument(
            "-x",
            "--prefix",
            dest="prefix",
            type=str,
            metavar="",
            default="",
            help="prefix to be added to output files",
        )
        optional.add_argument(
            "-p",
            "--processes",
            dest="processes",
            type=int,
            metavar="",
            default=None,
            help="maximum number of processes available to HMMER. Defaults to all but one.",
        )
        optional.add_argument(
            "-a",
            "--hmmsearch_args",
            dest="hmmsearch_args",
            type=str,
            metavar="",
            default=None,
            required=False,
            help=(
                "list of comma-separated additional arguments to hmmsearch for each input hmm. \n"
                "A single argument may be provided, in which case the same additional argument \n"
                "is employed in all hmms."
            ),
        )
        optional.add_argument(
            "-g",
            "--gene_ids",
            dest="gene_ids",
            default=False,
            action="store_true",
            help=(
                "use gene symbols in synteny structure instead of HMM names. \n"
                "If set, a path to the hmm database metadata file must be provided \n"
                "in argument '--hmm_meta'"
            ),
        )
        optional.add_argument(
            "-u",
            "--unordered",
            dest="unordered",
            default=False,
            action="store_true",
            help=(
                "whether the HMMs should be arranged in the exact same order displayed \n"
                "in the synteny_structure or in  any order. If ordered, the filters will \n"
                "filter collinear rather than syntenic structures. \n"
                "If more than two HMMs are employed, the largest maximum distance among any \n"
                "pair is considered to run the search."
            ),
        )
        optional.add_argument(
            "-r",
            "--reuse",
            dest="reuse",
            default=False,
            action="store_true",
            help=(
                "reuse hmmsearch result table in following synteny searches. \n"
                "Do not delete hmmer_outputs subdirectory for this option to work."
            ),
        )
        optional.add_argument(
            "-m",
            "--hmm_meta",
            dest="hmm_meta",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to hmm database metadata file",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def build() -> argparse.ArgumentParser:
        """Parser for the build subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Translate nucleotide assembly file and assign contig and gene location info \n"
                "to each identified ORF (using prodigal). Label predicted ORFs according to \n"
                "positional info and export a fasta file containing predicted and translated ORFs. \n"
                "Alternatively, extract peptide sequences from GenBank file containing ORF annotations \n"
                "and write labelled peptide sequences to a fasta file."
            ),
            usage=("pynteny build [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-i",
            "--data",
            dest="data",
            type=Path,
            required=True,
            metavar="",
            help=(
                "path to assembly input nucleotide data or annotated GenBank file. \n"
                "It can be a single file or a directory of files (either of FASTA or GeneBank format). \n"
                "If a directory, file name can be prepended to the label of each translated peptide \n"
                "originally coming from that file by default (i.e., to track the genome of origin) \n"
                "with the flag '--prepend-filename.'"
            ),
        )
        optional.add_argument(
            "-x",
            "--prepend-filename",
            dest="prepend",
            default=False,
            action="store_true",
            help=(
                "whether to prepend file name to peptide sequences when a directory \n"
                "with multiple fasta or genbank files is used as input."
            ),
        )
        optional.add_argument(
            "-o",
            "--outfile",
            dest="outfile",
            type=Path,
            metavar="",
            default=None,
            help=(
                "path to output (labelled peptide database) file. Defaults to \n"
                "file in directory of input data."
            ),
        )
        optional.add_argument(
            "-p",
            "--processes",
            dest="processes",
            type=int,
            metavar="",
            required=False,
            default=None,
            help=("set maximum number of processes. Defaults to all but one."),
        )
        optional.add_argument(
            "-t",
            "--tempdir",
            dest="tempdir",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to directory to store temporary files. Defaults to system temp directory.",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def parse() -> argparse.ArgumentParser:
        """Parser for the parse subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=(
                "Translate synteny structure with gene symbols into one with\n"
                "HMM groups, according to provided HMM database."
            ),
            usage=("pynteny parse [-h] [args] \n"),
            epilog="  \n",
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-s",
            "--synteny_struc",
            dest="synteny_struc",
            type=str,
            required=True,
            metavar="",
            help=("synteny structure containing gene symbols instead of HMMs"),
        )
        optional.add_argument(
            "-m",
            "--hmm_meta",
            dest="hmm_meta",
            type=Path,
            metavar="",
            required=False,
            help=(
                "path to hmm database metadata file. If already donwloaded with \n"
                "pynteny downloaded, hmm meta file is retrieved from default location."
            ),
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def download() -> argparse.ArgumentParser:
        """Parser for the download subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=("Download HMM database from NCBI."),
            epilog="  \n",
            usage=("pynteny download [-h] [args] \n"),
            formatter_class=argparse.RawTextHelpFormatter,
        )

        optional = parser._action_groups.pop()
        required = parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)

        required.add_argument(
            "-o",
            "--outdir",
            dest="outdir",
            type=Path,
            metavar="",
            required=False,
            default=None,
            help=(
                "path to directory where to download HMM database. \n"
                "Note, if database is relocated after download, Pynteny won't be able to find it automatically, \n"
                "You can either: delete and download again in the new location or manually indicate database and \n"
                "meta file location in Pynteny search."
            ),
        )
        optional.add_argument(
            "-u",
            "--unpack",
            dest="unpack",
            default=False,
            action="store_true",
            help="unpack originally compressed database files",
        )
        optional.add_argument(
            "-f",
            "--force",
            dest="force",
            default=False,
            action="store_true",
            help="force-download database again if already downloaded",
        )
        optional.add_argument(
            "-l",
            "--log",
            dest="logfile",
            type=Path,
            default=None,
            metavar="",
            required=False,
            help="path to log file. Log not written by default.",
        )
        return parser

    @staticmethod
    def cite() -> argparse.ArgumentParser:
        """Parser for the cite subcommand.

        Returns:
            argparse.ArgumentParser: ArgumentParser object.
        """
        parser = argparse.ArgumentParser(
            description=("Print pynteny's citation string."),
            epilog="  \n",
            usage=("pynteny cite [-h] \n"),
            formatter_class=argparse.RawTextHelpFormatter,
        )
        optional = parser._action_groups.pop()
        parser.add_argument_group("required arguments")
        parser._action_groups.append(optional)
        return parser

build() staticmethod

Parser for the build subcommand.

Returns:

Type Description
argparse.ArgumentParser

argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
@staticmethod
def build() -> argparse.ArgumentParser:
    """Parser for the build subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Translate nucleotide assembly file and assign contig and gene location info \n"
            "to each identified ORF (using prodigal). Label predicted ORFs according to \n"
            "positional info and export a fasta file containing predicted and translated ORFs. \n"
            "Alternatively, extract peptide sequences from GenBank file containing ORF annotations \n"
            "and write labelled peptide sequences to a fasta file."
        ),
        usage=("pynteny build [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # Show the "required arguments" group before the default optional group
    # in the --help output.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-i",
        "--data",
        dest="data",
        type=Path,
        required=True,
        metavar="",
        help=(
            "path to assembly input nucleotide data or annotated GenBank file. \n"
            # Fixed typo: "GeneBank" -> "GenBank" (consistent with the description above).
            "It can be a single file or a directory of files (either of FASTA or GenBank format). \n"
            "If a directory, file name can be prepended to the label of each translated peptide \n"
            "originally coming from that file by default (i.e., to track the genome of origin) \n"
            "with the flag '--prepend-filename.'"
        ),
    )
    optional.add_argument(
        "-x",
        "--prepend-filename",
        dest="prepend",
        default=False,
        action="store_true",
        help=(
            "whether to prepend file name to peptide sequences when a directory \n"
            "with multiple fasta or genbank files is used as input."
        ),
    )
    optional.add_argument(
        "-o",
        "--outfile",
        dest="outfile",
        type=Path,
        metavar="",
        default=None,
        help=(
            "path to output (labelled peptide database) file. Defaults to \n"
            "file in directory of input data."
        ),
    )
    optional.add_argument(
        "-p",
        "--processes",
        dest="processes",
        type=int,
        metavar="",
        required=False,
        default=None,
        help=("set maximum number of processes. Defaults to all but one."),
    )
    optional.add_argument(
        "-t",
        "--tempdir",
        dest="tempdir",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to directory to store temporary files. Defaults to system temp directory.",
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

cite() staticmethod

Parser for the cite subcommand.

Returns:

Type Description
argparse.ArgumentParser

argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
@staticmethod
def cite() -> argparse.ArgumentParser:
    """Build the argument parser for the cite subcommand.

    The subcommand accepts no arguments besides -h/--help.

    Returns:
        argparse.ArgumentParser: configured parser.
    """
    cite_parser = argparse.ArgumentParser(
        usage=("pynteny cite [-h] \n"),
        description=("Print pynteny's citation string."),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    # Place an (empty) "required arguments" group ahead of the default
    # optional group so --help matches the other subcommands.
    default_optionals = cite_parser._action_groups.pop()
    cite_parser.add_argument_group("required arguments")
    cite_parser._action_groups.append(default_optionals)
    return cite_parser

download() staticmethod

Parser for the download subcommand.

Returns:

Type Description
argparse.ArgumentParser

argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
@staticmethod
def download() -> argparse.ArgumentParser:
    """Build the argument parser for the download subcommand.

    Returns:
        argparse.ArgumentParser: configured parser.
    """
    dl_parser = argparse.ArgumentParser(
        usage=("pynteny download [-h] [args] \n"),
        description=("Download HMM database from NCBI."),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # Display the "required arguments" group before the default optionals.
    opts_group = dl_parser._action_groups.pop()
    req_group = dl_parser.add_argument_group("required arguments")
    dl_parser._action_groups.append(opts_group)

    req_group.add_argument(
        "-o",
        "--outdir",
        dest="outdir",
        type=Path,
        default=None,
        required=False,
        metavar="",
        help=(
            "path to directory where to download HMM database. \n"
            "Note, if database is relocated after download, Pynteny won't be able to find it automatically, \n"
            "You can either: delete and download again in the new location or manually indicate database and \n"
            "meta file location in Pynteny search."
        ),
    )
    opts_group.add_argument(
        "-u",
        "--unpack",
        dest="unpack",
        action="store_true",
        default=False,
        help="unpack originally compressed database files",
    )
    opts_group.add_argument(
        "-f",
        "--force",
        dest="force",
        action="store_true",
        default=False,
        help="force-download database again if already downloaded",
    )
    opts_group.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        required=False,
        default=None,
        metavar="",
        help="path to log file. Log not written by default.",
    )
    return dl_parser

get_help_str(subcommand) staticmethod

Get help string for subcommand.

Parameters:

Name Type Description Default
subcommand str

subcommand name.

required

Returns:

Name Type Description
str str

help string.

Source code in src/pynteny/cli.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@staticmethod
def get_help_str(subcommand: str) -> str:
    """Get help string for subcommand.

    Args:
        subcommand (str): subcommand name.

    Returns:
        str: help string.
    """
    parser = getattr(SubcommandParser, subcommand)()
    # format_help() renders exactly the text print_help() would write,
    # without the temporary-file round trip of the previous implementation
    # (which also failed on Windows, where a still-open NamedTemporaryFile
    # cannot be reopened by name).
    return parser.format_help()

parse() staticmethod

Parser for the parse subcommand.

Returns:

Type Description
argparse.ArgumentParser

argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
@staticmethod
def parse() -> argparse.ArgumentParser:
    """Parser for the parse subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Translate synteny structure with gene symbols into one with\n"
            "HMM groups, according to provided HMM database."
        ),
        usage=("pynteny parse [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # Show the "required arguments" group before the default optional group
    # in the --help output.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-s",
        "--synteny_struc",
        dest="synteny_struc",
        type=str,
        required=True,
        metavar="",
        help=("synteny structure containing gene symbols instead of HMMs"),
    )
    optional.add_argument(
        "-m",
        "--hmm_meta",
        dest="hmm_meta",
        type=Path,
        metavar="",
        required=False,
        help=(
            # Fixed typos: "donwloaded" -> "downloaded" and the subcommand
            # name "pynteny downloaded" -> "pynteny download".
            "path to hmm database metadata file. If already downloaded with \n"
            "pynteny download, hmm meta file is retrieved from default location."
        ),
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

search() staticmethod

Parser for the search subcommand.

Returns:

Type Description
argparse.ArgumentParser

argparse.ArgumentParser: ArgumentParser object.

Source code in src/pynteny/cli.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@staticmethod
def search() -> argparse.ArgumentParser:
    """Parser for the search subcommand.

    Returns:
        argparse.ArgumentParser: ArgumentParser object.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Query sequence database for HMM hits arranged in provided synteny structure."
        ),
        usage=("pynteny search [-h] [args] \n"),
        epilog="  \n",
        formatter_class=argparse.RawTextHelpFormatter,
    )

    # Show the "required arguments" group before the default optional group
    # in the --help output.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)

    required.add_argument(
        "-s",
        "--synteny_struc",
        metavar="",
        dest="synteny_struc",
        type=str,
        required=True,
        help=(
            "string displaying hmm structure to search for, such as: \n"
            " \n"
            "'>hmm_a n_ab <hmm_b n_bc hmm_c'\n"
            " \n"
            "where '>' indicates a hmm target located on the positive strand, \n"
            # Fixed typo: "cooresponds" -> "corresponds".
            "'<' a target located on the negative strand, and n_ab corresponds \n"
            "to the maximum number of genes separating matched genes a and b. \n"
            "Multiple hmms may be employed. \n"
            "No order symbol in a hmm indicates that results should be independent \n"
            "of strand location. "
        ),
    )
    required.add_argument(
        "-i",
        "--data",
        dest="data",
        metavar="",
        type=Path,
        required=True,
        help=(
            "path to fasta file containing peptide database. \n"
            "Record labels must follow the format specified in docs \n"
            "(see section: General Usage). Pynteny build subcommand exports \n"
            "the generated database in the correct format"
        ),
    )
    optional.add_argument(
        "-d",
        "--hmm_dir",
        dest="hmm_dir",
        type=Path,
        metavar="",
        required=False,
        default=None,
        help=(
            "path to directory containing hmm (i.e, tigrfam or pfam) models. \n"
            "IMPORTANT: the directory must contain one hmm per file, and the file \n"
            "name must coincide with the hmm name that will be displayed in the synteny structure. \n"
            "The directory can contain more hmm models than used in the synteny structure. \n"
            "It may also be the path to a compressed (tar, tar.gz, tgz) directory. \n"
            "If not provided, hmm models (PGAP database) will be downloaded from the NCBI. \n"
            "(if not already downloaded)"
        ),
    )
    optional.add_argument(
        "-o",
        "--outdir",
        dest="outdir",
        type=Path,
        metavar="",
        help="path to output directory",
        default=None,
    )
    optional.add_argument(
        "-x",
        "--prefix",
        dest="prefix",
        type=str,
        metavar="",
        default="",
        help="prefix to be added to output files",
    )
    optional.add_argument(
        "-p",
        "--processes",
        dest="processes",
        type=int,
        metavar="",
        default=None,
        help="maximum number of processes available to HMMER. Defaults to all but one.",
    )
    optional.add_argument(
        "-a",
        "--hmmsearch_args",
        dest="hmmsearch_args",
        type=str,
        metavar="",
        default=None,
        required=False,
        help=(
            "list of comma-separated additional arguments to hmmsearch for each input hmm. \n"
            "A single argument may be provided, in which case the same additional argument \n"
            "is employed in all hmms."
        ),
    )
    optional.add_argument(
        "-g",
        "--gene_ids",
        dest="gene_ids",
        default=False,
        action="store_true",
        help=(
            "use gene symbols in synteny structure instead of HMM names. \n"
            "If set, a path to the hmm database metadata file must be provided \n"
            "in argument '--hmm_meta'"
        ),
    )
    optional.add_argument(
        "-u",
        "--unordered",
        dest="unordered",
        default=False,
        action="store_true",
        help=(
            "whether the HMMs should be arranged in the exact same order displayed \n"
            # Fixed double space: "in  any order" -> "in any order".
            "in the synteny_structure or in any order. If ordered, the filters will \n"
            "filter collinear rather than syntenic structures. \n"
            "If more than two HMMs are employed, the largest maximum distance among any \n"
            "pair is considered to run the search."
        ),
    )
    optional.add_argument(
        "-r",
        "--reuse",
        dest="reuse",
        default=False,
        action="store_true",
        help=(
            "reuse hmmsearch result table in following synteny searches. \n"
            "Do not delete hmmer_outputs subdirectory for this option to work."
        ),
    )
    optional.add_argument(
        "-m",
        "--hmm_meta",
        dest="hmm_meta",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to hmm database metadata file",
    )
    optional.add_argument(
        "-l",
        "--log",
        dest="logfile",
        type=Path,
        default=None,
        metavar="",
        required=False,
        help="path to log file. Log not written by default.",
    )
    return parser

Module subcommands

Functions containing CLI subcommands

build_database(args)

Build annotated peptide database from input assembly or GenBank data.

Parameters:

Name Type Description Default
args Union[CommandArgs, ArgumentParser]

arguments object.

required
Source code in src/pynteny/subcommands.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def build_database(args: Union[CommandArgs, ArgumentParser]) -> None:
    """Build annotated peptide database from input assembly
    or GenBank data.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.
    """
    logger = init_logger(args)
    logger.info("Building annotated peptide database")
    if args.processes is None:
        # Default to all available cores but one.
        args.processes = os.cpu_count() - 1
    # File names are only prepended when translating a directory of inputs.
    prepend_file_name = bool(args.data.is_dir() and args.prepend)

    database = Database(args.data)
    database.build(
        output_file=args.outfile,
        prepend_file_name=prepend_file_name,
        processes=args.processes,
        tempdir=args.tempdir,
    )
    logger.info("Database built successfully!")
    logging.shutdown()

download_hmms(args)

Download HMM (PGAP) database from NCBI.

Parameters:

Name Type Description Default
args Union[CommandArgs, ArgumentParser]

arguments object.

required
Source code in src/pynteny/subcommands.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None:
    """Download HMM (PGAP) database from NCBI.

    Side effects: writes files to ``args.outdir`` and mutates the global
    Pynteny config file (database location, download status).

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.
    """
    logger = init_logger(args)
    config = ConfigParser.get_default_config()
    # Skip re-download unless --force was given.
    # NOTE(review): exits with status 1 even though this is not an error
    # path — confirm an exit code of 0 was not intended here.
    if (config.get_field("data_downloaded")) and (not args.force):
        logger.info("PGAP database already downloaded. Skipping download")
        sys.exit(1)
    if args.outdir is None:
        logger.error("Please, provide directory in which to download PGAP database.")
        sys.exit(1)
    else:
        download_dir = Path(args.outdir).absolute()
    if not download_dir.exists():
        download_dir.mkdir(parents=True, exist_ok=True)

    # Record target location before downloading so a later run can find it.
    config.update_config("database_dir", download_dir.as_posix())
    config.update_config("upack_PGAP_database", args.unpack)

    data_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.HMM.tgz"
    meta_url = "https://ftp.ncbi.nlm.nih.gov/hmm/current/hmm_PGAP.tsv"
    logger.info("Downloading PGAP database")
    try:
        PGAP_file = download_dir / "hmm_PGAP.HMM.tgz"
        meta_file = download_dir / "hmm_PGAP.tsv"
        download_file(data_url, PGAP_file)
        download_file(meta_url, meta_file)
        logger.info("Database dowloaded successfully\n")
        # Only mark as downloaded after both files arrived successfully.
        config.update_config("data_downloaded", True)
        config.update_config("PGAP_database", PGAP_file.as_posix())
        config.update_config("PGAP_meta_file", meta_file.as_posix())
    except Exception:
        logger.exception(
            "Failed to download PGAP database. Please check your internet connection."
        )
        sys.exit(1)
    logger.info("Removing missing entries from PGAP metadata file")
    PGAP(meta_file).remove_missing_HMMs_from_metadata(PGAP_file, meta_file)
    if args.unpack:
        logger.info("Unpacking PGAP database")
        unpacked_PGAP_dir = download_dir / "hmm_PGAP"
        PGAP.extract_PGAP_to_directory(PGAP_file, output_dir=unpacked_PGAP_dir)
        # The tarball is removed after extraction; config points at the
        # unpacked directory from here on.
        os.remove(PGAP_file)
        config.update_config("PGAP_database", unpacked_PGAP_dir.as_posix())
        logger.info("PGAP database unpacked successfully")
    logging.shutdown()

get_citation(args, silent=False)

Get Pynteny citation string.

Parameters:

Name Type Description Default
args argparse.ArgumentParser

arguments object.

required
silent bool

do not print to terminal. Defaults to False.

False

Returns:

Name Type Description
str str

Pynteny citation text.

Source code in src/pynteny/subcommands.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def get_citation(args: Union[CommandArgs, ArgumentParser], silent: bool = False) -> str:
    """Get Pynteny citation string.

    Args:
        args (argparse.ArgumentParser): arguments object.
        silent (bool, optional): do not print to terminal.
            Defaults to False.

    Returns:
        str: Pynteny citation text.
    """
    citation = (
        # Trailing space added: the two literals are concatenated, so the
        # previous version rendered "...made easy(Version ...".
        "Semidán Robaina Estévez (2022). Pynteny: synteny-aware hmm searches made easy "
        f"(Version {args.version}). Zenodo. https://doi.org/10.5281/zenodo.7048685"
    )
    if not silent:
        print("If you use this software, please cite it as below: ")
        print(citation)
    return citation

init_logger(args)

Initialize logger object

Parameters:

Name Type Description Default
args Union[CommandArgs, ArgumentParser]

arguments object

required

Returns:

Type Description
logging.Logger

logging.Logger: initialized logger object

Source code in src/pynteny/subcommands.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def init_logger(args: Union[CommandArgs, ArgumentParser]) -> logging.Logger:
    """Initialize logger object

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object

    Returns:
        logging.Logger: initialized logger object
    """
    # Without a logfile, route the file handler to the null device;
    # otherwise make sure the logfile's parent directory exists.
    if args.logfile is None:
        args.logfile = Path(os.devnull)
    else:
        log_parent = Path(args.logfile.parent)
        if not log_parent.exists():
            log_parent.mkdir(parents=True)
    log_handlers = [
        logging.FileHandler(args.logfile),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s: %(message)s",
        handlers=log_handlers,
        level=logging.NOTSET,
    )
    return logging.getLogger(__name__)

parse_gene_ids(args)

Convert gene symbols to hmm names.

Parameters:

Name Type Description Default
args Union[CommandArgs, ArgumentParser]

arguments object.

required

Returns:

Name Type Description
str str

synteny structure where gene symbols are replaced by HMM names.

Source code in src/pynteny/subcommands.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def parse_gene_ids(args: Union[CommandArgs, ArgumentParser]) -> str:
    """Convert gene symbols to hmm names.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.

    Returns:
        str: synteny structure where gene symbols are replaced
            by HMM names.
    """
    logger = init_logger(args)
    config = ConfigParser.get_default_config()
    if args.hmm_meta is None:
        # Fall back to the PGAP metadata file fetched by "pynteny download".
        if not config.get_field("data_downloaded"):
            logger.error(
                "Please download hmm database meta file or provide path to existing one first."
            )
            sys.exit(1)
        args.hmm_meta = Path(config.get_field("PGAP_meta_file"))
    parsed_struc, _ = syntenyparser.parse_genes_in_synteny_structure(
        synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta
    )
    logger.info(
        f'Translated \n "{args.synteny_struc}" \n to \n "{parsed_struc}" \n according to provided HMM database metadata'
    )
    logging.shutdown()
    return parsed_struc

Search peptide database by synteny structure containing HMMs.

Parameters:

Name Type Description Default
args Union[CommandArgs, ArgumentParser]

arguments object.

required

Returns:

Name Type Description
SyntenyHits SyntenyHits

instance of SyntenyHits.

Source code in src/pynteny/subcommands.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits:
    """Search peptide database by synteny structure containing HMMs.

    Args:
        args (Union[CommandArgs, ArgumentParser]): arguments object.

    Returns:
        SyntenyHits: instance of SyntenyHits.
    """
    logger = init_logger(args)
    if not Path(args.data).exists():
        logger.error("Sequence data file does not exist")
        sys.exit(1)

    config = ConfigParser.get_default_config()
    # Normalize the user-supplied structure string before validating it.
    args.synteny_struc = syntenyparser.reformat_synteny_structure(args.synteny_struc)
    if not syntenyparser.is_valid_structure(args.synteny_struc):
        logger.error(
            (
                f"Invalid synteny structure format: {args.synteny_struc}. "
                "Execute pynteny search --help to see the right format."
            )
        )
        sys.exit(1)
    # Resolve the HMM directory: download PGAP if nothing is available,
    # otherwise use the previously downloaded copy recorded in the config.
    if args.hmm_dir is None:
        if not config.get_field("data_downloaded"):
            logger.warning(
                "HMM database not found. Downloading PGAP database from NCBI"
            )
            down_args = CommandArgs(unpack=True, outdir=None, logfile=None)
            download_hmms(down_args)
            # NOTE(review): args.hmm_dir stays None on this branch, yet it is
            # dereferenced below (args.hmm_dir.parent) — confirm it should be
            # set from the config after downloading.
        else:
            args.hmm_dir = Path(config.get_field("PGAP_database"))
    # When gene symbols are used, translate them to HMM names via the
    # PGAP metadata file before searching.
    if args.gene_ids:
        if args.hmm_meta is None:
            if not config.get_field("data_downloaded"):
                logger.error(
                    "Please download hmm database first or provide path to hmm metadata file."
                )
                sys.exit(1)
            else:
                args.hmm_meta = Path(config.get_field("PGAP_meta_file"))
        logger.info("Finding matching HMMs for gene symbols")
        (
            gene_synteny_struc,
            gene_to_hmm_group,
        ) = syntenyparser.parse_genes_in_synteny_structure(
            synteny_structure=args.synteny_struc, hmm_meta=args.hmm_meta
        )
        args.synteny_struc = gene_synteny_struc
        logger.info(
            f"Found the following HMMs in database for given structure:\n{gene_synteny_struc}"
        )

    # A compressed HMM database is extracted to a sibling temporary directory,
    # removed again at the end of the search.
    temp_hmm_dir = Path(args.hmm_dir.parent) / "temp_hmm_dir"
    if is_tar_file(args.hmm_dir):
        PGAP.extract_PGAP_to_directory(args.hmm_dir, temp_hmm_dir)
        hmm_dir = temp_hmm_dir
    else:
        hmm_dir = args.hmm_dir

    hmm_names = syntenyparser.get_all_HMMs_in_structure(args.synteny_struc)
    unique_hmm_names = np.unique(hmm_names)
    # Select model files whose file name contains an HMM named in the structure.
    input_hmms = [
        file
        for file in hmm_dir.iterdir()
        if any([hmm_name in file.as_posix() for hmm_name in hmm_names])
    ]
    if len(input_hmms) < len(unique_hmm_names):
        logger.error(
            "Not all HMMs in synteny structure found in HMM directory. "
            "Remember to include '--gene_ids' option if you want to search by gene symbols."
        )
        sys.exit(1)
    if args.outdir is None:
        args.outdir = Path(args.data.parent)
    if not args.outdir.exists():
        args.outdir.mkdir(parents=True, exist_ok=True)
    # One hmmsearch argument string per model; the literal "None" acts as a
    # placeholder for "no additional arguments" and is mapped back to None.
    if args.hmmsearch_args is None:
        hmmsearch_args = ",".join(["None" for _ in input_hmms])
    else:
        hmmsearch_args = args.hmmsearch_args
    hmmsearch_args = list(map(lambda x: x.strip(), hmmsearch_args.split(",")))
    hmmsearch_args = list(map(lambda x: None if x == "None" else x, hmmsearch_args))
    hmmer_output_dir = os.path.join(args.outdir, "hmmer_outputs/")
    synteny_table = args.outdir / f"{args.prefix}synteny_matched.tsv"

    logger.info("Searching database by synteny structure")
    synteny_hits = filter_FASTA_by_synteny_structure(
        synteny_structure=args.synteny_struc,
        unordered=args.unordered,
        input_fasta=args.data,
        input_hmms=input_hmms,
        hmm_meta=args.hmm_meta,
        hmmer_output_dir=hmmer_output_dir,
        reuse_hmmer_results=args.reuse,
        method="hmmsearch",
        processes=args.processes,
        additional_args=hmmsearch_args,
    )
    synteny_hits.write_to_TSV(synteny_table)
    logger.info("Writing matching sequences to FASTA files")
    synteny_hits.write_hit_sequences_to_FASTA_files(
        sequence_database=args.data, output_dir=args.outdir, output_prefix=args.prefix
    )
    # Clean up the extracted copy of a compressed HMM database, if any.
    if temp_hmm_dir.exists():
        shutil.rmtree(temp_hmm_dir)
    logger.info("Finished!")
    logging.shutdown()
    return synteny_hits

Module utils

Functions and classes for general purposes

CommandArgs

Base class to hold command line arguments.

Source code in src/pynteny/utils.py
27
28
29
30
31
class CommandArgs:
    """Base class to hold command line arguments."""

    def __init__(self, **kwargs):
        # Expose every keyword argument as an instance attribute.
        for name, value in kwargs.items():
            setattr(self, name, value)

ConfigParser

Handle Pynteny configuration file.

Source code in src/pynteny/utils.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ConfigParser:
    """Handle Pynteny configuration file."""

    def __init__(self, config_file: Path) -> None:
        self._config_file = Path(config_file)
        self._config = self.get_config()

    @classmethod
    def get_default_config(cls):
        """Initialize ConfigParser with default config file."""
        return cls(cls.initialize_config_file())

    @staticmethod
    def initialize_config_file() -> Path:
        """Initialize empty config file.

        Returns:
            Path: path to generated config file.
        """
        # The config file lives next to this module and is created only once.
        default_path = Path(__file__).parent / "config.json"
        if default_path.exists():
            return default_path
        empty_settings = {
            "database_dir": "",
            "upack_PGAP_database": False,
            "data_downloaded": False,
            "PGAP_database": "",
            "PGAP_meta_file": "",
            "streamlit_process": "",
        }
        with open(default_path, "w", encoding="UTF-8") as handle:
            json.dump(empty_settings, handle, indent=4)
        return default_path

    def get_config_path(self) -> Path:
        """Show config file path."""
        return self._config_file

    def get_config(self) -> dict:
        """Load config file.

        Returns:
            dict: dict containing fields and values of config file.
        """
        with open(self._config_file, "r", encoding="UTF-8") as handle:
            return json.load(handle)

    def write_config(self) -> None:
        """Write config dict to file."""
        with open(self._config_file, "w", encoding="UTF-8") as handle:
            json.dump(self._config, handle, indent=4)

    def update_config(self, key: str, value: str) -> None:
        """Update config file

        Args:
            key (str): config file key name to be updated.
            value (str): new value.
        """
        # Persist the change immediately so other processes see it.
        self._config[key] = value
        self.write_config()

    def get_field(self, key: str) -> str:
        """Get field from config file.

        Args:
            key (str): key name to get the value from.

        Returns:
            str: key value.
        """
        return self._config[key]

get_config()

Load config file.

Returns:

Name Type Description
dict dict

dict containing fields and values of config file.

Source code in src/pynteny/utils.py
71
72
73
74
75
76
77
78
79
def get_config(self) -> dict:
    """Load config file.

    Returns:
        dict: dict containing fields and values of config file.
    """
    # Read and parse the JSON config file referenced by this instance.
    with open(self._config_file, "r", encoding="UTF-8") as file:
        config = json.loads(file.read())
    return config

get_config_path()

Show config file path.

Source code in src/pynteny/utils.py
67
68
69
def get_config_path(self) -> Path:
    """Show config file path."""
    # Normalized to a pathlib.Path in the constructor.
    return self._config_file

get_default_config() classmethod

Initialize ConfigParser with default config file.

Source code in src/pynteny/utils.py
41
42
43
44
@classmethod
def get_default_config(cls):
    """Initialize ConfigParser with default config file."""
    # Create the default config file if it does not exist yet, then load it.
    return cls(cls.initialize_config_file())

get_field(key)

Get field from config file.

Parameters:

Name Type Description Default
key str

key name to get the value from.

required

Returns:

Name Type Description
str str

key value.

Source code in src/pynteny/utils.py
 96
 97
 98
 99
100
101
102
103
104
105
def get_field(self, key: str) -> str:
    """Get field from config file.

    Args:
        key (str): key name to get the value from.

    Returns:
        str: key value.
    """
    value = self._config[key]
    return value

initialize_config_file() staticmethod

Initialize empty config file.

Returns:

Name Type Description
Path Path

path to generated config file.

Source code in src/pynteny/utils.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@staticmethod
def initialize_config_file() -> Path:
    """Initialize empty config file.

    Returns:
        Path: path to generated config file.
    """
    # Config lives next to this module; an existing file is left untouched.
    config_file = Path(__file__).parent / "config.json"
    if not config_file.exists():
        # NOTE(review): "upack_PGAP_database" looks like a typo for
        # "unpack_PGAP_database", but the key name is part of the on-disk
        # format read elsewhere — confirm all readers before renaming.
        config = {
            "database_dir": "",
            "upack_PGAP_database": False,
            "data_downloaded": False,
            "PGAP_database": "",
            "PGAP_meta_file": "",
            "streamlit_process": "",
        }
        with open(config_file, "w", encoding="UTF-8") as f:
            json.dump(config, f, indent=4)
    return config_file

update_config(key, value)

Update config file

Parameters:

Name Type Description Default
key str

config file key name to be updated.

required
value str

new value.

required
Source code in src/pynteny/utils.py
86
87
88
89
90
91
92
93
94
def update_config(self, key: str, value: str) -> None:
    """Update config file

    Args:
        key (str): config file key name to be updated.
        value (str): new value.
    """
    # Mutate the shared dict in place, then flush everything back to disk.
    self._config.update({key: value})
    self.write_config()

write_config()

Write config dict to file.

Source code in src/pynteny/utils.py
81
82
83
84
def write_config(self) -> None:
    """Write config dict to file."""
    with open(self._config_file, "w", encoding="UTF-8") as outfile:
        outfile.write(json.dumps(self._config, indent=4))

download_file(url, output_file)

Download file from url

Parameters:

Name Type Description Default
url str

url where file to be downloaded

required
output_file Path

path to downloaded file

required
Source code in src/pynteny/utils.py
201
202
203
204
205
206
207
208
209
210
211
212
213
def download_file(url: str, output_file: Path) -> None:
    """Download file from url.

    Args:
        url (str): url where file to be downloaded.
        output_file (Path): path to downloaded file.

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    output_file = Path(output_file)
    with requests.get(url, stream=True) as r:
        # Fail early on HTTP errors instead of saving an error page as the file.
        r.raise_for_status()
        # BUG FIX: Content-Length may be absent (e.g. chunked transfer);
        # int(None) raised TypeError. Default to 0 so tqdm shows an
        # indeterminate progress bar instead of crashing.
        total_length = int(r.headers.get("Content-Length", 0))
        with tqdm.wrapattr(r.raw, "read", total=total_length, desc="") as raw:
            with open(output_file, "wb") as output:
                shutil.copyfileobj(raw, output)

extract_tar_file(tar_file, dest_dir=None)

Extract tar or tar.gz files to dest_dir.

Parameters:

Name Type Description Default
tar_file Path

path to tar file.

required
dest_dir Path

path to destination directory to store the uncompressed file. Defaults to None.

None
Source code in src/pynteny/utils.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None:
    """Extract tar or tar.gz files to dest_dir.

    Args:
        tar_file (Path): path to tar file.
        dest_dir (Path, optional): path to destination directory
            to store the uncompressed file. Defaults to None
            (current working directory).
    """
    tar_file = Path(tar_file)
    if dest_dir is None:
        dest_dir = "."
    # Choose the open mode from the file name suffix, as before.
    name = tar_file.as_posix()
    if name.endswith(("tar.gz", "tgz")):
        mode = "r:gz"
    elif name.endswith("tar"):
        mode = "r:"
    else:
        logger.error("Input is not a tar file")
        sys.exit(1)
    # Context manager guarantees the archive handle is closed even if
    # extraction raises (the original leaked it on error).
    with tarfile.open(tar_file, mode) as tar:
        tar.extractall(path=dest_dir)

flatten_directory(directory)

Flatten directory, i.e. remove all subdirectories and copy all files to the top level directory.

Parameters:

Name Type Description Default
directory Path

path to directory.

required
Source code in src/pynteny/utils.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def flatten_directory(directory: Path) -> None:
    """Flatten directory, i.e. remove all subdirectories and
    copy all files to the top level directory.

    Args:
        directory (Path): path to directory.
    """
    directory = Path(directory)
    directory = directory.as_posix()
    # Walk bottom-up so files are moved out of a subdirectory before the
    # (then empty) subdirectory itself is removed below.
    for dirpath, _, filenames in os.walk(directory, topdown=False):
        for filename in filenames:
            i = 0
            source = os.path.join(dirpath, filename)
            target = os.path.join(directory, filename)
            # On name collisions, append "_1", "_2", ... before the
            # extension until an unused target name is found.
            while os.path.exists(target):
                i += 1
                file_parts = os.path.splitext(os.path.basename(filename))

                target = os.path.join(
                    directory,
                    file_parts[0] + "_" + str(i) + file_parts[1],
                )
            shutil.move(source, target)
        # Remove the now-empty subdirectory; never the top-level directory.
        if dirpath != directory:
            os.rmdir(dirpath)

is_right_list_nested_type(list_object, inner_type)

Check if all elements in list are of the same type.

Parameters:

Name Type Description Default
list_object list

list containing elements.

required
inner_type type

type to be checked.

required

Returns:

Name Type Description
bool bool

whether list contains elements of the same specified type.

Source code in src/pynteny/utils.py
297
298
299
300
301
302
303
304
305
306
307
def is_right_list_nested_type(list_object: list, inner_type: type) -> bool:
    """Check if all elements in list are of the same type.

    Args:
        list_object (list): list containing elements.
        inner_type (type): type to be checked.

    Returns:
        bool: whether list contains elements of the same specified type.
    """
    # Explicit loop with early exit; vacuously True for an empty list.
    for element in list_object:
        if not isinstance(element, inner_type):
            return False
    return True

is_tar_file(tar_file)

Check whether file is tar-compressed.

Parameters:

Name Type Description Default
tar_file Path

path to file.

required

Returns:

Name Type Description
bool bool

whether file is compressed or not.

Source code in src/pynteny/utils.py
216
217
218
219
220
221
222
223
224
225
226
def is_tar_file(tar_file: Path) -> bool:
    """Check whether file is tar-compressed.

    Args:
        tar_file (Path): path to file.

    Returns:
        bool: whether file is compressed or not.
    """
    path = Path(tar_file)
    # Guard first: tarfile.is_tarfile would raise on a missing path.
    if not path.is_file():
        return False
    return tarfile.is_tarfile(path.as_posix())

list_tar_dir(tar_dir)

List files within tar or tar.gz directory.

Parameters:

Name Type Description Default
tar_dir Path

path to directory containing tar files.

required

Returns:

Name Type Description
list list

list of tar files.

Source code in src/pynteny/utils.py
255
256
257
258
259
260
261
262
263
264
265
266
267
def list_tar_dir(tar_dir: Path) -> list:
    """List files within tar or tar.gz directory.

    Args:
        tar_dir (Path): path to directory containing tar files.

    Returns:
        list: list of tar files.
    """
    archive = Path(tar_dir)
    with tarfile.open(archive, "r") as tar_obj:
        return tar_obj.getnames()

parallelize_over_input_files(callable, input_list, processes=None, max_tasks_per_child=10, **callable_kwargs)

Parallelize callable over a set of input objects using a pool of workers. Inputs in input list are passed to the first argument of the callable. Additional callable named arguments may be passed.

Parameters:

Name Type Description Default
callable _type_

function to be run.

required
input_list list

list of inputs to callable.

required
processes int

maximum number of processes. Defaults to all available cores minus one.

required
max_tasks_per_child int

maximum number of tasks per child process before is reset. Defaults to 10.

10
Source code in src/pynteny/utils.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def parallelize_over_input_files(
    callable,
    input_list: list,
    processes: int = None,
    max_tasks_per_child=10,
    **callable_kwargs,
) -> None:
    """Parallelize callable over a set of input objects using a pool
    of workers. Inputs in input list are passed to the first argument
    of the callable. Additional callable named arguments may be passed.

    Args:
        callable (Callable): function to be run.
        input_list (list): list of inputs to callable.
        processes (int, optional): maximum number of processes.
            Defaults to all available cores minus one (at least one).
        max_tasks_per_child (int, optional): maximum number of tasks per child
            process before is reset. Defaults to 10.
    """
    if processes is None:
        # BUG FIX: os.cpu_count is a function; the original computed
        # `os.cpu_count - 1`, which raised TypeError. Also clamp to >= 1
        # so single-core hosts do not request a zero-worker pool.
        processes = max(1, os.cpu_count() - 1)
    pool = Pool(processes, maxtasksperchild=max_tasks_per_child)
    try:
        pool.map(partial(callable, **callable_kwargs), input_list)
    finally:
        # Always release the worker processes, even if a task raises.
        pool.close()
        pool.join()

set_default_output_path(input_path, tag=None, extension=None, only_filename=False, only_basename=False, only_dirname=False)

Utility function to generate a default path to output file or directory based on an input file name and path.

Parameters:

Name Type Description Default
input_path Path

path to input file.

required
tag str

text tag to be added to file name. Defaults to None.

None
extension str

change input file extension with this one. Defaults to None.

None
only_filename bool

output only default filename. Defaults to False.

False
only_basename bool

output only default basename (no extension). Defaults to False.

False
only_dirname bool

output only path to default output directory. Defaults to False.

False

Returns:

Name Type Description
Path Path

a path or name to a default output file.

Source code in src/pynteny/utils.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def set_default_output_path(
    input_path: Path,
    tag: str = None,
    extension: str = None,
    only_filename: bool = False,
    only_basename: bool = False,
    only_dirname: bool = False,
) -> Path:
    """Utility function to generate a default path to output file
    or directory based on an input file name and path.

    Args:
        input_path (Path): path to input file.
        tag (str, optional): text tag to be added to file name. Defaults to None.
        extension (str, optional): change input file extension with this one. Defaults to None.
        only_filename (bool, optional): output only default filename. Defaults to False.
        only_basename (bool, optional): output only default basename (no extension). Defaults to False.
        only_dirname (bool, optional): output only path to default output directory. Defaults to False.

    Returns:
        Path: a path or name to a default output file.
    """
    source = Path(input_path)
    dirname = source.parent
    basename, original_ext = source.stem, source.suffix
    # Fall back to the input's own extension / an empty tag when not given.
    extension = original_ext if extension is None else extension
    tag = "" if tag is None else tag
    default_file = f"{basename}{tag}{extension}"
    # Flags are checked in the original precedence order:
    # basename > filename > dirname > full absolute path.
    if only_basename:
        return Path(basename)
    if only_filename:
        return Path(default_file)
    if only_dirname:
        return Path(dirname)
    return Path(os.path.abspath(os.path.join(dirname, default_file)))

terminal_execute(command_str, suppress_shell_output=False, work_dir=None, return_output=False)

Execute given command in terminal through Python.

Parameters:

Name Type Description Default
command_str str

terminal command to be executed.

required
suppress_shell_output bool

suppress shell output. Defaults to False.

False
work_dir Path

change working directory. Defaults to None.

None
return_output bool

whether to return execution output. Defaults to False.

False

Returns:

Type Description
subprocess.STDOUT

subprocess.STDOUT: subprocess output.

Source code in src/pynteny/utils.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def terminal_execute(
    command_str: str,
    suppress_shell_output=False,
    work_dir: Path = None,
    return_output=False,
) -> subprocess.STDOUT:
    """Execute given command in terminal through Python.

    Args:
        command_str (str): terminal command to be executed.
        suppress_shell_output (bool, optional): suppress shell output. Defaults to False.
        work_dir (Path, optional): change working directory. Defaults to None.
        return_output (bool, optional): whether to return execution output. Defaults to False.

    Returns:
        subprocess.STDOUT: subprocess output.
    """
    if suppress_shell_output:
        # Discard stdout and stderr at the shell level.
        command_str = f"{command_str} >/dev/null 2>&1"
    return subprocess.run(
        command_str, shell=True, cwd=work_dir, capture_output=return_output
    )

Module wrappers

Simple CLI wrappers to several tools

Simple CLI wrapper to hmmsearch or hmmscan.

Parameters:

Name Type Description Default
hmm_model Path

path to profile HMM to be used.

required
input_fasta Path

path to fasta containing sequence database to be searched.

required
output_file Path

path to HMMER tabular output file. Defaults to None.

None
method str

either 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.

'hmmsearch'
processes int

maximum number of threads. Defaults to all minus one.

required
additional_args str

a string containing additional arguments to hmmsearch/scan. Defaults to None.

None
Source code in src/pynteny/wrappers.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def run_HMM_search(
    hmm_model: Path,
    input_fasta: Path,
    output_file: Path = None,
    method: str = "hmmsearch",
    processes: int = None,
    additional_args: str = None,
) -> None:
    """Simple CLI wrapper to hmmsearch or hmmscan.

    Args:
        hmm_model (Path): path to profile HMM to be used.
        input_fasta (Path): path to fasta containing sequence database to be searched.
        output_file (Path, optional): path to HMMER tabular output file. Defaults to
            None (derived from input_fasta with a "_hmmer_hits.txt" suffix).
        method (str, optional): either 'hmmsearch' or 'hmmscan'. Defaults to 'hmmsearch'.
        processes (int, optional): maximum number of threads. Defaults to all minus one.
        additional_args (str, optional): a string containing additional arguments to
            hmmsearch/scan. Defaults to None.
    """
    if processes is None:
        # NOTE(review): on a single-core host this yields 0; confirm that
        # "--cpu 0" is the intended behavior for HMMER here.
        processes = os.cpu_count() - 1
    if output_file is None:
        output_file = set_default_output_path(input_fasta, "_hmmer_hits", ".txt")
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = (
        f"{method} --tblout {output_file} {args_str} --cpu {processes} "
        f"{hmm_model} {input_fasta}"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)

run_prodigal(input_file, output_file=None, output_dir=None, output_format='fasta', metagenome=True, additional_args=None)

Simple CLI wrapper to prodigal.

Parameters:

Name Type Description Default
input_file Path

path to input fasta file with nucleotide sequences.

required
output_file Path

path to output file containing translated peptides. Defaults to None.

None
output_dir Path

path to output directory (all prodigal output files). Defaults to None.

None
output_format str

either 'gbk' or 'fasta'. Defaults to 'fasta'.

'fasta'
metagenome bool

whether input fasta corresponds to a metagenomic sample. Defaults to True.

True
additional_args str

a string containing additional arguments to prodigal. Defaults to None.

None
Source code in src/pynteny/wrappers.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def run_prodigal(
    input_file: Path,
    output_file: Path = None,
    output_dir: Path = None,
    output_format: str = "fasta",
    metagenome: bool = True,
    additional_args: str = None,
):
    """Simple CLI wrapper to prodigal.

    Args:
        input_file (Path): path to input fasta file with nucleotide sequences.
        output_file (Path, optional): path to output file containing translated peptides.
            Defaults to None.
        output_dir (Path, optional): path to output directory (all prodigal output files).
            Defaults to None.
        output_format (str, optional): either 'gbk' or 'fasta'. Defaults to 'fasta'.
        metagenome (bool, optional): whether input fasta correspond to a metagenomic sample.
            Defaults to True.
        additional_args (str, optional): a string containing additional arguments to prodigal.
            Defaults to None.
    """
    input_file = Path(input_file)
    # Prodigal procedure: "meta" for metagenomes, "single" for isolate genomes.
    if metagenome:
        procedure = "meta"
    else:
        procedure = "single"
    if output_dir is None:
        output_dir = Path(input_file.parent)
    else:
        output_dir = Path(output_dir)
    # "-a" emits translated peptides (fasta mode); "-o" emits the genbank output.
    if "fasta" in output_format.lower():
        if output_file is None:
            output_file = output_dir / f"{input_file.stem}prodigal_output.faa"
        out_str = f"-a {output_file}"
    else:
        if output_file is None:
            output_file = output_dir / f"{input_file.stem}prodigal_output.gbk"
        out_str = f"-o {output_file}"
    output_file = Path(output_file)
    if additional_args is not None:
        args_str = additional_args
    else:
        args_str = ""
    cmd_str = f"prodigal -i {input_file} -p {procedure} " f"-q {out_str} {args_str}"
    terminal_execute(cmd_str, suppress_shell_output=True)

run_seqkit_nodup(input_fasta, output_fasta=None, export_duplicates=False)

Simple CLI wrapper to seqkit rmdup to remove sequence duplicates in fasta file.

Parameters:

Name Type Description Default
input_fasta Path

path to input fasta.

required
output_fasta Path

path to output fasta. Defaults to None.

None
export_duplicates bool

whether to export a file containing duplicated sequences. Defaults to False.

False
Source code in src/pynteny/wrappers.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def run_seqkit_nodup(
    input_fasta: Path, output_fasta: Path = None, export_duplicates: bool = False
):
    """Simple CLI wrapper to seqkit rmdup to remove sequence duplicates
    in fasta file.

    Args:
        input_fasta (Path): path to input fasta.
        output_fasta (Path, optional): path to output fasta. Defaults to None.
        export_duplicates (bool, optional): whether to export a file containing
            duplicated sequences. Defaults to False.
    """
    input_fasta = Path(input_fasta)
    if output_fasta is None:
        output_fasta = set_default_output_path(input_fasta, tag="_no_duplicates")
    else:
        output_fasta = Path(output_fasta)
    if export_duplicates:
        # "-D" writes duplicated records to a side file next to the input.
        dup_file = set_default_output_path(
            input_fasta, tag="_duplicates", extension=".txt"
        )
        dup_str = f"-D {dup_file}"
    else:
        dup_str = ""
    # "-s" deduplicates by sequence content rather than by record name.
    cmd_str = (
        f"seqkit rmdup {input_fasta} -s {dup_str} -o {output_fasta.as_posix()} --quiet"
    )
    terminal_execute(cmd_str, suppress_shell_output=True)