K i0xdZddlZddlZddlZddlmZmZddlmZm Z ddl m Z ddl m Z mZmZmZmZddl Z e j$r ddl mZmZmZedZgd ZGd d eZGd d eZdede deej8fddfdZGddeeZGddeeZGddeZ GddeZ!y)aAsynchronous queues for coroutines. These classes are very similar to those provided in the standard library's `asyncio package `_. .. warning:: Unlike the standard library's `queue` module, the classes defined here are *not* thread-safe. To use these queues from another thread, use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread before calling any queue methods. N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric AwaitableOptional)DequeTupleAny_T)Queue PriorityQueue LifoQueue QueueFull QueueEmptyceZdZdZy)rz:Raised by `.Queue.get_nowait` when the queue has no items.N__name__ __module__ __qualname____doc__T/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/tornado/queues.pyrr/sDrrceZdZdZy)rzBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nrrrrrr5sLrrfuturetimeoutreturnc|rLdfd }tjjj||j fdyy)Ncnjs$jtjyyN)done set_exceptionr TimeoutError)r sr on_timeoutz _set_timeout..on_timeout@s(;;=$$S%5%5%78!rc&jSr%)remove_timeout)_io_looptimeout_handles rz_set_timeout..Fs7+A+A.+Qrr"N)rIOLoopcurrent add_timeoutadd_done_callback)r r!r)r-r.s` @@r _set_timeoutr5;sG 9--'') ,,WjA  !QRrc&eZdZddZdeefdZy)_QueueIteratorr"Nc||_yr%)q)selfr9s r__init__z_QueueIterator.__init__Js rc6|jjSr%)r9getr:s r __anext__z_QueueIterator.__anext__Msvvzz|r)r9z Queue[_T]r"N)rrrr;r rr?rrrr7r7Is9R=rr7ceZdZdZdZddeddfdZedefdZdefdZ de fdZ de fd Z dd e d eeeej$fdd fd Zd e ddfdZ dd eeeej$fdee fdZde fdZddZ dd eeeej$fdedfdZdee fdZddZde fdZd e ddfdZd e ddfdZddZ de!fdZ"de!fdZ#de!fdZ$y)raCoordinate producer and consumer coroutines. If maxsize is 0 (the default) the queue size is unbounded. .. testcode:: import asyncio from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) async def consumer(): async for item in q: try: print('Doing work on %s' % item) await asyncio.sleep(0.01) finally: q.task_done() async def producer(): for item in range(5): await q.put(item) print('Put %s' % item) async def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) await producer() # Wait for producer to put all tasks. await q.join() # Wait for consumer to finish all tasks. print('Done') asyncio.run(main()) .. testoutput:: Put 0 Put 1 Doing work on 0 Put 2 Doing work on 1 Put 3 Doing work on 2 Put 4 Doing work on 3 Doing work on 4 Done In versions of Python without native coroutines (before 3.5), ``consumer()`` could be written as:: @gen.coroutine def consumer(): while True: item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) finally: q.task_done() .. versionchanged:: 4.3 Added ``async for`` support in Python 3.5. Nmaxsizer"c4| td|dkr td||_|jt j g|_t j g|_d|_t|_ |jjy)Nzmaxsize can't be Nonerzmaxsize can't be negative) TypeError ValueError_maxsize_init collectionsdeque_getters_putters_unfinished_tasksr _finishedset)r:rAs rr;zQueue.__init__s{ ?34 4 Q;89 9  #))"- #))"- !" rc|jS)z%Number of items allowed in the queue.)rEr>s rrAz Queue.maxsizes}}rc,t|jS)zNumber of items in the queue.)len_queuer>s rqsizez Queue.qsizes4;;rc|j Sr%rQr>s remptyz Queue.emptys;;rc\|jdk(ry|j|jk\S)NrF)rArRr>s rfullz Queue.fulls& <<1 ::<4<</ /ritemr!z Future[None]ct} |j||jd|S#t$r-|jj ||ft ||Y|SwxYw)aPut an item into the queue, perhaps waiting until there is room. Returns a Future, which raises `tornado.util.TimeoutError` after a timeout. ``timeout`` may be a number denoting a time (on the same scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a `datetime.timedelta` object for a deadline relative to the current time. N)r put_nowait set_resultrrJappendr5)r:rXr!r s rputz Queue.putsi $ OOD !   d #  * MM $ 0  )  *s02A&%A&cD|j|jr]|jsJd|jj}|j |t ||j y|jrt|j |y)z{Put an item into the queue without blocking. If no free slot is immediately available, raise `QueueFull`. z)queue non-empty, why are getters waiting?N) _consume_expiredrIrUpopleft_Queue__put_internalr_getrWr)r:rXgetters rrZzQueue.put_nowaitsw  ==::< L!L L<]]**,F    % .vtyy{ C YY[O    %rct} |j|j|S#t$r+|jj |t ||Y|SwxYw)a.Remove and return an item from the queue. Returns an awaitable which resolves once an item is available, or raises `tornado.util.TimeoutError` after a timeout. ``timeout`` may be a number denoting a time (on the same scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a `datetime.timedelta` object for a deadline relative to the current time. .. note:: The ``timeout`` argument of this method differs from that of the standard library's `queue.Queue.get`. That method interprets numeric values as relative timeouts; this one interprets them as absolute deadlines and requires ``timedelta`` objects for relative timeouts (consistent with other timeouts in Tornado). )rr[ get_nowaitrrIr\r5)r:r!r s rr=z Queue.gets^. *   doo/ 0  * MM  (  )  *s-0A! A!cH|j|jra|jsJd|jj\}}|j |t |d|j S|jr|j St)zRemove and return an item from the queue without blocking. Return an item if one is immediately available, else raise `QueueEmpty`. z(queue not full, why are putters waiting?N) r_rJrWr`rarrbrRr)r:rXputters rrezQueue.get_nowaits  ==99; J J J;==002LD&    % .vt <99;  ZZ\99;  rc|jdkr td|xjdzc_|jdk(r|jjyy)aIndicate that a formerly enqueued task is complete. Used by queue consumers. For each `.get` used to fetch a task, a subsequent call to `.task_done` tells the queue that the processing on the task is complete. If a `.join` is blocking, it resumes when all items have been processed; that is, when every `.put` is matched by a `.task_done`. Raises `ValueError` if called more times than `.put`. rz!task_done() called too many timesN)rKrDrLrMr>s r task_donezQueue.task_donesR  ! !Q &@A A !#  ! !Q & NN    'rc8|jj|S)zBlock until all items in the queue are processed. Returns an awaitable, which raises `tornado.util.TimeoutError` after a timeout. )rLwait)r:r!s rjoinz Queue.join$s~~""7++rct|Sr%)r7r>s r __aiter__zQueue.__aiter__.s d##rc6tj|_yr%)rGrHrQr>s rrFz Queue._init2s!'') rc6|jjSr%)rQr`r>s rrbz Queue._get5s{{""$$rc:|jj|yr%rQr\r:rXs r_putz Queue._put8 4 rc|xjdz c_|jj|j|y)Nri)rKrLclearrurts r__put_internalzQueue.__put_internal=s. !#  $rc|jrg|jddjrG|jj|jr!|jddjrG|jrd|jdjrF|jj|jr|jdjrDyyyy)Nrri)rJr&r`rIr>s rr_zQueue._consume_expiredBsmm a 0 3 8 8 : MM ! ! #mm a 0 3 8 8 :mm a 0 5 5 7 MM ! ! #mm a 0 5 5 7m 7mrcdt|jdtt|d|j dS)N)typerhexid_formatr>s r__repr__zQueue.__repr__Js74:&&'tC4M?!DLLN;K1MMrcVdt|jd|jdS)Nr|r}r~)rrrr>s r__str__z Queue.__str__Ms)4:&&'q(8::rc:d|j}t|ddr|d|jzz }|jr|dt |jzz }|j r|dt |j zz }|j r|d|j zz }|S)Nzmaxsize=rQz queue=%rz getters[%s]z putters[%s]z tasks=%s)rAgetattrrQrIrPrJrK)r:results rrz Queue._formatPsDLL+, 44 ( kDKK/ /F == ns4=='99 9F == ns4=='99 9F  ! ! kD$:$:: :F r)rr%r0)%rrrrrQintr;propertyrArRboolrUrWrr rfloatdatetime timedeltar]rZr r=rerjrmr7rorFrbrurar_strrrrrrrrrQsAJF  D  s t0d0OS!)%x7I7I0I*J!K .&r&d&"EIeX-?-?&? @A 2>B$!&EI,eX-?-?&? @A, 4,$>"-$*%b%!!! 2$ $N#N;;  rrc4eZdZdZddZdeddfdZdefdZy) raA `.Queue` that retrieves entries in priority order, lowest first. Entries are typically tuples like ``(priority number, data)``. .. testcode:: import asyncio from tornado.queues import PriorityQueue async def main(): q = PriorityQueue() q.put((1, 'medium-priority item')) q.put((0, 'high-priority item')) q.put((10, 'low-priority item')) print(await q.get()) print(await q.get()) print(await q.get()) asyncio.run(main()) .. testoutput:: (0, 'high-priority item') (1, 'medium-priority item') (10, 'low-priority item') r"Ncg|_yr%rTr>s rrFzPriorityQueue._initz  rrXcDtj|j|yr%)heapqheappushrQrts rruzPriorityQueue._put}s t{{D)rc@tj|jSr%)rheappoprQr>s rrbzPriorityQueue._gets}}T[[))rr0rrrrrFrrurbrrrrr]s+8****b*rrc4eZdZdZddZdeddfdZdefdZy) raA `.Queue` that retrieves the most recently put items first. .. testcode:: import asyncio from tornado.queues import LifoQueue async def main(): q = LifoQueue() q.put(3) q.put(2) q.put(1) print(await q.get()) print(await q.get()) print(await q.get()) asyncio.run(main()) .. testoutput:: 1 2 3 r"Ncg|_yr%rTr>s rrFzLifoQueue._initrrrXc:|jj|yr%rsrts rruzLifoQueue._putrvrc6|jjSr%)rQpopr>s rrbzLifoQueue._gets{{  rr0rrrrrrs+4!!!!b!rr)"rrGrrtornadorrtornado.concurrentrr tornado.locksrtypingrr r r r TYPE_CHECKINGr rrr__all__ Exceptionrrrrr5r7rrrrrrrs  I??  (( T] L    S  S"40B0B#BC S  SWR[IGBKIX$*E$*N"!"!r