本文整理汇总了Python中twitter.common.dirutil.safe_delete函数的典型用法代码示例。如果您正苦于以下问题:Python safe_delete函数的具体用法?Python safe_delete怎么用?Python safe_delete使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了safe_delete函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: erase_logs
def erase_logs(self, task_id):
for fn in self.get_logs(task_id, with_size=False):
safe_delete(fn)
state = self.state(task_id)
if state and state.header:
safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
.getpath('process_logbase'))
开发者ID:aalzabarah,项目名称:incubator-aurora,代码行数:7,代码来源:garbage.py
示例2: select_binary
def select_binary(base_path, version, name, config=None):
"""Selects a binary matching the current os and architecture.
:raises: :class:`pants.binary_util.BinaryUtil.BinaryNotFound` if no binary of the given version
and name could be found.
"""
# TODO(John Sirois): finish doc of the path structure expexcted under base_path
config = config or Config.load()
bootstrap_dir = config.getdefault('pants_bootstrapdir')
binary_path = select_binary_base_path(base_path, version, name)
bootstrapped_binary_path = os.path.join(bootstrap_dir, binary_path)
if not os.path.exists(bootstrapped_binary_path):
downloadpath = bootstrapped_binary_path + '~'
try:
with select_binary_stream(base_path, version, name, config) as stream:
with safe_open(downloadpath, 'wb') as bootstrapped_binary:
bootstrapped_binary.write(stream())
os.rename(downloadpath, bootstrapped_binary_path)
chmod_plus_x(bootstrapped_binary_path)
finally:
safe_delete(downloadpath)
log.debug('Selected {binary} binary bootstrapped to: {path}'
.format(binary=name, path=bootstrapped_binary_path))
return bootstrapped_binary_path
开发者ID:Docworld,项目名称:pants,代码行数:26,代码来源:binary_util.py
示例3: swap_files
def swap_files(self, src, tgt):
if os.path.exists(tgt):
safe_delete(tgt)
try:
os.rename(src, tgt)
except OSError as e:
if e.errno != errno.ENOENT:
raise
开发者ID:ssalevan,项目名称:aurora,代码行数:9,代码来源:process.py
示例4: use_cached_files
def use_cached_files(self, cache_key):
artifact = self._cache.use_cached_files(cache_key)
if artifact and self._post_read_func:
paths = artifact.get_paths()
new_paths = self._post_read_func(paths) # Can return None to signal failure.
if new_paths is None: # Failure. Delete artifact and pretend it was never found.
for path in paths:
safe_delete(path)
self.delete(cache_key)
artifact = None
else:
artifact.override_paths(new_paths)
return artifact
开发者ID:davearata,项目名称:twitter-commons,代码行数:13,代码来源:transforming_artifact_cache.py
示例5: temporary_file
def temporary_file(root_dir=None, cleanup=True):
"""
A with-context that creates a temporary file and returns a writeable file descriptor to it.
You may specify the following keyword args:
root_dir [path]: The parent directory to create the temporary file.
cleanup [True/False]: Whether or not to clean up the temporary file.
"""
with tempfile.NamedTemporaryFile(dir=root_dir, delete=False) as fd:
try:
yield fd
finally:
if cleanup:
safe_delete(fd.name)
开发者ID:BabyDuncan,项目名称:commons,代码行数:14,代码来源:__init__.py
示例6: instrument
def instrument(self, targets, tests, junit_classpath):
self._cobertura_classpath = self._task_exports.tool_classpath(self._cobertura_bootstrap_key)
safe_delete(self._coverage_datafile)
classes_by_target = self._context.products.get_data('classes_by_target')
for target in targets:
if self.is_coverage_target(target):
classes_by_rootdir = classes_by_target.get(target)
if classes_by_rootdir:
for root, products in classes_by_rootdir.rel_paths():
self._rootdirs[root].update(products)
# Cobertura uses regular expressions for filters, and even then there are still problems
# with filtering. It turned out to be easier to just select which classes to instrument
# by filtering them here.
# TODO(ji): Investigate again how we can use cobertura's own filtering mechanisms.
if self._coverage_filters:
for basedir, classes in self._rootdirs.items():
updated_classes = []
for cls in classes:
does_match = False
for positive_filter in self._include_filters:
if fnmatch.fnmatchcase(_classfile_to_classname(cls), positive_filter):
does_match = True
for negative_filter in self._exclude_filters:
if fnmatch.fnmatchcase(_classfile_to_classname(cls), negative_filter):
does_match = False
if does_match:
updated_classes.append(cls)
self._rootdirs[basedir] = updated_classes
for basedir, classes in self._rootdirs.items():
if not classes:
continue # No point in running instrumentation if there is nothing to instrument!
args = [
'--basedir',
basedir,
'--datafile',
self._coverage_datafile,
]
with temporary_file() as fd:
fd.write('\n'.join(classes) + '\n')
args.append('--listOfFilesToInstrument')
args.append(fd.name)
main = 'net.sourceforge.cobertura.instrument.InstrumentMain'
result = execute_java(classpath=self._cobertura_classpath + junit_classpath,
main=main,
args=args,
workunit_factory=self._context.new_workunit,
workunit_name='cobertura-instrument')
if result != 0:
raise TaskError("java %s ... exited non-zero (%i)"
" 'failed to instrument'" % (main, result))
开发者ID:Yasumoto,项目名称:pants,代码行数:50,代码来源:junit_run.py
示例7: symlink_cachepath
def symlink_cachepath(inpath, symlink_dir, outpath):
"""Symlinks all paths listed in inpath into symlink_dir.
Writes the resulting paths to outpath.
Returns a map of path -> symlink to that path.
"""
safe_mkdir(symlink_dir)
with safe_open(inpath, 'r') as infile:
paths = filter(None, infile.read().strip().split(os.pathsep))
symlinks = []
for path in paths:
symlink = os.path.join(symlink_dir, os.path.basename(path))
safe_delete(symlink)
os.symlink(path, symlink)
symlinks.append(symlink)
with safe_open(outpath, 'w') as outfile:
outfile.write(':'.join(symlinks))
symlink_map = dict(zip(paths, symlinks))
return symlink_map
开发者ID:dynamicguy,项目名称:commons,代码行数:19,代码来源:ivy_utils.py
示例8: execute
def execute(self):
dist_dir = self._config.getdefault('pants_distdir')
target_base = '%s-%s' % (
self.target.provides.name, self.target.provides.version)
setup_dir = os.path.join(dist_dir, target_base)
expected_tgz = '%s.tar.gz' % target_base
expected_target = os.path.join(setup_dir, 'dist', expected_tgz)
dist_tgz = os.path.join(dist_dir, expected_tgz)
chroot = Chroot(dist_dir, name=self.target.provides.name)
self.write_contents(chroot)
self.write_setup(chroot)
safe_rmtree(setup_dir)
os.rename(chroot.path(), setup_dir)
with pushd(setup_dir):
cmd = '%s setup.py %s' % (sys.executable, self.options.run or 'sdist')
print('Running "%s" in %s' % (cmd, setup_dir))
extra_args = {} if self.options.run else dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
po = subprocess.Popen(cmd, shell=True, **extra_args)
stdout, stderr = po.communicate()
if self.options.run:
print('Ran %s' % cmd)
print('Output in %s' % setup_dir)
return po.returncode
elif po.returncode != 0:
print('Failed to run %s!' % cmd)
for line in ''.join(stdout).splitlines():
print('stdout: %s' % line)
for line in ''.join(stderr).splitlines():
print('stderr: %s' % line)
return po.returncode
else:
if not os.path.exists(expected_target):
print('Could not find expected target %s!' % expected_target)
sys.exit(1)
safe_delete(dist_tgz)
os.rename(expected_target, dist_tgz)
safe_rmtree(setup_dir)
print('Wrote %s' % dist_tgz)
开发者ID:alfss,项目名称:commons,代码行数:43,代码来源:setup_py.py
示例9: execute
def execute(self):
config = Config.load()
distdir = config.getdefault('pants_distdir')
setup_dir = os.path.join(distdir, '%s-%s' % (
self.target.provides._name, self.target.provides._version))
chroot = Chroot(distdir, name=self.target.provides._name)
self.write_sources(chroot)
self.write_setup(chroot)
if os.path.exists(setup_dir):
import shutil
shutil.rmtree(setup_dir)
os.rename(chroot.path(), setup_dir)
with pushd(setup_dir):
cmd = '%s setup.py %s' % (sys.executable, self.options.run or 'sdist')
print('Running "%s" in %s' % (cmd, setup_dir))
extra_args = {} if self.options.run else dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
po = subprocess.Popen(cmd, shell=True, **extra_args)
po.wait()
if self.options.run:
print('Ran %s' % cmd)
print('Output in %s' % setup_dir)
return po.returncode
elif po.returncode != 0:
print('Failed to run %s!' % cmd)
for line in po.stdout.read().splitlines():
print('stdout: %s' % line)
for line in po.stderr.read().splitlines():
print('stderr: %s' % line)
return po.returncode
expected_tgz = '%s-%s.tar.gz' % (self.target.provides._name, self.target.provides._version)
expected_target = os.path.join(setup_dir, 'dist', expected_tgz)
dist_tgz = os.path.join(distdir, expected_tgz)
if not os.path.exists(expected_target):
print('Could not find expected target %s!' % expected_target)
sys.exit(1)
safe_delete(dist_tgz)
os.rename(expected_target, dist_tgz)
print('Wrote %s' % dist_tgz)
safe_rmtree(setup_dir)
开发者ID:steliokontos,项目名称:commons,代码行数:42,代码来源:setup_py.py
示例10: safe_file
def safe_file(path, suffix=None, cleanup=True):
"""A with-context that copies a file, and copies the copy back to the original file on success.
This is useful for doing work on a file but only changing its state on success.
- suffix: Use this suffix to create the copy. Otherwise use a random string.
- cleanup: Whether or not to clean up the copy.
"""
safe_path = path + '.%s' % suffix or uuid.uuid4()
if os.path.exists(path):
shutil.copy(path, safe_path)
try:
yield safe_path
if cleanup:
shutil.move(safe_path, path)
else:
shutil.copy(safe_path, path)
finally:
if cleanup:
safe_delete(safe_path)
开发者ID:bollwang,项目名称:commons,代码行数:20,代码来源:__init__.py
示例11: temporary_file
def temporary_file(root_dir=None, cleanup=True):
"""
A with-context that creates a temporary file and returns a writeable file descriptor to it.
You may specify the following keyword args:
:param str root_dir: The parent directory to create the temporary file.
:param bool cleanup: Whether or not to clean up the temporary file.
>>> with temporary_file() as fp:
... fp.write('woot')
... fp.sync()
... # pass fp on to something else
"""
with tempfile.NamedTemporaryFile(dir=root_dir, delete=False) as fd:
try:
yield fd
finally:
if cleanup:
safe_delete(fd.name)
开发者ID:EricCen,项目名称:commons,代码行数:21,代码来源:__init__.py
示例12: _bootstrap_ivy_classpath
def _bootstrap_ivy_classpath(self, executor, workunit_factory, retry=True):
# TODO(John Sirois): Extract a ToolCache class to control the path structure:
# https://jira.twitter.biz/browse/DPB-283
ivy_bootstrap_dir = \
os.path.join(self._config.getdefault('pants_bootstrapdir'), 'tools', 'jvm', 'ivy')
digest = hashlib.sha1()
if os.path.isfile(self._version_or_ivyxml):
with open(self._version_or_ivyxml) as fp:
digest.update(fp.read())
else:
digest.update(self._version_or_ivyxml)
classpath = os.path.join(ivy_bootstrap_dir, '%s.classpath' % digest.hexdigest())
if not os.path.exists(classpath):
ivy = self._bootstrap_ivy(os.path.join(ivy_bootstrap_dir, 'bootstrap.jar'))
args = ['-confs', 'default', '-cachepath', classpath]
if os.path.isfile(self._version_or_ivyxml):
args.extend(['-ivy', self._version_or_ivyxml])
else:
args.extend(['-dependency', 'org.apache.ivy', 'ivy', self._version_or_ivyxml])
try:
ivy.execute(args=args, executor=executor,
workunit_factory=workunit_factory, workunit_name='ivy-bootstrap')
except ivy.Error as e:
safe_delete(classpath)
raise self.Error('Failed to bootstrap an ivy classpath! %s' % e)
with open(classpath) as fp:
cp = fp.read().strip().split(os.pathsep)
if not all(map(os.path.exists, cp)):
safe_delete(classpath)
if retry:
return self._bootstrap_ivy_classpath(executor, workunit_factory, retry=False)
raise self.Error('Ivy bootstrapping failed - invalid classpath: %s' % ':'.join(cp))
return cp
开发者ID:Docworld,项目名称:pants,代码行数:37,代码来源:bootstrapper.py
示例13: select_binary
def select_binary(base_path, version, name, config=None):
"""Selects a binary matching the current os and architecture.
Raises TaskError if no binary of the given version and name could be found.
"""
# TODO(John Sirois): finish doc of the path structure expexcted under base_path
config = config or Config.load()
cachedir = config.getdefault('pants_cachedir', default=os.path.expanduser('~/.pants.d'))
baseurl = config.getdefault('pants_support_baseurl')
timeout_secs = config.getdefault('pants_support_fetch_timeout_secs', type=int, default=30)
sysname, _, release, _, machine = os.uname()
os_id = _ID_BY_OS[sysname.lower()]
if os_id:
middle_path = _PATH_BY_ID[os_id(release, machine)]
if middle_path:
binary_path = os.path.join(base_path, *(middle_path + [version, name]))
cached_binary_path = os.path.join(cachedir, binary_path)
if not os.path.exists(cached_binary_path):
url = posixpath.join(baseurl, binary_path)
log.info('Fetching %s binary from: %s' % (name, url))
downloadpath = cached_binary_path + '~'
try:
with closing(urllib_request.urlopen(url, timeout=timeout_secs)) as binary:
with safe_open(downloadpath, 'wb') as cached_binary:
cached_binary.write(binary.read())
os.rename(downloadpath, cached_binary_path)
chmod_plus_x(cached_binary_path)
except (IOError, urllib_error.HTTPError, urllib_error.URLError) as e:
raise TaskError('Failed to fetch binary from %s: %s' % (url, e))
finally:
safe_delete(downloadpath)
log.debug('Selected %s binary cached at: %s' % (name, cached_binary_path))
return cached_binary_path
raise TaskError('No %s binary found for: %s' % (name, (sysname, release, machine)))
开发者ID:teddziuba,项目名称:commons,代码行数:36,代码来源:binary_util.py
示例14: cleanup
def cleanup(self):
for fetched in self._fetched:
safe_delete(fetched)
开发者ID:EricCen,项目名称:commons,代码行数:3,代码来源:remote_python_thrift_fileset.py
示例15: delete
def delete(self, cache_key):
safe_delete(self._cache_file_for_key(cache_key))
开发者ID:Docworld,项目名称:pants,代码行数:2,代码来源:local_artifact_cache.py
示例16: generate_project
#.........这里部分代码省略.........
source_bases = dict(map(create_source_base_template, project.sources))
if project.has_python:
source_bases.update(map(create_source_base_template, project.py_sources))
source_bases.update(map(create_source_base_template, project.py_libs))
def create_source_template(base_id, includes=None, excludes=None):
return TemplateData(
base=base_id,
includes="|".join(OrderedSet(includes)) if includes else None,
excludes="|".join(OrderedSet(excludes)) if excludes else None,
)
def create_sourcepath(base_id, sources):
def normalize_path_pattern(path):
return "%s/" % path if not path.endswith("/") else path
includes = [normalize_path_pattern(src_set.path) for src_set in sources if src_set.path]
excludes = []
for source_set in sources:
excludes.extend(normalize_path_pattern(exclude) for exclude in source_set.excludes)
return create_source_template(base_id, includes, excludes)
pythonpaths = []
if project.has_python:
for source_set in project.py_sources:
pythonpaths.append(create_source_template(linked_folder_id(source_set)))
for source_set in project.py_libs:
lib_path = source_set.path if source_set.path.endswith(".egg") else "%s/" % source_set.path
pythonpaths.append(create_source_template(linked_folder_id(source_set), includes=[lib_path]))
configured_project = TemplateData(
name=self.project_name,
java=TemplateData(jdk=self.java_jdk, language_level=("1.%d" % self.java_language_level)),
python=project.has_python,
scala=project.has_scala and not project.skip_scala,
source_bases=source_bases.values(),
pythonpaths=pythonpaths,
debug_port=project.debug_port,
)
outdir = os.path.abspath(os.path.join(self.work_dir, "bin"))
safe_mkdir(outdir)
source_sets = defaultdict(OrderedSet) # base_id -> source_set
for source_set in project.sources:
source_sets[linked_folder_id(source_set)].add(source_set)
sourcepaths = [create_sourcepath(base_id, sources) for base_id, sources in source_sets.items()]
libs = []
def add_jarlibs(classpath_entries):
for classpath_entry in classpath_entries:
# TODO(John Sirois): Plumb javadoc jars
libs.append((classpath_entry.jar, classpath_entry.source_jar))
add_jarlibs(project.internal_jars)
add_jarlibs(project.external_jars)
configured_classpath = TemplateData(
sourcepaths=sourcepaths,
has_tests=project.has_tests,
libs=libs,
scala=project.has_scala,
# Eclipse insists the outdir be a relative path unlike other paths
outdir=os.path.relpath(outdir, get_buildroot()),
)
def apply_template(output_path, template_relpath, **template_data):
with safe_open(output_path, "w") as output:
Generator(pkgutil.get_data(__name__, template_relpath), **template_data).write(output)
apply_template(self.project_filename, self.project_template, project=configured_project)
apply_template(self.classpath_filename, self.classpath_template, classpath=configured_classpath)
apply_template(
os.path.join(self.work_dir, "Debug on port %d.launch" % project.debug_port),
self.debug_template,
project=configured_project,
)
apply_template(self.coreprefs_filename, self.coreprefs_template, project=configured_project)
for resource in _SETTINGS:
with safe_open(os.path.join(self.cwd, ".settings", resource), "w") as prefs:
prefs.write(pkgutil.get_data(__name__, os.path.join("files", "eclipse", resource)))
factorypath = TemplateData(
project_name=self.project_name,
# The easiest way to make sure eclipse sees all annotation processors is to put all libs on
# the apt factorypath - this does not seem to hurt eclipse performance in any noticeable way.
jarpaths=libs,
)
apply_template(self.apt_filename, self.apt_template, factorypath=factorypath)
if project.has_python:
apply_template(self.pydev_filename, self.pydev_template, project=configured_project)
else:
safe_delete(self.pydev_filename)
print("\nGenerated project at %s%s" % (self.work_dir, os.sep))
开发者ID:foursquare,项目名称:twitter-commons,代码行数:101,代码来源:eclipse_gen.py
示例17: clear_url
def clear_url(self, url):
for path in self.translate_all(url):
safe_delete(path)
开发者ID:jalons,项目名称:commons,代码行数:3,代码来源:http.py
示例18: erase_metadata
def erase_metadata(self, task_id):
for fn in self.get_metadata(task_id, with_size=False):
safe_delete(fn)
safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))
开发者ID:aalzabarah,项目名称:incubator-aurora,代码行数:4,代码来源:garbage.py
注:本文中的twitter.common.dirutil.safe_delete函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论