L iv+rddlZddlZddlZddlZddlZddlZddlZddlmZddl Z ddl m Z ddl mZgdZej aej$Zej(ZGddeZGddZeZd Zd Zd Zd Zd ZdZej>dgdZ ej>dddgZ!y)N)Enum)_get_current_rpc_agent) RPCExecMode serialize deserialize PythonUDFRemoteExceptionceZdZdZdZdZdZy)rsyncasync async_jitremoteN)__name__ __module__ __qualname__SYNCASYNC ASYNC_JITREMOTEd/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/torch/distributed/rpc/internal.pyrrs D EI FrrcpeZdZdZdZdZedZdZedZ dZ dZ ed Z d Z d Zd Zy )_InternalRPCPicklera  This class provides serialize() and deserialize() interfaces to serialize data to be "binary string + tensor table" format So for RPC python UDF function and args, non tensor data will be serialized into regular binary string, tensor data will be put into thread local tensor tables, this serialization format is consistent with builtin operator and args using JIT pickler. This format will make tensor handling in C++ much easier, e.g. attach tensor to distributed autograd graph in C++ ctjj|_|j|jt j <i|_yN)copyregdispatch_tablecopy_dispatch_table_tensor_reducertorchTensor_class_reducer_dict)selfs r__init__z_InternalRPCPickler.__init__+s;&55::<-1-A-AU\\*#% rc@||jvr||j|<yyr)r$)r% obj_classreducers r_register_reducerz%_InternalRPCPickler._register_reducer2s% D44 429D $ $Y / 5rc(tj|Sr)_thread_local_tensor_tables recv_tables)cls tensor_indexs r_tensor_receiverz$_InternalRPCPickler._tensor_receiver7s+66|DDrctjj|ttjdz }tj |ffS)N)r, send_tablesappendlenrr0)r%tensorr/s rr!z#_InternalRPCPickler._tensor_reducer<s?#//66v>6BBCaG #44|oFFrcTtjjj|Sr)distrpcPyRRef _deserialize)r.rref_fork_datas r_py_rref_receiverz%_InternalRPCPickler._py_rref_receiverBsxx++N;;rcH|j}tj|ffSr) _serializerr=)r%py_rrefr<s r_py_rref_reducerz$_InternalRPCPickler._py_rref_reducerFs$ ++-#557HIIrc$|j|Sr)rA)r%rrefs r _rref_reducerz!_InternalRPCPickler._rref_reducerJs$$T**rcntj|}tjj |}|S)z Given a serialized representation of a ScriptModule created with torch.jit.save, loads and returns the ScriptModule. )ioBytesIOr"jitload)r.script_module_serializedfms r_script_module_receiverz+_InternalRPCPickler._script_module_receiverMs* JJ/ 0 IINN1 rctj}tjj ||t j |jffS)z, Serializes a ScriptModule. )rFrGr"rHsaverrMgetvalue)r% script_modulerKs r_script_module_reducerz*_InternalRPCPickler._script_module_reducerWs: JJL }a(#;;ajjl_MMrctj}t|}|j|_|j |jt jj<|j|jt jj<t|tjjr#|j|j|j <|j"j%D]}|j"||j|< t't(drt(j*}nd}gt(_|j-|t(j*}| |t(_nt(`|j/|fS)ze Serialize non tensor data into binary string, tensor data into tensor table r3N)rFrG_picklerr rrAr8r9r:rDRRef isinstancer"rH ScriptModulerR __class__r$keyshasattrr,r3dumprP)r%objrKp class_nameold_send_tablestensorss rrz_InternalRPCPickler.serialize_s7 JJL QK//-1,A,A)+/*<*<' c59911 2.2.I.IA  S]] +22779 PJ+/+C+CJ+OA  Z ( P . >9EEO"O24#/ s .99  &6E ' 3+7 g&&rcVttdrtj}nd}|t_ tt j |}|j }| |t_|St`|S#t$r*}t|dz}t|}||_ Yd}~Ed}~wwxYw)zJ Deserialize binary string + tensor table to original obj r-Nz Default RPC pickler does not serialize function code. Ensure that UDFs are defined on both caller and callee modules.) rZr,r- _unpicklerrFrGrIAttributeErrorstr __cause__)r% binary_data tensor_tableold_recv_tables unpicklerrete except_strs rrz_InternalRPCPickler.deserializes . >9EEO"O2>#/ "2::k#:;I.."C  &6E ' 3 ,7 ) A  !,CCMM s.A55 B(> B##B(N)rrr__doc__r&r* classmethodr0r!r=rArDrMrRrrrrrrr sq&: EEG <<J+N3'j#rrc,tj|Sr)_internal_rpc_picklerr)r\s rrrs * *3 //rc.tj||Sr)rpr)rfrgs rrrs , ,[, GGrc~ t|tr||j|ji|j}|S#t $rw}dt jdt|dtj}t|tjt|t|}Yd}~|Sd}~wwxYw)z This function is exclusively called from C++. See ``torch/csrc/distributed/rpc/python_rpc_handler.cpp``. Runs a Python UDF and returns its return value. Wraps any exception in ``RemoteException`` if the function raises. zOn z:  )fileN)rVrcfuncargskwargs Exceptionrget_worker_inforepr traceback format_excprintsysstderrr type) python_udfresultrkrls r _run_functionrs 6 j. 1  *//GZ5F5FG M 6(*::<=SAwir)..01 3  jszz* T!W5 M6s8< B<A,B77B<ct|trC|jjdj d}d} |j |}||yy#t $r }tdt|d||d}~wwxYw)Nzutf-8unicode_escapez8Failed to create original exception type. Error msg was z' Original exception on remote side was ) rVr msgencodedecodeexception_type BaseException RuntimeErrorrd)r exception_msgexcrks r_handle_exceptionrs&/* ))'299:JK  '' 6C ?I + J3q6(9-J  sA A>A99A>c 8d|jd|d|d|d }|S)a Builds the key that RPC calls are profiled with using the autograd profiler. This will be the name of the corresponding Event recorded in the profiler. Args: exec_type (RPCExecMode): Type of RPC/RRef call func_name (str): Name of function being profiled. current_worker_name (str): Name of current worker. dst_worker_name (str): Name of the destination worker. Returns: String representing profiling key rpc_#( -> ))value) exec_type func_namecurrent_worker_namedst_worker_name profile_keys r_build_rpc_profiling_keyrs8" yq 1-@,AoEVVWX rc tjjsJdd|jdt |d|d|d }tjj }tjj |||S)ar This function should be called from RPC/RRef functions to create a RecordFunction object for profiling. This function also runs the before callbacks that start the profiling, though the user is responsible for running the appropriate callbacks when the function to be profiled finishes. Args: exec_type (RPCExecMode): Type of RPC/RRef call func_name (str): Name of function being profiled. current_worker_name (str): Name of current worker. dest_worker_name (str): Name of the destination worker. Returns: An instance of `torch.autograd._RecordFunction`. z$Autograd profiler should be enabled.rrrrr)r"autograd_profiler_enabledrrd_RecordFunction_run_before_callbacks)rrrdest_worker_namerrfs r_start_record_functionrs{ >> + + -U/UU -)3y>*:!rs  < V.ioo/ >>    $WWv,-0H.$,. #K " ";0J K (+(():UDT