"""Makes it easier to run analyses on several samples in parallel."""importloggingfromcollectionsimportabcfrommultiprocessingimportget_contextfromrich.progressimporttrackimportwarnings
[docs]defworkflow(func,args,threads=4,description=None,progress=True):"""Run analyses for several samples in parallel. This will analyze several samples in parallel. Includes a workaround for optlang memory leak. Arguments --------- func : function A function that takes a single argument (can be any object) and that performs your analysis for a single sample. args : array-like object An array-like object (list, tuple, numpy array, pandas Series, etc.) that contains the arguments for each sample. threads : positive int How many samples to analyze in parallel at once. description : str The dewscription shown in front of the progress bar. progress : bool Whether to show a progress bar. """ifnotisinstance(args,abc.Sized):ValueError("`args` must have a length.")ifdescriptionisNone:description="Running"logger.setLevel("ERROR")# Don't generate overhead if single threadifthreads==1:it=map(func,args)ifprogress:it=track(it,total=len(args),description=description)returnlist(it)# We don't use the context manager because of# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.htmlpool=get_context("spawn").Pool(processes=threads,maxtasksperchild=1)try:withwarnings.catch_warnings():warnings.simplefilter("ignore")it=pool.imap_unordered(func,args)ifprogress:it=track(it,total=len(args),description="Running")results=list(it)finally:pool.close()pool.join()logger.setLevel("WARNING")returnresults