Skip to content

api

Classes

BedLikeCoords dataclass

Represents the coordinates (only, no refname or strand) that correspond to a zero-based open-ended position.

Source code in prymer/api/span.py
@dataclass(init=True, frozen=True)
class BedLikeCoords:
    """Represents the coordinates (only, no refname or strand) that correspond to a
    zero-based open-ended position."""

    start: int
    end: int

    def __post_init__(self) -> None:
        if self.start < 0:
            raise ValueError(f"Start position must be >=0; received start={self.start}")
        if self.end < self.start:
            raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}")

ClusteredIntervals dataclass

The list of clusters (intervals) and the original source intervals. The source intervals must have the name corresponding to the cluster to which the source interval belongs. Each cluster must envelop ("wholly contain") the intervals associated with the cluster.

Attributes:

Name Type Description
clusters list[Interval]

the clusters that wholly contain one or more source intervals.

intervals list[Interval]

the source intervals, with name corresponding to the name of the associated cluster.

Source code in prymer/api/clustering.py
@dataclass(frozen=True, init=True, slots=True)
class ClusteredIntervals:
    """
    The list of clusters (intervals) and the original source intervals.  The source intervals must
    have the name corresponding to the cluster to which the source interval belongs. Each cluster
    must envelop ("wholly contain") the intervals associated with the cluster.

    Attributes:
        clusters: the clusters that wholly contain one or more source intervals.
        intervals: the source intervals, with name corresponding to the name of the associated
            cluster.
    """

    clusters: list[Interval]
    intervals: list[Interval]

    def __post_init__(self) -> None:
        cluster_names: set[str] = {c.name for c in self.clusters}
        interval_names: set[str] = {i.name for i in self.intervals}
        # Check that the names of clusters are unique
        if len(self.clusters) != len(cluster_names):
            raise ValueError("Cluster names are not unique")
        # Check that every interval has the name of one cluster
        for interval in self.intervals:
            if interval.name not in cluster_names:
                raise ValueError(f"Interval does not belong to a cluster: {interval}")
        # Check that every cluster has at least one interval associated with the cluster
        if len(interval_names) != len(cluster_names):
            raise ValueError("Cluster and interval names differ.")

FileBasedVariantLookup

Bases: ContextManager, VariantLookup

Implementation of VariantLookup that queries against indexed VCF files each time a query is performed. Assumes the index is located adjacent to the VCF file and has the same base name with either a .csi or .tbi suffix.

Example:

>>> with FileBasedVariantLookup([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0, include_missing_mafs=False) as lookup:
...     lookup.query(refname="chr2", start=7999, end=8000)
[SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]
Source code in prymer/api/variant_lookup.py
class FileBasedVariantLookup(ContextManager, VariantLookup):
    """Implementation of `VariantLookup` that queries against indexed VCF files each time a query is
    performed. Assumes the index is located adjacent to the VCF file and has the same base name with
    either a .csi or .tbi suffix.

    Example:

    ```python
    >>> with FileBasedVariantLookup([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0, include_missing_mafs=False) as lookup:
    ...     lookup.query(refname="chr2", start=7999, end=8000)
    [SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]

    ```
    """  # noqa: E501

    def __init__(self, vcf_paths: list[Path], min_maf: Optional[float], include_missing_mafs: bool):
        self._readers: list[VariantFile] = []
        super().__init__(
            vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
        )
        if len(vcf_paths) == 0:
            raise ValueError("No VCF paths given to query.")
        for path in vcf_paths:
            if (
                not path.with_suffix(path.suffix + ".csi").is_file()
                and not path.with_suffix(path.suffix + ".tbi").is_file()
            ):
                raise ValueError(f"Cannot perform fetch with missing index file for VCF: {path}")
            open_fh = pysam.VariantFile(str(path))
            self._readers.append(open_fh)

    def __enter__(self) -> "FileBasedVariantLookup":
        """Enter the context manager."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit this context manager while closing the underlying VCF handles."""
        self.close()
        return None

    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Queries variants from the VCFs used by this lookup and returns a `SimpleVariant`."""
        simple_variants: list[SimpleVariant] = []
        for fh, path in zip(self._readers, self.vcf_paths, strict=True):
            if not fh.header.contigs.get(refname):
                _logger.debug(f"Header in VCF file {path} does not contain chromosome {refname}.")
                continue
            #  pysam.fetch is 0-based, half-open
            variants = [variant for variant in fh.fetch(contig=refname, start=start - 1, end=end)]
            simple_variants.extend(self.to_variants(variants, source_vcf=path))
        return sorted(simple_variants, key=lambda x: x.pos)

    def close(self) -> None:
        """Close the underlying VCF file handles."""
        for handle in self._readers:
            handle.close()

Functions

__enter__
__enter__() -> FileBasedVariantLookup

Enter the context manager.

Source code in prymer/api/variant_lookup.py
def __enter__(self) -> "FileBasedVariantLookup":
    """Enter the context manager."""
    return self
__exit__
__exit__(
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None

Exit this context manager while closing the underlying VCF handles.

Source code in prymer/api/variant_lookup.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Exit this context manager while closing the underlying VCF handles."""
    self.close()
    return None
close
close() -> None

Close the underlying VCF file handles.

Source code in prymer/api/variant_lookup.py
def close(self) -> None:
    """Close the underlying VCF file handles."""
    for handle in self._readers:
        handle.close()

MinOptMax dataclass

Bases: Generic[Numeric]

Stores a minimum, an optimal, and a maximum value (either all ints or all floats).

min must be less than max. opt should be greater than the min value and less than the max value.

Attributes:

Name Type Description
min Numeric

the minimum value

opt Numeric

the optimal value

max Numeric

the maximum value

Raises:

Type Description
ValueError

if min > max

ValueError

if min is not less than opt and opt is not less than max

Source code in prymer/api/minoptmax.py
@dataclass(slots=True, frozen=True, init=True)
class MinOptMax(Generic[Numeric]):
    """Stores a minimum, an optimal, and a maximum value (either all ints or all floats).

    `min` must be less than `max`. `opt` should be greater than the `min`
     value and less than the `max` value.

    Attributes:
        min: the minimum value
        opt: the optimal value
        max: the maximum value


    Raises:
        ValueError: if min > max
        ValueError: if `min` is not less than `opt` and `opt` is not less than `max`
    """

    min: Numeric
    opt: Numeric
    max: Numeric

    def __post_init__(self) -> None:
        dtype = type(self.min)
        if not isinstance(self.max, dtype) or not isinstance(self.opt, dtype):
            raise TypeError(
                "Min, opt, and max must be the same type; "
                f"received min: {dtype}, opt: {type(self.opt)}, max: {type(self.max)}"
            )
        if self.min > self.max:
            raise ValueError(
                f"Min must be no greater than max; received min: {self.min}, max: {self.max}"
            )
        if not (self.min <= self.opt <= self.max):
            raise ValueError(
                "Arguments must satisfy min <= opt <= max: "
                f"received min: {self.min}, opt: {self.opt}, max: {self.max}"
            )

    def __iter__(self) -> Iterator[float]:
        """Returns an iterator of min, opt, and max"""
        return iter([self.min, self.opt, self.max])

    def __str__(self) -> str:
        """Returns a string representation of min, opt, and max"""
        return f"(min:{self.min}, opt:{self.opt}, max:{self.max})"

Functions

__iter__
__iter__() -> Iterator[float]

Returns an iterator of min, opt, and max

Source code in prymer/api/minoptmax.py
def __iter__(self) -> Iterator[float]:
    """Returns an iterator of min, opt, and max"""
    return iter([self.min, self.opt, self.max])
__str__
__str__() -> str

Returns a string representation of min, opt, and max

Source code in prymer/api/minoptmax.py
def __str__(self) -> str:
    """Returns a string representation of min, opt, and max"""
    return f"(min:{self.min}, opt:{self.opt}, max:{self.max})"

Oligo dataclass

Bases: OligoLike, Metric['Oligo']

Stores the properties of the designed oligo.

Oligos can include both single primer and internal probe designs. Primer3 emits information about the specific properties of a primer and/or probe, including the base sequence, melting temperature (tm), and score.

The penalty score of the design is emitted by Primer3 and controlled by the corresponding design parameters. The penalty for a primer is set by the combination of PrimerAndAmpliconParameters and PrimerWeights. A probe penalty is set by ProbeParameters and ProbeWeights.

The span of an Oligo object represents the mapping of the oligo to the genome. Oligo objects can optionally have a tail sequence to prepend to the 5' end of the primer or probe, as well as a name for downstream record keeping.

Oligo objects can also store thermodynamic characteristics of primer and/or probe design. These include the calculated melting temperatures of the most stable hairpin structure, self-, and end- complementarity. These values can be emitted by Primer3.

These thermodynamic characteristics are meant to quantify how likely it is the primer or probe will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater than the intended melting temperature for target binding indicates the primer or probe is likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction.

Attributes:

Name Type Description
tm float

the calculated melting temperature of the oligo

penalty float

the penalty or score for the oligo

span Span

the mapping of the primer to the genome

name Span

an optional name to use for the primer

tm_homodimer Optional[float]

calculated melting temperature that represents the tendency of an oligo to bind to itself (self-complementarity)

tm_3p_anchored_homodimer Optional[float]

calculated melting temperature that represents the tendency of a primer to bind to the 3'-END of an identical primer

tm_secondary_structure Optional[float]

calculated melting temperature of the oligo hairpin structure

bases Optional[str]

the base sequence of the oligo (excluding any tail)

tail Optional[str]

an optional tail sequence to put on the 5' end of the primer

Source code in prymer/api/oligo.py
@dataclass(frozen=True, init=True, kw_only=True, slots=True)
class Oligo(OligoLike, Metric["Oligo"]):
    """Stores the properties of the designed oligo.

    Oligos can include both single primer and internal probe designs. Primer3 emits information
    about the specific properties of a primer and/or probe, including the base sequence,
    melting temperature (tm), and score.

    The penalty score of the design is emitted by Primer3 and controlled by the corresponding
    design parameters. The penalty for a primer is set by the combination of
    `PrimerAndAmpliconParameters` and `PrimerWeights`.
    A probe penalty is set by `ProbeParameters` and `ProbeWeights`.

    The span of an `Oligo` object represents the mapping of the oligo
    to the genome. `Oligo` objects can optionally have a `tail` sequence to prepend to the 5' end
    of the primer or probe, as well as a `name` for downstream record keeping.

    `Oligo` objects can also store thermodynamic characteristics of primer and/or probe
    design. These include the calculated melting temperatures of the most stable hairpin structure,
    self-, and end- complementarity. These values can be emitted by Primer3.

    These thermodynamic characteristics are meant to quantify how likely it is the primer or probe
    will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater
    than the intended melting temperature for target binding indicates the primer or probe is
    likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction.

    Attributes:
        tm: the calculated melting temperature of the oligo
        penalty: the penalty or score for the oligo
        span: the mapping of the primer to the genome
        name: an optional name to use for the primer
        tm_homodimer: calculated melting temperature that represents
            the tendency of an oligo to bind to itself (self-complementarity)
        tm_3p_anchored_homodimer: calculated melting temperature that represents
            the tendency of a primer to bind to the 3'-END of an identical primer
        tm_secondary_structure: calculated melting temperature of the oligo hairpin structure
        bases: the base sequence of the oligo (excluding any tail)
        tail: an optional tail sequence to put on the 5' end of the primer

    """

    tm: float
    penalty: float
    span: Span
    tm_homodimer: Optional[float] = None
    tm_3p_anchored_homodimer: Optional[float] = None
    tm_secondary_structure: Optional[float] = None
    bases: Optional[str] = None
    tail: Optional[str] = None

    def __post_init__(self) -> None:
        super(Oligo, self).__post_init__()

    def longest_hp_length(self) -> int:
        """Length of longest homopolymer in the oligo."""
        if self.bases is None:
            return 0
        else:
            return longest_homopolymer_length(self.bases)

    @property
    def length(self) -> int:
        """Length of un-tailed oligo."""
        return self.span.length

    def untailed_length(self) -> int:
        """Length of un-tailed oligo."""
        return self.span.length

    def tailed_length(self) -> int:
        """Length of tailed oligo."""
        return self.span.length if self.tail is None else self.span.length + len(self.tail)

    def longest_dinucleotide_run_length(self) -> int:
        """Number of bases in the longest dinucleotide run in a oligo.

        A dinucleotide run is when length two repeat-unit is repeated. For example,
        TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2
        (or 0 if there are fewer than 2 bases)."""
        return longest_dinucleotide_run_length(self.bases)

    def with_tail(self, tail: str) -> "Oligo":
        """Returns a copy of the oligo with the tail sequence attached."""
        return replace(self, tail=tail)

    def with_name(self, name: str) -> "Oligo":
        """Returns a copy of oligo object with the given name."""
        return replace(self, name=name)

    def bases_with_tail(self) -> Optional[str]:
        """
        Returns the sequence of the oligo prepended by the tail.

        If `tail` is None, only return `bases`.
        """
        if self.tail is None:
            return self.bases
        return f"{self.tail}{self.bases}"

    def to_bed12_row(self) -> str:
        """Returns the BED detail format view:
        https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7"""
        bed_coord = self.span.get_bedlike_coords()
        return "\t".join(
            map(
                str,
                [
                    self.span.refname,  # contig
                    bed_coord.start,  # start
                    bed_coord.end,  # end
                    self.id,  # name
                    500,  # score
                    self.span.strand.value,  # strand
                    bed_coord.start,  # thick start
                    bed_coord.end,  # thick end
                    "100,100,100",  # color
                    1,  # block count
                    f"{self.length}",  # block sizes
                    "0",  # block starts (relative to `start`)
                ],
            )
        )

    def __str__(self) -> str:
        """
        Returns a string representation of this oligo
        """
        # If the bases field is None, replace with MISSING_BASES_STRING
        bases: str = self.bases if self.bases is not None else MISSING_BASES_STRING
        return f"{bases}\t{self.tm}\t{self.penalty}\t{self.span}"

    @classmethod
    def _parsers(cls) -> Dict[type, Callable[[str], Any]]:
        return {
            Span: lambda value: Span.from_string(value),
        }

    @staticmethod
    def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int:
        """Compares this oligo to that oligo by their span, ordering references using the given
        sequence dictionary.

        Args:
            this: the first oligo
            that: the second oligo
            seq_dict: the sequence dictionary used to order references

        Returns:
            -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise
        """
        return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict)

Attributes

length property
length: int

Length of un-tailed oligo.

Functions

__str__
__str__() -> str

Returns a string representation of this oligo

Source code in prymer/api/oligo.py
def __str__(self) -> str:
    """
    Returns a string representation of this oligo
    """
    # If the bases field is None, replace with MISSING_BASES_STRING
    bases: str = self.bases if self.bases is not None else MISSING_BASES_STRING
    return f"{bases}\t{self.tm}\t{self.penalty}\t{self.span}"
bases_with_tail
bases_with_tail() -> Optional[str]

Returns the sequence of the oligo prepended by the tail.

If tail is None, only return bases.

Source code in prymer/api/oligo.py
def bases_with_tail(self) -> Optional[str]:
    """
    Returns the sequence of the oligo prepended by the tail.

    If `tail` is None, only return `bases`.
    """
    if self.tail is None:
        return self.bases
    return f"{self.tail}{self.bases}"
compare staticmethod
compare(
    this: Oligo, that: Oligo, seq_dict: SequenceDictionary
) -> int

Compares this oligo to that oligo by their span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this Oligo

the first oligo

required
that Oligo

the second oligo

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required

Returns:

Type Description
int

-1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise

Source code in prymer/api/oligo.py
@staticmethod
def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int:
    """Compares this oligo to that oligo by their span, ordering references using the given
    sequence dictionary.

    Args:
        this: the first oligo
        that: the second oligo
        seq_dict: the sequence dictionary used to order references

    Returns:
        -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise
    """
    return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict)
longest_dinucleotide_run_length
longest_dinucleotide_run_length() -> int

Number of bases in the longest dinucleotide run in a oligo.

A dinucleotide run is when length two repeat-unit is repeated. For example, TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2 (or 0 if there are fewer than 2 bases).

Source code in prymer/api/oligo.py
def longest_dinucleotide_run_length(self) -> int:
    """Number of bases in the longest dinucleotide run in a oligo.

    A dinucleotide run is when length two repeat-unit is repeated. For example,
    TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2
    (or 0 if there are fewer than 2 bases)."""
    return longest_dinucleotide_run_length(self.bases)
longest_hp_length
longest_hp_length() -> int

Length of longest homopolymer in the oligo.

Source code in prymer/api/oligo.py
def longest_hp_length(self) -> int:
    """Length of longest homopolymer in the oligo."""
    if self.bases is None:
        return 0
    else:
        return longest_homopolymer_length(self.bases)
tailed_length
tailed_length() -> int

Length of tailed oligo.

Source code in prymer/api/oligo.py
def tailed_length(self) -> int:
    """Length of tailed oligo."""
    return self.span.length if self.tail is None else self.span.length + len(self.tail)
to_bed12_row
to_bed12_row() -> str

Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7

Source code in prymer/api/oligo.py
def to_bed12_row(self) -> str:
    """Returns the BED detail format view:
    https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7"""
    bed_coord = self.span.get_bedlike_coords()
    return "\t".join(
        map(
            str,
            [
                self.span.refname,  # contig
                bed_coord.start,  # start
                bed_coord.end,  # end
                self.id,  # name
                500,  # score
                self.span.strand.value,  # strand
                bed_coord.start,  # thick start
                bed_coord.end,  # thick end
                "100,100,100",  # color
                1,  # block count
                f"{self.length}",  # block sizes
                "0",  # block starts (relative to `start`)
            ],
        )
    )
untailed_length
untailed_length() -> int

Length of un-tailed oligo.

Source code in prymer/api/oligo.py
def untailed_length(self) -> int:
    """Length of un-tailed oligo."""
    return self.span.length
with_name
with_name(name: str) -> Oligo

Returns a copy of oligo object with the given name.

Source code in prymer/api/oligo.py
def with_name(self, name: str) -> "Oligo":
    """Returns a copy of oligo object with the given name."""
    return replace(self, name=name)
with_tail
with_tail(tail: str) -> Oligo

Returns a copy of the oligo with the tail sequence attached.

Source code in prymer/api/oligo.py
def with_tail(self, tail: str) -> "Oligo":
    """Returns a copy of the oligo with the tail sequence attached."""
    return replace(self, tail=tail)

OligoLike dataclass

Bases: ABC

An abstract base class for oligo-like objects, such as individual primers or primer pairs.

Attributes:

Name Type Description
name Optional[str]

an optional name to use for the primer

The id field shall be the 'name' field if supplied, or a generated value based on the location of the primer-like object.

Source code in prymer/api/oligo_like.py
@dataclass(frozen=True, init=True, slots=True)
class OligoLike(ABC):
    """
    An abstract base class for oligo-like objects, such as individual primers or primer pairs.

    Attributes:
        name: an optional name to use for the primer

    The `id` field shall be the 'name' field if supplied, or a generated value based on the
    location of the primer-like object.
    """

    name: Optional[str] = None

    def __post_init__(self) -> None:
        # If supplied, bases must be a non-empty String & the same length as the
        # span length
        if self.bases is not None:
            if len(self.bases) == 0:
                raise ValueError("Bases must not be an empty string")
            elif self.span.length != len(self.bases):
                raise ValueError(
                    "Conflicting lengths: "
                    f"span length={self.span.length},"
                    f" sequence length={len(self.bases)}"
                )

    @property
    @abstractmethod
    def span(self) -> Span:
        """Returns the mapping of the oligo-like object to a genome."""

    @property
    @abstractmethod
    def bases(self) -> Optional[str]:
        """Returns the base sequence of the oligo-like object."""

    @property
    def percent_gc_content(self) -> float:
        """
        The GC of the amplicon sequence in the range 0-100, or zero if there is no amplicon
        sequence.
        """
        if self.bases is None:
            return 0.0
        else:
            return round(gc_content(self.bases) * 100, 3)

    @property
    def id(self) -> str:
        """
        Returns the identifier for the oligo-like object. This shall be the `name`
        if one exists, otherwise a generated value based on the location of the object.
        """
        if self.name is not None:
            return self.name
        else:
            return self.location_string

    @property
    def location_string(self) -> str:
        """Returns a string representation of the location of the oligo-like object."""
        return (
            f"{self.span.refname}_{self.span.start}_"
            + f"{self.span.end}_{self._strand_to_location_string()}"
        )

    @abstractmethod
    def to_bed12_row(self) -> str:
        """
        Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec.
        See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1
        """

    def _strand_to_location_string(self) -> str:
        """
        Returns a string representation appropriate for location_string of the strand of the
        primer
        """
        match self.span.strand:
            case Strand.POSITIVE:
                return "F"
            case Strand.NEGATIVE:
                return "R"
            case _:  # pragma: no cover
                # Not calculating coverage on this line as it should be impossible to reach
                assert_never(f"Encountered unhandled Strand value: {self.span.strand}")

Attributes

bases abstractmethod property
bases: Optional[str]

Returns the base sequence of the oligo-like object.

id property
id: str

Returns the identifier for the oligo-like object. This shall be the name if one exists, otherwise a generated value based on the location of the object.

location_string property
location_string: str

Returns a string representation of the location of the oligo-like object.

percent_gc_content property
percent_gc_content: float

The GC of the amplicon sequence in the range 0-100, or zero if there is no amplicon sequence.

span abstractmethod property
span: Span

Returns the mapping of the oligo-like object to a genome.

Functions

to_bed12_row abstractmethod
to_bed12_row() -> str

Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec. See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1

Source code in prymer/api/oligo_like.py
@abstractmethod
def to_bed12_row(self) -> str:
    """
    Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec.
    See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1
    """

PrimerPair dataclass

Bases: OligoLike

Represents a pair of primers that work together to amplify an amplicon. The coordinates of the amplicon are determined to span from the start of the left primer through the end of the right primer.

Attributes:

Name Type Description
left_primer Oligo

the left primer in the pair

right_primer Oligo

the right primer in the pair

amplicon_sequence Optional[str]

an optional sequence of the expected amplicon

amplicon_tm float

the melting temperature of the expected amplicon

penalty float

the penalty value assigned by primer3 to the primer pair

name float

an optional name to use for the primer pair

Raises:

Type Description
ValueError

if the chromosomes of the left and right primers are not the same

