L ia3 rddlZddlmZddlmZmZmZddlZddlm Z ddl m Z ddl mZddlmZddlmZddlmZdd lmZdd lmZdd lmZmZmZmZmZmZm Z dd l!m"Z"m#Z#dd l$m%Z%m&Z&ddl'm(Z(ddl)m*Z*ddl+m,Z,m-Z-m.Z.ddl/m0Z0ddl1m2Z2ddl3m4Z4ddl5m6Z6e7e8e9eee:ee:ffZ;dgZ3#678*E   s C -C%valct|turt|jdk(ryt|jdjturyt|jdjt ur t dyt|t urAt|jt ust|jtur t dy)NrFTz1Cannot handle DTensor nested inside ShardedTensorzCannot handle nested DTensor)r7rlen local_shardstensorr ValueError _local_tensor)rAs r*_is_nested_tensorrHUs CyM! s! "a '   "1%,, - >   "1%,, - 8PQ Q  cg  S  7*d33D3D.E.V788 r,propsr:cP|dk(r2ttjt|j }n-tj|t|j }tj ||j |j|j|j|S)Nr&)r:dtypelayout requires_grad pin_memorydevice) rtorchrOrcurrent_deviceemptyrKrLrMrN)rIr:r#rOs r* _alloc_tensorrSdseell$6{$C$R$R$TU +K8GGI  ;; kk||))##  r, state_dictci}d}|jD]\}}d|jf||<t|s't|j dk(sJdt |t sJd|j d}|jj|jjf||<|jj}||fS)a+ Load the right TP slice of the optimizer state. This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata. We take advantage of the model state_dict producing a sliced ST to figure out what we need to load. This is pretty fragile and it might be easier for FSDP to compute this info for us. Returns a dictionary where keys are the same of the state_dict and the value is a tuple of (offset, size) for the current rank TP slice. N.B. The state_dict *MUST* come from FSDP.sharded_state_dict. Nz%Cannot handle ST with multiple shardsz$Can only handle nested ShardedTensorr) itemsr:rHrCrD isinstancermetadata shard_offsets shard_sizesrE_process_group)rTspecsdp_pgkeyvalueshards r*_get_state_dict_2d_layoutrbxs#%E)-E &&(0 UEJJL)c U #u))+,1 7 1e]3 6 3&&(+E,,**E#JLL//E0"   r,ceZdZUeeefed<eed<eed<deee e fddffd Z de fdZ d edejffd ZxZS) _ReaderWithOffset translationrTrY fqn_to_offsetr$Nclt|||_ti|_i|_i|_yN)super__init__rfrrYrTre)selfrf __class__s r*rjz_ReaderWithOffset.__init__s0 *   r,c g}i|_|jjD]\}}|jj|}t |t s|t|||z }A||jvr|t|||z }`|j|}t|jdk(sJ|jd}ttjt|jj|tj|jj g}t#|t%t&||}|D]} | j(j*Jt-| j(j*|} t/j0| j(tj| } | |j| j(<||z }t3|S)NrVr)offsetssizes)offset)rerTrWrYstate_dict_metadatarXrrrfrCrDrrPSizerrZr[rrr dest_indexrpr dataclassesreplacer) rkrequestsfqnobjmdrporiginal_shard local_chunksreqsrioriginal_offsetoriginal_indexs r*create_local_planz#_ReaderWithOffset.create_local_plans--/$ HC2237Bc=1.sB<<$,,,.sB<<'',Fs'')*a/ // --/2N$!JJ).*A*A*O*OQWX **^%<%<%H%HI L4T/4lD  A}}++777"3BMM4H4H&"Q!,!4!4MM%**_*E"3A  /  A  HI$ J!!r,indexcVt||jj||Srh)ri lookup_tensorreget)rkrrls r*rz_ReaderWithOffset.lookup_tensors&w$T%5%5%9%9%%GHHr,)__name__ __module__ __qualname__dictr__annotations__rrr=rintrjrrrPTensorr __classcell__)rls@r*rdrdsmm]233d3 +=&>4("8("TI=IU\\IIr,rdmodel_state_dict optimizer_keystorage_readerplannerc .|j}t|\}}tjj |j }t |}|fg} ttjD]6} t|| |jz} | jd| d| 8td| } n t|} i} i}|jjD]|\}}|j |}|d|k7rt#|t$rd| |<5|j&j)dk(r%t+|j,|j&|| |<w|mt/t+|j,|j&|tj0tj|jt3| |<|d }|j5|d|j&fd}t7|j,j8|j,j:|j,j<|j,j>|j,j@ }| jCtEjF||}g}tj0|}|jHD]i}tKtL|jNjQ|k7r/|jtSt+|j,|jT|| ktWjX||| }||vr(||d tKtZt\||d||<|| |<t_| || ta|n| tc| |j } | S)a Load a state_dict in conjunction with FSDP sharded optimizer state. This is the current recommended way to checkpoint FSDP. >>> # xdoctest: +SKIP >>> import torch.distributed.checkpoint as dist_cp >>> # Save >>> model: torch.nn.Model >>> optim_params = model.parameters() >>> optim = torch.optim.SGD(optim_params, lr=0.01) >>> # Save >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT): >>> state_dict = { >>> "optimizer": FSDP.optim_state_dict(model, optim), >>> "model": model.state_dict() >>> } >>> dist_cp.save_state_dict( >>> state_dict=optim_state, >>> storage_writer=dist_cp.FileSystemWriter("checkpoint"), >>> planner=dist_cp.DefaultSavePlanner(), >>> ) >>> >>> # Load >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT): >>> model_state_dict = model_tp.state_dict() >>> checkpoint = { >>> "model": model_state_dict >>> } >>> dist_cp.load_state_dict( >>> state_dict=checkpoint, >>> storage_reader=dist_cp.FileSystemReader(checkpoint_file), >>> planner=dist_cp.DefaultLoadPlanner(), >>> ) >>> model.load_state_dict(checkpoint["model_state"]) >>> >>> optim_state = dist_cp.load_sharded_optimizer_state_dict( >>> model_state_dict, >>> optimizer_key="optimizer", >>> storage_reader=dist_cp.FileSystemReader("checkpoint"), >>> ) >>> >>> flattened_osd = FSDP.optim_state_dict_to_load( >>> model, optim, optim_state["optimizer"] >>> ) >>> >>> optim.load_state_dict(flattened_osd) Nr/r0rr1z rV)rank world_sizenum_devices_per_noder-)rKrLrM memory_formatrN)rErY) process_group)rTrr)2 read_metadatarbr4r5r6r7rr8r9rr(appendr r@rqrW planner_datarXrr:numelrS propertiesrget_rankrrShardTensorPropertiesrKrLrMrrNbuild_metadatarPrrshards_metadatarr placementrr r[r+_init_from_local_shards_and_global_metadatarrrrdr )rrrrrY layout_specsr^dp_pg_device_typer)r3i device_info sharding_specrTrfr_r`key_pathspec_key alloc_sizerst_mdrD current_rankshard_mdsts r*r!r!sfj++-H34DEL%--DDUKPP&'89M } t**,- 9A0!1}'A'A'C#CK   aS+7 8  9 *aJG ,U3 #%J.0M2288:8! U((- A;- '  e1 2*JsO  ::    "+  %**.?JsO]:e.. rs$(( +E@XJKGK> BL:,Cx '> 'M!NNO (  #CS'+""#( 5<< D  FL #+C=?B \\("" $*;*;!< <="J7I*7I|&* N%NN"Nk " N  Nr,