Commit 70d2dadd authored by 杨照杰's avatar 杨照杰

添加vscode调试用的包

parent 0c54117d
ptvsd
Copyright (c) Microsoft Corporation
All rights reserved.
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Metadata-Version: 2.1
Name: ptvsd
Version: 4.3.2
Summary: Remote debugging server for Python support in Visual Studio and Visual Studio Code
Home-page: https://aka.ms/ptvs
Author: Microsoft Corporation
Author-email: ptvshelp@microsoft.com
License: MIT
Platform: win_amd64
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Topic :: Software Development :: Debuggers
Classifier: Operating System :: OS Independent
Classifier: License :: OSI Approved :: Eclipse Public License 2.0 (EPL-2.0)
Classifier: License :: OSI Approved :: MIT License
Requires-Python: >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
Description-Content-Type: text/markdown
The Python Visual Studio Debugger engine implements the Visual Studio Code debug protocol and is used as the debug engine in Visual Studio and Visual Studio Code.
The source code and the issue tracker is [hosted on GitHub](https://github.com/Microsoft/ptvsd/).
This diff is collapsed.
Wheel-Version: 1.0
Generator: bdist_wheel (0.33.4)
Root-Is-Purelib: false
Tag: cp37-cp37m-win_amd64
This diff is collapsed.
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
__all__ = [
'__version__', '__author__',
'enable_attach', 'wait_for_attach', 'break_into_debugger', 'is_attached',
]
# "force_pydevd" must be imported first to ensure (via side effects)
# that the ptvsd-vendored copy of pydevd gets used.
from ._vendored import force_pydevd
from ptvsd.version import __version__, __author__
from ptvsd.attach_server import ( # noqa
attach,
break_into_debugger,
debug_this_thread,
enable_attach,
is_attached,
wait_for_attach,
)
from ptvsd.tracing import tracing # noqa
del force_pydevd
This diff is collapsed.
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import numbers
import sys
import time
import pydevd
from _pydevd_bundle.pydevd_comm import get_global_debugger
from ptvsd.pydevd_hooks import install
from ptvsd.runner import run as no_debug_runner
from ptvsd.socket import Address
from ptvsd._util import new_hidden_thread
PYDEVD_DEFAULTS = {
'--json-dap',
'--qt-support=auto',
}
def _set_pydevd_defaults(pydevd_args):
args_to_append = []
for arg in PYDEVD_DEFAULTS:
if arg not in pydevd_args:
args_to_append.append(arg)
return pydevd_args + args_to_append
########################
# high-level functions
def debug_main(address, name, kind, *extra, **kwargs):
if not kwargs.pop('wait', False) and address.isserver:
def unblock_debugger():
debugger = get_global_debugger()
while debugger is None:
time.sleep(0.1)
debugger = get_global_debugger()
debugger.ready_to_run = True
new_hidden_thread('ptvsd.unblock_debugger', unblock_debugger).start()
if kind == 'module':
run_module(address, name, *extra, **kwargs)
else:
run_file(address, name, *extra, **kwargs)
def run_main(address, name, kind, *extra, **kwargs):
addr = Address.from_raw(address)
sys.argv[:] = _run_main_argv(name, extra)
runner = kwargs.pop('_runner', no_debug_runner)
runner(addr, name, kind == 'module', *extra, **kwargs)
########################
# low-level functions
def run_module(address, modname, *extra, **kwargs):
"""Run pydevd for the given module."""
addr = Address.from_raw(address)
if not addr.isserver:
kwargs['singlesession'] = True
run = kwargs.pop('_run', _run)
prog = kwargs.pop('_prog', sys.argv[0])
filename = modname + ':'
argv = _run_argv(addr, filename, extra, _prog=prog)
argv.insert(argv.index('--file'), '--module')
run(argv, addr, **kwargs)
def run_file(address, filename, *extra, **kwargs):
"""Run pydevd for the given Python file."""
addr = Address.from_raw(address)
if not addr.isserver:
kwargs['singlesession'] = True
run = kwargs.pop('_run', _run)
prog = kwargs.pop('_prog', sys.argv[0])
argv = _run_argv(addr, filename, extra, _prog=prog)
run(argv, addr, **kwargs)
def _run_argv(address, filename, extra, _prog=sys.argv[0]):
"""Convert the given values to an argv that pydevd.main() supports."""
if '--' in extra:
pydevd = list(extra[:extra.index('--')])
extra = list(extra[len(pydevd) + 1:])
else:
pydevd = []
extra = list(extra)
pydevd = _set_pydevd_defaults(pydevd)
host, port = address
argv = [
_prog,
'--port', str(port),
]
if not address.isserver:
argv.extend([
'--client', host or 'localhost',
])
return argv + pydevd + [
'--file', filename,
] + extra
def _run_main_argv(filename, extra):
if '--' in extra:
pydevd = list(extra[:extra.index('--')])
extra = list(extra[len(pydevd) + 1:])
else:
extra = list(extra)
return [filename] + extra
def _run(argv, addr, _pydevd=pydevd, _install=install, **kwargs):
"""Start pydevd with the given commandline args."""
# Pydevd assumes that the "__main__" module is the "pydevd" module
# and does some tricky stuff under that assumption. For example,
# when the debugger starts up it calls save_main_module()
# (in pydevd_bundle/pydevd_utils.py). That function explicitly sets
# sys.modules["pydevd"] to sys.modules["__main__"] and then sets
# the __main__ module to a new one. This makes some sense since
# it gives the debugged script a fresh __main__ module.
#
# This complicates things for us since we are running a different
# file (i.e. this one) as the __main__ module. Consequently,
# sys.modules["pydevd"] gets set to ptvsd/__main__.py. Subsequent
# imports of the "pydevd" module then return the wrong module. We
# work around this by avoiding lazy imports of the "pydevd" module.
# We also replace the __main__ module with the "pydevd" module here.
if sys.modules['__main__'].__file__ != _pydevd.__file__:
sys.modules['__main___orig'] = sys.modules['__main__']
sys.modules['__main__'] = _pydevd
daemon = _install(_pydevd, addr, **kwargs)
sys.argv[:] = argv
try:
_pydevd.main()
except SystemExit as ex:
if ex.code is None:
daemon.exitcode = 0
elif isinstance(ex.code, numbers.Integral):
daemon.exitcode = int(ex.code)
else:
daemon.exitcode = 1
raise
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
import pydevd
import time
from _pydevd_bundle.pydevd_comm import get_global_debugger
import ptvsd
import ptvsd.log
import ptvsd.options
from ptvsd._util import new_hidden_thread
from ptvsd.pydevd_hooks import install
from ptvsd.daemon import session_not_bound, DaemonClosedError
global_next_session = lambda: None
def enable_attach(address, on_attach=lambda: None, **kwargs):
host, port = address
def wait_for_connection(daemon, host, port, next_session=None):
ptvsd.log.debug('Waiting for pydevd ...')
debugger = get_global_debugger()
while debugger is None:
time.sleep(0.1)
debugger = get_global_debugger()
ptvsd.log.debug('Unblocking pydevd.')
debugger.ready_to_run = True
while True:
try:
session_not_bound.wait()
try:
global_next_session()
on_attach()
except DaemonClosedError:
return
except TypeError:
# May happen during interpreter shutdown
# (if some global -- such as global_next_session becomes None).
return
def start_daemon():
daemon._sock = daemon._start()
_, next_session = daemon.start_server(addr=(host, port))
global global_next_session
global_next_session = next_session
if port == 0:
_, ptvsd.options.port = daemon._server.getsockname()
else:
ptvsd.options.port = port
return daemon._sock
daemon = install(pydevd,
address,
start_server=None,
start_client=(lambda daemon, h, port: start_daemon()),
singlesession=False,
**kwargs)
ptvsd.log.debug('Starting connection listener thread')
connection_thread = new_hidden_thread('ptvsd.listen_for_connection',
wait_for_connection,
args=(daemon, host, port))
connection_thread.start()
if ptvsd.options.no_debug:
_setup_nodebug()
else:
ptvsd.log.debug('pydevd.settrace()')
pydevd.settrace(host=host,
port=port,
suspend=False,
patch_multiprocessing=ptvsd.options.multiprocess)
return daemon
def attach(address, **kwargs):
host, port = address
daemon = install(pydevd, address, singlesession=False, **kwargs)
if ptvsd.options.no_debug:
_setup_nodebug()
else:
ptvsd.log.debug('pydevd.settrace()')
pydevd.settrace(host=host,
port=port,
suspend=False,
patch_multiprocessing=ptvsd.options.multiprocess)
return daemon
def _setup_nodebug():
ptvsd.log.debug('Running pydevd in nodebug mode.')
debugger = pydevd.PyDB()
debugger.init_matplotlib_support = lambda *arg: None
# We are invoking run() solely for side effects here - setting up the
# debugger and connecting to our socket - so the code run is a no-op.
debugger.run(
file='ptvsd._remote:_nop',
globals=None,
locals=None,
is_module=True,
set_trace=False)
def _nop():
pass
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import print_function
import contextlib
import threading
import time
import sys
import ptvsd.log
@contextlib.contextmanager
def ignore_errors(log=None):
"""A context manager that masks any raised exceptions."""
try:
yield
except Exception:
if log is not None:
ptvsd.log.exception('Ignoring error', category='I')
def call_all(callables, *args, **kwargs):
"""Return the result of calling every given object."""
results = []
for call in callables:
try:
call(*args, **kwargs)
except Exception as exc:
results.append((call, exc))
else:
results.append((call, None))
return results
########################
# threading stuff
try:
ThreadError = threading.ThreadError
except AttributeError:
ThreadError = RuntimeError
try:
base = __builtins__.TimeoutError
except AttributeError:
base = OSError
class TimeoutError(base): # noqa
"""Timeout expired."""
timeout = None
reason = None
@classmethod
def from_timeout(cls, timeout, reason=None):
"""Return a TimeoutError with the given timeout."""
msg = 'timed out (after {} seconds)'.format(timeout)
if reason is not None:
msg += ' ' + reason
self = cls(msg)
self.timeout = timeout
self.reason = reason
return self
del base # noqa
def wait(check, timeout=None, reason=None):
"""Wait for the given func to return True.
If a timeout is given and reached then raise TimeoutError.
"""
if timeout is None or timeout <= 0:
while not check():
time.sleep(0.01)
else:
if not _wait(check, timeout):
raise TimeoutError.from_timeout(timeout, reason)
def is_locked(lock):
"""Return True if the lock is locked."""
if lock is None:
return False
if not lock.acquire(False):
return True
lock_release(lock)
return False
def lock_release(lock):
"""Ensure that the lock is released."""
if lock is None:
return
try:
lock.release()
except ThreadError: # already unlocked
pass
def lock_wait(lock, timeout=None, reason='waiting for lock'):
"""Wait until the lock is not locked."""
if not _lock_acquire(lock, timeout):
raise TimeoutError.from_timeout(timeout, reason)
lock_release(lock)
if sys.version_info >= (3,):
def _lock_acquire(lock, timeout):
if timeout is None:
timeout = -1
return lock.acquire(timeout=timeout)
else:
def _lock_acquire(lock, timeout):
if timeout is None or timeout <= 0:
return lock.acquire()
def check():
return lock.acquire(False)
return _wait(check, timeout)
def _wait(check, timeout):
if check():
return True
for _ in range(int(timeout * 100)):
time.sleep(0.01)
if check():
return True
else:
return False
def new_hidden_thread(name, target, prefix='ptvsd.', daemon=True, **kwargs):
"""Return a thread that will be ignored by pydevd."""
if prefix is not None and not name.startswith(prefix):
name = prefix + name
t = threading.Thread(
name=name,
target=target,
**kwargs
)
t.pydev_do_not_trace = True
t.is_pydev_daemon_thread = True
if daemon:
t.daemon = True
return t
########################
# closing stuff
class ClosedError(RuntimeError):
"""Indicates that the object is closed."""
def close_all(closeables):
"""Return the result of closing every given object."""
results = []
for obj in closeables:
try:
obj.close()
except Exception as exc:
results.append((obj, exc))
else:
results.append((obj, None))
return results
class Closeable(object):
"""A base class for types that may be closed."""
NAME = None
FAIL_ON_ALREADY_CLOSED = True
def __init__(self):
super(Closeable, self).__init__()
self._closed = False
self._closedlock = threading.Lock()
self._handlers = []
def __del__(self):
try:
self.close()
except ClosedError:
pass
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
@property
def closed(self):
return self._closed
def add_resource_to_close(self, resource, before=False):
"""Add a resource to be closed when closing."""
close = resource.close
if before:
def handle_closing(before):
if not before:
return
close()
else:
def handle_closing(before):
if before:
return
close()
self.add_close_handler(handle_closing)
def add_close_handler(self, handle_closing, nodupe=True):
"""Add a func to be called when closing.
The func takes one arg: True if it was called before the main
close func and False if after.
"""
with self._closedlock:
if self._closed:
if self.FAIL_ON_ALREADY_CLOSED:
raise ClosedError('already closed')
return
if nodupe and handle_closing in self._handlers:
raise ValueError('close func already added')
self._handlers.append(handle_closing)
def check_closed(self):
"""Raise ClosedError if closed."""
if self._closed:
if self.NAME:
raise ClosedError('{} closed'.format(self.NAME))
else:
raise ClosedError('closed')
@contextlib.contextmanager
def while_not_closed(self):
"""A context manager under which the object will not be closed."""
with self._closedlock:
self.check_closed()
yield
def close(self):
"""Release any owned resources and clean up."""
with self._closedlock:
if self._closed:
if self.FAIL_ON_ALREADY_CLOSED:
raise ClosedError('already closed')
return
self._closed = True
handlers = list(self._handlers)
results = call_all(handlers, True)
self._log_results(results)
self._close()
results = call_all(handlers, False)
self._log_results(results)
# implemented by subclasses
def _close(self):
pass
# internal methods
def _log_results(self, results, log=None):
if log is None:
return
for obj, exc in results:
if exc is None:
continue
log('failed to close {!r} ({!r})'.format(obj, exc))
########################
# running stuff
class NotRunningError(RuntimeError):
"""Something isn't currently running."""
class AlreadyStartedError(RuntimeError):
"""Something was already started."""
class AlreadyRunningError(AlreadyStartedError):
"""Something is already running."""
class Startable(object):
"""A base class for types that may be started."""
RESTARTABLE = False
FAIL_ON_ALREADY_STOPPED = True
def __init__(self):
super(Startable, self).__init__()
self._is_running = None
self._startlock = threading.Lock()
self._numstarts = 0
def is_running(self, checkclosed=True):
"""Return True if currently running."""
if checkclosed and hasattr(self, 'check_closed'):
self.check_closed()
is_running = self._is_running
if is_running is None:
return False
return is_running()
def start(self, *args, **kwargs):
"""Begin internal execution."""
with self._startlock:
if self.is_running():
raise AlreadyRunningError()
if not self.RESTARTABLE and self._numstarts > 0:
raise AlreadyStartedError()
self._is_running = self._start(*args, **kwargs)
self._numstarts += 1
def stop(self, *args, **kwargs):
"""Stop execution and wait until done."""
with self._startlock:
# TODO: Call self.check_closed() here?
if not self.is_running(checkclosed=False):
if not self.FAIL_ON_ALREADY_STOPPED:
return
raise NotRunningError()
self._is_running = None
self._stop(*args, **kwargs)
# implemented by subclasses
def _start(self, *args, **kwargs):
"""Return an "is_running()" func after starting."""
raise NotImplementedError
def _stop(self):
raise NotImplementedError
import contextlib
from importlib import import_module
import os
import os.path
import sys
from . import _util
VENDORED_ROOT = os.path.dirname(os.path.abspath(__file__))
# TODO: Move the "pydevd" git submodule to the ptvsd/_vendored directory
# and then drop the following fallback.
if 'pydevd' not in os.listdir(VENDORED_ROOT):
VENDORED_ROOT = os.path.dirname(VENDORED_ROOT)
def list_all(resolve=False):
"""Return the list of vendored projects."""
# TODO: Derive from os.listdir(VENDORED_ROOT)?
projects = [
'pydevd',
]
if not resolve:
return projects
return [project_root(name) for name in projects]
def project_root(project):
"""Return the path the root dir of the vendored project.
If "project" is an empty string then the path prefix for vendored
projects (e.g. "ptvsd/_vendored/") will be returned.
"""
if not project:
project = ''
return os.path.join(VENDORED_ROOT, project)
def iter_project_files(project, relative=False, **kwargs):
"""Yield (dirname, basename, filename) for all files in the project."""
if relative:
with _util.cwd(VENDORED_ROOT):
for result in _util.iter_all_files(project, **kwargs):
yield result
else:
root = project_root(project)
for result in _util.iter_all_files(root, **kwargs):
yield result
def iter_packaging_files(project):
"""Yield the filenames for all files in the project.
The filenames are relative to "ptvsd/_vendored". This is most
useful for the "package data" in a setup.py.
"""
# TODO: Use default filters? __pycache__ and .pyc?
prune_dir = None
exclude_file = None
try:
mod = import_module('._{}_packaging'.format(project), __name__)
except ImportError:
pass
else:
prune_dir = getattr(mod, 'prune_dir', prune_dir)
exclude_file = getattr(mod, 'exclude_file', exclude_file)
results = iter_project_files(
project,
relative=True,
prune_dir=prune_dir,
exclude_file=exclude_file,
)
for _, _, filename in results:
yield filename
def prefix_matcher(*prefixes):
"""Return a module match func that matches any of the given prefixes."""
assert prefixes
def match(name, module):
for prefix in prefixes:
if name.startswith(prefix):
return True
else:
return False
return match
def check_modules(project, match, root=None):
"""Verify that only vendored modules have been imported."""
if root is None:
root = project_root(project)
extensions = []
unvendored = {}
for modname, mod in sys.modules.items():
if not match(modname, mod):
continue
if not hasattr(mod, '__file__'): # extension module
extensions.append(modname)
elif not mod.__file__.startswith(root):
unvendored[modname] = mod.__file__
return unvendored, extensions
@contextlib.contextmanager
def vendored(project, root=None):
"""A context manager under which the vendored project will be imported."""
if root is None:
root = project_root(project)
# Add the vendored project directory, so that it gets tried first.
sys.path.insert(0, root)
try:
yield root
finally:
sys.path.remove(root)
def preimport(project, modules, **kwargs):
"""Import each of the named modules out of the vendored project."""
with vendored(project, **kwargs):
for name in modules:
import_module(name)
from . import VENDORED_ROOT
from ._util import cwd, iter_all_files
INCLUDES = [
'setup_cython.py',
]
def iter_files():
# From the root of pydevd repo, we want only scripts and
# subdirectories that constitute the package itself (not helper
# scripts, tests etc). But when walking down into those
# subdirectories, we want everything below.
with cwd(VENDORED_ROOT):
return iter_all_files('pydevd', prune_dir, exclude_file)
def prune_dir(dirname, basename):
if basename == '__pycache__':
return True
elif dirname != 'pydevd':
return False
elif basename.startswith('pydev'):
return False
elif basename.startswith('_pydev'):
return False
return True
def exclude_file(dirname, basename):
if dirname == 'pydevd':
if basename in INCLUDES:
return False
elif not basename.endswith('.py'):
return True
elif 'pydev' not in basename:
return True
return False
if basename.endswith('.pyc'):
return True
return False
import contextlib
import os
import os.path
@contextlib.contextmanager
def cwd(dirname):
"""A context manager for operating in a different directory."""
orig = os.getcwd()
os.chdir(dirname)
try:
yield orig
finally:
os.chdir(orig)
def iter_all_files(root, prune_dir=None, exclude_file=None):
"""Yield (dirname, basename, filename) for each file in the tree.
This is an alternative to os.walk() that flattens out the tree and
with filtering.
"""
pending = [root]
while pending:
dirname = pending.pop(0)
for result in _iter_files(dirname, pending, prune_dir, exclude_file):
yield result
def iter_tree(root, prune_dir=None, exclude_file=None):
"""Yield (dirname, files) for each directory in the tree.
The list of files is actually a list of (basename, filename).
This is an alternative to os.walk() with filtering."""
pending = [root]
while pending:
dirname = pending.pop(0)
files = []
for _, b, f in _iter_files(dirname, pending, prune_dir, exclude_file):
files.append((b, f))
yield dirname, files
def _iter_files(dirname, subdirs, prune_dir, exclude_file):
for basename in os.listdir(dirname):
filename = os.path.join(dirname, basename)
if os.path.isdir(filename):
if prune_dir is not None and prune_dir(dirname, basename):
continue
subdirs.append(filename)
else:
# TODO: Use os.path.isfile() to narrow it down?
if exclude_file is not None and exclude_file(dirname, basename):
continue
yield dirname, basename, filename
from importlib import import_module
import warnings
from . import check_modules, prefix_matcher, preimport, vendored
# Ensure that pydevd is our vendored copy.
_unvendored, _ = check_modules('pydevd',
prefix_matcher('pydev', '_pydev'))
if _unvendored:
_unvendored = sorted(_unvendored.values())
msg = 'incompatible copy of pydevd already imported'
# raise ImportError(msg)
warnings.warn(msg + ':\n {}'.format('\n '.join(_unvendored)))
# Constants must be set before importing any other pydevd module
# # due to heavy use of "from" in them.
with vendored('pydevd'):
pydevd_constants = import_module('_pydevd_bundle.pydevd_constants')
# TODO: figure out what the appropriate setting is to work for both wheels and sdist.
pydevd_constants.CYTHON_SUPPORTED = False
# We limit representation size in our representation provider when needed.
pydevd_constants.MAXIMUM_VARIABLE_REPRESENTATION_SIZE = 2 ** 32
# Now make sure all the top-level modules and packages in pydevd are
# loaded. Any pydevd modules that aren't loaded at this point, will
# be loaded using their parent package's __path__ (i.e. one of the
# following).
preimport('pydevd', [
'_pydev_bundle',
'_pydev_imps',
'_pydev_runfiles',
'_pydevd_bundle',
'_pydevd_frame_eval',
'pydev_ipython',
'pydevd_concurrency_analyser',
'pydevd_plugins',
'pydevd',
])
# When pydevd is imported it sets the breakpoint behavior, but it needs to be
# overridden because the pydevd version will connect to the remote debugger by
# default, but without using the ptvsd protocol (so, we need to use the ptvsd
# API to handle things as expected by the debug adapter).
import pydevd # noqa
import ptvsd # noqa
def ptvsd_breakpointhook():
ptvsd.break_into_debugger()
pydevd.install_breakpointhook(ptvsd_breakpointhook)
# Ensure that pydevd uses JSON protocol
from _pydevd_bundle.pydevd_constants import JSON_PROTOCOL
from _pydevd_bundle.pydevd_defaults import PydevdCustomization
PydevdCustomization.DEFAULT_PROTOCOL = JSON_PROTOCOL
'''
License: Apache 2.0
Author: Yuli Fitterman
'''
# noinspection PyBroadException
import types
from _pydevd_bundle.pydevd_constants import IS_JYTHON, IS_PY3K
try:
import inspect
except:
try:
from _pydev_imps import _pydev_inspect as inspect
except:
import traceback;
traceback.print_exc() # Ok, no inspect available (search will not work)from _pydevd_bundle.pydevd_constants import IS_JYTHON, IS_PY3K
from _pydev_bundle._pydev_imports_tipper import signature_from_docstring
def is_bound_method(obj):
if isinstance(obj, types.MethodType):
return getattr(obj, '__self__', getattr(obj, 'im_self', None)) is not None
else:
return False
def get_class_name(instance):
return getattr(getattr(instance, "__class__", None), "__name__", None)
def get_bound_class_name(obj):
my_self = getattr(obj, '__self__', getattr(obj, 'im_self', None))
if my_self is None:
return None
return get_class_name(my_self)
def get_description(obj):
try:
ob_call = obj.__call__
except:
ob_call = None
if isinstance(obj, type) or type(obj).__name__ == 'classobj':
fob = getattr(obj, '__init__', lambda: None)
if not isinstance(fob, (types.FunctionType, types.MethodType)):
fob = obj
elif is_bound_method(ob_call):
fob = ob_call
else:
fob = obj
argspec = ""
fn_name = None
fn_class = None
if isinstance(fob, (types.FunctionType, types.MethodType)):
spec_info = inspect.getfullargspec(fob) if IS_PY3K else inspect.getargspec(fob)
argspec = inspect.formatargspec(*spec_info)
fn_name = getattr(fob, '__name__', None)
if isinstance(obj, type) or type(obj).__name__ == 'classobj':
fn_name = "__init__"
fn_class = getattr(obj, "__name__", "UnknownClass")
elif is_bound_method(obj) or is_bound_method(ob_call):
fn_class = get_bound_class_name(obj) or "UnknownClass"
else:
fn_name = getattr(fob, '__name__', None)
fn_self = getattr(fob, '__self__', None)
if fn_self is not None and not isinstance(fn_self, types.ModuleType):
fn_class = get_class_name(fn_self)
doc_string = get_docstring(ob_call) if is_bound_method(ob_call) else get_docstring(obj)
return create_method_stub(fn_name, fn_class, argspec, doc_string)
def create_method_stub(fn_name, fn_class, argspec, doc_string):
if fn_name and argspec:
doc_string = "" if doc_string is None else doc_string
fn_stub = create_function_stub(fn_name, argspec, doc_string, indent=1 if fn_class else 0)
if fn_class:
expr = fn_class if fn_name == '__init__' else fn_class + '().' + fn_name
return create_class_stub(fn_class, fn_stub) + "\n" + expr
else:
expr = fn_name
return fn_stub + "\n" + expr
elif doc_string:
if fn_name:
restored_signature, _ = signature_from_docstring(doc_string, fn_name)
if restored_signature:
return create_method_stub(fn_name, fn_class, restored_signature, doc_string)
return create_function_stub('unknown', '(*args, **kwargs)', doc_string) + '\nunknown'
else:
return ''
def get_docstring(obj):
if obj is not None:
try:
if IS_JYTHON:
# Jython
doc = obj.__doc__
if doc is not None:
return doc
from _pydev_bundle import _pydev_jy_imports_tipper
is_method, infos = _pydev_jy_imports_tipper.ismethod(obj)
ret = ''
if is_method:
for info in infos:
ret += info.get_as_doc()
return ret
else:
doc = inspect.getdoc(obj)
if doc is not None:
return doc
except:
pass
else:
return ''
try:
# if no attempt succeeded, try to return repr()...
return repr(obj)
except:
try:
# otherwise the class
return str(obj.__class__)
except:
# if all fails, go to an empty string
return ''
def create_class_stub(class_name, contents):
return "class %s(object):\n%s" % (class_name, contents)
def create_function_stub(fn_name, fn_argspec, fn_docstring, indent=0):
def shift_right(string, prefix):
return ''.join(prefix + line for line in string.splitlines(True))
fn_docstring = shift_right(inspect.cleandoc(fn_docstring), " " * (indent + 1))
ret = '''
def %s%s:
"""%s"""
pass
''' % (fn_name, fn_argspec, fn_docstring)
ret = ret[1:] # remove first /n
ret = ret.replace('\t', " ")
if indent:
prefix = " " * indent
ret = shift_right(ret, prefix)
return ret
from collections import namedtuple
from string import ascii_letters, digits
from _pydevd_bundle import pydevd_xml
from _pydevd_bundle.pydevd_constants import IS_PY2
import pydevconsole
if IS_PY2:
import __builtin__
else:
import builtins as __builtin__ # Py3
try:
import java.lang # @UnusedImport
from _pydev_bundle import _pydev_jy_imports_tipper
_pydev_imports_tipper = _pydev_jy_imports_tipper
except ImportError:
IS_JYTHON = False
from _pydev_bundle import _pydev_imports_tipper
dir2 = _pydev_imports_tipper.generate_imports_tip_for_module
#=======================================================================================================================
# _StartsWithFilter
#=======================================================================================================================
class _StartsWithFilter:
'''
Used because we can't create a lambda that'll use an outer scope in jython 2.1
'''
def __init__(self, start_with):
self.start_with = start_with.lower()
def __call__(self, name):
return name.lower().startswith(self.start_with)
#=======================================================================================================================
# Completer
#
# This class was gotten from IPython.completer (dir2 was replaced with the completer already in pydev)
#=======================================================================================================================
class Completer:
def __init__(self, namespace=None, global_namespace=None):
"""Create a new completer for the command line.
Completer([namespace,global_namespace]) -> completer instance.
If unspecified, the default namespace where completions are performed
is __main__ (technically, __main__.__dict__). Namespaces should be
given as dictionaries.
An optional second namespace can be given. This allows the completer
to handle cases where both the local and global scopes need to be
distinguished.
Completer instances should be used as the completion mechanism of
readline via the set_completer() call:
readline.set_completer(Completer(my_namespace).complete)
"""
# Don't bind to namespace quite yet, but flag whether the user wants a
# specific namespace or to use __main__.__dict__. This will allow us
# to bind to __main__.__dict__ at completion time, not now.
if namespace is None:
self.use_main_ns = 1
else:
self.use_main_ns = 0
self.namespace = namespace
# The global namespace, if given, can be bound directly
if global_namespace is None:
self.global_namespace = {}
else:
self.global_namespace = global_namespace
def complete(self, text):
"""Return the next possible completion for 'text'.
This is called successively with state == 0, 1, 2, ... until it
returns None. The completion should begin with 'text'.
"""
if self.use_main_ns:
# In pydev this option should never be used
raise RuntimeError('Namespace must be provided!')
self.namespace = __main__.__dict__ # @UndefinedVariable
if "." in text:
return self.attr_matches(text)
else:
return self.global_matches(text)
def global_matches(self, text):
"""Compute matches when text is a simple name.
Return a list of all keywords, built-in functions and names currently
defined in self.namespace or self.global_namespace that match.
"""
def get_item(obj, attr):
return obj[attr]
a = {}
for dict_with_comps in [__builtin__.__dict__, self.namespace, self.global_namespace]: # @UndefinedVariable
a.update(dict_with_comps)
filter = _StartsWithFilter(text)
return dir2(a, a.keys(), get_item, filter)
def attr_matches(self, text):
"""Compute matches when text contains a dot.
Assuming the text is of the form NAME.NAME....[NAME], and is
evaluatable in self.namespace or self.global_namespace, it will be
evaluated and its attributes (as revealed by dir()) are used as
possible completions. (For class instances, class members are are
also considered.)
WARNING: this can still invoke arbitrary C code, if an object
with a __getattr__ hook is evaluated.
"""
import re
# Another option, seems to work great. Catches things like ''.<tab>
m = re.match(r"(\S+(\.\w+)*)\.(\w*)$", text) # @UndefinedVariable
if not m:
return []
expr, attr = m.group(1, 3)
try:
obj = eval(expr, self.namespace)
except:
try:
obj = eval(expr, self.global_namespace)
except:
return []
filter = _StartsWithFilter(attr)
words = dir2(obj, filter=filter)
return words
def generate_completions(frame, act_tok):
'''
:return list(tuple(method_name, docstring, parameters, completion_type))
method_name: str
docstring: str
parameters: str -- i.e.: "(a, b)"
completion_type is an int
See: _pydev_bundle._pydev_imports_tipper for TYPE_ constants
'''
if frame is None:
return []
# Not using frame.f_globals because of https://sourceforge.net/tracker2/?func=detail&aid=2541355&group_id=85796&atid=577329
# (Names not resolved in generator expression in method)
# See message: http://mail.python.org/pipermail/python-list/2009-January/526522.html
updated_globals = {}
updated_globals.update(frame.f_globals)
updated_globals.update(frame.f_locals) # locals later because it has precedence over the actual globals
if pydevconsole.IPYTHON:
completions = pydevconsole.get_completions(act_tok, act_tok, updated_globals, frame.f_locals)
else:
completer = Completer(updated_globals, None)
# list(tuple(name, descr, parameters, type))
completions = completer.complete(act_tok)
return completions
def generate_completions_as_xml(frame, act_tok):
completions = generate_completions(frame, act_tok)
return completions_to_xml(completions)
def completions_to_xml(completions):
valid_xml = pydevd_xml.make_valid_xml_value
quote = pydevd_xml.quote
msg = ["<xml>"]
for comp in completions:
msg.append('<comp p0="')
msg.append(valid_xml(quote(comp[0], '/>_= \t')))
msg.append('" p1="')
msg.append(valid_xml(quote(comp[1], '/>_= \t')))
msg.append('" p2="')
msg.append(valid_xml(quote(comp[2], '/>_= \t')))
msg.append('" p3="')
msg.append(valid_xml(quote(comp[3], '/>_= \t')))
msg.append('"/>')
msg.append("</xml>")
return ''.join(msg)
identifier_start = ascii_letters + '_'
identifier_part = ascii_letters + '_' + digits
if IS_PY2:
identifier_start = identifier_start.decode('utf-8')
identifier_part = identifier_part.decode('utf-8')
identifier_start = set(identifier_start)
identifier_part = set(identifier_part)
if IS_PY2:
# There's no string.isidentifier() on py2.
def isidentifier(s):
if not s:
return False
if s[0] not in identifier_start:
return False
for c in s[1:]:
if c not in identifier_part:
return False
return True
else:
def isidentifier(s):
return s.isidentifier()
TokenAndQualifier = namedtuple('TokenAndQualifier', 'token, qualifier')
def extract_token_and_qualifier(text, line=0, column=0):
'''
Extracts the token a qualifier from the text given the line/colum
(see test_extract_token_and_qualifier for examples).
:param unicode text:
:param int line: 0-based
:param int column: 0-based
'''
# Note: not using the tokenize module because text should be unicode and
# line/column refer to the unicode text (otherwise we'd have to know
# those ranges after converted to bytes).
if line < 0:
line = 0
if column < 0:
column = 0
if isinstance(text, bytes):
text = text.decode('utf-8')
lines = text.splitlines()
try:
text = lines[line]
except IndexError:
return TokenAndQualifier(u'', u'')
if column >= len(text):
column = len(text)
text = text[:column]
token = u''
qualifier = u''
temp_token = []
for i in range(column - 1, -1, -1):
c = text[i]
if c in identifier_part or isidentifier(c) or c == u'.':
temp_token.append(c)
else:
break
temp_token = u''.join(reversed(temp_token))
if u'.' in temp_token:
temp_token = temp_token.split(u'.')
token = u'.'.join(temp_token[:-1])
qualifier = temp_token[-1]
else:
qualifier = temp_token
return TokenAndQualifier(token, qualifier)
import sys
def __getfilesystemencoding():
'''
Note: there's a copy of this method in interpreterInfo.py
'''
try:
ret = sys.getfilesystemencoding()
if not ret:
raise RuntimeError('Unable to get encoding.')
return ret
except:
try:
#Handle Jython
from java.lang import System # @UnresolvedImport
env = System.getProperty("os.name").lower()
if env.find('win') != -1:
return 'ISO-8859-1' #mbcs does not work on Jython, so, use a (hopefully) suitable replacement
return 'utf-8'
except:
pass
#Only available from 2.3 onwards.
if sys.platform == 'win32':
return 'mbcs'
return 'utf-8'
def getfilesystemencoding():
try:
ret = __getfilesystemencoding()
#Check if the encoding is actually there to be used!
if hasattr('', 'encode'):
''.encode(ret)
if hasattr('', 'decode'):
''.decode(ret)
return ret
except:
return 'utf-8'
#=======================================================================================================================
# getopt code copied since gnu_getopt is not available on jython 2.1
#=======================================================================================================================
class GetoptError(Exception):
opt = ''
msg = ''
def __init__(self, msg, opt=''):
self.msg = msg
self.opt = opt
Exception.__init__(self, msg, opt)
def __str__(self):
return self.msg
def gnu_getopt(args, shortopts, longopts=[]):
"""getopt(args, options[, long_options]) -> opts, args
This function works like getopt(), except that GNU style scanning
mode is used by default. This means that option and non-option
arguments may be intermixed. The getopt() function stops
processing options as soon as a non-option argument is
encountered.
If the first character of the option string is `+', or if the
environment variable POSIXLY_CORRECT is set, then option
processing stops as soon as a non-option argument is encountered.
"""
opts = []
prog_args = []
if type('') == type(longopts):
longopts = [longopts]
else:
longopts = list(longopts)
# Allow options after non-option arguments?
all_options_first = False
if shortopts.startswith('+'):
shortopts = shortopts[1:]
all_options_first = True
while args:
if args[0] == '--':
prog_args += args[1:]
break
if args[0][:2] == '--':
opts, args = do_longs(opts, args[0][2:], longopts, args[1:])
elif args[0][:1] == '-':
opts, args = do_shorts(opts, args[0][1:], shortopts, args[1:])
else:
if all_options_first:
prog_args += args
break
else:
prog_args.append(args[0])
args = args[1:]
return opts, prog_args
def do_longs(opts, opt, longopts, args):
try:
i = opt.index('=')
except ValueError:
optarg = None
else:
opt, optarg = opt[:i], opt[i + 1:]
has_arg, opt = long_has_args(opt, longopts)
if has_arg:
if optarg is None:
if not args:
raise GetoptError('option --%s requires argument' % opt, opt)
optarg, args = args[0], args[1:]
elif optarg:
raise GetoptError('option --%s must not have an argument' % opt, opt)
opts.append(('--' + opt, optarg or ''))
return opts, args
# Return:
# has_arg?
# full option name
def long_has_args(opt, longopts):
possibilities = [o for o in longopts if o.startswith(opt)]
if not possibilities:
raise GetoptError('option --%s not recognized' % opt, opt)
# Is there an exact match?
if opt in possibilities:
return False, opt
elif opt + '=' in possibilities:
return True, opt
# No exact match, so better be unique.
if len(possibilities) > 1:
# XXX since possibilities contains all valid continuations, might be
# nice to work them into the error msg
raise GetoptError('option --%s not a unique prefix' % opt, opt)
assert len(possibilities) == 1
unique_match = possibilities[0]
has_arg = unique_match.endswith('=')
if has_arg:
unique_match = unique_match[:-1]
return has_arg, unique_match
def do_shorts(opts, optstring, shortopts, args):
while optstring != '':
opt, optstring = optstring[0], optstring[1:]
if short_has_arg(opt, shortopts):
if optstring == '':
if not args:
raise GetoptError('option -%s requires argument' % opt,
opt)
optstring, args = args[0], args[1:]
optarg, optstring = optstring, ''
else:
optarg = ''
opts.append(('-' + opt, optarg))
return opts, args
def short_has_arg(opt, shortopts):
for i in range(len(shortopts)):
if opt == shortopts[i] != ':':
return shortopts.startswith(':', i + 1)
raise GetoptError('option -%s not recognized' % opt, opt)
#=======================================================================================================================
# End getopt code
#=======================================================================================================================
import traceback
import sys
try:
import StringIO
except:
import io as StringIO #Python 3.0
class Log:
def __init__(self):
self._contents = []
def add_content(self, *content):
self._contents.append(' '.join(content))
def add_exception(self):
s = StringIO.StringIO()
exc_info = sys.exc_info()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], limit=None, file=s)
self._contents.append(s.getvalue())
def get_contents(self):
return '\n'.join(self._contents)
def clear_log(self):
del self._contents[:]
\ No newline at end of file
try:
import inspect
except:
try:
from _pydev_imps import _pydev_inspect as inspect
except:
import traceback;traceback.print_exc() #Ok, no inspect available (search will not work)
try:
import re
except:
try:
import sre as re # for older versions
except:
import traceback;traceback.print_exc() #Ok, no inspect available (search will not work)
from _pydevd_bundle.pydevd_constants import xrange
def do_find(f, mod):
import linecache
if inspect.ismodule(mod):
return f, 0, 0
lines = linecache.getlines(f)
if inspect.isclass(mod):
name = mod.__name__
pat = re.compile(r'^\s*class\s*' + name + r'\b')
for i in xrange(len(lines)):
if pat.match(lines[i]):
return f, i, 0
return f, 0, 0
if inspect.ismethod(mod):
mod = mod.im_func
if inspect.isfunction(mod):
try:
mod = mod.func_code
except AttributeError:
mod = mod.__code__ #python 3k
if inspect.istraceback(mod):
mod = mod.tb_frame
if inspect.isframe(mod):
mod = mod.f_code
if inspect.iscode(mod):
if not hasattr(mod, 'co_filename'):
return None, 0, 0
if not hasattr(mod, 'co_firstlineno'):
return mod.co_filename, 0, 0
lnum = mod.co_firstlineno
pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)')
while lnum > 0:
if pat.match(lines[lnum]):
break
lnum -= 1
return f, lnum, 0
raise RuntimeError('Do not know about: ' + f + ' ' + str(mod))
import sys
import traceback
from types import ModuleType
from _pydevd_bundle.pydevd_constants import DebugInfoHolder
if sys.version_info[0] >= 3:
import builtins # py3
else:
import __builtin__ as builtins
class ImportHookManager(ModuleType):
def __init__(self, name, system_import):
ModuleType.__init__(self, name)
self._system_import = system_import
self._modules_to_patch = {}
def add_module_name(self, module_name, activate_function):
self._modules_to_patch[module_name] = activate_function
def do_import(self, name, *args, **kwargs):
module = self._system_import(name, *args, **kwargs)
try:
activate_func = self._modules_to_patch.pop(name, None)
if activate_func:
activate_func() # call activate function
except:
if DebugInfoHolder.DEBUG_TRACE_LEVEL >= 2:
traceback.print_exc()
# Restore normal system importer to reduce performance impact
# of calling this method every time an import statement is invoked
if not self._modules_to_patch:
builtins.__import__ = self._system_import
return module
import_hook_manager = ImportHookManager(__name__ + '.import_hook', builtins.__import__)
builtins.__import__ = import_hook_manager.do_import
sys.modules[import_hook_manager.__name__] = import_hook_manager
from _pydevd_bundle.pydevd_constants import USE_LIB_COPY, izip
try:
try:
if USE_LIB_COPY:
from _pydev_imps._pydev_saved_modules import xmlrpclib
else:
import xmlrpclib
except ImportError:
import xmlrpc.client as xmlrpclib
except ImportError:
from _pydev_imps import _pydev_xmlrpclib as xmlrpclib
try:
try:
if USE_LIB_COPY:
from _pydev_imps._pydev_saved_modules import _pydev_SimpleXMLRPCServer
from _pydev_SimpleXMLRPCServer import SimpleXMLRPCServer
else:
from SimpleXMLRPCServer import SimpleXMLRPCServer
except ImportError:
from xmlrpc.server import SimpleXMLRPCServer
except ImportError:
from _pydev_imps._pydev_SimpleXMLRPCServer import SimpleXMLRPCServer
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
try:
execfile = execfile # Not in Py3k
except NameError:
from _pydev_imps._pydev_execfile import execfile
try:
if USE_LIB_COPY:
from _pydev_imps._pydev_saved_modules import _queue
else:
import Queue as _queue
except:
import queue as _queue # @UnresolvedImport
try:
from _pydevd_bundle.pydevd_exec import Exec
except:
from _pydevd_bundle.pydevd_exec2 import Exec
try:
from urllib import quote, quote_plus, unquote_plus
except:
from urllib.parse import quote, quote_plus, unquote_plus # @UnresolvedImport
import sys
from _pydev_bundle.pydev_console_utils import BaseInterpreterInterface
import os
import traceback
# Uncomment to force PyDev standard shell.
# raise ImportError()
from _pydev_bundle.pydev_ipython_console_011 import get_pydev_frontend
from _pydevd_bundle.pydevd_constants import dict_iter_items
#=======================================================================================================================
# InterpreterInterface
#=======================================================================================================================
class InterpreterInterface(BaseInterpreterInterface):
'''
The methods in this class should be registered in the xml-rpc server.
'''
def __init__(self, host, client_port, main_thread, show_banner=True, connect_status_queue=None):
BaseInterpreterInterface.__init__(self, main_thread, connect_status_queue)
self.client_port = client_port
self.host = host
self.interpreter = get_pydev_frontend(host, client_port)
self._input_error_printed = False
self.notification_succeeded = False
self.notification_tries = 0
self.notification_max_tries = 3
self.show_banner = show_banner
self.notify_about_magic()
def get_greeting_msg(self):
if self.show_banner:
self.interpreter.show_banner()
return self.interpreter.get_greeting_msg()
def do_add_exec(self, code_fragment):
self.notify_about_magic()
if code_fragment.text.rstrip().endswith('??'):
print('IPython-->')
try:
res = bool(self.interpreter.add_exec(code_fragment.text))
finally:
if code_fragment.text.rstrip().endswith('??'):
print('<--IPython')
return res
def get_namespace(self):
return self.interpreter.get_namespace()
def getCompletions(self, text, act_tok):
return self.interpreter.getCompletions(text, act_tok)
def close(self):
sys.exit(0)
def notify_about_magic(self):
if not self.notification_succeeded:
self.notification_tries+=1
if self.notification_tries>self.notification_max_tries:
return
completions = self.getCompletions("%", "%")
magic_commands = [x[0] for x in completions]
server = self.get_server()
if server is not None:
try:
server.NotifyAboutMagic(magic_commands, self.interpreter.is_automagic())
self.notification_succeeded = True
except :
self.notification_succeeded = False
def get_ipython_hidden_vars_dict(self):
try:
if hasattr(self.interpreter, 'ipython') and hasattr(self.interpreter.ipython, 'user_ns_hidden'):
user_ns_hidden = self.interpreter.ipython.user_ns_hidden
if isinstance(user_ns_hidden, dict):
# Since IPython 2 dict `user_ns_hidden` contains hidden variables and values
user_hidden_dict = user_ns_hidden.copy()
else:
# In IPython 1.x `user_ns_hidden` used to be a set with names of hidden variables
user_hidden_dict = dict([(key, val) for key, val in dict_iter_items(self.interpreter.ipython.user_ns)
if key in user_ns_hidden])
# while `_`, `__` and `___` were not initialized, they are not presented in `user_ns_hidden`
user_hidden_dict.setdefault('_', '')
user_hidden_dict.setdefault('__', '')
user_hidden_dict.setdefault('___', '')
return user_hidden_dict
except:
# Getting IPython variables shouldn't break loading frame variables
traceback.print_exc()
from _pydev_imps._pydev_saved_modules import threading
# Hack for https://www.brainwy.com/tracker/PyDev/363 (i.e.: calling isAlive() can throw AssertionError under some
# circumstances).
# It is required to debug threads started by start_new_thread in Python 3.4
_temp = threading.Thread()
if hasattr(_temp, '_is_stopped'): # Python 3.x has this
def is_thread_alive(t):
return not t._is_stopped
elif hasattr(_temp, '_Thread__stopped'): # Python 2.x has this
def is_thread_alive(t):
return not t._Thread__stopped
else:
# Jython wraps a native java thread and thus only obeys the public API.
def is_thread_alive(t):
return t.isAlive()
del _temp
from _pydevd_bundle import pydevd_constants
from _pydev_imps._pydev_saved_modules import socket
import sys
IS_JYTHON = sys.platform.find('java') != -1
_cache = None
def get_localhost():
'''
Should return 127.0.0.1 in ipv4 and ::1 in ipv6
localhost is not used because on windows vista/windows 7, there can be issues where the resolving doesn't work
properly and takes a lot of time (had this issue on the pyunit server).
Using the IP directly solves the problem.
'''
# TODO: Needs better investigation!
global _cache
if _cache is None:
try:
for addr_info in socket.getaddrinfo("localhost", 80, 0, 0, socket.SOL_TCP):
config = addr_info[4]
if config[0] == '127.0.0.1':
_cache = '127.0.0.1'
return _cache
except:
# Ok, some versions of Python don't have getaddrinfo or SOL_TCP... Just consider it 127.0.0.1 in this case.
_cache = '127.0.0.1'
else:
_cache = 'localhost'
return _cache
def get_socket_names(n_sockets, close=False):
socket_names = []
sockets = []
for _ in range(n_sockets):
if IS_JYTHON:
# Although the option which would be pure java *should* work for Jython, the socket being returned is still 0
# (i.e.: it doesn't give the local port bound, only the original port, which was 0).
from java.net import ServerSocket
sock = ServerSocket(0)
socket_name = get_localhost(), sock.getLocalPort()
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((get_localhost(), 0))
socket_name = sock.getsockname()
sockets.append(sock)
socket_names.append(socket_name)
if close:
for s in sockets:
s.close()
return socket_names
def get_socket_name(close=False):
return get_socket_names(1, close)[0]
if __name__ == '__main__':
print(get_socket_name())
\ No newline at end of file
def overrides(method):
'''
Meant to be used as
class B:
@overrides(A.m1)
def m1(self):
pass
'''
def wrapper(func):
if func.__name__ != method.__name__:
msg = "Wrong @override: %r expected, but overwriting %r."
msg = msg % (func.__name__, method.__name__)
raise AssertionError(msg)
if func.__doc__ is None:
func.__doc__ = method.__doc__
return func
return wrapper
def implements(method):
def wrapper(func):
if func.__name__ != method.__name__:
msg = "Wrong @implements: %r expected, but implementing %r."
msg = msg % (func.__name__, method.__name__)
raise AssertionError(msg)
if func.__doc__ is None:
func.__doc__ = method.__doc__
return func
return wrapper
\ No newline at end of file
import sys
def versionok_for_gui():
''' Return True if running Python is suitable for GUI Event Integration and deeper IPython integration '''
# We require Python 2.6+ ...
if sys.hexversion < 0x02060000:
return False
# Or Python 3.2+
if sys.hexversion >= 0x03000000 and sys.hexversion < 0x03020000:
return False
# Not supported under Jython nor IronPython
if sys.platform.startswith("java") or sys.platform.startswith('cli'):
return False
return True
#We must redefine it in Py3k if it's not already there
def execfile(file, glob=None, loc=None):
if glob is None:
import sys
glob = sys._getframe().f_back.f_globals
if loc is None:
loc = glob
# It seems that the best way is using tokenize.open(): http://code.activestate.com/lists/python-dev/131251/
# (but tokenize.open() is only available for python 3.2)
import tokenize
if hasattr(tokenize, 'open'):
# version 3.2
stream = tokenize.open(file) # @UndefinedVariable
else:
# version 3.0 or 3.1
detect_encoding = tokenize.detect_encoding(open(file, mode="rb" ).readline)
stream = open(file, encoding=detect_encoding[0])
try:
contents = stream.read()
finally:
stream.close()
#execute the script (note: it's important to compile first to have the filename set in debug mode)
exec(compile(contents+"\n", file, 'exec'), glob, loc)
\ No newline at end of file
import sys
IS_PY2 = sys.version_info < (3,)
import threading
import time
import socket
import select
if IS_PY2:
import thread
import Queue as _queue
import xmlrpclib
import SimpleXMLRPCServer as _pydev_SimpleXMLRPCServer
import BaseHTTPServer
else:
import _thread as thread
import queue as _queue
import xmlrpc.client as xmlrpclib
import xmlrpc.server as _pydev_SimpleXMLRPCServer
import http.server as BaseHTTPServer
\ No newline at end of file
'''
This module holds the customization settings for the debugger.
'''
from _pydevd_bundle.pydevd_constants import QUOTED_LINE_PROTOCOL
class PydevdCustomization(object):
DEFAULT_PROTOCOL = QUOTED_LINE_PROTOCOL
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment