Module dadi.DFE.Cache2D_mod

Developed by the Gutenkunst group, building off of the fitdadi code.

Expand source code
"""
Developed by the Gutenkunst group, building off of the fitdadi code.
"""
import sys, traceback
import numpy as np
import scipy.integrate

class Cache2D:
    def __init__(self, params, ns, demo_sel_func, pts,
                 gamma_bounds=(1e-4, 2000.), gamma_pts=100,
                 additional_gammas=[], cpus=None, gpus=0, verbose=False,
                 split_jobs=1, this_job_id=0):
        """
        params: Optimized demographic parameters
        ns: Sample sizes for cached spectra
        demo_sel_func: DaDi demographic function with selection. 
                       gamma1, gamma2 must be the last arguments.
        pts: Grid point settings for demo_sel_func
        gamma_bounds: Range of gammas to integrate over
        gamma_points: Number of gamma grid points over which to integrate
        additional_gammas: Sequence of additional gamma values to store results
                           for. Useful for point masses of explicit neutrality
                           or positive selection.
        cpus: Number of CPU jobs to launch. If None (the default), then
              all CPUs available will be used.
        gpus: Number of GPU jobs to launch.
        verbose: If True, print messages to track progress of cache generation.
        split_jobs: To split cache generation across multiple computing jobs,
                    set split_jobs > 1 and this_job_id.
        this_job_id: Defines which entry in split_jobs this run will create.
                     (Indexed from 0.)
        For example, to split Cache2D generation over 3 independent jobs, set
            split_jobs=3 and create jobs with this_job_id=0,1,2.
            Then use Cache2D.merge to combine the outputs.
        """
        # Store details regarding how this cache was generated. May be useful
        # for keeping track of pickled caches.
        self.ns = ns
        self.pts = pts
        self.func_name = demo_sel_func.__name__
        self.params = params

        # Create a vector of gammas that are log-spaced over sequential 
        # intervals or log-spaced over a single interval.
        self.gammas = -np.logspace(np.log10(gamma_bounds[1]),
                                   np.log10(gamma_bounds[0]), gamma_pts)

        # Store bulk negative gammas for use in integration of negative pdf
        self.neg_gammas = self.gammas
        # Add additional gammas to array
        self.gammas = np.concatenate((self.gammas, additional_gammas))

        self.spectra = [[None]*len(self.gammas) for _ in self.gammas]

        if cpus is None:
            import multiprocessing
            cpus = multiprocessing.cpu_count()

        if not cpus > 1 or gpus > 0: #for running with a single thread
            self._single_process(verbose, split_jobs, this_job_id, demo_sel_func)
        else: #for running with with multiple cores
            self._multiple_processes(cpus, gpus, verbose, split_jobs, this_job_id, demo_sel_func)

        # self.spectra is an array of arrays. The first two dimensions are
        # indexed by the pairs of gamma values, and the remaining dimensions
        # are the spectra themselves.
        if split_jobs == 1:
            self.spectra = np.array(self.spectra)
        
    def _single_process(self, verbose, split_jobs, this_job_id, demo_sel_func):
        func_ex = Numerics.make_extrap_func(demo_sel_func)

        this_eval = 0
        for ii,gamma in enumerate(self.gammas):
            for jj, gamma2 in enumerate(self.gammas):
                if this_eval % split_jobs == this_job_id:
                    self.spectra[ii][jj] = func_ex(tuple(self.params)+(gamma,gamma2),
                            self.ns, self.pts)
                    if verbose: print('{0},{1}: {2},{3}'.format(ii,jj, gamma,gamma2))
                this_eval += 1
                        
    def _multiple_processes(self, cpus, gpus, verbose, split_jobs, this_job_id, demo_sel_func):
        from multiprocessing import Manager, Process

        with Manager() as manager:
            work = manager.Queue(cpus+gpus)
            results = manager.list()
                
            # Assemble pool of workers
            pool = []
            for i in range(cpus):
                p = Process(target=self._worker_sfs,
                            args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, False))
                p.start()
                pool.append(p)
            for i in range(gpus):
                p = Process(target=self._worker_sfs,
                            args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, True))
                p.start()
                pool.append(p)

            # Put all jobs on queue
            this_eval = 0
            for ii, gamma in enumerate(self.gammas):
                for jj, gamma2 in enumerate(self.gammas):
                    if this_eval % split_jobs == this_job_id:
                        work.put((ii,jj, gamma,gamma2))
                    this_eval += 1
            # Put commands on queue to close out workers
            for jj in range(cpus+gpus):
                work.put(None)
            # Stop workers
            for p in pool:
                p.join()
            for ii,jj, sfs in results:
                self.spectra[ii][jj] = sfs
                    
    def _worker_sfs(self, in_queue, outlist, demo_sel_func, params, ns, pts, verbose, usegpu):
        """
        Worker function -- used to generate SFSes for
        pairs of gammas.
        """
        demo_sel_func = Numerics.make_extrap_func(demo_sel_func)
        dadi.cuda_enabled(usegpu)
        while True:
            item = in_queue.get()
            if item is None:
                return
            ii, jj, gamma, gamma2 = item
            try:
                sfs = demo_sel_func(tuple(params)+(gamma,gamma2), ns, pts)
                if verbose:
                    print('{0},{1}: {2},{3}'.format(ii, jj, gamma, gamma2))
                outlist.append((ii,jj,sfs))
            except BaseException as inst:
                # If an exception occurs in the worker function, print an error
                # and populate the outlist with an item that will cause a later crash.
                tb = sys.exc_info()[2]
                traceback.print_tb(tb)
                outlist.append(inst)

    def integrate(self, params, ns, sel_dist, theta, pts,
                  exterior_int=True):
        """
        Integrate spectra over a bivariate prob. dist. for negative gammas.

        params: Parameters for sel_dist
        ns: Ignored
        sel_dist: Bivariate probability distribution,
                  taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        pts: Ignored
        exterior_int: If False, do not integrate outside sampled domain.

        Note also that the ns and pts arguments are ignored. They are only
        present for compatibility with other dadi functions that apply to
        demographic models.
        """
        # Restrict our gammas and spectra to negative gammas.
        Nneg = len(self.neg_gammas)
        spectra = self.spectra[:Nneg, :Nneg]

        # Weights for integration
        weights = sel_dist(-self.neg_gammas, -self.neg_gammas, params)
        # Apply weighting. Could probably do this in a single fancy numpy
        # multiplication, which might be faster.
        weighted_spectra = 0*spectra
        for ii, row in enumerate(weights):
            for jj, w in enumerate(row):
                weighted_spectra[ii,jj] = w*spectra[ii,jj]

        # Integrate out the first two axes, which are the gamma axes.
        temp = np.trapz(weighted_spectra, self.neg_gammas, axis=0)
        fs = np.trapz(temp, self.neg_gammas, axis=0)

        if not exterior_int:
            return Spectrum(theta*fs)

        # Test whether our DFE pdf is symmetric. If it is we can dramatically reduce our calculations.
        testx = np.logspace(-2,2,3)
        testout = sel_dist(testx, testx, params)
        # We need atol=0 here to ensure small values don't lead to spurious pass of test
        symmetric_dfe = np.allclose(testout, testout.T, atol=0, rtol=1e-12)

        max_gamma = -self.neg_gammas[-1]
        min_gamma = -self.neg_gammas[0]
        # Account for density that is outside the simulated domain.
        weights_1low, weights_1high = 0*self.neg_gammas, 0*self.neg_gammas
        weights_2low, weights_2high = 0*self.neg_gammas, 0*self.neg_gammas
        for ii, gamma in enumerate(-self.neg_gammas):
            w_1low, err = scipy.integrate.quad(sel_dist, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
            w_1high, err = scipy.integrate.quad(sel_dist, 0, max_gamma, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
            if symmetric_dfe:
                w_2low, w_2high = w_1low, w_1high
            else:
                marg2_func = lambda g2: sel_dist(gamma, g2, params)
                w_2low, err = scipy.integrate.quad(marg2_func, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3)
                w_2high, err = scipy.integrate.quad(marg2_func, 0, max_gamma, epsabs=1e-4, epsrel=1e-3)
            weights_1low[ii] = w_1low
            weights_2low[ii] = w_2low
            weights_1high[ii] = w_1high
            weights_2high[ii] = w_2high

        # Strongly deleterious gamma2, in-range gamma1
        fs += np.trapz(spectra[:,0] * weights_2low[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Neutral gamma2, in-range gamma1
        fs += np.trapz(spectra[:,-1] * weights_2high[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Strongly deleterious gamma1, in-range gamma2
        fs += np.trapz(spectra[0,:] * weights_1low[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Neutral gamma1, in-range gamma2
        fs += np.trapz(spectra[-1,:] * weights_1high[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)

        # Both neutral, really slow
        weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
                                              lambda _: 0, lambda _: max_gamma,
                                              epsrel=1e-3, epsabs=1e-4, args=[params])
        fs += spectra[-1,-1]*weight

        # Neutral gamma2, strongly deleterious gamma1
        weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
                lambda _: min_gamma, lambda _: np.inf,
                epsrel=1e-3, epsabs=1e-4 ,args=[params])
        fs += spectra[0,-1]*weight

        # Neutral gamma1, strongly deleterious gamma2
        if not symmetric_dfe:
            weight, err = scipy.integrate.dblquad(sel_dist, min_gamma, np.inf,
                    lambda _: 0, lambda _: max_gamma,
                    epsrel=1e-3, epsabs=1e-4, args=[params])
        fs += spectra[-1,0]*weight

        return Spectrum(theta*fs)

    def integrate_point_pos(self, params, ns, biv_seldist, theta,
                            rho=0, pts=None):
        """
        Integrate spectra over a bivariate prob. dist. for negative gammas plus
        a point mass of positive selection.

        params: Parameters for sel_dist and positive selection.
                It is assumed that the last four parameters are:
                Proportion positive selection in pop1, postive gamma for pop1,
                 prop. positive in pop2, and positive gamma for pop2.
                Earlier arguments are assumed to be for the continuous bivariate
                distribution.
        ns: Ignored
        biv_seldist: Bivariate probability distribution for negative selection,
                     taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        rho: Correlation coefficient used to connect negative and positive
             components of DFE.
        pts: Ignored

        Note that no normalization is performed, so alleles not covered by the
        specified range of gammas are assumed not to be seen in the data.

        Note that in the triallelic paper (Ragsdale et al. 2016 Genetics), we
        weighted each portion of the DFE based on rho. This was to ensure that
        the rho = 0 limit corresponded to independent sampling and the rho = 1
        limit corresponding to exactly equal selection coefficients for each
        pair of derived alleles.

        If we generalize to allow the marginal DFEs to differ between the
        two populations, the rho = 1 limit cannot be held perfectly between
        both negative and positive selection quadrants. (In log-space, the
        positive selection mass is infinitely far from the negative selection
        DFE.)

        We do use a similar procedure to the triallelic paper to end up
        with something like:
        p++ = p1*p2 + rho*(sqrt(p1*p2) - p1*p2)
        p+- = (1-rho) * p1*(1-p2)
        p-- = (1-p1)*(1-p2) + rho*(1-sqrt(p1*p2) - (1-p1)*(1-p2))

        The logic here is that in the rho=1 limit, we set the proportion
        positively selected to be the geometric mean of p1 and p2, the
        proportion positive in one pop and negative in the other (p+-) to be
        zero, and the remainder negative in both populations p--. We then
        linearly interpolate between the rho=0 and rho=1 cases.

        This requires the integration method to explicitly know about rho,
        so it's not completely general to all joint DFEs. rho is thus
        included as a parameter in the argument list.
        """
        biv_params = params[:-4]
        ppos1, gammapos1, ppos2, gammapos2 = params[-4:]

        Nneg = len(self.neg_gammas)
        weights = biv_seldist(-self.neg_gammas, -self.neg_gammas, biv_params)

        # Case in which both are negative, so we integrate directly
        # over the bivariate distribution
        neg_neg = self.integrate(biv_params, ns, biv_seldist, 1, pts)

        # Case in which both are positive, which is a single spectrum.
        try:
            pos_pos = self.spectra[self.gammas == gammapos1,
                                   self.gammas == gammapos2][0]
        except IndexError:
            raise IndexError('Failed to find requested gammapos1={0:.4f} '
                             'and/or gammapos2={1:0.4f} in cached spectra. '
                             'Were they included in additional_gammas during '
                             'cache generation?'.format(gammapos1,gammapos2))

        # Case in which pop1 is positive and pop2 is negative.
        pos_neg_spectra = np.squeeze(self.spectra[self.gammas==gammapos1,:Nneg])
        # Obtain the marginal DFE for pop2 by integrating over the weights
        # for pop1.
        pos_neg_weights = np.trapz(weights, self.neg_gammas, axis=0)
        # For numpy multiplication...
        pos_neg_weights = pos_neg_weights[:,np.newaxis,np.newaxis]
        # Do the weighted integral.
        pos_neg = np.trapz(pos_neg_weights*pos_neg_spectra,
                           self.neg_gammas, axis=0)

        # Case in which pop2 is positive and pop1 is negative.
        neg_pos_spectra = np.squeeze(self.spectra[:Nneg,self.gammas==gammapos2])
        # Now integrate over pop2 to get marginal DFE for pop1
        neg_pos_weights = np.trapz(weights, self.neg_gammas, axis=1)
        neg_pos_weights = neg_pos_weights[:,np.newaxis,np.newaxis]
        neg_pos = np.trapz(neg_pos_weights*neg_pos_spectra,
                           self.neg_gammas, axis=0)

        # Weighting factors for each quadrant of the DFE. Note that in the case
        # that ppos1==ppos2, these reduce to the model of Ragsdale et al. (2016)
        p_pos_pos = ppos1*ppos2 + rho*(np.sqrt(ppos1*ppos2) - ppos1*ppos2)
        p_pos_neg = (1-rho) * ppos1*(1-ppos2)
        p_neg_pos = (1-rho) * (1-ppos1)*ppos2
        p_neg_neg = (1-ppos1)*(1-ppos2)\
                + rho*(1-np.sqrt(ppos1*ppos2) - (1-ppos1)*(1-ppos2))

        fs = p_pos_pos*pos_pos + p_pos_neg*pos_neg + p_neg_pos*neg_pos\
                + p_neg_neg*neg_neg
        return theta*fs

    def integrate_symmetric_point_pos(self, params, ns, biv_seldist, theta, pts=None):
        """
        Convenience method for integrating spectra over a bivariate prob. dist.
        for negative gammas plus a symmetric point mass of positive selection.

        params: Parameters for sel_dist and positive selection.
                It is assumed that the last two parameters are:
                Proportion positive selection, postive gamma
                Earlier arguments are assumed to be for the continuous bivariate
                distribution.
                *It is assumed that the last of those earlier arguments is
                 the correlation coefficient rho.*
        ns: Ignored
        biv_seldist: Bivariate probability distribution for negative selection,
                     taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        pts: Ignored
        """
        seldist_params = params[:-2]
        rho = seldist_params[-1]
        ppos, gammapos = params[-2:]

        params = np.concatenate((seldist_params, [ppos,gammapos,ppos,gammapos]))

        return self.integrate_point_pos(params, ns, biv_seldist, theta,
                                        rho=rho, pts=None)

    @staticmethod
    def merge(caches):
        """
        Merge caches generated with split_jobs.

        caches: Cache2D objects to merge

        Note: Will fail if caches conflict or do not merge into a complete cache.
        """
        import copy
        # Copy our first cache to start the output
        new_cache = copy.deepcopy(caches[0])
        for other in caches[1:]:
            for ii, row in enumerate(other.spectra):
                for jj, fs in enumerate(row):
                    if fs is not None:
                        if new_cache.spectra[ii][jj] is not None \
                                and not np.all(new_cache.spectra[ii][jj] == fs):
                            raise ValueError("Merged cached conflicts with current.")
                        new_cache.spectra[ii][jj] = fs
        # Check that new cache is complete
        for ii, row in enumerate(new_cache.spectra):
            for jj, fs in enumerate(row):
                if fs is None:
                    raise ValueError("Cache is incomplete after merging. "
                            "First missing entry is {0},{1}.".format(ii,jj))
        # Convert cache spectra to array
        new_cache.spectra = np.array(new_cache.spectra)
        return new_cache
#
# Example demography + selection functions
#
import dadi
from dadi import Numerics, PhiManip, Integration
from dadi.Spectrum_mod import Spectrum

def mixture(params, ns, s1, s2, sel_dist1, sel_dist2, theta, pts,
                  exterior_int=True):
    """
    Weighted summation of 1d and 2d distributions that share parameters.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            It is assumed that last parameter is the weight for the 2d dist.
            The second-to-last parameter is assumed to be the correlation
                coefficient for the 2d distribution.
            The remaining parameters as assumed to be shared between the
                1d and 2d distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    exterior_int: If False, do not integrate outside sampled domain.
    """
    fs1 = s1.integrate(params[:-2], None, sel_dist1, theta, None, exterior_int)
    fs2 = s2.integrate(params[:-1], None, sel_dist2, theta, None, exterior_int)

    p2d = params[-1]
    return (1-p2d)*fs1 + p2d*fs2

def mixture_symmetric_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2,
                                theta, pts=None):
    """
    Weighted summation of 1d and 2d distributions with positive selection.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            The last parameter is the weight for the 2d dist.
            The second-to-last parameter is positive gammma for the point mass.
            The third-to-last parameter is the proportion of positive selection.
            The fourth-to-last parameter is the correlation coefficient for the
                2d distribution.
            The remaining parameters as must be shared between the 1d and 2d
                distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    """
    pdf_params = params[:-4]
    rho, ppos, gamma_pos, p2d = params[-4:]

    params1 = list(pdf_params) + [ppos, gamma_pos]
    fs1 = s1.integrate_point_pos(params1, None, sel_dist1, theta, Npos=1, pts=None)
    params2 = list(pdf_params) + [rho, ppos, gamma_pos]
    fs2 = s2.integrate_symmetric_point_pos(params2, None, sel_dist2, theta, None)
    return (1-p2d)*fs1 + p2d*fs2

def mixture_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2,
                      theta, pts=None):
    """
    Weighted summation of 1d and 2d distributions with positive selection.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            The last parameter is the weight for the 2d dist.
            The second-to-last parameter is positive gammma for the point mass in population 2.
            The third-to-last parameter is the proportion of positive selection in population 2.
            The fourth-to-last parameter is positive gamma for the point mass in population 1.
            The fifth-to-last parameter is the proportion of positive selection in population 1.
            The sixth-to-last parameter is the correlation coefficient for the
                2d distribution.
            The remaining parameters as must be shared between the 1d and 2d
                distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    """
    pdf_params = params[:-6]
    rho, ppos1, gamma_pos1, ppos2, gamma_pos2, p2d = params[-6:]

    params1 = list(pdf_params) + [ppos1, gamma_pos1]
    fs1 = s1.integrate_point_pos(params1, None, sel_dist1, theta, Npos=1, pts=None)
    params2 = list(pdf_params) + [rho, ppos1, gamma_pos1, ppos2, gamma_pos2]
    fs2 = s2.integrate_point_pos(params2, None, sel_dist2, theta, None)
    return (1-p2d)*fs1 + p2d*fs2

Functions

def mixture(params, ns, s1, s2, sel_dist1, sel_dist2, theta, pts, exterior_int=True)

Weighted summation of 1d and 2d distributions that share parameters. The 1d distribution is equivalent to assuming selection coefficients are perfectly correlated.

params: Parameters for potential optimization. It is assumed that last parameter is the weight for the 2d dist. The second-to-last parameter is assumed to be the correlation coefficient for the 2d distribution. The remaining parameters as assumed to be shared between the 1d and 2d distributions. ns: Ignored s1: Cache1D object for 1d distribution s2: Cache2D object for 2d distribution sel_dist1: Univariate probability distribution for s1 sel_dist2: Bivariate probability distribution for s2 theta: Population-scaled mutation rate pts: Ignored exterior_int: If False, do not integrate outside sampled domain.

Expand source code
def mixture(params, ns, s1, s2, sel_dist1, sel_dist2, theta, pts,
                  exterior_int=True):
    """
    Weighted summation of 1d and 2d distributions that share parameters.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            It is assumed that last parameter is the weight for the 2d dist.
            The second-to-last parameter is assumed to be the correlation
                coefficient for the 2d distribution.
            The remaining parameters as assumed to be shared between the
                1d and 2d distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    exterior_int: If False, do not integrate outside sampled domain.
    """
    fs1 = s1.integrate(params[:-2], None, sel_dist1, theta, None, exterior_int)
    fs2 = s2.integrate(params[:-1], None, sel_dist2, theta, None, exterior_int)

    p2d = params[-1]
    return (1-p2d)*fs1 + p2d*fs2
def mixture_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2, theta, pts=None)

Weighted summation of 1d and 2d distributions with positive selection. The 1d distribution is equivalent to assuming selection coefficients are perfectly correlated.

params: Parameters for potential optimization. The last parameter is the weight for the 2d dist. The second-to-last parameter is positive gammma for the point mass in population 2. The third-to-last parameter is the proportion of positive selection in population 2. The fourth-to-last parameter is positive gamma for the point mass in population 1. The fifth-to-last parameter is the proportion of positive selection in population 1. The sixth-to-last parameter is the correlation coefficient for the 2d distribution. The remaining parameters as must be shared between the 1d and 2d distributions. ns: Ignored s1: Cache1D object for 1d distribution s2: Cache2D object for 2d distribution sel_dist1: Univariate probability distribution for s1 sel_dist2: Bivariate probability distribution for s2 theta: Population-scaled mutation rate pts: Ignored

Expand source code
def mixture_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2,
                      theta, pts=None):
    """
    Weighted summation of 1d and 2d distributions with positive selection.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            The last parameter is the weight for the 2d dist.
            The second-to-last parameter is positive gammma for the point mass in population 2.
            The third-to-last parameter is the proportion of positive selection in population 2.
            The fourth-to-last parameter is positive gamma for the point mass in population 1.
            The fifth-to-last parameter is the proportion of positive selection in population 1.
            The sixth-to-last parameter is the correlation coefficient for the
                2d distribution.
            The remaining parameters as must be shared between the 1d and 2d
                distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    """
    pdf_params = params[:-6]
    rho, ppos1, gamma_pos1, ppos2, gamma_pos2, p2d = params[-6:]

    params1 = list(pdf_params) + [ppos1, gamma_pos1]
    fs1 = s1.integrate_point_pos(params1, None, sel_dist1, theta, Npos=1, pts=None)
    params2 = list(pdf_params) + [rho, ppos1, gamma_pos1, ppos2, gamma_pos2]
    fs2 = s2.integrate_point_pos(params2, None, sel_dist2, theta, None)
    return (1-p2d)*fs1 + p2d*fs2
def mixture_symmetric_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2, theta, pts=None)

Weighted summation of 1d and 2d distributions with positive selection. The 1d distribution is equivalent to assuming selection coefficients are perfectly correlated.

params: Parameters for potential optimization. The last parameter is the weight for the 2d dist. The second-to-last parameter is positive gammma for the point mass. The third-to-last parameter is the proportion of positive selection. The fourth-to-last parameter is the correlation coefficient for the 2d distribution. The remaining parameters as must be shared between the 1d and 2d distributions. ns: Ignored s1: Cache1D object for 1d distribution s2: Cache2D object for 2d distribution sel_dist1: Univariate probability distribution for s1 sel_dist2: Bivariate probability distribution for s2 theta: Population-scaled mutation rate pts: Ignored

Expand source code
def mixture_symmetric_point_pos(params, ns, s1, s2, sel_dist1, sel_dist2,
                                theta, pts=None):
    """
    Weighted summation of 1d and 2d distributions with positive selection.
    The 1d distribution is equivalent to assuming selection coefficients are
    perfectly correlated.

    params: Parameters for potential optimization.
            The last parameter is the weight for the 2d dist.
            The second-to-last parameter is positive gammma for the point mass.
            The third-to-last parameter is the proportion of positive selection.
            The fourth-to-last parameter is the correlation coefficient for the
                2d distribution.
            The remaining parameters as must be shared between the 1d and 2d
                distributions.
    ns: Ignored
    s1: Cache1D object for 1d distribution
    s2: Cache2D object for 2d distribution
    sel_dist1: Univariate probability distribution for s1
    sel_dist2: Bivariate probability distribution for s2
    theta: Population-scaled mutation rate
    pts: Ignored
    """
    pdf_params = params[:-4]
    rho, ppos, gamma_pos, p2d = params[-4:]

    params1 = list(pdf_params) + [ppos, gamma_pos]
    fs1 = s1.integrate_point_pos(params1, None, sel_dist1, theta, Npos=1, pts=None)
    params2 = list(pdf_params) + [rho, ppos, gamma_pos]
    fs2 = s2.integrate_symmetric_point_pos(params2, None, sel_dist2, theta, None)
    return (1-p2d)*fs1 + p2d*fs2

Classes

class Cache2D (params, ns, demo_sel_func, pts, gamma_bounds=(0.0001, 2000.0), gamma_pts=100, additional_gammas=[], cpus=None, gpus=0, verbose=False, split_jobs=1, this_job_id=0)

params: Optimized demographic parameters ns: Sample sizes for cached spectra demo_sel_func: DaDi demographic function with selection. gamma1, gamma2 must be the last arguments. pts: Grid point settings for demo_sel_func gamma_bounds: Range of gammas to integrate over gamma_points: Number of gamma grid points over which to integrate additional_gammas: Sequence of additional gamma values to store results for. Useful for point masses of explicit neutrality or positive selection. cpus: Number of CPU jobs to launch. If None (the default), then all CPUs available will be used. gpus: Number of GPU jobs to launch. verbose: If True, print messages to track progress of cache generation. split_jobs: To split cache generation across multiple computing jobs, set split_jobs > 1 and this_job_id. this_job_id: Defines which entry in split_jobs this run will create. (Indexed from 0.) For example, to split Cache2D generation over 3 independent jobs, set split_jobs=3 and create jobs with this_job_id=0,1,2. Then use Cache2D.merge to combine the outputs.

Expand source code
class Cache2D:
    def __init__(self, params, ns, demo_sel_func, pts,
                 gamma_bounds=(1e-4, 2000.), gamma_pts=100,
                 additional_gammas=[], cpus=None, gpus=0, verbose=False,
                 split_jobs=1, this_job_id=0):
        """
        params: Optimized demographic parameters
        ns: Sample sizes for cached spectra
        demo_sel_func: DaDi demographic function with selection. 
                       gamma1, gamma2 must be the last arguments.
        pts: Grid point settings for demo_sel_func
        gamma_bounds: Range of gammas to integrate over
        gamma_points: Number of gamma grid points over which to integrate
        additional_gammas: Sequence of additional gamma values to store results
                           for. Useful for point masses of explicit neutrality
                           or positive selection.
        cpus: Number of CPU jobs to launch. If None (the default), then
              all CPUs available will be used.
        gpus: Number of GPU jobs to launch.
        verbose: If True, print messages to track progress of cache generation.
        split_jobs: To split cache generation across multiple computing jobs,
                    set split_jobs > 1 and this_job_id.
        this_job_id: Defines which entry in split_jobs this run will create.
                     (Indexed from 0.)
        For example, to split Cache2D generation over 3 independent jobs, set
            split_jobs=3 and create jobs with this_job_id=0,1,2.
            Then use Cache2D.merge to combine the outputs.
        """
        # Store details regarding how this cache was generated. May be useful
        # for keeping track of pickled caches.
        self.ns = ns
        self.pts = pts
        self.func_name = demo_sel_func.__name__
        self.params = params

        # Create a vector of gammas that are log-spaced over sequential 
        # intervals or log-spaced over a single interval.
        self.gammas = -np.logspace(np.log10(gamma_bounds[1]),
                                   np.log10(gamma_bounds[0]), gamma_pts)

        # Store bulk negative gammas for use in integration of negative pdf
        self.neg_gammas = self.gammas
        # Add additional gammas to array
        self.gammas = np.concatenate((self.gammas, additional_gammas))

        self.spectra = [[None]*len(self.gammas) for _ in self.gammas]

        if cpus is None:
            import multiprocessing
            cpus = multiprocessing.cpu_count()

        if not cpus > 1 or gpus > 0: #for running with a single thread
            self._single_process(verbose, split_jobs, this_job_id, demo_sel_func)
        else: #for running with with multiple cores
            self._multiple_processes(cpus, gpus, verbose, split_jobs, this_job_id, demo_sel_func)

        # self.spectra is an array of arrays. The first two dimensions are
        # indexed by the pairs of gamma values, and the remaining dimensions
        # are the spectra themselves.
        if split_jobs == 1:
            self.spectra = np.array(self.spectra)
        
    def _single_process(self, verbose, split_jobs, this_job_id, demo_sel_func):
        func_ex = Numerics.make_extrap_func(demo_sel_func)

        this_eval = 0
        for ii,gamma in enumerate(self.gammas):
            for jj, gamma2 in enumerate(self.gammas):
                if this_eval % split_jobs == this_job_id:
                    self.spectra[ii][jj] = func_ex(tuple(self.params)+(gamma,gamma2),
                            self.ns, self.pts)
                    if verbose: print('{0},{1}: {2},{3}'.format(ii,jj, gamma,gamma2))
                this_eval += 1
                        
    def _multiple_processes(self, cpus, gpus, verbose, split_jobs, this_job_id, demo_sel_func):
        from multiprocessing import Manager, Process

        with Manager() as manager:
            work = manager.Queue(cpus+gpus)
            results = manager.list()
                
            # Assemble pool of workers
            pool = []
            for i in range(cpus):
                p = Process(target=self._worker_sfs,
                            args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, False))
                p.start()
                pool.append(p)
            for i in range(gpus):
                p = Process(target=self._worker_sfs,
                            args=(work, results, demo_sel_func, self.params, self.ns, self.pts, verbose, True))
                p.start()
                pool.append(p)

            # Put all jobs on queue
            this_eval = 0
            for ii, gamma in enumerate(self.gammas):
                for jj, gamma2 in enumerate(self.gammas):
                    if this_eval % split_jobs == this_job_id:
                        work.put((ii,jj, gamma,gamma2))
                    this_eval += 1
            # Put commands on queue to close out workers
            for jj in range(cpus+gpus):
                work.put(None)
            # Stop workers
            for p in pool:
                p.join()
            for ii,jj, sfs in results:
                self.spectra[ii][jj] = sfs
                    
    def _worker_sfs(self, in_queue, outlist, demo_sel_func, params, ns, pts, verbose, usegpu):
        """
        Worker function -- used to generate SFSes for
        pairs of gammas.
        """
        demo_sel_func = Numerics.make_extrap_func(demo_sel_func)
        dadi.cuda_enabled(usegpu)
        while True:
            item = in_queue.get()
            if item is None:
                return
            ii, jj, gamma, gamma2 = item
            try:
                sfs = demo_sel_func(tuple(params)+(gamma,gamma2), ns, pts)
                if verbose:
                    print('{0},{1}: {2},{3}'.format(ii, jj, gamma, gamma2))
                outlist.append((ii,jj,sfs))
            except BaseException as inst:
                # If an exception occurs in the worker function, print an error
                # and populate the outlist with an item that will cause a later crash.
                tb = sys.exc_info()[2]
                traceback.print_tb(tb)
                outlist.append(inst)

    def integrate(self, params, ns, sel_dist, theta, pts,
                  exterior_int=True):
        """
        Integrate spectra over a bivariate prob. dist. for negative gammas.

        params: Parameters for sel_dist
        ns: Ignored
        sel_dist: Bivariate probability distribution,
                  taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        pts: Ignored
        exterior_int: If False, do not integrate outside sampled domain.

        Note also that the ns and pts arguments are ignored. They are only
        present for compatibility with other dadi functions that apply to
        demographic models.
        """
        # Restrict our gammas and spectra to negative gammas.
        Nneg = len(self.neg_gammas)
        spectra = self.spectra[:Nneg, :Nneg]

        # Weights for integration
        weights = sel_dist(-self.neg_gammas, -self.neg_gammas, params)
        # Apply weighting. Could probably do this in a single fancy numpy
        # multiplication, which might be faster.
        weighted_spectra = 0*spectra
        for ii, row in enumerate(weights):
            for jj, w in enumerate(row):
                weighted_spectra[ii,jj] = w*spectra[ii,jj]

        # Integrate out the first two axes, which are the gamma axes.
        temp = np.trapz(weighted_spectra, self.neg_gammas, axis=0)
        fs = np.trapz(temp, self.neg_gammas, axis=0)

        if not exterior_int:
            return Spectrum(theta*fs)

        # Test whether our DFE pdf is symmetric. If it is we can dramatically reduce our calculations.
        testx = np.logspace(-2,2,3)
        testout = sel_dist(testx, testx, params)
        # We need atol=0 here to ensure small values don't lead to spurious pass of test
        symmetric_dfe = np.allclose(testout, testout.T, atol=0, rtol=1e-12)

        max_gamma = -self.neg_gammas[-1]
        min_gamma = -self.neg_gammas[0]
        # Account for density that is outside the simulated domain.
        weights_1low, weights_1high = 0*self.neg_gammas, 0*self.neg_gammas
        weights_2low, weights_2high = 0*self.neg_gammas, 0*self.neg_gammas
        for ii, gamma in enumerate(-self.neg_gammas):
            w_1low, err = scipy.integrate.quad(sel_dist, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
            w_1high, err = scipy.integrate.quad(sel_dist, 0, max_gamma, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
            if symmetric_dfe:
                w_2low, w_2high = w_1low, w_1high
            else:
                marg2_func = lambda g2: sel_dist(gamma, g2, params)
                w_2low, err = scipy.integrate.quad(marg2_func, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3)
                w_2high, err = scipy.integrate.quad(marg2_func, 0, max_gamma, epsabs=1e-4, epsrel=1e-3)
            weights_1low[ii] = w_1low
            weights_2low[ii] = w_2low
            weights_1high[ii] = w_1high
            weights_2high[ii] = w_2high

        # Strongly deleterious gamma2, in-range gamma1
        fs += np.trapz(spectra[:,0] * weights_2low[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Neutral gamma2, in-range gamma1
        fs += np.trapz(spectra[:,-1] * weights_2high[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Strongly deleterious gamma1, in-range gamma2
        fs += np.trapz(spectra[0,:] * weights_1low[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)
        # Neutral gamma1, in-range gamma2
        fs += np.trapz(spectra[-1,:] * weights_1high[:,np.newaxis,np.newaxis],
                       self.neg_gammas, axis=0)

        # Both neutral, really slow
        weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
                                              lambda _: 0, lambda _: max_gamma,
                                              epsrel=1e-3, epsabs=1e-4, args=[params])
        fs += spectra[-1,-1]*weight

        # Neutral gamma2, strongly deleterious gamma1
        weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
                lambda _: min_gamma, lambda _: np.inf,
                epsrel=1e-3, epsabs=1e-4 ,args=[params])
        fs += spectra[0,-1]*weight

        # Neutral gamma1, strongly deleterious gamma2
        if not symmetric_dfe:
            weight, err = scipy.integrate.dblquad(sel_dist, min_gamma, np.inf,
                    lambda _: 0, lambda _: max_gamma,
                    epsrel=1e-3, epsabs=1e-4, args=[params])
        fs += spectra[-1,0]*weight

        return Spectrum(theta*fs)

    def integrate_point_pos(self, params, ns, biv_seldist, theta,
                            rho=0, pts=None):
        """
        Integrate spectra over a bivariate prob. dist. for negative gammas plus
        a point mass of positive selection.

        params: Parameters for sel_dist and positive selection.
                It is assumed that the last four parameters are:
                Proportion positive selection in pop1, postive gamma for pop1,
                 prop. positive in pop2, and positive gamma for pop2.
                Earlier arguments are assumed to be for the continuous bivariate
                distribution.
        ns: Ignored
        biv_seldist: Bivariate probability distribution for negative selection,
                     taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        rho: Correlation coefficient used to connect negative and positive
             components of DFE.
        pts: Ignored

        Note that no normalization is performed, so alleles not covered by the
        specified range of gammas are assumed not to be seen in the data.

        Note that in the triallelic paper (Ragsdale et al. 2016 Genetics), we
        weighted each portion of the DFE based on rho. This was to ensure that
        the rho = 0 limit corresponded to independent sampling and the rho = 1
        limit corresponding to exactly equal selection coefficients for each
        pair of derived alleles.

        If we generalize to allow the marginal DFEs to differ between the
        two populations, the rho = 1 limit cannot be held perfectly between
        both negative and positive selection quadrants. (In log-space, the
        positive selection mass is infinitely far from the negative selection
        DFE.)

        We do use a similar procedure to the triallelic paper to end up
        with something like:
        p++ = p1*p2 + rho*(sqrt(p1*p2) - p1*p2)
        p+- = (1-rho) * p1*(1-p2)
        p-- = (1-p1)*(1-p2) + rho*(1-sqrt(p1*p2) - (1-p1)*(1-p2))

        The logic here is that in the rho=1 limit, we set the proportion
        positively selected to be the geometric mean of p1 and p2, the
        proportion positive in one pop and negative in the other (p+-) to be
        zero, and the remainder negative in both populations p--. We then
        linearly interpolate between the rho=0 and rho=1 cases.

        This requires the integration method to explicitly know about rho,
        so it's not completely general to all joint DFEs. rho is thus
        included as a parameter in the argument list.
        """
        biv_params = params[:-4]
        ppos1, gammapos1, ppos2, gammapos2 = params[-4:]

        Nneg = len(self.neg_gammas)
        weights = biv_seldist(-self.neg_gammas, -self.neg_gammas, biv_params)

        # Case in which both are negative, so we integrate directly
        # over the bivariate distribution
        neg_neg = self.integrate(biv_params, ns, biv_seldist, 1, pts)

        # Case in which both are positive, which is a single spectrum.
        try:
            pos_pos = self.spectra[self.gammas == gammapos1,
                                   self.gammas == gammapos2][0]
        except IndexError:
            raise IndexError('Failed to find requested gammapos1={0:.4f} '
                             'and/or gammapos2={1:0.4f} in cached spectra. '
                             'Were they included in additional_gammas during '
                             'cache generation?'.format(gammapos1,gammapos2))

        # Case in which pop1 is positive and pop2 is negative.
        pos_neg_spectra = np.squeeze(self.spectra[self.gammas==gammapos1,:Nneg])
        # Obtain the marginal DFE for pop2 by integrating over the weights
        # for pop1.
        pos_neg_weights = np.trapz(weights, self.neg_gammas, axis=0)
        # For numpy multiplication...
        pos_neg_weights = pos_neg_weights[:,np.newaxis,np.newaxis]
        # Do the weighted integral.
        pos_neg = np.trapz(pos_neg_weights*pos_neg_spectra,
                           self.neg_gammas, axis=0)

        # Case in which pop2 is positive and pop1 is negative.
        neg_pos_spectra = np.squeeze(self.spectra[:Nneg,self.gammas==gammapos2])
        # Now integrate over pop2 to get marginal DFE for pop1
        neg_pos_weights = np.trapz(weights, self.neg_gammas, axis=1)
        neg_pos_weights = neg_pos_weights[:,np.newaxis,np.newaxis]
        neg_pos = np.trapz(neg_pos_weights*neg_pos_spectra,
                           self.neg_gammas, axis=0)

        # Weighting factors for each quadrant of the DFE. Note that in the case
        # that ppos1==ppos2, these reduce to the model of Ragsdale et al. (2016)
        p_pos_pos = ppos1*ppos2 + rho*(np.sqrt(ppos1*ppos2) - ppos1*ppos2)
        p_pos_neg = (1-rho) * ppos1*(1-ppos2)
        p_neg_pos = (1-rho) * (1-ppos1)*ppos2
        p_neg_neg = (1-ppos1)*(1-ppos2)\
                + rho*(1-np.sqrt(ppos1*ppos2) - (1-ppos1)*(1-ppos2))

        fs = p_pos_pos*pos_pos + p_pos_neg*pos_neg + p_neg_pos*neg_pos\
                + p_neg_neg*neg_neg
        return theta*fs

    def integrate_symmetric_point_pos(self, params, ns, biv_seldist, theta, pts=None):
        """
        Convenience method for integrating spectra over a bivariate prob. dist.
        for negative gammas plus a symmetric point mass of positive selection.

        params: Parameters for sel_dist and positive selection.
                It is assumed that the last two parameters are:
                Proportion positive selection, postive gamma
                Earlier arguments are assumed to be for the continuous bivariate
                distribution.
                *It is assumed that the last of those earlier arguments is
                 the correlation coefficient rho.*
        ns: Ignored
        biv_seldist: Bivariate probability distribution for negative selection,
                     taking in arguments (xx, yy, params)
        theta: Population-scaled mutation rate
        pts: Ignored
        """
        seldist_params = params[:-2]
        rho = seldist_params[-1]
        ppos, gammapos = params[-2:]

        params = np.concatenate((seldist_params, [ppos,gammapos,ppos,gammapos]))

        return self.integrate_point_pos(params, ns, biv_seldist, theta,
                                        rho=rho, pts=None)

    @staticmethod
    def merge(caches):
        """
        Merge caches generated with split_jobs.

        caches: Cache2D objects to merge

        Note: Will fail if caches conflict or do not merge into a complete cache.
        """
        import copy
        # Copy our first cache to start the output
        new_cache = copy.deepcopy(caches[0])
        for other in caches[1:]:
            for ii, row in enumerate(other.spectra):
                for jj, fs in enumerate(row):
                    if fs is not None:
                        if new_cache.spectra[ii][jj] is not None \
                                and not np.all(new_cache.spectra[ii][jj] == fs):
                            raise ValueError("Merged cached conflicts with current.")
                        new_cache.spectra[ii][jj] = fs
        # Check that new cache is complete
        for ii, row in enumerate(new_cache.spectra):
            for jj, fs in enumerate(row):
                if fs is None:
                    raise ValueError("Cache is incomplete after merging. "
                            "First missing entry is {0},{1}.".format(ii,jj))
        # Convert cache spectra to array
        new_cache.spectra = np.array(new_cache.spectra)
        return new_cache

Static methods

def merge(caches)

Merge caches generated with split_jobs.

caches: Cache2D objects to merge

Note: Will fail if caches conflict or do not merge into a complete cache.

Expand source code
@staticmethod
def merge(caches):
    """
    Merge caches generated with split_jobs.

    caches: Cache2D objects to merge

    Note: Will fail if caches conflict or do not merge into a complete cache.
    """
    import copy
    # Copy our first cache to start the output
    new_cache = copy.deepcopy(caches[0])
    for other in caches[1:]:
        for ii, row in enumerate(other.spectra):
            for jj, fs in enumerate(row):
                if fs is not None:
                    if new_cache.spectra[ii][jj] is not None \
                            and not np.all(new_cache.spectra[ii][jj] == fs):
                        raise ValueError("Merged cached conflicts with current.")
                    new_cache.spectra[ii][jj] = fs
    # Check that new cache is complete
    for ii, row in enumerate(new_cache.spectra):
        for jj, fs in enumerate(row):
            if fs is None:
                raise ValueError("Cache is incomplete after merging. "
                        "First missing entry is {0},{1}.".format(ii,jj))
    # Convert cache spectra to array
    new_cache.spectra = np.array(new_cache.spectra)
    return new_cache

Methods

def integrate(self, params, ns, sel_dist, theta, pts, exterior_int=True)

Integrate spectra over a bivariate prob. dist. for negative gammas.

params: Parameters for sel_dist ns: Ignored sel_dist: Bivariate probability distribution, taking in arguments (xx, yy, params) theta: Population-scaled mutation rate pts: Ignored exterior_int: If False, do not integrate outside sampled domain.

Note also that the ns and pts arguments are ignored. They are only present for compatibility with other dadi functions that apply to demographic models.

Expand source code
def integrate(self, params, ns, sel_dist, theta, pts,
              exterior_int=True):
    """
    Integrate spectra over a bivariate prob. dist. for negative gammas.

    params: Parameters for sel_dist
    ns: Ignored
    sel_dist: Bivariate probability distribution,
              taking in arguments (xx, yy, params)
    theta: Population-scaled mutation rate
    pts: Ignored
    exterior_int: If False, do not integrate outside sampled domain.

    Note also that the ns and pts arguments are ignored. They are only
    present for compatibility with other dadi functions that apply to
    demographic models.
    """
    # Restrict our gammas and spectra to negative gammas.
    Nneg = len(self.neg_gammas)
    spectra = self.spectra[:Nneg, :Nneg]

    # Weights for integration
    weights = sel_dist(-self.neg_gammas, -self.neg_gammas, params)
    # Apply weighting. Could probably do this in a single fancy numpy
    # multiplication, which might be faster.
    weighted_spectra = 0*spectra
    for ii, row in enumerate(weights):
        for jj, w in enumerate(row):
            weighted_spectra[ii,jj] = w*spectra[ii,jj]

    # Integrate out the first two axes, which are the gamma axes.
    temp = np.trapz(weighted_spectra, self.neg_gammas, axis=0)
    fs = np.trapz(temp, self.neg_gammas, axis=0)

    if not exterior_int:
        return Spectrum(theta*fs)

    # Test whether our DFE pdf is symmetric. If it is we can dramatically reduce our calculations.
    testx = np.logspace(-2,2,3)
    testout = sel_dist(testx, testx, params)
    # We need atol=0 here to ensure small values don't lead to spurious pass of test
    symmetric_dfe = np.allclose(testout, testout.T, atol=0, rtol=1e-12)

    max_gamma = -self.neg_gammas[-1]
    min_gamma = -self.neg_gammas[0]
    # Account for density that is outside the simulated domain.
    weights_1low, weights_1high = 0*self.neg_gammas, 0*self.neg_gammas
    weights_2low, weights_2high = 0*self.neg_gammas, 0*self.neg_gammas
    for ii, gamma in enumerate(-self.neg_gammas):
        w_1low, err = scipy.integrate.quad(sel_dist, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
        w_1high, err = scipy.integrate.quad(sel_dist, 0, max_gamma, epsabs=1e-4, epsrel=1e-3, args=(gamma,params))
        if symmetric_dfe:
            w_2low, w_2high = w_1low, w_1high
        else:
            marg2_func = lambda g2: sel_dist(gamma, g2, params)
            w_2low, err = scipy.integrate.quad(marg2_func, min_gamma, np.inf, epsabs=1e-4, epsrel=1e-3)
            w_2high, err = scipy.integrate.quad(marg2_func, 0, max_gamma, epsabs=1e-4, epsrel=1e-3)
        weights_1low[ii] = w_1low
        weights_2low[ii] = w_2low
        weights_1high[ii] = w_1high
        weights_2high[ii] = w_2high

    # Strongly deleterious gamma2, in-range gamma1
    fs += np.trapz(spectra[:,0] * weights_2low[:,np.newaxis,np.newaxis],
                   self.neg_gammas, axis=0)
    # Neutral gamma2, in-range gamma1
    fs += np.trapz(spectra[:,-1] * weights_2high[:,np.newaxis,np.newaxis],
                   self.neg_gammas, axis=0)
    # Strongly deleterious gamma1, in-range gamma2
    fs += np.trapz(spectra[0,:] * weights_1low[:,np.newaxis,np.newaxis],
                   self.neg_gammas, axis=0)
    # Neutral gamma1, in-range gamma2
    fs += np.trapz(spectra[-1,:] * weights_1high[:,np.newaxis,np.newaxis],
                   self.neg_gammas, axis=0)

    # Both neutral, really slow
    weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
                                          lambda _: 0, lambda _: max_gamma,
                                          epsrel=1e-3, epsabs=1e-4, args=[params])
    fs += spectra[-1,-1]*weight

    # Neutral gamma2, strongly deleterious gamma1
    weight, err = scipy.integrate.dblquad(sel_dist, 0, max_gamma,
            lambda _: min_gamma, lambda _: np.inf,
            epsrel=1e-3, epsabs=1e-4 ,args=[params])
    fs += spectra[0,-1]*weight

    # Neutral gamma1, strongly deleterious gamma2
    if not symmetric_dfe:
        weight, err = scipy.integrate.dblquad(sel_dist, min_gamma, np.inf,
                lambda _: 0, lambda _: max_gamma,
                epsrel=1e-3, epsabs=1e-4, args=[params])
    fs += spectra[-1,0]*weight

    return Spectrum(theta*fs)
def integrate_point_pos(self, params, ns, biv_seldist, theta, rho=0, pts=None)

Integrate spectra over a bivariate prob. dist. for negative gammas plus a point mass of positive selection.

params: Parameters for sel_dist and positive selection. It is assumed that the last four parameters are: Proportion positive selection in pop1, postive gamma for pop1, prop. positive in pop2, and positive gamma for pop2. Earlier arguments are assumed to be for the continuous bivariate distribution. ns: Ignored biv_seldist: Bivariate probability distribution for negative selection, taking in arguments (xx, yy, params) theta: Population-scaled mutation rate rho: Correlation coefficient used to connect negative and positive components of DFE. pts: Ignored

Note that no normalization is performed, so alleles not covered by the specified range of gammas are assumed not to be seen in the data.

Note that in the triallelic paper (Ragsdale et al. 2016 Genetics), we weighted each portion of the DFE based on rho. This was to ensure that the rho = 0 limit corresponded to independent sampling and the rho = 1 limit corresponding to exactly equal selection coefficients for each pair of derived alleles.

If we generalize to allow the marginal DFEs to differ between the two populations, the rho = 1 limit cannot be held perfectly between both negative and positive selection quadrants. (In log-space, the positive selection mass is infinitely far from the negative selection DFE.)

We do use a similar procedure to the triallelic paper to end up with something like: p++ = p1p2 + rho(sqrt(p1p2) - p1p2) p+- = (1-rho) * p1(1-p2) p– = (1-p1)(1-p2) + rho(1-sqrt(p1p2) - (1-p1)*(1-p2))

The logic here is that in the rho=1 limit, we set the proportion positively selected to be the geometric mean of p1 and p2, the proportion positive in one pop and negative in the other (p+-) to be zero, and the remainder negative in both populations p–. We then linearly interpolate between the rho=0 and rho=1 cases.

This requires the integration method to explicitly know about rho, so it's not completely general to all joint DFEs. rho is thus included as a parameter in the argument list.

Expand source code
def integrate_point_pos(self, params, ns, biv_seldist, theta,
                        rho=0, pts=None):
    """
    Integrate spectra over a bivariate prob. dist. for negative gammas plus
    a point mass of positive selection.

    params: Parameters for sel_dist and positive selection.
            It is assumed that the last four parameters are:
            Proportion positive selection in pop1, postive gamma for pop1,
             prop. positive in pop2, and positive gamma for pop2.
            Earlier arguments are assumed to be for the continuous bivariate
            distribution.
    ns: Ignored
    biv_seldist: Bivariate probability distribution for negative selection,
                 taking in arguments (xx, yy, params)
    theta: Population-scaled mutation rate
    rho: Correlation coefficient used to connect negative and positive
         components of DFE.
    pts: Ignored

    Note that no normalization is performed, so alleles not covered by the
    specified range of gammas are assumed not to be seen in the data.

    Note that in the triallelic paper (Ragsdale et al. 2016 Genetics), we
    weighted each portion of the DFE based on rho. This was to ensure that
    the rho = 0 limit corresponded to independent sampling and the rho = 1
    limit corresponding to exactly equal selection coefficients for each
    pair of derived alleles.

    If we generalize to allow the marginal DFEs to differ between the
    two populations, the rho = 1 limit cannot be held perfectly between
    both negative and positive selection quadrants. (In log-space, the
    positive selection mass is infinitely far from the negative selection
    DFE.)

    We do use a similar procedure to the triallelic paper to end up
    with something like:
    p++ = p1*p2 + rho*(sqrt(p1*p2) - p1*p2)
    p+- = (1-rho) * p1*(1-p2)
    p-- = (1-p1)*(1-p2) + rho*(1-sqrt(p1*p2) - (1-p1)*(1-p2))

    The logic here is that in the rho=1 limit, we set the proportion
    positively selected to be the geometric mean of p1 and p2, the
    proportion positive in one pop and negative in the other (p+-) to be
    zero, and the remainder negative in both populations p--. We then
    linearly interpolate between the rho=0 and rho=1 cases.

    This requires the integration method to explicitly know about rho,
    so it's not completely general to all joint DFEs. rho is thus
    included as a parameter in the argument list.
    """
    biv_params = params[:-4]
    ppos1, gammapos1, ppos2, gammapos2 = params[-4:]

    Nneg = len(self.neg_gammas)
    weights = biv_seldist(-self.neg_gammas, -self.neg_gammas, biv_params)

    # Case in which both are negative, so we integrate directly
    # over the bivariate distribution
    neg_neg = self.integrate(biv_params, ns, biv_seldist, 1, pts)

    # Case in which both are positive, which is a single spectrum.
    try:
        pos_pos = self.spectra[self.gammas == gammapos1,
                               self.gammas == gammapos2][0]
    except IndexError:
        raise IndexError('Failed to find requested gammapos1={0:.4f} '
                         'and/or gammapos2={1:0.4f} in cached spectra. '
                         'Were they included in additional_gammas during '
                         'cache generation?'.format(gammapos1,gammapos2))

    # Case in which pop1 is positive and pop2 is negative.
    pos_neg_spectra = np.squeeze(self.spectra[self.gammas==gammapos1,:Nneg])
    # Obtain the marginal DFE for pop2 by integrating over the weights
    # for pop1.
    pos_neg_weights = np.trapz(weights, self.neg_gammas, axis=0)
    # For numpy multiplication...
    pos_neg_weights = pos_neg_weights[:,np.newaxis,np.newaxis]
    # Do the weighted integral.
    pos_neg = np.trapz(pos_neg_weights*pos_neg_spectra,
                       self.neg_gammas, axis=0)

    # Case in which pop2 is positive and pop1 is negative.
    neg_pos_spectra = np.squeeze(self.spectra[:Nneg,self.gammas==gammapos2])
    # Now integrate over pop2 to get marginal DFE for pop1
    neg_pos_weights = np.trapz(weights, self.neg_gammas, axis=1)
    neg_pos_weights = neg_pos_weights[:,np.newaxis,np.newaxis]
    neg_pos = np.trapz(neg_pos_weights*neg_pos_spectra,
                       self.neg_gammas, axis=0)

    # Weighting factors for each quadrant of the DFE. Note that in the case
    # that ppos1==ppos2, these reduce to the model of Ragsdale et al. (2016)
    p_pos_pos = ppos1*ppos2 + rho*(np.sqrt(ppos1*ppos2) - ppos1*ppos2)
    p_pos_neg = (1-rho) * ppos1*(1-ppos2)
    p_neg_pos = (1-rho) * (1-ppos1)*ppos2
    p_neg_neg = (1-ppos1)*(1-ppos2)\
            + rho*(1-np.sqrt(ppos1*ppos2) - (1-ppos1)*(1-ppos2))

    fs = p_pos_pos*pos_pos + p_pos_neg*pos_neg + p_neg_pos*neg_pos\
            + p_neg_neg*neg_neg
    return theta*fs
def integrate_symmetric_point_pos(self, params, ns, biv_seldist, theta, pts=None)

Convenience method for integrating spectra over a bivariate prob. dist. for negative gammas plus a symmetric point mass of positive selection.

params: Parameters for sel_dist and positive selection. It is assumed that the last two parameters are: Proportion positive selection, postive gamma Earlier arguments are assumed to be for the continuous bivariate distribution. It is assumed that the last of those earlier arguments is the correlation coefficient rho. ns: Ignored biv_seldist: Bivariate probability distribution for negative selection, taking in arguments (xx, yy, params) theta: Population-scaled mutation rate pts: Ignored

Expand source code
def integrate_symmetric_point_pos(self, params, ns, biv_seldist, theta, pts=None):
    """
    Convenience method for integrating spectra over a bivariate prob. dist.
    for negative gammas plus a symmetric point mass of positive selection.

    params: Parameters for sel_dist and positive selection.
            It is assumed that the last two parameters are:
            Proportion positive selection, postive gamma
            Earlier arguments are assumed to be for the continuous bivariate
            distribution.
            *It is assumed that the last of those earlier arguments is
             the correlation coefficient rho.*
    ns: Ignored
    biv_seldist: Bivariate probability distribution for negative selection,
                 taking in arguments (xx, yy, params)
    theta: Population-scaled mutation rate
    pts: Ignored
    """
    seldist_params = params[:-2]
    rho = seldist_params[-1]
    ppos, gammapos = params[-2:]

    params = np.concatenate((seldist_params, [ppos,gammapos,ppos,gammapos]))

    return self.integrate_point_pos(params, ns, biv_seldist, theta,
                                    rho=rho, pts=None)