本文整理汇总了Python中twitter.common.dirutil.safe_mkdtemp函数的典型用法代码示例。如果您正苦于以下问题：Python safe_mkdtemp函数的具体用法？Python safe_mkdtemp怎么用？Python safe_mkdtemp使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了safe_mkdtemp函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUpClass
def setUpClass(cls):
    """Create the git repositories shared by the tests of this class.

    Three throwaway directories are created with ``safe_mkdtemp``:

    * ``cls.origin`` - a bare repository used as the remote.
    * ``cls.gitdir`` / ``cls.worktree`` - a repository with a separate git dir
      and work tree, set up through ``cls.init_repo('depot', cls.origin)``.
      It pushes an initial tagged commit and then makes one further local
      commit that is not pushed.
    * ``cls.clone2`` - a second repository that pulls from the remote, adds a
      commit of its own and pushes it, so the remote ends up holding a commit
      the first repository has not fetched.

    Finally ``cls.git`` is bound to a ``Git`` object over the first repository.
    """
    cls.origin = safe_mkdtemp()
    with pushd(cls.origin):
        subprocess.check_call(['git', 'init', '--bare'])

    cls.gitdir = safe_mkdtemp()
    cls.worktree = safe_mkdtemp()
    cls.readme_file = os.path.join(cls.worktree, 'README')

    with environment_as(GIT_DIR=cls.gitdir, GIT_WORK_TREE=cls.worktree):
        cls.init_repo('depot', cls.origin)

        touch(cls.readme_file)
        subprocess.check_call(['git', 'add', 'README'])
        # The \x81 byte is deliberate: the commit message is not clean ASCII/UTF-8.
        subprocess.check_call(['git', 'commit', '-am', 'initial commit with decode -> \x81b'])
        subprocess.check_call(['git', 'tag', 'first'])
        subprocess.check_call(['git', 'push', '--tags', 'depot', 'master'])
        # `git branch --set-upstream <branch> <upstream>` is deprecated and
        # rejected by current Git releases; `--set-upstream-to=` is the
        # supported spelling of the same operation.
        subprocess.check_call(['git', 'branch', '--set-upstream-to=depot/master', 'master'])

        with safe_open(cls.readme_file, 'w') as readme:
            readme.write('Hello World.')
        subprocess.check_call(['git', 'commit', '-am', 'Update README.'])

    cls.clone2 = safe_mkdtemp()
    with pushd(cls.clone2):
        cls.init_repo('origin', cls.origin)
        subprocess.check_call(['git', 'pull', '--tags', 'origin', 'master:master'])

        with safe_open(os.path.realpath('README'), 'a') as readme:
            readme.write('--')
        subprocess.check_call(['git', 'commit', '-am', 'Update README 2.'])
        subprocess.check_call(['git', 'push', '--tags', 'origin', 'master'])

    cls.git = Git(gitdir=cls.gitdir, worktree=cls.worktree)
开发者ID:aoen,项目名称:pants,代码行数:35,代码来源:test_git.py
示例2: set_up_mocks
def set_up_mocks(self, su=None):
    """Record the mox expectations for the stubbed collaborators, then replay.

    Stubs temp-dir creation, log initialisation, the two git commands run
    through ``CommandUtil``, the ``psutil`` and ``socket`` probes and
    ``sys.exit``, and finishes by switching mox into replay mode.

    ``su`` is accepted but not used by this method.
    """
    stub_out = self.mox.StubOutWithMock

    # Temp dir and log file are pinned to fixed paths.
    stub_out(dirutil, 'safe_mkdtemp')
    dirutil.safe_mkdtemp().AndReturn('/tmp/test')
    stub_out(log, 'init')
    log.init('/tmp/test/current_run').AndReturn(0)

    # Canned output for the git commands: remotes first, current branch second.
    stub_out(CommandUtil, 'execute_and_get_output')
    remotes_call = CommandUtil.execute_and_get_output(['git','remote', '-v'])
    remotes_call.AndReturn((0, dedent("""origin https://git.twitter.biz/science (fetch)
origin https://git.twitter.biz/science (push)""")))
    branch_call = CommandUtil.execute_and_get_output(['git','rev-parse', '--abbrev-ref', 'HEAD'])
    branch_call.AndReturn((0,"test_br"))

    # Machine statistics.
    stub_out(psutil, 'cpu_percent')
    psutil.cpu_percent(interval=1).AndReturn(1.0)
    stub_out(psutil, 'network_io_counters')
    psutil.network_io_counters().AndReturn("1000,10000,1000")
    stub_out(psutil, 'NUM_CPUS')
    psutil.NUM_CPUS = 5

    # Host identity.
    stub_out(socket, 'gethostname')
    socket.gethostname().AndReturn("localhost")
    stub_out(socket, 'gethostbyname')
    socket.gethostbyname("localhost").AndReturn("localhost")

    # sys.exit is stubbed so the expected exit(0) call is recorded, not executed.
    stub_out(sys, 'exit')
    sys.exit(0).AndReturn(0)

    self.mox.ReplayAll()