Source code in prymer/api/primer_pair.py
@dataclass(frozen=True, init=True, kw_only=True)
class PrimerPair(OligoLike):
    """
    Represents a pair of primers that work together to amplify an amplicon. The
    coordinates of the amplicon are determined to span from the start of the left
    primer through the end of the right primer.

    Attributes:
        left_primer: the left primer in the pair
        right_primer: the right primer in the pair
        amplicon_sequence: an optional sequence of the expected amplicon
        amplicon_tm: the melting temperature of the expected amplicon
        penalty: the penalty value assigned by primer3 to the primer pair
        name: an optional name to use for the primer pair

    Raises:
        ValueError: if the chromosomes of the left and right primers are not the same
    """

    left_primer: Oligo
    right_primer: Oligo
    amplicon_tm: float
    penalty: float
    amplicon_sequence: Optional[str] = None

    @cached_property
    def amplicon(self) -> Span:
        """Returns the mapping for the amplicon"""
        return self.calculate_amplicon_span(self.left_primer, self.right_primer)

    @property
    def span(self) -> Span:
        """Returns the mapping for the amplicon"""
        return self.amplicon

    @property
    def bases(self) -> Optional[str]:
        """Returns the bases of the amplicon sequence"""
        return self.amplicon_sequence

    @property
    def length(self) -> int:
        """Returns the length of the amplicon"""
        return self.amplicon.length

    @property
    def inner(self) -> Span:
        """
        Returns the inner region of the amplicon (not including the primers). I.e. the region of
        the genome covered by the primer pair, without the primer regions.  If the primers
        overlap, then the inner mapping is the midpoint at where they overlap
        """
        if self.left_primer.span.overlaps(self.right_primer.span):
            # Use a flooring division as these values are all ints
            midpoint = (self.left_primer.span.end + self.right_primer.span.start) // 2
            return replace(
                self.left_primer.span,
                start=midpoint,
                end=midpoint,
            )
        else:
            return replace(
                self.left_primer.span,
                start=self.left_primer.span.end + 1,
                end=self.right_primer.span.start - 1,
            )

    def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair":
        """
        Returns a copy of the primer pair where the left and right primers are tailed.

        Args:
            left_tail: The tail to add to the left primer
            right_tail: The tail to add to the right primer

        Returns:
            A copy of the primer pair with the tail(s) added to the primers
        """
        return replace(
            self,
            left_primer=self.left_primer.with_tail(left_tail),
            right_primer=self.right_primer.with_tail(right_tail),
        )

    def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair":
        """
        Returns a copy of the primer pair with names assigned to the primer pair,
        left primer, and right primer.

        Args:
            pp_name: The optional name of the primer pair
            lp_name: The optional name of the left primer
            rp_name: The optional name of the right primer

        Returns:
            A copy of the primer pair with the provided names assigned
        """
        return replace(
            self,
            name=pp_name,
            left_primer=self.left_primer.with_name(lp_name),
            right_primer=self.right_primer.with_name(rp_name),
        )

    def to_bed12_row(self) -> str:
        """
        Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

        NB: BED is 0-based and Prymer is 1-based, so we need to convert
        """
        block_sizes = ",".join(
            [
                f"{gm.length}"
                for gm in [
                    self.left_primer.span,
                    self.inner,
                    self.right_primer.span,
                ]
            ]
        )

        block_starts = ",".join(
            [
                f"{self.amplicon.get_offset(gm.start)}"
                for gm in [
                    self.left_primer.span,
                    self.inner,
                    self.right_primer.span,
                ]
            ],
        )
        bed_like_coords = self.span.get_bedlike_coords()
        return "\t".join(
            map(
                str,
                [
                    self.span.refname,  # contig
                    bed_like_coords.start,  # start
                    bed_like_coords.end,  # end
                    self.id,  # name
                    500,  # score
                    self.span.strand.value,  # strand
                    bed_like_coords.start,  # thick start
                    bed_like_coords.end,  # thick end
                    "100,100,100",  # color
                    3,  # block count
                    block_sizes,
                    block_starts,  # relative to `start`
                ],
            )
        )

    def __iter__(self) -> Iterator[Oligo]:
        """Returns an iterator of left and right primers"""
        return iter([self.left_primer, self.right_primer])

    def __str__(self) -> str:
        """Returns a string representation of the primer pair"""
        sequence = self.amplicon_sequence if self.amplicon_sequence else MISSING_BASES_STRING
        return (
            f"{self.left_primer}\t{self.right_primer}\t{sequence}\t"
            + f"{self.amplicon_tm}\t{self.penalty}"
        )

    @staticmethod
    def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span:
        """
        Calculates the amplicon Span from the left and right primers.

        Args:
            left_primer: the left primer for the amplicon
            right_primer: the right primer for the amplicon

        Returns:
            a Span starting at the first base of the left primer and ending at the last base of
            the right primer

        Raises:
            ValueError: If `left_primer` and `right_primer` have different reference names.
            ValueError: If `left_primer` doesn't start before the right primer.
            ValueError: If `right_primer` ends before `left_primer`.
        """
        # Require that `left_primer` and `right_primer` both map to the same reference sequence
        if left_primer.span.refname != right_primer.span.refname:
            raise ValueError(
                "Left and right primers are on different references. "
                f"Left primer ref: {left_primer.span.refname}. "
                f"Right primer ref: {right_primer.span.refname}"
            )

        # Require that the left primer starts before the right primer
        if left_primer.span.start > right_primer.span.start:
            raise ValueError(
                "Left primer does not start before the right primer. "
                f"Left primer span: {left_primer.span}, "
                f"Right primer span: {right_primer.span}"
            )

        # Require that the left primer starts before the right primer
        if right_primer.span.end < left_primer.span.end:
            raise ValueError(
                "Right primer ends before left primer ends. "
                f"Left primer span: {left_primer.span}, "
                f"Right primer span: {right_primer.span}"
            )

        return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end)

    @staticmethod
    def compare(
        this: "PrimerPair",
        that: "PrimerPair",
        seq_dict: SequenceDictionary,
        by_amplicon: bool = True,
    ) -> int:
        """Compares this primer pair to that primer pair by their span, ordering references using
        the given sequence dictionary.

        Args:
            this: the first primer pair
            that: the second primer pair
            seq_dict: the sequence dictionary used to order references
            by_amplicon: ture to compare using the amplicon property on a primer pair, false to
                compare first using the left primer then the right primer

        Returns:
            -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise
        """
        if by_amplicon:
            return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict)
        else:
            retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict)
            if retval == 0:
                retval = Oligo.compare(
                    this=this.right_primer, that=that.right_primer, seq_dict=seq_dict
                )
            return retval

Attributes

amplicon cached property
amplicon: Span

Returns the mapping for the amplicon

bases property
bases: Optional[str]

Returns the bases of the amplicon sequence

inner property
inner: Span

Returns the inner region of the amplicon (not including the primers). I.e. the region of the genome covered by the primer pair, without the primer regions. If the primers overlap, then the inner mapping is the midpoint at where they overlap

length property
length: int

Returns the length of the amplicon

span property
span: Span

Returns the mapping for the amplicon

Functions

__iter__
__iter__() -> Iterator[Oligo]

Returns an iterator of left and right primers

Source code in prymer/api/primer_pair.py
def __iter__(self) -> Iterator[Oligo]:
    """Returns an iterator of left and right primers"""
    return iter([self.left_primer, self.right_primer])
__str__
__str__() -> str

Returns a string representation of the primer pair

Source code in prymer/api/primer_pair.py
def __str__(self) -> str:
    """Returns a string representation of the primer pair"""
    sequence = self.amplicon_sequence if self.amplicon_sequence else MISSING_BASES_STRING
    return (
        f"{self.left_primer}\t{self.right_primer}\t{sequence}\t"
        + f"{self.amplicon_tm}\t{self.penalty}"
    )
calculate_amplicon_span staticmethod
calculate_amplicon_span(
    left_primer: Oligo, right_primer: Oligo
) -> Span

Calculates the amplicon Span from the left and right primers.

Parameters:

Name Type Description Default
left_primer Oligo

the left primer for the amplicon

required
right_primer Oligo

the right primer for the amplicon

required

Returns:

Type Description
Span

a Span starting at the first base of the left primer and ending at the last base of

Span

the right primer

Raises:

Type Description
ValueError

If left_primer and right_primer have different reference names.

ValueError

If left_primer doesn't start before the right primer.

ValueError

If right_primer ends before left_primer.

Source code in prymer/api/primer_pair.py
@staticmethod
def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span:
    """
    Calculates the amplicon Span from the left and right primers.

    Args:
        left_primer: the left primer for the amplicon
        right_primer: the right primer for the amplicon

    Returns:
        a Span starting at the first base of the left primer and ending at the last base of
        the right primer

    Raises:
        ValueError: If `left_primer` and `right_primer` have different reference names.
        ValueError: If `left_primer` doesn't start before the right primer.
        ValueError: If `right_primer` ends before `left_primer`.
    """
    # Require that `left_primer` and `right_primer` both map to the same reference sequence
    if left_primer.span.refname != right_primer.span.refname:
        raise ValueError(
            "Left and right primers are on different references. "
            f"Left primer ref: {left_primer.span.refname}. "
            f"Right primer ref: {right_primer.span.refname}"
        )

    # Require that the left primer starts before the right primer
    if left_primer.span.start > right_primer.span.start:
        raise ValueError(
            "Left primer does not start before the right primer. "
            f"Left primer span: {left_primer.span}, "
            f"Right primer span: {right_primer.span}"
        )

    # Require that the left primer starts before the right primer
    if right_primer.span.end < left_primer.span.end:
        raise ValueError(
            "Right primer ends before left primer ends. "
            f"Left primer span: {left_primer.span}, "
            f"Right primer span: {right_primer.span}"
        )

    return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end)
compare staticmethod
compare(
    this: PrimerPair,
    that: PrimerPair,
    seq_dict: SequenceDictionary,
    by_amplicon: bool = True,
) -> int

Compares this primer pair to that primer pair by their span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this PrimerPair

the first primer pair

required
that PrimerPair

the second primer pair

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required
by_amplicon bool

ture to compare using the amplicon property on a primer pair, false to compare first using the left primer then the right primer

True

Returns:

Type Description
int

-1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise

Source code in prymer/api/primer_pair.py
@staticmethod
def compare(
    this: "PrimerPair",
    that: "PrimerPair",
    seq_dict: SequenceDictionary,
    by_amplicon: bool = True,
) -> int:
    """Compares this primer pair to that primer pair by their span, ordering references using
    the given sequence dictionary.

    Args:
        this: the first primer pair
        that: the second primer pair
        seq_dict: the sequence dictionary used to order references
        by_amplicon: ture to compare using the amplicon property on a primer pair, false to
            compare first using the left primer then the right primer

    Returns:
        -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise
    """
    if by_amplicon:
        return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict)
    else:
        retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict)
        if retval == 0:
            retval = Oligo.compare(
                this=this.right_primer, that=that.right_primer, seq_dict=seq_dict
            )
        return retval
to_bed12_row
to_bed12_row() -> str

Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

NB: BED is 0-based and Prymer is 1-based, so we need to convert

Source code in prymer/api/primer_pair.py
def to_bed12_row(self) -> str:
    """
    Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

    NB: BED is 0-based and Prymer is 1-based, so we need to convert
    """
    block_sizes = ",".join(
        [
            f"{gm.length}"
            for gm in [
                self.left_primer.span,
                self.inner,
                self.right_primer.span,
            ]
        ]
    )

    block_starts = ",".join(
        [
            f"{self.amplicon.get_offset(gm.start)}"
            for gm in [
                self.left_primer.span,
                self.inner,
                self.right_primer.span,
            ]
        ],
    )
    bed_like_coords = self.span.get_bedlike_coords()
    return "\t".join(
        map(
            str,
            [
                self.span.refname,  # contig
                bed_like_coords.start,  # start
                bed_like_coords.end,  # end
                self.id,  # name
                500,  # score
                self.span.strand.value,  # strand
                bed_like_coords.start,  # thick start
                bed_like_coords.end,  # thick end
                "100,100,100",  # color
                3,  # block count
                block_sizes,
                block_starts,  # relative to `start`
            ],
        )
    )
with_names
with_names(
    pp_name: str, lp_name: str, rp_name: str
) -> PrimerPair

Returns a copy of the primer pair with names assigned to the primer pair, left primer, and right primer.

Parameters:

Name Type Description Default
pp_name str

The optional name of the primer pair

required
lp_name str

The optional name of the left primer

required
rp_name str

The optional name of the right primer

required

Returns:

Type Description
PrimerPair

A copy of the primer pair with the provided names assigned

Source code in prymer/api/primer_pair.py
def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair":
    """
    Returns a copy of the primer pair with names assigned to the primer pair,
    left primer, and right primer.

    Args:
        pp_name: The optional name of the primer pair
        lp_name: The optional name of the left primer
        rp_name: The optional name of the right primer

    Returns:
        A copy of the primer pair with the provided names assigned
    """
    return replace(
        self,
        name=pp_name,
        left_primer=self.left_primer.with_name(lp_name),
        right_primer=self.right_primer.with_name(rp_name),
    )
with_tails
with_tails(left_tail: str, right_tail: str) -> PrimerPair

Returns a copy of the primer pair where the left and right primers are tailed.

Parameters:

Name Type Description Default
left_tail str

The tail to add to the left primer

required
right_tail str

The tail to add to the right primer

required

Returns:

Type Description
PrimerPair

A copy of the primer pair with the tail(s) added to the primers

Source code in prymer/api/primer_pair.py
def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair":
    """
    Returns a copy of the primer pair where the left and right primers are tailed.

    Args:
        left_tail: The tail to add to the left primer
        right_tail: The tail to add to the right primer

    Returns:
        A copy of the primer pair with the tail(s) added to the primers
    """
    return replace(
        self,
        left_primer=self.left_primer.with_tail(left_tail),
        right_primer=self.right_primer.with_tail(right_tail),
    )

SimpleVariant dataclass

Represents a variant from a given genomic range.

The reference and alternate alleles typically match the VCF from which the variant originated, differing when the reference and alternate alleles share common leading bases. For example, given a reference allele of CACACA and alternate allele CACA, the reference and alternate alleles have their common leading bases removed except for a single "anchor base" (i.e. CAC is removed), yielding the reference allele ACA and alternate allele A. This will also change the reference variant position (pos property), which is defined as the position of the first base in the reference allele.

The variant_type is derived from the reference and alternate alleles after the above (see VariantType.from_alleles()).

Furthermore, the variant end position (end property) is defined for insertions as the base after the insertion. For all other variant types, this is the last base in the reference allele.

Attributes:

Name Type Description
id str

the variant identifier

refname str

the reference sequence name

pos int

the start position of the variant (1-based inclusive)

end int

the end position of the variant (1-based inclusive)

ref str

the reference base

alt str

the alternate base

variant_type VariantType

the variant type

maf Optional[float]

optionally, the minor allele frequency

Source code in prymer/api/variant_lookup.py
@dataclass(slots=True, frozen=True, init=True)
class SimpleVariant:
    """Represents a variant from a given genomic range.

    The reference and alternate alleles typically match the VCF from which the variant originated,
    differing when the reference and alternate alleles share common leading bases.  For example,
    given a reference allele of `CACACA` and alternate allele `CACA`, the reference and alternate
    alleles have their common leading bases removed except for a single "anchor base" (i.e. `CAC` is
    removed), yielding the reference allele `ACA` and alternate allele `A`.  This will also change
    the reference variant position (`pos` property), which is defined as the position of the first
    base in the reference allele.

    The `variant_type` is derived from the reference and alternate alleles after the above (see
    [`VariantType.from_alleles()`][prymer.api.variant_lookup.VariantType.from_alleles]).

    Furthermore, the variant end position (`end` property) is defined for insertions as the base
    after the insertion.  For all other variant types, this is the last base in the reference
    allele.

    Attributes:
        id: the variant identifier
        refname: the reference sequence name
        pos: the start position of the variant (1-based inclusive)
        end: the end position of the variant (1-based inclusive)
        ref: the reference base
        alt: the alternate base
        variant_type: the variant type
        maf: optionally, the minor allele frequency
    """

    id: str
    refname: str
    pos: int
    ref: str
    alt: str
    end: int = field(init=False)
    variant_type: VariantType = field(init=False)
    maf: Optional[float] = None

    def __post_init__(self) -> None:
        # Simplify the ref/alt alleles
        # find the leading prefix of the ref and alternate allele
        prefix_length = 0
        for r, a in zip(self.ref, self.alt, strict=False):
            if r != a:
                break
            prefix_length += 1
        if prefix_length > 1:  # > 1 to always keep an anchor base
            offset = prefix_length - 1
            object.__setattr__(self, "ref", self.ref[offset:])
            object.__setattr__(self, "alt", self.alt[offset:])
            object.__setattr__(self, "pos", self.pos + offset)

        # Derive the end and variant_type from the ref and alt alleles.
        object.__setattr__(
            self, "variant_type", VariantType.from_alleles(ref=self.ref, alt=self.alt)
        )

        # span all reference bases (e.g. SNP -> 1bp, MNV -> len(MNV), DEL -> len(DEL),
        end: int = self.pos + len(self.ref) - 1
        if self.variant_type == VariantType.INSERTION:
            # span the base preceding and following the insertion, so add a base at the end
            end += 1
        object.__setattr__(self, "end", end)

    def __str__(self) -> str:
        """Compact String representation of the variant that includes all relevant info."""
        maf_string = f"{self.maf:.4f}" if self.maf is not None else "NA"
        return f"{self.id}@{self.refname}:{self.pos}[{self.ref}/{self.alt} {maf_string}]"

    def to_span(self) -> Span:
        """Creates a Span object that represents the genomic span of this variant.
        Insertions will span the base preceding and following the inserted bases."""
        return Span(refname=self.refname, start=self.pos, end=self.end, strand=Strand.POSITIVE)

    @staticmethod
    def build(variant: VariantRecord) -> list["SimpleVariant"]:
        """Convert `pysam.VariantRecord` to `SimpleVariant`. Only the first ALT allele is used."""
        maf = calc_maf_from_filter(variant)
        simple_variants: list[SimpleVariant] = []

        for i, alt in enumerate(variant.alts, start=1):
            simple_variant = SimpleVariant(
                id=f"{variant.id}-{i}/{len(variant.alts)}",
                refname=variant.chrom,
                pos=variant.pos,
                ref=variant.ref,
                alt=alt,
                maf=maf,
            )
            simple_variants.append(simple_variant)

        return simple_variants

Functions

__str__
__str__() -> str

Compact String representation of the variant that includes all relevant info.

Source code in prymer/api/variant_lookup.py
def __str__(self) -> str:
    """Compact String representation of the variant that includes all relevant info."""
    maf_string = f"{self.maf:.4f}" if self.maf is not None else "NA"
    return f"{self.id}@{self.refname}:{self.pos}[{self.ref}/{self.alt} {maf_string}]"
build staticmethod
build(variant: VariantRecord) -> list[SimpleVariant]

Convert pysam.VariantRecord to SimpleVariant. Only the first ALT allele is used.

Source code in prymer/api/variant_lookup.py
@staticmethod
def build(variant: VariantRecord) -> list["SimpleVariant"]:
    """Convert `pysam.VariantRecord` to `SimpleVariant`. Only the first ALT allele is used."""
    maf = calc_maf_from_filter(variant)
    simple_variants: list[SimpleVariant] = []

    for i, alt in enumerate(variant.alts, start=1):
        simple_variant = SimpleVariant(
            id=f"{variant.id}-{i}/{len(variant.alts)}",
            refname=variant.chrom,
            pos=variant.pos,
            ref=variant.ref,
            alt=alt,
            maf=maf,
        )
        simple_variants.append(simple_variant)

    return simple_variants
to_span
to_span() -> Span

Creates a Span object that represents the genomic span of this variant. Insertions will span the base preceding and following the inserted bases.

Source code in prymer/api/variant_lookup.py
def to_span(self) -> Span:
    """Creates a Span object that represents the genomic span of this variant.
    Insertions will span the base preceding and following the inserted bases."""
    return Span(refname=self.refname, start=self.pos, end=self.end, strand=Strand.POSITIVE)

Span dataclass

Bases: Metric['Span']

Represents the span of some sequence (target, primer, amplicon, etc.) to a genome.

Attributes:

Name Type Description
refname str

name of a reference sequence or contig or chromosome

start int

the 1-based start position of the Span (inclusive)

end int

the 1-based end position of the Span (inclusive)

strand Strand

the strand of the Span (POSITIVE by default)

