ML indZddlmZddlZddlmZmZddlm Z ddl m Z Gdde Z Gd d Z y) z+Watch parts of the file system for changes.) annotationsN)IterableSet) NamedTuple)FileSystemCachec,eZdZUded<ded<ded<y)FileDatafloatst_mtimeintst_sizestrhashN)__name__ __module__ __qualname____annotations__T/mnt/ssd/data/python-lab/Trading/venv/lib/python3.12/site-packages/mypy/fswatcher.pyr r sO L Irr cXeZdZdZd dZd dZddZddZddZddZ ddZ dd Z dd Z y )FileSystemWatchera+Watcher for file system changes among specific paths. All file system access is performed using FileSystemCache. We detect changed files by stat()ing them all and comparing hashes of potentially changed files. If a file has both size and mtime unmodified, the file is assumed to be unchanged. An important goal of this class is to make it easier to eventually use file system events to detect file changes. Note: This class doesn't flush the file system cache. If you don't manually flush it, changes won't be seen. c>||_t|_i|_yN)fsset_paths _file_data)selfrs r__init__zFileSystemWatcher.__init__$s # 68rcp|jjDcic] \}}| || c}}Scc}}wr)ritems)rkvs rdump_file_dataz FileSystemWatcher.dump_file_data)s/!%!6!6!8JAAM1JJJs 22c"||j|<yr)r)rpathdatas r set_file_datazFileSystemWatcher.set_file_data,s $rc|D] }||jvsd|j|<"|xjt|zc_yr)rrrrpathsr's radd_watched_pathsz#FileSystemWatcher.add_watched_paths/sA -D4;;&)-%  - s5z! rc|D]}||jvs|j|= |xjt|zc_yr)rrrr+s rremove_watched_pathsz&FileSystemWatcher.remove_watched_paths7s= *Dt&OOD) * s5z! rc|jj|}t|j|j||j |<yr)r hash_digestr r r r)rr'str1s r_updatezFileSystemWatcher._update=s5gg))$/ (bjj+ Nrcrt}|D]&}|j|}|jj|}|$|3|j |d|j|<T|$|j ||j ||z|j |j k7s,t|jt|jk7s|jj|}|j |||j |j k7s||jk7s|j |)|Sr) rrr stat_or_noneaddr3r r r r1r)rr,changedr'oldr2new_hashs r _find_changedzFileSystemWatcher._find_changedAs% *D//$'C%%d+Bz?KK%,0DOOD);KK%LLr*ZZ3;;.#bkk2Bc#,,FW2W $ww2248HLLr*zzS[[0H4H D)+ *,rc8|j|jS)zGReturn paths that have changes since the last call, in the watched set.)r:r)rs r find_changedzFileSystemWatcher.find_changed[s!!$++..rch|j||j||j|S)a/Alternative to find_changed() given explicit changes. This only calls self.fs.stat() on added or updated files, not on all files. It believes all other files are unchanged! Implies add_watched_paths() for add and update, and remove_watched_paths() for remove. )r/r-r:)rremoveupdates rupdate_changedz FileSystemWatcher.update_changed_s1 !!&) v&!!&))rN)rrreturnNone)rAz!dict[str, tuple[float, int, str]])r'rr(r rArB)r, Iterable[str]rArB)r'rr2zos.stat_resultrArB)r,rCrAAbstractSet[str])rArD)r> list[str]r?rErArD) rrr__doc__r r%r)r-r/r3r:r<r@rrrrrs6 "9 K%"" O4/ *rr)rF __future__roscollections.abcrr AbstractSettypingr mypy.fscacherr rrrrrMs/1" 8(z X*X*r