开发者ID:benhuang-zh,项目名称:commons,代码行数:28,代码来源:buildtimestats_test.py
示例3: __init__
def __init__(
    self,
    pex_location,
    checkpoint_root,
    artifact_dir=None,
    task_runner_class=ThermosTaskRunner,
    max_wait=Amount(1, Time.MINUTES),
    preemption_wait=Amount(1, Time.MINUTES),
    poll_interval=Amount(500, Time.MILLISECONDS),
    clock=time,
    process_logger_mode=None,
    rotate_log_size_mb=None,
    rotate_log_backups=None,
):
    """Store the settings given to this object on private attributes.

    Every argument is kept verbatim on the attribute of the same name with a
    leading underscore. The one exception is ``artifact_dir``: when it is
    falsy a fresh directory from ``safe_mkdtemp()`` is used instead.
    """
    # Locations.
    self._pex_location = pex_location
    self._checkpoint_root = checkpoint_root
    self._artifact_dir = artifact_dir if artifact_dir else safe_mkdtemp()

    # Runner class and timing knobs.
    self._task_runner_class = task_runner_class
    self._clock = clock
    self._max_wait = max_wait
    self._preemption_wait = preemption_wait
    self._poll_interval = poll_interval

    # Process logging options.
    self._process_logger_mode = process_logger_mode
    self._rotate_log_size_mb = rotate_log_size_mb
    self._rotate_log_backups = rotate_log_backups
开发者ID:mohammadsamir,项目名称:aurora,代码行数:25,代码来源:thermos_task_runner.py
示例4: make_distribution
def make_distribution(name='my_project', zipped=False, zip_safe=True):
    """Build a small project and yield it as a distribution.

    This is a generator that yields exactly once.
    NOTE(review): it is presumably wrapped with ``contextlib.contextmanager``
    where it is defined; the decorator is not visible here - confirm.

    :param name: project name interpolated into ``PROJECT_CONTENT``.
    :param zipped: when true the distilled archive itself is yielded; otherwise
      the archive is extracted into a temporary directory first and the
      extracted tree is yielded.
    :param zip_safe: selects the module body written into the project. The
      non-zip-safe variant inspects ``__file__``.
    """
    interp = {'project_name': name}
    # The templates must be valid Python once dedented, so the function and
    # ``if`` bodies are indented relative to their headers.
    if zip_safe:
        interp['content'] = dedent('''
            def do_something():
                print('hello world!')
            ''')
    else:
        interp['content'] = dedent('''
            if __file__ == 'derp.py':
                print('i am an idiot')
            ''')

    with temporary_content(PROJECT_CONTENT, interp=interp) as td:
        installer = Installer(td)
        distribution = installer.distribution()
        distiller = Distiller(distribution, debug=True)
        dist_location = distiller.distill(into=safe_mkdtemp())
        if zipped:
            yield DistributionHelper.distribution_from_path(dist_location)
        else:
            # A distinct name avoids shadowing the project directory ``td``.
            with temporary_dir() as extract_root:
                extract_path = os.path.join(extract_root, os.path.basename(dist_location))
                with contextlib.closing(zipfile.ZipFile(dist_location)) as zf:
                    zf.extractall(extract_path)
                yield DistributionHelper.distribution_from_path(extract_path)