Source code in prymer/api/span.py
@dataclass(init=True, frozen=True, eq=True)
class Span(Metric["Span"]):
    """Represents the span of some sequence (target, primer, amplicon, etc.) to a genome.

    Attributes:
        refname: name of a reference sequence or contig or chromosome
        start: the 1-based start position of the Span (inclusive)
        end: the 1-based end position of the Span (inclusive)
        strand: the strand of the Span (POSITIVE by default)
    """

    refname: str
    start: int
    end: int
    strand: Strand = Strand.POSITIVE

    def __post_init__(self) -> None:
        if self.refname.strip() == "":
            raise ValueError("Ref name must be populated")
        if self.start < 1:
            raise ValueError(f"Start position must be >=1; received start={self.start}")
        if self.end < self.start:
            raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}")

    @classmethod
    def from_string(cls, line: str) -> "Span":
        """Creates a Span from an input string.

        The input string should be delimited by a colon (":"). If strand information is missing
        after splitting the string, the strand is assumed to be positive.

        Args:
            line: input string

        Returns:
            Span: the Span object generated from the input string

        Raises:
            ValueError: if there are not at least 2 colon-delimited fields in string

        Example:
            >>> span_string = "chr1:1-10:+"
            >>> Span.from_string(span_string)
            Span(refname='chr1', start=1, end=10, strand=<Strand.POSITIVE: '+'>)
        """
        parts = line.strip().split(":")
        if len(parts) == 3:
            refname, range_, strand_symbol = parts
            try:
                strand = Strand(strand_symbol[0])
            except ValueError as e:
                raise ValueError(
                    "Did not find valid strand information; " f"received {strand_symbol}"
                ) from e
        elif len(parts) == 2:
            refname, range_ = parts
            strand = Strand.POSITIVE
        else:
            raise ValueError(
                f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}"
            )
        try:
            start, end = map(int, range_.split("-"))
        except ValueError as e:
            raise ValueError(
                f"Could not cast positional information to int; received {range_}"
            ) from e
        return Span(refname=refname, start=start, end=end, strand=strand)

    def __str__(self) -> str:
        return f"{self.refname}:{self.start}-{self.end}:{self.strand}"

    @property
    def length(self) -> int:
        """Get the length of the span/interval."""
        return self.end - self.start + 1

    def overlaps(self, other: "Span") -> bool:
        """Returns True if the spans overlap one another, False otherwise."""
        return (
            (self.refname == other.refname)
            and (self.start <= other.end)
            and (self.end >= other.start)
        )

    def length_of_overlap_with(self, other: "Span") -> int:
        """Returns the length of the region which overlaps the other span, or zero if there is
        no overlap"""
        overlap: int = 0
        if self.overlaps(other=other):
            start = max(self.start, other.start)
            end = min(self.end, other.end)
            overlap = end - start + 1
        return overlap

    def get_offset(self, position: int) -> int:
        """Returns a coordinate position that is relative to the current span.

        Args:
            position: the (genomic) position to provide relative coordinates for (1-based)

        Returns:
            A 0-based position relative to the Span

        Raises:
            ValueError: if the provided position is outside the coordinates of the Span

        Example:

        ```python
        >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
        >>> print(test_span.get_offset(11))
        1

        ```
        """
        if position < self.start or position > self.end:
            raise ValueError(f"Position not within this span: {position}")
        return position - self.start

    def get_bedlike_coords(self) -> BedLikeCoords:
        """Returns the zero-based, open-ended coordinates (start and end) of the
         (1-based) Span, for using in bed-like formats.

         Returns:
             a BedLikeCoords instance containing the 0-based and open-ended coordinates of
             the Span

         Example:

        ```python
        >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
        >>> print(test_span.get_bedlike_coords())
        BedLikeCoords(start=9, end=20)

        ```
        """
        return BedLikeCoords(self.start - 1, self.end)

    def get_subspan(
        self, offset: int, subspan_length: int, strand: Optional[Strand] = None
    ) -> "Span":
        """Returns a Span with absolute coords from an offset and a length.
        The strand of the new Span will be strand if given, otherwise it will be
        self.strand.

        Args:
            offset: the difference between the start position of the subspan and that
                of the current span (0-based)
            subspan_length: the length of the new span
            strand: the strand of the new span (if given)

        Returns:
            a new Span with objective coords derived from input offset and length

        Raises:
            ValueError: if the offset is less than 0
            ValueError: if the length of the subspan is 0 or less
            ValueError: if the offset is greater than the length of the source span

        Example:

        ```python
        >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
        >>> test = span.get_subspan(offset=5, subspan_length=10)
        >>> print(test)
        chr1:1005-1014:+
        >>> print(test.length) #length = end - start + 1
        10

        ```
        """

        if offset < 0:
            raise ValueError(f"Offset must be > 0, received start={offset}")
        if subspan_length <= 0:
            raise ValueError(
                f"Length of a subspan must be positive, received length={subspan_length}"
            )
        if offset >= self.length:
            raise ValueError(
                "Offset of a relative subspan must be < source span length, "
                f"received offset={offset}, length={subspan_length}"
            )
        if offset + subspan_length > self.length:
            raise ValueError(
                f"End of sub-span must be within source span: source start={self.start}, "
                f"offset={offset}, sub-span length={subspan_length}"
            )

        absolute_start = self.start + offset
        absolute_end = coordmath.get_closed_end(absolute_start, subspan_length)
        strand = self.strand if strand is None else strand
        return replace(self, start=absolute_start, end=absolute_end, strand=strand)

    def contains(self, comparison: Self) -> bool:
        """Checks whether one span is wholly contained within another span.
        Returns `True` if one span contains the other, otherwise returns `False`.
        Does not use strand information (a span is considered to contain the other
        if they are adjacent in position but on opposite strands)."""
        return (
            self.refname == comparison.refname
            and self.start <= comparison.start
            and comparison.end <= self.end
        )

    def _to_tuple(self, seq_dict: SequenceDictionary) -> Tuple[int, int, int, int]:
        """Returns a tuple of reference index, start position, end position, and strand
        (0 forward, 1 reverse)"""
        ref_index = seq_dict.by_name(self.refname).index
        strand = 0 if self.strand == Strand.POSITIVE else 1
        return ref_index, self.start, self.end, strand

    @staticmethod
    def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int:
        """Compares this span to that span, ordering references using the given sequence dictionary.

        Args:
            this: the first span
            that: the second span
            seq_dict: the sequence dictionary used to order references

        Returns:
            -1 if the first span is less than the second span, 0 if equal, 1 otherwise
        """
        left_tuple = this._to_tuple(seq_dict=seq_dict)
        right_tuple = that._to_tuple(seq_dict=seq_dict)
        if left_tuple < right_tuple:
            return -1
        elif left_tuple > right_tuple:
            return 1
        else:
            return 0

Attributes

length property
length: int

Get the length of the span/interval.

Functions

compare staticmethod
compare(
    this: Span, that: Span, seq_dict: SequenceDictionary
) -> int

Compares this span to that span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this Span

the first span

required
that Span

the second span

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required

Returns:

Type Description
int

-1 if the first span is less than the second span, 0 if equal, 1 otherwise

Source code in prymer/api/span.py
@staticmethod
def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int:
    """Compares this span to that span, ordering references using the given sequence dictionary.

    Args:
        this: the first span
        that: the second span
        seq_dict: the sequence dictionary used to order references

    Returns:
        -1 if the first span is less than the second span, 0 if equal, 1 otherwise
    """
    left_tuple = this._to_tuple(seq_dict=seq_dict)
    right_tuple = that._to_tuple(seq_dict=seq_dict)
    if left_tuple < right_tuple:
        return -1
    elif left_tuple > right_tuple:
        return 1
    else:
        return 0
contains
contains(comparison: Self) -> bool

Checks whether one span is wholly contained within another span. Returns True if one span contains the other, otherwise returns False. Does not use strand information (a span is considered to contain the other if they are adjacent in position but on opposite strands).

Source code in prymer/api/span.py
def contains(self, comparison: Self) -> bool:
    """Checks whether one span is wholly contained within another span.
    Returns `True` if one span contains the other, otherwise returns `False`.
    Does not use strand information (a span is considered to contain the other
    if they are adjacent in position but on opposite strands)."""
    return (
        self.refname == comparison.refname
        and self.start <= comparison.start
        and comparison.end <= self.end
    )
from_string classmethod
from_string(line: str) -> Span

Creates a Span from an input string.

The input string should be delimited by a colon (":"). If strand information is missing after splitting the string, the strand is assumed to be positive.

Parameters:

Name Type Description Default
line str

input string

required

Returns:

Name Type Description
Span Span

the Span object generated from the input string

Raises:

Type Description
ValueError

if there are not at least 2 colon-delimited fields in string

Example

span_string = "chr1:1-10:+" Span.from_string(span_string) Span(refname='chr1', start=1, end=10, strand=)

Source code in prymer/api/span.py
@classmethod
def from_string(cls, line: str) -> "Span":
    """Creates a Span from an input string.

    The input string should be delimited by a colon (":"). If strand information is missing
    after splitting the string, the strand is assumed to be positive.

    Args:
        line: input string

    Returns:
        Span: the Span object generated from the input string

    Raises:
        ValueError: if there are not at least 2 colon-delimited fields in string

    Example:
        >>> span_string = "chr1:1-10:+"
        >>> Span.from_string(span_string)
        Span(refname='chr1', start=1, end=10, strand=<Strand.POSITIVE: '+'>)
    """
    parts = line.strip().split(":")
    if len(parts) == 3:
        refname, range_, strand_symbol = parts
        try:
            strand = Strand(strand_symbol[0])
        except ValueError as e:
            raise ValueError(
                "Did not find valid strand information; " f"received {strand_symbol}"
            ) from e
    elif len(parts) == 2:
        refname, range_ = parts
        strand = Strand.POSITIVE
    else:
        raise ValueError(
            f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}"
        )
    try:
        start, end = map(int, range_.split("-"))
    except ValueError as e:
        raise ValueError(
            f"Could not cast positional information to int; received {range_}"
        ) from e
    return Span(refname=refname, start=start, end=end, strand=strand)
get_bedlike_coords
get_bedlike_coords() -> BedLikeCoords

Returns the zero-based, open-ended coordinates (start and end) of the (1-based) Span, for using in bed-like formats.

Returns: a BedLikeCoords instance containing the 0-based and open-ended coordinates of the Span

Example:

>>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
>>> print(test_span.get_bedlike_coords())
BedLikeCoords(start=9, end=20)
Source code in prymer/api/span.py
def get_bedlike_coords(self) -> BedLikeCoords:
    """Returns the zero-based, open-ended coordinates (start and end) of the
     (1-based) Span, for using in bed-like formats.

     Returns:
         a BedLikeCoords instance containing the 0-based and open-ended coordinates of
         the Span

     Example:

    ```python
    >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
    >>> print(test_span.get_bedlike_coords())
    BedLikeCoords(start=9, end=20)

    ```
    """
    return BedLikeCoords(self.start - 1, self.end)
get_offset
get_offset(position: int) -> int

Returns a coordinate position that is relative to the current span.

Parameters:

Name Type Description Default
position int

the (genomic) position to provide relative coordinates for (1-based)

required

Returns:

Type Description
int

A 0-based position relative to the Span

Raises:

Type Description
ValueError

if the provided position is outside the coordinates of the Span

Example:

>>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
>>> print(test_span.get_offset(11))
1
Source code in prymer/api/span.py
def get_offset(self, position: int) -> int:
    """Returns a coordinate position that is relative to the current span.

    Args:
        position: the (genomic) position to provide relative coordinates for (1-based)

    Returns:
        A 0-based position relative to the Span

    Raises:
        ValueError: if the provided position is outside the coordinates of the Span

    Example:

    ```python
    >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
    >>> print(test_span.get_offset(11))
    1

    ```
    """
    if position < self.start or position > self.end:
        raise ValueError(f"Position not within this span: {position}")
    return position - self.start
get_subspan
get_subspan(
    offset: int,
    subspan_length: int,
    strand: Optional[Strand] = None,
) -> Span

Returns a Span with absolute coords from an offset and a length. The strand of the new Span will be strand if given, otherwise it will be self.strand.

Parameters:

Name Type Description Default
offset int

the difference between the start position of the subspan and that of the current span (0-based)

required
subspan_length int

the length of the new span

required
strand Optional[Strand]

the strand of the new span (if given)

None

Returns:

Type Description
Span

a new Span with objective coords derived from input offset and length

Raises:

Type Description
ValueError

if the offset is less than 0

ValueError

if the length of the subspan is 0 or less

ValueError

if the offset is greater than the length of the source span

Example:

>>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
>>> test = span.get_subspan(offset=5, subspan_length=10)
>>> print(test)
chr1:1005-1014:+
>>> print(test.length) #length = end - start + 1
10
Source code in prymer/api/span.py
def get_subspan(
    self, offset: int, subspan_length: int, strand: Optional[Strand] = None
) -> "Span":
    """Returns a Span with absolute coords from an offset and a length.
    The strand of the new Span will be strand if given, otherwise it will be
    self.strand.

    Args:
        offset: the difference between the start position of the subspan and that
            of the current span (0-based)
        subspan_length: the length of the new span
        strand: the strand of the new span (if given)

    Returns:
        a new Span with objective coords derived from input offset and length

    Raises:
        ValueError: if the offset is less than 0
        ValueError: if the length of the subspan is 0 or less
        ValueError: if the offset is greater than the length of the source span

    Example:

    ```python
    >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
    >>> test = span.get_subspan(offset=5, subspan_length=10)
    >>> print(test)
    chr1:1005-1014:+
    >>> print(test.length) #length = end - start + 1
    10

    ```
    """

    if offset < 0:
        raise ValueError(f"Offset must be > 0, received start={offset}")
    if subspan_length <= 0:
        raise ValueError(
            f"Length of a subspan must be positive, received length={subspan_length}"
        )
    if offset >= self.length:
        raise ValueError(
            "Offset of a relative subspan must be < source span length, "
            f"received offset={offset}, length={subspan_length}"
        )
    if offset + subspan_length > self.length:
        raise ValueError(
            f"End of sub-span must be within source span: source start={self.start}, "
            f"offset={offset}, sub-span length={subspan_length}"
        )

    absolute_start = self.start + offset
    absolute_end = coordmath.get_closed_end(absolute_start, subspan_length)
    strand = self.strand if strand is None else strand
    return replace(self, start=absolute_start, end=absolute_end, strand=strand)
length_of_overlap_with
length_of_overlap_with(other: Span) -> int

Returns the length of the region which overlaps the other span, or zero if there is no overlap

Source code in prymer/api/span.py
def length_of_overlap_with(self, other: "Span") -> int:
    """Returns the length of the region which overlaps the other span, or zero if there is
    no overlap"""
    overlap: int = 0
    if self.overlaps(other=other):
        start = max(self.start, other.start)
        end = min(self.end, other.end)
        overlap = end - start + 1
    return overlap
overlaps
overlaps(other: Span) -> bool

Returns True if the spans overlap one another, False otherwise.

Source code in prymer/api/span.py
def overlaps(self, other: "Span") -> bool:
    """Returns True if the spans overlap one another, False otherwise."""
    return (
        (self.refname == other.refname)
        and (self.start <= other.end)
        and (self.end >= other.start)
    )

Strand

Bases: StrEnum

Represents the strand of a span to the genome.

Source code in prymer/api/span.py
@unique
class Strand(StrEnum):
    """Represents the strand of a span to the genome."""

    POSITIVE = "+"
    NEGATIVE = "-"

VariantLookup

Bases: ABC

Base class to represent a variant from a given genomic range.

Attributes:

Name Type Description
vcf_paths list[Path]

the paths to the source VCFs for the variants

min_maf Optional[float]

optionally, return only variants with at least this minor allele frequency

include_missing_mafs bool

when filtering variants with a minor allele frequency, True to include variants with no annotated minor allele frequency, otherwise False. If no minor allele frequency is given, then this parameter does nothing.

Source code in prymer/api/variant_lookup.py
class VariantLookup(ABC):
    """Base class to represent a variant from a given genomic range.

    Attributes:
        vcf_paths: the paths to the source VCFs for the variants
        min_maf: optionally, return only variants with at least this minor allele frequency
        include_missing_mafs: when filtering variants with a minor allele frequency,
            `True` to include variants with no annotated minor allele frequency, otherwise `False`.
            If no minor allele frequency is given, then this parameter does nothing.
    """

    def __init__(
        self,
        vcf_paths: list[Path],
        min_maf: Optional[float],
        include_missing_mafs: bool,
    ) -> None:
        self.vcf_paths: list[Path] = vcf_paths
        self.min_maf: Optional[float] = min_maf
        self.include_missing_mafs: bool = include_missing_mafs

    @final
    def query(
        self,
        refname: str,
        start: int,
        end: int,
        maf: Optional[float] = None,
        include_missing_mafs: bool = None,
    ) -> list[SimpleVariant]:
        """Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

        Args:
            refname: the reference name
            start: the 1-based start position
            end: the 1-based end position
            maf: the MAF of the variant
            include_missing_mafs: whether to include variants with a missing MAF
                (overrides self.include_missing_mafs)
        """
        if maf is None:
            maf = self.min_maf
        if include_missing_mafs is None:
            include_missing_mafs = self.include_missing_mafs

        variants = self._query(refname=refname, start=start, end=end)
        if len(variants) == 0:
            _logger.debug(f"No variants extracted from region of interest: {refname}:{start}-{end}")
        if maf is None or maf <= 0.0:
            return variants
        elif include_missing_mafs:  # return variants with a MAF above threshold or missing
            return [v for v in variants if (v.maf is None or v.maf >= maf)]
        else:
            return [v for v in variants if v.maf is not None and v.maf >= maf]

    @staticmethod
    def to_variants(
        variants: list[VariantRecord], source_vcf: Optional[Path] = None
    ) -> list[SimpleVariant]:
        """Converts a list of `pysam.VariantRecords` to a list of `SimpleVariants` for ease of use.
        Filters variants based on their FILTER status, and sorts by start position.

        Args:
            variants: the variants to convert
            source_vcf: the optional path to the source VCF, used only for debug messages

        Returns:
            a list of `SimpleVariants`, one per alternate allele per variant.
        """
        simple_vars = []
        for variant in variants:
            if (
                "PASS" in list(variant.filter) or len(list(variant.filter)) == 0
            ):  # if passing or empty filters
                simple_variants = SimpleVariant.build(variant)
                if any(v.variant_type == VariantType.OTHER for v in simple_variants):
                    _logger.debug(
                        f"Input VCF file {source_vcf} may contain complex variants: {variant}"
                    )
                simple_vars.extend(simple_variants)
        return sorted(simple_vars, key=lambda v: (v.pos, v.id))

    @abstractmethod
    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Subclasses must implement this method."""

Functions

query
query(
    refname: str,
    start: int,
    end: int,
    maf: Optional[float] = None,
    include_missing_mafs: bool = None,
) -> list[SimpleVariant]

Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

Parameters:

Name Type Description Default
refname str

the reference name

required
start int

the 1-based start position

required
end int

the 1-based end position

required
maf Optional[float]

the MAF of the variant

None
include_missing_mafs bool

whether to include variants with a missing MAF (overrides self.include_missing_mafs)

None
Source code in prymer/api/variant_lookup.py
@final
def query(
    self,
    refname: str,
    start: int,
    end: int,
    maf: Optional[float] = None,
    include_missing_mafs: bool = None,
) -> list[SimpleVariant]:
    """Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

    Args:
        refname: the reference name
        start: the 1-based start position
        end: the 1-based end position
        maf: the MAF of the variant
        include_missing_mafs: whether to include variants with a missing MAF
            (overrides self.include_missing_mafs)
    """
    if maf is None:
        maf = self.min_maf
    if include_missing_mafs is None:
        include_missing_mafs = self.include_missing_mafs

    variants = self._query(refname=refname, start=start, end=end)
    if len(variants) == 0:
        _logger.debug(f"No variants extracted from region of interest: {refname}:{start}-{end}")
    if maf is None or maf <= 0.0:
        return variants
    elif include_missing_mafs:  # return variants with a MAF above threshold or missing
        return [v for v in variants if (v.maf is None or v.maf >= maf)]
    else:
        return [v for v in variants if v.maf is not None and v.maf >= maf]
to_variants staticmethod
to_variants(
    variants: list[VariantRecord],
    source_vcf: Optional[Path] = None,
) -> list[SimpleVariant]

Converts a list of pysam.VariantRecords to a list of SimpleVariants for ease of use. Filters variants based on their FILTER status, and sorts by start position.

Parameters:

Name Type Description Default
variants list[VariantRecord]

the variants to convert

required
source_vcf Optional[Path]

the optional path to the source VCF, used only for debug messages

None

Returns:

Type Description
list[SimpleVariant]

a list of SimpleVariants, one per alternate allele per variant.

Source code in prymer/api/variant_lookup.py
@staticmethod
def to_variants(
    variants: list[VariantRecord], source_vcf: Optional[Path] = None
) -> list[SimpleVariant]:
    """Converts a list of `pysam.VariantRecords` to a list of `SimpleVariants` for ease of use.
    Filters variants based on their FILTER status, and sorts by start position.

    Args:
        variants: the variants to convert
        source_vcf: the optional path to the source VCF, used only for debug messages

    Returns:
        a list of `SimpleVariants`, one per alternate allele per variant.
    """
    simple_vars = []
    for variant in variants:
        if (
            "PASS" in list(variant.filter) or len(list(variant.filter)) == 0
        ):  # if passing or empty filters
            simple_variants = SimpleVariant.build(variant)
            if any(v.variant_type == VariantType.OTHER for v in simple_variants):
                _logger.debug(
                    f"Input VCF file {source_vcf} may contain complex variants: {variant}"
                )
            simple_vars.extend(simple_variants)
    return sorted(simple_vars, key=lambda v: (v.pos, v.id))

VariantOverlapDetector

Bases: VariantLookup

Implements VariantLookup by reading the entire VCF into memory and loading the resulting Variants into an OverlapDetector.

Source code in prymer/api/variant_lookup.py
class VariantOverlapDetector(VariantLookup):
    """Implements `VariantLookup` by reading the entire VCF into memory and loading the resulting
    Variants into an `OverlapDetector`."""

    def __init__(self, vcf_paths: list[Path], min_maf: Optional[float], include_missing_mafs: bool):
        super().__init__(
            vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
        )
        self._overlap_detector: OverlapDetector[_VariantInterval] = OverlapDetector()

        if len(vcf_paths) == 0:
            raise ValueError("No VCF paths given to query.")

        for path in vcf_paths:
            if (
                not path.with_suffix(path.suffix + ".csi").is_file()
                and not path.with_suffix(path.suffix + ".tbi").is_file()
            ):
                raise ValueError(f"Cannot perform fetch with missing index file for VCF: {path}")

            with reader(path) as fh:
                variant_intervals = iter(
                    _VariantInterval.build(simple_variant)
                    for variant in fh
                    for simple_variant in self.to_variants([variant], source_vcf=path)
                )
                self._overlap_detector.add_all(variant_intervals)

    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Queries variants from the VCFs used by this lookup."""
        query = Interval(
            refname=refname, start=start - 1, end=end
        )  # interval is half-open, 0-based
        overlapping_variants = (
            variant_interval.variant
            for variant_interval in self._overlap_detector.get_overlaps(query)
        )
        return sorted(overlapping_variants, key=lambda v: (v.pos, v.id))

VariantType

Bases: UppercaseStrEnum

Represents the type of variant.

Source code in prymer/api/variant_lookup.py
@unique
class VariantType(UppercaseStrEnum):
    """Represents the type of variant."""

    SNP = auto()
    MNV = auto()
    INSERTION = auto()
    DELETION = auto()
    OTHER = auto()

    @staticmethod
    def from_alleles(ref: str, alt: str) -> "VariantType":
        """Builds a variant type from the given reference and alternate allele.

        Args:
            ref: the reference allele
            alt: the alternate allele

        Returns:
            the variant type
        """
        variant_type: VariantType
        if "<" in alt:
            variant_type = VariantType.OTHER
        elif (len(ref) == 1) and (len(alt) == 1):
            variant_type = VariantType.SNP
        elif len(ref) == len(alt):
            variant_type = VariantType.MNV
        elif len(ref) < len(alt):
            variant_type = VariantType.INSERTION
        elif len(ref) > len(alt):
            variant_type = VariantType.DELETION
        else:
            raise ValueError(f"Could not determine variant type for ref `{ref}` and alt `{alt}`")
        return variant_type

