Package x2go :: Module telekinesis
[frames] | no frames]

Source Code for Module x2go.telekinesis

  1  # -*- coding: utf-8 -*- 
  2   
  3  # Copyright (C) 2010-2015 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  4  # 
  5  # Python X2Go is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU Affero General Public License as published by 
  7  # the Free Software Foundation; either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # Python X2Go is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU Affero General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU Affero General Public License 
 16  # along with this program; if not, write to the 
 17  # Free Software Foundation, Inc., 
 18  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 19   
 20  """\ 
 21  X2GoTelekinesisClient class - Connect to Telekinesis Server on X2Go Server. 
 22   
 23  """ 
 24  __NAME__ = 'x2gotelekinesisclient-pylib' 
 25   
 26  # modules 
import copy
import errno
import os
import socket
import threading

import gevent
 32   
 33  # Python X2Go modules 
 34  import x2go.forward as forward 
 35  import x2go.log as log 
 36  import x2go.utils as utils 
 37  import x2go.x2go_exceptions as x2go_exceptions 
 38   
 39  from x2go.defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS 
 40  if _X2GOCLIENT_OS in ("Windows"): 
 41      import subprocess 
 42  else: 
 43      import x2go.gevent_subprocess as subprocess 
 44   
 45  from x2go.defaults import LOCAL_HOME as _LOCAL_HOME 
 46  from x2go.defaults import X2GO_SESSIONS_ROOTDIR as _X2GO_SESSIONS_ROOTDIR 
 47  from x2go.defaults import CURRENT_LOCAL_USER as _CURRENT_LOCAL_USER 
 48   
 49   