开发者ID:CodeWarltz,项目名称:commons,代码行数:25,代码来源:test_common.py
示例5: __init__
def __init__(self,
             runner_pex,
             task_id,
             task,
             role,
             portmap,
             sandbox,
             checkpoint_root,
             artifact_dir=None,
             clock=time,
             hostname=None,
             process_logger_destination=None,
             process_logger_mode=None,
             rotate_log_size_mb=None,
             rotate_log_backups=None,
             preserve_env=False):
    """
    Record the task runner's configuration and write the task to ``task.json``.

      runner_pex       location of the thermos_runner pex that this task runner should use
      task_id          task_id assigned by scheduler
      task             thermos pystachio Task object
      role             role to run the task under
      portmap          { name => port } dictionary
      sandbox          the sandbox object (its ``root`` and ``chrooted`` attributes are read)
      checkpoint_root  the checkpoint root for the thermos runner
      artifact_dir     scratch space for the thermos runner (basically cwd of thermos.pex);
                       a new ``safe_mkdtemp()`` directory when not given
      clock            clock
      hostname         defaults to ``socket.gethostname()`` when not given
      preserve_env     stored as-is

    The process logger and log rotation arguments are stored as-is.

    Raises TaskError when ``ThermosTaskWrapper`` reports ``InvalidTask`` while
    the task is written to ``<artifact_dir>/task.json``.
    """
    # Identity of the task being run.
    self._runner_pex = runner_pex
    self._task_id = task_id
    self._task = task
    self._role = role
    self._ports = portmap

    # Runtime bookkeeping; nothing is running yet.
    self._popen = self._popen_signal = self._popen_rc = None
    self._monitor = None
    self._status = None

    # Filesystem layout and environment.
    self._root = sandbox.root
    self._checkpoint_root = checkpoint_root
    self._enable_chroot = sandbox.chrooted
    self._preserve_env = preserve_env
    self._clock = clock
    self._artifact_dir = artifact_dir if artifact_dir else safe_mkdtemp()
    self._hostname = hostname if hostname else socket.gethostname()

    # Process logging options.
    self._process_logger_destination = process_logger_destination
    self._process_logger_mode = process_logger_mode
    self._rotate_log_size_mb = rotate_log_size_mb
    self._rotate_log_backups = rotate_log_backups

    # wait events
    self._dead = threading.Event()
    self._kill_signal = threading.Event()
    self.forking = threading.Event()
    self.forked = threading.Event()

    task_json = os.path.join(self._artifact_dir, 'task.json')
    try:
        with open(task_json, 'w') as fp:
            self._task_filename = fp.name
            ThermosTaskWrapper(self._task).to_file(self._task_filename)
    except ThermosTaskWrapper.InvalidTask as e:
        raise TaskError('Failed to load task: %s' % e)
开发者ID:BruceZu,项目名称:aurora,代码行数:60,代码来源:thermos_task_runner.py
示例6: __init__
def __init__(self, cache=None, failsoft=True, clock=time, opener=None):
    """Set up the cache directory and the underlying opener.

    :param cache: cache directory; a ``safe_mkdtemp()`` directory when falsy.
      It is passed to ``safe_mkdir`` either way.
    :param failsoft: stored as-is on ``self._failsoft``.
    :param clock: stored as-is on ``self._clock``; defaults to the ``time`` module.
    :param opener: opener to delegate to; a new ``Web()`` when falsy.
    """
    self._failsoft = failsoft

    cache_dir = cache if cache else safe_mkdtemp()
    self._cache = cache_dir
    safe_mkdir(cache_dir)

    self._clock = clock
    self._opener = opener if opener else Web()
    super(CachedWeb, self).__init__()