Functions

from_alleles staticmethod
from_alleles(ref: str, alt: str) -> VariantType

Builds a variant type from the given reference and alternate allele.

Parameters:

Name Type Description Default
ref str

the reference allele

required
alt str

the alternate allele

required

Returns:

Type Description
VariantType

the variant type

Source code in prymer/api/variant_lookup.py
@staticmethod
def from_alleles(ref: str, alt: str) -> "VariantType":
    """Builds a variant type from the given reference and alternate allele.

    Args:
        ref: the reference allele
        alt: the alternate allele

    Returns:
        the variant type
    """
    variant_type: VariantType
    if "<" in alt:
        variant_type = VariantType.OTHER
    elif (len(ref) == 1) and (len(alt) == 1):
        variant_type = VariantType.SNP
    elif len(ref) == len(alt):
        variant_type = VariantType.MNV
    elif len(ref) < len(alt):
        variant_type = VariantType.INSERTION
    elif len(ref) > len(alt):
        variant_type = VariantType.DELETION
    else:
        raise ValueError(f"Could not determine variant type for ref `{ref}` and alt `{alt}`")
    return variant_type

Functions

build_primer_pairs

build_primer_pairs(
    left_primers: Sequence[Oligo],
    right_primers: Sequence[Oligo],
    target: Span,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    max_heterodimer_tm: Optional[float],
    weights: PrimerAndAmpliconWeights,
    fasta_path: Path,
) -> Iterator[PrimerPair]

Builds primer pairs from individual left and primers.

Only primer pairs that meet the requirements for amplicon sizes and tms will be returned. In addition, if max_heterodimer_tm is provided then primer pairs with a heterodimer tm exceeding the maximum will also be discarded.

Parameters:

Name Type Description Default
left_primers Sequence[Oligo]

the left primers

required
right_primers Sequence[Oligo]

the right primers

required
target Span

the genome mapping for the target

required
amplicon_sizes MinOptMax[int]

minimum, optimal, and maximum amplicon sizes (lengths)

required
amplicon_tms MinOptMax[float]

minimum, optimal, and maximum amplicon Tms

required
max_heterodimer_tm Optional[float]

if supplied, heterodimer Tms will be calculated for primer pairs, and those exceeding the maximum Tm will be discarded

required
weights PrimerAndAmpliconWeights

the set of penalty weights

required
fasta_path Path

the path to the FASTA file from which the amplicon sequence will be retrieved.

required

Returns:

Type Description
Iterator[PrimerPair]

An iterator over all the valid primer pairs, sorted by primer pair penalty.

Iterator[PrimerPair]

Primer pairs with smaller penalties are returned first.

Source code in prymer/api/picking.py
def build_primer_pairs(  # noqa: C901
    left_primers: Sequence[Oligo],
    right_primers: Sequence[Oligo],
    target: Span,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    max_heterodimer_tm: Optional[float],
    weights: PrimerAndAmpliconWeights,
    fasta_path: Path,
) -> Iterator[PrimerPair]:
    """Builds primer pairs from individual left and primers.

    Only primer pairs that meet the requirements for amplicon sizes and tms will be returned.
    In addition, if `max_heterodimer_tm` is provided then primer pairs with a heterodimer tm
    exceeding the maximum will also be discarded.

    Args:
        left_primers: the left primers
        right_primers: the right primers
        target: the genome mapping for the target
        amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths)
        amplicon_tms: minimum, optimal, and maximum amplicon Tms
        max_heterodimer_tm: if supplied, heterodimer Tms will be calculated for primer pairs,
            and those exceeding the maximum Tm will be discarded
        weights: the set of penalty weights
        fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved.

    Returns:
        An iterator over all the valid primer pairs, sorted by primer pair penalty.
        Primer pairs with smaller penalties are returned first.
    """
    # Short circuit if we have no left primers or no right primers
    if not any(left_primers) or not any(right_primers):
        return

    if any(p.span.refname != target.refname for p in left_primers):
        raise ValueError("Left primers exist on different reference than target.")

    if any(p.span.refname != target.refname for p in right_primers):
        raise ValueError("Right primers exist on different reference than target.")

    # Sort the left and right primers
    left_primers = sorted(left_primers, key=lambda p: p.span.start)
    right_primers = sorted(right_primers, key=lambda p: p.span.end)

    # Grab the sequence we'll use to fill in the amplicon sequence
    with FastaFile(f"{fasta_path}") as fasta:
        region_start = min(p.span.start for p in left_primers)
        region_end = max(p.span.end for p in right_primers)
        bases = fasta.fetch(target.refname, region_start - 1, region_end)

    # Each tuple is left_idx, right_idx, penalty, tm
    pairings: list[Tuple[int, int, float, float]] = []

    # generate all the primer pairs that don't violate hard size and Tm constraints
    first_right_primer_idx = 0

    # Nested loops over indices are used here so that we can skip potentially large chunks of
    # the cartesian product, based on the fact that we're sorting the left and right primers.
    # Two things are relied upon:
    #   1. If we encounter a left/right combo that either has the right primer leftward of the
    #      left primer _or_ generates a too-short amplicon, the neither that right primer nor
    #      any previous right primer can make a valid combination with any subsequent left primer.
    #   2. If we encounter a left/right combo that generates a too-large amplicon, then no
    #      subsequent right-primer can make a valid combination with that left primer
    for i in range(0, len(left_primers)):
        for j in range(first_right_primer_idx, len(right_primers)):
            lp = left_primers[i]
            rp = right_primers[j]

            # If the right primer isn't "to the right" of the left primer, move on
            if rp.span.start < lp.span.start or lp.span.end > rp.span.end:
                first_right_primer_idx = max(first_right_primer_idx, j+1)
                continue

            amp_span = PrimerPair.calculate_amplicon_span(lp, rp)

            if amp_span.length < amplicon_sizes.min:
                first_right_primer_idx = max(first_right_primer_idx, j+1)
                continue

            if amp_span.length > amplicon_sizes.max:
                break  # break in this case because all subsequent rps will yield longer amplicons

            # Since the amplicon span and the region_start are both 1-based, the minuend
            # becomes a zero-based offset
            amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1]
            amp_tm = calculate_long_seq_tm(amp_bases)

            if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max:
                continue

            penalty = score(
                left_primer=lp,
                right_primer=rp,
                amplicon=amp_span,
                amplicon_tm=amp_tm,
                amplicon_sizes=amplicon_sizes,
                amplicon_tms=amplicon_tms,
                weights=weights,
            )

            pairings.append((i, j, penalty, amp_tm))

    # Sort by the penalty, ascending
    pairings.sort(key=lambda tup: tup[2])

    with NtThermoAlign() as ntthal:
        for i, j, penalty, tm in pairings:
            lp = left_primers[i]
            rp = right_primers[j]

            if max_heterodimer_tm is not None:
                if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm:
                    continue

            amp_bases = bases[lp.span.start - region_start : rp.span.end - region_start + 1]

            pp = PrimerPair(
                left_primer=lp,
                right_primer=rp,
                amplicon_sequence=amp_bases,
                amplicon_tm=tm,
                penalty=penalty,
            )

            yield pp

cached

cached(
    vcf_paths: list[Path],
    min_maf: float,
    include_missing_mafs: bool = False,
) -> VariantOverlapDetector

Constructs a VariantLookup that caches all variants in memory for fast lookup. Appropriate for small VCFs.

Source code in prymer/api/variant_lookup.py
def cached(
    vcf_paths: list[Path], min_maf: float, include_missing_mafs: bool = False
) -> VariantOverlapDetector:
    """Constructs a `VariantLookup` that caches all variants in memory for fast lookup.
    Appropriate for small VCFs."""
    return VariantOverlapDetector(
        vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
    )

cluster_intervals

cluster_intervals(
    intervals: list[Interval], max_size: int
) -> ClusteredIntervals

Cluster a list of intervals into intervals that overlap the given intervals and are not larger than max_size.

Implements a greedy algorithm for hierarchical clustering, merging subsequent intervals (from a sorted list) as long as the maximal size is respected. Each "cluster" is replaced by an interval that spans it, and the algorithm terminates when it can no longer merge anything without creating a cluster that is larger than max_size.

Parameters:

Name Type Description Default
intervals list[Interval]

The intervals to cluster.

required
max_size int

The maximum size (in bp) of the resulting clusters.

required

Returns:

Type Description
ClusteredIntervals

A named tuple (clusters, intervals), where clusters contains one (named) interval per

ClusteredIntervals

cluster, defining the region spanned by the cluster, and intervals contains the original

ClusteredIntervals

set of intervals, each adorned with a name that agrees with that of a cluster in

ClusteredIntervals

clusters that wholly contains it.

Raises:

Type Description
ValueError

If any of the input intervals are larger than max_size.

Source code in prymer/api/clustering.py
def cluster_intervals(
    intervals: list[Interval],
    max_size: int,
) -> ClusteredIntervals:
    """
    Cluster a list of intervals into intervals that overlap the given
    intervals and are not larger than `max_size`.

    Implements a greedy algorithm for hierarchical clustering, merging subsequent intervals
    (from a sorted list) as long as the maximal size is respected.
    Each "cluster" is replaced by an interval that spans it, and the algorithm terminates
    when it can no longer merge anything without creating a cluster that is larger than `max_size`.

    Args:
        intervals: The intervals to cluster.
        max_size: The maximum size (in bp) of the resulting clusters.

    Returns:
        A named tuple (`clusters`, `intervals`), where `clusters` contains one (named) interval per
        cluster, defining the region spanned by the cluster, and `intervals` contains the original
        set of intervals, each adorned with a `name` that agrees with that of a cluster in
        `clusters` that wholly contains it.

    Raises:
        ValueError: If any of the input intervals are larger than `max_size`.
    """

    intervals_by_refname: Dict[str, list[Interval]] = {
        key: list(group) for key, group in itertools.groupby(intervals, key=lambda x: x.refname)
    }
    # import ipdb; ipdb.set_trace()
    # iterate over the refnames and call _cluster_in_contig() per refname.

    # need to store this in a list since we iterate over it twice.
    per_refname_clusters_and_intervals = [
        x
        for x in (
            _cluster_in_contig(intervals_in_refname, max_size)
            for intervals_in_refname in intervals_by_refname.values()
        )
    ]

    # flatten the clusters and intervals
    clusters = [
        cluster
        for refname_result in per_refname_clusters_and_intervals
        for cluster in refname_result.clusters
    ]

    intervals_out = [
        interval
        for refname_results in per_refname_clusters_and_intervals
        for interval in refname_results.intervals
    ]

    return ClusteredIntervals(clusters=clusters, intervals=intervals_out)

disk_based

disk_based(
    vcf_paths: list[Path],
    min_maf: float,
    include_missing_mafs: bool = False,
) -> FileBasedVariantLookup

Constructs a VariantLookup that queries indexed VCFs on disk for each lookup.

Appropriate for large VCFs.

Example:

>>> with disk_based([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0) as lookup:
...     lookup.query(refname="chr2", start=7999, end=8000)
[SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]
Source code in prymer/api/variant_lookup.py
def disk_based(
    vcf_paths: list[Path], min_maf: float, include_missing_mafs: bool = False
) -> FileBasedVariantLookup:
    """Constructs a `VariantLookup` that queries indexed VCFs on disk for each lookup.

    Appropriate for large VCFs.

    Example:

    ```python
    >>> with disk_based([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0) as lookup:
    ...     lookup.query(refname="chr2", start=7999, end=8000)
    [SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]

    ```
    """  # noqa: E501
    return FileBasedVariantLookup(
        vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
    )

Modules

clustering

Methods for merging intervals into "clusters"

This module contains utility functions related to clustering intervals into larger intervals.

There is currently one public method available:

  • cluster_intervals() -- clusters a list of intervals into a set of intervals that cover the input intervals, and are not larger than an input max_size. each original interval will be wholly covered by at least one cluster. The name attribute of each cluster will be set, and the original intervals are returned with a name attribute that matches that of a cluster that wholly contains it.
Examples
>>> from prymer.api.clustering import cluster_intervals
>>> from pybedlite.overlap_detector import Interval
>>> intervals = [Interval("chr1", 1, 2), Interval("chr1", 3, 4)]
>>> cluster_intervals(intervals, 10)
ClusteredIntervals(clusters=[Interval(refname='chr1', start=1, end=4, negative=False, name='chr1:1-4')], intervals=[Interval(refname='chr1', start=1, end=2, negative=False, name='chr1:1-4'), Interval(refname='chr1', start=3, end=4, negative=False, name='chr1:1-4')])
>>> cluster_intervals(intervals, 2)
ClusteredIntervals(clusters=[Interval(refname='chr1', start=1, end=2, negative=False, name='chr1:1-2'), Interval(refname='chr1', start=3, end=4, negative=False, name='chr1:3-4')], intervals=[Interval(refname='chr1', start=1, end=2, negative=False, name='chr1:1-2'), Interval(refname='chr1', start=3, end=4, negative=False, name='chr1:3-4')])

Classes

ClusteredIntervals dataclass

The list of clusters (intervals) and the original source intervals. The source intervals must have the name corresponding to the cluster to which the source interval belongs. Each cluster must envelop ("wholly contain") the intervals associated with the cluster.

Attributes:

Name Type Description
clusters list[Interval]

the clusters that wholly contain one or more source intervals.

intervals list[Interval]

the source intervals, with name corresponding to the name of the associated cluster.

Source code in prymer/api/clustering.py
@dataclass(frozen=True, init=True, slots=True)
class ClusteredIntervals:
    """
    The list of clusters (intervals) and the original source intervals.  The source intervals must
    have the name corresponding to the cluster to which the source interval belongs. Each cluster
    must envelop ("wholly contain") the intervals associated with the cluster.

    Attributes:
        clusters: the clusters that wholly contain one or more source intervals.
        intervals: the source intervals, with name corresponding to the name of the associated
            cluster.
    """

    clusters: list[Interval]
    intervals: list[Interval]

    def __post_init__(self) -> None:
        cluster_names: set[str] = {c.name for c in self.clusters}
        interval_names: set[str] = {i.name for i in self.intervals}
        # Check that the names of clusters are unique
        if len(self.clusters) != len(cluster_names):
            raise ValueError("Cluster names are not unique")
        # Check that every interval has the name of one cluster
        for interval in self.intervals:
            if interval.name not in cluster_names:
                raise ValueError(f"Interval does not belong to a cluster: {interval}")
        # Check that every cluster has at least one interval associated with the cluster
        if len(interval_names) != len(cluster_names):
            raise ValueError("Cluster and interval names differ.")

Functions

cluster_intervals
cluster_intervals(
    intervals: list[Interval], max_size: int
) -> ClusteredIntervals

Cluster a list of intervals into intervals that overlap the given intervals and are not larger than max_size.

Implements a greedy algorithm for hierarchical clustering, merging subsequent intervals (from a sorted list) as long as the maximal size is respected. Each "cluster" is replaced by an interval that spans it, and the algorithm terminates when it can no longer merge anything without creating a cluster that is larger than max_size.

Parameters:

Name Type Description Default
intervals list[Interval]

The intervals to cluster.

required
max_size int

The maximum size (in bp) of the resulting clusters.

required

Returns:

Type Description
ClusteredIntervals

A named tuple (clusters, intervals), where clusters contains one (named) interval per

ClusteredIntervals

cluster, defining the region spanned by the cluster, and intervals contains the original

ClusteredIntervals

set of intervals, each adorned with a name that agrees with that of a cluster in

ClusteredIntervals

clusters that wholly contains it.

Raises:

Type Description
ValueError

If any of the input intervals are larger than max_size.

Source code in prymer/api/clustering.py
def cluster_intervals(
    intervals: list[Interval],
    max_size: int,
) -> ClusteredIntervals:
    """
    Cluster a list of intervals into intervals that overlap the given
    intervals and are not larger than `max_size`.

    Implements a greedy algorithm for hierarchical clustering, merging subsequent intervals
    (from a sorted list) as long as the maximal size is respected.
    Each "cluster" is replaced by an interval that spans it, and the algorithm terminates
    when it can no longer merge anything without creating a cluster that is larger than `max_size`.

    Args:
        intervals: The intervals to cluster.
        max_size: The maximum size (in bp) of the resulting clusters.

    Returns:
        A named tuple (`clusters`, `intervals`), where `clusters` contains one (named) interval per
        cluster, defining the region spanned by the cluster, and `intervals` contains the original
        set of intervals, each adorned with a `name` that agrees with that of a cluster in
        `clusters` that wholly contains it.

    Raises:
        ValueError: If any of the input intervals are larger than `max_size`.
    """

    intervals_by_refname: Dict[str, list[Interval]] = {
        key: list(group) for key, group in itertools.groupby(intervals, key=lambda x: x.refname)
    }
    # import ipdb; ipdb.set_trace()
    # iterate over the refnames and call _cluster_in_contig() per refname.

    # need to store this in a list since we iterate over it twice.
    per_refname_clusters_and_intervals = [
        x
        for x in (
            _cluster_in_contig(intervals_in_refname, max_size)
            for intervals_in_refname in intervals_by_refname.values()
        )
    ]

    # flatten the clusters and intervals
    clusters = [
        cluster
        for refname_result in per_refname_clusters_and_intervals
        for cluster in refname_result.clusters
    ]

    intervals_out = [
        interval
        for refname_results in per_refname_clusters_and_intervals
        for interval in refname_results.intervals
    ]

    return ClusteredIntervals(clusters=clusters, intervals=intervals_out)

coordmath

Methods for coordinate-based math and interval manipulation.

Contains the following public methods:

Functions

get_closed_end
get_closed_end(start: int, length: int) -> int

Get the closed end of an interval given its start and length.

Parameters:

Name Type Description Default
start int

The start position of the interval.

required
length int

The length of the interval.

required

Returns:

Type Description
int

The closed end of the interval.

Example

get_closed_end(start=10, length=5) 14

Source code in prymer/api/coordmath.py
def get_closed_end(start: int, length: int) -> int:
    """
    Get the closed end of an interval given its start and length.

    Args:
        start: The start position of the interval.
        length: The length of the interval.

    Returns:
        The closed end of the interval.

    Example:
        >>> get_closed_end(start=10, length=5)
        14
    """
    return start + length - 1
get_locus_string
get_locus_string(record: Interval) -> str

Get the locus-string for an interval.

The output string will have the format <refname>:<start>-<end> No conversion on coordinates is performed, so the output is 0-based, open-ended.

Parameters:

Name Type Description Default
record Interval

The interval to get the locus-string for.

required

Returns:

Type Description
str

A locus-string for the interval.

Source code in prymer/api/coordmath.py
def get_locus_string(record: Interval) -> str:
    """
    Get the locus-string for an interval.

    The output string will have the format `<refname>:<start>-<end>`
    No conversion on coordinates is performed, so the output is 0-based, open-ended.

    Args:
        record: The interval to get the locus-string for.

    Returns:
        A locus-string for the interval.
    """
    return f"{record.refname}:{record.start}-{record.end}"
require_same_refname
require_same_refname(*intervals: Interval) -> None

Require that the input intervals all have the same refname.

Parameters:

Name Type Description Default
intervals Interval

one or more intervals

()

Raises:

Type Description
ValueError

if the intervals do not all have the same refname.

Source code in prymer/api/coordmath.py
def require_same_refname(*intervals: Interval) -> None:
    """
    Require that the input intervals all have the same refname.

    Args:
        intervals: one or more intervals

    Raises:
        ValueError: if the intervals do not all have the same refname.
    """

    refnames = set(i.refname for i in intervals)
    if len(refnames) != 1:
        raise ValueError(f"`intervals` must have exactly one refname\n Found {sorted(refnames)}")

melting

Methods for calculating melting temperatures.

There is currently one public method available:

Examples
>>> calculate_long_seq_tm(seq="GT" * 10, salt_molar_concentration=10.0, percent_formamide=10.0)
78.64999999999999

Functions

calculate_long_seq_tm
calculate_long_seq_tm(
    seq: str,
    salt_molar_concentration: float = 1.65,
    percent_formamide: float = 15.0,
) -> float

Calculate the melting temperature of an amplicon.

Uses the formula:

Tm = 81.5 + 0.41(%GC) - 675/N + 16.6 x log[Na+] - 0.62(%F)

from:

(Marmur & Doty 1962, J Mol Biol 5: 109-118; Schildkraut & Lifson 1965, Biopolymers 3: 195-208)

with the added chemical (formamide) correction.

Parameters:

Name Type Description Default
seq str

the amplicon sequence

required
salt_molar_concentration float

the molar concentration of salt

1.65
percent_formamide float

the percent formamide

15.0

Returns:

Type Description
float

the predicted melting temperature

Source code in prymer/api/melting.py
def calculate_long_seq_tm(
    seq: str, salt_molar_concentration: float = 1.65, percent_formamide: float = 15.0
) -> float:
    """Calculate the melting temperature of an amplicon.

    Uses the formula:

    `Tm = 81.5 + 0.41(%GC) - 675/N + 16.6 x log[Na+] - 0.62(%F)`

    from:

    (Marmur & Doty 1962, J Mol Biol 5: 109-118; Schildkraut & Lifson 1965, Biopolymers 3: 195-208)

    with the added chemical (formamide) correction.

    Args:
        seq: the amplicon sequence
        salt_molar_concentration: the molar concentration of salt
        percent_formamide: the percent formamide

    Returns:
        the predicted melting temperature
    """
    return (
        81.5
        + (16.6 * math.log10(salt_molar_concentration))
        + (41.0 * gc_content(seq))
        - (675.0 / len(seq))
        - (0.62 * percent_formamide)
    )

minoptmax

MinOptMax Classes and Methods

This module contains a class and class methods to hold a range of user-specific thresholds.

Each of the three class attributes represents a minimal, optimal, and maximal value. The three values can be either int or float values but all must be of the same type within one MinOptMax object (for example, min cannot be a float while max is an int).

Primer3 will use these values downstream to set an allowable range of specific parameters that inform primer design. For example, Primer3 can constrain primer melting temperature to be within a range of 55.0 - 65.0 Celsius based on an input MinOptMax(min=55.0, opt=60.0, max=65.0).

Examples of interacting with the MinOptMax class
>>> thresholds = MinOptMax(min=1.0, opt=2.0, max=4.0)
>>> print(thresholds)
(min:1.0, opt:2.0, max:4.0)
>>> list(thresholds)
[1.0, 2.0, 4.0]

Classes

MinOptMax dataclass

Bases: Generic[Numeric]

Stores a minimum, an optimal, and a maximum value (either all ints or all floats).

min must be less than max. opt should be greater than the min value and less than the max value.

Attributes:

Name Type Description
min Numeric

the minimum value

opt Numeric

the optimal value

max Numeric

the maximum value

Raises:

Type Description
ValueError

if min > max

ValueError

if min is not less than opt and opt is not less than max

Source code in prymer/api/minoptmax.py
@dataclass(slots=True, frozen=True, init=True)
class MinOptMax(Generic[Numeric]):
    """Stores a minimum, an optimal, and a maximum value (either all ints or all floats).

    `min` must be less than `max`. `opt` should be greater than the `min`
     value and less than the `max` value.

    Attributes:
        min: the minimum value
        opt: the optimal value
        max: the maximum value


    Raises:
        ValueError: if min > max
        ValueError: if `min` is not less than `opt` and `opt` is not less than `max`
    """

    min: Numeric
    opt: Numeric
    max: Numeric

    def __post_init__(self) -> None:
        dtype = type(self.min)
        if not isinstance(self.max, dtype) or not isinstance(self.opt, dtype):
            raise TypeError(
                "Min, opt, and max must be the same type; "
                f"received min: {dtype}, opt: {type(self.opt)}, max: {type(self.max)}"
            )
        if self.min > self.max:
            raise ValueError(
                f"Min must be no greater than max; received min: {self.min}, max: {self.max}"
            )
        if not (self.min <= self.opt <= self.max):
            raise ValueError(
                "Arguments must satisfy min <= opt <= max: "
                f"received min: {self.min}, opt: {self.opt}, max: {self.max}"
            )

    def __iter__(self) -> Iterator[float]:
        """Returns an iterator of min, opt, and max"""
        return iter([self.min, self.opt, self.max])

    def __str__(self) -> str:
        """Returns a string representation of min, opt, and max"""
        return f"(min:{self.min}, opt:{self.opt}, max:{self.max})"
Functions
__iter__
__iter__() -> Iterator[float]

Returns an iterator of min, opt, and max

Source code in prymer/api/minoptmax.py
def __iter__(self) -> Iterator[float]:
    """Returns an iterator of min, opt, and max"""
    return iter([self.min, self.opt, self.max])
__str__
__str__() -> str

Returns a string representation of min, opt, and max

Source code in prymer/api/minoptmax.py
def __str__(self) -> str:
    """Returns a string representation of min, opt, and max"""
    return f"(min:{self.min}, opt:{self.opt}, max:{self.max})"

oligo

Oligo Class and Methods

This module contains a class and class methods to represent an oligo (e.g., designed by Primer3).

Oligos can represent single primer and/or internal probe designs.

Class attributes include the base sequence, melting temperature, and the score of the oligo. The mapping of the oligo to the genome is also stored.

Optional attributes include naming information and a tail sequence to attach to the 5' end of the oligo (if applicable). Optional attributes also include the thermodynamic results from Primer3.

Examples of interacting with the Oligo class
>>> from prymer.api.span import Span, Strand
>>> oligo_span = Span(refname="chr1", start=1, end=20)
>>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="AGCT" * 5)
>>> oligo.longest_hp_length()
1
>>> oligo.length
20
>>> oligo.name is None
True
>>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="GACGG"*4)
>>> oligo.longest_hp_length()
3
>>> oligo.untailed_length()
20
>>> oligo.tailed_length()
20
>>> primer = oligo.with_tail(tail="GATTACA")
>>> primer.untailed_length()
20
>>> primer.tailed_length()
27
>>> primer = primer.with_name(name="fwd_primer")
>>> primer.name
'fwd_primer'

