Robot Framework
OperatingSystem.py
Go to the documentation of this file.
1 # Copyright 2008-2015 Nokia Networks
2 # Copyright 2016- Robot Framework Foundation
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import fnmatch
17 import glob
18 import os
19 import pathlib
20 import re
21 import shutil
22 import tempfile
23 import time
24 
25 from robot.version import get_version
26 from robot.api import logger
27 from robot.api.deco import keyword
28 from robot.utils import (abspath, ConnectionCache, console_decode, del_env_var,
29  get_env_var, get_env_vars, get_time, is_truthy,
30  is_string, normpath, parse_time, plural_or_not,
31  safe_str, secs_to_timestamp, secs_to_timestr, seq2str,
32  set_env_var, timestr_to_secs, CONSOLE_ENCODING, WINDOWS)
33 
# Library version string, obtained from robot.version.get_version().
34 __version__ = get_version()
# Module-level ConnectionCache instance. The string argument is presumably the
# error message used when no process is active -- TODO confirm against
# robot.utils.ConnectionCache.
35 PROCESSES = ConnectionCache('No active processes.')
36 
37 
38 
# Class-level attributes (the enclosing class header is not part of this listing).
# Presumably Robot Framework library settings: scope 'GLOBAL' and the library
# version taken from the module-level __version__ -- TODO confirm.
155  ROBOT_LIBRARY_SCOPE = 'GLOBAL'
156  ROBOT_LIBRARY_VERSION = __version__
157 
158 
def run(self, command):
    """Runs the given command in the system and returns the output.

    Delegates to ``_run`` and returns the second item of its
    ``(rc, output)`` result.
    """
    return self._run(command)[1]
195 
196 
def run_and_return_rc(self, command):
    """Runs the given command in the system and returns the return code.

    Delegates to ``_run`` and returns the first item of its
    ``(rc, output)`` result.
    """
    return self._run(command)[0]
222 
223 
def run_and_return_rc_and_output(self, command):
    """Runs the given command in the system and returns the RC and output.

    The result of ``_run`` is returned unchanged.
    """
    return self._run(command)
244 
def _run(self, command):
    """Executes ``command`` through ``_Process`` and returns ``(rc, output)``.

    The command, as rendered by ``str(process)``, is logged on INFO level
    before the output is read.
    """
    process = _Process(command)
    self._info("Running command '%s'." % process)
    stdout = process.read()
    # The process must be closed after reading; closing yields the return code.
    rc = process.close()
    return rc, stdout
251 
252 
def get_file(self, path, encoding='UTF-8', encoding_errors='strict'):
    r"""Returns the contents of a specified file.

    ``encoding`` is translated with ``_map_encoding`` before the file is
    opened and ``encoding_errors`` is passed to ``open`` as ``errors``.
    ``\r\n`` sequences in the content are converted to ``\n``; other line
    endings are returned as they are in the file.
    """
    path = self._absnorm(path)
    self._link("Getting file '%s'.", path)
    encoding = self._map_encoding(encoding)
    # Using `newline=None` (default) and not converting `\r\n` -> `\n`
    # ourselves would be better but some of our own acceptance tests
    # depend on these semantics. Best solution would probably be making
    # `newline` configurable.
    # FIXME: Make `newline` configurable or at least submit an issue about that.
    with open(path, encoding=encoding, errors=encoding_errors, newline='') as f:
        return f.read().replace('\r\n', '\n')
288 
def _map_encoding(self, encoding):
    """Translates special encoding names, matched case-insensitively.

    ``SYSTEM`` maps to ``None`` and ``CONSOLE`` to ``CONSOLE_ENCODING``;
    any other value is returned unchanged.
    """
    special = {'SYSTEM': None, 'CONSOLE': CONSOLE_ENCODING}
    return special.get(encoding.upper(), encoding)
292 
293 
def get_binary_file(self, path):
    """Returns the contents of a specified file.

    The file is opened in binary mode and its content is returned as bytes
    without any conversion.
    """
    path = self._absnorm(path)
    self._link("Getting file '%s'.", path)
    with open(path, 'rb') as f:
        return f.read()
303 
304 
def grep_file(self, path, pattern, encoding='UTF-8', encoding_errors='strict',
              regexp=False):
    """Returns the lines of the specified file that match the pattern.

    Unless ``regexp`` is true, ``pattern`` is a glob pattern that is
    translated to a regular expression with ``*`` appended. Matching is
    done with ``re.search`` against each line after trailing ``\\r``/``\\n``
    characters are stripped. Matching lines are returned joined with
    newlines, and the match count is logged.
    """
    path = self._absnorm(path)
    if not regexp:
        pattern = fnmatch.translate(f'{pattern}*')
    reobj = re.compile(pattern)
    encoding = self._map_encoding(encoding)
    lines = []
    total_lines = 0
    self._link("Reading file '%s'.", path)
    with open(path, encoding=encoding, errors=encoding_errors) as file:
        for line in file:
            total_lines += 1
            line = line.rstrip('\r\n')
            if reobj.search(line):
                lines.append(line)
    self._info('%d out of %d lines matched' % (len(lines), total_lines))
    return '\n'.join(lines)
356 
357 
def log_file(self, path, encoding='UTF-8', encoding_errors='strict'):
    """Wrapper for `Get File` that also logs the returned file.

    The content is read with ``get_file``, logged on INFO level and
    returned.
    """
    content = self.get_file(path, encoding, encoding_errors)
    self._info(content)
    return content
370 
371  # File and directory existence
372 
373 
def should_exist(self, path, msg=None):
    """Fails unless the given path (file or directory) exists.

    The path is resolved with ``_absnorm`` and matched with ``_glob``.
    ``msg`` overrides the default failure message.
    """
    path = self._absnorm(path)
    if not self._glob(path):
        self._fail(msg, "Path '%s' does not exist." % path)
    self._link("Path '%s' exists.", path)