开发者ID:jalons,项目名称:commons,代码行数:7,代码来源:http.py
示例7: test_launchTask_deserialization_fail
def test_launchTask_deserialization_fail(self):  # noqa
    """A task whose executor config data is "garbage" goes STARTING then FAILED."""
    driver = ProxyDriver()
    user = getpass.getuser()

    info = mesos_pb2.TaskInfo()
    info.name = "broken"
    info.task_id.value = "broken"

    # The executor config payload is deliberately not parseable.
    job_key = JobKey(role=user, environment="env", name="name")
    owner = Identity(role=user, user=user)
    executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data="garbage")
    task_config = TaskConfig(job=job_key, owner=owner, executorConfig=executor_config)
    info.data = serialize(AssignedTask(task=task_config))

    executor = FastThermosExecutor(
        runner_provider=make_provider(safe_mkdtemp()),
        sandbox_provider=DefaultTestSandboxProvider(),
    )
    executor.launchTask(driver, info)
    driver.wait_stopped()

    status_updates = driver.method_calls["sendStatusUpdate"]
    assert len(status_updates) == 2
    first_update, second_update = status_updates[0], status_updates[1]
    assert first_update[0][0].state == mesos_pb2.TASK_STARTING
    assert second_update[0][0].state == mesos_pb2.TASK_FAILED
开发者ID:rosmo,项目名称:aurora,代码行数:26,代码来源:test_thermos_executor.py
示例8: test_scheduler_runs
def test_scheduler_runs():
    """
    Check that the scheduler manages to launch 3 "no-op" MySQL tasks.

    NOTE: Because of a limitation in zake the scheduler's ZK operations do not propagate to
    executors running in separate processes; those are unit-tested separately.
    """
    import mesos.native

    # The Mesos slave fetches fake_mysos_executor.pex, so it has to exist beforehand.
    assert os.path.isfile('dist/fake_mysos_executor.pex')

    zk_storage = FakeStorage(SequentialThreadingHandler())
    zk_client = FakeClient(storage=zk_storage)
    zk_client.start()

    zk_url = "zk://fake_host/home/mysos/clusters"
    cluster_name = "test_cluster"
    num_nodes = 3

    state_provider = LocalStateProvider(safe_mkdtemp())

    framework_info = FrameworkInfo(
        user=getpass.getuser(),
        name="mysos",
        checkpoint=False)

    scheduler_state = Scheduler(framework_info)

    framework_user = getpass.getuser()
    executor_uri = os.path.abspath("dist/fake_mysos_executor.pex")
    election_timeout = Amount(40, Time.SECONDS)
    encryption_key = gen_encryption_key()
    scheduler = MysosScheduler(
        scheduler_state,
        state_provider,
        framework_user,
        executor_uri,
        "./fake_mysos_executor.pex",
        zk_client,
        zk_url,
        election_timeout,
        "/fakepath",
        encryption_key)

    driver = mesos.native.MesosSchedulerDriver(scheduler, framework_info, "local")
    driver.start()

    # Block until the scheduler has connected and is usable.
    assert scheduler.connected.wait(30)

    scheduler.create_cluster(cluster_name, "mysql_user", num_nodes)

    def master_elected():
        # One of the slaves gets promoted to master.
        return wait_for_master(
            get_cluster_path(posixpath.join(zk_url, 'discover'), cluster_name),
            zk_client)

    deadline(master_elected, Amount(40, Time.SECONDS))

    assert driver.stop() == DRIVER_STOPPED
开发者ID:GavinHwa,项目名称:mysos,代码行数:59,代码来源:test_mysos_scheduler.py
示例9: make_fileset
def make_fileset(filelist, piece_size, fs=DISK):
    """Given (filename, contents) pairs, return a (dir, FileSet) pair.

    Each file is created under a fresh ``safe_mkdtemp()`` directory by filling
    and then writing a ``Fileslice`` covering the whole contents through ``fs``.

    ``filelist`` is consumed in a single pass, so one-shot iterables such as
    generators work as well as lists.

    :param filelist: iterable of ``(filename, contents)`` pairs.
    :param piece_size: passed through to ``FileSet``.
    :param fs: object providing ``fill(slice)`` and ``write(slice, contents)``.
    :returns: ``(td, FileSet([(filename, size), ...], piece_size))``.
    """
    td = safe_mkdtemp()
    sizes = []
    for filename, contents in filelist:
        size = len(contents)
        sl = Fileslice(os.path.join(td, filename), slice(0, size))
        fs.fill(sl)
        fs.write(sl, contents)
        # Record the size while the pair is in hand instead of iterating twice.
        sizes.append((filename, size))
    return td, FileSet(sizes, piece_size)
