"""Definition of the DataLoader and associated iterators that subclass _BaseDataLoaderIter.

To support these two classes, in `./_utils` we define many utility methods and
functions to be run in multiprocessing. E.g., the data loading worker loop is
in `./_utils/worker.py`.
"""

from __future__ import annotations

import functools
import itertools
import logging
import multiprocessing as python_multiprocessing
import os
import queue
import threading
import warnings
from typing import Any, Callable, Generic, Optional, TYPE_CHECKING, TypeVar, Union

from typing_extensions import Self

import torch
import torch.distributed as dist
import torch.multiprocessing as multiprocessing
import torch.utils.data.graph_settings
from torch._utils import ExceptionWrapper
from torch.utils.data import _utils
from torch.utils.data.datapipes.datapipe import (
    _IterDataPipeSerializationWrapper,
    _MapDataPipeSerializationWrapper,
    IterDataPipe,
    MapDataPipe,
)
from torch.utils.data.dataset import Dataset, IterableDataset
from torch.utils.data.sampler import (
    BatchSampler,
    RandomSampler,
    Sampler,
    SequentialSampler,
)

if TYPE_CHECKING:
    from collections.abc import Iterable


__all__ = ["DataLoader", "get_worker_info", "default_collate", "default_convert"]


_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
_worker_init_fn_t = Callable[[int], None]
_collate_fn_t = Callable[[list[_T]], Any]

# These functions are defined in `./_utils/collate.py` and `./_utils/worker.py`
# and re-exported here as part of the public API.
default_collate = _utils.collate.default_collate
default_convert = _utils.collate.default_convert

get_worker_info = _utils.worker.get_worker_info

logger = logging.getLogger(__name__)


class _DatasetKind:
    Map = 0
    Iterable = 1

    @staticmethod
    def create_fetcher(kind, dataset, auto_collation, collate_fn, drop_last):
        if kind == _DatasetKind.Map:
            return _utils.fetch._MapDatasetFetcher(
                dataset, auto_collation, collate_fn, drop_last
            )
        else:
            return _utils.fetch._IterableDatasetFetcher(
                dataset, auto_collation, collate_fn, drop_last
            )


class _InfiniteConstantSampler(Sampler):
    """Analogous to ``itertools.repeat(None, None)``.

    Used as sampler for :class:`~torch.utils.data.IterableDataset`.
    """

    def __iter__(self):
        while True:
            yield None


def _get_distributed_settings():
    if dist.is_available() and dist.is_initialized():
        return dist.get_world_size(), dist.get_rank()
    else:
        return 1, 0


def _sharding_worker_init_fn(worker_init_fn, world_size, rank_id, worker_id):
    global_worker_id = worker_id
    info = torch.utils.data.get_worker_info()
    assert info is not None
    total_workers = info.num_workers
    datapipe = info.dataset
    assert isinstance(datapipe, (IterDataPipe, MapDataPipe))
    # To distribute elements across distributed processes evenly, shard data on
    # distributed processes first, then shard on worker processes.
    total_workers *= world_size
    global_worker_id = global_worker_id * world_size + rank_id
    torch.utils.data.graph_settings.apply_sharding(
        datapipe, total_workers, global_worker_id
    )
    if worker_init_fn is not None:
        worker_init_fn(worker_id)


def _share_dist_seed(generator, pg):
    _shared_seed = torch.empty((), dtype=torch.int64).random_(generator=generator)
    if isinstance(pg, dist.ProcessGroup):
        dist.broadcast(_shared_seed, src=0, group=pg)
    return _shared_seed.item()
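# Illustrative sketch (not part of the original module): `default_collate`,
# re-exported above, is the default `collate_fn` for map-style datasets with
# automatic batching. It stacks a list of samples into batched tensors,
# recursing into mappings and sequences; exact reprs below are indicative.
#
#     >>> import torch
#     >>> from torch.utils.data import default_collate
#     >>> default_collate([torch.tensor([1, 2]), torch.tensor([3, 4])])
#     tensor([[1, 2],
#             [3, 4]])
#     >>> default_collate([{"x": 1, "y": 2.0}, {"x": 3, "y": 4.0}])
#     {'x': tensor([1, 3]), 'y': tensor([2., 4.], dtype=torch.float64)}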
class DataLoader(Generic[_T_co]):
    r"""
    Data loader combines a dataset and a sampler, and provides an iterable over the given dataset.

    The :class:`~torch.utils.data.DataLoader` supports both map-style and
    iterable-style datasets with single- or multi-process loading, customizing
    loading order and optional automatic batching (collation) and memory pinning.

    See :py:mod:`torch.utils.data` documentation page for more details.

    Args:
        dataset (Dataset): dataset from which to load the data.
        batch_size (int, optional): how many samples per batch to load
            (default: ``1``).
        shuffle (bool, optional): set to ``True`` to have the data reshuffled
            at every epoch (default: ``False``).
        sampler (Sampler or Iterable, optional): defines the strategy to draw
            samples from the dataset. Can be any ``Iterable`` with ``__len__``
            implemented. If specified, :attr:`shuffle` must not be specified.
        batch_sampler (Sampler or Iterable, optional): like :attr:`sampler`, but
            returns a batch of indices at a time. Mutually exclusive with
            :attr:`batch_size`, :attr:`shuffle`, :attr:`sampler`,
            and :attr:`drop_last`.
        num_workers (int, optional): how many subprocesses to use for data
            loading. ``0`` means that the data will be loaded in the main process.
            (default: ``0``)
        collate_fn (Callable, optional): merges a list of samples to form a
            mini-batch of Tensor(s). Used when using batched loading from a
            map-style dataset.
        pin_memory (bool, optional): If ``True``, the data loader will copy Tensors
            into device/CUDA pinned memory before returning them. If your data elements
            are a custom type, or your :attr:`collate_fn` returns a batch that is a
            custom type, see the example below.
        drop_last (bool, optional): set to ``True`` to drop the last incomplete batch,
            if the dataset size is not divisible by the batch size. If ``False`` and
            the size of dataset is not divisible by the batch size, then the last batch
            will be smaller. (default: ``False``)
        timeout (numeric, optional): if positive, the timeout value for collecting a batch
            from workers. Should always be non-negative. (default: ``0``)
        worker_init_fn (Callable, optional): If not ``None``, this will be called on each
            worker subprocess with the worker id (an int in ``[0, num_workers - 1]``) as
            input, after seeding and before data loading. (default: ``None``)
        multiprocessing_context (str or multiprocessing.context.BaseContext, optional): If
            ``None``, the default `multiprocessing context`_ of your operating system will
            be used. (default: ``None``)
        generator (torch.Generator, optional): If not ``None``, this RNG will be used
            by RandomSampler to generate random indexes and multiprocessing to generate
            ``base_seed`` for workers. (default: ``None``)
        prefetch_factor (int, optional, keyword-only arg): Number of batches loaded
            in advance by each worker. ``2`` means there will be a total of
            2 * num_workers batches prefetched across all workers. (default value depends
            on the set value for num_workers. If value of num_workers=0 default is ``None``.
            Otherwise, if value of ``num_workers > 0`` default is ``2``).
        persistent_workers (bool, optional): If ``True``, the data loader will not shut down
            the worker processes after a dataset has been consumed once. This allows to
            maintain the workers `Dataset` instances alive. (default: ``False``)
        pin_memory_device (str, optional): Deprecated, the current :ref:`accelerator`
            will be used as the device if ``pin_memory=True``.
        in_order (bool, optional): If ``False``, the data loader will not enforce that batches
            are returned in a first-in, first-out order. Only applies when ``num_workers > 0``.
            (default: ``True``)


    .. warning:: If the ``spawn`` start method is used, :attr:`worker_init_fn`
                 cannot be an unpicklable object, e.g., a lambda function. See
                 :ref:`multiprocessing-best-practices` on more details related
                 to multiprocessing in PyTorch.

    .. warning:: ``len(dataloader)`` heuristic is based on the length of the sampler used.
                 When :attr:`dataset` is an :class:`~torch.utils.data.IterableDataset`,
                 it instead returns an estimate based on ``len(dataset) / batch_size``, with proper
                 rounding depending on :attr:`drop_last`, regardless of multi-process loading
                 configurations. This represents the best guess PyTorch can make because PyTorch
                 trusts user :attr:`dataset` code in correctly handling multi-process
                 loading to avoid duplicate data.

                 However, if sharding results in multiple workers having incomplete last batches,
                 this estimate can still be inaccurate, because (1) an otherwise complete batch can
                 be broken into multiple ones and (2) more than one batch worth of samples can be
                 dropped when :attr:`drop_last` is set. Unfortunately, PyTorch can not detect such
                 cases in general.

                 See `Dataset Types`_ for more details on these two types of datasets and how
                 :class:`~torch.utils.data.IterableDataset` interacts with
                 `Multi-process data loading`_.

    .. warning:: See :ref:`reproducibility`, and :ref:`dataloader-workers-random-seed`, and
                 :ref:`data-loading-randomness` notes for random seed related questions.

    .. warning:: Setting `in_order` to `False` can harm reproducibility and may lead to a skewed data
                 distribution being fed to the trainer in cases with imbalanced data.
    """

    dataset: Dataset[_T_co]
    batch_size: Optional[int]
    num_workers: int
    pin_memory: bool
    drop_last: bool
    timeout: float
    sampler: Union[Sampler, Iterable]
    pin_memory_device: str
    prefetch_factor: Optional[int]
    _iterator: Optional[_BaseDataLoaderIter]
    __initialized = False

    def __init__(
        self,
        dataset: Dataset[_T_co],
        batch_size: Optional[int] = 1,
        shuffle: Optional[bool] = None,
        sampler: Union[Sampler, Iterable, None] = None,
        batch_sampler: Union[Sampler[list], Iterable[list], None] = None,
        num_workers: int = 0,
        collate_fn: Optional[_collate_fn_t] = None,
        pin_memory: bool = False,
        drop_last: bool = False,
        timeout: float = 0,
        worker_init_fn: Optional[_worker_init_fn_t] = None,
        multiprocessing_context=None,
        generator=None,
        *,
        prefetch_factor: Optional[int] = None,
        persistent_workers: bool = False,
        pin_memory_device: str = "",
        in_order: bool = True,
    ):
        torch._C._log_api_usage_once("python.data_loader")

        if num_workers < 0:
            raise ValueError(
                "num_workers option should be non-negative; "
                "use num_workers=0 to disable multiprocessing."
            )

        if timeout < 0:
            raise ValueError("timeout option should be non-negative")

        if num_workers == 0 and prefetch_factor is not None:
            raise ValueError(
                "prefetch_factor option could only be specified in multiprocessing."
                "let num_workers > 0 to enable multiprocessing, otherwise set prefetch_factor to None."
            )
        elif num_workers > 0 and prefetch_factor is None:
            prefetch_factor = 2
        elif prefetch_factor is not None and prefetch_factor < 0:
            raise ValueError("prefetch_factor option should be non-negative")

        if persistent_workers and num_workers == 0:
            raise ValueError("persistent_workers option needs num_workers > 0")

        self.dataset = dataset
        self.num_workers = num_workers
        self.prefetch_factor = prefetch_factor
        self.pin_memory = pin_memory
        self.pin_memory_device = pin_memory_device
        self.timeout = timeout
        self.worker_init_fn = worker_init_fn
        self.multiprocessing_context = multiprocessing_context
        self.in_order = in_order

        # Wrap DataPipes so that they serialize correctly when sent to workers.
        if isinstance(self.dataset, IterDataPipe):
            self.dataset = _IterDataPipeSerializationWrapper(self.dataset)
        elif isinstance(self.dataset, MapDataPipe):
            self.dataset = _MapDataPipeSerializationWrapper(self.dataset)

        # For iterable-style datasets, the shuffle/sampler/batch_sampler
        # options are meaningless and therefore rejected.
        if isinstance(dataset, IterableDataset):
            self._dataset_kind = _DatasetKind.Iterable
            if isinstance(dataset, IterDataPipe):
                if shuffle is not None:
                    dataset = torch.utils.data.graph_settings.apply_shuffle_settings(
                        dataset, shuffle=shuffle
                    )
            elif shuffle not in {False, None}:
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    f"shuffle option, but got shuffle={shuffle}"
                )
            if sampler is not None:
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    f"sampler option, but got sampler={sampler}"
                )
            elif batch_sampler is not None:
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    f"batch_sampler option, but got batch_sampler={batch_sampler}"
                )
        else:
            shuffle = bool(shuffle)
            self._dataset_kind = _DatasetKind.Map

        if sampler is not None and shuffle:
            raise ValueError("sampler option is mutually exclusive with shuffle")

        if batch_sampler is not None:
            # auto_collation with custom batch_sampler
            if batch_size != 1 or shuffle or sampler is not None or drop_last:
                raise ValueError(
                    "batch_sampler option is mutually exclusive with "
                    "batch_size, shuffle, sampler, and drop_last"
                )
            batch_size = None
            drop_last = False
        elif batch_size is None:
            # no auto_collation
            if drop_last:
                raise ValueError(
                    "batch_size=None option disables auto-batching "
                    "and is mutually exclusive with drop_last"
                )

        if sampler is None:  # give default samplers
            if self._dataset_kind == _DatasetKind.Iterable:
                sampler = _InfiniteConstantSampler()
            else:  # map-style
                if shuffle:
                    sampler = RandomSampler(dataset, generator=generator)
                else:
                    sampler = SequentialSampler(dataset)

        if batch_size is not None and batch_sampler is None:
            # auto_collation without custom batch_sampler
            batch_sampler = BatchSampler(sampler, batch_size, drop_last)

        self.batch_size = batch_size
        self.drop_last = drop_last
        self.sampler = sampler
        self.batch_sampler = batch_sampler
        self.generator = generator

        if collate_fn is None:
            if self._auto_collation:
                collate_fn = _utils.collate.default_collate
            else:
                collate_fn = _utils.collate.default_convert

        self.collate_fn = collate_fn
        self.persistent_workers = persistent_workers

        self.__initialized = True
        self._IterableDataset_len_called = None

        self._iterator = None

        self.check_worker_number_rationality()

        torch.set_vital("Dataloader", "enabled", "True")  # type: ignore[attr-defined]
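    # Illustrative sketch (not part of the original module): the
    # mutual-exclusivity checks above mean a custom ``batch_sampler`` replaces
    # ``batch_size``, ``shuffle``, ``sampler``, and ``drop_last`` entirely.
    # ``my_map_dataset`` below is a hypothetical map-style dataset.
    #
    #     >>> from torch.utils.data import BatchSampler, SequentialSampler
    #     >>> bs = BatchSampler(SequentialSampler(range(10)), batch_size=4, drop_last=False)
    #     >>> list(bs)
    #     [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    #     >>> loader = DataLoader(my_map_dataset, batch_sampler=bs)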
    def _get_iterator(self) -> "_BaseDataLoaderIter":
        if self.num_workers == 0:
            return _SingleProcessDataLoaderIter(self)
        else:
            self.check_worker_number_rationality()
            return _MultiProcessingDataLoaderIter(self)

    @property
    def multiprocessing_context(self):
        return self.__multiprocessing_context

    @multiprocessing_context.setter
    def multiprocessing_context(self, multiprocessing_context):
        if multiprocessing_context is not None:
            if self.num_workers > 0:
                if isinstance(multiprocessing_context, str):
                    valid_start_methods = torch.multiprocessing.get_all_start_methods()
                    if multiprocessing_context not in valid_start_methods:
                        raise ValueError(
                            "multiprocessing_context option should specify a valid start method "
                            f"in {valid_start_methods!r}, but got "
                            f"multiprocessing_context={multiprocessing_context!r}"
                        )
                    multiprocessing_context = torch.multiprocessing.get_context(
                        multiprocessing_context
                    )

                if not isinstance(
                    multiprocessing_context, python_multiprocessing.context.BaseContext
                ):
                    raise TypeError(
                        "multiprocessing_context option should be a valid context "
                        "object or a string specifying the start method, but got "
                        f"multiprocessing_context={multiprocessing_context}"
                    )
            else:
                raise ValueError(
                    "multiprocessing_context can only be used with "
                    "multi-process loading (num_workers > 0), but got "
                    f"num_workers={self.num_workers}"
                )

        self.__multiprocessing_context = multiprocessing_context

    def __setattr__(self, attr, val):
        if self.__initialized and attr in (
            "batch_size",
            "batch_sampler",
            "sampler",
            "drop_last",
            "dataset",
            "persistent_workers",
        ):
            raise ValueError(
                f"{attr} attribute should not be set after "
                f"{self.__class__.__name__} is initialized"
            )

        super().__setattr__(attr, val)

    def __iter__(self) -> "_BaseDataLoaderIter":
        # With a single worker, a fresh iterator is created on every call so
        # that its state is reset. With multiple persistent workers, the
        # iterator is created once per DataLoader so workers can be reused.
        if self.persistent_workers and self.num_workers > 0:
            if self._iterator is None:
                self._iterator = self._get_iterator()
            else:
                self._iterator._reset(self)
            return self._iterator
        else:
            return self._get_iterator()

    @property
    def _auto_collation(self):
        return self.batch_sampler is not None

    @property
    def _index_sampler(self):
        # The sampler that yields indices to the fetcher: the batch sampler
        # when auto-collation is on, the plain sampler otherwise.
        if self._auto_collation:
            return self.batch_sampler
        else:
            return self.sampler

    def __len__(self) -> int:
        if self._dataset_kind == _DatasetKind.Iterable:
            # The length of an iterable-style dataset is only an estimate;
            # see the class docstring warning above.
            length = self._IterableDataset_len_called = len(self.dataset)  # type: ignore[arg-type]
            if self.batch_size is not None:
                from math import ceil

                if self.drop_last:
                    length = length // self.batch_size
                else:
                    length = ceil(length / self.batch_size)
            return length
        else:
            return len(self._index_sampler)

    def check_worker_number_rationality(self):
        def _create_warning_msg(num_worker_suggest, num_worker_created, cpuset_checked):
            suggested_max_worker_msg = (
                (
                    "Our suggested max number of worker in current system is {}{}, which is smaller "
                    "than what this DataLoader is going to create."
                ).format(
                    num_worker_suggest,
                    ("" if cpuset_checked else " (`cpuset` is not taken into account)"),
                )
                if num_worker_suggest is not None
                else "DataLoader is not able to compute a suggested max number of worker in current system."
            )

            warn_msg = (
                f"This DataLoader will create {num_worker_created} worker processes in total. "
                f"{suggested_max_worker_msg} "
                "Please be aware that excessive worker creation might get DataLoader running slow or even freeze, "
                "lower the worker number to avoid potential slowness/freeze if necessary."
            )
            return warn_msg

        if not self.num_workers or self.num_workers == 0:
            return

        # Try to compute a suggested max number of workers based on system
        # resources, preferring the cpuset (affinity mask) when available.
        max_num_worker_suggest = None
        cpuset_checked = False
        if hasattr(os, "sched_getaffinity"):
            try:
                max_num_worker_suggest = len(os.sched_getaffinity(0))
                cpuset_checked = True
            except Exception:
                pass
        if max_num_worker_suggest is None:
            # os.cpu_count() may return None
            cpu_count = os.cpu_count()
            if cpu_count is not None:
                max_num_worker_suggest = cpu_count

        if max_num_worker_suggest is None:
            warnings.warn(
                _create_warning_msg(
                    max_num_worker_suggest, self.num_workers, cpuset_checked
                )
            )
            return

        if self.num_workers > max_num_worker_suggest:
            warnings.warn(
                _create_warning_msg(
                    max_num_worker_suggest, self.num_workers, cpuset_checked
                )
            )
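# Illustrative sketch (not part of the original module): typical end-to-end use
# of the pieces above -- default samplers, auto-batching via BatchSampler, and
# default_collate. TensorDataset is used purely as a stand-in dataset.
#
#     >>> import torch
#     >>> from torch.utils.data import DataLoader, TensorDataset
#     >>> ds = TensorDataset(torch.arange(10).float().unsqueeze(1))
#     >>> loader = DataLoader(ds, batch_size=4, shuffle=False, drop_last=True)
#     >>> len(loader)  # floor(10 / 4) because drop_last=True
#     2
#     >>> [batch[0].shape for batch in loader]
#     [torch.Size([4, 1]), torch.Size([4, 1])]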
class _BaseDataLoaderIter:
    def __init__(self, loader: DataLoader) -> None:
        self._dataset = loader.dataset
        self._shared_seed = None
        self._pg = None
        if isinstance(self._dataset, IterDataPipe):
            if dist.is_available() and dist.is_initialized():
                self._pg = dist.new_group(backend="gloo")
            self._shared_seed = _share_dist_seed(loader.generator, self._pg)
            shared_rng = torch.Generator()
            shared_rng.manual_seed(self._shared_seed)
            self._dataset = torch.utils.data.graph_settings.apply_random_seed(
                self._dataset, shared_rng
            )
        self._dataset_kind = loader._dataset_kind
        self._IterableDataset_len_called = loader._IterableDataset_len_called
        self._auto_collation = loader._auto_collation
        self._drop_last = loader.drop_last
        self._index_sampler = loader._index_sampler
        self._num_workers = loader.num_workers
        ws, rank = _get_distributed_settings()
        self._world_size = ws
        self._rank = rank
        if loader.pin_memory_device:
            warn_msg = (
                "pin_memory_device is deprecated, the current accelerator will be used as the device,"
                f"ignore pin_memory_device='{loader.pin_memory_device}'."
            )
            warnings.warn(warn_msg)
        self._pin_memory = loader.pin_memory and torch.accelerator.is_available()
        if loader.pin_memory and not self._pin_memory:
            warn_msg = (
                "'pin_memory' argument is set as true but no accelerator is found, "
                "then device pinned memory won't be used."
            )
            warnings.warn(warn_msg)
        self._pin_memory_device = None
        if self._pin_memory:
            acc = torch.accelerator.current_accelerator()
            assert acc is not None
            self._pin_memory_device = acc.type
            if self._pin_memory_device == "mps":
                self._pin_memory = False
                warn_msg = (
                    "'pin_memory' argument is set as true but not supported on MPS now, "
                    "device pinned memory won't be used."
                )
                warnings.warn(warn_msg)
        self._timeout = loader.timeout
        self._collate_fn = loader.collate_fn
        self._sampler_iter = iter(self._index_sampler)
        self._base_seed = (
            torch.empty((), dtype=torch.int64)
            .random_(generator=loader.generator)
            .item()
        )
        self._persistent_workers = loader.persistent_workers
        self._num_yielded = 0
        self._profile_name = (
            f"enumerate(DataLoader)#{self.__class__.__name__}.__next__"
        )

    def __iter__(self) -> Self:
        return self

    def _reset(self, loader, first_iter=False):
        self._sampler_iter = iter(self._index_sampler)
        self._num_yielded = 0
        self._IterableDataset_len_called = loader._IterableDataset_len_called
        if isinstance(self._dataset, IterDataPipe):
            self._shared_seed = _share_dist_seed(loader.generator, self._pg)
            shared_rng = torch.Generator()
            shared_rng.manual_seed(self._shared_seed)
            self._dataset = torch.utils.data.graph_settings.apply_random_seed(
                self._dataset, shared_rng
            )

    def _next_index(self):
        return next(self._sampler_iter)  # may raise StopIteration

    def _next_data(self):
        raise NotImplementedError

    def __next__(self) -> Any:
        with torch.autograd.profiler.record_function(self._profile_name):
            if self._sampler_iter is None:
                self._reset()  # type: ignore[call-arg]
            data = self._next_data()
            self._num_yielded += 1
            if (
                self._dataset_kind == _DatasetKind.Iterable
                and self._IterableDataset_len_called is not None
                and self._num_yielded > self._IterableDataset_len_called
            ):
                warn_msg = (
                    f"Length of IterableDataset {self._dataset} was reported to be "
                    f"{self._IterableDataset_len_called} (when accessing len(dataloader)), but "
                    f"{self._num_yielded} samples have been fetched. "
                )
                if self._num_workers > 0:
                    warn_msg += (
                        "For multiprocessing data-loading, this could be caused by not properly configuring the "
                        "IterableDataset replica at each worker. Please see "
                        "https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset for examples."
                    )
                warnings.warn(warn_msg)
            return data

    def __len__(self) -> int:
        return len(self._index_sampler)


class _SingleProcessDataLoaderIter(_BaseDataLoaderIter):
    def __init__(self, loader):
        super().__init__(loader)
        assert self._timeout == 0
        assert self._num_workers == 0

        self._dataset_fetcher = _DatasetKind.create_fetcher(
            self._dataset_kind,
            self._dataset,
            self._auto_collation,
            self._collate_fn,
            self._drop_last,
        )

    def _next_data(self):
        index = self._next_index()  # may raise StopIteration
        data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
        if self._pin_memory:
            data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
        return data
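# Illustrative sketch (not part of the original module): each worker derives its
# seed from the loader's ``_base_seed`` (``base_seed + worker_id``), and
# ``worker_init_fn`` runs after that seeding, so it can fold the per-worker seed
# into other RNGs. NumPy is an assumed extra dependency here.
#
#     >>> import numpy as np
#     >>> from torch.utils.data import get_worker_info
#     >>> def worker_init_fn(worker_id: int) -> None:
#     ...     info = get_worker_info()
#     ...     np.random.seed(info.seed % 2**32)  # info.seed == base_seed + worker_id
#     >>> loader = DataLoader(ds, batch_size=4, num_workers=2,
#     ...                     worker_init_fn=worker_init_fn)  # ds: hypothetical dataset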
class _MultiProcessingDataLoaderIter(_BaseDataLoaderIter):
    """Iterates once over the DataLoader's dataset, as specified by the sampler."""

    def __init__(self, loader):
        super().__init__(loader)
        self._prefetch_factor = loader.prefetch_factor
        self._in_order = loader.in_order

        assert self._num_workers > 0
        assert self._prefetch_factor > 0

        if loader.multiprocessing_context is None:
            multiprocessing_context = torch.multiprocessing
        else:
            multiprocessing_context = loader.multiprocessing_context

        self._worker_init_fn = loader.worker_init_fn

        # For DataPipes, an additional worker init function takes care of
        # sharding across multiprocessing and distributed workers.
        if isinstance(self._dataset, (IterDataPipe, MapDataPipe)):
            self._worker_init_fn = functools.partial(
                _sharding_worker_init_fn,
                self._worker_init_fn,
                self._world_size,
                self._rank,
            )

        self._worker_result_queue = multiprocessing_context.Queue()  # type: ignore[var-annotated]
        self._worker_pids_set = False
        self._shutdown = False
        self._workers_done_event = multiprocessing_context.Event()

        self._index_queues = []
        self._workers = []
        for i in range(self._num_workers):
            index_queue = multiprocessing_context.Queue()  # type: ignore[var-annotated]
            index_queue.cancel_join_thread()
            w = multiprocessing_context.Process(
                target=_utils.worker._worker_loop,
                args=(
                    self._dataset_kind,
                    self._dataset,
                    index_queue,
                    self._worker_result_queue,
                    self._workers_done_event,
                    self._auto_collation,
                    self._collate_fn,
                    self._drop_last,
                    self._base_seed,
                    self._worker_init_fn,
                    i,
                    self._num_workers,
                    self._persistent_workers,
                    self._shared_seed,
                ),
            )
            w.daemon = True
            # Only add a worker to self._workers after it has started, so that
            # __del__ never tries to join a process that was never started.
            w.start()
            self._index_queues.append(index_queue)
            self._workers.append(w)

        if self._pin_memory:
            self._pin_memory_thread_done_event = threading.Event()

            self._data_queue = queue.Queue()  # type: ignore[var-annotated]
            current_device_id = torch.accelerator.current_device_index()
            pin_memory_thread = threading.Thread(
                target=_utils.pin_memory._pin_memory_loop,
                args=(
                    self._worker_result_queue,
                    self._data_queue,
                    current_device_id,
                    self._pin_memory_thread_done_event,
                    self._pin_memory_device,
                ),
            )
            pin_memory_thread.daemon = True
            pin_memory_thread.start()
            # Similar to workers above, only register the thread once started.
            self._pin_memory_thread = pin_memory_thread
        else:
            self._data_queue = self._worker_result_queue  # type: ignore[assignment]

        # Persistent (daemonic) workers can be terminated before `__del__` runs
        # when the main process exits, which would make pin_memory_thread read
        # corrupted data from worker_result_queue. atexit shuts down the thread
        # and child processes in the right sequence.
        if self._persistent_workers and self._pin_memory:
            import atexit

            for w in self._workers:
                atexit.register(_MultiProcessingDataLoaderIter._clean_up_worker, w)

        _utils.signal_handling._set_worker_pids(
            id(self), tuple(w.pid for w in self._workers)
        )
        _utils.signal_handling._set_SIGCHLD_handler()
        self._worker_pids_set = True
        self._reset(loader, first_iter=True)

    def _reset(self, loader, first_iter=False):
        super()._reset(loader, first_iter)
        self._send_idx = 0  # idx of the next task to be sent to workers
        self._rcvd_idx = 0  # idx of the next task to be returned in __next__
        # Information about data not yet yielded, i.e., tasks with indices in
        # range [rcvd_idx, send_idx). map: task idx => (worker_id,) if the data
        # isn't fetched yet, or (worker_id, data) if fetched out of order.
        self._task_info = {}
        self._tasks_outstanding = 0
        # Whether each worker still has work to do (i.e., has not exhausted an
        # iterable-style dataset); all True for map-style datasets.
        self._workers_status = [True for _ in range(self._num_workers)]
        # How many tasks are outstanding per worker (used when in_order=False).
        self._workers_num_tasks = [0 for _ in range(self._num_workers)]
        self._worker_queue_idx_cycle = itertools.cycle(range(self._num_workers))
        if not first_iter:
            # Resume persistent workers for the new epoch.
            for idx in range(self._num_workers):
                self._index_queues[idx].put(
                    _utils.worker._ResumeIteration(self._shared_seed)
                )
            resume_iteration_cnt = self._num_workers
            while resume_iteration_cnt > 0:
                return_idx, return_data = self._get_data()
                if isinstance(return_idx, _utils.worker._ResumeIteration):
                    assert return_data is None
                    resume_iteration_cnt -= 1
        # Prime the prefetch loop.
        for _ in range(self._prefetch_factor * self._num_workers):
            self._try_put_index()

    def _try_get_data(self, timeout=_utils.MP_STATUS_CHECK_INTERVAL):
        # Tries to fetch data from `self._data_queue` once for a given timeout.
        # Returns (True, data) on success and (False, None) on timeout; raises
        # RuntimeError if any worker died unexpectedly.
        try:
            data = self._data_queue.get(timeout=timeout)
            return (True, data)
        except Exception as e:
            # On timeout or error, manually check whether any worker failed.
            # This is the only failure-detection mechanism on Windows.
            failed_workers = []
            for worker_id, w in enumerate(self._workers):
                if self._workers_status[worker_id] and not w.is_alive():
                    failed_workers.append(w)
                    self._mark_worker_as_unavailable(worker_id)
            if len(failed_workers) > 0:
                pids_str = ", ".join(str(w.pid) for w in failed_workers)
                raise RuntimeError(
                    f"DataLoader worker (pid(s) {pids_str}) exited unexpectedly"
                ) from e
            if isinstance(e, queue.Empty):
                return (False, None)

            import errno
            import tempfile

            try:
                # Raise an exception if we are close to the FD limit; opening
                # only one file is not a sufficient test.
                fds_limit_margin = 10
                fs = [tempfile.NamedTemporaryFile() for _ in range(fds_limit_margin)]
            except OSError as e:
                if e.errno == errno.EMFILE:
                    raise RuntimeError(
                        "Too many open files. Communication with the"
                        " workers is no longer possible. Please increase the"
                        " limit using `ulimit -n` in the shell or change the"
                        " sharing strategy by calling"
                        " `torch.multiprocessing.set_sharing_strategy('file_system')`"
                        " at the beginning of your code"
                    ) from None
            raise

    def _get_data(self):
        # Fetches data from `self._data_queue`, honoring the loader timeout and
        # watching the pin_memory thread when one is in use.
        if self._timeout > 0:
            success, data = self._try_get_data(self._timeout)
            if success:
                return data
            else:
                raise RuntimeError(
                    f"DataLoader timed out after {self._timeout} seconds"
                )
        elif self._pin_memory:
            while self._pin_memory_thread.is_alive():
                success, data = self._try_get_data()
                if success:
                    return data
            else:
                # The while condition became false: pin_memory_thread died.
                raise RuntimeError("Pin memory thread exited unexpectedly")
        else:
            while True:
                success, data = self._try_get_data()
                if success:
                    return data

    def _next_data(self):
        while True:
            # If the worker responsible for `self._rcvd_idx` has already ended
            # without fulfilling this task (e.g., it exhausted an IterableDataset
            # replica), advance `self._rcvd_idx` to the next valid index.
            while self._rcvd_idx < self._send_idx:
                info = self._task_info.get(self._rcvd_idx, None)
                if info:
                    worker_id = info[0]
                    if len(info) == 2 or self._workers_status[worker_id]:
                        break  # data already fetched, or worker still active
                    del self._task_info[self._rcvd_idx]
                self._rcvd_idx += 1
            else:
                # No valid `self._rcvd_idx` found: the epoch is exhausted.
                if not self._persistent_workers:
                    self._shutdown_workers()
                raise StopIteration

            # Check if the next sample has already been fetched out of order.
            if len(self._task_info[self._rcvd_idx]) == 2:
                data, worker_id = self._task_info.pop(self._rcvd_idx)[1], worker_id
                self._rcvd_idx += 1
                return self._process_data(data, worker_id)

            assert not self._shutdown and self._tasks_outstanding > 0
            idx, data = self._get_data()
            self._tasks_outstanding -= 1
            if self._dataset_kind == _DatasetKind.Iterable:
                # A worker signals that its IterableDataset replica is exhausted.
                if isinstance(data, _utils.worker._IterableDatasetStopIteration):
                    if self._persistent_workers:
                        self._workers_status[data.worker_id] = False
                    else:
                        self._mark_worker_as_unavailable(data.worker_id)
                    self._try_put_index()
                    continue

            if idx != self._rcvd_idx:
                worker_id = self._task_info[idx][0]
                if not self._in_order:
                    # Out-of-order delivery is allowed: process immediately.
                    del self._task_info[idx]
                    return self._process_data(data, worker_id)
                # Store out-of-order samples for later.
                self._task_info[idx] += (data,)
            else:
                worker_id = self._task_info.pop(idx)[0]
                self._rcvd_idx += 1
                return self._process_data(data, worker_id)

    def _try_put_index(self):
        max_tasks = self._prefetch_factor * self._num_workers
        assert self._tasks_outstanding < max_tasks

        try:
            index = self._next_index()
        except StopIteration:
            return
        for _ in range(self._num_workers):  # find the next active worker, if any
            worker_queue_idx = next(self._worker_queue_idx_cycle)
            if self._workers_status[worker_queue_idx]:
                if self._in_order:
                    break
                elif (
                    self._workers_num_tasks[worker_queue_idx]
                    < max_tasks // sum(self._workers_status)
                ):
                    # When in_order is False, hand work to a worker only if it
                    # has spare capacity; the sum above is guaranteed > 0.
                    break
        else:
            # Not found (i.e., the loop didn't break).
            return

        self._index_queues[worker_queue_idx].put((self._send_idx, index))  # type: ignore[possibly-undefined]
        self._task_info[self._send_idx] = (worker_queue_idx,)
        self._workers_num_tasks[worker_queue_idx] += 1
        self._tasks_outstanding += 1
        self._send_idx += 1

    def _process_data(self, data, worker_idx):
        self._workers_num_tasks[worker_idx] -= 1
        self._try_put_index()
        if isinstance(data, ExceptionWrapper):
            data.reraise()
        return data

    def _mark_worker_as_unavailable(self, worker_id, shutdown=False):
        # Marks a worker as finished (e.g., it exhausted an IterableDataset)
        # while this iterator keeps running.
        assert self._workers_status[worker_id] or (
            self._persistent_workers and shutdown
        )

        # Signal termination to that specific worker.
        q = self._index_queues[worker_id]
        q.put(None)

        self._workers_status[worker_id] = False

        assert self._workers_done_event.is_set() == shutdown

    def _shutdown_workers(self):
        # Called when shutting down this iterator; no-op if Python itself is
        # already exiting.
        if (
            _utils is None
            or _utils.python_exit_status is True
            or _utils.python_exit_status is None
        ):
            return
        if not self._shutdown:
            self._shutdown = True
            try:
                # Exit `pin_memory_thread` first, because exiting workers may
                # leave corrupted data in `worker_result_queue`, which the
                # pin_memory thread reads from.
                if hasattr(self, "_pin_memory_thread"):
                    self._pin_memory_thread_done_event.set()
                    # Wake up the thread in case it is blocked on a get().
                    self._worker_result_queue.put((None, None))
                    self._pin_memory_thread.join()
                    self._worker_result_queue.cancel_join_thread()
                    self._worker_result_queue.close()

                # Exit workers now.
                self._workers_done_event.set()
                for worker_id in range(len(self._workers)):
                    if self._persistent_workers or self._workers_status[worker_id]:
                        self._mark_worker_as_unavailable(worker_id, shutdown=True)
                for w in self._workers:
                    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
                for q in self._index_queues:
                    q.cancel_join_thread()
                    q.close()
            finally:
                if self._worker_pids_set:
                    _utils.signal_handling._remove_worker_pids(id(self))
                    self._worker_pids_set = False
                for w in self._workers:
                    if w.is_alive():
                        # Workers should have exited peacefully by now; kill any
                        # stragglers as a last resort.
                        w.terminate()

    @staticmethod
    def _clean_up_worker(w):
        try:
            w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
        finally:
            if w.is_alive():
                w.terminate()

    def __del__(self):
        self._shutdown_workers()
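# Illustrative sketch (not part of the original module): the multi-worker
# iterator above gives each worker a full replica of an ``IterableDataset``, so
# the dataset itself must shard its stream per worker to avoid duplicates.
# ``RangeDataset`` is a hypothetical example dataset.
#
#     >>> import math
#     >>> from torch.utils.data import IterableDataset, get_worker_info
#     >>> class RangeDataset(IterableDataset):
#     ...     def __init__(self, start: int, end: int) -> None:
#     ...         self.start, self.end = start, end
#     ...     def __iter__(self):
#     ...         info = get_worker_info()
#     ...         if info is None:  # single-process loading: yield everything
#     ...             return iter(range(self.start, self.end))
#     ...         per_worker = int(math.ceil((self.end - self.start) / info.num_workers))
#     ...         start = self.start + info.id * per_worker
#     ...         return iter(range(start, min(start + per_worker, self.end)))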