Coverage for src/robotide/utils/eventhandler.py: 25%
234 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 10:40 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 10:40 +0100
#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
16import os 1bc
18import wx 1bc
# True when the platform path separator is a backslash.
IS_WINDOWS = os.sep == '\\'


def normalize_windows_path(path):
    """Return *path* in the form used by this module for path comparisons.

    When ``IS_WINDOWS`` is true the path is lower-cased and every backslash
    is replaced by a forward slash; otherwise the path is returned unchanged.
    """
    return path.lower().replace('\\', '/') if IS_WINDOWS else path
class _RideFSWatcherHandler:
    """Bookkeeping around a ``wx.FileSystemWatcher`` for the opened workspace.

    The handler keeps the watcher object, the set of normalized paths that
    are being watched, the set of normalized paths that must be ignored and
    a "dirty" flag that is raised by ``_on_fs_event`` when a relevant change
    is reported by the watcher.
    """

    def __init__(self):
        self._fs_watcher = None
        self._is_workspace_dirty = False
        self._initial_watched_path = None
        self._watched_path = set()
        self._excluded_path = set()

    def create_fs_watcher(self, path):
        """Create the watcher once and bind ``_on_fs_event`` to its events.

        Does nothing when a watcher already exists. *path* is remembered as
        the initial watched path. If the watcher cannot be constructed the
        exception is printed and the handler stays without a watcher.
        """
        if self._fs_watcher:
            return
        self._initial_watched_path = path
        try:
            self._fs_watcher = wx.FileSystemWatcher()
        except Exception as e:
            print(e)
            return
        self._fs_watcher.Bind(wx.EVT_FSWATCHER, self._on_fs_event)

    def start_listening(self, path):
        """Reset the watcher and start watching *path* (a file or a directory).

        *path* is always recorded as the current workspace path. When no
        watcher has been created there is nothing to register, so the method
        returns right after recording the path instead of failing.
        """
        self._initial_watched_path = path
        if self._fs_watcher is None:
            return
        self.stop_listening()
        # on MSW, we get a popup from wxWidgets
        # (https://github.com/wxWidgets/wxWidgets/blob/master/src/msw/fswatcher.cpp#L165)
        # when the path is a network share, like for example WSL: \\wsl.localhost\docker-desktop\tmp\
        # We avoid the popup by ignoring it
        if path.startswith('\\\\') and IS_WINDOWS:
            print(f"INFO: Not watching file system changes for path: {path}")
            return
        if os.path.isdir(path):
            # only watch folders
            # MSW do not support watch single file
            path = os.path.join(path, '')
            try:
                self._fs_watcher.AddTree(path)
            except Exception as e:
                print(e)
                return
            # Add all files to the monitoring list
            for changing_file in self._iter_valid_files(path):
                self._watched_path.add(normalize_windows_path(changing_file))
                if IS_WINDOWS:  # Here we only add the file parent directory
                    changing_file = os.path.join(os.path.dirname(changing_file), '')
                try:
                    self._fs_watcher.Add(changing_file)
                except Exception as e:
                    print(e)
            self._watched_path.add(normalize_windows_path(path))
            self._exclude_paths()
        else:
            self._watched_path.add(normalize_windows_path(path))  # Here we add the file path
            if IS_WINDOWS:
                path = os.path.join(os.path.dirname(path), '')  # Here we only add the file parent directory
            try:
                self._fs_watcher.Add(path)
            except Exception as e:
                print(e)
                return
            self._exclude_paths()

    def stop_listening(self):
        """Clear the dirty flag, drop every watch and forget the watched paths.

        Safe to call when no watcher has been created.
        """
        self._is_workspace_dirty = False
        if self._fs_watcher is not None:
            self._fs_watcher.RemoveAll()
        self._watched_path = set()

    def _iter_valid_files(self, directory):
        """Yield ``URLToFileName`` of each valid-format entry found in *directory*.

        Entries are the ones reported by ``wx.FileSystem.FindFirst("*")`` and
        ``FindNext()`` after changing to *directory*; only entries accepted by
        ``_is_valid_file_format`` are yielded.
        """
        fs = wx.FileSystem()
        fs.ChangePathTo(directory, True)
        # An assertion error happens when chinese chars named directories, so we try to ignore it
        # wx._core.wxAssertionError: C++ assertion "Assert failure" failed at ../src/common/unichar.cpp(65) in
        # ToHi8bit(): character cannot be converted to single byte
        try:
            entry = fs.FindFirst("*")
        except AssertionError:
            return
        while entry:
            if self._is_valid_file_format(entry):
                yield fs.URLToFileName(entry)
            entry = self._find_next(fs)

    @staticmethod
    def _find_next(fs):
        """Return the next entry of *fs*, retrying when wx raises an assertion.

        NOTE(review): like the code this replaces, the retry is unbounded; it
        presumes a failing ``FindNext`` still advances the search - confirm.
        """
        while True:
            try:
                return fs.FindNext()
            except AssertionError:
                continue

    def _remove_from_watcher(self, path):
        """Best-effort removal of *path* from the watcher; failures are ignored."""
        try:
            self._fs_watcher.Remove(path)
        except Exception:
            # The path may never have been added; nothing useful to report.
            pass

    def _exclude_paths(self):
        """Stop watching every path currently stored in the excluded set."""
        for item in self._excluded_path:
            if os.path.isdir(item):
                item = os.path.join(item, '')
                try:
                    self._fs_watcher.RemoveTree(item)
                except Exception:
                    # Best effort: the tree may not be watched at all.
                    pass
                # Remove all files to the monitoring list
                for changing_file in self._iter_valid_files(item):
                    self._watched_path.discard(normalize_windows_path(changing_file))
                    self._remove_from_watcher(changing_file)
                self._watched_path.discard(normalize_windows_path(item))
            elif self._is_valid_file_format(item):
                self._watched_path.discard(normalize_windows_path(item))
                self._remove_from_watcher(item)

    def exclude_listening(self, path):
        """Replace the excluded set with the paths described by *path*.

        *path* may be a list, tuple or set of paths, or a single path. For a
        collection, each directory is stored (with a trailing separator)
        together with the valid-format files found in it, and each
        valid-format file is stored as is. A single path is stored only when
        it has a valid file format.
        """
        self._excluded_path = set()
        if not isinstance(path, (list, tuple, set, frozenset)):
            if self._is_valid_file_format(path):
                self._excluded_path.add(normalize_windows_path(path))
            return
        for item in path:
            if os.path.isdir(item):
                item = os.path.join(item, '')
                self._excluded_path.add(normalize_windows_path(item))
                for excluded_file in self._iter_valid_files(item):
                    self._excluded_path.add(normalize_windows_path(excluded_file))
            elif self._is_valid_file_format(item):
                self._excluded_path.add(normalize_windows_path(item))

    def is_workspace_dirty(self):
        """Return the dirty flag, or ``False`` when nothing is being watched."""
        return bool(self._watched_path) and self._is_workspace_dirty

    def is_watcher_created(self):
        """Return ``True`` when a watcher object exists."""
        return self._fs_watcher is not None

    def get_workspace_new_path(self):
        """Return the file or directory name last given to the handler."""
        return self._initial_watched_path

    def _on_fs_event(self, event):
        """Watcher callback: raise the dirty flag for relevant events."""
        if self._is_mark_dirty_needed(event):
            self._is_workspace_dirty = True

    def _is_path_excluded(self, path):
        """Return ``True`` when the normalized *path* is covered by the excluded set.

        A path ending with ``/`` is treated as a directory and matches when it
        starts with any excluded directory; any other path must be present in
        the excluded set itself.
        """
        if path.endswith('/'):  # We assume it is normalized
            return any(excluded.endswith('/') and path.startswith(excluded)
                       for excluded in self._excluded_path)
        return path in self._excluded_path

    def _is_mark_dirty_needed(self, event):
        """Decide whether the watcher *event* should mark the workspace dirty.

        Always returns a ``bool``. Events on excluded paths are ignored.
        Delete and rename events also update the watched set when the affected
        path is a watched one.
        """
        new_path = event.GetNewPath()
        if os.path.isdir(new_path):
            norm_new_path = normalize_windows_path(os.path.join(new_path, ''))
            norm_new_dir = norm_new_path
        else:
            norm_new_path = normalize_windows_path(new_path)
            norm_new_dir = normalize_windows_path(os.path.join(os.path.dirname(new_path), ''))
        previous_path = event.GetPath()
        norm_previous_path = normalize_windows_path(previous_path)
        norm_previous_dir = normalize_windows_path(os.path.join(os.path.dirname(previous_path), ''))
        change_type = event.GetChangeType()
        excluded = self._is_path_excluded

        if change_type == wx.FSW_EVENT_MODIFY:
            if excluded(norm_new_dir) or excluded(norm_previous_path) or excluded(norm_previous_dir):
                return False
            return os.path.isfile(previous_path) and self._is_valid_file_format(previous_path)

        if change_type == wx.FSW_EVENT_CREATE:
            if excluded(norm_new_dir) or excluded(norm_previous_path) or excluded(norm_previous_dir):
                return False
            if os.path.isfile(new_path):
                return self._is_valid_file_format(new_path)
            return os.path.isdir(new_path)

        if change_type == wx.FSW_EVENT_DELETE:
            if excluded(norm_previous_path) or excluded(norm_previous_dir):
                return False
            for watched in (norm_previous_path, norm_previous_dir):
                if watched in self._watched_path:
                    # workspace root folder / suite file is deleted
                    self._watched_path.remove(watched)
                    return True
            # We need to check if it was a directory or a valid file, not possible to detect it was a directory.
            # Normalized paths use '/' on every platform, so that is the separator to test for.
            if norm_previous_path.endswith('/') or norm_previous_path == norm_previous_dir[:-1]:
                return True
            return self._is_valid_file_format(previous_path)

        if change_type == wx.FSW_EVENT_RENAME:
            if excluded(norm_new_path) or excluded(norm_new_dir):
                return False
            if norm_previous_path in self._watched_path:
                # workspace root folder / suite file is renamed
                self._watched_path.remove(norm_previous_path)
                self._watched_path.add(norm_new_path)
                return True
            if self._is_valid_file_format(previous_path):  # Old name was valid file
                return True
            # We need to check if it is a directory or a valid file
            if os.path.isfile(new_path):
                return self._is_valid_file_format(new_path)
            return os.path.isdir(new_path)

        return False

    @staticmethod
    def _is_valid_file_format(file_path):
        """Return ``True`` when *file_path* has one of the watched extensions.

        The comparison is case-insensitive.
        """
        # only watch files with certain extensions
        suffixes = ('.robot', '.txt', '.resource', '.tsv')  # DEBUG: Make these extensions configurable
        return os.path.splitext(file_path)[-1].lower() in suffixes
# Module-level instance of the watcher handler, created at import time.
RideFSWatcherHandler = _RideFSWatcherHandler()