Coverage for src/robotide/utils/eventhandler.py: 25%

234 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-06 10:40 +0100

1# Copyright 2008-2015 Nokia Networks 

2# Copyright 2016- Robot Framework Foundation 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import os 1bc

17 

18import wx 1bc

19 

20IS_WINDOWS = os.sep == '\\' 1bc

21 

22 

23def normalize_windows_path(path): 1bc

24 return path.lower().replace('\\', '/') if IS_WINDOWS else path 1a

25 

26 

27class _RideFSWatcherHandler: 1bc

28 

29 def __init__(self): 1bc

30 self._fs_watcher = None 1bc

31 self._is_workspace_dirty = False 1bc

32 self._initial_watched_path = None 1bc

33 self._watched_path = set() 1bc

34 self._excluded_path = set() 1bc

35 

36 def create_fs_watcher(self, path): 1bc

37 if self._fs_watcher: 1a

38 return 1a

39 self._initial_watched_path = path 1a

40 try: 1a

41 self._fs_watcher = wx.FileSystemWatcher() 1a

42 except Exception as e: 

43 print(e) 

44 return 

45 self._fs_watcher.Bind(wx.EVT_FSWATCHER, self._on_fs_event) 1a

46 

47 def start_listening(self, path): 1bc

48 if self._initial_watched_path != path: 1a

49 self._initial_watched_path = path 1a

50 self.stop_listening() 1a

51 # on MSW, we get a popup from wxWidgets 

52 # (https://github.com/wxWidgets/wxWidgets/blob/master/src/msw/fswatcher.cpp#L165) 

53 # when the path is a network share, like for example WSL: \\wsl.localhost\docker-desktop\tmp\ 

54 # We avoid the popup by ignoring it 

55 if path.startswith('\\\\') and IS_WINDOWS: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true1a

56 print(f"INFO: Not watching file system changes for path: {path}") 

57 return 

58 if os.path.isdir(path): 1a

59 # only watch folders 

60 # MSW do not support watch single file 

61 path = os.path.join(path, '') 1a

62 try: 1a

63 self._fs_watcher.AddTree(path) 1a

64 except Exception as e: 

65 print(e) 

66 return 

67 # Add all files to the monitoring list 

68 from wx import FileSystem 1a

69 fs = FileSystem() 1a

70 fs.ChangePathTo(path, True) 1a

71 # An assertion error happens when chinese chars named directories, so we try to ignore it 

72 # wx._core.wxAssertionError: C++ assertion "Assert failure" failed at ../src/common/unichar.cpp(65) in 

73 # ToHi8bit(): character cannot be converted to single byte 

74 file_search = None 1a

75 try: 1a

76 file_search = fs.FindFirst("*") 1a

77 except AssertionError: 

78 pass 

79 while file_search: 1a

80 if self._is_valid_file_format(file_search): 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true1a

81 changing_file = fs.URLToFileName(file_search) 

82 self._watched_path.add(normalize_windows_path(changing_file)) 

83 if IS_WINDOWS: # Here we only add the file parent directory 

84 changing_file = os.path.join(os.path.dirname(changing_file), '') 

85 try: 

86 self._fs_watcher.Add(changing_file) 

87 except Exception as e: 

88 print(e) 

89 try: 1a

90 file_search = fs.FindNext() 1a

91 except AssertionError: 

92 pass 

93 self._watched_path.add(normalize_windows_path(path)) 1a

94 self._exclude_paths() 1a

95 else: 

96 self._watched_path.add(normalize_windows_path(path)) # Here we add the file path 1a

97 if IS_WINDOWS: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true1a

98 path = os.path.join(os.path.dirname(path), '') # Here we only add the file parent directory 

99 try: 1a

100 self._fs_watcher.Add(path) 1a

101 except Exception as e: 

102 print(e) 

103 return 

104 self._exclude_paths() 1a

105 

106 def stop_listening(self): 1bc

107 self._is_workspace_dirty = False 1a

108 self._fs_watcher.RemoveAll() 1a

109 self._watched_path = set() 1a

110 

111 def _exclude_paths(self): 1bc