385 
386 
def should_not_exist(self, path, msg=None):
    """Fails if the given path (file or directory) exists.

    The path is resolved with ``_absnorm`` and matched with ``_glob``.
    ``msg`` overrides the default failure message built by
    ``_get_matches_error``.
    """
    path = self._absnorm(path)
    matches = self._glob(path)
    if matches:
        self._fail(msg, self._get_matches_error('Path', path, matches))
    self._link("Path '%s' does not exist.", path)
399 
def _glob(self, path):
    """Returns the paths matching ``path``.

    A path that exists literally is returned as the only match, so names
    containing glob characters are found as such; otherwise ``path`` is
    expanded as a glob pattern.
    """
    if os.path.exists(path):
        return [path]
    return glob.glob(path)
402 
def _get_matches_error(self, what, path, matches):
    """Builds the failure message for a path that unexpectedly exists.

    ``what`` is the label used at the start of the message (e.g. ``'File'``).
    For a glob pattern the sorted matches are listed; otherwise the message
    just states that the path exists.
    """
    if not self._is_glob_path(path):
        return "%s '%s' exists." % (what, path)
    return "%s '%s' matches %s." % (what, path, seq2str(sorted(matches)))
407 
def _is_glob_path(self, path):
    """Tells whether ``path`` contains glob wildcards.

    ``*`` and ``?`` count alone; square brackets count only when both
    ``[`` and ``]`` are present somewhere in the path.
    """
    if '*' in path or '?' in path:
        return True
    return '[' in path and ']' in path
410 
411 
def file_should_exist(self, path, msg=None):
    """Fails unless the given path points to an existing file.

    Only matches for which ``os.path.isfile`` is true are counted.
    ``msg`` overrides the default failure message.
    """
    path = self._absnorm(path)
    matches = [p for p in self._glob(path) if os.path.isfile(p)]
    if not matches:
        self._fail(msg, "File '%s' does not exist." % path)
    self._link("File '%s' exists.", path)
424 
425 
def file_should_not_exist(self, path, msg=None):
    """Fails if the given path points to an existing file.

    Only matches for which ``os.path.isfile`` is true cause a failure, so
    an existing directory at ``path`` passes. ``msg`` overrides the default
    failure message.
    """
    path = self._absnorm(path)
    matches = [p for p in self._glob(path) if os.path.isfile(p)]
    if matches:
        self._fail(msg, self._get_matches_error('File', path, matches))
    self._link("File '%s' does not exist.", path)
438 
439 
def directory_should_exist(self, path, msg=None):
    """Fails unless the given path points to an existing directory.

    Only matches for which ``os.path.isdir`` is true are counted.
    ``msg`` overrides the default failure message.
    """
    path = self._absnorm(path)
    matches = [p for p in self._glob(path) if os.path.isdir(p)]
    if not matches:
        self._fail(msg, "Directory '%s' does not exist." % path)
    self._link("Directory '%s' exists.", path)
452 
453 
def directory_should_not_exist(self, path, msg=None):
    """Fails if the given path points to an existing directory.

    Only matches for which ``os.path.isdir`` is true cause a failure, so
    an existing regular file at ``path`` passes. ``msg`` overrides the
    default failure message.
    """
    path = self._absnorm(path)
    matches = [p for p in self._glob(path) if os.path.isdir(p)]
    if matches:
        self._fail(msg, self._get_matches_error('Directory', path, matches))
    self._link("Directory '%s' does not exist.", path)
466 
467  # Waiting file/dir to appear/disappear
468 
469 
def wait_until_removed(self, path, timeout='1 minute'):
    """Waits until the given file or directory is removed.

    ``timeout`` is converted with ``timestr_to_secs``. The path is polled
    every 0.1 seconds. The keyword fails if a non-negative timeout expires
    while the path still matches; a negative timeout never expires.
    """
    path = self._absnorm(path)
    timeout = timestr_to_secs(timeout)
    maxtime = time.time() + timeout
    while self._glob(path):
        if timeout >= 0 and time.time() > maxtime:
            self._fail("'%s' was not removed in %s."
                       % (path, secs_to_timestr(timeout)))
        time.sleep(0.1)
    self._link("'%s' was removed.", path)
494 
495 
def wait_until_created(self, path, timeout='1 minute'):
    """Waits until the given file or directory is created.

    ``timeout`` is converted with ``timestr_to_secs``. The path is polled
    every 0.1 seconds. The keyword fails if a non-negative timeout expires
    before anything matches; a negative timeout never expires.
    """
    path = self._absnorm(path)
    timeout = timestr_to_secs(timeout)
    maxtime = time.time() + timeout
    while not self._glob(path):
        if timeout >= 0 and time.time() > maxtime:
            self._fail("'%s' was not created in %s."
                       % (path, secs_to_timestr(timeout)))
        time.sleep(0.1)
    self._link("'%s' was created.", path)
520 
521  # Dir/file empty
522 
523 
def directory_should_be_empty(self, path, msg=None):
    """Fails unless the specified directory is empty.

    The default failure message lists the directory contents; ``msg``
    overrides it.
    """
    path = self._absnorm(path)
    items = self._list_dir(path)
    if items:
        self._fail(msg, "Directory '%s' is not empty. Contents: %s."
                        % (path, seq2str(items, lastsep=', ')))
    self._link("Directory '%s' is empty.", path)
534 
535 
def directory_should_not_be_empty(self, path, msg=None):
    """Fails if the specified directory is empty.

    On success the number of items is logged. ``msg`` overrides the
    default failure message.
    """
    path = self._absnorm(path)
    items = self._list_dir(path)
    if not items:
        self._fail(msg, "Directory '%s' is empty." % path)
    # '%%s' survives the first formatting as '%s' and is filled in by _link.
    self._link("Directory '%%s' contains %d item%s."
               % (len(items), plural_or_not(items)), path)
546 
547 
def file_should_be_empty(self, path, msg=None):
    """Fails unless the specified file is empty.

    A path that is not an existing regular file is reported through
    ``_error``. ``msg`` overrides the default failure message, which
    includes the file size.
    """
    path = self._absnorm(path)
    if not os.path.isfile(path):
        self._error("File '%s' does not exist." % path)
    size = os.stat(path).st_size
    if size > 0:
        self._fail(msg,
                   "File '%s' is not empty. Size: %d bytes." % (path, size))
    self._link("File '%s' is empty.", path)