开发者ID:pombredanne,项目名称:rainman,代码行数:9,代码来源:testing.py
示例10: __init__
def __init__(self, fileset, piece_hashes=None, chroot=None, fs=DISK):
    """Initialise piece bookkeeping for ``fileset``.

    :param fileset: the file set being managed; its ``num_pieces`` is read when
      no hashes are supplied.
    :param piece_hashes: per-piece hashes; when falsy, one 20-byte all-zero
      placeholder per piece is used.
    :param chroot: directory to work in; a ``safe_mkdtemp()`` directory when
      falsy. It is passed to ``safe_mkdir`` either way.
    :param fs: stored as-is on ``self._fs``.
    """
    self._fileset = fileset
    # The placeholder entries all reference one immutable bytes object.
    self._pieces = piece_hashes or [b'\x00' * 20] * self._fileset.num_pieces
    self._actual_pieces = []
    self._sliceset = SliceSet()
    self._chroot = chroot or safe_mkdtemp()
    self._fs = fs
    safe_mkdir(self._chroot)
开发者ID:pombredanne,项目名称:rainman,代码行数:9,代码来源:piece_manager.py
示例11: __init__
def __init__(self, host, port, endpoint, max_delay, stats_file, user, force_stats_upload=False):
    """Prepare logging and the HTTP client used for uploading stats.

    A fresh temp directory receives the ``current_run`` log file, which is
    handed to ``log.init``. Stats are kept under
    ``/tmp/<user>/stats_uploader_dir``, the directory given to ``StatsHttpClient``.
    """
    self.force_stats_upload = force_stats_upload

    log_dir = dirutil.safe_mkdtemp()
    log_file = os.path.join(log_dir, "current_run")
    self._stats_log_dir = log_dir
    self._stats_log_file = log_file
    log.init(log_file)

    upload_dir = os.path.join("/tmp", user, "stats_uploader_dir")
    self._stats_dir = upload_dir
    self._stats_http_client = StatsHttpClient(host, port, endpoint, upload_dir)

    self._max_delay = max_delay
    self._pants_stat_file = stats_file
    self._user = user
开发者ID:BabyDuncan,项目名称:commons,代码行数:10,代码来源:stats_http_client.py
示例12: test_inotify_diskcollector
def test_inotify_diskcollector():
    """Run the shared collector checks against ``InotifyDiskCollector``.

    The collector thread's ``COLLECTION_INTERVAL`` is set to 50 ms and the wait
    callback sleeps for two such intervals.
    """
    interval = Amount(50, Time.MILLISECONDS)
    watched_dir = safe_mkdtemp()

    collector = InotifyDiskCollector(watched_dir)
    collector._thread.COLLECTION_INTERVAL = interval

    def sleep_two_intervals():
        pause = 2 * interval
        time.sleep(pause.as_(Time.SECONDS))

    _run_collector_tests(collector, watched_dir, sleep_two_intervals)
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:10,代码来源:test_disk.py
示例13: thermos_runner_path
def thermos_runner_path(build=True):
    """Return the path of the thermos_runner pex, building it at most once.

    The path is memoised on the function object itself as
    ``thermos_runner_path.value``.

    :param build: when false, nothing is built; the memoised path is returned,
      or ``None`` if no build has happened yet.
    :returns: path to ``thermos_runner.pex`` inside a ``safe_mkdtemp()`` directory.
    :raises AssertionError: if the ``./pants`` build exits with a non-zero status.
    """
    if not build:
        return getattr(thermos_runner_path, 'value', None)

    if not hasattr(thermos_runner_path, 'value'):
        pex_dir = safe_mkdtemp()
        build_command = [
            "./pants",
            "--pants-distdir=%s" % pex_dir,
            "binary",
            "src/main/python/apache/thermos/runner:thermos_runner",
        ]
        # Run the build outside of an ``assert`` statement: under ``python -O``
        # asserts are stripped, which would silently skip the build itself.
        exit_code = subprocess.call(build_command)
        if exit_code != 0:
            raise AssertionError('thermos_runner build failed with exit code %s' % exit_code)
        thermos_runner_path.value = os.path.join(pex_dir, 'thermos_runner.pex')
    return thermos_runner_path.value
