K iddlZddlZddlZddlZddlZddlmZddlmZddl m Z ddl m Z ddl mZddlmZmZmZmZddlmZGd d eZeGd d ej.Zy) N)gen)IOStream)app_log) TCPServer) skipIfNonUnix) AsyncTestCase ExpectLogbind_unused_portgen_test)TuplecBeZdZedZedZdZedZy) TCPServerTestc#8KGddt}dx}} t\}}|}|j|tt j}t t d5|jd|f|jd|jtjddd||j||jyy#1swY/xYw#||j||jwwxYww)Nc0eZdZejdZy)FTCPServerTest.test_handle_stream_coroutine_logging..TestServerc3nK|jtd|jddz yw)Nhellor) read_byteslencloseselfstreamaddresss a/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/tornado/test/tcpserver_test.py handle_streamzTTCPServerTest.test_handle_stream_coroutine_logging..TestServer.handle_streams+''H 66 As35N__name__ __module__ __qualname__r coroutinerr TestServerrs ]]  r$r%zException in callback localhostr)rr add_socketrsocketr rconnectwriteread_until_closermomentstopr)rr%serverclientsockports r$test_handle_stream_coroutine_loggingz2TCPServerTest.test_handle_stream_coroutine_loggings   )+JD$\F   d #fmmo.F7$;< !nnk4%899ll8,,--//jj  ! ! ! " ! ! ! ! "s6DAC0)A C$6C0>&D$C-)C00'DDc#`KGddt}t\}}|}|j|tt j}|j d|f|j }|j|d|j|jyw)NceZdZdZy)ETCPServerTest.test_handle_stream_native_coroutine..TestServercNK|jd|jyw)Ndata)r*rrs rrzSTCPServerTest.test_handle_stream_native_coroutine..TestServer.handle_stream3s W% s#%N)rr r!rr#r$rr%r52s r$r%r&r7) rr r'rr(r)r+ assertEqualr-r)rr%r0r1r.r/results r#test_handle_stream_native_coroutinez1TCPServerTest.test_handle_stream_native_coroutine.s   &' d$&--/*nnk4011..00 )  sB,B.ct\}}t}|j||j|jyN)r rr'r-)rr0r1r.s rtest_stop_twicezTCPServerTest.test_stop_twiceAs5%' d$  r$c#H KG fddt}t\}}| j|d|f d}t|Dcgc]}t t j !}}g t j fd}|Dcgc] }|| c}|jt dd t |k(r|jd D]}|jycc}wcc}w# D]}|jwxYww) Nc6eZdZejfdZy)7TCPServerTest.test_stop_in_callback..TestServerc3RKj|jywr<)r-r+)rrrr.s rrzETCPServerTest.test_stop_in_callback..TestServer.handle_streamOs --//s$'Nr)r.srr%r@Ns ]] 0 0r$r%r&(c3vK |jj|y#t$rYywxYwwr<)r)appendOSError)cconnected_clients server_addrs rr)z4TCPServerTest.test_stop_in_callback..connect\s> ,ii ,,"((+  s9*9 6969rzall clients failed connectingzHat least one client should fail connecting for the test to be meaningful) rr r'rangerr(rr" assertGreaterrskipTestr) rr%r0r1Niclientsr)rFrGr.rHs @@@rtest_stop_in_callbackz#TCPServerTest.test_stop_in_callbackHs  0 0 &' d$"D) 6;Ah?8FMMO,??  ,  ,$++awqz++ 30116UV $%* 4 '   1@,'   s6A D"$C<3!D"D#!D"D$"D"DD"N)rr r!r r2r:r=rOr#r$rrrsA 6$))r$rc>eZdZdedeeeffdZdZdZdZdZ y) TestMultiprocesscodereturnc $ tjtjdgd|dd}|j|jfS#tj$r8}t d|j d|jd|j|d}~wwxYw)Nz-Werror::DeprecationWarningTutf8)capture_outputinputencodingcheckzProcess returned z stdout=z stderr=) subprocessrunsys executableCalledProcessError RuntimeError returncodestdoutstderr)rrRr9es r run_subproczTestMultiprocess.run_subproc~s ^^!>?# F}}fmm++ ,, #ALL>!((8AHH:V  s*AB3B  Bctjd}|j|\}}|jdj t |d|j|dy)Na import asyncio from tornado.tcpserver import TCPServer async def main(): server = TCPServer() server.listen(0, address='127.0.0.1') asyncio.run(main()) print('012', end='') 012textwrapdedentrdr8joinsortedrrRouterrs rtest_listen_singlez#TestMultiprocess.test_listen_singlesZ   ##D)S -u5 b!r$ctjd}|j|\}}|jdj t |d|j|dy)Na import warnings from tornado.ioloop import IOLoop from tornado.process import task_id from tornado.tcpserver import TCPServer warnings.simplefilter("ignore", DeprecationWarning) server = TCPServer() server.bind(0, address='127.0.0.1') server.start(3) IOLoop.current().run_sync(lambda: None) print(task_id(), end='') rfrgrhrms rtest_bind_startz TestMultiprocess.test_bind_startX  "##D)S -u5 b!r$ctjd}|j|\}}|jdj t |d|j|dy)Na import asyncio from tornado.netutil import bind_sockets from tornado.process import fork_processes, task_id from tornado.ioloop import IOLoop from tornado.tcpserver import TCPServer sockets = bind_sockets(0, address='127.0.0.1') fork_processes(3) async def post_fork_main(): server = TCPServer() server.add_sockets(sockets) asyncio.run(post_fork_main()) print(task_id(), end='') rfrgrhrms rtest_add_socketsz!TestMultiprocess.test_add_socketsrsr$ctjd}|j|\}}|jdj t |d|j|dy)Na import asyncio import socket from tornado.netutil import bind_sockets from tornado.process import task_id, fork_processes from tornado.tcpserver import TCPServer # Pick an unused port which we will be able to bind to multiple times. (sock,) = bind_sockets(0, address='127.0.0.1', family=socket.AF_INET, reuse_port=True) port = sock.getsockname()[1] fork_processes(3) async def main(): server = TCPServer() server.listen(port, address='127.0.0.1', reuse_port=True) asyncio.run(main()) print(task_id(), end='') rfrgrhrms rtest_listen_multi_reuse_portz-TestMultiprocess.test_listen_multi_reuse_portsX  ,##D)S -u5 b!r$N) rr r!strr rdrprrrurwr#r$rrQrQws1 , ,c3h ,"(",","r$rQ)r(rZr\riunittesttornadortornado.iostreamr tornado.logrtornado.tcpserverrtornado.test.utilrtornado.testingrr r r typingr rTestCaserQr#r$rrs_  %'+PPaMaLn"x((n"n"r$