Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
551 views
in Technique[技术] by (71.8m points)

python - Multiprocessing - map over list, killing processes that stall above timeout limit

I have a list of elements that I want to modify using multiprocessing. The issue is that for some particular inputs (unobservable prior to attempting), part of the my function stalls. I've shown this conceptually with the code below, where the function sometimes_stalling_processing() will occasionally stall indefinitely.

To put this into context, I'm processing a bunch of links using a web scraper and some of these links stall even with the use of timeout in the requests module. I've attempted different approaches (e.g. using eventlet), but come to the conclusion that it's perhaps easier to handle it at the multiprocessing level.

def stable_processing(obs):
    ...
    return processed_obs

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

def extract_info(obs):
    new_obs = stable_processing(obs)
    try:
        new_obs = sometimes_stalling_processing(obs)
    except MyTimedOutError: # error doesn't exist, just here for conceptual purposes
        pass
    return new_obs

pool = Pool(processes=n_threads)
processed_dataset = pool.map(extract_info, dataset)
pool.close()
pool.join()

This question (How can I abort a task in a multiprocessing.Pool after a timeout?) seems very similar, but I've been unable to convert it to work with map instead of apply. I've also tried using the eventlet package, but that doesn't work. Note that I'm using Python 2.7.

How do I make pool.map() timeout on individual observations and kill sometimes_stalling_processing?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

You can take a look at the pebble library.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

with ProcessPool() as pool:
    future = pool.map(sometimes_stalling_processing, dataset, timeout=10)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])

More examples in the documentaion.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...