20 from robot.variables import contains_variable, is_list_variable, VariableAssignment
22 from .bodyrunner
import BodyRunner
23 from .model
import Keyword
24 from .outputcapture
import OutputCapturer
25 from .signalhandler
import STOP_SIGNAL_MONITOR
26 from .statusreporter
import StatusReporter
31 def __init__(self, handler, name=None, languages=None):
33 self.
namename = name
or handler.name
47 return self.
_handler_handler.library.name
55 def run(self, kw, context, run=True):
56 assignment = VariableAssignment(kw.assign)
57 result = self.
_get_result_get_result(kw, assignment)
60 with assignment.assigner(context)
as assigner:
61 return_value = self.
_run_run(context, kw.args)
62 assigner.assign(return_value)
67 return KeywordResult(kwname=self.
namename,
68 libname=handler.libname,
71 assign=tuple(assignment),
75 def _run(self, context, args):
78 context.output.message(message)
79 variables = context.variables
if not context.dry_run
else None
80 positional, named = self.
_handler_handler.resolve_arguments(args, variables,
82 context.output.trace(
lambda: self.
_trace_log_args_trace_log_args(positional, named))
84 positional, dict(named))
88 args = [
prepr(arg)
for arg
in positional]
90 return 'Arguments: [ %s ]' %
' | '.join(args)
94 if timeout
and timeout.active:
96 with LOGGER.delayed_logging:
97 context.output.debug(timeout.get_message)
98 return timeout.run(handler, args=positional, kwargs=named)
100 return lambda: handler(*positional, **named)
103 return min(context.timeouts)
if context.timeouts
else None
111 STOP_SIGNAL_MONITOR.start_running_keyword(context.in_teardown)
114 STOP_SIGNAL_MONITOR.stop_running_keyword()
117 assignment = VariableAssignment(kw.assign)
118 result = self.
_get_result_get_result(kw, assignment)
120 assignment.validate_assignment()
121 self.
_dry_run_dry_run(context, kw.args)
125 self.
_run_run(context, args)
130 keywords_to_execute = (
'BuiltIn.Import Library',
131 'BuiltIn.Set Library Search Order',
133 'BuiltIn.Remove Tags')
134 return (handler.libname ==
'Reserved' or
135 handler.longname
in keywords_to_execute)
146 raise DataError(
"Positional arguments are not allowed when using "
147 "embedded arguments.")
155 result.sourcename = self.
_handler_handler.name
161 def __init__(self, handler, execute_in_dry_run=False):
182 if name ==
'Run Keyword If':
184 if name ==
'Run Keywords':
191 yield Keyword(name=kw_call[0], args=kw_call[1:])
194 while 'ELSE IF' in given_args:
197 if 'ELSE' in given_args:
202 expr, kw_call = given_args[0], given_args[1:]
207 index = list(given_args).index(control_word)
208 expr_and_call = given_args[:index]
209 remaining = given_args[index+1:]
212 raise DataError(
"Invalid 'Run Keyword If' usage.")
215 return expr_and_call[1:], remaining
218 if len(kw_call) >= min_length:
224 yield Keyword(name=kw_call[0], args=kw_call[1:])
227 if 'AND' not in given_args:
228 for kw_call
in given_args:
231 while 'AND' in given_args:
232 index = list(given_args).index(
'AND')
233 kw_call, given_args = given_args[:index], given_args[index + 1:]
239 index = list(self.
_handler_handler.arguments.positional).index(
'name')
240 return [
Keyword(name=given_args[index], args=given_args[index+1:])]
Represents results of a single keyword.
def __init__(self, handler, name)
def _get_result(self, kw, assignment)
def _dry_run(self, context, args)
def _run(self, context, args)
def _runner_for(self, context, handler, positional, named)
def dry_run(self, kw, context)
def _dry_run(self, context, args)
def _run(self, context, args)
def _run_with_signal_monitoring(self, runner, context)
def __init__(self, handler, name=None, languages=None)
def _get_result(self, kw, assignment)
def _trace_log_args(self, positional, named)
def _get_timeout(self, context)
def run(self, kw, context, run=True)
def _executed_in_dry_run(self, handler)
def _run_with_output_captured_and_signal_monitor(self, runner, context)
def _run_with_output_captured_and_signal_monitor(self, runner, context)
def _dry_run(self, context, args)
def _get_dry_run_keywords_for_run_keyword(self, given_args)
def _get_run_kws_calls(self, given_args)
def _get_run_kw_if_calls(self, given_args)
def _split_run_kw_if_args(self, given_args, control_word, required_after)
def _get_dry_run_keywords_based_on_name_argument(self, given_args)
def _get_timeout(self, context)
def _validate_kw_call(self, kw_call, min_length=2)
def _get_dry_run_keywords(self, args)
def __init__(self, handler, execute_in_dry_run=False)
def _get_dry_run_keywords_for_run_keyword_if(self, given_args)
def prepr(item, width=80)
def is_list_variable(string)
def contains_variable(string, identifiers='$@&')