Oligos may also be written to a file and subsequently read back in, as the Oligo class is an fgpyo Metric class:

>>> from pathlib import Path
>>> left_span = Span(refname="chr1", start=1, end=20)
>>> left = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"*20)
>>> right_span = Span(refname="chr1", start=101, end=120)
>>> right = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"*20)
>>> path = Path("/tmp/path/to/primers.txt")
>>> Oligo.write(path, left, right)  # doctest: +SKIP
>>> primers = Oligo.read(path)  # doctest: +SKIP
>>> list(primers)  # doctest: +SKIP
[
    Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="G"*20),
    Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="T"*20)
]

Attributes

Classes

Oligo dataclass

Bases: OligoLike, Metric['Oligo']

Stores the properties of the designed oligo.

Oligos can include both single primer and internal probe designs. Primer3 emits information about the specific properties of a primer and/or probe, including the base sequence, melting temperature (tm), and score.

The penalty score of the design is emitted by Primer3 and controlled by the corresponding design parameters. The penalty for a primer is set by the combination of PrimerAndAmpliconParameters and PrimerWeights. A probe penalty is set by ProbeParameters and ProbeWeights.

The span of an Oligo object represents the mapping of the oligo to the genome. Oligo objects can optionally have a tail sequence to prepend to the 5' end of the primer or probe, as well as a name for downstream record keeping.

Oligo objects can also store thermodynamic characteristics of primer and/or probe design. These include the calculated melting temperatures of the most stable hairpin structure, self-, and end- complementarity. These values can be emitted by Primer3.

These thermodynamic characteristics are meant to quantify how likely it is the primer or probe will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater than the intended melting temperature for target binding indicates the primer or probe is likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction.

Attributes:

Name Type Description
tm float

the calculated melting temperature of the oligo

penalty float

the penalty or score for the oligo

span Span

the mapping of the primer to the genome

name Span

an optional name to use for the primer

tm_homodimer Optional[float]

calculated melting temperature that represents the tendency of an oligo to bind to itself (self-complementarity)

tm_3p_anchored_homodimer Optional[float]

calculated melting temperature that represents the tendency of a primer to bind to the 3'-END of an identical primer

tm_secondary_structure Optional[float]

calculated melting temperature of the oligo hairpin structure

bases Optional[str]

the base sequence of the oligo (excluding any tail)

tail Optional[str]

an optional tail sequence to put on the 5' end of the primer

Source code in prymer/api/oligo.py
@dataclass(frozen=True, init=True, kw_only=True, slots=True)
class Oligo(OligoLike, Metric["Oligo"]):
    """Stores the properties of the designed oligo.

    Oligos can include both single primer and internal probe designs. Primer3 emits information
    about the specific properties of a primer and/or probe, including the base sequence,
    melting temperature (tm), and score.

    The penalty score of the design is emitted by Primer3 and controlled by the corresponding
    design parameters. The penalty for a primer is set by the combination of
    `PrimerAndAmpliconParameters` and `PrimerWeights`.
    A probe penalty is set by `ProbeParameters` and `ProbeWeights`.

    The span of an `Oligo` object represents the mapping of the oligo
    to the genome. `Oligo` objects can optionally have a `tail` sequence to prepend to the 5' end
    of the primer or probe, as well as a `name` for downstream record keeping.

    `Oligo` objects can also store thermodynamic characteristics of primer and/or probe
    design. These include the calculated melting temperatures of the most stable hairpin structure,
    self-, and end- complementarity. These values can be emitted by Primer3.

    These thermodynamic characteristics are meant to quantify how likely it is the primer or probe
    will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater
    than the intended melting temperature for target binding indicates the primer or probe is
    likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction.

    Attributes:
        tm: the calculated melting temperature of the oligo
        penalty: the penalty or score for the oligo
        span: the mapping of the primer to the genome
        name: an optional name to use for the primer
        tm_homodimer: calculated melting temperature that represents
            the tendency of an oligo to bind to itself (self-complementarity)
        tm_3p_anchored_homodimer: calculated melting temperature that represents
            the tendency of a primer to bind to the 3'-END of an identical primer
        tm_secondary_structure: calculated melting temperature of the oligo hairpin structure
        bases: the base sequence of the oligo (excluding any tail)
        tail: an optional tail sequence to put on the 5' end of the primer

    """

    tm: float
    penalty: float
    span: Span
    tm_homodimer: Optional[float] = None
    tm_3p_anchored_homodimer: Optional[float] = None
    tm_secondary_structure: Optional[float] = None
    bases: Optional[str] = None
    tail: Optional[str] = None

    def __post_init__(self) -> None:
        super(Oligo, self).__post_init__()

    def longest_hp_length(self) -> int:
        """Length of longest homopolymer in the oligo."""
        if self.bases is None:
            return 0
        else:
            return longest_homopolymer_length(self.bases)

    @property
    def length(self) -> int:
        """Length of un-tailed oligo."""
        return self.span.length

    def untailed_length(self) -> int:
        """Length of un-tailed oligo."""
        return self.span.length

    def tailed_length(self) -> int:
        """Length of tailed oligo."""
        return self.span.length if self.tail is None else self.span.length + len(self.tail)

    def longest_dinucleotide_run_length(self) -> int:
        """Number of bases in the longest dinucleotide run in a oligo.

        A dinucleotide run is when length two repeat-unit is repeated. For example,
        TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2
        (or 0 if there are fewer than 2 bases)."""
        return longest_dinucleotide_run_length(self.bases)

    def with_tail(self, tail: str) -> "Oligo":
        """Returns a copy of the oligo with the tail sequence attached."""
        return replace(self, tail=tail)

    def with_name(self, name: str) -> "Oligo":
        """Returns a copy of oligo object with the given name."""
        return replace(self, name=name)

    def bases_with_tail(self) -> Optional[str]:
        """
        Returns the sequence of the oligo prepended by the tail.

        If `tail` is None, only return `bases`.
        """
        if self.tail is None:
            return self.bases
        return f"{self.tail}{self.bases}"

    def to_bed12_row(self) -> str:
        """Returns the BED detail format view:
        https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7"""
        bed_coord = self.span.get_bedlike_coords()
        return "\t".join(
            map(
                str,
                [
                    self.span.refname,  # contig
                    bed_coord.start,  # start
                    bed_coord.end,  # end
                    self.id,  # name
                    500,  # score
                    self.span.strand.value,  # strand
                    bed_coord.start,  # thick start
                    bed_coord.end,  # thick end
                    "100,100,100",  # color
                    1,  # block count
                    f"{self.length}",  # block sizes
                    "0",  # block starts (relative to `start`)
                ],
            )
        )

    def __str__(self) -> str:
        """
        Returns a string representation of this oligo
        """
        # If the bases field is None, replace with MISSING_BASES_STRING
        bases: str = self.bases if self.bases is not None else MISSING_BASES_STRING
        return f"{bases}\t{self.tm}\t{self.penalty}\t{self.span}"

    @classmethod
    def _parsers(cls) -> Dict[type, Callable[[str], Any]]:
        return {
            Span: lambda value: Span.from_string(value),
        }

    @staticmethod
    def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int:
        """Compares this oligo to that oligo by their span, ordering references using the given
        sequence dictionary.

        Args:
            this: the first oligo
            that: the second oligo
            seq_dict: the sequence dictionary used to order references

        Returns:
            -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise
        """
        return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict)
Attributes
length property
length: int

Length of un-tailed oligo.

Functions
__str__
__str__() -> str

Returns a string representation of this oligo

Source code in prymer/api/oligo.py
def __str__(self) -> str:
    """
    Returns a string representation of this oligo
    """
    # If the bases field is None, replace with MISSING_BASES_STRING
    bases: str = self.bases if self.bases is not None else MISSING_BASES_STRING
    return f"{bases}\t{self.tm}\t{self.penalty}\t{self.span}"
bases_with_tail
bases_with_tail() -> Optional[str]

Returns the sequence of the oligo prepended by the tail.

If tail is None, only return bases.

Source code in prymer/api/oligo.py
def bases_with_tail(self) -> Optional[str]:
    """
    Returns the sequence of the oligo prepended by the tail.

    If `tail` is None, only return `bases`.
    """
    if self.tail is None:
        return self.bases
    return f"{self.tail}{self.bases}"
compare staticmethod
compare(
    this: Oligo, that: Oligo, seq_dict: SequenceDictionary
) -> int

Compares this oligo to that oligo by their span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this Oligo

the first oligo

required
that Oligo

the second oligo

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required

Returns:

Type Description
int

-1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise

Source code in prymer/api/oligo.py
@staticmethod
def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int:
    """Compares this oligo to that oligo by their span, ordering references using the given
    sequence dictionary.

    Args:
        this: the first oligo
        that: the second oligo
        seq_dict: the sequence dictionary used to order references

    Returns:
        -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise
    """
    return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict)
longest_dinucleotide_run_length
longest_dinucleotide_run_length() -> int

Number of bases in the longest dinucleotide run in a oligo.

A dinucleotide run is when length two repeat-unit is repeated. For example, TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2 (or 0 if there are fewer than 2 bases).

Source code in prymer/api/oligo.py
def longest_dinucleotide_run_length(self) -> int:
    """Number of bases in the longest dinucleotide run in a oligo.

    A dinucleotide run is when length two repeat-unit is repeated. For example,
    TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2
    (or 0 if there are fewer than 2 bases)."""
    return longest_dinucleotide_run_length(self.bases)
longest_hp_length
longest_hp_length() -> int

Length of longest homopolymer in the oligo.

Source code in prymer/api/oligo.py
def longest_hp_length(self) -> int:
    """Length of longest homopolymer in the oligo."""
    if self.bases is None:
        return 0
    else:
        return longest_homopolymer_length(self.bases)
tailed_length
tailed_length() -> int

Length of tailed oligo.

Source code in prymer/api/oligo.py
def tailed_length(self) -> int:
    """Length of tailed oligo."""
    return self.span.length if self.tail is None else self.span.length + len(self.tail)
to_bed12_row
to_bed12_row() -> str

Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7

Source code in prymer/api/oligo.py
def to_bed12_row(self) -> str:
    """Returns the BED detail format view:
    https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7"""
    bed_coord = self.span.get_bedlike_coords()
    return "\t".join(
        map(
            str,
            [
                self.span.refname,  # contig
                bed_coord.start,  # start
                bed_coord.end,  # end
                self.id,  # name
                500,  # score
                self.span.strand.value,  # strand
                bed_coord.start,  # thick start
                bed_coord.end,  # thick end
                "100,100,100",  # color
                1,  # block count
                f"{self.length}",  # block sizes
                "0",  # block starts (relative to `start`)
            ],
        )
    )
untailed_length
untailed_length() -> int

Length of un-tailed oligo.

Source code in prymer/api/oligo.py
def untailed_length(self) -> int:
    """Length of un-tailed oligo."""
    return self.span.length
with_name
with_name(name: str) -> Oligo

Returns a copy of oligo object with the given name.

Source code in prymer/api/oligo.py
def with_name(self, name: str) -> "Oligo":
    """Returns a copy of oligo object with the given name."""
    return replace(self, name=name)
with_tail
with_tail(tail: str) -> Oligo

Returns a copy of the oligo with the tail sequence attached.

Source code in prymer/api/oligo.py
def with_tail(self, tail: str) -> "Oligo":
    """Returns a copy of the oligo with the tail sequence attached."""
    return replace(self, tail=tail)

oligo_like

Class and Methods for oligo-like objects

The OligoLike class is an abstract base class designed to represent oligo-like objects, such as individual primers and probes or primer pairs. This class encapsulates common attributes and provides a foundation for more specialized implementations.

In particular, the following methods/attributes need to be implemented:

  • span() -- the mapping of the oligo-like object to the genome.
  • bases() -- the bases of the oligo-like object, or None if not available.
  • to_bed12_row() -- the 12-field BED representation of this oligo-like object.

See the following concrete implementations:

  • Primer -- a class to store an individual oligo
  • PrimerPair -- a class to store a primer pair

Attributes

MISSING_BASES_STRING module-attribute
MISSING_BASES_STRING: str = '*'

Used in string representations of primer-like objects when their bases property return None

Classes

OligoLike dataclass

Bases: ABC

An abstract base class for oligo-like objects, such as individual primers or primer pairs.

Attributes:

Name Type Description
name Optional[str]

an optional name to use for the primer

The id field shall be the 'name' field if supplied, or a generated value based on the location of the primer-like object.

Source code in prymer/api/oligo_like.py
@dataclass(frozen=True, init=True, slots=True)
class OligoLike(ABC):
    """
    An abstract base class for oligo-like objects, such as individual primers or primer pairs.

    Attributes:
        name: an optional name to use for the primer

    The `id` field shall be the 'name' field if supplied, or a generated value based on the
    location of the primer-like object.
    """

    name: Optional[str] = None

    def __post_init__(self) -> None:
        # If supplied, bases must be a non-empty String & the same length as the
        # span length
        if self.bases is not None:
            if len(self.bases) == 0:
                raise ValueError("Bases must not be an empty string")
            elif self.span.length != len(self.bases):
                raise ValueError(
                    "Conflicting lengths: "
                    f"span length={self.span.length},"
                    f" sequence length={len(self.bases)}"
                )

    @property
    @abstractmethod
    def span(self) -> Span:
        """Returns the mapping of the oligo-like object to a genome."""

    @property
    @abstractmethod
    def bases(self) -> Optional[str]:
        """Returns the base sequence of the oligo-like object."""

    @property
    def percent_gc_content(self) -> float:
        """
        The GC of the amplicon sequence in the range 0-100, or zero if there is no amplicon
        sequence.
        """
        if self.bases is None:
            return 0.0
        else:
            return round(gc_content(self.bases) * 100, 3)

    @property
    def id(self) -> str:
        """
        Returns the identifier for the oligo-like object. This shall be the `name`
        if one exists, otherwise a generated value based on the location of the object.
        """
        if self.name is not None:
            return self.name
        else:
            return self.location_string

    @property
    def location_string(self) -> str:
        """Returns a string representation of the location of the oligo-like object."""
        return (
            f"{self.span.refname}_{self.span.start}_"
            + f"{self.span.end}_{self._strand_to_location_string()}"
        )

    @abstractmethod
    def to_bed12_row(self) -> str:
        """
        Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec.
        See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1
        """

    def _strand_to_location_string(self) -> str:
        """
        Returns a string representation appropriate for location_string of the strand of the
        primer
        """
        match self.span.strand:
            case Strand.POSITIVE:
                return "F"
            case Strand.NEGATIVE:
                return "R"
            case _:  # pragma: no cover
                # Not calculating coverage on this line as it should be impossible to reach
                assert_never(f"Encountered unhandled Strand value: {self.span.strand}")
Attributes
bases abstractmethod property
bases: Optional[str]

Returns the base sequence of the oligo-like object.

id property
id: str

Returns the identifier for the oligo-like object. This shall be the name if one exists, otherwise a generated value based on the location of the object.

location_string property
location_string: str

Returns a string representation of the location of the oligo-like object.

percent_gc_content property
percent_gc_content: float

The GC of the amplicon sequence in the range 0-100, or zero if there is no amplicon sequence.

span abstractmethod property
span: Span

Returns the mapping of the oligo-like object to a genome.

Functions
to_bed12_row abstractmethod
to_bed12_row() -> str

Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec. See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1

Source code in prymer/api/oligo_like.py
@abstractmethod
def to_bed12_row(self) -> str:
    """
    Formats the oligo-like into 12 tab-separated fields matching the BED 12-column spec.
    See: https://genome.ucsc.edu/FAQ/FAQformat.html#format1
    """

picking

Methods and class for building and scoring primer pairs from a list of left and right primers.

Typically, primer pairs are built from a list of left and right primers for a given target with the build_primer_pairs() method. The returned primer pairs are automatically scored (using the score() method).

Module contents

Contains the following public classes and methods:

  • score() -- Scores the amplicon amplified by a primer pair in a manner similar to Primer3.
  • build_primer_pairs() -- Builds primer pairs from individual left and primers.

Classes

Functions

build_primer_pairs
build_primer_pairs(
    left_primers: Sequence[Oligo],
    right_primers: Sequence[Oligo],
    target: Span,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    max_heterodimer_tm: Optional[float],
    weights: PrimerAndAmpliconWeights,
    fasta_path: Path,
) -> Iterator[PrimerPair]

Builds primer pairs from individual left and primers.

Only primer pairs that meet the requirements for amplicon sizes and tms will be returned. In addition, if max_heterodimer_tm is provided then primer pairs with a heterodimer tm exceeding the maximum will also be discarded.

Parameters:

Name Type Description Default
left_primers Sequence[Oligo]

the left primers

required
right_primers Sequence[Oligo]

the right primers

required
target Span

the genome mapping for the target

required
amplicon_sizes MinOptMax[int]

minimum, optimal, and maximum amplicon sizes (lengths)

required
amplicon_tms MinOptMax[float]

minimum, optimal, and maximum amplicon Tms

required
max_heterodimer_tm Optional[float]

if supplied, heterodimer Tms will be calculated for primer pairs, and those exceeding the maximum Tm will be discarded

required
weights PrimerAndAmpliconWeights

the set of penalty weights

required
fasta_path Path

the path to the FASTA file from which the amplicon sequence will be retrieved.

required

Returns:

Type Description
Iterator[PrimerPair]

An iterator over all the valid primer pairs, sorted by primer pair penalty.

Iterator[PrimerPair]

Primer pairs with smaller penalties are returned first.

