本文整理汇总了Python中test.support.import_module函数的典型用法代码示例。如果您正苦于以下问题:Python import_module函数的具体用法?Python import_module怎么用?Python import_module使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了import_module函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_finalize_runnning_thread
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
import_module("ctypes")
rc, out, err = assert_python_failure(
"-c",
"""if 1:
import ctypes, sys, time, _thread
# This lock is used as a simple event variable.
ready = _thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
_thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
""",
)
self.assertEqual(rc, 42)
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:35,代码来源:test_threading.py
示例2: test_issue13210
def test_issue13210(self):
# invoking "continue" on a non-main thread triggered an exception
# inside signal.signal
# raises SkipTest if python was built without threads
support.import_module("threading")
with open(support.TESTFN, "wb") as f:
f.write(
textwrap.dedent(
"""
import threading
import pdb
def start_pdb():
pdb.Pdb().set_trace()
x = 1
y = 1
t = threading.Thread(target=start_pdb)
t.start()"""
).encode("ascii")
)
cmd = [sys.executable, "-u", support.TESTFN]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
self.addCleanup(proc.stdout.close)
stdout, stderr = proc.communicate(b"cont\n")
self.assertNotIn("Error", stdout.decode(), "Got an error running test script under PDB")
开发者ID:pombredanne,项目名称:pyparallel,代码行数:28,代码来源:test_pdb.py
示例3: test_refcount_errors
def test_refcount_errors(self):
self.preclean()
# Verify the "handling" of objects with broken refcounts
# Skip the test if ctypes is not available
import_module("ctypes")
import subprocess
code = textwrap.dedent('''
from test.support import gc_collect, SuppressCrashReport
a = [1, 2, 3]
b = [a]
# Avoid coredump when Py_FatalError() calls abort()
SuppressCrashReport().__enter__()
# Simulate the refcount of "a" being too low (compared to the
# references held on it by live data), but keeping it above zero
# (to avoid deallocating it):
import ctypes
ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
# The garbage collector should now have a fatal error
# when it reaches the broken object
gc_collect()
''')
p = subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
p.stdout.close()
p.stderr.close()
# Verify that stderr has a useful error message:
self.assertRegex(stderr,
br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
self.assertRegex(stderr,
br'refcount is too small')
self.assertRegex(stderr,
br'object : \[1, 2, 3\]')
self.assertRegex(stderr,
br'type : list')
self.assertRegex(stderr,
br'refcount: 1')
# "address : 0x7fb5062efc18"
# "address : 7FB5062EFC18"
self.assertRegex(stderr,
br'address : [0-9a-fA-Fx]+')
开发者ID:Victor-Savu,项目名称:cpython,代码行数:48,代码来源:test_gc.py
示例4: test_untested_modules_can_be_imported
def test_untested_modules_can_be_imported(self):
untested = ('bdb', 'encodings', 'formatter',
'nturl2path', 'tabnanny')
with support.check_warnings(quiet=True):
for name in untested:
try:
support.import_module('test.test_{}'.format(name))
except unittest.SkipTest:
importlib.import_module(name)
else:
self.fail('{} has tests even though test_sundry claims '
'otherwise'.format(name))
# distutils not available on UWP
if sys.platform != 'uwp':
import distutils.bcppcompiler
import distutils.ccompiler
import distutils.cygwinccompiler
import distutils.filelist
import distutils.text_file
import distutils.unixccompiler
import distutils.command.bdist_dumb
if sys.platform.startswith('win'):
import distutils.command.bdist_msi
import distutils.command.bdist
import distutils.command.bdist_rpm
import distutils.command.bdist_wininst
import distutils.command.build_clib
import distutils.command.build_ext
import distutils.command.build
import distutils.command.clean
import distutils.command.config
import distutils.command.install_data
import distutils.command.install_egg_info
import distutils.command.install_headers
import distutils.command.install_lib
import distutils.command.register
import distutils.command.sdist
import distutils.command.upload
import html.entities
try:
import tty # Not available on Windows
except ImportError:
if support.verbose:
print("skipping tty")
开发者ID:chriszirkel,项目名称:tempcheck-pi,代码行数:48,代码来源:test_sundry.py
示例5: test_coverage
def test_coverage(coverdir):
trace = support.import_module("trace")
tracer = trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix], trace=0, count=1)
tracer.run("reload(cmd);test_main()")
r = tracer.results()
print("Writing coverage results...")
r.write_results(show_missing=True, summary=True, coverdir=coverdir)
开发者ID:hycxa,项目名称:kbengine,代码行数:7,代码来源:test_cmd.py
示例6: test_windows_message
def test_windows_message(self):
"""Should fill in unknown error code in Windows error message"""
ctypes = import_module('ctypes')
# this error code has no message, Python formats it as hexadecimal
code = 3765269347
with self.assertRaisesRegex(OSError, 'Windows Error 0x%x' % code):
ctypes.pythonapi.PyErr_SetFromWindowsErr(code)
开发者ID:Bibeknam,项目名称:cpython,代码行数:7,代码来源:test_exceptions.py
示例7: setUp
def setUp(self):
#print("DBG:CB.setUp()")
# fire up the test subject
self.expty = import_module('editline.tests.expty')
# should use sys.ps1 for the prompt, but it seems to only be define
# when the system actually IS interactive... ?
self.tool = self.expty.InteractivePTY(sys.executable, '>>> ', 'exit()')
cruft = self.tool.first_prompt()
开发者ID:mark-nicholson,项目名称:python-editline,代码行数:8,代码来源:test_lineeditor.py
示例8: test_coverage
def test_coverage(coverdir):
trace = support.import_module('trace')
tracer=trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix,],
trace=0, count=1)
tracer.run('import importlib; importlib.reload(cmd); test_main()')
r=tracer.results()
print("Writing coverage results...")
r.write_results(show_missing=True, summary=True, coverdir=coverdir)
开发者ID:0jpq0,项目名称:kbengine,代码行数:8,代码来源:test_cmd.py
示例9: test_multiprocessing_exceptions
def test_multiprocessing_exceptions(self):
module = support.import_module('multiprocessing.context')
for name, exc in get_exceptions(module):
with self.subTest(name):
self.assertEqual(reverse_mapping('multiprocessing.context', name),
('multiprocessing', name))
self.assertEqual(mapping('multiprocessing', name),
('multiprocessing.context', name))
开发者ID:LPRD,项目名称:build_tools,代码行数:8,代码来源:test_pickle.py
示例10: test_200_terminal_size
def test_200_terminal_size(self):
rows = int(subprocess.check_output(['tput', 'lines']).decode())
columns = int(subprocess.check_output(['tput', 'cols']).decode())
self.assertNotEqual(columns, 0)
editline = import_module('editline.editline')
el = editline.EditLine("testcase",
sys.stdin, sys.stdout, sys.stderr)
el_cols = el.gettc('co')
self.assertEqual(el_cols, columns)
开发者ID:mark-nicholson,项目名称:python-editline,代码行数:11,代码来源:test_editline.py
示例11: test_triplet_in_ext_suffix
def test_triplet_in_ext_suffix(self):
ctypes = import_module('ctypes')
import platform, re
machine = platform.machine()
suffix = sysconfig.get_config_var('EXT_SUFFIX')
if re.match('(aarch64|arm|mips|ppc|powerpc|s390|sparc)', machine):
self.assertTrue('linux' in suffix, suffix)
if re.match('(i[3-6]86|x86_64)$', machine):
if ctypes.sizeof(ctypes.c_char_p()) == 4:
self.assertTrue(suffix.endswith('i386-linux-gnu.so') or
suffix.endswith('x86_64-linux-gnux32.so'),
suffix)
else: # 8 byte pointer size
self.assertTrue(suffix.endswith('x86_64-linux-gnu.so'), suffix)
开发者ID:Eyepea,项目名称:cpython,代码行数:14,代码来源:test_sysconfig.py
示例12: test_interrupted_write
def test_interrupted_write(self):
# BaseHandler._write() and _flush() have to write all data, even if
# it takes multiple send() calls. Test this by interrupting a send()
# call with a Unix signal.
threading = support.import_module("threading")
pthread_kill = support.get_attribute(signal, "pthread_kill")
def app(environ, start_response):
start_response("200 OK", [])
return [b'\0' * support.SOCK_MAX_SIZE]
class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
pass
server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
self.addCleanup(server.server_close)
interrupted = threading.Event()
def signal_handler(signum, frame):
interrupted.set()
original = signal.signal(signal.SIGUSR1, signal_handler)
self.addCleanup(signal.signal, signal.SIGUSR1, original)
received = None
main_thread = threading.get_ident()
def run_client():
http = HTTPConnection(*server.server_address)
http.request("GET", "/")
with http.getresponse() as response:
response.read(100)
# The main thread should now be blocking in a send() system
# call. But in theory, it could get interrupted by other
# signals, and then retried. So keep sending the signal in a
# loop, in case an earlier signal happens to be delivered at
# an inconvenient moment.
while True:
pthread_kill(main_thread, signal.SIGUSR1)
if interrupted.wait(timeout=float(1)):
break
nonlocal received
received = len(response.read())
http.close()
background = threading.Thread(target=run_client)
background.start()
server.handle_request()
background.join()
self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
开发者ID:3lnc,项目名称:cpython,代码行数:49,代码来源:test_wsgiref.py
示例13: run_pty
def run_pty(script, input=b"dummy input\r", env=None):
pty = import_module('pty')
output = bytearray()
[master, slave] = pty.openpty()
args = (sys.executable, '-c', script)
proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave, env=env)
os.close(slave)
with ExitStack() as cleanup:
cleanup.enter_context(proc)
def terminate(proc):
try:
proc.terminate()
except ProcessLookupError:
# Workaround for Open/Net BSD bug (Issue 16762)
pass
cleanup.callback(terminate, proc)
cleanup.callback(os.close, master)
# Avoid using DefaultSelector and PollSelector. Kqueue() does not
# work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open
# BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4
# either (Issue 20472). Hopefully the file descriptor is low enough
# to use with select().
sel = cleanup.enter_context(selectors.SelectSelector())
sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE)
os.set_blocking(master, False)
while True:
for [_, events] in sel.select():
if events & selectors.EVENT_READ:
try:
chunk = os.read(master, 0x10000)
except OSError as err:
# Linux raises EIO when slave is closed (Issue 5380)
if err.errno != EIO:
raise
chunk = b""
if not chunk:
return output
output.extend(chunk)
if events & selectors.EVENT_WRITE:
try:
input = input[os.write(master, input):]
except OSError as err:
# Apparently EIO means the slave was closed
if err.errno != EIO:
raise
input = b"" # Stop writing
if not input:
sel.modify(master, selectors.EVENT_READ)
开发者ID:1st1,项目名称:cpython,代码行数:48,代码来源:test_readline.py
示例14: dash_R_cleanup
def dash_R_cleanup(fs, ps, pic, abcs):
import gc, copyreg
import _strptime, linecache
dircache = support.import_module('dircache', deprecated=True)
import urllib.parse, urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse, mimetypes, doctest
import struct, filecmp
from distutils.dir_util import _path_created
# Clear the warnings registry, so they can be displayed again
for mod in list(sys.modules.values()):
if hasattr(mod, '__warningregistry__'):
del mod.__warningregistry__
# Restore some original values.
warnings.filters[:] = fs
copyreg.dispatch_table.clear()
copyreg.dispatch_table.update(ps)
sys.path_importer_cache.clear()
sys.path_importer_cache.update(pic)
# clear type cache
sys._clear_type_cache()
# Clear ABC registries, restoring previously saved ABC registries.
for abc, registry in list(abcs.items()):
abc._abc_registry = registry.copy()
abc._abc_cache.clear()
abc._abc_negative_cache.clear()
# Clear assorted module caches.
_path_created.clear()
re.purge()
_strptime._regex_cache.clear()
urllib.parse.clear_cache()
urllib.request.urlcleanup()
urllib.request.install_opener(None)
dircache.reset()
linecache.clearcache()
mimetypes._default_mime_types()
filecmp._cache.clear()
struct._clearcache()
doctest.master = None
# Collect cyclic trash.
gc.collect()
开发者ID:isaiah,项目名称:jython3,代码行数:45,代码来源:regrtest.py
示例15: test_triplet_in_ext_suffix
def test_triplet_in_ext_suffix(self):
ctypes = import_module("ctypes")
import platform, re
machine = platform.machine()
suffix = sysconfig.get_config_var("EXT_SUFFIX")
if re.match("(aarch64|arm|mips|ppc|powerpc|s390|sparc)", machine):
self.assertTrue("linux" in suffix, suffix)
if re.match("(i[3-6]86|x86_64)$", machine):
if ctypes.sizeof(ctypes.c_char_p()) == 4:
self.assertTrue(
suffix.endswith("i386-linux-gnu.so")
or suffix.endswith("i686-linux-android.so")
or suffix.endswith("x86_64-linux-gnux32.so"),
suffix,
)
else: # 8 byte pointer size
self.assertTrue(suffix.endswith("x86_64-linux-gnu.so"), suffix)
开发者ID:francois-wellenreiter,项目名称:cpython,代码行数:18,代码来源:test_sysconfig.py
示例16: test_expanduser_pwd
def test_expanduser_pwd(self):
pwd = support.import_module('pwd')
self.assertIsInstance(posixpath.expanduser("~/"), str)
self.assertIsInstance(posixpath.expanduser(b"~/"), bytes)
# if home directory == root directory, this test makes no sense
if posixpath.expanduser("~") != '/':
self.assertEqual(
posixpath.expanduser("~") + "/",
posixpath.expanduser("~/")
)
self.assertEqual(
posixpath.expanduser(b"~") + b"/",
posixpath.expanduser(b"~/")
)
self.assertIsInstance(posixpath.expanduser("~root/"), str)
self.assertIsInstance(posixpath.expanduser("~foo/"), str)
self.assertIsInstance(posixpath.expanduser(b"~root/"), bytes)
self.assertIsInstance(posixpath.expanduser(b"~foo/"), bytes)
with support.EnvironmentVarGuard() as env:
# expanduser should fall back to using the password database
del env['HOME']
home = pwd.getpwuid(os.getuid()).pw_dir
# $HOME can end with a trailing /, so strip it (see #17809)
home = home.rstrip("/") or '/'
self.assertEqual(posixpath.expanduser("~"), home)
# bpo-10496: If the HOME environment variable is not set and the
# user (current identifier or name in the path) doesn't exist in
# the password database (pwd.getuid() or pwd.getpwnam() fail),
# expanduser() must return the path unchanged.
with mock.patch.object(pwd, 'getpwuid', side_effect=KeyError), \
mock.patch.object(pwd, 'getpwnam', side_effect=KeyError):
for path in ('~', '~/.local', '~vstinner/'):
self.assertEqual(posixpath.expanduser(path), path)
开发者ID:Eyepea,项目名称:cpython,代码行数:38,代码来源:test_posixpath.py
示例17: test_asyncio_1
def test_asyncio_1(self):
# asyncio cannot be imported when Python is compiled without thread
# support
asyncio = support.import_module('asyncio')
class MyException(Exception):
pass
buffer = []
class CM:
async def __aenter__(self):
buffer.append(1)
await asyncio.sleep(0.01)
buffer.append(2)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.01)
buffer.append(exc_type.__name__)
async def f():
async with CM() as c:
await asyncio.sleep(0.01)
raise MyException
buffer.append('unreachable')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(f())
except MyException:
pass
finally:
loop.close()
asyncio.set_event_loop(None)
self.assertEqual(buffer, [1, 2, 'MyException'])
开发者ID:ARK4579,项目名称:cpython,代码行数:38,代码来源:test_coroutines.py
示例18: test_PyThreadState_SetAsyncExc
def test_PyThreadState_SetAsyncExc(self):
ctypes = import_module("ctypes")
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
class AsyncExc(Exception):
pass
exception = ctypes.py_object(AsyncExc)
# First check it works when setting the exception from the same thread.
tid = threading.get_ident()
try:
result = set_async_exc(ctypes.c_long(tid), exception)
# The exception is async, so we might have to keep the VM busy until
# it notices.
while True:
pass
except AsyncExc:
pass
else:
# This code is unreachable but it reflects the intent. If we wanted
# to be smarter the above loop wouldn't be infinite.
self.fail("AsyncExc not raised")
try:
self.assertEqual(result, 1) # one thread state modified
except UnboundLocalError:
# The exception was raised too quickly for us to get the result.
pass
# `worker_started` is set by the thread when it's inside a try/except
# block waiting to catch the asynchronously set AsyncExc exception.
# `worker_saw_exception` is set by the thread upon catching that
# exception.
worker_started = threading.Event()
worker_saw_exception = threading.Event()
class Worker(threading.Thread):
def run(self):
self.id = threading.get_ident()
self.finished = False
try:
while True:
worker_started.set()
time.sleep(0.1)
except AsyncExc:
self.finished = True
worker_saw_exception.set()
t = Worker()
t.daemon = True # so if this fails, we don't hang Python at shutdown
t.start()
if verbose:
print(" started worker thread")
# Try a thread id that doesn't make sense.
if verbose:
print(" trying nonsensical thread id")
result = set_async_exc(ctypes.c_long(-1), exception)
self.assertEqual(result, 0) # no thread states modified
# Now raise an exception in the worker thread.
if verbose:
print(" waiting for worker thread to get started")
ret = worker_started.wait()
self.assertTrue(ret)
if verbose:
print(" verifying worker hasn't exited")
self.assertTrue(not t.finished)
if verbose:
print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception)
self.assertEqual(result, 1) # one thread state modified
if verbose:
print(" waiting for worker to say it caught the exception")
worker_saw_exception.wait(timeout=10)
self.assertTrue(t.finished)
if verbose:
print(" all OK -- joining worker")
if t.finished:
t.join()
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:83,代码来源:test_threading.py
示例19: import_module
"""Test program for the fcntl C module.
OS/2+EMX doesn't support the file locking operations.
"""
import os
import struct
import sys
import unittest
from test.support import verbose, TESTFN, unlink, run_unittest, import_module
# Skip test if no fnctl module.
fcntl = import_module('fcntl')
# TODO - Write tests for flock() and lockf().
def get_lockdata():
try:
os.O_LARGEFILE
except AttributeError:
start_len = "ll"
else:
start_len = "qq"
if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3',
'Darwin1.2', 'darwin',
'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
'freebsd6', 'freebsd7', 'freebsd8',
'bsdos2', 'bsdos3', 'bsdos4',
'openbsd', 'openbsd2', 'openbsd3', 'openbsd4'):
开发者ID:ChowZenki,项目名称:kbengine,代码行数:31,代码来源:test_fcntl.py
示例20: PwdTest
import sys
import unittest
from test import support
pwd = support.import_module('pwd')
class PwdTest(unittest.TestCase):
def test_values(self):
entries = pwd.getpwall()
for e in entries:
self.assertEqual(len(e), 7)
self.assertEqual(e[0], e.pw_name)
self.assertIsInstance(e.pw_name, str)
self.assertEqual(e[1], e.pw_passwd)
self.assertIsInstance(e.pw_passwd, str)
self.assertEqual(e[2], e.pw_uid)
self.assertIsInstance(e.pw_uid, int)
self.assertEqual(e[3], e.pw_gid)
self.assertIsInstance(e.pw_gid, int)
self.assertEqual(e[4], e.pw_gecos)
self.assertIsInstance(e.pw_gecos, str)
self.assertEqual(e[5], e.pw_dir)
self.assertIsInstance(e.pw_dir, str)
self.assertEqual(e[6], e.pw_shell)
self.assertIsInstance(e.pw_shell, str)
# The following won't work, because of duplicate entries
# for one uid
# self.assertEqual(pwd.getpwuid(e.pw_uid), e)
开发者ID:10sr,项目名称:cpython,代码行数:31,代码来源:test_pwd.py
注:本文中的test.support.import_module函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论