]L i|)ddlmZddlZddlmcmZddlZddl m Z m Z ddl m Z mZddlmZddlmZmZmZddlmZmZmZmZdd lmZmZdd lmZmZm Z dd l!m"Z"ed Z#ed dZ$eddZ%GddeZ&e dGddee#Z'e dGddee#Z(e dGddee$ee$Z)e dGddee%e e%Z*y)) annotationsN) OrderedDictdeque) dataclassfield) TracebackType)Generic NamedTupleTypeVar)BrokenResourceErrorClosedResourceError EndOfStream WouldBlock)TaskInfoget_current_task)EventObjectReceiveStreamObjectSendStream) checkpointT_ItemT_coT) covariantT_contra) contravariantcJeZdZUded<ded<ded<ded<ded<ded<y ) MemoryObjectStreamStatisticsintcurrent_buffer_usedfloatmax_buffer_sizeopen_send_streamsopen_receive_streamstasks_waiting_sendtasks_waiting_receiveN)__name__ __module__ __qualname____annotations__Z/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/anyio/streams/memory.pyrrs&r+rF)eqcPeZdZUedeZded<edZded<d dZy ) MemoryObjectItemReceiverFinitdefault_factoryr task_info)r1ritemcnt|dd}|jjd|jd|dS)Nr4z (task_info=z, item=))getattr __class__r&r3)selfr4s r,__repr__z!MemoryObjectItemReceiver.__repr__)s<tVT*..))*+dnn5EWTHTUVVr+N)returnstr) r&r'r(rrr3r)r4r:r*r+r,r/r/$s)Ur>0sz"WOU$!ueDFMD#::!&E1!=3=NS KOK38 K3O/ r+r>ceZdZUded<eddZded<ddZddZdd Zdd Z dd Z dd Z dd Z ddZ ddZddZy)MemoryObjectReceiveStreamzMemoryObjectStreamState[T_co]_stateFr@bool_closedcB|jxjdz c_yN)rNrCrIs r, __post_init__z'MemoryObjectReceiveStream.__post_init__Ms ))Q.)r+c|jrt|jjr^|jjj d\}}|jj j ||j|jj r$|jj jS|jjstt)a Receive the next item if it can be done without waiting. :return: the received item :raises ~anyio.ClosedResourceError: if this send stream has been closed :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been closed from the sending end :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks waiting to send Flast) rPrrNrEpopitemr?appendsetpopleftrBrr)r9 send_eventr4s r,receive_nowaitz(MemoryObjectReceiveStream.receive_nowaitPs <<% % ;; & &#{{::BBBN J KK   % %d + NN  ;;  ;;%%--/ /// r+cKtd{ |jS7#t$rt}t t }||j j|< |jd{7|j jj|dn+#|j jj|dwxYw |jcYS#t$rtdwxYwwxYwwrG) rr]rrr/rrNrDwaitpopr4AttributeErrorr)r9 receive_eventreceivers r,receivez!MemoryObjectReceiveStream.receivelsl ,&&( (  ,!GM/57H;CDKK ) )- 8 G#((*** --11-F --11-F ,}}$! ,!t+ , ,sdC8&C8(C8=C5&B&9A<:B&?'C5&(CC5 C C5C8 C11C55C8cR|jrtt|jS)a Create a clone of this receive stream. Each clone can be closed separately. Only when all clones have been closed will the receiving end of the memory stream be considered closed by the sending ends. :return: the cloned stream rN)rPrrMrNrIs r,clonezMemoryObjectReceiveStream.clones  <<% %( <#>#C#C#EF ( EIIK 6r+c,K|jywrGrnrIs r,aclosez MemoryObjectReceiveStream.aclose c6|jjSzj Return statistics about the current state of this stream. .. versionadded:: 3.0 rNrJrIs r,rJz$MemoryObjectReceiveStream.statistics {{%%''r+c|SrGr*rIs r, __enter__z#MemoryObjectReceiveStream.__enter__ r+c$|jyrGrpr9exc_typeexc_valexc_tbs r,__exit__z"MemoryObjectReceiveStream.__exit__ r+c|jsCtjd|jjdt |ddt d|yyNz Unclosed rS) stacklevelsourcerPwarningswarnr8r&idResourceWarningrIs r,__del__z!MemoryObjectReceiveStream.__del__F|| MMT^^445T"T(1QG  r+Nr;None)r;r)r;zMemoryObjectReceiveStream[T_co]rKr}ztype[BaseException] | Noner~zBaseException | NonerzTracebackType | Noner;r)r&r'r(r)rrPrTr]rdrgrnrqrJryrrr*r+r,rMrMHsq ))ue4GT4/8,( = (,&%   r+rMceZdZUded<eddZded<ddZddZdd Zdd Z dd Z dd Z dd Z ddZ ddZddZy)MemoryObjectSendStreamz!MemoryObjectStreamState[T_contra]rNFr@rOrPcB|jxjdz c_yrR)rNrBrIs r,rTz$MemoryObjectSendStream.__post_init__s &&!+&r+c2|jrt|jjst|jj rr|jj j d\}}|jjs||_ |jy|jj rrt|jj|jjkr&|jjj|yt)a Send an item immediately if it can be done without waiting. :param item: the item to send :raises ~anyio.ClosedResourceError: if this send stream has been closed :raises ~anyio.BrokenResourceError: if the stream has been closed from the receiving end :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting to receive FrVN)rPrrNrCr rDrXr3has_pending_cancellationr4rZrHr?r!rYr)r9r4rbrcs r, send_nowaitz"MemoryObjectSendStream.send_nowaits <<% %{{00% %kk++&*kk&C&C&K&KQV&K&W #M8%%>>@ $ !!# kk++ t{{!! "T[[%@%@ @ KK   % %d + r+cKtd{ |j|y7#t$rt}||jj |< |j d{7n4#t$r(|jj j|dwxYw||jj vr|jj |=tdYywxYww)a Send an item to the stream. If the buffer is full, this method blocks until there is again room in the buffer or the item can be sent directly to a receiver. :param item: the item to send :raises ~anyio.ClosedResourceError: if this send stream has been closed :raises ~anyio.BrokenResourceError: if the stream has been closed from the receiving end N) rrrrrNrEr_ BaseExceptionr`r )r9r4r\s r,sendzMemoryObjectSendStream.sendsl 4   T "  4J6:DKK ' ' 3  oo'''   ++// DA T[[888KK// ;)t39 4sRC!(C!*C!,CA1*A-+A10C11B""9CC!CC!cR|jrtt|jS)a  Create a clone of this send stream. Each clone can be closed separately. Only when all clones have been closed will the sending end of the memory stream be considered closed by the receiving ends. :return: the cloned stream rf)rPrrrNrIs r,rgzMemoryObjectSendStream.clones  <<% %%T[[99r+cn|jsd|_|jxjdzc_|jjdk(rit|jjj }|jjj |D]}|jyyyri)rPrNrBrjrDrkclearrZ)r9receive_eventsrms r,rnzMemoryObjectSendStream.closes||DL KK * *a / *{{--2!%dkk&C&C&H&H&J!K --335+ EIIK 3r+c,K|jywrGrprIs r,rqzMemoryObjectSendStream.aclose"rrrsc6|jjSrurvrIs r,rJz!MemoryObjectSendStream.statistics%rwr+c|SrGr*rIs r,ryz MemoryObjectSendStream.__enter__-rzr+c$|jyrGrpr|s r,rzMemoryObjectSendStream.__exit__0rr+c|jsCtjd|jjdt |ddt d|yyrrrIs r,rzMemoryObjectSendStream.__del__8rr+Nr)r4rr;r)r;z MemoryObjectSendStream[T_contra]rKr)r&r'r(r)rrPrTrrrgrnrqrJryrrr*r+r,rrsq --ue4GT4,:4< : "(,&%   r+r)+ __future__rbuiltins @py_builtins_pytest.assertion.rewrite assertionrewrite @pytest_arr collectionsrr dataclassesrrtypesrtypingr r r r rrr_core._testingrrabcrrrlowlevelrrrrrr/r>rMrr*r+r,rs"*(// 8>>!  v& :T 2 :  eWwvWW e gfo  . es /B4/Hssl eWX.0@0Jr+