32 import socketserver
as SocketServer
43 err_str = unicode_error[1]
44 err_len = unicode_error.end - unicode_error.start
45 next_position = unicode_error.start + err_len
46 err_hex = err_str[unicode_error.start:unicode_error.end].encode(
'hex')
48 return u'%s' % err_hex, next_position
78 return sorted(self.
profilesprofiles.keys())
87 target=self.
_server_server.serve_forever)
96 if event ==
'port' and self.
_process_process:
97 self.
_process_process.set_port(args[0])
98 if event ==
'start_test':
99 longname = args[1][
'longname']
106 if event ==
'continue':
110 if event ==
'paused':
113 if event ==
'end_test':
114 longname = args[1][
'longname']
116 if args[1][
'status'] ==
'PASS':
119 elif args[1][
'status'] ==
'SKIP':
127 ret = self.
_project_project.find_controller_by_longname(longname, testname)
142 self.
_process_process.kill(force=
True)
146 self.
_process_process.pause_on_failure(pause)
174 stdout, stderr, returncode = self.
_process_process.get_output(), \
175 self.
_process_process.get_errors(), \
176 self.
_process_process.get_returncode()
177 error, log_message = profile.format_error(stderr, returncode)
178 return stdout, error, log_message
181 return self.
_port_port
187 self.
_results_results.set_stopped(
None)
197 allow_reuse_address =
True
200 SocketServer.TCPServer.__init__(self, (
"", 0), RequestHandlerClass)
209 (name, args) = decoder.load()
210 self.server.callback(name, *args)
211 except (EOFError, IOError):
This class provides a common streaming approach for the purpose of reliably sending data over a socket...
Pass all listener events to a remote listener.
Implements a simple line-buffered socket server.
def __init__(self, RequestHandlerClass, callback)
def test_execution_started(self)
def _start_listener_server(self, result_handler)
def run_command(self, command, cwd)
def enable(self, result_handler)
def send_continue_signal(self)
def _result_handler(self, event, *args)
def add_profile(self, name, item)
def send_stop_signal(self)
def get_output_and_errors(self, profile)
def _get_test_controller(self, longname, testname=None)
def send_step_next_signal(self)
def shutdown_server(self)
def send_pause_on_failure(self, pause)
def send_pause_signal(self)
def get_profile(self, name)
def get_profile_names(self)
def __init__(self, project)
def send_step_over_signal(self)
def get_listener_port(self)
def mixed_decoder(unicode_error)