560 
561 
def file_should_not_be_empty(self, path, msg=None):
    """Fails if the specified file is empty.

    A path that is not an existing regular file is reported through
    ``_error``. On success the file size is logged. ``msg`` overrides the
    default failure message.
    """
    path = self._absnorm(path)
    if not os.path.isfile(path):
        self._error("File '%s' does not exist." % path)
    size = os.stat(path).st_size
    if size == 0:
        self._fail(msg, "File '%s' is empty." % path)
    # '%%s' survives the first formatting as '%s' and is filled in by _link.
    self._link("File '%%s' contains %d bytes." % size, path)
573 
574  # Creating and removing files and directory
575 
576 
def create_file(self, path, content='', encoding='UTF-8'):
    """Creates a file with the given content and encoding.

    Writing is delegated to ``_write_to_file``; the resulting path is
    logged as a link.
    """
    path = self._write_to_file(path, content, encoding)
    self._link("Created file '%s'.", path)
601 
def _write_to_file(self, path, content, encoding=None, mode='w'):
    """Writes ``content`` to ``path`` and returns the resolved path.

    Missing parent directories are created. A truthy ``encoding`` is
    translated with ``_map_encoding``; a falsy one is passed to ``open``
    as is, which is what binary modes need.
    """
    path = self._absnorm(path)
    parent = os.path.dirname(path)
    if not os.path.exists(parent):
        os.makedirs(parent)
    if encoding:
        encoding = self._map_encoding(encoding)
    with open(path, mode, encoding=encoding) as f:
        f.write(content)
    return path
612 
613 
def create_binary_file(self, path, content):
    """Creates a binary file with the given content.

    String content is converted to bytes by mapping each character to its
    ordinal, so all characters must have ordinals below 256. Other content
    is written as is.
    """
    if is_string(content):
        content = bytes(ord(c) for c in content)
    path = self._write_to_file(path, content, mode='wb')
    self._link("Created binary file '%s'.", path)
638 
639 
def append_to_file(self, path, content, encoding='UTF-8'):
    """Appends the given content to the specified file.

    Writing is delegated to ``_write_to_file`` using mode ``'a'``.
    """
    path = self._write_to_file(path, content, encoding, mode='a')
    self._link("Appended to file '%s'.", path)
651 
652 
def remove_file(self, path):
    """Removes a file with the given path.

    ``path`` may be a glob pattern, in which case every match is removed.
    Nothing matching is only logged; a match that is not a regular file is
    reported through ``_error``.
    """
    path = self._absnorm(path)
    matches = self._glob(path)
    if not matches:
        self._link("File '%s' does not exist.", path)
    for match in matches:
        if not os.path.isfile(match):
            self._error("Path '%s' is not a file." % match)
        os.remove(match)
        self._link("Removed file '%s'.", match)
671 
672 
def remove_files(self, *paths):
    """Uses `Remove File` to remove multiple files one-by-one."""
    for path in paths:
        self.remove_file(path)
680 
681 
def empty_directory(self, path):
    """Deletes all the content from the given directory.

    Subdirectories are removed recursively; the directory itself is kept.
    """
    path = self._absnorm(path)
    for item in self._list_dir(path, absolute=True):
        if os.path.isdir(item):
            shutil.rmtree(item)
        else:
            os.remove(item)
    self._link("Emptied directory '%s'.", path)
695 
696 
def create_directory(self, path):
    """Creates the specified directory.

    Missing parent directories are created as well. An already existing
    directory is only logged; an existing non-directory path is reported
    through ``_error``.
    """
    path = self._absnorm(path)
    if os.path.isdir(path):
        self._link("Directory '%s' already exists.", path)
    elif os.path.exists(path):
        self._error("Path '%s' is not a directory." % path)
    else:
        os.makedirs(path)
        self._link("Created directory '%s'.", path)
711 
712 
def remove_directory(self, path, recursive=False):
    """Removes the directory pointed to by the given path.

    With a truthy ``recursive`` the whole tree is removed. Otherwise the
    directory must be empty, which is checked with
    ``directory_should_be_empty``. A missing path is only logged; a path
    that is not a directory is reported through ``_error``.
    """
    path = self._absnorm(path)
    if not os.path.exists(path):
        self._link("Directory '%s' does not exist.", path)
    elif not os.path.isdir(path):
        self._error("Path '%s' is not a directory." % path)
    else:
        if is_truthy(recursive):
            shutil.rmtree(path)
        else:
            self.directory_should_be_empty(
                path, "Directory '%s' is not empty." % path)
            os.rmdir(path)
        self._link("Removed directory '%s'.", path)
735 
736  # Moving and copying files and directories
737 
738 
def copy_file(self, source, destination):
    """Copies the source file into the destination.

    Returns the resolved destination path. When source and destination
    point to the same file nothing is copied; that case is logged by
    ``_are_source_and_destination_same_file``.
    """
    source, destination = \
        self._prepare_copy_and_move_file(source, destination)
    if not self._are_source_and_destination_same_file(source, destination):
        source, destination = self._atomic_copy(source, destination)
        self._link("Copied file from '%s' to '%s'.", source, destination)
    return destination
771 
def _prepare_copy_and_move_file(self, source, destination):
    """Resolves the source and destination of a single-file copy or move.

    When the resolved destination is an existing directory, the source
    file name is appended to it. Returns ``(source, destination)``.
    """
    source = self._normalize_copy_and_move_source(source)
    destination = self._normalize_copy_and_move_destination(destination)
    if os.path.isdir(destination):
        destination = os.path.join(destination, os.path.basename(source))
    return source, destination
778 
def _normalize_copy_and_move_source(self, source):
    """Resolves ``source`` to a single existing regular file.

    ``source`` may be a glob pattern but it must not match more than one
    path. Multiple matches, a missing source and a source that is not a
    regular file are reported through ``_error``.
    """
    source = self._absnorm(source)
    sources = self._glob(source)
    if len(sources) > 1:
        self._error("Multiple matches with source pattern '%s'." % source)
    if sources:
        source = sources[0]
    if not os.path.exists(source):
        self._error("Source file '%s' does not exist." % source)
    if not os.path.isfile(source):
        self._error("Source file '%s' is not a regular file." % source)
    return source
