# Robot Framework Integrated Development Environment (RIDE)
# eventhandler.py
# (Source listing; see the generated documentation of this file.)
# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import wx


class _RideFSWatcherHandler:
    """Record whether watched workspace files changed on disk.

    Wraps a single ``wx.FileSystemWatcher`` together with the set of paths
    that are considered part of the workspace.  File-system events that
    concern those paths (or files with a watched extension) set a "dirty"
    flag that callers read through :meth:`is_workspace_dirty`.
    """

    def __init__(self):
        # wx.FileSystemWatcher instance; created lazily by create_fs_watcher().
        self._fs_watcher = None
        # True once a relevant file-system event has been received.
        self._is_workspace_dirty = False
        # Path given to the first successful create_fs_watcher() call.
        self._initial_watched_path = None
        # Paths (files and the root folder) whose changes mark the
        # workspace dirty.
        self._watched_path = set()

    def create_fs_watcher(self, path):
        """Create the watcher and remember ``path``; no-op if it exists.

        :param path: initial workspace path, later returned by
            :meth:`get_workspace_new_path`.
        """
        if self.is_watcher_created():
            return
        self._initial_watched_path = path
        self._fs_watcher = wx.FileSystemWatcher()
        self._fs_watcher.Bind(wx.EVT_FSWATCHER, self._on_fs_event)

    def start_listening(self, path):
        """Drop all current watches and start watching ``path``.

        For a directory the whole tree is watched and every entry found
        directly in it with a watched extension is recorded.  For a file
        the file itself is recorded and its parent directory is watched.

        :param path: directory or file to watch.
        """
        self.stop_listening()
        if os.path.isdir(path):
            # Only folders are handed to the watcher: MSW does not support
            # watching a single file.
            path = os.path.join(path, '')
            self._fs_watcher.AddTree(path)
            self._record_watchable_files(path)
            self._watched_path.add(path)
        else:
            # Record the file itself, but watch its parent directory.
            self._watched_path.add(path)
            self._fs_watcher.Add(os.path.join(os.path.dirname(path), ''))

    def _record_watchable_files(self, path):
        """Add the entries of ``path`` that have a watched extension."""
        fs = wx.FileSystem()
        fs.ChangePathTo(path, True)
        # An assertion error happens with directories named with Chinese
        # characters, so it is deliberately ignored:
        # wx._core.wxAssertionError: C++ assertion "Assert failure" failed at
        # ../src/common/unichar.cpp(65) in ToHi8bit(): character cannot be
        # converted to single byte
        try:
            file_search = fs.FindFirst("*")
        except AssertionError:
            file_search = None
        while file_search:
            if self._is_valid_file_format(file_search):
                self._watched_path.add(fs.URLToFileName(file_search))
            try:
                file_search = fs.FindNext()
            except AssertionError:
                # Best effort: keep the previous entry and ask again.
                pass

    def stop_listening(self):
        """Clear the dirty flag, remove every watch and forget all paths.

        Safe to call before :meth:`create_fs_watcher`; in that case only
        the internal state is reset.
        """
        self._is_workspace_dirty = False
        if self._fs_watcher is not None:
            self._fs_watcher.RemoveAll()
        self._watched_path = set()

    def is_workspace_dirty(self):
        """Return the dirty flag, or ``False`` when nothing is watched."""
        return bool(self._watched_path) and self._is_workspace_dirty

    def is_watcher_created(self):
        """Return ``True`` once :meth:`create_fs_watcher` made a watcher."""
        return self._fs_watcher is not None

    # NOTE(review): the ``def`` line of this method was missing from the
    # listing; the name is reconstructed -- confirm against callers.
    def get_workspace_new_path(self):
        """Return the path given when the watcher was created."""
        return self._initial_watched_path

    def _on_fs_event(self, event):
        """Watcher event callback: set the dirty flag for relevant events."""
        if self._is_mark_dirty_needed(event):
            self._is_workspace_dirty = True

    def _is_mark_dirty_needed(self, event):
        """Decide whether ``event`` should mark the workspace dirty.

        :param event: watcher event providing ``GetPath()``,
            ``GetNewPath()`` and ``GetChangeType()``.
        :return: ``True`` when the change concerns a watched path, a
            directory, or a file with a watched extension; else ``False``.
        """
        new_path = event.GetNewPath()
        previous_path = event.GetPath()
        change_type = event.GetChangeType()

        if change_type == wx.FSW_EVENT_MODIFY:
            # The watcher's own GetWatchedPaths() was observed to be always
            # empty, hence the separately maintained set.
            return previous_path in self._watched_path

        if change_type == wx.FSW_EVENT_CREATE:
            if os.path.isdir(previous_path):
                return True
            if os.path.isfile(previous_path):
                return self._is_valid_file_format(previous_path)
            return False

        if change_type == wx.FSW_EVENT_DELETE:
            if previous_path in self._watched_path:
                # workspace root folder / suite file is deleted
                self._watched_path.remove(previous_path)
                return True
            if previous_path.endswith(os.sep):
                return True
            return self._is_valid_file_format(previous_path)

        if change_type == wx.FSW_EVENT_RENAME:
            if previous_path in self._watched_path:
                # workspace root folder / suite file is renamed
                self._watched_path.remove(previous_path)
                self._watched_path.add(new_path)
                return True
            if os.path.isdir(new_path):
                return True
            if os.path.isfile(new_path):
                return self._is_valid_file_format(new_path)
            return False

        return False

    @staticmethod
    def _is_valid_file_format(file_path):
        """Return ``True`` if ``file_path`` has a watched extension.

        The comparison is case-insensitive.
        """
        # only watch files with certain extensions
        suffixes = ('.robot', '.txt', '.resource', '.tsv')
        return os.path.splitext(file_path)[-1].lower() in suffixes


RideFSWatcherHandler = _RideFSWatcherHandler()