class X2GoTelekinesisClient(threading.Thread):
    """\
    Telekinesis is a communication framework used by X2Go.

    This class implements the startup of the telekinesis client used by
    Python X2Go.

    """
    TEKICLIENT_CMD = 'telekinesis-client'
    """Telekinesis client command. Might be OS specific."""
    TEKICLIENT_ARGS = ['-setWORMHOLEPORT={port}', '-setX2GOSID={sid}', ]
    """Arguments to be passed to the Telekinesis client."""
    TEKICLIENT_ENV = {}
    """Provide environment variables to the Telekinesis client command."""

    def __init__(self, session_info=None,
                 ssh_transport=None,
                 sessions_rootdir=os.path.join(_LOCAL_HOME, _X2GO_SESSIONS_ROOTDIR),
                 session_instance=None,
                 logger=None, loglevel=log.loglevel_DEFAULT, ):
        """\
        @param session_info: session information provided as an C{X2GoServerSessionInfo*} backend
            instance
        @type session_info: C{X2GoServerSessionInfo*} instance
        @param ssh_transport: SSH transport object from C{paramiko.SSHClient}
        @type ssh_transport: C{paramiko.Transport} instance
        @param sessions_rootdir: base dir where X2Go session files are stored (by default: ~/.x2go)
        @type sessions_rootdir: C{str}
        @param session_instance: the L{X2GoSession} instance this L{X2GoTelekinesisClient} instance belongs to
        @type session_instance: L{X2GoSession} instance
        @param logger: you can pass an L{X2GoLogger} object to the
            L{X2GoTelekinesisClient} constructor
        @type logger: L{X2GoLogger} instance
        @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: int

        @raise X2GoTelekinesisClientException: if no SSH transport or no session instance has been given

        """
        # Initialise everything that stop_thread() / _tidy_up() look at *before* any
        # exception can be raised: __del__ runs even on half-constructed instances.
        self.tekiclient = None
        self._keepalive = False
        self.tekiclient_log_stdout = None
        self.tekiclient_log_stderr = None
        self.tekiclient_datalog_stdout = None
        self.tekiclient_datalog_stderr = None
        self.fw_ctrl_tunnel = None
        self.fw_data_tunnel = None
        self.telekinesis_client = None
        self.telekinesis_sshfs = None

        if ssh_transport is None:
            # we cannot go on without a valid SSH transport object
            raise x2go_exceptions.X2GoTelekinesisClientException('SSH transport not available')

        if session_instance is None:
            # we can neither go on without a valid X2GoSession instance
            raise x2go_exceptions.X2GoTelekinesisClientException('X2GoSession instance not available')

        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        # Work on a per-instance copy of the argument list. The class-level list is
        # shared by all instances, so mutating it in place would leak the debug flag
        # and the wormhole port from one session into the next.
        self.TEKICLIENT_ARGS = list(self.TEKICLIENT_ARGS)
        if self.logger.get_loglevel() & log.loglevel_DEBUG:
            self.TEKICLIENT_ARGS.extend(['-setDEBUG=1', ])

        self.sessions_rootdir = sessions_rootdir
        self.session_info = session_info
        self.session_name = self.session_info.name
        self.ssh_transport = ssh_transport
        self.session_instance = session_instance
        self.tekiclient_log = 'telekinesis-client.log'
        self.tekiclient_datalog = 'telekinesis-client-sshfs.log'
        self.TEKICLIENT_ENV = os.environ.copy()
        self.local_tekictrl_port = self.session_info.tekictrl_port
        self.local_tekidata_port = self.session_info.tekidata_port

        threading.Thread.__init__(self)
        self.daemon = True

    def __del__(self):
        """\
        On instance destruction make sure this telekinesis client thread is stopped properly.

        """
        self.stop_thread()

    def has_telekinesis_client(self):
        """\
        Test if the Telekinesis client command is installed on this machine.

        @return: C{True} if the Telekinesis client command is available
        @rtype: C{bool}

        """
        ###
        ### FIXME: Test if user is in fuse group, as well!!!
        ###
        return bool(utils.which(self.TEKICLIENT_CMD))

    def _sshfs_mountpoint(self):
        """\
        Local directory the Telekinesis data channel (SSHFS) gets mounted on.

        @return: mountpoint path (with trailing slash)
        @rtype: C{str}

        """
        return '/tmp/.x2go-{local_user}/telekinesis/S-{sid}/'.format(local_user=_CURRENT_LOCAL_USER, sid=self.session_name)

    def _ensure_directory(self, path):
        """\
        Create C{path} (including parents). An already existing directory is fine, any
        other failure is logged but not raised (best effort).

        @param path: directory to create
        @type path: C{str}

        """
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                self.logger('Could not create directory %s: %s' % (path, str(e)), loglevel=log.loglevel_WARN)

    def _pick_local_port(self, preferred_port):
        """\
        Find an unused local port for a forwarding tunnel. If the SSH peer is this very
        machine, the preferred port is shifted by 10000 before probing.

        @param preferred_port: port number to try first
        @type preferred_port: C{int}

        @return: a local port number as reported by C{utils.detect_unused_port}
        @rtype: C{int}

        @raise X2GoControlSessionException: if the SSH transport's peer cannot be queried anymore

        """
        try:
            if self.ssh_transport.getpeername()[0] in ('::1', '127.0.0.1', 'localhost', 'localhost.localdomain'):
                preferred_port += 10000
        except socket.error:
            raise x2go_exceptions.X2GoControlSessionException('The control session has died unexpectedly.')
        return utils.detect_unused_port(preferred_port=preferred_port)

    def _tidy_up(self):
        """\
        Close any left open port forwarding tunnel, umount the SSHFS data channel and close
        the Telekinesis client's log files, if left open. Safe to call more than once.

        """
        if self.tekiclient:
            self.logger('Shutting down Telekinesis client subprocess', loglevel=log.loglevel_DEBUG)
            try:
                self.tekiclient.kill()
            except OSError as e:
                self.logger('Telekinesis client shutdown gave a message that we may ignore: %s' % str(e), loglevel=log.loglevel_WARN)
            self.tekiclient = None

        if self.fw_ctrl_tunnel is not None:
            self.logger('Shutting down Telekinesis wormhole', loglevel=log.loglevel_DEBUG)
            forward.stop_forward_tunnel(self.fw_ctrl_tunnel)
            self.fw_ctrl_tunnel = None

        if self.telekinesis_sshfs is not None:
            telekinesis_sshfs_command = ['fusermount', '-u', self._sshfs_mountpoint(), ]
            self.logger('Umounting SSHFS mount for Telekinesis via forking a threaded subprocess: %s' % " ".join(telekinesis_sshfs_command), loglevel=log.loglevel_DEBUG)
            try:
                self.telekinesis_sshfs_umount = subprocess.Popen(telekinesis_sshfs_command,
                                                                 env=self.TEKICLIENT_ENV,
                                                                 stdin=None,
                                                                 stdout=self.tekiclient_datalog_stdout,
                                                                 stderr=self.tekiclient_datalog_stderr,
                                                                 shell=False)
            except OSError as e:
                # a failing umount must not keep us from releasing the remaining resources
                self.logger('Umounting the Telekinesis SSHFS mount failed: %s' % str(e), loglevel=log.loglevel_WARN)
            self.telekinesis_sshfs = None

        if self.fw_data_tunnel is not None:
            self.logger('Shutting down Telekinesis DATA tunnel', loglevel=log.loglevel_DEBUG)
            forward.stop_forward_tunnel(self.fw_data_tunnel)
            self.fw_data_tunnel = None

        # close the log files and forget the handles, so nobody reuses a closed file
        for _attr in ('tekiclient_log_stdout', 'tekiclient_log_stderr',
                      'tekiclient_datalog_stdout', 'tekiclient_datalog_stderr'):
            _handle = getattr(self, _attr)
            if _handle is not None:
                _handle.close()
                setattr(self, _attr, None)

    def stop_thread(self):
        """\
        End the thread runner and tidy up.

        Signals the runner loop to finish and then waits (at most 40 x 0.5s) for the
        runner to release the Telekinesis client subprocess.

        """
        self._keepalive = False
        # wait for thread loop to finish...
        _count = 0
        _maxwait = 40
        while self.tekiclient is not None and (_count < _maxwait):
            _count += 1
            self.logger('waiting for Telekinesis client to shut down: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
            gevent.sleep(.5)

    def run(self):
        """\
        Start the X2Go Telekinesis client command. The Telekinesis client command utilizes a
        Paramiko/SSH based forwarding tunnel (openssh -L option). This tunnel
        gets started here and is forked into background (Greenlet/gevent).

        The runner blocks until L{stop_thread()} gets called. Whatever happens (normal
        shutdown or a failure during startup), all tunnels, mounts and log files are
        released before the thread ends.

        @raise X2GoControlSessionException: if the SSH transport has died in the meantime

        """
        self._keepalive = True
        self.tekiclient = None

        try:
            self._ensure_directory(self.session_info.local_container)

            self.local_tekictrl_port = self._pick_local_port(self.local_tekictrl_port)

            self.fw_ctrl_tunnel = forward.start_forward_tunnel(local_port=self.local_tekictrl_port,
                                                               remote_port=self.session_info.tekictrl_port,
                                                               ssh_transport=self.ssh_transport,
                                                               session_instance=self.session_instance,
                                                               session_name=self.session_name,
                                                               subsystem='Telekinesis Wormhole',
                                                               logger=self.logger,
                                                              )
            # update the wormhole port in TEKICLIENT_ARGS
            self._update_local_tekictrl_socket(self.local_tekictrl_port)

            cmd_line = self._generate_cmdline()

            self.tekiclient_log_stdout = open('%s/%s' % (self.session_info.local_container, self.tekiclient_log, ), 'a')
            self.tekiclient_log_stderr = open('%s/%s' % (self.session_info.local_container, self.tekiclient_log, ), 'a')
            self.logger('forking threaded subprocess: %s' % " ".join(cmd_line), loglevel=log.loglevel_DEBUG)

            gevent.sleep(.2)
            try:
                p = self.tekiclient = subprocess.Popen(cmd_line,
                                                       env=self.TEKICLIENT_ENV,
                                                       stdin=None,
                                                       stdout=self.tekiclient_log_stdout,
                                                       stderr=self.tekiclient_log_stderr,
                                                       shell=False)
            except OSError as e:
                # e.g. the client command is not installed; start_telekinesis() will
                # notice that self.tekiclient never came up
                self.logger('Launching the Telekinesis client (%s) failed: %s' % (cmd_line[0], str(e)), loglevel=log.loglevel_WARN)
                return

            while self._keepalive:
                gevent.sleep(1)

            try:
                p.terminate()
                self.logger('terminating Telekinesis client: %s' % p, loglevel=log.loglevel_DEBUG)
            except OSError as e:
                # "no such process" simply means the client is gone already
                if e.errno != errno.ESRCH:
                    self.logger('Terminating the Telekinesis client failed: %s' % str(e), loglevel=log.loglevel_WARN)

        finally:
            # once all is over...
            self._tidy_up()

    def _update_local_tekictrl_socket(self, port):
        """\
        Replace the wormhole port argument in this instance's Telekinesis client arguments.

        @param port: local port of the Telekinesis wormhole (control) tunnel
        @type port: C{int}

        """
        for idx, a in enumerate(self.TEKICLIENT_ARGS):
            if a.startswith('-setWORMHOLEPORT='):
                self.TEKICLIENT_ARGS[idx] = '-setWORMHOLEPORT=%s' % port

    def _generate_cmdline(self):
        """\
        Generate the Telekinesis client command line for execution.

        @return: command and arguments, ready to be passed to C{subprocess.Popen}
        @rtype: C{list}

        """
        cmd_line = [self.TEKICLIENT_CMD, ]
        # Fill the placeholders argument by argument: this keeps every argument intact
        # and also works if the {port} placeholder has not been replaced beforehand.
        cmd_line.extend([_arg.format(sid=self.session_name, port=self.local_tekictrl_port)
                         for _arg in self.TEKICLIENT_ARGS])
        return cmd_line

    def start_telekinesis(self):
        """\
        Start the thread runner and wait for the Telekinesis client to come up.

        The data channel (a forwarding tunnel plus an SSHFS mount on top of it) gets set up
        first. Only if that succeeds, the Telekinesis client itself gets launched.

        @return: a tuple: the subprocess instance of the externally started Telekinesis
            client command (or C{None}) and a flag telling if the client and its
            wormhole tunnel are up
        @rtype: C{tuple} of (C{obj}, C{bool})

        @raise X2GoControlSessionException: if the SSH transport has died in the meantime

        """
        self.logger('starting local Telekinesis client...', loglevel=log.loglevel_INFO)

        # validate our prerequisites before touching any of them
        if self.session_info is None or self.ssh_transport is None or not self.session_info.local_container:
            return None, False

        # set up Telekinesis data channel first... (via an SSHFS mount)
        self.logger('Connecting Telekinesis data channel first via SSHFS host=127.0.0.1, port=%s.' % (self.session_info.tekidata_port,), loglevel=log.loglevel_DEBUG)

        self.local_tekidata_port = self._pick_local_port(self.local_tekidata_port)

        self.fw_data_tunnel = forward.start_forward_tunnel(local_port=self.local_tekidata_port,
                                                           remote_port=self.session_info.tekidata_port,
                                                           ssh_transport=self.ssh_transport,
                                                           session_instance=self.session_instance,
                                                           session_name=self.session_name,
                                                           subsystem='Telekinesis Data',
                                                           logger=self.logger,
                                                          )

        # the log files live in the session's local container, make sure it is there
        self._ensure_directory(self.session_info.local_container)
        self.tekiclient_datalog_stdout = open('%s/%s' % (self.session_info.local_container, self.tekiclient_datalog, ), 'a')
        self.tekiclient_datalog_stderr = open('%s/%s' % (self.session_info.local_container, self.tekiclient_datalog, ), 'a')

        _mountpoint = self._sshfs_mountpoint()
        self._ensure_directory(os.path.normpath(_mountpoint))

        if self.session_instance.has_server_feature('X2GO_TELEKINESIS_TEKISFTPSERVER'):
            # the Perl-based SFTP-Server shipped with Telekinesis Server (teki-sftpserver) supports
            # chroot'ing. Let's use this by default, if available.
            _remote_source = '127.0.0.1:/'
        else:
            # very first Telekinesis Server implementation used OpenSSH's sftp-server
            # that lacks/lacked chroot capability
            _remote_source = '127.0.0.1:{remote_home}/.x2go/C-{sid}/telekinesis/remote/'.format(remote_home=self.session_instance.get_remote_home(), sid=self.session_name)

        telekinesis_sshfs_command = ['sshfs',
                                     '-o', 'compression=no',
                                     '-o', 'follow_symlinks',
                                     '-o', 'directport={tekidata_port}'.format(tekidata_port=self.local_tekidata_port),
                                     _remote_source,
                                     _mountpoint,
                                    ]
        self.logger('forking threaded subprocess: %s' % " ".join(telekinesis_sshfs_command), loglevel=log.loglevel_DEBUG)
        try:
            self.telekinesis_sshfs = subprocess.Popen(telekinesis_sshfs_command,
                                                      env=self.TEKICLIENT_ENV,
                                                      stdin=None,
                                                      stdout=self.tekiclient_datalog_stdout,
                                                      stderr=self.tekiclient_datalog_stderr,
                                                      shell=False)
        except OSError as e:
            if e.errno == errno.ENOENT:
                self.logger("The 'sshfs' command is not available on your client machine, please install it to get Telekinesis up and running!!!", loglevel=log.loglevel_WARN)
            else:
                self.logger("An error occurred while setting up the Telekinesis data stream (via SSHFS): %s (errno: %s)" % (str(e), e.errno), loglevel=log.loglevel_WARN)
            # the runner thread never got started, so nobody else will release
            # the data tunnel and the log files
            self._tidy_up()
            return None, False

        # also wait for telekinesis data tunnel to become active
        _count = 0
        _maxwait = 40
        while self.fw_data_tunnel and (not self.fw_data_tunnel.is_active) and (not self.fw_data_tunnel.failed) and (_count < _maxwait):
            _count += 1
            self.logger('waiting for Telekinesis data tunnel to come up: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
            gevent.sleep(.5)

        # only start TeKi client if the data connection is up and running...
        if self.fw_data_tunnel and self.fw_data_tunnel.is_active and self.telekinesis_sshfs:

            gevent.sleep(1)
            threading.Thread.start(self)

            self.logger('Telekinesis client tries to connect to host=127.0.0.1, port=%s.' % (self.session_info.tekictrl_port,), loglevel=log.loglevel_DEBUG)
            self.logger('Telekinesis client writes its log to %s.' % os.path.join(self.session_info.local_container, self.tekiclient_log), loglevel=log.loglevel_DEBUG)

            # fresh wait budget: the time spent waiting for the data tunnel must not
            # shorten the time we grant the client for coming up
            _count = 0
            while self.tekiclient is None and _count < _maxwait:
                _count += 1
                self.logger('waiting for Telekinesis client to come up: 0.4s x %s' % _count, loglevel=log.loglevel_DEBUG)
                gevent.sleep(.4)

            # only wait for the TeKi wormhole tunnel (ctrl tunnel) if TeKi could be started successfully...
            if self.tekiclient is not None:

                # also wait for telekinesis wormhole to become active
                _count = 0
                _maxwait = 40
                while self.fw_ctrl_tunnel and (not self.fw_ctrl_tunnel.is_active) and (not self.fw_ctrl_tunnel.failed) and (_count < _maxwait):
                    _count += 1
                    self.logger('waiting for Telekinesis wormhole to come up: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
                    gevent.sleep(.5)

        else:
            self.logger('Aborting Telekinesis client startup for session %s, because the Telekinesis data connection failed to be established.' % (self.session_name,), loglevel=log.loglevel_WARN)
            # the runner thread never got started, release the data channel ourselves
            self._tidy_up()

        return self.tekiclient, bool(self.tekiclient and self.fw_ctrl_tunnel and self.fw_ctrl_tunnel.is_active)

    def ok(self):
        """\
        Check if the Telekinesis client is up and running.

        @return: C{True} if the client subprocess is alive and both forwarding tunnels
            (wormhole and data) are active, C{False} otherwise
        @rtype: C{bool}

        """
        _client_alive = bool(self.tekiclient and self.tekiclient.poll() is None)
        # the tunnels may be unset (not started yet or already tidied up)
        _ctrl_up = bool(self.fw_ctrl_tunnel and self.fw_ctrl_tunnel.is_active)
        _data_up = bool(self.fw_data_tunnel and self.fw_data_tunnel.is_active)
        return _client_alive and _ctrl_up and _data_up
408