791 
def _normalize_copy_and_move_destination(self, destination):
    """Resolves ``destination`` and ensures its directory exists.

    The destination is considered a directory if it already is one or if
    it ends with a path separator; otherwise its parent directory is the
    one that is ensured. Returns the resolved destination.
    """
    if isinstance(destination, pathlib.Path):
        destination = str(destination)
    # Must be checked before normalization, which drops a trailing separator.
    is_dir = os.path.isdir(destination) or destination.endswith(('/', '\\'))
    destination = self._absnorm(destination)
    directory = destination if is_dir else os.path.dirname(destination)
    self._ensure_destination_directory_exists(directory)
    return destination
800 
def _ensure_destination_directory_exists(self, path):
    """Creates directory ``path`` unless it already exists.

    An existing path that is not a directory is reported through
    ``_error``.
    """
    if not os.path.exists(path):
        os.makedirs(path)
    elif not os.path.isdir(path):
        self._error("Destination '%s' exists and is not a directory." % path)
806 
def _are_source_and_destination_same_file(self, source, destination):
    """Tells whether both paths refer to the same file, logging if they do.

    The paths are compared after ``_force_normalize``.
    """
    if self._force_normalize(source) == self._force_normalize(destination):
        self._link("Source '%s' and destination '%s' point to the same "
                   "file.", source, destination)
        return True
    return False
813 
def _force_normalize(self, path):
    """Returns ``path`` case-normalized with ``normpath`` and with symbolic
    links resolved by ``os.path.realpath``.
    """
    # TODO: Should normalize_path also support link normalization?
    # TODO: Should we handle dos paths like 'exampl~1.txt'?
    normalized = normpath(path, case_normalize=True)
    return os.path.realpath(normalized)
818 
819 
def _atomic_copy(self, source, destination):
    """Copy file atomically (or at least try to).

    The source is first copied to a temporary directory created next to the
    destination and then moved into place with ``os.replace``. Replacing
    in one step avoids the window where a previously existing destination
    has been removed but the new file is not yet there. The temporary
    directory is always removed. Returns ``(source, destination)``.
    """
    temp_directory = tempfile.mkdtemp(dir=os.path.dirname(destination))
    temp_file = os.path.join(temp_directory, os.path.basename(source))
    try:
        shutil.copy(source, temp_file)
        # The temporary file lives in the destination's directory, so the
        # replace never has to cross file systems.
        os.replace(temp_file, destination)
    finally:
        shutil.rmtree(temp_directory)
    return source, destination
846 
847 
def move_file(self, source, destination):
    """Moves the source file into the destination.

    Returns the resolved destination path. When source and destination
    point to the same file nothing is moved; that case is logged by
    ``_are_source_and_destination_same_file``.
    """
    source, destination = \
        self._prepare_copy_and_move_file(source, destination)
    # Arguments go in (source, destination) order so that the "same file"
    # log message labels the two paths correctly.
    if not self._are_source_and_destination_same_file(source, destination):
        shutil.move(source, destination)
        self._link("Moved file from '%s' to '%s'.", source, destination)
    return destination
865 
866 
def copy_files(self, *sources_and_destination):
    """Copies specified files to the target directory.

    The last argument is the destination directory and the preceding ones
    are sources; each resolved source is copied with ``copy_file``.
    """
    sources, destination \
        = self._prepare_copy_and_move_files(sources_and_destination)
    for source in sources:
        self.copy_file(source, destination)
886 
def _prepare_copy_and_move_files(self, items):
    """Splits ``items`` into resolved sources and a destination directory.

    The last item is the destination, which is created if needed; the
    others are source patterns expanded with ``_glob_files``. Fewer than
    two items is reported through ``_error``. Returns
    ``(sources, destination)``.
    """
    if len(items) < 2:
        self._error('Must contain destination and at least one source.')
    sources = self._glob_files(items[:-1])
    destination = self._absnorm(items[-1])
    self._ensure_destination_directory_exists(destination)
    return sources, destination
894 
def _glob_files(self, patterns):
    """Returns all paths matching the given patterns as one flat list.

    Each pattern is resolved with ``_absnorm`` and expanded with ``_glob``;
    matches are kept in pattern order.
    """
    files = []
    for pattern in patterns:
        files.extend(self._glob(self._absnorm(pattern)))
    return files
900 
901 
def move_files(self, *sources_and_destination):
    """Moves specified files to the target directory.

    The last argument is the destination directory and the preceding ones
    are sources; each resolved source is moved with ``move_file``.
    """
    sources, destination \
        = self._prepare_copy_and_move_files(sources_and_destination)
    for source in sources:
        self.move_file(source, destination)
912 
913 
def copy_directory(self, source, destination):
    """Copies the source directory into the destination.

    Paths are resolved with ``_prepare_copy_and_move_directory`` and the
    tree is copied with ``shutil.copytree``.
    """
    source, destination = self._prepare_copy_and_move_directory(source, destination)
    shutil.copytree(source, destination)
    self._link("Copied directory from '%s' to '%s'.", source, destination)
923 
def _prepare_copy_and_move_directory(self, source, destination):
    """Resolves the source and destination of a directory copy or move.

    The source must be an existing directory and the destination must not
    be an existing non-directory; violations are reported through
    ``_error``. If the destination exists, the source's base name is
    appended to it; otherwise its missing parent directories are created.
    Returns ``(source, destination)``.
    """
    source = self._absnorm(source)
    destination = self._absnorm(destination)
    if not os.path.exists(source):
        self._error("Source '%s' does not exist." % source)
    if not os.path.isdir(source):
        self._error("Source '%s' is not a directory." % source)
    if os.path.exists(destination) and not os.path.isdir(destination):
        self._error("Destination '%s' is not a directory." % destination)
    if os.path.exists(destination):
        base = os.path.basename(source)
        destination = os.path.join(destination, base)
    else:
        parent = os.path.dirname(destination)
        if not os.path.exists(parent):
            os.makedirs(parent)
    return source, destination
