"""Custom implementation of multiprocessing.Pool with custom pickler.

This module provides efficient ways of working with data stored in
shared memory with numpy.memmap arrays without inducing any memory
copy between the parent and child processes.

This module should not be imported if multiprocessing is not
available as it implements subclasses of multiprocessing Pool
that uses a custom alternative to SimpleQueue.

"""

import copyreg
import sys
import warnings
from time import sleep

try:
    WindowsError
except NameError:
    WindowsError = type(None)

from io import BytesIO
from multiprocessing.pool import Pool
from pickle import HIGHEST_PROTOCOL, Pickler

from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers
from ._multiprocessing_helpers import assert_spawning, mp

try:
    import numpy as np
except ImportError:
    np = None


class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    TODO python2_drop: can this be simplified?

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method.
    See the standard library documentation on pickling for more details.

    """

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        if hasattr(Pickler, "dispatch"):
            # Make the dispatch registry an instance-level attribute instead
            # of a reference to the class-level dictionary.
            self.dispatch = Pickler.dispatch.copy()
        else:
            # Initialize the dispatch table with a copy of the default
            # copyreg registry.
            self.dispatch_table = copyreg.dispatch_table.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        """Attach a reducer function to a given type in the dispatch table."""
        if hasattr(Pickler, "dispatch"):
            # The dispatch table of the pure Python pickler is not
            # customizable per call, so wrap the reducer in a closure.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)

            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func
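
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original joblib module: how a custom
# reducer plugs into CustomizablePickler. `_ExamplePoint`,
# `_reduce_example_point` and `_example_customizable_pickler` are
# hypothetical names introduced here purely for demonstration.


class _ExamplePoint:
    """Toy payload type used only by the example below."""

    def __init__(self, x, y):
        self.x, self.y = x, y


def _reduce_example_point(point):
    # A reducer returns a `(constructor, tuple_of_objects)` pair, exactly as
    # a `__reduce__` method would.
    return _ExamplePoint, (point.x, point.y)


def _example_customizable_pickler():
    """Serialize an _ExamplePoint through the custom reducer."""
    buffer = BytesIO()
    pickler = CustomizablePickler(
        buffer, reducers={_ExamplePoint: _reduce_example_point}
    )
    pickler.dump(_ExamplePoint(1.0, 2.0))
    return buffer.getvalue()  # bytes ready to be sent over a Pipe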

class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped datastructures.

    `reducers` is expected to be a dict with key/values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as would
    return a `__reduce__` method.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == "win32":
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (
            self._reader,
            self._writer,
            self._rlock,
            self._wlock,
            self._reducers,
        )

    def __setstate__(self, state):
        (
            self._reader,
            self._writer,
            self._rlock,
            self._wlock,
            self._reducers,
        ) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:

            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())

            self._send = send
        else:
            self._send = send = self._writer.send

        if self._wlock is None:
            # Writes to a message-oriented win32 pipe are atomic.
            self.put = send
        else:
            wlock_acquire, wlock_release = (
                self._wlock.acquire,
                self._wlock.release,
            )

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put


class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as would return a `__reduce__` method.

    See the standard library documentation about pickling for more details.

    """

    def __init__(
        self,
        processes=None,
        forward_reducers=None,
        backward_reducers=None,
        **kwargs,
    ):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, "_ctx", mp)
        self._inqueue = CustomizablePicklingQueue(context, self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(context, self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
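
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original joblib module: a PicklingPool
# whose input queue pickles `_ExamplePoint` arguments through the custom
# reducer defined above. `_example_coordinates` and `_example_pickling_pool`
# are hypothetical helpers added only for demonstration.


def _example_coordinates(point):
    # Runs in a worker process; the argument was rebuilt via _ExamplePoint(x, y).
    return point.x, point.y


def _example_pickling_pool():
    pool = PicklingPool(
        processes=2, forward_reducers={_ExamplePoint: _reduce_example_point}
    )
    try:
        points = [_ExamplePoint(0, 1), _ExamplePoint(2, 3)]
        return pool.map(_example_coordinates, points)  # [(0, 1), (2, 3)]
    finally:
        pool.terminate()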

class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed into 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: (str, callable), optional
        If str:
          Folder to be used by the pool for memmapping large arrays
          for sharing memory with worker processes. If None, this will try in
          order:
          - a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
          - /dev/shm if the folder exists and is writable: this is a RAMdisk
            filesystem available by default on modern Linux distributions,
          - the default system temporary folder that can be overridden
            with TMP, TMPDIR or TEMP environment variables, typically /tmp
            under Unix operating systems.
        If callable:
            A callable in charge of dynamically resolving a temporary folder
            for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from main process to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocess is handled (pickling or memmapping).
    prewarm: bool or str, optional, "auto" by default
        If True, force a read on newly memmapped arrays to make sure that the
        OS pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes. If "auto" (by default), prewarm is set to True, unless the
        Linux shared memory partition /dev/shm is available and used as temp
        folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out of
    the pickled `tuple_of_objects` as would return a `__reduce__` method.
    See the standard library documentation on pickling for more details.

    """

    def __init__(
        self,
        processes=None,
        temp_folder=None,
        max_nbytes=1e6,
        mmap_mode="r",
        forward_reducers=None,
        backward_reducers=None,
        verbose=0,
        prewarm=False,
        **kwargs,
    ):
        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes,
            mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
            verbose=verbose,
            unlink_on_gc_collect=False,
            prewarm=prewarm,
        )

        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
            **poolargs,
        )

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Terminating worker processes can transiently fail with
                    # an access error under Windows: retry a few times.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn(
                            "Failed to terminate worker processes in"
                            " multiprocessing pool: %r" % e
                        )

        # Collect the temporary folder used by the pool.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        if getattr(self, "_cached_temp_folder", None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = (
                self._temp_folder_manager.resolve_temp_folder_name()
            )
            return self._cached_temp_folder
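
# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original joblib module: arrays
# larger than `max_nbytes` are dumped to the pool's temporary folder and
# re-opened by the workers as numpy.memmap instances instead of being copied.
# `_example_chunk_sum` is a hypothetical helper added only for demonstration.


def _example_chunk_sum(chunk):
    # Runs in a worker process; `chunk` is typically a read-only numpy.memmap.
    return chunk.sum()


if __name__ == "__main__" and np is not None:
    data = np.random.random_sample((2000, 1000))  # ~16 MB, above the 1e6 threshold
    pool = MemmappingPool(processes=2, max_nbytes=1e6, verbose=1)
    try:
        chunk_sums = pool.map(_example_chunk_sum, np.array_split(data, 4))
        print(sum(chunk_sums), data.sum())  # both totals should agree
    finally:
        # terminate() also collects the temporary memmap folder.
        pool.terminate()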