Robot Framework
Telnet.py
Go to the documentation of this file.
1 # Copyright 2008-2015 Nokia Networks
2 # Copyright 2016- Robot Framework Foundation
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 from contextlib import contextmanager
17 import inspect
18 import re
19 import socket
20 import struct
21 import telnetlib
22 import time
23 
24 try:
25  import pyte
26 except ImportError:
27  pyte = None
28 
29 from robot.api import logger
30 from robot.api.deco import keyword
31 from robot.utils import (ConnectionCache, is_bytes, is_string, is_truthy,
32  secs_to_timestr, seq2str, timestr_to_secs)
33 from robot.version import get_version
34 
35 
36 
277 class Telnet:
278  ROBOT_LIBRARY_SCOPE = 'SUITE'
279  ROBOT_LIBRARY_VERSION = get_version()
280 
281 
304  def __init__(self, timeout='3 seconds', newline='CRLF',
305  prompt=None, prompt_is_regexp=False,
306  encoding='UTF-8', encoding_errors='ignore',
307  default_log_level='INFO', window_size=None,
308  environ_user=None, terminal_emulation=False,
309  terminal_type=None, telnetlib_log_level='TRACE',
310  connection_timeout=None):
311  self._timeout_timeout = timeout or 3.0
312  self._set_connection_timeout_set_connection_timeout(connection_timeout)
313  self._newline_newline = newline or 'CRLF'
314  self._prompt_prompt = (prompt, prompt_is_regexp)
315  self._encoding_encoding = encoding
316  self._encoding_errors_encoding_errors = encoding_errors
317  self._default_log_level_default_log_level = default_log_level
318  self._window_size_window_size = window_size
319  self._environ_user_environ_user = environ_user
320  self._terminal_emulation_terminal_emulation = terminal_emulation
321  self._terminal_type_terminal_type = terminal_type
322  self._telnetlib_log_level_telnetlib_log_level = telnetlib_log_level
323  self._cache_cache = ConnectionCache()
324  self._conn_conn = None
325  self._conn_kws_conn_kws = self._lib_kws_lib_kws = None
326 
327  def get_keyword_names(self):
328  return self._get_library_keywords_get_library_keywords() + self._get_connection_keywords_get_connection_keywords()
329 
331  if self._lib_kws_lib_kws is None:
332  self._lib_kws_lib_kws = self._get_keywords_get_keywords(self, ['get_keyword_names'])
333  return self._lib_kws_lib_kws
334 
335  def _get_keywords(self, source, excluded):
336  return [name for name in dir(source)
337  if self._is_keyword_is_keyword(name, source, excluded)]
338 
339  def _is_keyword(self, name, source, excluded):
340  return (name not in excluded and
341  not name.startswith('_') and
342  name != 'get_keyword_names' and
343  inspect.ismethod(getattr(source, name)))
344 
346  if self._conn_kws_conn_kws is None:
347  conn = self._get_connection_get_connection()
348  excluded = [name for name in dir(telnetlib.Telnet())
349  if name not in ['write', 'read', 'read_until']]
350  self._conn_kws_conn_kws = self._get_keywords_get_keywords(conn, excluded)
351  return self._conn_kws_conn_kws
352 
353  def __getattr__(self, name):
354  if name not in self._get_connection_keywords_get_connection_keywords():
355  raise AttributeError(name)
356  # If no connection is initialized, get attributes from a non-active
357  # connection. This makes it possible for Robot to create keyword
358  # handlers when it imports the library.
359  return getattr(self._conn_conn or self._get_connection_get_connection(), name)
360 
361  @keyword(types=None)
362 
378  def open_connection(self, host, alias=None, port=23, timeout=None,
379  newline=None, prompt=None, prompt_is_regexp=False,
380  encoding=None, encoding_errors=None,
381  default_log_level=None, window_size=None,
382  environ_user=None, terminal_emulation=None,
383  terminal_type=None, telnetlib_log_level=None,
384  connection_timeout=None):
385  timeout = timeout or self._timeout_timeout
386  connection_timeout = (timestr_to_secs(connection_timeout)
387  if connection_timeout
388  else self._connection_timeout_connection_timeout)
389  newline = newline or self._newline_newline
390  encoding = encoding or self._encoding_encoding
391  encoding_errors = encoding_errors or self._encoding_errors_encoding_errors
392  default_log_level = default_log_level or self._default_log_level_default_log_level
393  window_size = self._parse_window_size_parse_window_size(window_size or self._window_size_window_size)
394  environ_user = environ_user or self._environ_user_environ_user
395  if terminal_emulation is None:
396  terminal_emulation = self._terminal_emulation_terminal_emulation
397  terminal_type = terminal_type or self._terminal_type_terminal_type
398  telnetlib_log_level = telnetlib_log_level or self._telnetlib_log_level_telnetlib_log_level
399  if not prompt:
400  prompt, prompt_is_regexp = self._prompt_prompt
401  logger.info('Opening connection to %s:%s with prompt: %s%s'
402  % (host, port, prompt, ' (regexp)' if prompt_is_regexp else ''))
403  self._conn_conn = self._get_connection_get_connection(host, port, timeout, newline,
404  prompt, is_truthy(prompt_is_regexp),
405  encoding, encoding_errors,
406  default_log_level,
407  window_size,
408  environ_user,
409  is_truthy(terminal_emulation),
410  terminal_type,
411  telnetlib_log_level,
412  connection_timeout)
413  return self._cache_cache.register(self._conn_conn, alias)
414 
415  def _parse_window_size(self, window_size):
416  if not window_size:
417  return None
418  try:
419  cols, rows = window_size.split('x', 1)
420  return int(cols), int(rows)
421  except ValueError:
422  raise ValueError("Invalid window size '%s'. Should be "
423  "<rows>x<columns>." % window_size)
424 
425 
428  def _get_connection(self, *args):
429  return TelnetConnection(*args)
430 
431  def _set_connection_timeout(self, connection_timeout):
432  self._connection_timeout_connection_timeout = connection_timeout
433  if self._connection_timeout_connection_timeout:
434  self._connection_timeout_connection_timeout = timestr_to_secs(connection_timeout)
435 
436 
467  def switch_connection(self, index_or_alias):
468  old_index = self._cache_cache.current_index
469  self._conn_conn = self._cache_cache.switch(index_or_alias)
470  return old_index
471 
472 
483  self._conn_conn = self._cache_cache.close_all()
484 
485 
486 class TelnetConnection(telnetlib.Telnet):
487  NEW_ENVIRON_IS = b'\x00'
488  NEW_ENVIRON_VAR = b'\x00'
489  NEW_ENVIRON_VALUE = b'\x01'
490  INTERNAL_UPDATE_FREQUENCY = 0.03
491 
492  def __init__(self, host=None, port=23, timeout=3.0, newline='CRLF',
493  prompt=None, prompt_is_regexp=False,
494  encoding='UTF-8', encoding_errors='ignore',
495  default_log_level='INFO', window_size=None, environ_user=None,
496  terminal_emulation=False, terminal_type=None,
497  telnetlib_log_level='TRACE', connection_timeout=None):
498  if connection_timeout is None:
499  telnetlib.Telnet.__init__(self, host, int(port) if port else 23)
500  else:
501  telnetlib.Telnet.__init__(self, host, int(port) if port else 23,
502  connection_timeout)
503  self._set_timeout_set_timeout(timeout)
504  self._set_newline_set_newline(newline)
505  self._set_prompt_set_prompt(prompt, prompt_is_regexp)
506  self._set_encoding_set_encoding(encoding, encoding_errors)
507  self._set_default_log_level_set_default_log_level(default_log_level)
508  self._window_size_window_size = window_size
509  self._environ_user_environ_user = self._encode_encode(environ_user) if environ_user else None
510  self._terminal_emulator_terminal_emulator = self._check_terminal_emulation_check_terminal_emulation(terminal_emulation)
511  self._terminal_type_terminal_type = self._encode_encode(terminal_type) if terminal_type else None
512  self.set_option_negotiation_callback(self._negotiate_options_negotiate_options)
513  self._set_telnetlib_log_level_set_telnetlib_log_level(telnetlib_log_level)
514  self._opt_responses_opt_responses = list()
515 
516 
533  def set_timeout(self, timeout):
534  self._verify_connection_verify_connection()
535  old = self._timeout_timeout
536  self._set_timeout_set_timeout(timeout)
537  return secs_to_timestr(old)
538 
539  def _set_timeout(self, timeout):
540  self._timeout_timeout = timestr_to_secs(timeout)
541 
542 
553  def set_newline(self, newline):
554  self._verify_connection_verify_connection()
555  if self._terminal_emulator_terminal_emulator:
556  raise AssertionError("Newline can not be changed when terminal emulation is used.")
557  old = self._newline_newline
558  self._set_newline_set_newline(newline)
559  return old
560 
561  def _set_newline(self, newline):
562  newline = str(newline).upper()
563  self._newline_newline = newline.replace('LF', '\n').replace('CR', '\r')
564 
565 
585  def set_prompt(self, prompt, prompt_is_regexp=False):
586  self._verify_connection_verify_connection()
587  old = self._prompt_prompt
588  self._set_prompt_set_prompt(prompt, prompt_is_regexp)
589  if old[1]:
590  return old[0].pattern, True
591  return old
592 
593  def _set_prompt(self, prompt, prompt_is_regexp):
594  if is_truthy(prompt_is_regexp):
595  self._prompt_prompt = (re.compile(prompt), True)
596  else:
597  self._prompt_prompt = (prompt, False)
598 
599  def _prompt_is_set(self):
600  return self._prompt_prompt[0] is not None
601 
602  @keyword(types=None)
603 
621  def set_encoding(self, encoding=None, errors=None):
622  self._verify_connection_verify_connection()
623  if self._terminal_emulator_terminal_emulator:
624  raise AssertionError("Encoding can not be changed when terminal emulation is used.")
625  old = self._encoding_encoding
626  self._set_encoding_set_encoding(encoding or old[0], errors or old[1])
627  return old
628 
629  def _set_encoding(self, encoding, errors):
630  self._encoding_encoding = (encoding.upper(), errors)
631 
632  def _encode(self, text):
633  if is_bytes(text):
634  return text
635  if self._encoding_encoding[0] == 'NONE':
636  return text.encode('ASCII')
637  return text.encode(*self._encoding_encoding)
638 
639  def _decode(self, bytes):
640  if self._encoding_encoding[0] == 'NONE':
641  return bytes
642  return bytes.decode(*self._encoding_encoding)
643 
644 
649  def set_telnetlib_log_level(self, level):
650  self._verify_connection_verify_connection()
651  old = self._telnetlib_log_level_telnetlib_log_level
652  self._set_telnetlib_log_level_set_telnetlib_log_level(level)
653  return old
654 
655  def _set_telnetlib_log_level(self, level):
656  if level.upper() == 'NONE':
657  self._telnetlib_log_level_telnetlib_log_level = 'NONE'
658  elif self._is_valid_log_level_is_valid_log_level(level) is False:
659  raise AssertionError("Invalid log level '%s'" % level)
660  self._telnetlib_log_level_telnetlib_log_level = level.upper()
661 
662 
670  def set_default_log_level(self, level):
671  self._verify_connection_verify_connection()
672  old = self._default_log_level_default_log_level
673  self._set_default_log_level_set_default_log_level(level)
674  return old
675 
676  def _set_default_log_level(self, level):
677  if level is None or not self._is_valid_log_level_is_valid_log_level(level):
678  raise AssertionError("Invalid log level '%s'" % level)
679  self._default_log_level_default_log_level = level.upper()
680 
681  def _is_valid_log_level(self, level):
682  if level is None:
683  return True
684  if not is_string(level):
685  return False
686  return level.upper() in ('TRACE', 'DEBUG', 'INFO', 'WARN')
687 
688 
698  def close_connection(self, loglevel=None):
699  if self.sock:
700  self.sock.shutdown(socket.SHUT_RDWR)
701  self.close()
702  output = self._decode_decode(self.read_all())
703  self._log_log(output, loglevel)
704  return output
705 
706 
730  def login(self, username, password, login_prompt='login: ',
731  password_prompt='Password: ', login_timeout='1 second',
732  login_incorrect='Login incorrect'):
733  output = self._submit_credentials_submit_credentials(username, password, login_prompt,
734  password_prompt)
735  if self._prompt_is_set_prompt_is_set():
736  success, output2 = self._read_until_prompt_read_until_prompt()
737  else:
738  success, output2 = self._verify_login_without_prompt_verify_login_without_prompt(
739  login_timeout, login_incorrect)
740  output += output2
741  self._log_log(output)
742  if not success:
743  raise AssertionError('Login incorrect')
744  return output
745 
746  def _submit_credentials(self, username, password, login_prompt, password_prompt):
747  # Using write_bare here instead of write because don't want to wait for
748  # newline: https://github.com/robotframework/robotframework/issues/1371
749  output = self.read_untilread_until(login_prompt, 'TRACE')
750  self.write_barewrite_bare(username + self._newline_newline)
751  output += self.read_untilread_until(password_prompt, 'TRACE')
752  self.write_barewrite_bare(password + self._newline_newline)
753  return output
754 
755  def _verify_login_without_prompt(self, delay, incorrect):
756  time.sleep(timestr_to_secs(delay))
757  output = self.readread('TRACE')
758  success = incorrect not in output
759  return success, output
760 
761 
777  def write(self, text, loglevel=None):
778  newline = self._get_newline_for_get_newline_for(text)
779  if newline in text:
780  raise RuntimeError("'Write' keyword cannot be used with strings "
781  "containing newlines. Use 'Write Bare' instead.")
782  self.write_barewrite_bare(text + newline)
783  # Can't read until 'text' because long lines are cut strangely in the output
784  return self.read_untilread_until(self._newline_newline, loglevel)
785 
786  def _get_newline_for(self, text):
787  if is_bytes(text):
788  return self._encode_encode(self._newline_newline)
789  return self._newline_newline
790 
791 
796  def write_bare(self, text):
797  self._verify_connection_verify_connection()
798  telnetlib.Telnet.write(self, self._encode_encode(text))
799 
800 
822  def write_until_expected_output(self, text, expected, timeout,
823  retry_interval, loglevel=None):
824  timeout = timestr_to_secs(timeout)
825  retry_interval = timestr_to_secs(retry_interval)
826  maxtime = time.time() + timeout
827  while time.time() < maxtime:
828  self.write_barewrite_bare(text)
829  self.read_untilread_until(text, loglevel)
830  try:
831  with self._custom_timeout_custom_timeout(retry_interval):
832  return self.read_untilread_until(expected, loglevel)
833  except AssertionError:
834  pass
835  raise NoMatchError(expected, timeout)
836 
837 
850  def write_control_character(self, character):
851  self._verify_connection_verify_connection()
852  self.sock.sendall(telnetlib.IAC + self._get_control_character_get_control_character(character))
853 
854  def _get_control_character(self, int_or_name):
855  try:
856  ordinal = int(int_or_name)
857  return bytes(bytearray([ordinal]))
858  except ValueError:
859  return self._convert_control_code_name_to_character_convert_control_code_name_to_character(int_or_name)
860 
862  code_names = {
863  'BRK' : telnetlib.BRK,
864  'IP' : telnetlib.IP,
865  'AO' : telnetlib.AO,
866  'AYT' : telnetlib.AYT,
867  'EC' : telnetlib.EC,
868  'EL' : telnetlib.EL,
869  'NOP' : telnetlib.NOP
870  }
871  try:
872  return code_names[name]
873  except KeyError:
874  raise RuntimeError("Unsupported control character '%s'." % name)
875 
876 
881  def read(self, loglevel=None):
882  self._verify_connection_verify_connection()
883  output = self._decode_decode(self.read_very_eager())
884  if self._terminal_emulator_terminal_emulator:
885  self._terminal_emulator_terminal_emulator.feed(output)
886  output = self._terminal_emulator_terminal_emulator.read()
887  self._log_log(output, loglevel)
888  return output
889 
890 
899  def read_until(self, expected, loglevel=None):
900  success, output = self._read_until_read_until(expected)
901  self._log_log(output, loglevel)
902  if not success:
903  raise NoMatchError(expected, self._timeout_timeout, output)
904  return output
905 
906  def _read_until(self, expected):
907  self._verify_connection_verify_connection()
908  if self._terminal_emulator_terminal_emulator:
909  return self._terminal_read_until_terminal_read_until(expected)
910  expected = self._encode_encode(expected)
911  output = telnetlib.Telnet.read_until(self, expected, self._timeout_timeout)
912  return output.endswith(expected), self._decode_decode(output)
913 
914  @property
915  _terminal_frequency = property
916 
918  return min(self.INTERNAL_UPDATE_FREQUENCYINTERNAL_UPDATE_FREQUENCY, self._timeout_timeout)
919 
920  def _terminal_read_until(self, expected):
921  max_time = time.time() + self._timeout_timeout
922  output = self._terminal_emulator_terminal_emulator.read_until(expected)
923  if output:
924  return True, output
925  while time.time() < max_time:
926  output = telnetlib.Telnet.read_until(self, self._encode_encode(expected),
927  self._terminal_frequency_terminal_frequency_terminal_frequency)
928  self._terminal_emulator_terminal_emulator.feed(self._decode_decode(output))
929  output = self._terminal_emulator_terminal_emulator.read_until(expected)
930  if output:
931  return True, output
932  return False, self._terminal_emulator_terminal_emulator.read()
933 
934  def _read_until_regexp(self, *expected):
935  self._verify_connection_verify_connection()
936  if self._terminal_emulator_terminal_emulator:
937  return self._terminal_read_until_regexp_terminal_read_until_regexp(expected)
938  expected = [self._encode_encode(exp) if is_string(exp) else exp
939  for exp in expected]
940  return self._telnet_read_until_regexp_telnet_read_until_regexp(expected)
941 
942  def _terminal_read_until_regexp(self, expected_list):
943  max_time = time.time() + self._timeout_timeout
944  regexps_bytes = [self._to_byte_regexp_to_byte_regexp(rgx) for rgx in expected_list]
945  regexps_unicode = [re.compile(self._decode_decode(rgx.pattern))
946  for rgx in regexps_bytes]
947  out = self._terminal_emulator_terminal_emulator.read_until_regexp(regexps_unicode)
948  if out:
949  return True, out
950  while time.time() < max_time:
951  output = self.expect(regexps_bytes, self._terminal_frequency_terminal_frequency_terminal_frequency)[-1]
952  self._terminal_emulator_terminal_emulator.feed(self._decode_decode(output))
953  out = self._terminal_emulator_terminal_emulator.read_until_regexp(regexps_unicode)
954  if out:
955  return True, out
956  return False, self._terminal_emulator_terminal_emulator.read()
957 
958  def _telnet_read_until_regexp(self, expected_list):
959  expected = [self._to_byte_regexp_to_byte_regexp(exp) for exp in expected_list]
960  try:
961  index, _, output = self.expect(expected, self._timeout_timeout)
962  except TypeError:
963  index, output = -1, b''
964  return index != -1, self._decode_decode(output)
965 
966  def _to_byte_regexp(self, exp):
967  if is_bytes(exp):
968  return re.compile(exp)
969  if is_string(exp):
970  return re.compile(self._encode_encode(exp))
971  pattern = exp.pattern
972  if is_bytes(pattern):
973  return exp
974  return re.compile(self._encode_encode(pattern))
975 
976 
997  def read_until_regexp(self, *expected):
998  if not expected:
999  raise RuntimeError('At least one pattern required')
1000  if self._is_valid_log_level_is_valid_log_level(expected[-1]):
1001  loglevel = expected[-1]
1002  expected = expected[:-1]
1003  else:
1004  loglevel = None
1005  success, output = self._read_until_regexp_read_until_regexp(*expected)
1006  self._log_log(output, loglevel)
1007  if not success:
1008  expected = [exp if is_string(exp) else exp.pattern
1009  for exp in expected]
1010  raise NoMatchError(expected, self._timeout_timeout, output)
1011  return output
1012 
1013 
1030  def read_until_prompt(self, loglevel=None, strip_prompt=False):
1031  if not self._prompt_is_set_prompt_is_set():
1032  raise RuntimeError('Prompt is not set.')
1033  success, output = self._read_until_prompt_read_until_prompt()
1034  self._log_log(output, loglevel)
1035  if not success:
1036  prompt, regexp = self._prompt_prompt
1037  raise AssertionError("Prompt '%s' not found in %s."
1038  % (prompt if not regexp else prompt.pattern,
1039  secs_to_timestr(self._timeout_timeout)))
1040  if is_truthy(strip_prompt):
1041  output = self._strip_prompt_strip_prompt(output)
1042  return output
1043 
1045  prompt, regexp = self._prompt_prompt
1046  read_until = self._read_until_regexp_read_until_regexp if regexp else self._read_until_read_until
1047  return read_until(prompt)
1048 
1049  def _strip_prompt(self, output):
1050  prompt, regexp = self._prompt_prompt
1051  if not regexp:
1052  length = len(prompt)
1053  else:
1054  match = prompt.search(output)
1055  length = match.end() - match.start()
1056  return output[:-length]
1057 
1058 
1074  def execute_command(self, command, loglevel=None, strip_prompt=False):
1075  self.writewrite(command, loglevel)
1076  return self.read_until_promptread_until_prompt(loglevel, strip_prompt)
1077 
1078  @contextmanager
1079  def _custom_timeout(self, timeout):
1080  old = self.set_timeoutset_timeout(timeout)
1081  try:
1082  yield
1083  finally:
1084  self.set_timeoutset_timeout(old)
1085 
1087  if not self.sock:
1088  raise RuntimeError('No connection open')
1089 
1090  def _log(self, msg, level=None):
1091  msg = msg.strip()
1092  if msg:
1093  logger.write(msg, level or self._default_log_level_default_log_level)
1094 
1095  def _negotiate_options(self, sock, cmd, opt):
1096  # We don't have state changes in our accepted telnet options.
1097  # Therefore, we just track if we've already responded to an option. If
1098  # this is the case, we must not send any response.
1099  if cmd in (telnetlib.DO, telnetlib.DONT, telnetlib.WILL, telnetlib.WONT):
1100  if (cmd, opt) in self._opt_responses_opt_responses:
1101  return
1102  else:
1103  self._opt_responses_opt_responses.append((cmd, opt))
1104 
1105  # This is supposed to turn server side echoing on and turn other options off.
1106  if opt == telnetlib.ECHO and cmd in (telnetlib.WILL, telnetlib.WONT):
1107  self._opt_echo_on_opt_echo_on(opt)
1108  elif cmd == telnetlib.DO and opt == telnetlib.TTYPE and self._terminal_type_terminal_type:
1109  self._opt_terminal_type_opt_terminal_type(opt, self._terminal_type_terminal_type)
1110  elif cmd == telnetlib.DO and opt == telnetlib.NEW_ENVIRON and self._environ_user_environ_user:
1111  self._opt_environ_user_opt_environ_user(opt, self._environ_user_environ_user)
1112  elif cmd == telnetlib.DO and opt == telnetlib.NAWS and self._window_size_window_size:
1113  self._opt_window_size_opt_window_size(opt, *self._window_size_window_size)
1114  elif opt != telnetlib.NOOPT:
1115  self._opt_dont_and_wont_opt_dont_and_wont(cmd, opt)
1116 
1117  def _opt_echo_on(self, opt):
1118  return self.sock.sendall(telnetlib.IAC + telnetlib.DO + opt)
1119 
1120  def _opt_terminal_type(self, opt, terminal_type):
1121  self.sock.sendall(telnetlib.IAC + telnetlib.WILL + opt)
1122  self.sock.sendall(telnetlib.IAC + telnetlib.SB + telnetlib.TTYPE
1123  + self.NEW_ENVIRON_ISNEW_ENVIRON_IS + terminal_type
1124  + telnetlib.IAC + telnetlib.SE)
1125 
1126  def _opt_environ_user(self, opt, environ_user):
1127  self.sock.sendall(telnetlib.IAC + telnetlib.WILL + opt)
1128  self.sock.sendall(telnetlib.IAC + telnetlib.SB + telnetlib.NEW_ENVIRON
1129  + self.NEW_ENVIRON_ISNEW_ENVIRON_IS + self.NEW_ENVIRON_VARNEW_ENVIRON_VAR
1130  + b"USER" + self.NEW_ENVIRON_VALUENEW_ENVIRON_VALUE + environ_user
1131  + telnetlib.IAC + telnetlib.SE)
1132 
1133  def _opt_window_size(self, opt, window_x, window_y):
1134  self.sock.sendall(telnetlib.IAC + telnetlib.WILL + opt)
1135  self.sock.sendall(telnetlib.IAC + telnetlib.SB + telnetlib.NAWS
1136  + struct.pack('!HH', window_x, window_y)
1137  + telnetlib.IAC + telnetlib.SE)
1138 
1139  def _opt_dont_and_wont(self, cmd, opt):
1140  if cmd in (telnetlib.DO, telnetlib.DONT):
1141  self.sock.sendall(telnetlib.IAC + telnetlib.WONT + opt)
1142  elif cmd in (telnetlib.WILL, telnetlib.WONT):
1143  self.sock.sendall(telnetlib.IAC + telnetlib.DONT + opt)
1144 
1145  def msg(self, msg, *args):
1146  # Forward telnetlib's debug messages to log
1147  if self._telnetlib_log_level_telnetlib_log_level != 'NONE':
1148  logger.write(msg % args, self._telnetlib_log_level_telnetlib_log_level)
1149 
1150  def _check_terminal_emulation(self, terminal_emulation):
1151  if not terminal_emulation:
1152  return False
1153  if not pyte:
1154  raise RuntimeError("Terminal emulation requires pyte module!\n"
1155  "http://pypi.python.org/pypi/pyte/")
1156  return TerminalEmulator(window_size=self._window_size_window_size,
1157  newline=self._newline_newline)
1158 
1159 
1161 
1162  def __init__(self, window_size=None, newline="\r\n"):
1163  self._rows, self._columns_columns = window_size or (200, 200)
1164  self._newline_newline = newline
1165  self._stream_stream = pyte.Stream()
1166  self._screen_screen = pyte.HistoryScreen(self._rows,
1167  self._columns_columns,
1168  history=100000)
1169  self._stream_stream.attach(self._screen_screen)
1170  self._buffer_buffer = ''
1171  self._whitespace_after_last_feed_whitespace_after_last_feed = ''
1172 
1173  @property
1174  current_output = property
1175 
1176  def current_output(self):
1177  return self._buffer_buffer + self._dump_screen_dump_screen()
1178 
1179  def _dump_screen(self):
1180  return self._get_history_get_history(self._screen_screen) + \
1181  self._get_screen_get_screen(self._screen_screen) + \
1182  self._whitespace_after_last_feed_whitespace_after_last_feed
1183 
1184  def _get_history(self, screen):
1185  if not screen.history.top:
1186  return ''
1187  rows = []
1188  for row in screen.history.top:
1189  # Newer pyte versions store row data in mappings
1190  data = (char.data for _, char in sorted(row.items()))
1191  rows.append(''.join(data).rstrip())
1192  return self._newline_newline.join(rows).rstrip(self._newline_newline) + self._newline_newline
1193 
1194  def _get_screen(self, screen):
1195  rows = (row.rstrip() for row in screen.display)
1196  return self._newline_newline.join(rows).rstrip(self._newline_newline)
1197 
1198  def feed(self, text):
1199  self._stream_stream.feed(text)
1200  self._whitespace_after_last_feed_whitespace_after_last_feed = text[len(text.rstrip()):]
1201 
1202  def read(self):
1203  current_out = self.current_outputcurrent_outputcurrent_output
1204  self._update_buffer_update_buffer('')
1205  return current_out
1206 
1207  def read_until(self, expected):
1208  current_out = self.current_outputcurrent_outputcurrent_output
1209  exp_index = current_out.find(expected)
1210  if exp_index != -1:
1211  self._update_buffer_update_buffer(current_out[exp_index+len(expected):])
1212  return current_out[:exp_index+len(expected)]
1213  return None
1214 
1215  def read_until_regexp(self, regexp_list):
1216  current_out = self.current_outputcurrent_outputcurrent_output
1217  for rgx in regexp_list:
1218  match = rgx.search(current_out)
1219  if match:
1220  self._update_buffer_update_buffer(current_out[match.end():])
1221  return current_out[:match.end()]
1222  return None
1223 
1224  def _update_buffer(self, terminal_buffer):
1225  self._buffer_buffer = terminal_buffer
1226  self._whitespace_after_last_feed_whitespace_after_last_feed = ''
1227  self._screen_screen.reset()
1228 
1229 
1231  ROBOT_SUPPRESS_NAME = True
1232 
1233  def __init__(self, expected, timeout, output=None):
1234  self.expectedexpected = expected
1235  self.timeouttimeout = secs_to_timestr(timeout)
1236  self.outputoutput = output
1237  AssertionError.__init__(self, self._get_message_get_message())
1238 
1239  def _get_message(self):
1240  expected = "'%s'" % self.expectedexpected \
1241  if is_string(self.expectedexpected) \
1242  else seq2str(self.expectedexpected, lastsep=' or ')
1243  msg = "No match found for %s in %s." % (expected, self.timeouttimeout)
1244  if self.outputoutput is not None:
1245  msg += ' Output:\n%s' % self.outputoutput
1246  return msg
def __init__(self, expected, timeout, output=None)
Definition: Telnet.py:1233
def login(self, username, password, login_prompt='login:', password_prompt='Password:', login_timeout='1 second', login_incorrect='Login incorrect')
Logs in to the Telnet server with the given user information.
Definition: Telnet.py:732
def _set_telnetlib_log_level(self, level)
Definition: Telnet.py:655
def _set_encoding(self, encoding, errors)
Definition: Telnet.py:629
def write_control_character(self, character)
Writes the given control character into the connection.
Definition: Telnet.py:850
def read_until_regexp(self, *expected)
Reads output until any of the expected regular expressions match.
Definition: Telnet.py:997
def _set_prompt(self, prompt, prompt_is_regexp)
Definition: Telnet.py:593
def _custom_timeout(self, timeout)
Definition: Telnet.py:1079
def _terminal_read_until(self, expected)
Definition: Telnet.py:920
def _verify_login_without_prompt(self, delay, incorrect)
Definition: Telnet.py:755
def set_encoding(self, encoding=None, errors=None)
Sets the encoding to use for writing and reading in the current connection.
Definition: Telnet.py:621
def _opt_environ_user(self, opt, environ_user)
Definition: Telnet.py:1126
def _read_until(self, expected)
Definition: Telnet.py:906
def __init__(self, host=None, port=23, timeout=3.0, newline='CRLF', prompt=None, prompt_is_regexp=False, encoding='UTF-8', encoding_errors='ignore', default_log_level='INFO', window_size=None, environ_user=None, terminal_emulation=False, terminal_type=None, telnetlib_log_level='TRACE', connection_timeout=None)
Definition: Telnet.py:497
def set_telnetlib_log_level(self, level)
Sets the log level used for logging in the underlying telnetlib.
Definition: Telnet.py:649
def _set_default_log_level(self, level)
Definition: Telnet.py:676
def read(self, loglevel=None)
Reads everything that is currently available in the output.
Definition: Telnet.py:881
def _opt_terminal_type(self, opt, terminal_type)
Definition: Telnet.py:1120
def _get_control_character(self, int_or_name)
Definition: Telnet.py:854
def close_connection(self, loglevel=None)
Closes the current Telnet connection.
Definition: Telnet.py:698
def _convert_control_code_name_to_character(self, name)
Definition: Telnet.py:861
def read_until(self, expected, loglevel=None)
Reads output until expected text is encountered.
Definition: Telnet.py:899
def _opt_dont_and_wont(self, cmd, opt)
Definition: Telnet.py:1139
def _log(self, msg, level=None)
Definition: Telnet.py:1090
def _is_valid_log_level(self, level)
Definition: Telnet.py:681
def _opt_window_size(self, opt, window_x, window_y)
Definition: Telnet.py:1133
def _telnet_read_until_regexp(self, expected_list)
Definition: Telnet.py:958
def _check_terminal_emulation(self, terminal_emulation)
Definition: Telnet.py:1150
def write_bare(self, text)
Writes the given text, and nothing else, into the connection.
Definition: Telnet.py:796
def _read_until_regexp(self, *expected)
Definition: Telnet.py:934
def set_default_log_level(self, level)
Sets the default log level used for logging in the current connection.
Definition: Telnet.py:670
def write_until_expected_output(self, text, expected, timeout, retry_interval, loglevel=None)
Writes the given text repeatedly, until expected appears in the output.
Definition: Telnet.py:823
def set_timeout(self, timeout)
Sets the timeout used for waiting output in the current connection.
Definition: Telnet.py:533
def set_newline(self, newline)
Sets the newline used by Write keyword in the current connection.
Definition: Telnet.py:553
def set_prompt(self, prompt, prompt_is_regexp=False)
Sets the prompt used by Read Until Prompt and Login in the current connection.
Definition: Telnet.py:585
def read_until_prompt(self, loglevel=None, strip_prompt=False)
Reads output until the prompt is encountered.
Definition: Telnet.py:1030
def _negotiate_options(self, sock, cmd, opt)
Definition: Telnet.py:1095
def write(self, text, loglevel=None)
Writes the given text plus a newline into the connection.
Definition: Telnet.py:777
def _submit_credentials(self, username, password, login_prompt, password_prompt)
Definition: Telnet.py:746
def _terminal_read_until_regexp(self, expected_list)
Definition: Telnet.py:942
def execute_command(self, command, loglevel=None, strip_prompt=False)
Executes the given command and reads, logs, and returns everything until the prompt.
Definition: Telnet.py:1074
A library providing communication over Telnet connections.
Definition: Telnet.py:277
def close_all_connections(self)
Closes all open connections and empties the connection cache.
Definition: Telnet.py:482
def _get_library_keywords(self)
Definition: Telnet.py:330
def _get_connection_keywords(self)
Definition: Telnet.py:345
def _get_keywords(self, source, excluded)
Definition: Telnet.py:335
def _parse_window_size(self, window_size)
Definition: Telnet.py:415
def _set_connection_timeout(self, connection_timeout)
Definition: Telnet.py:431
def __getattr__(self, name)
Definition: Telnet.py:353
def __init__(self, timeout='3 seconds', newline='CRLF', prompt=None, prompt_is_regexp=False, encoding='UTF-8', encoding_errors='ignore', default_log_level='INFO', window_size=None, environ_user=None, terminal_emulation=False, terminal_type=None, telnetlib_log_level='TRACE', connection_timeout=None)
Telnet library can be imported with optional configuration parameters.
Definition: Telnet.py:310
def open_connection(self, host, alias=None, port=23, timeout=None, newline=None, prompt=None, prompt_is_regexp=False, encoding=None, encoding_errors=None, default_log_level=None, window_size=None, environ_user=None, terminal_emulation=None, terminal_type=None, telnetlib_log_level=None, connection_timeout=None)
Opens a new Telnet connection to the given host and port.
Definition: Telnet.py:384
def _get_connection(self, *args)
Can be overridden to use a custom connection.
Definition: Telnet.py:428
def switch_connection(self, index_or_alias)
Switches between active connections using an index or an alias.
Definition: Telnet.py:467
def _is_keyword(self, name, source, excluded)
Definition: Telnet.py:339
def __init__(self, window_size=None, newline="\r\n")
Definition: Telnet.py:1162
def _update_buffer(self, terminal_buffer)
Definition: Telnet.py:1224
def read_until_regexp(self, regexp_list)
Definition: Telnet.py:1215
def seq2str(sequence, quote="'", sep=', ', lastsep=' and ')
Returns sequence in format ‘'item 1’, 'item 2' and 'item 3'`.
Definition: misc.py:79
def secs_to_timestr(secs, compact=False)
Converts time in seconds to a string representation.
Definition: robottime.py:151
def timestr_to_secs(timestr, round_to=3, accept_plain_values=True)
Parses time strings like '1h 10s', '01:00:10' and '42' and returns seconds.
Definition: robottime.py:55
def is_truthy(item)
Returns True or False depending on is the item considered true or not.
Definition: robottypes.py:162
def get_version(naked=False)
Definition: version.py:24