开发者ID:bmhatfield,项目名称:aurora,代码行数:10,代码来源:test_thermos_executor.py
示例14: test_du_diskcollector
def test_du_diskcollector():
    """Run the shared collector checks against ``DiskCollector``.

    The wait callback triggers a sample and, if the collector has a thread,
    waits on that thread's event.
    """
    sampled_dir = safe_mkdtemp()
    collector = DiskCollector(sampled_dir)

    def sample_and_wait():
        collector.sample()
        if collector._thread is None:
            return
        collector._thread.event.wait()

    _run_collector_tests(collector, sampled_dir, sample_and_wait)
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:10,代码来源:test_disk.py
示例15: _unpack
def _unpack(self, filename, location=None):
    """Extract the archive ``filename`` and return its first non-trivial directory.

    The archive is extracted into ``location``; when no location is supplied a
    temporary directory is created for it.

    Raises ``self.UnreadableLink`` when the archive cannot be read.
    """
    destination = location if location else safe_mkdtemp()
    opener, read_error = self._archive_class
    try:
        archive = archive_handle = opener(filename)
        with contextlib.closing(archive_handle):
            archive.extractall(path=destination)
    except read_error:
        raise self.UnreadableLink('Could not read %s' % self.url)
    return self.first_nontrivial_dir(destination)
开发者ID:BabyDuncan,项目名称:commons,代码行数:11,代码来源:link.py
示例16: fetch
def fetch(self, location=None, conn_timeout=None):
    """Return a local path for this link, downloading it if necessary.

    :param location: directory to download into; a ``safe_mkdtemp()`` directory
      when falsy. A local link with no explicit location is returned in place.
    :param conn_timeout: forwarded to ``self.fh`` when opening the link.
    :returns: the path of the local file.

    If the target file already exists it is returned without contacting the
    remote end. Otherwise the payload is streamed into a uniquely named
    temporary file next to the target and renamed into place only once it has
    been written completely. An interrupted download therefore never leaves a
    truncated file that a later call would mistake for a cache hit.
    """
    import shutil
    import uuid

    if self.local and location is None:
        return self._url.path

    location = location or safe_mkdtemp()
    target = os.path.join(location, self.filename)
    if os.path.exists(target):
        return target

    partial = '%s.%s.part' % (target, uuid.uuid4().hex)
    with contextlib.closing(self.fh(conn_timeout=conn_timeout)) as url_fp:
        safe_mkdir(os.path.dirname(target))
        try:
            with open(partial, 'wb') as fp:
                # Stream in chunks instead of holding the whole payload in memory.
                shutil.copyfileobj(url_fp, fp)
            os.rename(partial, target)
        finally:
            # Only still present if the download or the rename failed.
            if os.path.exists(partial):
                os.remove(partial)
    return target
开发者ID:BabyDuncan,项目名称:commons,代码行数:12,代码来源:link.py
示例17: create_run_tracker
def create_run_tracker(info_dir=None):
    """Create a ``RunTracker``, start it with a new ``Report`` and return it.

    :param string info_dir: An optional directory for the run tracker to store state; defaults to a
      new temp dir that will be cleaned up on interpreter exit.
    """
    # TODO(John Sirois): Rework uses around a context manager for cleanup of the info_dir in a more
    # disciplined manner
    if not info_dir:
        info_dir = safe_mkdtemp()
    tracker = RunTracker(info_dir)
    tracker.start(Report())
    return tracker
开发者ID:CodeWarltz,项目名称:commons,代码行数:13,代码来源:context_utils.py
示例18: __init__
def __init__(self, source_dir, strict=True):
    """
    Build an installer for the unpacked source distribution found in source_dir.

    With strict=True, missing installation dependencies (e.g. distribute)
    cause a failure.

    A temporary install directory and an empty temporary file for the install
    record are created up front.
    """
    self._source_dir = source_dir
    self._strict = strict
    self._installed = None
    self._install_tmp = safe_mkdtemp()

    # Only the path is needed; the descriptor from mkstemp is closed right away.
    record_fd, record_path = tempfile.mkstemp()
    self._install_record = record_path
    os.close(record_fd)
