L io ddlZddlZddlZddlZddlZddlmZmZddlm Z m Z ddl m Z ddl mZddlmZmZmZmZmZddlZddlmZddlmZddlmZmZdd lmZdd l m!Z!d d l"m#Z#m$Z$d d l%m&Z&m'Z'm(Z(d dl)m*Z*gdZ+ejXe-Z.Gdde Z/e/j`Z0e/jbZ1e/jdZ2e/jfZ3e/jhZ4e/jjZ5e/jlZ6e/jnZ7e/jpZ8e/jrZ9e/jtZ:e0Z;e1Zej~dZ@GddeZAedeAdeBfdZC dIdeDeEeFeeAfdeeEdeBfdZGGddeZH dIdeFejdeeBdeFejfdZK dIdeFejdeeBdeDeEeFejffd ZLd!eFejfd"ZMGd#d$eHZNGd%d&eNZOGd'd(eNZPGd)d*eNZQ dJd+eFeeAd,eEdeFeAfd-ZRd+eFeeAdeFeAfd.ZSd+eDeEeFeAfd/eeEgeEfd0eEdeDeEeFeAffd1ZTd2eDeEeFeeAfd3eEd0eEd4eEdeDeEeEff d5ZUGd6d7eHZVGd8d9eVZWGd:d;eVZX dKd<ZYGd=d>eVZZGd?d@eVZ[GdAdBeVZ\GdCdDeWZ]dEeBfdFZ^d/eeEgeEfd0eEfdGZ_dHZ`y)LN)ABCabstractmethod)Counter defaultdict)Enum) lru_cache)AnyCallable NamedTupleOptionalUnion)OptimizedModule) FSDPModule UnshardHandle)_Loss)record_function)generate_rank_to_stage_mappinggenerate_stage_to_rank_mapping) merge_chunkssplit_args_kwargs_into_chunksTensorChunkSpec)_PipelineStageBase) get_schedule_classPipelineScheduleSinglePipelineScheduleMulti Schedule1F1B ScheduleGPipeScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubbleScheduleDualPipeVcNeZdZdZdZdZdZdZdZdZ dZ d Z d Z d Z d Zed Zy)_ComputationTyper cptjdtjdtjdtjdtj dtj dtjdtjdtjd tjd tjd i }||S) NFIWUNSHARDRESHARDSEND_FRECV_FSEND_BRECV_BB OVERLAP_F_B) r%FORWARDBACKWARD_INPUTBACKWARD_WEIGHTr4r5r6r7r8r9 FULL_BACKWARDr;)selfstr_maps l/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/torch/distributed/pipelining/schedules.py__str__z_ComputationType.__str__9s  $ $c  + +S  , ,c  $ $i  $ $i  # #X  # #X  # #X  # #X  * *C  ( (-  t}c|dk(rtjS|dk(rtjS|dk(rtjS|dk(rtjS|dk(rtj S|dk(rtj S|dk(rtjS|dk(rtjS|d k(rtjS|d k(rtjS|d k(rtjStd |) Nr1r2r3r4r5r6r7r8r9r:r;zInvalid computation type ) r%r<r=r>r4r5r6r7r8r9r?r; RuntimeErroractions rBfrom_strz_ComputationType.from_strIs S=#++ + s]#22 2 s]#33 3 y #++ + y #++ + x #** * x #** * x #** * x #** * s]#11 1 } $#// /!:6(CD DrDN)__name__ __module__ __qualname__r<r=r>r4r5r6r7r8r9r?r;rC staticmethodrIrDrBr%r%+sTGNOGG F F F FMK EErDr%z?(\d+)(F|I|B|W|UNSHARD|RESHARD|SEND_F|RECV_F|SEND_B|RECV_B)(\d*)ceZdZUeed<eed<dZeeed<dZee ded<dZ dZ e d e fd Zed efd Zy) _Action stage_indexcomputation_typeNmicrobatch_index)rP. sub_actionsc"|jSN)__repr__r@s rBrCz_Action.__str__s}}rDcP|jC|jDcgc] }t|}}ddj|d|jSt |j }|t |jz }|j |t |j z }|Scc}w)N(;))rTreprjoinrRstrrQrS)r@ sub_actionsub_action_reprsrepr_strs rBrWz_Action.__repr__s    'CGCSCSTZZ 0T Tsxx 012!D4I4I3JK K4++,H D112 2H$$0C 5 566O UsB#returncP|jttttt fvSrV)rRr<r?r=r>r;rXs rB is_compute_opz_Action.is_compute_ops)$$      )   rD action_stringc|j}|dk(ry|jdrd|vr|jd}|d|}||dzd}g}|jrM|jdD]9}tj |j}|)|j |;t dtj |d|rt|SdStj|x}rW|j\}} } t t|tj | t| rt| SdS|dk(rytd |d ) z Reverse of __repr__ String should be formatted as [stage][action type][(microbatch)] e.g. `2F0`, `1UNSHARD`, `3SEND_F1` NrZr\rr[)rQrRrSrTzInvalid action string: zD, should be formatted as [stage][action type][(microbatch)] e.g. 2F0)strip startswithfindsplitrPrIappendr%tuple _action_regexmatchgroupsintlenrF) rf bracket_endsub_partcomputation_type_partrTsub_strr`rqrQrRrSs rBrIz_Action.from_strs&++- B   # #C (SM-A',,S1K$+H%2a!% ! K~~'~~c27G!(!1!1'--/!BJ!-#**:67!1!:!:;P!Q!%2=E+.  DH  "'' 6 65 6>Clln ;K)+;K  ))*:;),-=)>$% EI  b %m_4x y  rD)rJrKrLrs__annotations__r%rSr rTrorCrWpropertyboolrerMr_rIrNrDrBrPrP}sq&&&*hsm*37K%/07  t  3 3 3 rDrPrHrccdt|S)NzPP:r_rGs rB_get_profiler_function_namer~s V rDpipeline_ordererror_step_numberc ^tj|}|D]/}tt||D]}||| d|||<1t d|j D}t|Dcgc]4}dt |jtt |dz z6}}t|Dcgc]}|j|dg|z}}ttj|ddi}t|} t| Dcgc]}dt |z} }t|g|D cgc]} t d| Dc} dt|d d zzdjfd t| Dz} t||D cgc]T\} }| d djfd t|Dz|!t!| j#d|k(rdndzV}} }| dzdj|zdz}|Scc}wcc}wcc}wcc} wcc}} w)z Formats the pipeline order in a timestep (row) x rank (column) grid of actions and returns the formatted string. If `error_step_number` is passed in, an additional label will be added to signify which step that it is erroring on. rhc32K|]}t|ywrV)rt).0actionss rB z)_format_pipeline_order..sHWCLHszStep r fillvalueRank c3LK|]}|tt|ndyw)Nr)rtr_)ritems rBrz)_format_pipeline_order..s" F$d.CD NA 5 Fs"$ rr&c3:K|]\}}|d|dyw.s0<)1E5;q>" " #$<sz: c3LK|]\}}t|d|dywrr})rrrrs rBrz)_format_pipeline_order..s/R4c$i+a.!1123Rs!$z <-- ERROR HERE )copydeepcopyrangertmaxvaluesr_zfillsortedgetlist itertools zip_longestzipr^ enumeratersrm)rrrankr num_steps step_labelskey rank_actionstransposed_actions num_ranks rank_labelscol header_rowrrowformatted_rowsformatted_tablers @rB_format_pipeline_orderrsc]]>2N-s>$/01 -Ad#A&.*,t$Q' --H0E0E0GHHIAFyAQ<=#a&,,s3y1}#5677K >DN=S693y 01Li33\PRPQN#I-29-=>7SV#>K>{8%78  F# FFK KN+a/0388<5>{5K<4Jk+=>  E3 ' ((R9S>R R S!,EKKM!$%)::    N !4'$))N*CCdJO G? s89HHH:H$AH)ceZdZ ddedeedejfdeee dfdee e e fdee e e e fee fdef d Zd Zd Zd Ze dd eedeedeedeefdZeddddeefdZddddeefdZ dd eedeedeedeefdZdZ ddee dfdee e e ffdZdee de fdZy) _PipelineScheduleNn_microbatchesloss_fn.args_chunk_speckwargs_chunk_specoutput_merge_spec scale_gradsc||_||_||_||_||_||_ |jdu|_g|_tjd|jjy)NzUsing %s) _n_microbatches_loss_fnr_args_chunk_spec_kwargs_chunk_spec_output_merge_spec _has_backward_internal_lossesloggerinfo __class__rJ)r@rrrrrrs rB__init__z_PipelineSchedule.__init__sq . '!0"3"3 "]]$657 J 7 78rDc|jr>|j1|j|||}|jj |yyyrV)is_lastr _compute_lossrrn)r@stageoutput target_mbsmb_indexlosss rB_maybe_compute_lossz%_PipelineSchedule._maybe_compute_loss9sD ==T]]6%%fj.BCD  ! ! ( ( .7=rDcd|cxkxrt|jknc}|jr|j|r|j|St|jdk7r|st d|d|jy)NrzLoss for microbatch z6 is not available. Available losses for microbatches: )rtrrrrF)r@rr valid_indexs rB_maybe_get_lossz!_PipelineSchedule._maybe_get_loss>s8@c$*?*?&@@ ==T]]6;((2 2 && '1 ,[&xj166:6K6K5LN  rDct|ts|g}td|D}|r}|{t|j|j k7r.t d|j dt|j|j|j|j|jjy)zB Update the losses to those in the internal state c34K|]}|jywrVr)rrs rBrz3_PipelineSchedule._update_losses..Qs!DE%--!DsN Expecting z losses but got ) isinstanceranyrtrrrFclearextend)r@stageslossescontains_last_stages rB_update_lossesz _PipelineSchedule._update_lossesJs &$'XF!!DV!DD 6#54(()T-A-AA" !5!5 66Fs4K`K`GaFbc LLN MM$// 0 ##%rDarg_mbs kwarg_mbsrrct)z Run one iteration of the pipeline schedule with list of microbatches. Will go through all the microbatches according to the schedule implementation. Args: microbatches: list of microbatch args. NotImplementedError)r@rrrrs rB_step_microbatchesz$_PipelineSchedule._step_microbatchesas "!rDtargetrct) Run one iteration of the pipeline schedule with *whole-batch* input. Will chunk the input into microbatches automatically, and go through the microbatches according to the schedule implementation. args: positional arguments to the model (as in non-pipeline case). kwargs: keyword arguments to the model (as in non-pipeline case). target: target for the loss function. losses: a list to store the losses for each microbatch. r)r@rrargskwargss rBstepz_PipelineSchedule.stepss "!rDcz|j} d|_|j|||d|||_S#||_wxYw)a Run one iteration of the pipeline schedule with *whole-batch* input. Will chunk the input into microbatches automatically, and go through the microbatches, calling forward only. args: positional arguments to the model (as in non-pipeline case). kwargs: keyword arguments to the model (as in non-pipeline case). target: target values for the loss function. losses: a list to store the losses for each microbatch. Fr)rr)r@rrrroriginal_has_backwards rBevalz_PipelineSchedule.evalsH!% 2 2 7!&D 499d6&KFK"7D !6D s1 :cdtffd }| ||dndgjz}| ||dnigjz}| ||d|'t|tst dt |||fS)z* Pre-process/check inputs namec t|tst|dt|t |j k7r't dj d|dt |y)Nz must be a list but got a rrz but got )rr TypeErrortypertr ValueError)mbsrr@s rBcheck_type_and_lenz;_PipelineSchedule._check_inputs..check_type_and_lensmc4(4&(B49+ NOO3x4///  !5!5 6avYs3xjQ0rDrrNrrz losses must be a list but got a )r_rrrrr)r@rrrrrs` rB _check_inputsz_PipelineSchedule._check_inputss #    w 2dT111G  y+ 6t333I  ! z< 8  fd+"B4<. QRR !!rDc&|j||SrV)r)r@rrs rBrz_PipelineSchedule._compute_losss}}VV,,rDrrc|s|r4t|||j|j|j\}}||fSdg|jzig|jzfS)zj Splits a full-batch input into chunks (i.e. microbatches) and returns the chunks rN)rrrr)r@rr args_split kwargs_splits rB _split_inputsz_PipelineSchedule._split_inputssm 6'D$$%%'' ( $J |+ +4$...t7K7K0KK KrD output_chunksrcc.t||jS)z Merge output chunks back to a batch state. If output_merge_spec is None, the utility will merge output chunks by dimension 0 (batch dim). )rr)r@rs rB_merge_outputsz _PipelineSchedule._merge_outputss    # #  rDNNNNTNNNNrV)rJrKrLrsr r torchTensorrordictr_r r r{rrrrrrrrrrrrrrNrDrBrrs:>AEBFIM  9 9(3 #456 9"%(<"=> 9 $Do)=$>? 9 $E$sCx.%**D$EF 9 9D/ &.#'$(%)!% "$"D>"TN "  """!% "x~ " ""&7x~7,#'$(%)!% $"$$"D>$"TN $"  $"L- ,0LCHoLc3h(L. DI # rDrp2p_opsdescct|dk(rgS|r|dnd}tjd||tj|S)zt Simple wrapper over batch_isend_irecv from torch.distributed, which just adds a descriptive logger on top. rz, rhzbatch_p2p %s%s)rtrdebugdistbatch_isend_irecv)rrdesc_strs rB _batch_p2prsF  7|q "$r{H LL!8W5  ! !' **rDctt}i}t|dk(r|S|D] }||jj |"t |j D]\}}t||||<|S)z Sorts the list of P2P ops by the peer rank, and then calls batch_isend_irecv. Return a dictionary of works by peer rank. This function helps us avoid hangs in case of skip connections. rr)rrrtpeerrnritemsr)rr ops_by_peer work_by_peeropr opss rB_sorted_batch_p2prs0;4/@K/1L 7|q(BGG##B'(K--/08 c'$7 T8 rDworkc2|D]}|jy)zX Waits for a list of dist.Work (typically from _batch_p2p / _sorted_batch_p2p). N)wait)rws rB_wait_batch_p2prs rDceZdZdZ ddededeedeee dfdee e e fd ee e e e fee fd effd Zd Zddd deefdZdee eeeeffdZxZS)ra Base class for single-stage schedules. Implements the `step` method. Derived classes should implement `_step_microbatches`. Gradients are scaled by num_microbatches depending on the `scale_grads` argument, defaulting to True. This setting should match the configuration of your loss_fn, which may either average losses (scale_grads=True) or sum losses (scale_grads=False). Nrrrr.rrrct|||||||||_|j|_d|_||jkrt d|d|jd|j|_y)NrrrrrrFzNumber of microbatches (z9) must be greater than or equal to the number of stages (z).) superr_stage num_stages _num_stages_stage_initializedr_get_pipeline_orderr) r@rrrrrrrrs rBrzPipelineScheduleSingle.__init__s )+//#    ++"' D,, ,*>*:;##'#3#3"4B8   $ $ & rDc@g}|j|jjtt ||jj |j |||jr%|jj|j d|_ yNT) rr_get_init_p2p_neighbors_opsrr_prepare_forward_infrarr_prepare_backward_infrar)r@rrall_opss rB_initialize_stagez(PipelineScheduleSingle._initialize_stage=st%'t{{>>@A 7+, **4+?+?vN    KK / /0D0D E"&rDrrc|jrtjs td|j|j_|jj |j||\}}|*ttj||j}nd}|j|||||jjr%|j|jjSyrzstep() requires gradients to be enabled for backward computation; it should not be used under torch.no_grad() context. Please call eval() instead.N)rris_grad_enabledrFr has_backwardclear_runtime_statesrr tensor_splitrrrrr)r@rrrrrr targets_splits rBrzPipelineScheduleSingle.stepJs   e&;&;&=. $(#5#5   ((*$(#5#5dF#C L   !3!3FD list of actions NrNrXs rBrz*PipelineScheduleSingle._get_pipeline_orderts rDr)rJrKrL__doc__rrsr r rorrr_r r r{rr%rrrPr __classcell__rs@rBrrs'+AEBFIM   !    (#  "%(<"=>  $Do)=$>?  $E$sCx.%**D$EF    D '"&(x~(TXd3Xg=N8O3O.P%QrDrc JeZdZdZ ddeedeedeedeefdZy) _ScheduleForwardOnlyzo The forward-only schedule. Will go through all the microbatches and perform only the forward pass Nrrrrc|| td|j||||\}}|js|j|d|dg}t |j D]}t d|5|jj|}t|d}|jD] } t| |jj||||||jj|}t|d}|j|jdddtj!d|jj"||D] } t| y#1swYIxYw) z< Run one iteration of the pipeline schedule Nz7Forward-only schedule does not support loss computationrForward fwd_recvr fwd_send[%s] Forwarded microbatch %s)rFrrr%rrrrget_fwd_recv_opsrrrforward_one_chunkget_fwd_send_opsrrrrQ) r@rrrrfwd_sends_to_waitrrworksrs rBrz'_ScheduleForwardOnly._step_microbatchesso  !V%7I "//JPVW&&  " "71:y| <46t++, UA 8A30 9kk2215)#J?!LLN*D#D)* --aYq\Jkk2215)#J?!((8 9 LL79P9PRS T U$& "D D ! "# 9 9s 3B3E..E7 r)rJrKrLr.r rrrNrDrBr2r2sP#'$(%)!% *"$*"D>*"TN *"  *"rDr2c reZdZdZ d deedeedeedeefdZdeeeeee ffd Z y) rz^ The GPipe schedule. Will go through all the microbatches in a fill-drain manner. Nrrrrc f|j||||\}}|js|j|d|dg}t|jD]}t d|5|j j|}t|d}|jD] } t| |j j|||||} |j j|}t|d}|j|jdddtjd|j j ||j#|j  |||D] } t| g} t|jD]}t d|5|j j%|}t|d }|jD] } t| |j'|j |} |j j)|| ||jd z k( |j j+|}t|d }| j|jdddtjd |j j |!|j j-|j,r |jnd | D] } t| |j/|j |y#1swYxYw#1swYxYw)z Run one iteration of the pipeline schedule with list of microbatches. Will go through all the microbatches according to the GPipe schedule. Args: microbatches: list of microbatch args. rr4r5r r6Nr7z Backward bwd_recvrr last_backwardbwd_sendz[%s] Backwarded microbatch %sgrad_scale_factor)rrr%rrrrr8rrrr9r:rrrrQrget_bwd_recv_opsrbackward_one_chunkget_bwd_send_opsrr) r@rrrrr;rrr<rrbwd_sends_to_waitrs rBrz ScheduleGPipe._step_microbatchess"//JPVW&&  " "71:y| <46t++, IA 8A30 9kk2215)#J?!LLN*D#D)*66q'!*iPQlSkk2215)#J?!((8 9 LL79P9PRS T  $ $T[[&*a H I(& "D D ! " 46t++, VA 9QC1 9kk2215)#J?!LLN*D#D)*++DKK; .."#t';';a'?"?/ kk2215)#J?!((8 9" LL8$++:Q:QST U% V( 6:6F6Fd22A  & "D D ! " DKK0m 9 94 9 9s%B3L'CL'L$ 'L0 rcc i}|j}t|D]}g}|}|jdg|zt|jD],}|j t |t j|.d|dz |z z}|jdg|zt|jD],}|j t |t j|.|||<|S)z Returns the pipeline order for GPipe schedule. See base method in PipelineScheduleSingle for details on the schedule IR format. Nr'r) rrrrrnrPr%r<r?)r@r pp_group_sizerr warmup_delaymb_idxbackward_delays rBrz!ScheduleGPipe._get_pipeline_orders (( -( +D/1G L NND6L0 1  4 45 Pwt-=-E-EvNO P-!"3d":;N NND6N2 3  4 45 Vwt-=-K-KVTU V$+N4 ' +*rDr rJrKrLr.r rrrrsrPrrNrDrBrrs{#'$(%)!% N1$N1D>N1TN N1  N1`Xd3Xg=N8O3O.P%QrDrc reZdZdZ d deedeedeedeefdZdeeeeee ffd Z y) rzo The 1F1B schedule. Will perform one forward and one backward on the microbatches in steady state. Nrrrrc||j||||\}}|js|j|d|dt|j|j |j jz }d}d}g}g} t|D]} |j j|} tt| d|j j|||||} t||j j|} ||dz k7r t| d}|j|j | |||dz } |j j|} tt| | zd|j!|j |}|j j#||||jdz k(|j j%|}|dz }||jk(rn|j j|} tt|| zd|j j|||||} |j|j | |||j j|} |dz }Et|d }||jkr|j j|} tt| d |j!|j |}|j j#||||jdz k(t||j j%|}t|d }|dz }||jkr|j j'|j&r |jnd t||j)|j |y ) z Run one iteration of the pipeline schedule with list of microbatches. Will go through all the microbatches according to the 1F1B schedule. Args: microbatches: list of microbatch args. rr5r rr6fwd_send_bwd_recvr@bwd_send_fwd_recvrBr?rCN)rrr%minrrrrQrr8rrr9r:rrErrFrGrr)r@rrrr warmup_chunks fwd_mb_index bwd_mb_index send_work fwd_sends_ fwd_recvsr bwd_recvsr bwd_sendss rBrzSchedule1F1B._step_microbatches7s"//JPVW&&  " "71:y| <    t{{66 6   &(  }% A 44\BI JyzB C[[22gl3Y|5LF I & 44\BI}q00&yzB  $ $T[[&*l S A L5 > 44\BI Jy9'I A L'T111* 6:6F6Fd22A   " DKK0rDrcc i}|j}t|D]}g}|jdg|z|dz |z }d}t|D].}|jt |t j ||}0tdd|dz |z z}|jdg|zd} |j|z } | dkDri|dz }|jt |t j || dz} |jt |t j| | dz } | dkDri|j| z } | dkDr||z dkDrK|jd| dkDri|jt |t j| | dz } | dz} n4|jt |t j| | dz } | dz} | dkDr|||<|S)z Returns the pipeline order for 1F1B schedule. See base method in PipelineScheduleSingle for details on the schedule IR format. Nrrr&) rrrrnrPr%r<rrr?) r@rrJrr num_forward forward_mbr wait_for_1f1b backward_mbremaining_forwardremaining_backwards rBrz Schedule1F1B._get_pipeline_orders (( -(8 +D/1G NND6D= ))1,4KJ;' wt-=-E-EqIJ    1 (9D(@#ABM NND6M1 2K $ 4 4{ B #a'a wt-=-E-EzRS!Q&!D"2"@"@+Nq $a'"&!5!5 !C $q("D(A-NN4()A-#D*:*H*H+V$q( *a/*NN&6&D&DkR 1$K&!+&%%q(($+N4 q8 +rrDrrNrNrDrBrr1s}#'$(%)!% J1$J1D>J1TN J1  J1XBXd3Xg=N8O3O.P%QBrDrcompute_actionsmax_active_stagesc dtdtttdttfd}t g dtf fd }dtf fd }t |D]s\}}| ||||d tt fd  }tt fd  }|D] } ||  |D] } ||   j|u S) a]Given a basic schedule involving only compute actions (F,B,W,OVERLAP_F_B), add UNSHARD/RESHARD actions for FSDP. UNSHARD refers to fetching the full contents of an FSDP-sharded layer, requiring an all-gather operation. RESHARD does the opposite, releasing memory (but doing no communication) We abandon the "timestep lock" during lowering max_active_stages controls how many prefetches we allow. It should be measured in mb and tuneable but in practice 3 stages is probably the thing we want? (to account for having one f and one b active, and something else prefetching?) count next_actionsrcc t}g}|D]}||jtk(r|jx|jD]W}|j|vs|j |j|j |jt||k(sWnt||k(s|S|j|vs|j |j|j |jt||k(s|S|S)zdRemove duplicates (same stage, different microbatch), find next 'count' stages that will do compute.)setrRr;rTrQaddrnrt)rgrhseenretar`s rBnext_stage_indicesz0_add_unshard_reshard..next_stage_indicess "A}%%49R&'mm& %11= HHZ%;%;<JJz'='=>"3x50 % & 3x5( }}D0/ 1==1s8u,! ' "& rDrQchj|jt|tdyrV)rkrnrPr4rQ active_stagesfsdp_aware_actionss rB_unshardz&_add_unshard_reshard.._unshard7s(+&!!'+w"EFrDchj|jt|tdyrV)removernrPr5rqs rB_reshardz&_add_unshard_reshard.._reshard;s([)!!'+w"EFrDNc |vSrVrN)srrs rBz&_add_unshard_reshard..Fs a}&<rDc |vSrVrN)rynext_ns rBrzz&_add_unshard_reshard..Hs avorD)rsrr rPrjrfilterrn) rdrerortrwrrHfetchevictrrrrsr|s @@@rB_add_unshard_reshardrs "&x'8"9 c8"eM(*GcGGcG/* 6 > $$5qr7JKVrQrSrnrPr?)rdmerged_actionsrH next_actions rB _merge_bwr[sN  $$Q' > /"q'9'A    "/"q'9'A-0,@1,Doa($   # #~ 5',,?""k&=&=='';+G+GG  ! !**M6;R;RS     "  ! !& )1 2 rD stage_to_rankrc : |Dcic]}|g}}|Dcic] }|t}}dtdtffd dtdtttff fd }dttdttdtffd }|D]N}g}||D]=} | (| j |j | j -|j| ?|||<P|r7d} t|D]}t||dkDsJd |d t||||d} || ||sC| ||j| ||j|  | r{|| \} } ||j| ||j| || jj| || jj| ||jdt||dk(r||=d } | sJd |r7|Scc}wcc}w) za Transforms a compute-only schedule into a complete schedule with communication actions. rHrccF|jtk(r<|jdz k7xr(|jdz|jk7S|jttfvr9|jdk7xr(|jdz |jk7Sy)NrrF)rRr1rQr=r?)rHrrs rB _has_commsz"_add_send_recv.._has_commss  " "a '%%a73M""Q&=v112=3 3 $ $(G G%%*3}""Q&0v11203 3rDc|s J|d|j}|j}|j}t||tk(rt nt |}|tk(r|dzn|dz }t||tk(rtnt|}||fS)Nz is not a valid comm actionr) rQrRrSrPr1r6r8r7r9)rH stage_idxctyperLsendrecv_stage_idxrecvrs rB _get_commsz"_add_send_recv.._get_commss&!IfX-H#II!&& ''((yEQJ&FFK*/1*Q)a-~!vPTzrD prev_actionscJ|y|jtk(rc|jdk(sTt|jt|j |vryt|jdz t|j |vryy|jt tfvr|jdz k(st|jt|j |vryt|jdzt |j |vryt|jdzt|j |vryyy)aWe don't put our own recv ops in the schedule, we let a sender on another rank put our recv ops in place. This helps ensure a sane (non-hanging) ordering of sends and recvs. But it also means we might not be able to schedule our next compute action yet. TrrF) rRr1rQrPr7rSr=r?r9)rHrrs rB_ready_to_schedulez*_add_send_recv.._ready_to_schedules  >  $ $ )&2D2D2I**FF4K4KL **Q.63J3JK   # # 'F F&&*q.8**FF4K4KL **Q.@W@WX **Q. v?V?VW rDFrrank=z, len(compute_actions[rank])=Tz6Malformed compute schedule, can't schedule sends/recvs) rjrPr{ror rTrrnrrtrkrQr)rdrrr comm_actionsrrr new_actionsrHprogressrrrs `` @rB_add_send_recvrsrDS-S4dBh-SL-SET,UTT35[,UL,U 7 t 7uWg-='>*!*14W* *^ ,%' %d+ +F!f&8&8&D""6#5#56""6*  + !,, ?+ Dt,-1 4'7C 5689 1%T*1-F%fl4.@A!T"))&1T"&&v.f%!+F!3JD$!&--d3 &**40 t/?/?!@AHHN t/?/?!@AEEdK D ! % %a (?4()Q.#D)H1 2QQQx9 : ].T,Us HHrrJnum_microbatchesc t|k(sJd|dtt|D]}|vrJd|t|Dcic]=}|ttttt tt ti?c}idtdtdtffd }D]n}t|D][\}}| t|tsJd|d |d |d |j|jD] } || ||R||||]pD]} t| t} t| t} t| t } t| t }| |k(sJd | d td| d|| |k(sJd| d| d|| | |zdzz|k(rJd| d|d| d| d| Scc}w)Nz2Schedule has incorrect number of ranks - expected z , actual z%Schedule is missing actions for rank rHrrc 2|j}|j}|j}|tk(r |tj |n"|t k(rX| |tvr.d|d|d|d|d }t |}|d|}t| |t j |n|tk(rX| |tvr.d|d|d|d|d }t |}|d|}t| |tj |n`|tk(rW| |tvr.d|d|d |d|d }t |}|d|}t| |tj || vr| |<y |} || k(sJd|d|d |d |d | y)zPProcess a single action and update stage_actions and stage_index_to_rank_mappingr, step z": Running Full Backward for stage z , microbatch z without first running Forwardrz Full pipeline schedule: z#: Running Backward Input for stage z$: Running Backward Weight for stage z% without first running Backward Inputz: Stage z is assigned to both rank z and rank N) rQrRrSr1rkr:rAssertionErrorr2r3) rHrrs_idrmb_id error_msgformatted_schedulefull_error_msg existing_rankr stage_actionsstage_index_to_rank_mappings rB_process_actionz+_validate_schedule.._process_actionsc!!'''' A: $  " & &u - aZM$/22D6.PQUPVW""'(FH&<t&"!k!>?Q>RS%^44 $  " & &u - aZM$/22D6.QRVQWX""'(FH&<t&"!k!>?Q>RS%^44 $  " & &u - aZM$/22D6.RSWRXY""'(MO&<t&"!k!>?Q>RS%^44 $  " & &u - 2 204 ' -7=M=( vWTF(4&8RSWRXXbcpbqr (rDrrz: Got an invalid action: z, expected instance of _ActionzGot rz microbatches for stage z , expected z(Invalid backward microbatches for stage z8: I and W must have equal counts, but got I=z, W=r&z : expected z( total backwards, but got B=z, I=) rtrr1rjr:r2r3rPrsrrrT)rrJrrrstage_idrrrHr`rf_mbb_mbi_mbw_mbrrs` @@rB_validate_schedulers w<= ( <]O9UXY`UaTbc (m$OwN"Gv NNOj)=   su su su su   =M#%99s9#9v4%gdm4 4LD&~fg. vWTF*CF8Kij . !!-"("4"4.s%--2LHrDzDeprecation warning: 'use_full_backward' is no longer supported. Simply stop passing it, and everything should still work fine.)rr_stagesrr group_sizerJ group_rankrrstage_index_to_group_rank_stages_initializedr_should_compute_lossrrwarning) r@rrrrrrrrrrrs @rBrzPipelineScheduleMulti.__init__s )+//#   !!9//#AY111I(( )G    0 0* &\\ ME.2.L.LE + M$) d2$L!CE  ( NNQ  )rDrcg}|jD]!}|j|j#tt |t }|jD]q}|j r|j|j||}n|j|j||}|jsW|j|jsd|_ yr ) rrr!rrrois_firstr"rrr#r)r@rrr$rnext_stage_argss rB_initialize_stagesz(PipelineScheduleMulti._initialize_stagess%'\\ @E NN5<<> ? @ 7+,,17\\ DE~~"'">">(($##(">">((/6#!!--d.B.BC D$( rDrrcct||j|j|j|_|j D]}|j|_y)z] Allocates the stage index to rank mapping which is needed for communication N)rrJrrrr)r@rrs rB_validate_and_set_stage_mappingz5PipelineScheduleMulti._validate_and_set_stage_mappingsT *<         * & \\ ME.2.L.LE + MrDct|dd5}tj|}|jD] }|j |j|" dddy#1swYyxYw)QDump a CSV representation of the schedule into a file with the provided filename.rrhnewlineN)opencsvwriterrwriterow)r@filenamecsvfilerrs rB _dump_csvzPipelineScheduleMulti._dump_csvs_ (C , ;ZZ(F++ ; 3 3D 9: ; ; ; ;s AAA'cH|dk(sJt|d5}tj|}t|D]5\}}|Dcgc]}tj |c}|j |<7 ddd|j|j ycc}w#1swY*xYw)zLoad a CSV representation of the schedule from a file with the provided filename. This API will most likely get renamed/refactored so is marked as internal for now. format must be "compute_only" for PipelineScheduleMulti. compute_onlyrhrN)rrreaderrrPrIrr)r@rformatrrrrrys rB _load_csvzPipelineScheduleMulti._load_csvs ''' (B ' O7ZZ(F&v. O cJM,NQW-=-=a-@,N##D) O O ,,T-@-@A -O O Os+BBBBB!rrc|jrtjs td|jD]}|j|_|jD]}|j |j||\}}|*ttj||j}nd}|j|||||jD]+}|js|j|jcSyr')rrr(rFrr)r*rrr+rrrrr) r@rrrrrrrr,s rBrzPipelineScheduleMulti.steps   e&;&;&=. \\ 4E!%!3!3E  4\\ )E  & & ( )$(#5#5dF#C L   !3!3FD+>?? @rDrrrc  |j||||\}}|js|j|d|d|jDcic]}|j|}}t }t }|j D]\} | dkDr!|j|j| dz | |jdz ks<|j|j| dz^t} t|j|jD]\} } g} | &| j}| j}| j} |Jd|t j"k(rT|| }|j%|||||}|j'||||| j)|j+|n|t j,k(r|| }|j/||}| | xxdz cc<| | |j0k(}|j2r |j0nd}|j5||d||r|j3|| j)|j7|n|t j8k(rM|| }|j/||}|j5||dd| j)|j7|n|t j:k(re|| }| | xxdz cc<| | |j0k(}|j2r |j0nd}|j=|||r |j3|nt?d ||D]}|j|}d}| tA|kr|| }|*|j}|j}|j} |Jd|t j"k(r1| dz|vsr|| dz}| j)|jC||t,t8t:fvrt?d ||D]}|j|}d}| tA|kr|| }|*|j}|j}|j} |Jd|t"t:fvrf|t8t,fvr1| dz |vs||| dz }| j)|jE|t?d |tGtI| |jY|j|ycc}w#tJ$rs}tLjOd |j|jPjRtU|| | tLjOd tW|j| |d}~wwxYw) Operate on the microbatches for looped schedules (multiple stages on each rank). TODO: Does not use sorted_batch_isend_irecv(). As a result, this schedule does not support models with skip connections. rrNzCAll currently supported action types require valid microbatch_indexTr full_backwardrAFrAzUnknown computation type zi[Rank %s] pipeline schedule %s caught the following exception '%s' at time_step %s when running action %sz%sr)-rrrrrQrjkeysrkrrrrrrrRrSr%r<r9rrr:r?rrrrFrGr=r>backward_weight_one_chunkrrtr8rErr ExceptionrerrorrrJr_rr)r@rrrrrstage_index_to_stageall_prev_ranksall_next_ranksrQbackward_counter time_steprHrrRrrrrArD prev_rank prev_rank_opsprev_rank_action next_rank next_rank_opsnext_rank_actiones rBrz(PipelineScheduleMulti._step_microbatchess "//JPVW''  # #GAJ ! = 37,,? ).E  u $? ? $'5#&5/446 TKQ""4#A#A+PQ/#RST--11""4#A#A+PQ/#RS  T*1!*4+>+>tyy+I!JT  IvS (*%'-'>'>$%66H"("4"4K#/]/(+;+C+CC 4[ A!&!8!8$gh&789L"00 HU 5#9#9(#CD)-=-K-KK 4[ A#33E8D(5:5,[9T=Q=QQ&594D4DD00!*00$!%*.*7 1 )!--.?@ 5#9#9(#CD)-=-L-LL 4[ A#33E8D00$!%*/*/ 1  5#9#9(#CD)-=-M-MM 4[ A(5:5,[9T=Q=QQ&594D4DD00!*77$*78)!--.?@(+DEUDV)WXX"0I$($7$7 $BM'+$ 3}#55+8+C('3+;+L+L(#3#D#D&6&B&B '3a3,/?/G/GG*Q2FF)=[1_(M # 5+A+A(+K L-)*+2 !",";"0I$($7$7 $BM'+$ 3}#55+8+C('3+;+L+L(#3#D#D&6&B&B '3a3,/II -.-1PP*Q2FF)=[1_(M # 5+A+A(+K L","; - $Do)=$>? -$E$sCx.%**D$EF-$D>--^(uS#X(4 MChw&7!889 M  M;B "&+x~+^#'$(%)!% z2$z2D>z2TN z2  z2rDrc eZdZdZ ddeeeeefde ffd Z dde de ffd Z dde de fdZ dZ dd eed eed eed eefd ZxZS)_PipelineScheduleRuntimea% Provides a simple runtime that requires a 'schedule IR' including specified communication operations. Can be instantiated directly by creating _PipelineScheduleRuntime and calling load_csv, or can be subclassed and the subclass can be responsible for creating a schedule IR. rrc "t|i_|dk(rC|D]=}gj|<||D]$}|Jj|j|&?y|dk(r|j D]<\}}t |D])\}}| |j rtd|d|d|d>|D]}t||j|<tjfdj _ytd |d ) z Given an in-memory representation for a simple compute-only schedule, lower it to a complex schedule including communication actions. Stores the schedule in self, and must be called before running step_mo() compute_commsNrz?Expected compute-only schedule but found communication action 'z ' at rank z , position ze. Communication actions (e.g. SEND_F, RECV_F, etc.) should not be present when format='compute_only'.c"j|SrVrryr@s rBrzzG_PipelineScheduleRuntime._prepare_schedule_with_comms..s(F(Fq(IrD)rrformat= is not implemented) rrpipeline_order_with_commsrnr rrerrrrr)r@rrrrH action_listrrs` rB_prepare_schedule_with_commsz5_PipelineScheduleRuntime._prepare_schedule_with_commss_ /8CE& _ $ H79..t4%dmHF!---2248??GH H ~ %%,]]_ !k!*;!7IAv)&2F2F( &xz${1#FPQ   7KDM8..t4  .<..I++.D * & 1D&EF FrDrc|dk(r+t |||j|jy|dk(rzi}t |d5}t j |}t|D]+\}}|Dcgc]}tj|c}||<-|j||dddytd|dcc}w#1swYyxYw) a Loads a csv in simple format and then lowers it to include communication actions format must be either "compute_only" or "compute_comms". If compute_only, the lowering passes will automatically be run to generate a compute_comms schedule. rrrhr)rNrr) rrrrrrrrrPrIr) r@rrrrrrrryrs rBrz"_PipelineScheduleRuntime._load_csvs ^ # G h '  - -d.A.A B  &Gh+ JwG,!*6!2GID#BE$FQW%5%5a%8$FGDMG11'&1I  J J & 1D&EF F%G J Js+C1C CCCc|dk(rp|jJdt|dd5}tj|}|jD] }|j |j|" dddy|dk(rp|j Jdt|dd5}tj|}|j D] }|j |j |" dddyy#1swYyxYw#1swYyxYw) rrNz'Compute only schedule must be availablerrhrrz6Must initialize compute_comms schedule before dump_csv)rrrrrr)r@rrrrrs rBrz"_PipelineScheduleRuntime._dump_csv#s  ^ #&&2 9 2hR0 ?GG, //?DOOD$7$7$=>? ? ? &11= H =hR0 JGG, ::JDOOD$B$B4$HIJ J J '  ? ? J JsAC,AC8,C58DcLtjfdjS)Nc"j|SrVrrs rBrzz4_PipelineScheduleRuntime._simulate..9sd44Q7rD)_simulate_comms_computerrrXs`rB _simulatez"_PipelineScheduleRuntime._simulate6s%&  * * 7     rDrrrrc  |j||||\}}|js|j|d|d|jDcic]}|j|}}|j Jdi}i}g} it dtffd } t} t|j |jD]\} } | j}| j | jnd}|dk\s|ttfvs Jd| d| j}||}t|j t"}|d z|v}|d z |v}t$j'd | | t)t+| 5|t,k(r+| j/t1|j3|n|t4k(r+| j/t1|j7|nj|t8k(r.||f|vsJd t1|j;||||f<n3|t<k(r.||f|vsJd t1|j?||||f<n|tk(r7|r|vr|vs Jd |d|j jAd|<n|tk(r=|r|vs Jd|d|vs Jd|d|j jCnv|tDk(r|r| ||jFs/|s-||f|vs Jd| dtI|jK||f|jM|||||}|jO|||||r||d zjQ||n|tRk(r|r| ||jTs/|s-||f|vs Jd| dtI|jK||f|jW||}| |xxd z cc<| ||jXk(}|jZr |jXnd }|j]||d||r|j[||r||d z j_|ja||n|tbk(r|r| ||jTs/|s-||f|vs Jd| dtI|jK||f|jW||}|j]||dd|r{||d z j_|ja||nS|tdk(r;|r| || |xxd z cc<|jg|| ||jXk(ntid| ddddts| r%tI| jKts| r%tsdk(sJd|ju|j|ycc}w#1swYsxYw#tj$r>}t$jmd| | totq|j | |d}~wwxYw)rrNzLMust call _prepare_schedule_with_comms() before calling _step_microbatches()rcx|vr'|j|=j||vs Jd|y)zQIf an unshard is active for `stage_idx`, wait() it and mark `stage_idx` unshared.z*Attempted to compute on sharded stage_idx=N)rrk)r unshard_opsunsharded_stagess rB_assert_unshardedzF_PipelineScheduleRuntime._step_microbatches.._assert_unshardedcsRK'I&++- * $$Y/ 00 =9,? 0rDrizaction=z missing mb_indexrz8_PipelineScheduleRuntime running time_step %d, action %szARecv twice for {stage_idx=} {mb_index=} without executing forwardzBRecv twice for {stage_idx=} {mb_index=} without executing backwardzUnsharding the same stage_idx=z twiceT)async_opzResharding stage_idx=z without unshardingz before finishing unshardzComputing action=z before receiving inputz Attempted to run compute action=rFrz is unknown or unsupportedz\_PipelineScheduleRuntime caught exception at step %s when running action %s. Full Schedule:rzUnused unshard operations);rrrrrQrrjrsrrrrRrSr4r5rsubmodrrrrr~r6rnrr:r8rGr7r8r9rEunshardreshardr<rrrr9rset_local_fwd_inputr?rrrrrFset_local_bwd_inputget_local_bwd_outputr=r>rrrrprintrrtr)r@rrrrrr bwd_recv_ops fwd_recv_opssend_opsrrrrH comp_typerrstage_uses_fsdpis_next_stage_on_this_rankis_prev_stage_on_this_rankrrrArDrrrs @@rBrz+_PipelineScheduleRuntime._step_microbatches=se"//JPVW''  # #GAJ ! = 37,,? ).E  u $? ? --9 Z 9 @B ?A +-13 5  *1!*4+I+I$))+T!U|  Iv{ "33 ..:++  1} 6)1fY/01#.. ,Y7",U\\:"F-6]>R-R*-6]>R-R* N %%@%HITQ !F*  53I3I(3S(TU"f,  53I3I(3S(TU"f,%$ ". .`  . ?I!228<? i%:;#f,%$ ". .a  . ?I!228<? i%:;#g-* )1A A$-[$@I"Ai\HI!A6;\\5I5ISW5I5XK 2"g-*#,0@#@"8i\9L M#@$-K#?"8i\9R S#?"LL002"g-*-i8!&$>!* ($".$.#5VI5L M $. ,L,<,!* ($".$.#DF9D[ \ $. ,L,<, :W X 9XXr)rr)rJrKrLr.rrsrr rPr_rrrrrr/r0s@rBrrs%,Gc4 1223,G,G\G#GsG*J#JsJ& #'$(%)!% w2$w2D>w2TN w2  w2rDrc |eZdZdZ d deededeee e fdeee e e fee fdef fd ZdZxZS) r ai Breadth-First Pipeline Parallelism. See https://arxiv.org/abs/2211.05953 for details. Similar to Interleaved 1F1B, Looped BFS supports multiple stages per rank. What is different is that when microbatches are ready for multiple local stages, Loops BFS will prioritizes the earlier stage, running all available microbatches at once. rrrrrct||||||i|_t|jD]"}|j |}||j|<$y)N)rrrrr)rrrrrJ!_calculate_single_rank_operations) r@rrrrrrrank_opsrs rBrzScheduleLoopedBFS.__init__Asl )/#  CE$,,- 1D==dCH(0D   % 1rDc t|j}t||j|z|j}t|Dcgc]}d}}|D]/|j fdt|j D1d|jdz |z z}|j dg|zt |D]8|j fdt t|j D:|Scc}w)Nc3TK|]}ttj|!ywrV)rPr%r<rrrQs rBrzFScheduleLoopedBFS._calculate_single_rank_operations..es) %5%=%=xH%(r&rc3TK|]}ttj|!ywrV)rPr%r?rs rBrzFScheduleLoopedBFS._calculate_single_rank_operations..ps) %5%C%CXNr)rtrrrJrrreversed)r@rn_local_stages stage_indicesrYrpost_warmup_opsrQs @rBrz3ScheduleLoopedBFS._calculate_single_rank_operationsZsT\\* $$$~5t7I7I rt)rrJ warmup_ops fwd_bwd_ops cooldown_opsrforward_stage_indexbackward_stage_indexnum_1f1b_microbatchesenable_zero_bubblefwd_stage_mb_indexbwd_stage_mb_indexweight_stage_mb_indexrYrr total_opsbackward_op_idsweight_op_countFULL_BACKWARD_OR_BACKWARD_INPUTrfwd_stage_indexrrUbwd_stage_indexrVweight_stage_indexweight_mb_indexs rB_get_1f1b_rank_opsr-ws*5S)9)4S)9,7,<8=T{(C!(CH(C &ma.?$.F)GG d O'$.2[(<7IOO--$IM% ?1"5O/??3  / OO)9)A)A8L Z!^# 89 2 8 [ 8 81"5O 2? CC 3  / OO)9)A)A<P 326O 2? CC 3  / OO)H,W   " "2 &!b:o9N&N%9#O4&"(==O'PPO=%&89*(88' 1$ &%226O 2? CC 3  / OO)H,W   " "2 &!b:o9N&N%9#O4&"(==O'PPO=%&89*(88' 1$[M%^ 33G!G1//2RS45GH HO 501  "$4$D$Do  1 33G!G Og)Ds K2ceZdZdZ d deededeedee e dfdee e e fdee e e efe efd effd Zd eeefd ZxZS)ra The Interleaved 1F1B schedule. See https://arxiv.org/pdf/2104.04473 for details. Will perform one forward and one backward on the microbatches in steady state and supports multiple stages per rank. When microbatches are ready for multiple local stages, Interleaved 1F1B prioritizes the earlier microbatch (also called "depth first"). This schedule is mostly similar to the original paper. It differs by being relaxing the requirement of num_microbatch % pp_size == 0. Using the flex_pp schedule, we will have num_rounds = max(1, n_microbatches // pp_group_size) and it works as long as n_microbatches % num_rounds is 0. As a few examples, support 1. pp_group_size = 4, n_microbatches = 10. We will have num_rounds = 2 and n_microbatches % 2 is 0. 2. pp_group_size = 4, n_microbatches = 3. We will have num_rounds = 1 and n_microbatches % 1 is 0. rrrr.rrrc |dj|_t | |||||||t ||_|dj |_td||jz|_ ||jz|_ ||jzdk7rtd|jd|di|_ t|jD]"}|j|} | |j|<$y)Nrrrrrrrrrz_Interleaved 1F1B requires the number of microbatches to be a multiple of the number of rounds ( ), but got .)rrJrrrtrrrrnumber_of_roundsmicrobatches_per_roundrrrr) r@rrrrrrrrrrs rBrz ScheduleInterleaved1F1B.__init__ s$AY11 )+//#  "&k1I(( #A~9K9K'K L&48M8M&M# D11 1Q 65595J5J4KL)*!- CE$,,- 1D==dCH(0D   % 1rDrcc  fd}| jjz}| z }||z } |z|z}tjd |||fd} fd}t jj ||||S)Ncjdz jz}d}||jdz |z zz}t|jjzS)Nrr&rr4rJrSrrwarmups_ops_last_stagemultiply_factorrr@s rBget_rank_warmup_opszVScheduleInterleaved1F1B._calculate_single_rank_operations..get_rank_warmup_ops9 o##a'++&, " O//##a'4/3J z4#7#7$:M:M#MN NrD=rank %s, warmup_ops %s, 1f1b %s, cooldown_ops %s total_ops %sc`|jzjz}|jzzSrVr4rrJr local_indexrr@s rBrzVScheduleInterleaved1F1B._calculate_single_rank_operations..forward_stage_indexZ 44#>#>>$BUBUUK$"4"44< .backward_stage_index_ W##:%$*E*EE%%&&   $"4"44< $1 $Do)=$>? $1$E$sCx.%**D$EF$1$1L9 hw>O9P9 rDrceZdZdZ ddeededeedee e dfdee e e fdee e e efe efd effd Zd eeefd Zd ZxZS)r!aw The Interleaved Zero Bubble schedule. See https://arxiv.org/pdf/2401.10241 for details. Will perform one forward and one backward on inputs for the microbatches in steady state and supports multiple stages per rank. Uses the backward for weights to fill in the pipeline bubble. In particular this is implementing the ZB1P schedule in the paper. rrrr.rrrc |D]'}t|jtstd|dj|_t ||||||||t||_ |dj|_ td||j z|_ ||jz|_||jzdk7rtd|jd|di|_t#|j D]"} |j%| } | |j | <$|j'|j|j z|_y)NzYThe Zero Bubble schedule is not supported with stage modules that have used torch.compilerr0rzZZero bubble requires the number of microbatches to be a multiple of the number of rounds (r1r2)rrrrFrrJrrrtrrrrr3r4rrrr_add_bubbles_to_actions r@rrrrrrrrrrrs rBrz&ScheduleInterleavedZeroBubble.__init__ sn E%,,8", $AY11 )+//#  "&k1I(( #A~9K9K'K L&48M8M&M# D11 1Q 65595J5J4KL)*!- CE$,,- 1D==dCH(0D   % 1#::   $"4"4 4 rDrcc  fd}| jjz}| z }||z } |z|z}tjd |||fd} fd}} t jj ||||| d S)Ncjdz jz}d}||jdz |z zz}t|jjzSrDr7r8s rBr;z\ScheduleInterleavedZeroBubble._calculate_single_rank_operations..get_rank_warmup_ops r<rDr=c`|jzjz}|jzzSrVr?r@s rBrz\ScheduleInterleavedZeroBubble._calculate_single_rank_operations..forward_stage_index rBrDcjdz |z jzjzz }|jzzSrDrErFs rBrz]ScheduleInterleavedZeroBubble._calculate_single_rank_operations..backward_stage_index rGrDT)r!rH) r@rr;rIrrr%rrr rs `` @rBrz?ScheduleInterleavedZeroBubble._calculate_single_rank_operations s O). ,,t/C/CC$z1 % 3 ,|;  K        =  =!%!            !#  rDc|j}d}t}i}i}i}d}t|jD]} g|| <d|| <d|| < d} t} t|jD]} || } | t || k\rd} || | || | } | J| \}}}}||||||s>|| j || | || j |||f|| xxdz cc<|| j d|| xxdz cc<|| xxdz cc<|| j d|j| | rn|dkDrtjd|||S)Nc|tjk(r|dk7r |dz ||f|vryy|tjk(r'||dz k(r|tj|f|vS|dz||f|vSy)NrrTF)r%r<r?)rr microbatchnum_stages_globalseen_opss rB need_bubblezJScheduleInterleavedZeroBubble._add_bubbles_to_actions..need_bubble s%---A:519b*"=X"M  '555-11!#3#;#;ZHPXXX 2z2(BBrDrTFrz?Non zero bubbles added: total_bubbles_added=%s bubbles_added=%s) rrjrrJrtrnrkupdaterr)r@rVrrXrWresult next_pointer bubbles_addedtotal_bubbles_addedr should_stop temp_seen_ops timestamp temp_actionrQrrUrYs rBrMz5ScheduleInterleavedZeroBubble._add_bubbles_to_actions s%%  4 $Do)=$>? 4 $E$sCx.%**D$EF4 4 l> hw>O9P> @@rDr!ceZdZdZ d deededeedee e dfdee e e fdee e e efe efd effd Zd eeefd ZxZS)r"a The Zero Bubble schedule (ZBV variant). See https://arxiv.org/pdf/2401.10241 Section 6 for details. This schedules requires exactly two stages per rank. This schedule will perform one forward and one backward on inputs for the microbatches in steady state and supports multiple stages per rank. Uses backward with respect to weights to fill in the pipeline bubble. This ZB-V schedule would have the "zero bubble" property only if time forward == time backward input == time backward weights. In practice, this is not likely true for real models so alternatively a greedy scheduler could be implemented for unequal/unbalanced time. rrrr.rrrc .|dj|_t | |||||||t |j|j d|_|jD]}|j |_t||_ |jdk7rtd|jd|dj|_ |dj|_ i|_t|jD]"} |j!| } | |j| <$y)Nrr0vstyler&0ZBV requires exactly 2 stages per rank, but got r2)rrJrrrrrrrtrrrrrrrrrNs rBrzScheduleZBVZeroBubble.__init__H s/$AY11 )+//#  *H    0 0* &\\ ME.2.L.LE + M"&k   ! #B&&'q*  1I((  ).. CE$,,- 1D==dCH(0D   % 1rDrcctd|jzdz |j}t|Dcgc]}d}}d\}}}}d|j|z zdz } |} |jdz |z } t| D](}|j t | t||dz }*|} t| D]N}|j t | t||dz }|j t | t||dz }P|j|z } t| D]o}|j t | t||dz }|j t | t||j t | t||dz }q||ks||kr||kr&|j t | t||dz }|j t | t||j t | t||dz }|j t | t||dz }|j t | t||j t | t||dz }||kr||kr||}}|}t|D]N}|j t | t||dz }|j t | t||dz }P|j|z }t|D]N}|j t | t||dz }|j t | t||dz }P||kr,|j t | t||dz }||kr,||kr,|j t | t||dz }||kr,||k(r||k(sJ||k(r||k(sJ|Dcgc]-}|'|j|j|jkr|nd/}}|Scc}wcc}w)Nr&r)rrrr)rRrS) rrJrrrrnrPr1r2r3rS)r@rn_microrYrf0_cntf1_cntb0_cntb1_cnt warmup_n1stage_id_chunk0stage_id_chunk1 warmup_n2 warmup_n3w0_cntw1_cnt cooldown_n1 cooldown_n2rHs rBrz7ScheduleZBVZeroBubble._calculate_single_rank_operationst s[a$,,,q0$2F2FG;@;,GaT,G,G*4&++d23a7 //A-4y! A OO!fU  aKF    y! A OO!fU  aKF OO!fU  aKF &&- y! A OO!fU  aKF OO!fU  OO!fU  aKF vo'!1'!f !  OO!fU  OO!fU  aKF OO!fU  aKF OO!fU  OO!fU  aKF5vo'!18  {# A OO!fU  aKF OO!fU  aKF ((4/ {# A OO!fU  aKF OO!fU  aKF vo OO!fU  aKF vo vo OO!fU  aKF vo Ff$444Ff$444#   %++7++d.B.BB     a-HL  s P2P rrJr0s@rBr"r"8 s &'+AEBFIM *1'(*1*1(# *1 "%(<"=> *1 $Do)=$>? *1$E$sCx.%**D$EF*1*1Xthw>O9PtrDr"ceZdZdZ d deededeedee e dfdee e e fdee e e efe efd effd Zd eeefd ZxZS)r#z The DualPipeV schedule. A more efficient schedule variant based on the DualPipe schedule introduced by DeepSeek in https://arxiv.org/pdf/2412.19437 Based on the open sourced code from https://github.com/deepseek-ai/DualPipe rrrr.rrrc |dj|_t | |||||||t |j|j d|_|jD]}|j |_t||_ |jdk7rtd|jd||j krtd|d |j d |dj|_ |dj|_ i|_t|jD]"} |j!| } | |j| <$|j#|jy) Nrr0rdrer&rgr2zDDualPipeV requires at least as many microbatches as stages, but got z microbatches and z stages.)rrJrrrrrrrtrrrrrrrrrrNs rBrzScheduleDualPipeV.__init__ s$AY11 )+//#  *H    0 0* &\\ ME.2.L.LE + M"&k   ! #B&&'q*  D,, ,V!""4T5E5E4FhP  1I((  ).. CE$,,- 1D==dCH(0D   % 1 ))$*=*=>rDrccg}ig|j}|j}t||dzd}||\}}dtffd dtdtdtffd }dtdtd t ffd } dtffd } ||z d z dz} t | D]} | ||t|d z} t | D]} | ||t| ||t ||z d z }t |D]&} | ||t| || ||t(||dzz |zd z}t |D]B}|dk(r%||d z k(r| ||t| ||tn ||||||||D||z d z }t |D]} | ||t|||||d z}d}t |D]V}||dzk(r |dzd k(rd}|rtnt}| |||||dzk(r |dzdk(rd}|rtnt}| |||X||z d z }t |D]"} | ||rtnt}| |||$|d z}t |D] } | | |S)Nr&rdrerQc|tf}|tf}j|ddz|<j|ddz|<y)zbHelper method to increment BACKWARD_INPUT and BACKWARD_WEIGHT counters when FULL_BACKWARD is used.rrN)r=r>r)rQ input_key weight_keycounterss rBincrement_backward_countszVScheduleDualPipeV._calculate_single_rank_operations..increment_backward_counts6 sK$n5I%7J"*,,y!"#BHZ rDr forward_stagebackward_stagec|tf}|tf}j|d}j|d}t|t|t|t|f}|j tdt d||dz|< |y)zYHelper method to add an overlapped forward+backward action which tracks microbatch index.rriNr)r<r=rrPr?rnr;) rrr forward_key backward_keyr_rarTr}r~s rBadd_overlap_f_bzLScheduleDualPipeV._calculate_single_rank_operations..add_overlap_f_b= s)'2K*N;L!k15J",,|Q7K w ; {CK NN72{D+F G%/NH[ ! %n 5rDrRc|tk7r||fn|tf}j|d}|jt ||||tk(r |y|tk(rj||f|dz|<y)Nrr)r?r=rrnrP)rrQrRrrr}r~ weight_queues rB add_actionzGScheduleDualPipeV._calculate_single_rank_operations..add_actionT s$}4./!>2   ||C+H NN7;0@(K L =0)+6$~5 ''h(?@ (1  rDcsyjd\}}|jt|t||tf}j |ddz|<y)z4Helper method to add a weight action from the queue.Nrr)rrnrPr>r)ractual_stage_indexr,r|r}rs rBadd_weight_action_if_pendingzYScheduleDualPipeV._calculate_single_rank_operations..add_weight_action_if_pendingl sb2>2B2B12E /  NN&## -o>J#+<< A#>#BHZ rDrr)rrFT) rJrrrsrr%rr<r=r?)r@rrr num_chunksrank_to_stages stage0_index stage1_indexrrrstep_1rYstep_2step_3step_4rstep_5step_6 enable_zbr step_7step_8r}r~rs @@@rBrz3ScheduleDualPipeV._calculate_single_rank_operations' s +-   && )) 7 y1}C &4D%9" l C3 C 6 6 6  6. - - -/ -0 C$ C$d"Q&!+v 7A w g 6 7v 7A w g 6 w g 6 7 T!A%v 7A w n = ( 1 w g 6 7 i!m+d2Q6v AAv$)a-/7L':7L-@".#/ *+  $T!A%v A w m < *+   v 9AFaKD1HM *3I w i 8FaKD1HM *3I w i 8 9T!A%v 9A ( 1*3I w i 8 9 v 2A ( 1 2rDrrJr0s@rBr#r# s'+AEBFIM 2?'(2?2?(# 2? "%(<"=> 2? $Do)=$>? 2?$E$sCx.%**D$EF2?2?h^hw>O9P^rDr# schedule_namec Ltttttt t ttd }|jDcic]}|j|}}|j}||vr(td|dt|j|||Scc}w)z Maps a schedule name (case insensitive) to its corresponding class object. Args: schedule_name (str): The name of the schedule. ) 1F1BInterleaved1F1BGPipe LoopedBFSInterleavedZeroBubblerr ZBVZeroBubble DualPipeVzUnknown schedule name 'z'. The valid options are ) rrrr r!rrr"r#rlowerrr)r schedule_mapklowercase_keyslowercase_schedule_names rBrr s2&!>"8!6.& L-9,=,=,?@qaggil@N@+113n4%m_4MdS_SdSdSfNgMh i   '>? @@ AsB!c * t|Dcic]}|||Dcgc]}|| c}}}}t|Dcic]}|gc} Dcic] }|tc} dtdttf fd }dttdt f fd }|r_d}t|D]O}t ||dk(r||d}||r"| |||||jdd }G||dQt|d D]} t || dk(s|| =t|D]`}t ||dk(r |d ||d}||s/|| |d < |j|||jdbt|d D]} t || dk(s|| =|s>td t |D]}td |d||dtd|r_ Scc}wcc}}wcc}wcc}w)aThis function dry-run simulates the actions in the schedule from the perspective of all ranks, and flags any deadlocks caused by missing or misordered communications. It also simulates any bubbles in time where a rank can not execute any action due to waiting for unmet dependencies. The total number of simulator steps can be used as a metric for unit tests involving IR optimization passes as reordering and merging of IR can reduce the number of simulated steps. The simulation is not high-fidelity and does not model overlapping of compute and communication, or cuda streams. Future work may be to enhance this and model the compute time, comms overlap, and even memory. NrrHc\|j|||j|yyrV)rnrk)rrH_prev_ops_rank _schedules rBadd_to_schedulez0_simulate_comms_compute..add_to_schedule s3$v&   4 $ $V , rDrcc|y|j} |}|jtk(rd|jdk(ryt|jt|j |vryt|jdz t|j |vryy|jt tfvr|j dz k(ryt|jt|j |vryt|jdzt |j |vryt|jdzt|j |vryy|jtk(ry|jtk(r)t|jt|j }||vS|jtk(r-|dz }t|t|j }| |vS|jtk(rTt|jt |j }t|jt|j }||vxs||vS|jtk(r-|dz}t|t|j }| |vStd|)NTrrFzUnsupported action type ) rQrRr1rPr7rSr=r?r9r>r6r8r) rHrprev_ops expected_fpeer_stage_idx expected_send expected_b expected_bwrrrs rBrz3_simulate_comms_compute.._ready_to_schedule sd >&& !- ":;  " "a '!!Q&**FF4K4KLPXX**Q.63J3JKxW  $ $(G G!!Z!^3v))663J3JKxW**Q.@W@WX**Q. v?V?VW  $ $ 7  $ $ . !3!3Q8O8OPJ) )  $ $ .&]N#NFF*  ..@Q!-q@@N $N32b2IHQ.QttSU{.QN-c-8G+<- 8B8G#48B8Bt >* ,D>$'(A-#D)!,F!&)%#D&1t$((+d+ ,5 &A>!$%*"1% & >* ,D>$'(A-r".#D)!,F!&)%*0IdOB'"4(,,V4t$((+ ,5 &A>!$%*"1% & )+A)+L M& I~nT.B1.E-FGH I:; ;W Z kA2/Rs& HHHH H HHc Ng}t|D]\}t||D]I\}}| |jt||jt t tfvrdndd|||ddK^ddl}t|d5}|jd |i|dddy#1swYyxYw) a This function dumps a schedule IR into a chrometrace format so it can be visualized. It is currently very basic and only serves as a graphical alternative to dumping the schedule IR as text. As future work we may extend this to include more accurate heuristics for durations, or let users input durations, add 'flow events' to let the UI show the connection between sends and recvs, and model cuda streams for comm/compute as separate streams on the chrometrace view. N computation communicationXr)rcatphpidtidtsdurrr traceEvents) rrrnr_rRr1r:r3jsonrdump)schedulereventsrtimesteprHrfs rB_dump_chrometracerh sFx  )(4. 9  Hf~ MMK"22q!Qi?&,"   & h . =&)1-...s =BB$rV)r')rF)arrrloggingreabcrr collectionsrrenumr functoolsrtypingr r r r r rtorch.distributed distributedr torch._dynamortorch.distributed.fsdprrtorch.nn.modules.lossrtorch.profilerr_utilsrrrUrrrrr__all__ getLoggerrJrr%r<r=r>r4r5r6r7r8r9r?r;r1r2r3r:compilerprPr_r~rrsrrrP2POpWorkrrrrr2rrrrrrrrr r-rr!r"r#rrrrNrDrBrs  #,== )<'*RTT%    8 $7Et7Et  " "!00"22  " "  " "         .. **   F S jS l C  (,;d8G#4556;}; ;|G G V6: + $**  +%-c] + $))_ +6: $** %-c] #tDII 6$tyy/t.tn0"10"ft*tnT)TrP(7+,PP ']Pf#(7+,# ']#Lv#tG},-vSE3J'vv #tG}  vrx' #tHW-.. /x'x'x' x'  #s(^ x'v`2-`2F \24\2~ =-=RFRq 3q hA$9AHp1pfZ0ZzAcA6B#+SE3J#7BEHBJ!.rD