941 
942 
def move_directory(self, source, destination):
    """Moves the source directory into a destination.

    Paths are resolved with ``_prepare_copy_and_move_directory`` and the
    directory is moved with ``shutil.move``.
    """
    source, destination \
        = self._prepare_copy_and_move_directory(source, destination)
    shutil.move(source, destination)
    self._link("Moved directory from '%s' to '%s'.", source, destination)
953 
954  # Environment Variables
955 
@keyword(types=None)
def get_environment_variable(self, name, default=None):
    """Returns the value of an environment variable with the given name.

    If the lookup yields ``None`` (no such variable and no non-``None``
    ``default``), the problem is reported through ``_error``.
    """
    value = get_env_var(name, default)
    if value is None:
        self._error("Environment variable '%s' does not exist." % name)
    return value
973 
974 
def set_environment_variable(self, name, value):
    """Sets an environment variable to a specified value.

    The variable is set with ``set_env_var`` and the assignment is logged
    on INFO level.
    """
    set_env_var(name, value)
    self._info("Environment variable '%s' set to value '%s'."
               % (name, value))
983 
984 
def append_to_environment_variable(self, name, *values, **config):
    """Appends given values to environment variable ``name``.

    If the variable already exists, its current value goes first. Values
    are joined with ``separator``, which is the only accepted ``config``
    option and defaults to ``os.pathsep``. Any other option is reported
    through ``_error``.
    """
    # A unique sentinel distinguishes "variable not set" from any real value.
    sentinel = object()
    initial = self.get_environment_variable(name, sentinel)
    if initial is not sentinel:
        values = (initial,) + values
    separator = config.pop('separator', os.pathsep)
    if config:
        config = ['='.join(i) for i in sorted(config.items())]
        self._error('Configuration %s not accepted.'
                    % seq2str(config, lastsep=' or '))
    self.set_environment_variable(name, separator.join(values))
1015 
1016 
def remove_environment_variable(self, *names):
    """Deletes the specified environment variable.

    Several names can be given. For each name the result is logged on INFO
    level: "deleted" when ``del_env_var`` returns a truthy value and "does
    not exist" otherwise.
    """
    for name in names:
        value = del_env_var(name)
        if value:
            self._info("Environment variable '%s' deleted." % name)
        else:
            self._info("Environment variable '%s' does not exist." % name)
1030 
1031 
def environment_variable_should_be_set(self, name, msg=None):
    """Fails if the specified environment variable is not set.

    A variable whose value is falsy (e.g. an empty string) is treated as
    not set. ``msg`` overrides the default failure message.
    """
    value = get_env_var(name)
    if not value:
        self._fail(msg, "Environment variable '%s' is not set." % name)
    self._info("Environment variable '%s' is set to '%s'." % (name, value))
1040 
1041 
def environment_variable_should_not_be_set(self, name, msg=None):
    """Fails if the specified environment variable is set.

    A variable whose value is falsy (e.g. an empty string) is treated as
    not set. ``msg`` overrides the default failure message.
    """
    value = get_env_var(name)
    if value:
        self._fail(msg, "Environment variable '%s' is set to '%s'."
                        % (name, value))
    self._info("Environment variable '%s' is not set." % name)
1051 
1052 
def get_environment_variables(self):
    """Returns currently available environment variables as a dictionary.

    The result of ``get_env_vars`` is returned as is.
    """
    return get_env_vars()
1060 
1061 
def log_environment_variables(self, level='INFO'):
    """Logs all environment variables using the given log level.

    Variables are logged one per message as ``name = value``, sorted by
    name case-insensitively. The variables are also returned.
    """
    variables = get_env_vars()
    for name in sorted(variables, key=lambda item: item.lower()):
        self._log('%s = %s' % (name, variables[name]), level)
    return variables
1071 
1072  # Path
1073 
1074 
def join_path(self, base, *parts):
    """Joins the given path part(s) to the given base path.

    ``pathlib.Path`` objects are converted to strings; in other parts
    forward slashes are replaced with ``os.sep``. The joined path is
    normalized with ``normalize_path``.
    """
    parts = [str(p) if isinstance(p, pathlib.Path) else p.replace('/', os.sep)
             for p in (base,) + parts]
    return self.normalize_path(os.path.join(*parts))
1097 
1098 
def join_paths(self, base, *paths):
    """Joins given paths with base and returns resulted paths.

    Each path is joined to ``base`` separately with ``join_path``; the
    results are returned as a list in the given order.
    """
    return [self.join_path(base, path) for path in paths]
1113 
1114 
def normalize_path(self, path, case_normalize=False):
    """Normalizes the given path.

    ``pathlib.Path`` objects are converted to strings; in other paths
    forward slashes are replaced with ``os.sep``. A leading ``~`` is
    expanded and the path is collapsed with ``os.path.normpath``. With a
    truthy ``case_normalize`` the result is additionally passed through
    ``os.path.normcase``. An empty result is returned as ``'.'``.
    """
    if isinstance(path, pathlib.Path):
        path = str(path)
    else:
        path = path.replace('/', os.sep)
    path = os.path.normpath(os.path.expanduser(path))
    # os.path.normcase doesn't normalize on OSX which also, by default,
    # has case-insensitive file system. Our robot.utils.normpath would
    # do that, but it's not certain whether that, or the other things the
    # utility does, would be desirable.
    if case_normalize:
        path = os.path.normcase(path)
    return path or '.'
1150 
1151 
def split_path(self, path):
    """Splits the given path from the last path separator.

    The path is normalized with ``normalize_path`` first. Returns the
    ``(head, tail)`` pair produced by ``os.path.split``.
    """
    return os.path.split(self.normalize_path(path))