开发者ID:BabyDuncan,项目名称:commons,代码行数:13,代码来源:installer.py
示例19: test_mkdtemp_setup_teardown
def test_mkdtemp_setup_teardown():
    """Check safe_mkdtemp's per-pid directory tracking and its atexit cleanup.

    With ``atexit.register``, ``os.getpid``, ``tempfile.mkdtemp`` and
    ``dirutil.safe_rmtree`` stubbed out, two ``safe_mkdtemp`` calls must record
    both directories under the current "pid", and running the cleaner must
    remove that pid's entry while leaving another pid's entry untouched.
    """
    m = mox.Mox()

    def faux_cleaner():
        pass

    DIR1, DIR2 = 'fake_dir1__does_not_exist', 'fake_dir2__does_not_exist'
    m.StubOutWithMock(atexit, 'register')
    m.StubOutWithMock(os, 'getpid')
    m.StubOutWithMock(tempfile, 'mkdtemp')
    m.StubOutWithMock(dirutil, 'safe_rmtree')
    # Recording phase: the calls below are expectations, not real calls.
    atexit.register(faux_cleaner)  # ensure only called once
    tempfile.mkdtemp(dir='1').AndReturn(DIR1)
    tempfile.mkdtemp(dir='2').AndReturn(DIR2)
    os.getpid().MultipleTimes().AndReturn('unicorn')
    dirutil.safe_rmtree(DIR1)
    dirutil.safe_rmtree(DIR2)
    # make sure other "pids" are not cleaned
    dirutil._MKDTEMP_DIRS['fluffypants'].add('yoyo')
    try:
        m.ReplayAll()
        assert dirutil.safe_mkdtemp(dir='1', cleaner=faux_cleaner) == DIR1
        assert dirutil.safe_mkdtemp(dir='2', cleaner=faux_cleaner) == DIR2
        assert 'unicorn' in dirutil._MKDTEMP_DIRS
        assert dirutil._MKDTEMP_DIRS['unicorn'] == set([DIR1, DIR2])
        dirutil._mkdtemp_atexit_cleaner()
        assert 'unicorn' not in dirutil._MKDTEMP_DIRS
        assert dirutil._MKDTEMP_DIRS['fluffypants'] == set(['yoyo'])
    finally:
        # Reset the module-level state this test touched.
        dirutil._MKDTEMP_DIRS.pop('unicorn', None)
        dirutil._MKDTEMP_DIRS.pop('fluffypants', None)
        dirutil._mkdtemp_unregister_cleaner()
        # NOTE(review): the flattened source does not show whether the two calls
        # below belong inside ``finally`` or after it - confirm against upstream.
        m.UnsetStubs()
        m.VerifyAll()
开发者ID:BabyDuncan,项目名称:commons,代码行数:37,代码来源:test_dirutil.py
示例20: __init__
def __init__(self, peer_id, chroot=None, io_loop=None, session_impl=Session, fs=DISK):
    """Initialise the client's identity, working directory and empty registries.

    :param peer_id: stored on ``self.peer_id``.
    :param chroot: working directory; a ``safe_mkdtemp()`` directory when
      falsy. It is passed to ``safe_mkdir`` either way.
    :param io_loop: forwarded to the parent class constructor.
    :param session_impl: stored as-is on ``self._session_impl``.
    :param fs: stored as-is on ``self._fs``.
    """
    self.peer_id = peer_id

    local_hostname = socket.gethostname()
    self._ip = socket.gethostbyname(local_hostname)

    working_dir = chroot if chroot else safe_mkdtemp()
    self._chroot = working_dir
    safe_mkdir(working_dir)

    # All four registries are keyed by handshake prefix.
    self._torrents = {}       # handshake prefix => Torrent
    self._trackers = {}       # handshake prefix => PeerTracker
    self._sessions = {}       # handshake prefix => Session
    self._piece_brokers = {}  # handshake prefix => PieceBroker

    self._failed_handshakes = 0
    self._port = None
    self._session_impl = session_impl
    self._fs = fs  # this should probably be broker_impl
    self._peer_callback = self.default_peer_callback
    super(Client, self).__init__(io_loop=io_loop)
开发者ID:pombredanne,项目名称:rainman,代码行数:15,代码来源:client.py
注：本文中的twitter.common.dirutil.safe_mkdtemp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论