Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - Speed up sampling of kernel estimate

Here's an MWE of a much larger code I'm using. Basically, it performs a Monte Carlo integration over a KDE (kernel density estimate) for all values located below a certain threshold (the integration method was suggested in this question: Integrate 2D kernel density estimate).

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik
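
Read as a formula, the last three steps of the code above estimate how much probability mass the KDE places where its own density falls below the threshold. This is a sketch of that estimator, where \hat{f} stands for the fitted KDE (kernel), c for the threshold (iso), s_i for the resampled points and N = 1000 for the sample size. These symbols are notation introduced here for illustration, not names from the code:

```latex
c = \hat{f}(x_1, y_1), \qquad
I = \int \hat{f}(\mathbf{x}) \, \mathbf{1}\!\left[\hat{f}(\mathbf{x}) < c\right] d\mathbf{x}
  = \mathbb{E}_{\mathbf{s} \sim \hat{f}}\!\left[\mathbf{1}\!\left[\hat{f}(\mathbf{s}) < c\right]\right]
  \approx \frac{1}{N} \sum_{i=1}^{N} \mathbf{1}\!\left[\hat{f}(\mathbf{s}_i) < c\right]
```

Each indicator needs one evaluation of \hat{f} at a sampled point, and a Gaussian KDE evaluation sums one kernel term per data point, so the filter step works through roughly 20000 x 1000 kernel terms per call.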

The output looks something like this:

iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

which clearly shows that the filter/sample step (the kernel(sample) evaluation) accounts for almost all of the run time. I have to run this block of code several thousand times, so it gets pretty time-consuming.

Is there any way to speed up the filtering/sampling process?


Add

Here's a slightly more realistic MWE of my actual code with Ophion's multiprocessing solution written into it:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        #Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        #Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        #Calculate
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        #Reintegrate and calculate results
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

The solution presented by Ophion works great on the original code I presented, but fails with the following error in this version:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I tried moving the calc_kernel function around, since one of the answers to the question Multiprocessing: How to use Pool.map on a function defined in a class? states that "the function that you give to map() must be accessible through an import of your module", but I still can't get this code to work.
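
The quoted requirement comes from pickling: Pool.map has to serialize the function it sends to the workers, and pickle stores a plain function by name rather than by value. The following is a minimal sketch of that difference using only the pickle module, with no pool involved. The names calc_square, make_nested and can_pickle are hypothetical helpers introduced for illustration.

```python
import pickle


def calc_square(x):
    """Module-level function: pickle can look it up by its name."""
    return x * x


def make_nested():
    """Return a function defined inside another function,
    like calc_kernel inside kde_integration."""
    def inner(x):
        return x * x
    return inner


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print("module-level function picklable: {}".format(can_pickle(calc_square)))
print("nested function picklable: {}".format(can_pickle(make_nested())))
```

Under this reading, a function defined inside kde_integration cannot be found by name from a worker process, which would explain why moving calc_kernel to the top level of the module (or passing a picklable callable directly) avoids the error.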

Any help will be very much appreciated.


Add 2

Implementing Ophion's suggestion to remove the calc_kernel function and simply using:

results = pool.map(kernel, torun)

works to get rid of the PicklingError but now I see that if I create an initial m_list of anything more than around 62-63 items I get this error:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

Since my actual list in my real implementation of this code can have up to 2000 items, this issue renders the code unusable. Line 38 is this one:

pool = Pool(processes=cores)

so apparently it has something to do with the number of cores I'm using?

This question "Can't start a new thread error" in Python suggests using:

threading.active_count()

to check the number of threads I have going when I get that error. I checked and it always crashes when it reaches 374 threads. How can I code around this issue?
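
One possible reading of the thread ceiling, offered as an assumption rather than a diagnosis: every pool starts a few helper threads in the parent process in addition to its workers, and a pool that is created inside the loop and never shut down keeps them alive. Below is a minimal sketch of the shutdown pattern. It uses multiprocessing.pool.ThreadPool as a stand-in so that it runs without spawning processes (Pool offers the same close() and join() methods), and square and map_with_fresh_pool are hypothetical names.

```python
import threading
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


def map_with_fresh_pool(values, workers=4):
    """Create a pool, use it once, and shut it down before returning."""
    pool = ThreadPool(processes=workers)
    try:
        return pool.map(square, values)
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for workers and helper threads to exit


baseline = threading.active_count()
results = [map_with_fresh_pool(range(5)) for _ in range(200)]
leaked = threading.active_count() - baseline
print("extra threads after 200 pools: {}".format(leaked))
```

In the loop from the code above, an alternative under the same assumption is to create the pool once before "for point in m_list:" and reuse it for every iteration.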


Here's the new question dealing with this last issue: Thread error: can't start new thread


1 Answer


Probably the easiest way to speed this up is to parallelize kernel(sample):

Taking this code fragment:

tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
#filter/sample:  1.94065904617

Change this to use multiprocessing:

from multiprocessing import Pool
tik = time.time()

#Create definition.
def calc_kernel(samp):
    return kernel(samp)

#Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)

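#Calculate
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)
#Shut the pool down so its worker processes and helper threads are released.
pool.close()
pool.join()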

#Reintegrate and calculate results
insample_mp = np.concatenate(results) < iso

print 'multiprocessing filter/sample: ', time.time()-tik
#multiprocessing filter/sample:  0.496874094009

Double check they are returning the same answer:

print np.all(insample==insample_mp)
#True
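
The reason the two results line up element by element is that np.array_split with axis=1 cuts the (2, N) sample into consecutive column blocks, and np.concatenate glues the per-block results back in the same order. Here is a small sketch of that round trip without any pool, assuming a stand-in function density_like in place of kernel (it is hypothetical and is not a KDE):

```python
import numpy as np


def density_like(points):
    """Stand-in for kernel(points): one value per column of a (2, N) array."""
    return np.exp(-0.5 * (points ** 2).sum(axis=0))


rng = np.random.RandomState(0)
sample = rng.normal(size=(2, 10))  # same layout as kernel.resample output

# Evaluate everything in one call.
whole = density_like(sample)

# Split into column blocks, evaluate each block, and stitch the results together.
blocks = np.array_split(sample, 4, axis=1)
stitched = np.concatenate([density_like(block) for block in blocks])

print("block widths: {}".format([block.shape[1] for block in blocks]))
print("same result: {}".format(np.allclose(whole, stitched)))
```

Because each output value depends only on its own column, the split can be handed to pool.map without changing the result, only the order of the blocks matters.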

A 3.9x improvement on 4 cores. Not sure what you are running this on, but beyond about 6 processors your input array is not large enough to give considerable gains. For example, with 20 processors it's only about 5.8x faster.

