K ilC"ddlZddlZddlZddlmZmZddlmZmZddl m Z m Z m Z m Z mZddl Z e jrddl mZmZgdZGddZGd d eZGd d ZGd dZGddeZGddeZGddZy)N)genioloop)Future"future_set_result_unless_cancelled)UnionOptionalTypeAny Awaitable)DequeSet) ConditionEvent SemaphoreBoundedSemaphoreLockc eZdZdZddZddZy)_TimeoutGarbageCollectorzBase class for objects that periodically clean up timed-out waiters. Avoids memory leak in a common pattern like: while True: yield condition.wait(short_timeout) print('looping....') NcDtj|_d|_y)Nr) collectionsdeque_waiters _timeoutsselfs S/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/tornado/locks.py__init__z!_TimeoutGarbageCollector.__init__)s#))+ c|xjdz c_|jdkDr3d|_tjd|jD|_yy)Ndrc3BK|]}|jr|ywN)done).0ws r z<_TimeoutGarbageCollector._garbage_collect..2s-WAaffha-Ws)rrrrrs r_garbage_collectz)_TimeoutGarbageCollector._garbage_collect-sD ! >>C DN'---W-WWDM rreturnN)__name__ __module__ __qualname____doc__rr(rrrrsXrrcteZdZdZdefdZ d deeee jfde e fdZ d deddfdZd d Zy) raA condition allows one or more coroutines to wait until notified. Like a standard `threading.Condition`, but does not need an underlying lock that is acquired and released. With a `Condition`, coroutines can wait to be notified by other coroutines: .. testcode:: import asyncio from tornado import gen from tornado.locks import Condition condition = Condition() async def waiter(): print("I'll wait right here") await condition.wait() print("I'm done waiting") async def notifier(): print("About to notify") condition.notify() print("Done notifying") async def runner(): # Wait for waiter() and notifier() in parallel await gen.multi([waiter(), notifier()]) asyncio.run(runner()) .. testoutput:: I'll wait right here About to notify Done notifying I'm done waiting `wait` takes an optional ``timeout`` argument, which is either an absolute timestamp:: io_loop = IOLoop.current() # Wait up to 1 second for a notification. await condition.wait(timeout=io_loop.time() + 1) ...or a `datetime.timedelta` for a timeout relative to the current time:: # Wait up to 1 second. await condition.wait(timeout=datetime.timedelta(seconds=1)) The method returns False if there's no notification before the deadline. .. versionchanged:: 5.0 Previously, waiters could be notified synchronously from within `notify`. Now, the notification will always be received on the next iteration of the `.IOLoop`. r*cd|jj}|jr|dt|jzz }|dzS)N) __class__r+rlen)rresults r__repr__zCondition.__repr__qsBT^^,,-. == ns4=='99 9F|rNtimeoutctjj|rLdfd }tjj j ||jfdS)zWait for `.notify`. Returns a `.Future` that resolves ``True`` if the condition is notified, or ``False`` after a timeout. c^js tdjyNF)r$rr(rwaitersr on_timeoutz"Condition.wait..on_timeouts"{{}6vuE%%'rc&jSr#remove_timeout_io_looptimeout_handles rz Condition.wait..sw/E/En/Urr))rrappendrIOLoopcurrent add_timeoutadd_done_callbackrr8r>rDrEr=s` @@@rwaitzCondition.waitws` V$  ( mm++-G$00*EN  $ $%U V rncg}|r[|jrO|jj}|js|dz}|j||r |jrO|D]}t |dy)zWake ``n`` waiters.r TN)rpopleftr$rGr)rrNwaitersr=s rnotifyzCondition.notifysfDMM]]**,F;;=Qv& DMM  =F .vt < =rcL|jt|jy)zWake all waiters.N)rRr5rrs r notify_allzCondition.notify_alls C &'rr#r r))r+r,r-r.strr7rrfloatdatetime timedeltar boolrMintrRrTr/rrrr5sa9v#EIeX-?-?&? @A 4, = =D =(rrc~eZdZdZd dZdefdZdefdZd dZ d dZ d d e e e ejfdedfd Zy) raAn event blocks coroutines until its internal flag is set to True. Similar to `threading.Event`. A coroutine can wait for an event to be set. Once it is set, calls to ``yield event.wait()`` will not block unless the event has been cleared: .. testcode:: import asyncio from tornado import gen from tornado.locks import Event event = Event() async def waiter(): print("Waiting for event") await event.wait() print("Not waiting this time") await event.wait() print("Done") async def setter(): print("About to set the event") event.set() async def runner(): await gen.multi([waiter(), setter()]) asyncio.run(runner()) .. testoutput:: Waiting for event About to set the event Not waiting this time Done r*Nc0d|_t|_yr;)_valuesetrrs rrzEvent.__init__s  rczdj|jj|jrdSdS)Nz<{} {}>r_clear)formatr4r+is_setrs rr7zEvent.__repr__s: NN # #[[]E  (/  rc|jS)z-Return ``True`` if the internal flag is true.r^rs rrcz Event.is_sets {{rc|js;d|_|jD]$}|jr|jd&yy)zSet the internal flag to ``True``. All waiters are awakened. Calling `.wait` once the flag is set will not block. TN)r^rr$ set_result)rfuts rr_z Event.sets? {{DK}} )xxzNN4( )rcd|_y)zkReset the internal flag to ``False``. Calls to `.wait` will block until `.set` is called. FNrers rraz Event.clears  rr8ctjrjdSjj j fd|St j|}|j fd|S)zBlock until the internal flag is true. Returns an awaitable, which raises `tornado.util.TimeoutError` after a timeout. Nc:jj|Sr#)rremove)rhrs rrFzEvent.wait..s$--*>*>s*CrcHjsjSdSr#)r$cancel)tfrhs rrFzEvent.wait..ssxxz3::<tr)rr^rgraddrKr with_timeout)rr8 timeout_futrhs` @rrMz Event.waits|h ;; NN4 J # CD ?J**7C8K  ) )C  rr)r#)r+r,r-r.rrVr7rZrcr_rarrrWrXrYr rMr/rrrrsd%N #  )EIeX-?-?&? @A 4rrc`eZdZdZdeddfdZd dZddd eed ee jddfd Z y) _ReleasingContextManagerzReleases a Lock or Semaphore at the end of a "with" statement. with (yield semaphore.acquire()): pass # Now semaphore.release() has been called. objr*Nc||_yr#)_obj)rrus rrz!_ReleasingContextManager.__init__ s  rcyr#r/rs r __enter__z"_ReleasingContextManager.__enter__s rexc_typeOptional[Type[BaseException]]exc_valexc_tbc8|jjyr#)rwrelease)rrzr|r}s r__exit__z!_ReleasingContextManager.__exit__s rr)) r+r,r-r.r rryr BaseExceptiontypes TracebackTyperr/rrrtrtsZCD 1-(,,-   rrtceZdZdZddeddffd Zdeffd ZddZ dde e e e jfdeefd Zdd Zd d de ed e ej(ddfdZddZd d de ede ej(ddfdZxZS)ra A lock that can be acquired a fixed number of times before blocking. A Semaphore manages a counter representing the number of `.release` calls minus the number of `.acquire` calls, plus an initial value. The `.acquire` method blocks if necessary until it can return without making the counter negative. Semaphores limit access to a shared resource. To allow access for two workers at a time: .. testsetup:: semaphore from collections import deque from tornado import gen from tornado.ioloop import IOLoop from tornado.concurrent import Future inited = False async def simulator(futures): for f in futures: # simulate the asynchronous passage of time await gen.sleep(0) await gen.sleep(0) f.set_result(None) def use_some_resource(): global inited global futures_q if not inited: inited = True # Ensure reliable doctest output: resolve Futures one at a time. futures_q = deque([Future() for _ in range(3)]) IOLoop.current().add_callback(simulator, list(futures_q)) return futures_q.popleft() .. testcode:: semaphore import asyncio from tornado import gen from tornado.locks import Semaphore sem = Semaphore(2) async def worker(worker_id): await sem.acquire() try: print("Worker %d is working" % worker_id) await use_some_resource() finally: print("Worker %d is done" % worker_id) sem.release() async def runner(): # Join all workers. await gen.multi([worker(i) for i in range(3)]) asyncio.run(runner()) .. testoutput:: semaphore Worker 0 is working Worker 1 is working Worker 0 is done Worker 2 is working Worker 1 is done Worker 2 is done Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until the semaphore has been released once, by worker 0. The semaphore can be used as an async context manager:: async def worker(worker_id): async with sem: print("Worker %d is working" % worker_id) await use_some_resource() # Now the semaphore has been released. print("Worker %d is done" % worker_id) For compatibility with older versions of Python, `.acquire` is a context manager, so ``worker`` could also be written as:: @gen.coroutine def worker(worker_id): with (yield sem.acquire()): print("Worker %d is working" % worker_id) yield use_some_resource() # Now the semaphore has been released. print("Worker %d is done" % worker_id) .. versionchanged:: 4.3 Added ``async with`` support in Python 3.5. valuer*NcPt||dkr td||_y)Nrz$semaphore initial value must be >= 0)superr ValueErrorr^rrr4s rrzSemaphore.__init__s(  19CD D rct|}|jdk(rdnd|j}|jr|dt |j}d|ddd|d S) Nrlockedzunlocked,value:z ,waiters:r2r z [z]>)rr7r^rr5)rresextrar4s rr7zSemaphore.__repr__sig  KK1,ODKK=2Q ==gYs4=='9&:;E3q9+Rwb))rc|xjdz c_|jrh|jj}|js0|xjdzc_|j t |y|jrgyy)*Increment the counter and wake one waiter.r N)r^rrPr$rgrtr<s rrzSemaphore.releasesb q mm]]**,F;;= q !!":4"@Ammrr8cttjdkDr1xjdzc_jtSjj |rLdfd }t jjj||jfdS)zDecrement the counter. Returns an awaitable. Block if the counter is zero and wait for a `.release`. The awaitable raises `.TimeoutError` after the deadline. rr cjs#jtjj yr#)r$ set_exceptionr TimeoutErrorr(r<srr>z%Semaphore.acquire..on_timeouts/!;;=,,S-=-=-?@))+rc&jSr#r@rBs rrFz#Semaphore.acquire..sg44^Drr)) rr^rgrtrrGrrHrIrJrKrLs` @@@racquirezSemaphore.acquires ;;? KK1 K   6t< =  MM  (, !--//1!(!4!4Wj!I((D rctd)Nz0Use 'async with' instead of 'with' for Semaphore RuntimeErrorrs rryzSemaphore.__enter__sMNNrtypr{ tracebackc$|jyr#ry)rrrrs rrzSemaphore.__exit__ rc@K|jd{y7wr#rrs r __aenter__zSemaphore.__aenter__lln tbc,K|jywr#rrrrrs r __aexit__zSemaphore.__aexit__ rUr)r#)r+r,r-r.r[rrVr7rrrrWrXrYr rtrryrrrrrr __classcell__r4s@rrrsbHc$*#*$EIeX-?-?&? @A + ,8O , &E//0    , & U(( )   rrc:eZdZdZddeddffd Zdfd ZxZS) ra:A semaphore that prevents release() being called too many times. If `.release` would increment the semaphore's value past the initial value, it raises `ValueError`. Semaphores are mostly used to guard resources with limited capacity, so a semaphore released too many times is a sign of a bug. rr*Nc4t||||_y)Nr)rr_initial_valuers rrzBoundedSemaphore.__init__s u%#rcj|j|jk\r tdt|y)rz!Semaphore released too many timesN)r^rrrr)rr4s rrzBoundedSemaphore.releases+ ;;$-- -@A A rrUr))r+r,r-r.r[rrrrs@rrrs%$c$$$rrceZdZdZddZdefdZ ddeee e jfde e fdZddZdd Zd d d eed eej&ddfdZddZd d d eed eej&ddfdZy)raA lock for coroutines. A Lock begins unlocked, and `acquire` locks it immediately. While it is locked, a coroutine that yields `acquire` waits until another coroutine calls `release`. Releasing an unlocked lock raises `RuntimeError`. A Lock can be used as an async context manager with the ``async with`` statement: >>> from tornado import locks >>> lock = locks.Lock() >>> >>> async def f(): ... async with lock: ... # Do something holding the lock. ... pass ... ... # Now the lock is released. For compatibility with older versions of Python, the `.acquire` method asynchronously returns a regular context manager: >>> async def f2(): ... with (yield lock.acquire()): ... # Do something holding the lock. ... pass ... ... # Now the lock is released. .. versionchanged:: 4.3 Added ``async with`` support in Python 3.5. r*Nc&td|_y)Nr r)r_blockrs rrz Lock.__init__ s&Q/ rcPd|jjd|jdS)Nr2z _block=r3)r4r+rrs rr7z Lock.__repr__ s&4>>**+8DKK=BBrr8c8|jj|S)zAttempt to lock. Returns an awaitable. Returns an awaitable, which raises `tornado.util.TimeoutError` after a timeout. )rr)rr8s rrz Lock.acquires{{""7++rcj |jjy#t$r tdwxYw)zUnlock. The first coroutine in line waiting for `acquire` gets the lock. If not locked, raise a `RuntimeError`. zrelease unlocked lockN)rrrrrs rrz Lock.releases2 8 KK   ! 867 7 8s2ctd)Nz+Use `async with` instead of `with` for Lockrrs rryzLock.__enter__&sHIIrrr{rrc$|jyr#rrs rrz Lock.__exit__)rrc@K|jd{y7wr#rrs rrzLock.__aenter__1rrc,K|jywr#rrs rrzLock.__aexit__4rrr)r#)r+r,r-r.rrVr7rrrWrXrYr rtrrryrrrrrrr/rrrrs"H0C#CEI,eX-?-?&? @A, + ,, 8J , & U(( )    , & U(( )   rr)rrXrtornadorrtornado.concurrentrrtypingrrr r r TYPE_CHECKINGr r __all__rrrrtrrrr/rrrs I88  ! IXX,f((f(RaaH0t(tny(UUr