112 for item in self._excluded_path: 112 ↛ 113line 112 didn't jump to line 113 because the loop on line 112 never started1a

113 if os.path.isdir(item): 

114 item = os.path.join(item, '') 

115 try: 

116 self._fs_watcher.RemoveTree(item) 

117 except Exception: 

118 pass 

119 # Remove all files to the monitoring list 

120 from wx import FileSystem 

121 fs = FileSystem() 

122 fs.ChangePathTo(item, True) 

123 file_search = None 

124 try: 

125 file_search = fs.FindFirst("*") 

126 except AssertionError: 

127 pass 

128 while file_search: 

129 if self._is_valid_file_format(file_search): 

130 changing_file = fs.URLToFileName(file_search) 

131 try: 

132 self._watched_path.remove(normalize_windows_path(changing_file)) 

133 except KeyError: 

134 pass 

135 try: 

136 self._fs_watcher.Remove(changing_file) 

137 except Exception: 

138 pass 

139 try: 

140 file_search = fs.FindNext() 

141 except AssertionError: 

142 pass 

143 try: 

144 self._watched_path.remove(normalize_windows_path(item)) 

145 except KeyError: 

146 pass 

147 else: 

148 if self._is_valid_file_format(item): 

149 try: 

150 self._watched_path.remove(normalize_windows_path(item)) 

151 except KeyError: 

152 pass 

153 try: 

154 self._fs_watcher.Remove(item) 

155 except Exception: 

156 pass 

157 

158 def exclude_listening(self, path): 1bc

159 self._excluded_path = set() 1a

160 if isinstance(path, list): 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true1a

161 for item in path: 

162 if os.path.isdir(item): 

163 item = os.path.join(item, '') 

164 self._excluded_path.add(normalize_windows_path(item)) 

165 # Remove all files to the monitoring list 

166 from wx import FileSystem 

167 fs = FileSystem() 

168 fs.ChangePathTo(item, True) 

169 file_search = None 

170 try: 

171 file_search = fs.FindFirst("*") 

172 except AssertionError: 

173 pass 

174 while file_search: 

175 if self._is_valid_file_format(file_search): 

176 self._excluded_path.add(normalize_windows_path(fs.URLToFileName(file_search))) 

177 try: 

178 file_search = fs.FindNext() 

179 except AssertionError: 

180 pass 

181 else: 

182 if self._is_valid_file_format(item): 

183 self._excluded_path.add(normalize_windows_path(item)) 

184 else: 

185 if self._is_valid_file_format(path): 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true1a

186 self._excluded_path.add(normalize_windows_path(path)) 

187 

188 def is_workspace_dirty(self): 1bc

189 if self._watched_path: 189 ↛ 192line 189 didn't jump to line 192 because the condition on line 189 was always true1a

190 return self._is_workspace_dirty 1a

191 else: 

192 return False 

193 

194 def is_watcher_created(self): 1bc

195 return self._fs_watcher is not None 1a

196 

197 def get_workspace_new_path(self): 1bc

198 return self._initial_watched_path # Returning file or directory name 1a

199 

200 def _on_fs_event(self, event): 1bc

201 if self._is_mark_dirty_needed(event): 

202 self._is_workspace_dirty = True 

203 

204 def _is_mark_dirty_needed(self, event): 1bc

205 new_path = event.GetNewPath() 

206 if os.path.isdir(new_path): 

207 norm_new_path = normalize_windows_path(os.path.join(new_path, '')) 

208 norm_new_dir = norm_new_path 

209 else: 

210 norm_new_path = normalize_windows_path(new_path) 

211 norm_new_dir = normalize_windows_path(os.path.join(os.path.dirname(new_path), '')) 

212 previous_path = event.GetPath() 

213 norm_previous_path = normalize_windows_path(previous_path) 

214 norm_previous_dir = normalize_windows_path(os.path.join(os.path.dirname(previous_path), '')) 

215 change_type = event.GetChangeType() 

216 

217 def is_path_excluded(path): 

218 if path.endswith('/'): # We assume it is normalized 

219 excluded_directories = set() 

220 for x in self._excluded_path: 

221 if x.endswith('/'): 

222 excluded_directories.add(x) 