Source code in prymer/api/picking.py
def build_primer_pairs(  # noqa: C901
    left_primers: Sequence[Oligo],
    right_primers: Sequence[Oligo],
    target: Span,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    max_heterodimer_tm: Optional[float],
    weights: PrimerAndAmpliconWeights,
    fasta_path: Path,
) -> Iterator[PrimerPair]:
    """Builds primer pairs from individual left and primers.

    Only primer pairs that meet the requirements for amplicon sizes and tms will be returned.
    In addition, if `max_heterodimer_tm` is provided then primer pairs with a heterodimer tm
    exceeding the maximum will also be discarded.

    Args:
        left_primers: the left primers
        right_primers: the right primers
        target: the genome mapping for the target
        amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths)
        amplicon_tms: minimum, optimal, and maximum amplicon Tms
        max_heterodimer_tm: if supplied, heterodimer Tms will be calculated for primer pairs,
            and those exceeding the maximum Tm will be discarded
        weights: the set of penalty weights
        fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved.

    Returns:
        An iterator over all the valid primer pairs, sorted by primer pair penalty.
        Primer pairs with smaller penalties are returned first.
    """
    # Short circuit if we have no left primers or no right primers
    if not any(left_primers) or not any(right_primers):
        return

    if any(p.span.refname != target.refname for p in left_primers):
        raise ValueError("Left primers exist on different reference than target.")

    if any(p.span.refname != target.refname for p in right_primers):
        raise ValueError("Right primers exist on different reference than target.")

    # Sort the left and right primers
    left_primers = sorted(left_primers, key=lambda p: p.span.start)
    right_primers = sorted(right_primers, key=lambda p: p.span.end)

    # Grab the sequence we'll use to fill in the amplicon sequence
    with FastaFile(f"{fasta_path}") as fasta:
        region_start = min(p.span.start for p in left_primers)
        region_end = max(p.span.end for p in right_primers)
        bases = fasta.fetch(target.refname, region_start - 1, region_end)

    # Each tuple is left_idx, right_idx, penalty, tm
    pairings: list[Tuple[int, int, float, float]] = []

    # generate all the primer pairs that don't violate hard size and Tm constraints
    first_right_primer_idx = 0

    # Nested loops over indices are used here so that we can skip potentially large chunks of
    # the cartesian product, based on the fact that we're sorting the left and right primers.
    # Two things are relied upon:
    #   1. If we encounter a left/right combo that either has the right primer leftward of the
    #      left primer _or_ generates a too-short amplicon, the neither that right primer nor
    #      any previous right primer can make a valid combination with any subsequent left primer.
    #   2. If we encounter a left/right combo that generates a too-large amplicon, then no
    #      subsequent right-primer can make a valid combination with that left primer
    for i in range(0, len(left_primers)):
        for j in range(first_right_primer_idx, len(right_primers)):
            lp = left_primers[i]
            rp = right_primers[j]

            # If the right primer isn't "to the right" of the left primer, move on
            if rp.span.start < lp.span.start or lp.span.end > rp.span.end:
                first_right_primer_idx = max(first_right_primer_idx, j+1)
                continue

            amp_span = PrimerPair.calculate_amplicon_span(lp, rp)

            if amp_span.length < amplicon_sizes.min:
                first_right_primer_idx = max(first_right_primer_idx, j+1)
                continue

            if amp_span.length > amplicon_sizes.max:
                break  # break in this case because all subsequent rps will yield longer amplicons

            # Since the amplicon span and the region_start are both 1-based, the minuend
            # becomes a zero-based offset
            amp_bases = bases[amp_span.start - region_start : amp_span.end - region_start + 1]
            amp_tm = calculate_long_seq_tm(amp_bases)

            if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max:
                continue

            penalty = score(
                left_primer=lp,
                right_primer=rp,
                amplicon=amp_span,
                amplicon_tm=amp_tm,
                amplicon_sizes=amplicon_sizes,
                amplicon_tms=amplicon_tms,
                weights=weights,
            )

            pairings.append((i, j, penalty, amp_tm))

    # Sort by the penalty, ascending
    pairings.sort(key=lambda tup: tup[2])

    with NtThermoAlign() as ntthal:
        for i, j, penalty, tm in pairings:
            lp = left_primers[i]
            rp = right_primers[j]

            if max_heterodimer_tm is not None:
                if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm:
                    continue

            amp_bases = bases[lp.span.start - region_start : rp.span.end - region_start + 1]

            pp = PrimerPair(
                left_primer=lp,
                right_primer=rp,
                amplicon_sequence=amp_bases,
                amplicon_tm=tm,
                penalty=penalty,
            )

            yield pp
score
score(
    left_primer: Oligo,
    right_primer: Oligo,
    amplicon: Span,
    amplicon_tm: float,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    weights: PrimerAndAmpliconWeights,
) -> float

Score the amplicon in a manner similar to Primer3

Sums the following:

  1. Primer penalties: the provided left and right primer penalties
  2. Amplicon size: The difference between the current amplicon size and optimal amplicon size scaled by the product size weight. Is zero if the optimal amplicon size is zero.
  3. Amplicon Tm: The difference in melting temperature between the calculated and optimal, weighted by the product melting temperature.

Parameters:

Name Type Description Default
left_primer Oligo

the left primer

required
right_primer Oligo

the right primer

required
amplicon Span

the amplicon mapping

required
amplicon_tm float

the melting temperature of the amplicon

required
amplicon_sizes MinOptMax[int]

minimum, optimal, and maximum amplicon sizes (lengths)

required
amplicon_tms MinOptMax[float]

minimum, optimal, and maximum amplicon Tms

required
weights PrimerAndAmpliconWeights

the set of penalty weights

required

Returns:

Type Description
float

the penalty for the whole amplicon.

Source code in prymer/api/picking.py
def score(
    left_primer: Oligo,
    right_primer: Oligo,
    amplicon: Span,
    amplicon_tm: float,
    amplicon_sizes: MinOptMax[int],
    amplicon_tms: MinOptMax[float],
    weights: PrimerAndAmpliconWeights,
) -> float:
    """Score the amplicon in a manner similar to Primer3

    Sums the following:

    1. Primer penalties: the provided left and right primer penalties
    2. Amplicon size: The difference between the current amplicon size and optimal amplicon size
       scaled by the product size weight.  Is zero if the optimal amplicon size is zero.
    3. Amplicon Tm: The difference in melting temperature between the calculated and optimal,
       weighted by the product melting temperature.

    Args:
        left_primer: the left primer
        right_primer: the right primer
        amplicon: the amplicon mapping
        amplicon_tm: the melting temperature of the amplicon
        amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths)
        amplicon_tms: minimum, optimal, and maximum amplicon Tms
        weights: the set of penalty weights

    Returns:
        the penalty for the whole amplicon.
    """
    # The penalty for the amplicon size:
    # 1. No penalty if the optimal amplicon size is zero
    # 2. The difference between the current amplicon size and optimal amplicon size scaled by the
    #    product size weight.  The product size weight is different depending on if the amplicon
    #    size is greater or less than the optimal amplicons.
    size_penalty: float
    if amplicon_sizes.opt == 0:
        size_penalty = 0.0
    elif amplicon.length > amplicon_sizes.opt:
        size_penalty = (amplicon.length - amplicon_sizes.opt) * weights.product_size_gt
    else:
        size_penalty = (amplicon_sizes.opt - amplicon.length) * weights.product_size_lt

    # The penalty for the amplicon melting temperature.
    # The difference in melting temperature between the calculated and optimal is weighted by the
    # product melting temperature.
    tm_penalty: float
    if amplicon_tms.opt == 0.0:
        tm_penalty = 0.0
    elif amplicon_tm > amplicon_tms.opt:
        tm_penalty = (amplicon_tm - amplicon_tms.opt) * weights.product_tm_gt
    else:
        tm_penalty = (amplicon_tms.opt - amplicon_tm) * weights.product_tm_lt

    # Put it all together
    return left_primer.penalty + right_primer.penalty + size_penalty + tm_penalty

primer

This module is deprecated - see prymer/api/oligo.py

Classes

Primer dataclass

Bases: Oligo

A deprecated alias for Oligo.

This class exists to maintain backwards compatibility with earlier releases of prymer and may be removed in a future version.

Source code in prymer/api/primer.py
@dataclass(frozen=True, init=True, slots=True)
class Primer(Oligo):
    """
    A deprecated alias for `Oligo`.

    This class exists to maintain backwards compatibility with earlier releases of `prymer`
    and may be removed in a future version.
    """

    warnings.warn(
        "The Primer class was deprecated, use Oligo instead",
        DeprecationWarning,
        stacklevel=2,
    )

primer_pair

Primer Pair Class and Methods

This module contains the PrimerPair class and class methods to represent a primer pair. The primer pair is comprised of a left and right primer that work together to amplify an amplicon.

Class attributes include each of the primers (represented by an Oligo object), information about the expected amplicon (positional information about how the amplicon maps to the genome, the sequence, and its melting temperature), as well as a score of the primer pair (e.g. as emitted by Primer3).

Optional attributes include naming information to keep track of the pair and design parameters used to create the pairing.

Examples