1169 
1170 
def split_extension(self, path):
    """Splits the extension from the given path.

    The path is normalized with ``normalize_path`` first. Returns
    ``(basepath, extension)`` where the extension does not include the
    leading dot. A base name whose only dots are leading ones has no
    extension. Trailing dots are kept: on the extension if there is one,
    otherwise on the base path.
    """
    path = self.normalize_path(path)
    basename = os.path.basename(path)
    # All dots are at the start of the name (e.g. '.file', '..'): no extension.
    if basename.startswith('.' * basename.count('.')):
        return path, ''
    if path.endswith('.'):
        # Set trailing dots aside so that splitext sees the real extension.
        path2 = path.rstrip('.')
        trailing_dots = '.' * (len(path) - len(path2))
        path = path2
    else:
        trailing_dots = ''
    basepath, extension = os.path.splitext(path)
    if extension.startswith('.'):
        extension = extension[1:]
    if extension:
        extension += trailing_dots
    else:
        basepath += trailing_dots
    return basepath, extension
1214 
1215  # Misc
1216 
1217 
def get_modified_time(self, path, format='timestamp'):
    """Returns the last modification time of a file or directory.

    The modification time from ``os.stat`` is converted with ``get_time``
    using ``format``. A missing path is reported through ``_error``.
    """
    path = self._absnorm(path)
    if not os.path.exists(path):
        self._error("Path '%s' does not exist." % path)
    mtime = get_time(format, os.stat(path).st_mtime)
    # '%%s' survives the first formatting as '%s' and is filled in by _link.
    self._link("Last modified time of '%%s' is %s." % mtime, path)
    return mtime
1258 
1259 
def set_modified_time(self, path, mtime):
    """Sets the file modification and access times.

    ``mtime`` is parsed with ``parse_time`` and used for both times. The
    path must be an existing regular file; otherwise the problem is
    reported through ``_error``.
    """
    mtime = parse_time(mtime)
    path = self._absnorm(path)
    if not os.path.exists(path):
        self._error("File '%s' does not exist." % path)
    if not os.path.isfile(path):
        self._error("Path '%s' is not a regular file." % path)
    os.utime(path, (mtime, mtime))
    time.sleep(0.1)    # Give os some time to really set these times
    tstamp = secs_to_timestamp(mtime, seps=('-', ' ', ':'))
    # '%%s' survives the first formatting as '%s' and is filled in by _link.
    self._link("Set modified time of '%%s' to %s." % tstamp, path)
1305 
1306 
def get_file_size(self, path):
    """Returns and logs file size as an integer in bytes.

    A path that is not an existing regular file is reported through
    ``_error``.
    """
    path = self._absnorm(path)
    if not os.path.isfile(path):
        self._error("File '%s' does not exist." % path)
    size = os.stat(path).st_size
    plural = plural_or_not(size)
    # '%%s' survives the first formatting as '%s' and is filled in by _link.
    self._link("Size of file '%%s' is %d byte%s." % (size, plural), path)
    return size
1315 
1316 
def list_directory(self, path, pattern=None, absolute=False):
    """Returns and logs items in a directory, optionally filtered with pattern.

    Listing is delegated to ``_list_dir``; the item count and the items,
    one per line, are logged on INFO level.
    """
    items = self._list_dir(path, pattern, absolute)
    self._info('%d item%s:\n%s' % (len(items), plural_or_not(items),
                                   '\n'.join(items)))
    return items
1343 
1344 
def list_files_in_directory(self, path, pattern=None, absolute=False):
    """Wrapper for `List Directory` that returns only files.

    Listing is delegated to ``_list_files_in_dir``; the file count and the
    files, one per line, are logged on INFO level.
    """
    files = self._list_files_in_dir(path, pattern, absolute)
    self._info('%d file%s:\n%s' % (len(files), plural_or_not(files),
                                   '\n'.join(files)))
    return files
1350 
1351 
def list_directories_in_directory(self, path, pattern=None, absolute=False):
    """Wrapper for `List Directory` that returns only directories.

    Listing is delegated to ``_list_dirs_in_dir``; the directory count and
    the directories, one per line, are logged on INFO level.
    """
    dirs = self._list_dirs_in_dir(path, pattern, absolute)
    self._info('%d director%s:\n%s' % (len(dirs),
                                       'y' if len(dirs) == 1 else 'ies',
                                       '\n'.join(dirs)))
    return dirs
1358 
1359 
def count_items_in_directory(self, path, pattern=None):
    """Returns and logs the number of all items in the given directory.

    Items are listed with ``_list_dir`` using the optional ``pattern``.
    """
    count = len(self._list_dir(path, pattern))
    self._info("%s item%s." % (count, plural_or_not(count)))
    return count
1369 
1370 
def count_files_in_directory(self, path, pattern=None):
    """Wrapper for `Count Items In Directory` returning only file count.

    Files are listed with ``_list_files_in_dir`` using the optional
    ``pattern``; the count is logged on INFO level.
    """
    count = len(self._list_files_in_dir(path, pattern))
    self._info("%s file%s." % (count, plural_or_not(count)))
    return count
1375 
1376 
def count_directories_in_directory(self, path, pattern=None):
    """Wrapper for `Count Items In Directory` returning only directory count.

    Directories are listed with ``_list_dirs_in_dir`` using the optional
    ``pattern``; the count is logged on INFO level.
    """
    count = len(self._list_dirs_in_dir(path, pattern))
    self._info("%s director%s." % (count, 'y' if count == 1 else 'ies'))
    return count
1381 
def _list_dir(self, path, pattern=None, absolute=False):
    """Returns the sorted names of the items in directory ``path``.

    With ``pattern`` only names matching it case-sensitively as a glob are
    kept. With a truthy ``absolute`` the names are joined to the resolved
    directory path. A path that is not an existing directory is reported
    through ``_error``.
    """
    path = self._absnorm(path)
    self._link("Listing contents of directory '%s'.", path)
    if not os.path.isdir(path):
        self._error("Directory '%s' does not exist." % path)
    # result is already unicode but safe_str also handles NFC normalization
    items = sorted(safe_str(item) for item in os.listdir(path))
    if pattern:
        items = [i for i in items if fnmatch.fnmatchcase(i, pattern)]
    if is_truthy(absolute):
        path = os.path.normpath(path)
        items = [os.path.join(path, item) for item in items]
    return items