223 if not excluded_directories: 

224 return False 

225 for excluded in excluded_directories: 

226 if path.startswith(excluded): 

227 return True 

228 else: 

229 return path in self._excluded_path 

230 

231 # DEBUG 

232 def event_name(code): 

233 table = ['FSW_EVENT_MODIFY', 'FSW_EVENT_CREATE', 'FSW_EVENT_DELETE', 'FSW_EVENT_RENAME'] 

234 value = 0 

235 if code == wx.FSW_EVENT_MODIFY: 

236 value = 0 

237 if code == wx.FSW_EVENT_CREATE: 

238 value = 1 

239 if code == wx.FSW_EVENT_DELETE: 

240 value = 2 

241 if code == wx.FSW_EVENT_RENAME: 

242 value = 3 

243 return table[value] 

244 

245 watched = [] 

246 self._fs_watcher.GetWatchedPaths(watched) 

247 # print(f"DEBUG: eventhandler.py _is_mark_dirty_needed event={event.ToString()}\n GetWatchedPaths={watched}") 

248 

249 # print(f"\nDEBUG: eventhandler _is_mark_dirty_needed new_path={new_path} previous_path={previous_path}" 

250 # f" change_type={change_type}=={event_name(change_type)}\n" 

251 # f"norm_previous_path={norm_previous_path} norm_previous_dir={norm_previous_dir}\n" 

252 # f" self._watched_path={self._watched_path} self._excluded_path={self._excluded_path}") 

253 

254 if change_type == wx.FSW_EVENT_MODIFY: 

255 if (not is_path_excluded(norm_new_dir) and not is_path_excluded(norm_previous_path) 

256 and not is_path_excluded(norm_previous_dir)): 

257 if os.path.isfile(previous_path): 

258 return self._is_valid_file_format(previous_path) 

259 return False 

260 

261 if change_type == wx.FSW_EVENT_CREATE: 

262 if (is_path_excluded(norm_new_dir) or is_path_excluded(norm_previous_path) 

263 or is_path_excluded(norm_previous_dir)): 

264 return False 

265 if os.path.isfile(new_path): 

266 return self._is_valid_file_format(new_path) 

267 elif os.path.isdir(new_path): 

268 return True 

269 elif change_type == wx.FSW_EVENT_DELETE: 

270 if is_path_excluded(norm_previous_path) or is_path_excluded(norm_previous_dir): 

271 return False 

272 if norm_previous_path in self._watched_path: 

273 # workspace root folder / suite file is deleted 

274 self._watched_path.remove(norm_previous_path) 

275 return True 

276 if norm_previous_dir in self._watched_path: 

277 # workspace root folder / suite file is deleted 

278 self._watched_path.remove(norm_previous_dir) 

279 return True 

280 # We need to check if it was a directory or a valid file, not possible to detect it was a directory 

281 if norm_previous_path.endswith(os.sep) or norm_previous_path == norm_previous_dir[:-1]: 

282 return True 

283 else: 

284 return self._is_valid_file_format(previous_path) 

285 elif change_type == wx.FSW_EVENT_RENAME: 

286 if is_path_excluded(norm_new_path) or is_path_excluded(norm_new_dir): 

287 return False 

288 if norm_previous_path in self._watched_path: 

289 # workspace root folder / suite file is renamed 

290 self._watched_path.remove(norm_previous_path) 

291 self._watched_path.add(norm_new_path) 

292 return True 

293 if self._is_valid_file_format(previous_path): # Old name was valid file 

294 return True 

295 # We need to check if it is a directory or a valid file 

296 if os.path.isfile(new_path): 

297 return self._is_valid_file_format(new_path) 

298 elif os.path.isdir(new_path): 

299 return True 

300 else: 

301 return False 

302 

303 @staticmethod 1bc

304 def _is_valid_file_format(file_path): 1bc

305 # only watch files with certain extensions 

306 suffixes = ('.robot', '.txt', '.resource', '.tsv') # DEBUG: Make these extensions configurable 1a

307 return os.path.splitext(file_path)[-1].lower() in suffixes 1a

308 

309 

310RideFSWatcherHandler = _RideFSWatcherHandler() 1bc