```python

from prymer.api.span import Strand left_span = Span(refname="chr1", start=1, end=20) left_primer = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"20) right_span = Span(refname="chr1", start=101, end=120, strand=Strand.NEGATIVE) right_primer = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"20) primer_pair = PrimerPair( left_primer=left_primer, right_primer=right_primer, amplicon_sequence=None, amplicon_tm=70.0, penalty=-123.0, name="foobar" ) primer_pair.amplicon Span(refname='chr1', start=1, end=120, strand=) primer_pair.span Span(refname='chr1', start=1, end=120, strand=) primer_pair.inner Span(refname='chr1', start=21, end=100, strand=)

list(primer_pair) [Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=1, end=20, strand=), tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, bases='GGGGGGGGGGGGGGGGGGGG', tail=None), Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=101, end=120, strand=), tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, bases='TTTTTTTTTTTTTTTTTTTT', tail=None)]

Attributes

Classes

PrimerPair dataclass

Bases: OligoLike

Represents a pair of primers that work together to amplify an amplicon. The coordinates of the amplicon are determined to span from the start of the left primer through the end of the right primer.

Attributes:

Name Type Description
left_primer Oligo

the left primer in the pair

right_primer Oligo

the right primer in the pair

amplicon_sequence Optional[str]

an optional sequence of the expected amplicon

amplicon_tm float

the melting temperature of the expected amplicon

penalty float

the penalty value assigned by primer3 to the primer pair

name float

an optional name to use for the primer pair

Raises:

Type Description
ValueError

if the chromosomes of the left and right primers are not the same

Source code in prymer/api/primer_pair.py
@dataclass(frozen=True, init=True, kw_only=True)
class PrimerPair(OligoLike):
    """
    Represents a pair of primers that work together to amplify an amplicon. The
    coordinates of the amplicon are determined to span from the start of the left
    primer through the end of the right primer.

    Attributes:
        left_primer: the left primer in the pair
        right_primer: the right primer in the pair
        amplicon_sequence: an optional sequence of the expected amplicon
        amplicon_tm: the melting temperature of the expected amplicon
        penalty: the penalty value assigned by primer3 to the primer pair
        name: an optional name to use for the primer pair

    Raises:
        ValueError: if the chromosomes of the left and right primers are not the same
    """

    left_primer: Oligo
    right_primer: Oligo
    amplicon_tm: float
    penalty: float
    amplicon_sequence: Optional[str] = None

    @cached_property
    def amplicon(self) -> Span:
        """Returns the mapping for the amplicon"""
        return self.calculate_amplicon_span(self.left_primer, self.right_primer)

    @property
    def span(self) -> Span:
        """Returns the mapping for the amplicon"""
        return self.amplicon

    @property
    def bases(self) -> Optional[str]:
        """Returns the bases of the amplicon sequence"""
        return self.amplicon_sequence

    @property
    def length(self) -> int:
        """Returns the length of the amplicon"""
        return self.amplicon.length

    @property
    def inner(self) -> Span:
        """
        Returns the inner region of the amplicon (not including the primers). I.e. the region of
        the genome covered by the primer pair, without the primer regions.  If the primers
        overlap, then the inner mapping is the midpoint at where they overlap
        """
        if self.left_primer.span.overlaps(self.right_primer.span):
            # Use a flooring division as these values are all ints
            midpoint = (self.left_primer.span.end + self.right_primer.span.start) // 2
            return replace(
                self.left_primer.span,
                start=midpoint,
                end=midpoint,
            )
        else:
            return replace(
                self.left_primer.span,
                start=self.left_primer.span.end + 1,
                end=self.right_primer.span.start - 1,
            )

    def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair":
        """
        Returns a copy of the primer pair where the left and right primers are tailed.

        Args:
            left_tail: The tail to add to the left primer
            right_tail: The tail to add to the right primer

        Returns:
            A copy of the primer pair with the tail(s) added to the primers
        """
        return replace(
            self,
            left_primer=self.left_primer.with_tail(left_tail),
            right_primer=self.right_primer.with_tail(right_tail),
        )

    def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair":
        """
        Returns a copy of the primer pair with names assigned to the primer pair,
        left primer, and right primer.

        Args:
            pp_name: The optional name of the primer pair
            lp_name: The optional name of the left primer
            rp_name: The optional name of the right primer

        Returns:
            A copy of the primer pair with the provided names assigned
        """
        return replace(
            self,
            name=pp_name,
            left_primer=self.left_primer.with_name(lp_name),
            right_primer=self.right_primer.with_name(rp_name),
        )

    def to_bed12_row(self) -> str:
        """
        Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

        NB: BED is 0-based and Prymer is 1-based, so we need to convert
        """
        block_sizes = ",".join(
            [
                f"{gm.length}"
                for gm in [
                    self.left_primer.span,
                    self.inner,
                    self.right_primer.span,
                ]
            ]
        )

        block_starts = ",".join(
            [
                f"{self.amplicon.get_offset(gm.start)}"
                for gm in [
                    self.left_primer.span,
                    self.inner,
                    self.right_primer.span,
                ]
            ],
        )
        bed_like_coords = self.span.get_bedlike_coords()
        return "\t".join(
            map(
                str,
                [
                    self.span.refname,  # contig
                    bed_like_coords.start,  # start
                    bed_like_coords.end,  # end
                    self.id,  # name
                    500,  # score
                    self.span.strand.value,  # strand
                    bed_like_coords.start,  # thick start
                    bed_like_coords.end,  # thick end
                    "100,100,100",  # color
                    3,  # block count
                    block_sizes,
                    block_starts,  # relative to `start`
                ],
            )
        )

    def __iter__(self) -> Iterator[Oligo]:
        """Returns an iterator of left and right primers"""
        return iter([self.left_primer, self.right_primer])

    def __str__(self) -> str:
        """Returns a string representation of the primer pair"""
        sequence = self.amplicon_sequence if self.amplicon_sequence else MISSING_BASES_STRING
        return (
            f"{self.left_primer}\t{self.right_primer}\t{sequence}\t"
            + f"{self.amplicon_tm}\t{self.penalty}"
        )

    @staticmethod
    def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span:
        """
        Calculates the amplicon Span from the left and right primers.

        Args:
            left_primer: the left primer for the amplicon
            right_primer: the right primer for the amplicon

        Returns:
            a Span starting at the first base of the left primer and ending at the last base of
            the right primer

        Raises:
            ValueError: If `left_primer` and `right_primer` have different reference names.
            ValueError: If `left_primer` doesn't start before the right primer.
            ValueError: If `right_primer` ends before `left_primer`.
        """
        # Require that `left_primer` and `right_primer` both map to the same reference sequence
        if left_primer.span.refname != right_primer.span.refname:
            raise ValueError(
                "Left and right primers are on different references. "
                f"Left primer ref: {left_primer.span.refname}. "
                f"Right primer ref: {right_primer.span.refname}"
            )

        # Require that the left primer starts before the right primer
        if left_primer.span.start > right_primer.span.start:
            raise ValueError(
                "Left primer does not start before the right primer. "
                f"Left primer span: {left_primer.span}, "
                f"Right primer span: {right_primer.span}"
            )

        # Require that the left primer starts before the right primer
        if right_primer.span.end < left_primer.span.end:
            raise ValueError(
                "Right primer ends before left primer ends. "
                f"Left primer span: {left_primer.span}, "
                f"Right primer span: {right_primer.span}"
            )

        return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end)

    @staticmethod
    def compare(
        this: "PrimerPair",
        that: "PrimerPair",
        seq_dict: SequenceDictionary,
        by_amplicon: bool = True,
    ) -> int:
        """Compares this primer pair to that primer pair by their span, ordering references using
        the given sequence dictionary.

        Args:
            this: the first primer pair
            that: the second primer pair
            seq_dict: the sequence dictionary used to order references
            by_amplicon: ture to compare using the amplicon property on a primer pair, false to
                compare first using the left primer then the right primer

        Returns:
            -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise
        """
        if by_amplicon:
            return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict)
        else:
            retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict)
            if retval == 0:
                retval = Oligo.compare(
                    this=this.right_primer, that=that.right_primer, seq_dict=seq_dict
                )
            return retval
Attributes
amplicon cached property
amplicon: Span

Returns the mapping for the amplicon

bases property
bases: Optional[str]

Returns the bases of the amplicon sequence

inner property
inner: Span

Returns the inner region of the amplicon (not including the primers). I.e. the region of the genome covered by the primer pair, without the primer regions. If the primers overlap, then the inner mapping is the midpoint at where they overlap

length property
length: int

Returns the length of the amplicon

span property
span: Span

Returns the mapping for the amplicon

Functions
__iter__
__iter__() -> Iterator[Oligo]

Returns an iterator of left and right primers

Source code in prymer/api/primer_pair.py
def __iter__(self) -> Iterator[Oligo]:
    """Returns an iterator of left and right primers"""
    return iter([self.left_primer, self.right_primer])
__str__
__str__() -> str

Returns a string representation of the primer pair

Source code in prymer/api/primer_pair.py
def __str__(self) -> str:
    """Returns a string representation of the primer pair"""
    sequence = self.amplicon_sequence if self.amplicon_sequence else MISSING_BASES_STRING
    return (
        f"{self.left_primer}\t{self.right_primer}\t{sequence}\t"
        + f"{self.amplicon_tm}\t{self.penalty}"
    )
calculate_amplicon_span staticmethod
calculate_amplicon_span(
    left_primer: Oligo, right_primer: Oligo
) -> Span

Calculates the amplicon Span from the left and right primers.

Parameters:

Name Type Description Default
left_primer Oligo

the left primer for the amplicon

required
right_primer Oligo

the right primer for the amplicon

required

Returns:

Type Description
Span

a Span starting at the first base of the left primer and ending at the last base of

Span

the right primer

Raises:

Type Description
ValueError

If left_primer and right_primer have different reference names.

ValueError

If left_primer doesn't start before the right primer.

ValueError

If right_primer ends before left_primer.

Source code in prymer/api/primer_pair.py
@staticmethod
def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span:
    """
    Calculates the amplicon Span from the left and right primers.

    Args:
        left_primer: the left primer for the amplicon
        right_primer: the right primer for the amplicon

    Returns:
        a Span starting at the first base of the left primer and ending at the last base of
        the right primer

    Raises:
        ValueError: If `left_primer` and `right_primer` have different reference names.
        ValueError: If `left_primer` doesn't start before the right primer.
        ValueError: If `right_primer` ends before `left_primer`.
    """
    # Require that `left_primer` and `right_primer` both map to the same reference sequence
    if left_primer.span.refname != right_primer.span.refname:
        raise ValueError(
            "Left and right primers are on different references. "
            f"Left primer ref: {left_primer.span.refname}. "
            f"Right primer ref: {right_primer.span.refname}"
        )

    # Require that the left primer starts before the right primer
    if left_primer.span.start > right_primer.span.start:
        raise ValueError(
            "Left primer does not start before the right primer. "
            f"Left primer span: {left_primer.span}, "
            f"Right primer span: {right_primer.span}"
        )

    # Require that the left primer starts before the right primer
    if right_primer.span.end < left_primer.span.end:
        raise ValueError(
            "Right primer ends before left primer ends. "
            f"Left primer span: {left_primer.span}, "
            f"Right primer span: {right_primer.span}"
        )

    return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end)
compare staticmethod
compare(
    this: PrimerPair,
    that: PrimerPair,
    seq_dict: SequenceDictionary,
    by_amplicon: bool = True,
) -> int

Compares this primer pair to that primer pair by their span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this PrimerPair

the first primer pair

required
that PrimerPair

the second primer pair

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required
by_amplicon bool

ture to compare using the amplicon property on a primer pair, false to compare first using the left primer then the right primer

True

Returns:

Type Description
int

-1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise

Source code in prymer/api/primer_pair.py
@staticmethod
def compare(
    this: "PrimerPair",
    that: "PrimerPair",
    seq_dict: SequenceDictionary,
    by_amplicon: bool = True,
) -> int:
    """Compares this primer pair to that primer pair by their span, ordering references using
    the given sequence dictionary.

    Args:
        this: the first primer pair
        that: the second primer pair
        seq_dict: the sequence dictionary used to order references
        by_amplicon: ture to compare using the amplicon property on a primer pair, false to
            compare first using the left primer then the right primer

    Returns:
        -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise
    """
    if by_amplicon:
        return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict)
    else:
        retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict)
        if retval == 0:
            retval = Oligo.compare(
                this=this.right_primer, that=that.right_primer, seq_dict=seq_dict
            )
        return retval
to_bed12_row
to_bed12_row() -> str

Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

NB: BED is 0-based and Prymer is 1-based, so we need to convert

Source code in prymer/api/primer_pair.py
def to_bed12_row(self) -> str:
    """
    Returns the BED detail format view: https://genome.ucsc.edu/FAQ/FAQformat.html#format1.7.

    NB: BED is 0-based and Prymer is 1-based, so we need to convert
    """
    block_sizes = ",".join(
        [
            f"{gm.length}"
            for gm in [
                self.left_primer.span,
                self.inner,
                self.right_primer.span,
            ]
        ]
    )

    block_starts = ",".join(
        [
            f"{self.amplicon.get_offset(gm.start)}"
            for gm in [
                self.left_primer.span,
                self.inner,
                self.right_primer.span,
            ]
        ],
    )
    bed_like_coords = self.span.get_bedlike_coords()
    return "\t".join(
        map(
            str,
            [
                self.span.refname,  # contig
                bed_like_coords.start,  # start
                bed_like_coords.end,  # end
                self.id,  # name
                500,  # score
                self.span.strand.value,  # strand
                bed_like_coords.start,  # thick start
                bed_like_coords.end,  # thick end
                "100,100,100",  # color
                3,  # block count
                block_sizes,
                block_starts,  # relative to `start`
            ],
        )
    )
with_names
with_names(
    pp_name: str, lp_name: str, rp_name: str
) -> PrimerPair

Returns a copy of the primer pair with names assigned to the primer pair, left primer, and right primer.

Parameters:

Name Type Description Default
pp_name str

The optional name of the primer pair

required
lp_name str

The optional name of the left primer

required
rp_name str

The optional name of the right primer

required

Returns:

Type Description
PrimerPair

A copy of the primer pair with the provided names assigned

Source code in prymer/api/primer_pair.py
def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair":
    """
    Returns a copy of the primer pair with names assigned to the primer pair,
    left primer, and right primer.

    Args:
        pp_name: The optional name of the primer pair
        lp_name: The optional name of the left primer
        rp_name: The optional name of the right primer

    Returns:
        A copy of the primer pair with the provided names assigned
    """
    return replace(
        self,
        name=pp_name,
        left_primer=self.left_primer.with_name(lp_name),
        right_primer=self.right_primer.with_name(rp_name),
    )
with_tails
with_tails(left_tail: str, right_tail: str) -> PrimerPair

Returns a copy of the primer pair where the left and right primers are tailed.

Parameters:

Name Type Description Default
left_tail str

The tail to add to the left primer

required
right_tail str

The tail to add to the right primer

required

Returns:

Type Description
PrimerPair

A copy of the primer pair with the tail(s) added to the primers

Source code in prymer/api/primer_pair.py
def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair":
    """
    Returns a copy of the primer pair where the left and right primers are tailed.

    Args:
        left_tail: The tail to add to the left primer
        right_tail: The tail to add to the right primer

    Returns:
        A copy of the primer pair with the tail(s) added to the primers
    """
    return replace(
        self,
        left_primer=self.left_primer.with_tail(left_tail),
        right_primer=self.right_primer.with_tail(right_tail),
    )

span

Span Classes and Methods

The Span class and associated methods represent positional information from a span of some sequence (like a primer or an amplicon) over a reference sequence (e.g. genome, target, amplicon).

Class attributes include refname, start, end, and strand.

The Span class uses 1-based, closed intervals for start and end positions. The get_bedlike_coords() method may be used to conver a Span to zero-based open-ended coordinates.

Examples of interacting with the Span class
>>> span_string = "chr1:1-10:+"
>>> span = Span.from_string(span_string)
>>> span
Span(refname='chr1', start=1, end=10, strand=<Strand.POSITIVE: '+'>)
>>> print(span.start)
1
>>> # get 0-based position within span at position 10
>>> span.get_offset(position=10)
9
>>> # create a new subspan derived from the source map
>>> new_subspan = span.get_subspan(offset=5, subspan_length=5)
>>> new_subspan
Span(refname='chr1', start=6, end=10, strand=<Strand.POSITIVE: '+'>)
>>> print(span.length)
10

A Span can be compared to another Span to test for overlap, return the length of overlap, and test if one span fully contains the other:

>>> span1 = Span(refname="chr1", start=50, end=100)
>>> span2 = Span(refname="chr1", start=75, end=90)
>>> span1.overlaps(span2)
True
>>> span2.overlaps(span1)
True
>>> span1.length_of_overlap_with(span2)
16
>>> span1.length_of_overlap_with(Span(refname="chr1", start=75, end=125))
26
>>> span1.overlaps(Span(refname="chr1", start=200, end=225))
False
>>> span1.contains(Span(refname="chr1", start=75, end=125))
False
>>> span1.contains(span2)
True

In some cases, it's useful to have the coordinates be converted to zero-based open-ended, for example with use with the pysam module when using the fetch() methods for pysam.AlignmentFile, pysam.FastaFile, and pysam.VariantFile.

>>> span.get_bedlike_coords()
BedLikeCoords(start=0, end=10)

Classes

BedLikeCoords dataclass

Represents the coordinates (only, no refname or strand) that correspond to a zero-based open-ended position.

Source code in prymer/api/span.py
@dataclass(init=True, frozen=True)
class BedLikeCoords:
    """Represents the coordinates (only, no refname or strand) that correspond to a
    zero-based open-ended position."""

    start: int
    end: int

    def __post_init__(self) -> None:
        if self.start < 0:
            raise ValueError(f"Start position must be >=0; received start={self.start}")
        if self.end < self.start:
            raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}")
Span dataclass

Bases: Metric['Span']

Represents the span of some sequence (target, primer, amplicon, etc.) to a genome.

Attributes:

Name Type Description
refname str

name of a reference sequence or contig or chromosome

start int

the 1-based start position of the Span (inclusive)

end int

the 1-based end position of the Span (inclusive)

strand Strand

the strand of the Span (POSITIVE by default)

Source code in prymer/api/span.py
@dataclass(init=True, frozen=True, eq=True)
class Span(Metric["Span"]):
    """Represents the span of some sequence (target, primer, amplicon, etc.) to a genome.

    Attributes:
        refname: name of a reference sequence or contig or chromosome
        start: the 1-based start position of the Span (inclusive)
        end: the 1-based end position of the Span (inclusive)
        strand: the strand of the Span (POSITIVE by default)
    """

    refname: str
    start: int
    end: int
    strand: Strand = Strand.POSITIVE

    def __post_init__(self) -> None:
        if self.refname.strip() == "":
            raise ValueError("Ref name must be populated")
        if self.start < 1:
            raise ValueError(f"Start position must be >=1; received start={self.start}")
        if self.end < self.start:
            raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}")

    @classmethod
    def from_string(cls, line: str) -> "Span":
        """Creates a Span from an input string.

        The input string should be delimited by a colon (":"). If strand information is missing
        after splitting the string, the strand is assumed to be positive.

        Args:
            line: input string

        Returns:
            Span: the Span object generated from the input string

        Raises:
            ValueError: if there are not at least 2 colon-delimited fields in string

        Example:
            >>> span_string = "chr1:1-10:+"
            >>> Span.from_string(span_string)
            Span(refname='chr1', start=1, end=10, strand=<Strand.POSITIVE: '+'>)
        """
        parts = line.strip().split(":")
        if len(parts) == 3:
            refname, range_, strand_symbol = parts
            try:
                strand = Strand(strand_symbol[0])
            except ValueError as e:
                raise ValueError(
                    "Did not find valid strand information; " f"received {strand_symbol}"
                ) from e
        elif len(parts) == 2:
            refname, range_ = parts
            strand = Strand.POSITIVE
        else:
            raise ValueError(
                f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}"
            )
        try:
            start, end = map(int, range_.split("-"))
        except ValueError as e:
            raise ValueError(
                f"Could not cast positional information to int; received {range_}"
            ) from e
        return Span(refname=refname, start=start, end=end, strand=strand)

    def __str__(self) -> str:
        return f"{self.refname}:{self.start}-{self.end}:{self.strand}"

    @property
    def length(self) -> int:
        """Get the length of the span/interval."""
        return self.end - self.start + 1

    def overlaps(self, other: "Span") -> bool:
        """Returns True if the spans overlap one another, False otherwise."""
        return (
            (self.refname == other.refname)
            and (self.start <= other.end)
            and (self.end >= other.start)
        )

    def length_of_overlap_with(self, other: "Span") -> int:
        """Returns the length of the region which overlaps the other span, or zero if there is
        no overlap"""
        overlap: int = 0
        if self.overlaps(other=other):
            start = max(self.start, other.start)
            end = min(self.end, other.end)
            overlap = end - start + 1
        return overlap

    def get_offset(self, position: int) -> int:
        """Returns a coordinate position that is relative to the current span.

        Args:
            position: the (genomic) position to provide relative coordinates for (1-based)

        Returns:
            A 0-based position relative to the Span

        Raises:
            ValueError: if the provided position is outside the coordinates of the Span

        Example:

        ```python
        >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
        >>> print(test_span.get_offset(11))
        1

        ```
        """
        if position < self.start or position > self.end:
            raise ValueError(f"Position not within this span: {position}")
        return position - self.start

    def get_bedlike_coords(self) -> BedLikeCoords:
        """Returns the zero-based, open-ended coordinates (start and end) of the
         (1-based) Span, for using in bed-like formats.

         Returns:
             a BedLikeCoords instance containing the 0-based and open-ended coordinates of
             the Span

         Example:

        ```python
        >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
        >>> print(test_span.get_bedlike_coords())
        BedLikeCoords(start=9, end=20)

        ```
        """
        return BedLikeCoords(self.start - 1, self.end)

    def get_subspan(
        self, offset: int, subspan_length: int, strand: Optional[Strand] = None
    ) -> "Span":
        """Returns a Span with absolute coords from an offset and a length.
        The strand of the new Span will be strand if given, otherwise it will be
        self.strand.

        Args:
            offset: the difference between the start position of the subspan and that
                of the current span (0-based)
            subspan_length: the length of the new span
            strand: the strand of the new span (if given)

        Returns:
            a new Span with objective coords derived from input offset and length

        Raises:
            ValueError: if the offset is less than 0
            ValueError: if the length of the subspan is 0 or less
            ValueError: if the offset is greater than the length of the source span

        Example:

        ```python
        >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
        >>> test = span.get_subspan(offset=5, subspan_length=10)
        >>> print(test)
        chr1:1005-1014:+
        >>> print(test.length) #length = end - start + 1
        10

        ```
        """

        if offset < 0:
            raise ValueError(f"Offset must be > 0, received start={offset}")
        if subspan_length <= 0:
            raise ValueError(
                f"Length of a subspan must be positive, received length={subspan_length}"
            )
        if offset >= self.length:
            raise ValueError(
                "Offset of a relative subspan must be < source span length, "
                f"received offset={offset}, length={subspan_length}"
            )
        if offset + subspan_length > self.length:
            raise ValueError(
                f"End of sub-span must be within source span: source start={self.start}, "
                f"offset={offset}, sub-span length={subspan_length}"
            )

        absolute_start = self.start + offset
        absolute_end = coordmath.get_closed_end(absolute_start, subspan_length)
        strand = self.strand if strand is None else strand
        return replace(self, start=absolute_start, end=absolute_end, strand=strand)

    def contains(self, comparison: Self) -> bool:
        """Checks whether one span is wholly contained within another span.
        Returns `True` if one span contains the other, otherwise returns `False`.
        Does not use strand information (a span is considered to contain the other
        if they are adjacent in position but on opposite strands)."""
        return (
            self.refname == comparison.refname
            and self.start <= comparison.start
            and comparison.end <= self.end
        )

    def _to_tuple(self, seq_dict: SequenceDictionary) -> Tuple[int, int, int, int]:
        """Returns a tuple of reference index, start position, end position, and strand
        (0 forward, 1 reverse)"""
        ref_index = seq_dict.by_name(self.refname).index
        strand = 0 if self.strand == Strand.POSITIVE else 1
        return ref_index, self.start, self.end, strand

    @staticmethod
    def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int:
        """Compares this span to that span, ordering references using the given sequence dictionary.

        Args:
            this: the first span
            that: the second span
            seq_dict: the sequence dictionary used to order references

        Returns:
            -1 if the first span is less than the second span, 0 if equal, 1 otherwise
        """
        left_tuple = this._to_tuple(seq_dict=seq_dict)
        right_tuple = that._to_tuple(seq_dict=seq_dict)
        if left_tuple < right_tuple:
            return -1
        elif left_tuple > right_tuple:
            return 1
        else:
            return 0
Attributes
length property
length: int

Get the length of the span/interval.

Functions
compare staticmethod
compare(
    this: Span, that: Span, seq_dict: SequenceDictionary
) -> int

Compares this span to that span, ordering references using the given sequence dictionary.

Parameters:

Name Type Description Default
this Span

the first span

required
that Span

the second span

required
seq_dict SequenceDictionary

the sequence dictionary used to order references

required

Returns:

Type Description
int

-1 if the first span is less than the second span, 0 if equal, 1 otherwise

Source code in prymer/api/span.py
@staticmethod
def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int:
    """Compares this span to that span, ordering references using the given sequence dictionary.

    Args:
        this: the first span
        that: the second span
        seq_dict: the sequence dictionary used to order references

    Returns:
        -1 if the first span is less than the second span, 0 if equal, 1 otherwise
    """
    left_tuple = this._to_tuple(seq_dict=seq_dict)
    right_tuple = that._to_tuple(seq_dict=seq_dict)
    if left_tuple < right_tuple:
        return -1
    elif left_tuple > right_tuple:
        return 1
    else:
        return 0
contains
contains(comparison: Self) -> bool

Checks whether one span is wholly contained within another span. Returns True if one span contains the other, otherwise returns False. Does not use strand information (a span is considered to contain the other if they are adjacent in position but on opposite strands).

Source code in prymer/api/span.py
def contains(self, comparison: Self) -> bool:
    """Checks whether one span is wholly contained within another span.
    Returns `True` if one span contains the other, otherwise returns `False`.
    Does not use strand information (a span is considered to contain the other
    if they are adjacent in position but on opposite strands)."""
    return (
        self.refname == comparison.refname
        and self.start <= comparison.start
        and comparison.end <= self.end
    )
from_string classmethod
from_string(line: str) -> Span

Creates a Span from an input string.

The input string should be delimited by a colon (":"). If strand information is missing after splitting the string, the strand is assumed to be positive.

Parameters:

Name Type Description Default
line str

input string

required

Returns:

Name Type Description
Span Span

the Span object generated from the input string

Raises:

Type Description
ValueError

if there are not at least 2 colon-delimited fields in string

Example

span_string = "chr1:1-10:+" Span.from_string(span_string) Span(refname='chr1', start=1, end=10, strand=)

Source code in prymer/api/span.py
@classmethod
def from_string(cls, line: str) -> "Span":
    """Creates a Span from an input string.

    The input string should be delimited by a colon (":"). If strand information is missing
    after splitting the string, the strand is assumed to be positive.

    Args:
        line: input string

    Returns:
        Span: the Span object generated from the input string

    Raises:
        ValueError: if there are not at least 2 colon-delimited fields in string

    Example:
        >>> span_string = "chr1:1-10:+"
        >>> Span.from_string(span_string)
        Span(refname='chr1', start=1, end=10, strand=<Strand.POSITIVE: '+'>)
    """
    parts = line.strip().split(":")
    if len(parts) == 3:
        refname, range_, strand_symbol = parts
        try:
            strand = Strand(strand_symbol[0])
        except ValueError as e:
            raise ValueError(
                "Did not find valid strand information; " f"received {strand_symbol}"
            ) from e
    elif len(parts) == 2:
        refname, range_ = parts
        strand = Strand.POSITIVE
    else:
        raise ValueError(
            f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}"
        )
    try:
        start, end = map(int, range_.split("-"))
    except ValueError as e:
        raise ValueError(
            f"Could not cast positional information to int; received {range_}"
        ) from e
    return Span(refname=refname, start=start, end=end, strand=strand)
get_bedlike_coords
get_bedlike_coords() -> BedLikeCoords

Returns the zero-based, open-ended coordinates (start and end) of the (1-based) Span, for using in bed-like formats.

Returns: a BedLikeCoords instance containing the 0-based and open-ended coordinates of the Span

Example:

>>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
>>> print(test_span.get_bedlike_coords())
BedLikeCoords(start=9, end=20)
Source code in prymer/api/span.py
def get_bedlike_coords(self) -> BedLikeCoords:
    """Returns the zero-based, open-ended coordinates (start and end) of the
     (1-based) Span, for using in bed-like formats.

     Returns:
         a BedLikeCoords instance containing the 0-based and open-ended coordinates of
         the Span

     Example:

    ```python
    >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
    >>> print(test_span.get_bedlike_coords())
    BedLikeCoords(start=9, end=20)

    ```
    """
    return BedLikeCoords(self.start - 1, self.end)
get_offset
get_offset(position: int) -> int

Returns a coordinate position that is relative to the current span.

Parameters:

Name Type Description Default
position int

the (genomic) position to provide relative coordinates for (1-based)

required

Returns:

Type Description
int

A 0-based position relative to the Span

Raises:

Type Description
ValueError

if the provided position is outside the coordinates of the Span

Example:

>>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
>>> print(test_span.get_offset(11))
1
Source code in prymer/api/span.py
def get_offset(self, position: int) -> int:
    """Returns a coordinate position that is relative to the current span.

    Args:
        position: the (genomic) position to provide relative coordinates for (1-based)

    Returns:
        A 0-based position relative to the Span

    Raises:
        ValueError: if the provided position is outside the coordinates of the Span

    Example:

    ```python
    >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE)
    >>> print(test_span.get_offset(11))
    1

    ```
    """
    if position < self.start or position > self.end:
        raise ValueError(f"Position not within this span: {position}")
    return position - self.start
get_subspan
get_subspan(
    offset: int,
    subspan_length: int,
    strand: Optional[Strand] = None,
) -> Span

Returns a Span with absolute coords from an offset and a length. The strand of the new Span will be strand if given, otherwise it will be self.strand.

Parameters:

Name Type Description Default
offset int

the difference between the start position of the subspan and that of the current span (0-based)

required
subspan_length int

the length of the new span

required
strand Optional[Strand]

the strand of the new span (if given)

None

Returns:

Type Description
Span

a new Span with objective coords derived from input offset and length

Raises:

Type Description
ValueError

if the offset is less than 0

ValueError

if the length of the subspan is 0 or less

ValueError

if the offset is greater than the length of the source span

Example:

>>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
>>> test = span.get_subspan(offset=5, subspan_length=10)
>>> print(test)
chr1:1005-1014:+
>>> print(test.length) #length = end - start + 1
10
Source code in prymer/api/span.py
def get_subspan(
    self, offset: int, subspan_length: int, strand: Optional[Strand] = None
) -> "Span":
    """Returns a Span with absolute coords from an offset and a length.
    The strand of the new Span will be strand if given, otherwise it will be
    self.strand.

    Args:
        offset: the difference between the start position of the subspan and that
            of the current span (0-based)
        subspan_length: the length of the new span
        strand: the strand of the new span (if given)

    Returns:
        a new Span with objective coords derived from input offset and length

    Raises:
        ValueError: if the offset is less than 0
        ValueError: if the length of the subspan is 0 or less
        ValueError: if the offset is greater than the length of the source span

    Example:

    ```python
    >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE)
    >>> test = span.get_subspan(offset=5, subspan_length=10)
    >>> print(test)
    chr1:1005-1014:+
    >>> print(test.length) #length = end - start + 1
    10

    ```
    """

    if offset < 0:
        raise ValueError(f"Offset must be > 0, received start={offset}")
    if subspan_length <= 0:
        raise ValueError(
            f"Length of a subspan must be positive, received length={subspan_length}"
        )
    if offset >= self.length:
        raise ValueError(
            "Offset of a relative subspan must be < source span length, "
            f"received offset={offset}, length={subspan_length}"
        )
    if offset + subspan_length > self.length:
        raise ValueError(
            f"End of sub-span must be within source span: source start={self.start}, "
            f"offset={offset}, sub-span length={subspan_length}"
        )

    absolute_start = self.start + offset
    absolute_end = coordmath.get_closed_end(absolute_start, subspan_length)
    strand = self.strand if strand is None else strand
    return replace(self, start=absolute_start, end=absolute_end, strand=strand)
length_of_overlap_with
length_of_overlap_with(other: Span) -> int

Returns the length of the region which overlaps the other span, or zero if there is no overlap

Source code in prymer/api/span.py
def length_of_overlap_with(self, other: "Span") -> int:
    """Returns the length of the region which overlaps the other span, or zero if there is
    no overlap"""
    overlap: int = 0
    if self.overlaps(other=other):
        start = max(self.start, other.start)
        end = min(self.end, other.end)
        overlap = end - start + 1
    return overlap
overlaps
overlaps(other: Span) -> bool

Returns True if the spans overlap one another, False otherwise.

Source code in prymer/api/span.py
def overlaps(self, other: "Span") -> bool:
    """Returns True if the spans overlap one another, False otherwise."""
    return (
        (self.refname == other.refname)
        and (self.start <= other.end)
        and (self.end >= other.start)
    )
Strand

Bases: StrEnum

Represents the strand of a span to the genome.

Source code in prymer/api/span.py
@unique
class Strand(StrEnum):
    """Represents the strand of a span to the genome."""

    POSITIVE = "+"
    NEGATIVE = "-"

Modules

variant_lookup

Variant Lookup Class and Methods

This module contains the abstract class VariantLookup to facilitate retrieval of variants that overlap a given genomic coordinate range. Concrete implementations must implement the query() method for retrieving variants that overlap the given range.

Two concrete implementations are provided that both take a list of VCF files to be queried:

Each class can also use minor allele frequency (MAF) to filter variants.

The helper class SimpleVariant is included to facilitate VCF querying and reporting out results.

Examples
>>> from pathlib import Path
>>> lookup = cached(vcf_paths=[Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.00, include_missing_mafs=True)
>>> lookup.query(refname="chr2", start=7999, end=8000)
[SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]
>>> variants = lookup.query(refname="chr2", start=7999, end=9900)
>>> len(variants)
14
>>> for v in variants:     print(v)
complex-variant-sv-1/1@chr2:8000[T/<DEL> NA]
rare-dbsnp-snp1-1/1@chr2:9000[A/C 0.0010]
common-dbsnp-snp1-1/1@chr2:9010[C/T 0.0100]
common-dbsnp-snp2-1/1@chr2:9020[A/C 0.0200]
rare-ac-an-snp-1/1@chr2:9030[G/A 0.0010]
rare-af-snp-1/1@chr2:9040[C/A 0.0005]
common-ac-an-snp-1/1@chr2:9050[C/G 0.0470]
common-af-snp-1/1@chr2:9060[T/C 0.0400]
common-multiallelic-1/2@chr2:9070[C/A 0.0525]
common-multiallelic-2/2@chr2:9070[C/T 0.0525]
common-insertion-1/1@chr2:9080[A/ACGT 0.0400]
common-deletion-1/1@chr2:9090[CTA/C 0.0400]
common-mixed-1/2@chr2:9100[CA/GG 0.1200]
common-mixed-2/2@chr2:9101[A/ACACA 0.1200]
>>> lookup.query(refname="ch12", start=7999, end=9900)
[]

Classes

FileBasedVariantLookup

Bases: ContextManager, VariantLookup

Implementation of VariantLookup that queries against indexed VCF files each time a query is performed. Assumes the index is located adjacent to the VCF file and has the same base name with either a .csi or .tbi suffix.

Example:

>>> with FileBasedVariantLookup([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0, include_missing_mafs=False) as lookup:
...     lookup.query(refname="chr2", start=7999, end=8000)
[SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]
Source code in prymer/api/variant_lookup.py
class FileBasedVariantLookup(ContextManager, VariantLookup):
    """Implementation of `VariantLookup` that queries against indexed VCF files each time a query is
    performed. Assumes the index is located adjacent to the VCF file and has the same base name with
    either a .csi or .tbi suffix.

    Example:

    ```python
    >>> with FileBasedVariantLookup([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0, include_missing_mafs=False) as lookup:
    ...     lookup.query(refname="chr2", start=7999, end=8000)
    [SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]

    ```
    """  # noqa: E501

    def __init__(self, vcf_paths: list[Path], min_maf: Optional[float], include_missing_mafs: bool):
        self._readers: list[VariantFile] = []
        super().__init__(
            vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
        )
        if len(vcf_paths) == 0:
            raise ValueError("No VCF paths given to query.")
        for path in vcf_paths:
            if (
                not path.with_suffix(path.suffix + ".csi").is_file()
                and not path.with_suffix(path.suffix + ".tbi").is_file()
            ):
                raise ValueError(f"Cannot perform fetch with missing index file for VCF: {path}")
            open_fh = pysam.VariantFile(str(path))
            self._readers.append(open_fh)

    def __enter__(self) -> "FileBasedVariantLookup":
        """Enter the context manager."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit this context manager while closing the underlying VCF handles."""
        self.close()
        return None

    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Queries variants from the VCFs used by this lookup and returns a `SimpleVariant`."""
        simple_variants: list[SimpleVariant] = []
        for fh, path in zip(self._readers, self.vcf_paths, strict=True):
            if not fh.header.contigs.get(refname):
                _logger.debug(f"Header in VCF file {path} does not contain chromosome {refname}.")
                continue
            #  pysam.fetch is 0-based, half-open
            variants = [variant for variant in fh.fetch(contig=refname, start=start - 1, end=end)]
            simple_variants.extend(self.to_variants(variants, source_vcf=path))
        return sorted(simple_variants, key=lambda x: x.pos)

    def close(self) -> None:
        """Close the underlying VCF file handles."""
        for handle in self._readers:
            handle.close()
Functions
__enter__
__enter__() -> FileBasedVariantLookup

Enter the context manager.

Source code in prymer/api/variant_lookup.py
def __enter__(self) -> "FileBasedVariantLookup":
    """Enter the context manager."""
    return self
__exit__
__exit__(
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None

Exit this context manager while closing the underlying VCF handles.

Source code in prymer/api/variant_lookup.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Exit this context manager while closing the underlying VCF handles."""
    self.close()
    return None
close
close() -> None

Close the underlying VCF file handles.

Source code in prymer/api/variant_lookup.py
def close(self) -> None:
    """Close the underlying VCF file handles."""
    for handle in self._readers:
        handle.close()
SimpleVariant dataclass

Represents a variant from a given genomic range.

The reference and alternate alleles typically match the VCF from which the variant originated, differing when the reference and alternate alleles share common leading bases. For example, given a reference allele of CACACA and alternate allele CACA, the reference and alternate alleles have their common leading bases removed except for a single "anchor base" (i.e. CAC is removed), yielding the reference allele ACA and alternate allele A. This will also change the reference variant position (pos property), which is defined as the position of the first base in the reference allele.

The variant_type is derived from the reference and alternate alleles after the above (see VariantType.from_alleles()).

Furthermore, the variant end position (end property) is defined for insertions as the base after the insertion. For all other variant types, this is the last base in the reference allele.

Attributes:

Name Type Description
id str

the variant identifier

refname str

the reference sequence name

pos int

the start position of the variant (1-based inclusive)

end int

the end position of the variant (1-based inclusive)

ref str

the reference base

alt str

the alternate base

variant_type VariantType

the variant type

maf Optional[float]

optionally, the minor allele frequency

Source code in prymer/api/variant_lookup.py
@dataclass(slots=True, frozen=True, init=True)
class SimpleVariant:
    """Represents a variant from a given genomic range.

    The reference and alternate alleles typically match the VCF from which the variant originated,
    differing when the reference and alternate alleles share common leading bases.  For example,
    given a reference allele of `CACACA` and alternate allele `CACA`, the reference and alternate
    alleles have their common leading bases removed except for a single "anchor base" (i.e. `CAC` is
    removed), yielding the reference allele `ACA` and alternate allele `A`.  This will also change
    the reference variant position (`pos` property), which is defined as the position of the first
    base in the reference allele.

    The `variant_type` is derived from the reference and alternate alleles after the above (see
    [`VariantType.from_alleles()`][prymer.api.variant_lookup.VariantType.from_alleles]).

    Furthermore, the variant end position (`end` property) is defined for insertions as the base
    after the insertion.  For all other variant types, this is the last base in the reference
    allele.

    Attributes:
        id: the variant identifier
        refname: the reference sequence name
        pos: the start position of the variant (1-based inclusive)
        end: the end position of the variant (1-based inclusive)
        ref: the reference base
        alt: the alternate base
        variant_type: the variant type
        maf: optionally, the minor allele frequency
    """

    id: str
    refname: str
    pos: int
    ref: str
    alt: str
    end: int = field(init=False)
    variant_type: VariantType = field(init=False)
    maf: Optional[float] = None

    def __post_init__(self) -> None:
        # Simplify the ref/alt alleles
        # find the leading prefix of the ref and alternate allele
        prefix_length = 0
        for r, a in zip(self.ref, self.alt, strict=False):
            if r != a:
                break
            prefix_length += 1
        if prefix_length > 1:  # > 1 to always keep an anchor base
            offset = prefix_length - 1
            object.__setattr__(self, "ref", self.ref[offset:])
            object.__setattr__(self, "alt", self.alt[offset:])
            object.__setattr__(self, "pos", self.pos + offset)

        # Derive the end and variant_type from the ref and alt alleles.
        object.__setattr__(
            self, "variant_type", VariantType.from_alleles(ref=self.ref, alt=self.alt)
        )

        # span all reference bases (e.g. SNP -> 1bp, MNV -> len(MNV), DEL -> len(DEL),
        end: int = self.pos + len(self.ref) - 1
        if self.variant_type == VariantType.INSERTION:
            # span the base preceding and following the insertion, so add a base at the end
            end += 1
        object.__setattr__(self, "end", end)

    def __str__(self) -> str:
        """Compact String representation of the variant that includes all relevant info."""
        maf_string = f"{self.maf:.4f}" if self.maf is not None else "NA"
        return f"{self.id}@{self.refname}:{self.pos}[{self.ref}/{self.alt} {maf_string}]"

    def to_span(self) -> Span:
        """Creates a Span object that represents the genomic span of this variant.
        Insertions will span the base preceding and following the inserted bases."""
        return Span(refname=self.refname, start=self.pos, end=self.end, strand=Strand.POSITIVE)

    @staticmethod
    def build(variant: VariantRecord) -> list["SimpleVariant"]:
        """Convert `pysam.VariantRecord` to `SimpleVariant`. Only the first ALT allele is used."""
        maf = calc_maf_from_filter(variant)
        simple_variants: list[SimpleVariant] = []

        for i, alt in enumerate(variant.alts, start=1):
            simple_variant = SimpleVariant(
                id=f"{variant.id}-{i}/{len(variant.alts)}",
                refname=variant.chrom,
                pos=variant.pos,
                ref=variant.ref,
                alt=alt,
                maf=maf,
            )
            simple_variants.append(simple_variant)

        return simple_variants
Functions
__str__
__str__() -> str

Compact String representation of the variant that includes all relevant info.

Source code in prymer/api/variant_lookup.py
def __str__(self) -> str:
    """Compact String representation of the variant that includes all relevant info."""
    maf_string = f"{self.maf:.4f}" if self.maf is not None else "NA"
    return f"{self.id}@{self.refname}:{self.pos}[{self.ref}/{self.alt} {maf_string}]"
build staticmethod
build(variant: VariantRecord) -> list[SimpleVariant]

Convert pysam.VariantRecord to SimpleVariant. Only the first ALT allele is used.

Source code in prymer/api/variant_lookup.py
@staticmethod
def build(variant: VariantRecord) -> list["SimpleVariant"]:
    """Convert `pysam.VariantRecord` to `SimpleVariant`. Only the first ALT allele is used."""
    maf = calc_maf_from_filter(variant)
    simple_variants: list[SimpleVariant] = []

    for i, alt in enumerate(variant.alts, start=1):
        simple_variant = SimpleVariant(
            id=f"{variant.id}-{i}/{len(variant.alts)}",
            refname=variant.chrom,
            pos=variant.pos,
            ref=variant.ref,
            alt=alt,
            maf=maf,
        )
        simple_variants.append(simple_variant)

    return simple_variants
to_span
to_span() -> Span

Creates a Span object that represents the genomic span of this variant. Insertions will span the base preceding and following the inserted bases.

Source code in prymer/api/variant_lookup.py
def to_span(self) -> Span:
    """Creates a Span object that represents the genomic span of this variant.
    Insertions will span the base preceding and following the inserted bases."""
    return Span(refname=self.refname, start=self.pos, end=self.end, strand=Strand.POSITIVE)
VariantLookup

Bases: ABC

Base class to represent a variant from a given genomic range.

Attributes:

Name Type Description
vcf_paths list[Path]

the paths to the source VCFs for the variants

min_maf Optional[float]

optionally, return only variants with at least this minor allele frequency

include_missing_mafs bool

when filtering variants with a minor allele frequency, True to include variants with no annotated minor allele frequency, otherwise False. If no minor allele frequency is given, then this parameter does nothing.

Source code in prymer/api/variant_lookup.py
class VariantLookup(ABC):
    """Base class to represent a variant from a given genomic range.

    Attributes:
        vcf_paths: the paths to the source VCFs for the variants
        min_maf: optionally, return only variants with at least this minor allele frequency
        include_missing_mafs: when filtering variants with a minor allele frequency,
            `True` to include variants with no annotated minor allele frequency, otherwise `False`.
            If no minor allele frequency is given, then this parameter does nothing.
    """

    def __init__(
        self,
        vcf_paths: list[Path],
        min_maf: Optional[float],
        include_missing_mafs: bool,
    ) -> None:
        self.vcf_paths: list[Path] = vcf_paths
        self.min_maf: Optional[float] = min_maf
        self.include_missing_mafs: bool = include_missing_mafs

    @final
    def query(
        self,
        refname: str,
        start: int,
        end: int,
        maf: Optional[float] = None,
        include_missing_mafs: bool = None,
    ) -> list[SimpleVariant]:
        """Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

        Args:
            refname: the reference name
            start: the 1-based start position
            end: the 1-based end position
            maf: the MAF of the variant
            include_missing_mafs: whether to include variants with a missing MAF
                (overrides self.include_missing_mafs)
        """
        if maf is None:
            maf = self.min_maf
        if include_missing_mafs is None:
            include_missing_mafs = self.include_missing_mafs

        variants = self._query(refname=refname, start=start, end=end)
        if len(variants) == 0:
            _logger.debug(f"No variants extracted from region of interest: {refname}:{start}-{end}")
        if maf is None or maf <= 0.0:
            return variants
        elif include_missing_mafs:  # return variants with a MAF above threshold or missing
            return [v for v in variants if (v.maf is None or v.maf >= maf)]
        else:
            return [v for v in variants if v.maf is not None and v.maf >= maf]

    @staticmethod
    def to_variants(
        variants: list[VariantRecord], source_vcf: Optional[Path] = None
    ) -> list[SimpleVariant]:
        """Converts a list of `pysam.VariantRecords` to a list of `SimpleVariants` for ease of use.
        Filters variants based on their FILTER status, and sorts by start position.

        Args:
            variants: the variants to convert
            source_vcf: the optional path to the source VCF, used only for debug messages

        Returns:
            a list of `SimpleVariants`, one per alternate allele per variant.
        """
        simple_vars = []
        for variant in variants:
            if (
                "PASS" in list(variant.filter) or len(list(variant.filter)) == 0
            ):  # if passing or empty filters
                simple_variants = SimpleVariant.build(variant)
                if any(v.variant_type == VariantType.OTHER for v in simple_variants):
                    _logger.debug(
                        f"Input VCF file {source_vcf} may contain complex variants: {variant}"
                    )
                simple_vars.extend(simple_variants)
        return sorted(simple_vars, key=lambda v: (v.pos, v.id))

    @abstractmethod
    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Subclasses must implement this method."""
Functions
query
query(
    refname: str,
    start: int,
    end: int,
    maf: Optional[float] = None,
    include_missing_mafs: bool = None,
) -> list[SimpleVariant]

Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

Parameters:

Name Type Description Default
refname str

the reference name

required
start int

the 1-based start position

required
end int

the 1-based end position

required
maf Optional[float]

the MAF of the variant

None
include_missing_mafs bool

whether to include variants with a missing MAF (overrides self.include_missing_mafs)

None
Source code in prymer/api/variant_lookup.py
@final
def query(
    self,
    refname: str,
    start: int,
    end: int,
    maf: Optional[float] = None,
    include_missing_mafs: bool = None,
) -> list[SimpleVariant]:
    """Gets all variants that overlap a genomic range, optionally filter based on MAF threshold.

    Args:
        refname: the reference name
        start: the 1-based start position
        end: the 1-based end position
        maf: the MAF of the variant
        include_missing_mafs: whether to include variants with a missing MAF
            (overrides self.include_missing_mafs)
    """
    if maf is None:
        maf = self.min_maf
    if include_missing_mafs is None:
        include_missing_mafs = self.include_missing_mafs

    variants = self._query(refname=refname, start=start, end=end)
    if len(variants) == 0:
        _logger.debug(f"No variants extracted from region of interest: {refname}:{start}-{end}")
    if maf is None or maf <= 0.0:
        return variants
    elif include_missing_mafs:  # return variants with a MAF above threshold or missing
        return [v for v in variants if (v.maf is None or v.maf >= maf)]
    else:
        return [v for v in variants if v.maf is not None and v.maf >= maf]
to_variants staticmethod
to_variants(
    variants: list[VariantRecord],
    source_vcf: Optional[Path] = None,
) -> list[SimpleVariant]

Converts a list of pysam.VariantRecords to a list of SimpleVariants for ease of use. Filters variants based on their FILTER status, and sorts by start position.

Parameters:

Name Type Description Default
variants list[VariantRecord]

the variants to convert

required
source_vcf Optional[Path]

the optional path to the source VCF, used only for debug messages

None

Returns:

Type Description
list[SimpleVariant]

a list of SimpleVariants, one per alternate allele per variant.

Source code in prymer/api/variant_lookup.py
@staticmethod
def to_variants(
    variants: list[VariantRecord], source_vcf: Optional[Path] = None
) -> list[SimpleVariant]:
    """Converts a list of `pysam.VariantRecords` to a list of `SimpleVariants` for ease of use.
    Filters variants based on their FILTER status, and sorts by start position.

    Args:
        variants: the variants to convert
        source_vcf: the optional path to the source VCF, used only for debug messages

    Returns:
        a list of `SimpleVariants`, one per alternate allele per variant.
    """
    simple_vars = []
    for variant in variants:
        if (
            "PASS" in list(variant.filter) or len(list(variant.filter)) == 0
        ):  # if passing or empty filters
            simple_variants = SimpleVariant.build(variant)
            if any(v.variant_type == VariantType.OTHER for v in simple_variants):
                _logger.debug(
                    f"Input VCF file {source_vcf} may contain complex variants: {variant}"
                )
            simple_vars.extend(simple_variants)
    return sorted(simple_vars, key=lambda v: (v.pos, v.id))
VariantOverlapDetector

Bases: VariantLookup

Implements VariantLookup by reading the entire VCF into memory and loading the resulting Variants into an OverlapDetector.

Source code in prymer/api/variant_lookup.py
class VariantOverlapDetector(VariantLookup):
    """Implements `VariantLookup` by reading the entire VCF into memory and loading the resulting
    Variants into an `OverlapDetector`."""

    def __init__(self, vcf_paths: list[Path], min_maf: Optional[float], include_missing_mafs: bool):
        super().__init__(
            vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
        )
        self._overlap_detector: OverlapDetector[_VariantInterval] = OverlapDetector()

        if len(vcf_paths) == 0:
            raise ValueError("No VCF paths given to query.")

        for path in vcf_paths:
            if (
                not path.with_suffix(path.suffix + ".csi").is_file()
                and not path.with_suffix(path.suffix + ".tbi").is_file()
            ):
                raise ValueError(f"Cannot perform fetch with missing index file for VCF: {path}")

            with reader(path) as fh:
                variant_intervals = iter(
                    _VariantInterval.build(simple_variant)
                    for variant in fh
                    for simple_variant in self.to_variants([variant], source_vcf=path)
                )
                self._overlap_detector.add_all(variant_intervals)

    def _query(self, refname: str, start: int, end: int) -> list[SimpleVariant]:
        """Queries variants from the VCFs used by this lookup."""
        query = Interval(
            refname=refname, start=start - 1, end=end
        )  # interval is half-open, 0-based
        overlapping_variants = (
            variant_interval.variant
            for variant_interval in self._overlap_detector.get_overlaps(query)
        )
        return sorted(overlapping_variants, key=lambda v: (v.pos, v.id))
VariantType

Bases: UppercaseStrEnum

Represents the type of variant.

Source code in prymer/api/variant_lookup.py
@unique
class VariantType(UppercaseStrEnum):
    """Represents the type of variant."""

    SNP = auto()
    MNV = auto()
    INSERTION = auto()
    DELETION = auto()
    OTHER = auto()

    @staticmethod
    def from_alleles(ref: str, alt: str) -> "VariantType":
        """Builds a variant type from the given reference and alternate allele.

        Args:
            ref: the reference allele
            alt: the alternate allele

        Returns:
            the variant type
        """
        variant_type: VariantType
        if "<" in alt:
            variant_type = VariantType.OTHER
        elif (len(ref) == 1) and (len(alt) == 1):
            variant_type = VariantType.SNP
        elif len(ref) == len(alt):
            variant_type = VariantType.MNV
        elif len(ref) < len(alt):
            variant_type = VariantType.INSERTION
        elif len(ref) > len(alt):
            variant_type = VariantType.DELETION
        else:
            raise ValueError(f"Could not determine variant type for ref `{ref}` and alt `{alt}`")
        return variant_type
Functions
from_alleles staticmethod
from_alleles(ref: str, alt: str) -> VariantType

Builds a variant type from the given reference and alternate allele.

Parameters:

Name Type Description Default
ref str

the reference allele

required
alt str

the alternate allele

required

Returns:

Type Description
VariantType

the variant type

Source code in prymer/api/variant_lookup.py
@staticmethod
def from_alleles(ref: str, alt: str) -> "VariantType":
    """Builds a variant type from the given reference and alternate allele.

    Args:
        ref: the reference allele
        alt: the alternate allele

    Returns:
        the variant type
    """
    variant_type: VariantType
    if "<" in alt:
        variant_type = VariantType.OTHER
    elif (len(ref) == 1) and (len(alt) == 1):
        variant_type = VariantType.SNP
    elif len(ref) == len(alt):
        variant_type = VariantType.MNV
    elif len(ref) < len(alt):
        variant_type = VariantType.INSERTION
    elif len(ref) > len(alt):
        variant_type = VariantType.DELETION
    else:
        raise ValueError(f"Could not determine variant type for ref `{ref}` and alt `{alt}`")
    return variant_type

Functions

cached
cached(
    vcf_paths: list[Path],
    min_maf: float,
    include_missing_mafs: bool = False,
) -> VariantOverlapDetector

Constructs a VariantLookup that caches all variants in memory for fast lookup. Appropriate for small VCFs.

Source code in prymer/api/variant_lookup.py
def cached(
    vcf_paths: list[Path], min_maf: float, include_missing_mafs: bool = False
) -> VariantOverlapDetector:
    """Constructs a `VariantLookup` that caches all variants in memory for fast lookup.
    Appropriate for small VCFs."""
    return VariantOverlapDetector(
        vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
    )
calc_maf_from_filter
calc_maf_from_filter(
    variant: VariantRecord,
) -> Optional[float]

Calculates minor allele frequency (MAF) based on whether VariantRecord denotes CAF, AF, AC and AN in the INFO field. In none of those are found, looks in the FORMAT field for genotypes and calculates MAF.

If the variant has multiple alternate alleles, then the returned MAF is the sum of MAFs across all alternate alleles.

Parameters:

Name Type Description Default
variant VariantRecord

the variant from which the MAF will be computed

required

Returns:

Type Description
Optional[float]

the minor allele frequency (MAF)

Source code in prymer/api/variant_lookup.py
def calc_maf_from_filter(variant: pysam.VariantRecord) -> Optional[float]:
    """Calculates minor allele frequency (MAF) based on whether `VariantRecord` denotes
    CAF, AF, AC and AN in the INFO field. In none of those are found, looks in the FORMAT
    field for genotypes and calculates MAF.

    If the variant has multiple _alternate_ alleles, then the returned MAF is the sum of MAFs across
    all alternate alleles.

    Args:
        variant: the variant from which the MAF will be computed

    Returns:
        the minor allele frequency (MAF)
    """

    maf: Optional[float] = None
    if "CAF" in variant.info:
        # Assumes CAF is a list of allele frequencies, with the first being the reference allele
        maf = 1 - float(variant.info["CAF"][0])
    elif "AF" in variant.info:
        maf = sum(float(af) for af in variant.info["AF"])
    elif "AC" in variant.info and "AN" in variant.info:
        ac = sum(int(ac) for ac in variant.info["AC"])
        an = int(variant.info["AN"])
        maf = ac / an
    elif len(list(variant.samples)) > 0:  # if genotypes are not empty
        gts = [idx for sample in variant.samples.values() for idx in sample["GT"] if "GT" in sample]
        if len(gts) > 0:
            num_alt = sum(1 for idx in gts if idx != 0)
            maf = num_alt / len(gts)

    return maf
disk_based
disk_based(
    vcf_paths: list[Path],
    min_maf: float,
    include_missing_mafs: bool = False,
) -> FileBasedVariantLookup

Constructs a VariantLookup that queries indexed VCFs on disk for each lookup.

Appropriate for large VCFs.

Example:

>>> with disk_based([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0) as lookup:
...     lookup.query(refname="chr2", start=7999, end=8000)
[SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]
Source code in prymer/api/variant_lookup.py
def disk_based(
    vcf_paths: list[Path], min_maf: float, include_missing_mafs: bool = False
) -> FileBasedVariantLookup:
    """Constructs a `VariantLookup` that queries indexed VCFs on disk for each lookup.

    Appropriate for large VCFs.

    Example:

    ```python
    >>> with disk_based([Path("./tests/api/data/miniref.variants.vcf.gz")], min_maf=0.0) as lookup:
    ...     lookup.query(refname="chr2", start=7999, end=8000)
    [SimpleVariant(id='complex-variant-sv-1/1', refname='chr2', pos=8000, ref='T', alt='<DEL>', end=8000, variant_type=<VariantType.OTHER: 'OTHER'>, maf=None)]

    ```
    """  # noqa: E501
    return FileBasedVariantLookup(
        vcf_paths=vcf_paths, min_maf=min_maf, include_missing_mafs=include_missing_mafs
    )