1395 
def _list_files_in_dir(self, path, pattern=None, absolute=False):
    """Returns the items of ``_list_dir`` that are regular files.

    The file check is done against the directory resolved with
    ``_absnorm``, i.e. the same directory ``_list_dir`` lists, so paths
    that need normalization (such as ones starting with ``~``) work too.
    """
    directory = self._absnorm(path)
    return [item for item in self._list_dir(path, pattern, absolute)
            if os.path.isfile(os.path.join(directory, item))]
1399 
def _list_dirs_in_dir(self, path, pattern=None, absolute=False):
    """Returns the items of ``_list_dir`` that are directories.

    The directory check is done against the directory resolved with
    ``_absnorm``, i.e. the same directory ``_list_dir`` lists, so paths
    that need normalization (such as ones starting with ``~``) work too.
    """
    directory = self._absnorm(path)
    return [item for item in self._list_dir(path, pattern, absolute)
            if os.path.isdir(os.path.join(directory, item))]
1403 
1404 
def touch(self, path):
    """Emulates the UNIX touch command.

    Creates an empty file if ``path`` does not exist; otherwise sets its
    access and modification times to the current time rounded to full
    seconds. A directory path or a missing parent directory is reported
    through ``_error``.
    """
    path = self._absnorm(path)
    if os.path.isdir(path):
        self._error("Cannot touch '%s' because it is a directory." % path)
    if not os.path.exists(os.path.dirname(path)):
        self._error("Cannot touch '%s' because its parent directory does "
                    "not exist." % path)
    if os.path.exists(path):
        mtime = round(time.time())
        os.utime(path, (mtime, mtime))
        self._link("Touched existing file '%s'.", path)
    else:
        # Opening for writing creates the file; nothing is written.
        with open(path, 'w'):
            pass
        self._link("Touched new file '%s'.", path)
1426 
def _absnorm(self, path):
    """Returns ``path`` normalized with ``normalize_path`` and made
    absolute with ``abspath``.
    """
    return abspath(self.normalize_path(path))
1429 
def _fail(self, *messages):
    """Raises ``AssertionError`` with the first truthy message.

    Callers pass an optional custom message followed by a default one. If
    no message is truthy, the error is raised with an empty message
    instead of leaking ``StopIteration`` from the lookup.
    """
    raise AssertionError(next((msg for msg in messages if msg), ''))
1432 
def _error(self, msg):
    """Raises ``RuntimeError`` carrying ``msg``."""
    failure = RuntimeError(msg)
    raise failure
1435 
def _info(self, msg):
    """Logs ``msg`` on INFO level via ``_log``."""
    self._log(msg, 'INFO')
1438 
def _link(self, msg, *paths):
    """Logs ``msg`` on HTML level with each path rendered as a file link.

    ``msg`` must contain one ``%s`` placeholder per path. Each placeholder
    is replaced with an ``<a href="file://...">`` element pointing to the
    path. Paths are inserted as they are, without HTML escaping.
    """
    paths = tuple('<a href="file://%s">%s</a>' % (p, p) for p in paths)
    self._log(msg % paths, 'HTML')
1442 
def _warn(self, msg):
    """Logs ``msg`` on WARN level via ``_log``."""
    self._log(msg, 'WARN')
1445 
def _log(self, msg, level):
    """Writes ``msg`` with the given ``level`` using ``logger.write``."""
    logger.write(msg, level)
1448 
1449 
class _Process:
    """Runs a shell command with ``os.popen`` and gives access to its output
    and return code.
    """

    def __init__(self, command):
        """Starts ``command`` after preprocessing it with ``_process_command``."""
        self._command = self._process_command(command)
        self._process = os.popen(self._command)

    def __str__(self):
        """Returns the command as it was actually executed."""
        return self._command

    def read(self):
        """Reads all output of the process and returns it post-processed."""
        return self._process_output(self._process.read())

    def close(self):
        """Closes the process and returns its return code.

        Returns 0 when the underlying ``close`` gives ``None`` and 255 if
        closing raises ``IOError``.
        """
        try:
            rc = self._process.close()
        except IOError:    # Has occurred sometimes in Windows
            return 255
        if rc is None:
            return 0
        # In Windows return code is value returned by
        # command (can be almost anything)
        # In other OS:
        #   Return code must be converted with 'rc >> 8' and it is
        #   between 0-255 after conversion
        if WINDOWS:
            return rc % 256
        return rc >> 8

    def _process_command(self, command):
        """Redirects stderr to stdout unless the command already contains ``>``.

        For a command ending with ``&`` the redirection is inserted before
        the ampersand so that it stays last.
        """
        if '>' not in command:
            if command.endswith('&'):
                command = command[:-1] + ' 2>&1 &'
            else:
                command += ' 2>&1'
        return command

    def _process_output(self, output):
        r"""Converts ``\r\n`` to ``\n``, drops one trailing newline and
        decodes the result with ``console_decode``.
        """
        if '\r\n' in output:
            output = output.replace('\r\n', '\n')
        if output.endswith('\n'):
            output = output[:-1]
        return console_decode(output)
