L iAddlZddlZddlZddlZddlZddlZddlmZddlm Z ddl Z ddl m Z m Z ddlmZmZej"dZGdd e ZGd d e Zy) N)suppress)quote)AbstractBufferedFileAbstractFileSystem)infer_storage_optionstokenizewebhdfscBeZdZdZeej ZdZ dfd Z e dZ dZ ddZ ddZedZed Zed Zd Zd d Zd ZdZdZd!dZdZdZdZd"dZdZdZd dZ dZ!d dZ"dZ#dZ$dZ%xZ&S)#WebHDFSa Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways. Four auth mechanisms are supported: insecure: no auth is done, and the user is assumed to be whoever they say they are (parameter ``user``), or a predefined value such as "dr.who" if not given spnego: when kerberos authentication is enabled, auth is negotiated by requests_kerberos https://github.com/requests/requests-kerberos . This establishes a session based on existing kinit login and/or specified principal/password; parameters are passed with ``kerb_kwargs`` token: uses an existing Hadoop delegation token from another secured service. Indeed, this client can also generate such tokens when not insecure. Note that tokens expire, but can be renewed (by a previously specified user) and may allow for proxying. basic-auth: used when both parameter ``user`` and parameter ``password`` are provided. )r webHDFSc  |jryt|di| | rdndd|d|d|_||_|xsi|_i|_| xsi|_||| td||j d<||_ ||_ | |td |||j d <|||j d <|r | td | |_ | |_ |jd t|||_y)a Parameters ---------- host: str Name-node address port: int Port for webHDFS kerberos: bool Whether to authenticate with kerberos for this connection token: str or None If given, use this token on every call to authenticate. A user and user-proxy may be encoded in the token and should not be also given user: str or None If given, assert the user name to connect with password: str or None If given, assert the password to use for basic auth. If password is provided, user must be provided also proxy_to: str or None If given, the user has the authority to proxy, and this value is the user in who's name actions are taken kerb_kwargs: dict Any extra arguments for HTTPKerberosAuth, see ``_ data_proxy: dict, callable or None If given, map data-node addresses. This can be necessary if the HDFS cluster is behind a proxy, running on Docker or otherwise has a mismatch between the host-names given by the name-node and the address by which to refer to them from the client. If a dict, maps host names ``host->data_proxy[host]``; if a callable, full URLs are passed, and function must conform to ``url->data_proxy(url)``. use_https: bool Whether to connect to the Name-node using HTTPS instead of HTTP session_cert: str or Tuple[str, str] or None Path to a certificate file, or tuple of (cert, key) files to use for the requests.Session session_verify: str, bool or None Path to a certificate file to use for verifying the requests.Session. kwargs Nhttpshttpz://:z /webhdfs/v1z_If passing a delegation token, must not set user or proxy_to, as these are encoded in the token delegationzQIf passing a password, the user must also beset in order to set up the basic-authz user.namedoaszJIf using Kerberos auth, do not specify the user, this is handled by kinit.webhdfs_)_cachedsuper__init__urlkerb kerb_kwargsparsproxy ValueErroruserpassword session_certsession_verify_connectr _fsid)selfhostportkerberostokenrr proxy_tor data_proxy use_httpsr!r"kwargs __class__s d/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.pyrzWebHDFS.__init__-s9r <<  "6"!*g7s4&${S &," %2  8#7  ',DIIl #    | < )- +&   (DIIf  (2  ), t 456 c|jSN)r$r%s r/fsidz WebHDFS.fsids zzr0ctj|_|jr|j|j_|j |j_|jr'ddlm }|di|j|j_ |j@|j3ddlm}||j|j|j_ yyy)Nr)HTTPKerberosAuth) HTTPBasicAuthr)requestsSessionsessionr!certr"verifyrrequests_kerberosr6rauthrr requests.authr7)r%r6r7s r/r#zWebHDFS._connects'')    $ 1 1DLL "11  99 : 0 D43C3C DDLL  99 T]]%> 3 -dii GDLL &? r0c ||j|nd}|j|jt|dz}|j }|j |j |j|d<tjd|||jj|j||||} | jdvrY | j} | dd } | dd } | d vr t| | d vr t| | d vr t!| t#| | j'| S#tt$f$rY#wxYw)Nz/=)safeopzsending %s with %s)methodrparamsdataallow_redirects)iiiiiRemoteExceptionmessage exception)IllegalArgumentExceptionUnsupportedOperationException)SecurityExceptionAccessControlException)FileNotFoundException)_strip_protocol _apply_proxyrrcopyupdaterupperloggerdebugr:request status_codejsonrPermissionErrorFileNotFoundError RuntimeErrorKeyErrorraise_for_status) r%rCrDpathrFredirectr-rargsouterrmsgexps r/_callz WebHDFS._callsS-1-=t##D)25D+A AB{{} DIIXXZT  )37ll""<<>$ #  ??7 7 ,hhj+,Y7+,[9WW$S/)KK)#..55+C00&s++  )  s D;;E  E c \|xs |j}t|||||j|||S)a^ Parameters ---------- path: str File location mode: str 'rb', 'wb', etc. block_size: int Client buffer size for read-ahead or write buffer autocommit: bool If False, writes to temporary file that only gets put in final location upon commit replication: int Number of copies of file on the cluster, write mode only permissions: str or int posix permissions, write mode only kwargs Returns ------- WebHDFile instance )mode block_sizetempdir autocommit replication permissions) blocksize WebHDFilerj)r%r_rhrirkrlrmr-s r/_openz WebHDFS._opens<B 14>>   !LL!##  r0cB|dj|d<|d|d<|S)Ntypelengthsize)lower)infos r/ _process_infozWebHDFS._process_infos*F|))+V H~V  r0ct|dS)Nr_)r)clsr_s r/rPzWebHDFS._strip_protocols$T*622r0ct|}|jdd|jddd|vr|jd|d<|S)Nr_protocolusernamer)rpop)urlpathrbs r/_get_kwargs_from_urlszWebHDFS._get_kwargs_from_urlssG#G,   D!  ''*-CK r0cz|jd|}|jd}||d<|j|S)N GETFILESTATUSr_ FileStatusname)rfrYrw)r%r_rbrvs r/rvz WebHDFS.info s>jjtj4xxz,'V !!$''r0c |jd|}|jdd}|D]0}|j||jddz|dz|d<2|rt |d St d |DS) N LISTSTATUSr FileStatusesr/ pathSuffixrc |dS)Nrr)is r/zWebHDFS.ls..s qyr0)keyc3&K|] }|d yw)rNr).0rvs r/ zWebHDFS.ls..s94$v,9s)rfrYrwrstripsorted)r%r_detailr-rbinfosrvs r/lsz WebHDFS.lssjjDj1 >*<8 GD   t $;;s+c1D4FFDL G %%89 99599 9r0cN|jd|}|jdS)z8Total numbers of files, directories and bytes under pathGETCONTENTSUMMARYrContentSummaryrfrY)r%r_rbs r/content_summaryzWebHDFS.content_summarys'jj,4j8xxz*++r0cD|jd|d}d|jvr\|j|jd}|jj |}|j |j dS|j |j dS)z/Checksum info of file, giving method and resultGETFILECHECKSUMF)r_r`Location FileChecksum)rfheadersrQr:getr^rY)r%r_rblocationout2s r/ukeyz WebHDFS.ukeysjj*jF  $((Z)@AH<<##H-D  ! ! #99;~. .  "88:n- -r0cJ|jd}|jdS)zGet user's home directoryGETHOMEDIRECTORYPathr)r%rbs r/home_directoryzWebHDFS.home_directory+s"jj+,xxz&!!r0c|r|jd|}n|jd}|jd}| td|dS)zRetrieve token which can give the same authority to other uses Parameters ---------- renewer: str or None User who may use this token; if None, will be current user GETDELEGATIONTOKEN)renewerTokenz1No token available for this user/security context urlString)rfrYr)r%rrbts r/get_delegation_tokenzWebHDFS.get_delegation_token0sT **17*CC**12C HHJw  9PQ Q~r0cP|jdd|}|jdS)z/Make token live longer. Returns new expiry timeRENEWDELEGATIONTOKENputrDr)longr)r%r)rbs r/renew_delegation_tokenzWebHDFS.renew_delegation_tokenAs(jj/UjKxxz&!!r0c,|jdd|y)z Stop the token from being usefulCANCELDELEGATIONTOKENrrNrf)r%r)s r/cancel_delegation_tokenzWebHDFS.cancel_delegation_tokenFs *5 Fr0c.|jdd||y)aSet the permission at path Parameters ---------- path: str location to set (file or directory) mod: str or int posix epresentation or permission, give as oct string, e.g, '777' or 0o777 SETPERMISSIONr)rDr_ permissionNr)r%r_mods r/chmodz WebHDFS.chmodJs ?5t Lr0cNi}|||d<|||d<|jdd|d|y)zChange owning user and/or groupNownergrouprrDr_)SETOWNERr)r%r_rrr-s r/chownz WebHDFS.chownWs=  #F7O  #F7O Ae$A&Ar0c.|jd|d|y)a9 Set file replication factor Parameters ---------- path: str File location (not for directories) replication: int Number of copies of file on the cluster. Should be smaller than number of data nodes; normally 3 on most systems. SETREPLICATIONr)r_rDrlNr)r%r_rls r/set_replicationzWebHDFS.set_replication`s #$u+ Vr0c ,|jdd|y)NMKDIRSrrrr%r_r-s r/mkdirz WebHDFS.mkdirns 8E 5r0cf|dur|j|r t||j|y)NF)existsFileExistsErrorr)r%r_exist_oks r/makedirszWebHDFS.makedirsqs, u T!2!$' ' 4r0c .|jdd||y)NRENAMEr)rDr_ destinationr)r%path1path2r-s r/mvz WebHDFS.mvvs 8E5 Ir0c B|jdd||rdydy)NDELETEdeletetruefalse)rDr_ recursiver)r%r_rr-s r/rmz WebHDFS.rmys0  )f  07  r0c &|j|yr2)rrs r/rm_filezWebHDFS.rm_files   r0c |j|5}dj|j|dtjdg} |j|d5}t j ||ddd|j|| dddy#1swY%xYw#t$r7tt5|j|ddd#1swYxYwwxYw#1swYyxYw)Nrz.tmp.wb) openjoin_parentsecrets token_hexshutil copyfileobjr BaseExceptionrr[r)r%lpathrpathr-lstream tmp_fnamerstreams r/cp_filezWebHDFS.cp_files YYu  $,,u"5w?P?PQS?T>U7V!WXI YYy$/97&&w89 5)   99! /0'GGI&''   sM9C( B%B5B%B" B%%C%=C C%C! C%%C((C1c|jr(t|jr|j|}|S|jr5|jjD]\}}|j||d}|S)N)rcallableitemsreplace)r%rkvs r/rQzWebHDFS._apply_proxysk ::(4::.zz(+H  ZZ ((* 51#++Aq!4 5r0) iFNNNNNNFNT)rNNT)rbNTNNFr2)NN)'__name__ __module__ __qualname____doc__strtempfile gettempdirrjr{rpropertyr4r#rfrp staticmethodrw classmethodrPrrvrrrrrrrrrrrrrrrrrQ __classcell__r.s@r/r r s!*%(%%'(G#H  c7JH$H+ Z 33( :, ." "" G MB W6 J  r0r cBeZdZdZfdZddZdZdZdZdZ xZ S) roz"A file living in HDFS over webHDFSc t|||fi||j}|jdd|j dd|jdd|j dd|j dd|_|j d}|j dddurR|j |_tj j|ttj|_yy)NrmrlirjrkF) rrrRrr}rmr_targetosrruuiduuid4)r%fsr_r-rjr.s r/rzWebHDFile.__init__s T,V, ::mT * 2 JJ}d + ::mT * 2 JJ}d +!::mS9**Y' ::lE *e 3))DK Wc$**,.?@DI 4r0c|jjj|j|jj ddi}|j y)zWrite one part of a multi-block file upload Parameters ========== final: bool This is the last block, so should complete file, if self.autocommit is True. content-typeapplication/octet-stream)rFrT)r r:postrbuffergetvaluer^)r%finalrbs r/ _upload_chunkzWebHDFile._upload_chunksRggoo"" MM%%'#%?@#  r0cZ|jj}d|jvrd\}}n d\}}d|d<|jj|||j fddi|}|jj |jd}d |jvr|jjj|d d i }|j|jjd d|j fddi|}|jj |jd|_ yy)zCreate remote file/uploada)APPENDPOST)CREATEPUTr overwriter`Frwr r)rrrN) r-rRrhr rfr_rQrr:rr^r)r%r-rCrDrbrrs r/_initiate_uploadzWebHDFile._initiate_uploads !!# $)) )JB(JB"(F; dggmmB LELVL77'' J(?@ $)) 77??&&>3M"N'D  ! ! # 477==6499WuWPVWD GG00j1IJDM r0ct|d}t|j|}||k\s||jk\ry|jj d|j |||z d}|j d|jvrY|jd}|jjj|jj|}|jS|jS)Nrr0OPENF)r_offsetrsr`r) maxminrtr rfr_r^rr:rrQcontent)r%startendrbrrs r/ _fetch_rangezWebHDFile._fetch_rangesE1 $))S! C<5DII-ggmm 5uu    ${{:.H77??&&tww';';H'EFD<< ;; r0cd|jj|j|jyr2)r rr_rr3s r/commitzWebHDFile.commits  499dkk*r0cN|jj|jyr2)r rr_r3s r/discardzWebHDFile.discards  499r0r) rrrrrrrr%r'r)rrs@r/roros&, A"K( +r0ro)loggingrrrrr  contextlibr urllib.parserr8specrrutilsrr getLoggerrUr rorr0r/r0sY   ;3   9 %E EP I$Ir0