A library providing keywords for operating system related tasks.
def _are_source_and_destination_same_file(self, source, destination)
def should_exist(self, path, msg=None)
Fails unless the given path (file or directory) exists.
def get_environment_variable(self, name, default=None)
Returns the value of an environment variable with the given name.
def count_directories_in_directory(self, path, pattern=None)
Wrapper for Count Items In Directory returning only directory count.
def get_file(self, path, encoding='UTF-8', encoding_errors='strict')
Returns the contents of a specified file.
def _list_dirs_in_dir(self, path, pattern=None, absolute=False)
def list_files_in_directory(self, path, pattern=None, absolute=False)
Wrapper for List Directory that returns only files.
def copy_file(self, source, destination)
Copies the source file into the destination.
def grep_file(self, path, pattern, encoding='UTF-8', encoding_errors='strict', regexp=False)
Returns the lines of the specified file that match the pattern.
def directory_should_not_exist(self, path, msg=None)
Fails if the given path points to an existing directory.
def set_modified_time(self, path, mtime)
Sets the file modification and access times.
def wait_until_created(self, path, timeout='1 minute')
Waits until the given file or directory is created.
def file_should_not_exist(self, path, msg=None)
Fails if the given path points to an existing file.
def log_environment_variables(self, level='INFO')
Logs all environment variables using the given log level.
def get_environment_variables(self)
Returns currently available environment variables as a dictionary.
def copy_directory(self, source, destination)
Copies the source directory into the destination.
def join_path(self, base, *parts)
Joins the given path part(s) to the given base path.
def list_directories_in_directory(self, path, pattern=None, absolute=False)
Wrapper for List Directory that returns only directories.
def join_paths(self, base, *paths)
Joins given paths with base and returns resulted paths.
def directory_should_not_be_empty(self, path, msg=None)
Fails if the specified directory is empty.
def create_file(self, path, content='', encoding='UTF-8')
Creates a file with the given content and encoding.
def _atomic_copy(self, source, destination)
Copy file atomically (or at least try to).
def _list_files_in_dir(self, path, pattern=None, absolute=False)
def log_file(self, path, encoding='UTF-8', encoding_errors='strict')
Wrapper for Get File that also logs the returned file.
def append_to_file(self, path, content, encoding='UTF-8')
Appends the given content to the specified file.
def touch(self, path)
Emulates the UNIX touch command.
def remove_file(self, path)
Removes a file with the given path.
def create_binary_file(self, path, content)
Creates a binary file with the given content.
def split_path(self, path)
Splits the given path from the last path separator (`/` or `\`).
def copy_files(self, *sources_and_destination)
Copies specified files to the target directory.
def _write_to_file(self, path, content, encoding=None, mode='w')
def _get_matches_error(self, what, path, matches)
def remove_directory(self, path, recursive=False)
Removes the directory pointed to by the given path.
def move_files(self, *sources_and_destination)
Moves specified files to the target directory.
def _list_dir(self, path, pattern=None, absolute=False)
def split_extension(self, path)
Splits the extension from the given path.
def should_not_exist(self, path, msg=None)
Fails if the given path (file or directory) exists.
def _normalize_copy_and_move_destination(self, destination)
def get_binary_file(self, path)
Returns the contents of a specified file.
def list_directory(self, path, pattern=None, absolute=False)
Returns and logs items in a directory, optionally filtered with pattern.
def directory_should_be_empty(self, path, msg=None)
Fails unless the specified directory is empty.
def move_file(self, source, destination)
Moves the source file into the destination.
def get_modified_time(self, path, format='timestamp')
Returns the last modification time of a file or directory.
def get_file_size(self, path)
Returns and logs file size as an integer in bytes.
def empty_directory(self, path)
Deletes all the content from the given directory.
def environment_variable_should_not_be_set(self, name, msg=None)
Fails if the specified environment variable is set.
def environment_variable_should_be_set(self, name, msg=None)
Fails if the specified environment variable is not set.
def file_should_be_empty(self, path, msg=None)
Fails unless the specified file is empty.
def _prepare_copy_and_move_file(self, source, destination)
def run_and_return_rc_and_output(self, command)
Runs the given command in the system and returns the RC and output.
def count_items_in_directory(self, path, pattern=None)
Returns and logs the number of all items in the given directory.
def count_files_in_directory(self, path, pattern=None)
Wrapper for Count Items In Directory returning only file count.
def set_environment_variable(self, name, value)
Sets an environment variable to a specified value.
def wait_until_removed(self, path, timeout='1 minute')
Waits until the given file or directory is removed.
def append_to_environment_variable(self, name, *values, **config)
Appends given values to environment variable name.
def _prepare_copy_and_move_directory(self, source, destination)
def create_directory(self, path)
Creates the specified directory.
def run(self, command)
Runs the given command in the system and returns the output.
def normalize_path(self, path, case_normalize=False)
Normalizes the given path.
def remove_environment_variable(self, *names)
Deletes the specified environment variable.
def remove_files(self, *paths)
Uses Remove File to remove multiple files one-by-one.
def file_should_not_be_empty(self, path, msg=None)
Fails if the specified file is empty.
def directory_should_exist(self, path, msg=None)
Fails unless the given path points to an existing directory.
def move_directory(self, source, destination)
Moves the source directory into a destination.
def run_and_return_rc(self, command)
Runs the given command in the system and returns the return code.
def file_should_exist(self, path, msg=None)
Fails unless the given path points to an existing file.
def console_decode(string, encoding=CONSOLE_ENCODING)
Decodes bytes from console encoding to Unicode.
Definition: encoding.py:39
def seq2str(sequence, quote="'", sep=', ', lastsep=' and ')
Returns sequence in format `'item 1', 'item 2' and 'item 3'`.
Definition: misc.py:79
def plural_or_not(item)
Definition: misc.py:73
def get_env_var(name, default=None)
Definition: robotenv.py:21
def set_env_var(name, value)
Definition: robotenv.py:30
def get_env_vars(upper=os.sep !='/')
Definition: robotenv.py:41
def del_env_var(name)
Definition: robotenv.py:34
def abspath(path, case_normalize=False)
Replacement for os.path.abspath with some enhancements and bug fixes.
Definition: robotpath.py:65
def normpath(path, case_normalize=False)
Replacement for os.path.normpath with some enhancements.
Definition: robotpath.py:46
def get_time(format='timestamp', time_=None)
Return the given or current time in requested format.
Definition: robottime.py:236
def secs_to_timestamp(secs, seps=None, millis=False)
Definition: robottime.py:348
def secs_to_timestr(secs, compact=False)
Converts time in seconds to a string representation.
Definition: robottime.py:151
def parse_time(timestr)
Parses the time string and returns its value as seconds since epoch.
Definition: robottime.py:271
def timestr_to_secs(timestr, round_to=3, accept_plain_values=True)
Parses time strings like '1h 10s', '01:00:10' and '42' and returns seconds.
Definition: robottime.py:55
def is_truthy(item)
Returns True or False depending on is the item considered true or not.
Definition: robottypes.py:162
def safe_str(item)
Definition: unic.py:21
def get_version(naked=False)
Definition: version.py:24