本文整理汇总了Python中twitter.common.dirutil.safe_mkdir函数的典型用法代码示例。如果您正苦于以下问题：Python safe_mkdir函数的具体用法？Python safe_mkdir怎么用？Python safe_mkdir使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了safe_mkdir函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: create
def create(self):
    """Create the sandbox directory and, when a user is configured, hand it over.

    ``self.root`` is created with ``safe_mkdir``.  If ``self._user`` is truthy,
    ``self._mesos_dir`` and ``self.root`` are chowned to the uid/gid reported by
    ``self.get_user_and_group()`` and ``self.root`` is then chmod'ed to 0700.

    Raises:
      self.CreationError: if creating the directory, or any chown/chmod call,
        fails with an IOError or OSError.
    """
    log.debug('DirectorySandbox: mkdir %s' % self.root)
    try:
        safe_mkdir(self.root)
    except (IOError, OSError) as e:
        raise self.CreationError('Failed to create the sandbox: %s' % e)

    if self._user:
        pwent, grent = self.get_user_and_group()
        try:
            # Mesos provides a sandbox directory with permission 0750 owned by the user of the
            # executor. In case of Thermos this is `root`, as Thermos takes the responsibility to
            # drop privileges to the designated non-privileged user/role. To ensure non-privileged
            # processes can still read their sandbox, Thermos must also update the permissions of
            # the scratch directory created by Mesos.
            # This is necessary since Mesos 1.6.0 (https://issues.apache.org/jira/browse/MESOS-8332).
            log.debug('DirectorySandbox: chown %s:%s %s' % (self._user, grent.gr_name, self._mesos_dir))
            os.chown(self._mesos_dir, pwent.pw_uid, pwent.pw_gid)
            log.debug('DirectorySandbox: chown %s:%s %s' % (self._user, grent.gr_name, self.root))
            os.chown(self.root, pwent.pw_uid, pwent.pw_gid)
            log.debug('DirectorySandbox: chmod 700 %s' % self.root)
            # 0o700 is the octal spelling accepted by both Python 2.6+ and Python 3;
            # the legacy form 0700 is a syntax error on Python 3.
            os.chmod(self.root, 0o700)
        except (IOError, OSError) as e:
            raise self.CreationError('Failed to chown/chmod the sandbox: %s' % e)
开发者ID:criteo-forks,项目名称:aurora,代码行数:27,代码来源:sandbox.py
示例2: map_internal_jars
def map_internal_jars(self, targets):
    """Copy each target's internal jar (and source jar) into the work dir.

    Two directories under ``self.work_dir`` -- ``internal-libs`` and
    ``internal-libsources`` -- are recreated empty.  For every target with a
    'jars' product mapping, the single jar of each mapping entry is copied into
    ``internal-libs``; the target's 'source_jars' mappings, if any, are copied
    into ``internal-libsources``.  A ``ClasspathEntry`` pairing the copied jar
    with the last copied source jar (or None) is added to
    ``self._project.internal_jars``.

    Raises:
      TaskError: if a mapping entry does not hold exactly one jar.
    """
    def fresh_dir(name):
        # Build the path under the work dir and recreate it empty.
        path = os.path.join(self.work_dir, name)
        safe_mkdir(path, clean=True)
        return path

    internal_jar_dir = fresh_dir('internal-libs')
    internal_source_jar_dir = fresh_dir('internal-libsources')

    jar_products = self.context.products.get('jars')
    source_jar_products = self.context.products.get('source_jars')

    for target in targets:
        jar_mappings = jar_products.get(target)
        if not jar_mappings:
            continue

        for base, jars in jar_mappings.items():
            if len(jars) != 1:
                raise TaskError('Unexpected mapping, multiple jars for %s: %s' % (target, jars))

            cp_jar = os.path.join(internal_jar_dir, jars[0])
            shutil.copy(os.path.join(base, jars[0]), cp_jar)

            cp_source_jar = None
            source_mappings = source_jar_products.get(target)
            if source_mappings:
                for source_base, source_jars in source_mappings.items():
                    if len(source_jars) != 1:
                        raise TaskError(
                            'Unexpected mapping, multiple source jars for %s: %s' % (target, source_jars)
                        )
                    cp_source_jar = os.path.join(internal_source_jar_dir, source_jars[0])
                    shutil.copy(os.path.join(source_base, source_jars[0]), cp_source_jar)

            self._project.internal_jars.add(ClasspathEntry(cp_jar, cp_source_jar))
开发者ID:BabyDuncan,项目名称:commons,代码行数:33,代码来源:ide_gen.py
示例3: _mapjars
def _mapjars(self, genmap, target):
    """Retrieve the jars for ``target`` via ivy and register them in ``genmap``.

    Parameters:
      genmap: the jar_dependencies ProductMapping entry for the required products.
      target: the target whose jar dependencies are being retrieved.

    The retrieval directory ``<classpath_dir>/<target.id>`` is recreated empty,
    ivy is run with a ``-retrieve`` pattern laying files out as
    ``org/artifact/conf/file``, and that layout is then walked.  Each file
    accepted by ``self._is_jar`` is appended under five keys: ``org``,
    ``(org, name)``, ``target``, ``(target, conf)`` and ``(org, name, conf)``.
    """
    mapdir = os.path.join(self._classpath_dir, target.id)
    safe_mkdir(mapdir, clean=True)

    retrieve_pattern = (
        '%s/[organisation]/[artifact]/[conf]/'
        '[organisation]-[artifact]-[revision](-[classifier]).[ext]' % mapdir
    )
    ivyargs = ['-retrieve', retrieve_pattern, '-symlink', '-confs']
    ivyargs.extend(target.configurations or self._confs)
    self._exec_ivy(mapdir, [target], ivyargs)

    for org in os.listdir(mapdir):
        orgdir = os.path.join(mapdir, org)
        if not os.path.isdir(orgdir):
            continue
        for name in os.listdir(orgdir):
            artifactdir = os.path.join(orgdir, name)
            if not os.path.isdir(artifactdir):
                continue
            for conf in os.listdir(artifactdir):
                confdir = os.path.join(artifactdir, conf)
                for jar_name in os.listdir(confdir):
                    if not self._is_jar(jar_name):
                        continue
                    # TODO(John Sirois): kill the org and (org, name) exclude mappings in
                    # favor of a conf whitelist
                    keys = (org, (org, name), target, (target, conf), (org, name, conf))
                    for key in keys:
                        genmap.add(key, confdir).append(jar_name)
开发者ID:lxwuchang,项目名称:commons,代码行数:35,代码来源:ivy_resolve.py
示例4: execute
def execute(self, targets):
    """Generate javadoc for the changed java targets and optionally catalog it.

    Javadoc is generated either combined or per target depending on
    ``self.combined``.  When ``self.transitive`` is false, only changed targets
    that are also in ``self.context.target_roots`` are documented.  If the
    'javadoc' product is required, every file found under each target's gen dir
    is registered (relative to that dir) in the 'javadoc' product mapping.

    Raises:
      TaskError: if the 'javadoc' product is required while combined output
        is requested.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; the catalog
    # step is assumed to run after the `with self.changed(...)` block -- confirm.
    catalog = self.context.products.isrequired('javadoc')
    if catalog and self.combined:
        raise TaskError('Cannot provide javadoc target mappings for combined output')

    with self.changed(filter(is_java, targets)) as changed_targets:
        safe_mkdir(self._output_dir)
        with self.context.state('classpath', []) as cp:
            classpath = [jar for conf, jar in cp if conf in self.confs]

            if self.transitive:
                candidates = changed_targets
            else:
                candidates = set(changed_targets).intersection(set(self.context.target_roots))
            javadoc_targets = list(filter(is_java, candidates))

            if self.combined:
                generate = self.generate_combined
            else:
                generate = self.generate_individual
            generate(classpath, javadoc_targets)

    if not catalog:
        return

    for target in targets:
        gendir = self._gendir(target)
        javadocs = [
            os.path.relpath(os.path.join(root, name), gendir)
            for root, _, names in os.walk(gendir)
            for name in names
        ]
        self.context.products.get('javadoc').add(target, gendir, javadocs)
开发者ID:avadh,项目名称:commons,代码行数:29,代码来源:javadoc_gen.py
示例5: execute
def execute(self, targets):
    """Compile the scala targets among ``targets`` and check their dependencies.

    Nothing happens when no target has scala sources.  Otherwise the depfile
    and analysis-cache directories are created, the resources dir and every
    plugin jar are pushed to the front of the 'classpath' state for each conf,
    already-valid versioned targets are post-processed, and invalid partitions
    are compiled (and marked updated unless ``self.dry_run``).  Finally a
    ``JvmDependencyCache`` is asked to check for undeclared dependencies.
    """
    # Materialise the selection: on Python 3 `filter` returns a one-shot iterator
    # that is always truthy, which would defeat the emptiness test below and be
    # exhausted before its second use.  On Python 2 this is the same list as before.
    scala_targets = [t for t in targets if ScalaCompile._has_scala_sources(t)]
    if scala_targets:
        safe_mkdir(self._depfile_dir)
        safe_mkdir(self._analysis_cache_dir)

        # Map from output directory to { analysis_cache_dir, [ analysis_cache_file ]}
        upstream_analysis_caches = self.context.products.get('upstream')

        with self.context.state('classpath', []) as cp:
            for conf in self._confs:
                cp.insert(0, (conf, self._resources_dir))
                for jar in self._plugin_jars:
                    cp.insert(0, (conf, jar))

            with self.invalidated(scala_targets, invalidate_dependants=True,
                                  partition_size_hint=self._partition_size_hint) as invalidation_check:
                for vt in invalidation_check.all_vts:
                    if vt.valid:  # Don't compile, just post-process.
                        self.post_process(vt, upstream_analysis_caches, split_artifact=False)
                for vt in invalidation_check.invalid_vts_partitioned:
                    # Compile, using partitions for efficiency.
                    self.execute_single_compilation(vt, cp, upstream_analysis_caches)
                    if not self.dry_run:
                        vt.update()

        deps_cache = JvmDependencyCache(self, scala_targets)
        deps_cache.check_undeclared_dependencies()
开发者ID:lxwuchang,项目名称:commons,代码行数:27,代码来源:scala_compile.py
示例6: run_thrifts
def run_thrifts(self):
    """
    Generate Python thrift code using thrift compiler specified in pants config.

    Thrift fields conflicting with Python keywords are suffixed with a trailing
    underscore (e.g.: from_).

    Every source of each PythonThriftLibrary reachable from ``self.target`` is
    copied into ``self._workdir`` (keeping its path relative to the target
    base), passed through ``self._modify_thrift``, and then compiled with
    ``self._run_thrift``.

    Raises:
      PythonThriftBuilder.CodeGenerationException: if ``self._run_thrift``
        reports failure for any source.
    """
    all_thrifts = set()

    def collect_sources(target):
        # Record (target base, path relative to that base) for each source.
        abs_target_base = os.path.join(get_buildroot(), target.target_base)
        all_thrifts.update(
            (target.target_base, os.path.relpath(source, abs_target_base))
            for source in target.payload.sources_relative_to_buildroot()
        )

    self.target.walk(
        collect_sources,
        predicate=lambda target: isinstance(target, PythonThriftLibrary),
    )

    copied_sources = set()
    for base, relative_source in all_thrifts:
        destination = os.path.join(self._workdir, relative_source)
        safe_mkdir(os.path.dirname(destination))
        shutil.copyfile(os.path.join(base, relative_source), destination)
        copied_sources.add(self._modify_thrift(destination))

    for src in copied_sources:
        if self._run_thrift(src):
            continue
        raise PythonThriftBuilder.CodeGenerationException("Could not generate .py from %s!" % src)
开发者ID:ejconlon,项目名称:pants,代码行数:33,代码来源:thrift_builder.py
示例7: control
def control(self, force=False):
    """
    Bind to the checkpoint associated with this task, position to the end of the log if
    it exists, or create it if it doesn't.  Fails if we cannot get "leadership" i.e. a
    file lock on the checkpoint stream.

    This is a generator that yields exactly once; exceptions raised by the
    caller's body are logged and swallowed, and the checkpoint is closed on the
    way out.
    NOTE(review): presumably wrapped by a context-manager decorator that is not
    visible in this snippet -- confirm.

    Parameters:
      force: forwarded to ``TaskRunnerHelper.open_checkpoint``.

    Raises:
      self.StateError: if the task is already in a terminal state.
      self.PermissionError: if the checkpoint cannot be opened.
    """
    if self.is_terminal():
        raise self.StateError('Cannot take control of a task in terminal state.')
    if self._sandbox:
        safe_mkdir(self._sandbox)
    ckpt_file = self._pathspec.getpath('runner_checkpoint')
    try:
        self._ckpt = TaskRunnerHelper.open_checkpoint(ckpt_file, force=force, state=self._state)
    except TaskRunnerHelper.PermissionError:
        raise self.PermissionError('Unable to open checkpoint %s' % ckpt_file)
    log.debug('Flipping recovery mode off.')
    self._recovery = False
    self._set_task_status(self.task_state())
    self._resume_task()
    try:
        yield
    except Exception as e:
        log.error('Caught exception in self.control(): %s', e)
        log.error(' %s', traceback.format_exc())
    finally:
        # Close in `finally` so the checkpoint is released even when the body exits
        # with something that is not an `Exception` (KeyboardInterrupt, SystemExit,
        # GeneratorExit), which the handler above does not catch.
        self._ckpt.close()
开发者ID:apache,项目名称:aurora,代码行数:25,代码来源:runner.py
示例8: genlang
def genlang(self, lang, targets):
    """Run the schema compiler for every source of every JaxbLibrary target.

    Output goes to ``<workdir>/gen-java``.  For each source the package is
    ``target.package`` or, when that is None, a guess from the source path; it
    is then normalised with ``self._correct_package``.

    Returns:
      A list of ``(target, files)`` tuples, where ``files`` holds one
      ``self._sources_to_be_generated(...)`` result per source of the target.

    Raises:
      TaskError: if ``lang`` is not 'java', a target is not a JaxbLibrary, or
        the schema compiler returns a non-zero result.
    """
    if lang != 'java':
        raise TaskError('Unrecognized jaxb language: %s' % lang)

    output_dir = os.path.join(self.workdir, 'gen-java')
    safe_mkdir(output_dir)

    cache = []
    for target in targets:
        if not isinstance(target, JaxbLibrary):
            raise TaskError('Invalid target type "{class_type}" (expected JaxbLibrary)'
                            .format(class_type=type(target).__name__))

        generated = []
        for xsd in target.sources_relative_to_buildroot():
            package = target.package
            if package is None:
                package = self._guess_package(xsd)
            package = self._correct_package(package)

            safe_mkdir(output_dir)
            exit_code = self._compile_schema(['-p', package, '-d', output_dir, xsd])
            if exit_code != 0:
                raise TaskError('xjc ... exited non-zero ({code})'.format(code=exit_code))

            generated.append(self._sources_to_be_generated(target.package, xsd))
        cache.append((target, generated))

    return cache
开发者ID:aoen,项目名称:pants,代码行数:32,代码来源:jaxb_gen.py
示例9: __init__
def __init__(self, target, root_dir, extra_targets=None):
    """Set up caches, repositories, index and environment for building ``target``.

    Parameters:
      target: the target to build; its ``name`` prefixes the environment path.
      root_dir: stored as ``self._root``.
      extra_targets: optional iterable of additional targets; the result of
        ``self._get_common_python()`` is always appended to it.

    The "python-setup" config section supplies the artifact cache dir, the egg
    cache dir (created if missing), extra repos and the candidate indices; the
    first index for which ``PythonChroot.can_contact_index`` is true is kept,
    otherwise ``self._index`` is None.
    """
    config = Config.load()
    self._config = config
    self._target = target
    self._root = root_dir

    artifact_cache = os.path.join(
        config.get("python-setup", "artifact_cache"), "%s" % PythonIdentity.get()
    )
    self._cache = BuildCache(artifact_cache)

    self._extra_targets = [] if extra_targets is None else list(extra_targets)
    self._extra_targets.append(self._get_common_python())

    cachedir = config.get("python-setup", "cache")
    safe_mkdir(cachedir)
    self._eggcache = cachedir

    # The local egg cache is always searched first, ahead of the configured repos.
    self._repos = ["file://%s" % os.path.realpath(cachedir)]
    self._repos = self._repos + config.getlist("python-setup", "repos")
    self._fetcher = ReqFetcher(repos=self._repos, cache=cachedir)

    self._index = next(
        (candidate for candidate in config.getlist("python-setup", "indices")
         if PythonChroot.can_contact_index(candidate)),
        None,
    )

    self._additional_reqs = set()

    # NOTE(review): tempfile.mktemp only returns a name and is documented as
    # deprecated because of the race between choosing and using it; left as-is
    # because it is not visible here whether PythonEnvironment needs a
    # not-yet-existing path -- confirm before switching to mkdtemp.
    distpath = tempfile.mktemp(dir=config.getdefault("pants_distdir"), prefix=target.name)
    self.env = PythonEnvironment(distpath)
开发者ID:satifanie,项目名称:commons,代码行数:27,代码来源:python_chroot.py
示例10: _mapjars
def _mapjars(self, genmap, target):
    """Retrieve the jars for ``target`` via ivy and register them in ``genmap``.

    The directory ``<classpath_dir>/<target.id>`` is recreated empty and ivy
    retrieves into it using an ``org/artifact/conf/file`` layout.  That layout
    is walked and each file accepted by ``self._is_jar`` is appended to
    ``genmap`` under the keys ``org``, ``(org, name)``, ``target``,
    ``(target, conf)`` and ``(org, name, conf)``, all rooted at the conf dir.
    """
    mapdir = os.path.join(self._classpath_dir, target.id)
    safe_mkdir(mapdir, clean=True)

    ivyargs = ["-retrieve"]
    ivyargs.append(
        "%s/[organisation]/[artifact]/[conf]/"
        "[organisation]-[artifact]-[revision](-[classifier]).[ext]" % mapdir
    )
    ivyargs += ["-symlink", "-confs"]
    ivyargs.extend(target.configurations or self._confs)
    self._exec_ivy(mapdir, [target], ivyargs)

    def subdirs(parent):
        # Lazily yield (entry name, full path) for entries of `parent` that are directories.
        for entry in os.listdir(parent):
            path = os.path.join(parent, entry)
            if os.path.isdir(path):
                yield entry, path

    for org, orgdir in subdirs(mapdir):
        for name, artifactdir in subdirs(orgdir):
            for conf in os.listdir(artifactdir):
                confdir = os.path.join(artifactdir, conf)
                for jar_file in os.listdir(confdir):
                    if not self._is_jar(jar_file):
                        continue
                    # TODO(John Sirois): kill the org and (org, name) exclude mappings in
                    # favor of a conf whitelist
                    genmap.add(org, confdir).append(jar_file)
                    genmap.add((org, name), confdir).append(jar_file)
                    genmap.add(target, confdir).append(jar_file)
                    genmap.add((target, conf), confdir).append(jar_file)
                    genmap.add((org, name, conf), confdir).append(jar_file)
开发者ID:ryan-williams,项目名称:commons,代码行数:30,代码来源:ivy_resolve.py
示例11: execute
def execute(self, targets):
    """Compile the java targets among ``targets`` and publish the results.

    Nothing happens when no target satisfies ``_is_java``.  Otherwise:

    * the classes and depfile directories are created;
    * the resources and classes dirs are added to the compatible classpaths of
      the exclusives group of the first java target, for every conf;
    * invalid partitions are compiled (and marked updated unless
      ``self.dry_run``), then existing depfiles of all versioned targets are
      merged into ``self._deps``;
    * unless ``self.dry_run``: generated classes and per-target annotation
      processor info files are mapped into the 'classes' product when it is
      required, and a single processor info file covering all apt targets
      (plus whatever the existing file already lists) is written into the
      classes dir.
    """
    # Materialise the selection: it is truth-tested, indexed with [0] and iterated
    # several times below, none of which works on the lazy iterator that `filter`
    # returns on Python 3.  On Python 2 this is the same list as before.
    java_targets = [t for t in targets if _is_java(t)]
    if java_targets:
        safe_mkdir(self._classes_dir)
        safe_mkdir(self._depfile_dir)

        egroups = self.context.products.get_data('exclusives_groups')
        group_id = egroups.get_group_key_for_target(java_targets[0])
        for conf in self._confs:
            egroups.update_compatible_classpaths(group_id, [(conf, self._resources_dir)])
            egroups.update_compatible_classpaths(group_id, [(conf, self._classes_dir)])

        with self.invalidated(java_targets, invalidate_dependents=True,
                              partition_size_hint=self._partition_size_hint) as invalidation_check:
            for vt in invalidation_check.invalid_vts_partitioned:
                # Compile, using partitions for efficiency.
                exclusives_classpath = egroups.get_classpath_for_group(group_id)
                self.execute_single_compilation(vt, exclusives_classpath)
                if not self.dry_run:
                    vt.update()

            for vt in invalidation_check.all_vts:
                depfile = self.create_depfile_path(vt.targets)
                if not self.dry_run and os.path.exists(depfile):
                    # Read in the deps created either just now or by a previous run on these targets.
                    deps = Dependencies(self._classes_dir)
                    deps.load(depfile)
                    self._deps.merge(deps)

        if not self.dry_run:
            if self.context.products.isrequired('classes'):
                genmap = self.context.products.get('classes')

                # Map generated classes to the owning targets and sources.
                for target, classes_by_source in self._deps.findclasses(java_targets).items():
                    for source, classes in classes_by_source.items():
                        genmap.add(source, self._classes_dir, classes)
                        genmap.add(target, self._classes_dir, classes)

                # TODO(John Sirois): Map target.resources in the same way
                # 'Map' (rewrite) annotation processor service info files to the owning targets.
                for target in java_targets:
                    if is_apt(target) and target.processors:
                        basedir = os.path.join(self._resources_dir,
                                               Target.maybe_readable_identify([target]))
                        processor_info_file = os.path.join(basedir, _PROCESSOR_INFO_FILE)
                        self.write_processor_info(processor_info_file, target.processors)
                        genmap.add(target, basedir, [_PROCESSOR_INFO_FILE])

            # Produce a monolithic apt processor service info file for further compilation rounds
            # and the unit test classpath.
            all_processors = set()
            for target in java_targets:
                if is_apt(target) and target.processors:
                    all_processors.update(target.processors)
            processor_info_file = os.path.join(self._classes_dir, _PROCESSOR_INFO_FILE)
            if os.path.exists(processor_info_file):
                # Keep processors already listed by an earlier run.
                with safe_open(processor_info_file, 'r') as f:
                    for processor in f:
                        all_processors.add(processor.strip())
            self.write_processor_info(processor_info_file, all_processors)
开发者ID:BabyDuncan,项目名称:commons,代码行数:60,代码来源:java_compile.py
示例12: execute
def execute(self, targets):
    """Copy invalidated resources into the work dir and publish all of them.

    The resources of every target for which ``has_resources`` is true are
    collected.  Each invalid resources target gets a clean directory
    ``<workdir>/<resources.id>`` into which its sources are copied, preserving
    relative paths.  Afterwards every collected resources target is added to
    the 'resources' product, and its directory is added, for each conf, to the
    compatible classpaths of the exclusives group of ``targets[0]``.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; the product
    # mapping below is assumed to run after the `with self.invalidated(...)` block.
    all_resources = set()
    for target in targets:
        if has_resources(target):
            all_resources.update(target.resources)

    def target_dir(resources):
        return os.path.join(self.workdir, resources.id)

    with self.invalidated(all_resources) as invalidation_check:
        invalid_targets = set()
        for vt in invalidation_check.invalid_vts:
            invalid_targets.update(vt.targets)

        for resources in invalid_targets:
            resources_dir = target_dir(resources)
            safe_mkdir(resources_dir, clean=True)
            for resource in resources.sources:
                # Make sure the sub-directory of the resource exists before copying.
                safe_mkdir(os.path.join(resources_dir, os.path.dirname(resource)))
                origin = os.path.join(resources.target_base, resource)
                shutil.copy(origin, os.path.join(resources_dir, resource))

    products = self.context.products
    genmap = products.get("resources")
    egroups = products.get_data("exclusives_groups")
    group_key = egroups.get_group_key_for_target(targets[0])

    for resources in all_resources:
        resources_dir = target_dir(resources)
        genmap.add(resources, resources_dir, resources.sources)
        for conf in self.confs:
            egroups.update_compatible_classpaths(group_key, [(conf, resources_dir)])
开发者ID:UrbanCompass,项目名称:commons,代码行数:34,代码来源:prepare_resources.py
示例13: _merge_classes_dir
def _merge_classes_dir(self, state):
    """Merge the classes dirs from the underlying artifacts into a single dir.

    May symlink instead of copying, when it's OK to do so.

    Does nothing when there is at most one underlying artifact.  Classes of the
    first target of each artifact are grouped by package directory.  Packages
    reported by ``self._symlinkable_packages(state)`` are symlinked as a whole
    (replacing an existing non-link path); all others get a real directory
    whose class files go through ``self._maybe_hardlink``.

    Postcondition: symlinks are of leaf packages only.
    """
    if len(self.underlying_artifacts) <= 1:
        return

    self.log.debug('Merging classes dirs into %s' % self.classes_dir)
    symlinkable = self._symlinkable_packages(state)

    for artifact in self.underlying_artifacts:
        by_package = defaultdict(list)
        for cls in state.classes_by_target.get(artifact.targets[0], []):
            package_dir, class_file = os.path.split(cls)
            by_package[package_dir].append(class_file)

        for package, class_files in by_package.items():
            source_dir = os.path.join(artifact.classes_dir, package)
            merged_dir = os.path.join(self.classes_dir, package)

            if package not in symlinkable:
                safe_mkdir(merged_dir)
                for class_file in class_files:
                    self._maybe_hardlink(os.path.join(source_dir, class_file),
                                         os.path.join(merged_dir, class_file))
                continue

            if os.path.islink(merged_dir):
                # An existing link must already point at this artifact's package dir.
                assert os.readlink(merged_dir) == source_dir
                continue

            if os.path.exists(merged_dir):
                safe_rmtree(merged_dir)
            else:
                safe_mkdir(os.path.dirname(merged_dir))
            os.symlink(source_dir, merged_dir)
开发者ID:samitny,项目名称:commons,代码行数:35,代码来源:zinc_artifact.py
示例14: main
def main():
    r"""Anonymize a set of analysis files using the same replacements in all of them.

    This maintains enough consistency to make splitting/merging tests realistic.

    To run:

    ./pants py src/python/pants/backend/jvm/tasks/jvm_compile:anonymize_zinc_analysis \
      <wordfile> <classes dir in analysis files> <analysis file glob 1> <analysis file glob 2> ...

    Each anonymized analysis is written into an ``anon`` directory next to its
    input file, under an anonymized file name.
    """
    word_file, classes_dir = sys.argv[1], sys.argv[2]

    analysis_files = []
    for pattern in sys.argv[3:]:
        analysis_files.extend(glob.glob(pattern))

    with open(word_file, 'r') as infile:
        anonymizer = Anonymizer(infile.read().split())

    for analysis_file in analysis_files:
        analysis = ZincAnalysisParser(classes_dir).parse_from_path(analysis_file)
        analysis.anonymize(anonymizer)

        source_dir, source_name = os.path.dirname(analysis_file), os.path.basename(analysis_file)
        output_dir = os.path.join(source_dir, 'anon')
        safe_mkdir(output_dir)
        analysis.write_to_path(os.path.join(output_dir, anonymizer.convert(source_name)))

    anonymizer.check_for_comprehensiveness()
开发者ID:aoen,项目名称:pants,代码行数:25,代码来源:anonymize_analysis.py
示例15: genlang
def genlang(self, lang, targets):
    """Run the protobuf compiler on the sources of ``targets`` for ``lang``.

    Parameters:
      lang: 'java' or 'python'; selects the output dir and the ``--*_out`` flag.
      targets: the targets whose sources (and proto path bases) are compiled.

    Raises:
      TaskError: if ``lang`` is not recognized or the compiler exits non-zero.
    """
    # NOTE(review): the result of select_binary is never used below -- the command
    # line is built from self.protobuf_binary instead.  Kept as-is because the
    # call may have side effects not visible here; confirm which one is intended.
    protobuf_binary = select_binary(
        self.protoc_supportdir,
        self.protoc_version,
        'protoc',
        self.context.config
    )
    bases, sources = self._calculate_sources(targets)

    if lang == 'java':
        out_dir = self.java_out
        safe_mkdir(out_dir)
        gen = '--java_out=%s' % out_dir
    elif lang == 'python':
        out_dir = self.py_out
        safe_mkdir(out_dir)
        gen = '--python_out=%s' % out_dir
    else:
        raise TaskError('Unrecognized protobuf gen lang: %s' % lang)

    args = [self.protobuf_binary, gen]
    args += ['--proto_path=%s' % base for base in bases]
    args.extend(sources)

    log.debug('Executing: %s' % ' '.join(args))
    result = subprocess.Popen(args).wait()
    if result != 0:
        raise TaskError('%s ... exited non-zero (%i)' % (self.protobuf_binary, result))
开发者ID:govindkabra,项目名称:pants,代码行数:30,代码来源:protobuf_gen.py
示例16: execute
def execute(self, targets):
    """Create the jar kinds enabled on this task in ``self._output_dir``.

    Candidate targets are ``targets`` when ``self.transitive`` is set, otherwise
    ``self.context.target_roots``; each jar kind filters them with its own
    predicate.  Each creator receives a callback that, when the matching
    product type is required, records the jar name for its target under
    ``self._output_dir``.
    """
    safe_mkdir(self._output_dir)

    def jar_targets(predicate):
        if self.transitive:
            candidates = targets
        else:
            candidates = self.context.target_roots
        return filter(predicate, candidates)

    def add_genjar(typename, target, name):
        products = self.context.products
        if not products.isrequired(typename):
            return
        products.get(typename).add(target, self._output_dir).append(name)

    def recorder(typename):
        # Callback that registers a created jar under the given product type.
        return functools.partial(add_genjar, typename)

    if self.jar_classes:
        self.jar(jar_targets(is_jvm), self.context.products.get('classes'), recorder('jars'))

    if self.jar_idl:
        self.idljar(jar_targets(is_idl), recorder('idl_jars'))

    if self.jar_sources:
        self.sourcejar(jar_targets(is_jvm), recorder('source_jars'))

    if self.jar_javadoc:
        self.javadocjar(jar_targets(is_java),
                        self.context.products.get('javadoc'),
                        recorder('javadoc_jars'))
开发者ID:dynamicguy,项目名称:commons,代码行数:25,代码来源:jar_create.py
示例17: __init__
def __init__(self, cache=None, failsoft=True, clock=time, opener=None):
    """Initialise the cached opener.

    Parameters:
      cache: cache directory; a fresh temporary directory is used when falsy.
        The directory is created if it does not exist.
      failsoft: stored as ``self._failsoft``.
      clock: stored as ``self._clock``; defaults to the ``time`` module.
      opener: the underlying opener; a new ``Web()`` is used when falsy.
    """
    cache_dir = cache if cache else safe_mkdtemp()
    self._failsoft = failsoft
    self._cache = cache_dir
    safe_mkdir(cache_dir)
    self._clock = clock
    self._opener = opener if opener else Web()
    super(CachedWeb, self).__init__()
开发者ID:jalons,项目名称:commons,代码行数:7,代码来源:http.py
示例18: execute_codegen
def execute_codegen(self, targets):
    """Run the Spindle thrift code generator over the spindle thrift targets.

    Sources come from ``self._calculate_sources`` restricted to
    SpindleThriftLibrary targets; the thrift include path is the set of target
    bases of all such targets in the context.  Output goes to
    ``self.namespace_out``, with ``<workdir>/scalate_workdir`` as scratch dir.

    Raises:
      TaskError: if the java invocation returns a non-zero result.
    """
    def is_spindle(target):
        return isinstance(target, SpindleThriftLibrary)

    sources = self._calculate_sources(targets, is_spindle)
    bases = {target.target_base for target in self.context.targets(is_spindle)}

    scalate_workdir = os.path.join(self.workdir, 'scalate_workdir')
    for directory in (self.namespace_out, scalate_workdir):
        safe_mkdir(directory)

    args = ['--template', 'scala/record.ssp']
    args += ['--java_template', 'javagen/record.ssp']
    args += ['--thrift_include', ':'.join(bases)]
    args += ['--namespace_out', self.namespace_out]
    args += ['--working_dir', scalate_workdir]
    args.extend(sources)

    result = self.runjava(
        classpath=self.spindle_classpath,
        main='com.foursquare.spindle.codegen.binary.ThriftCodegen',
        jvm_options=self.get_options().jvm_options,
        args=args,
        workunit_name='generate',
    )
    if result != 0:
        raise TaskError('{} returned {}'.format(self.main_class, result))
开发者ID:ahamilton55,项目名称:pants,代码行数:26,代码来源:spindle_gen.py
示例19: __init__
def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None):
    """
    required:
      name        = name of the process
      cmdline     = cmdline of the process
      sequence    = the next available sequence number for state updates
      pathspec    = TaskPath object for synthesizing path names
      sandbox_dir = the sandbox in which to run the process
      platform    = Platform providing fork, clock, getpid

    optional:
      user        = the user to run as (if unspecified, will default to current user.)
                    if specified to a user that is not the current user, you must have root access

    The sandbox directory is created when ``sandbox_dir`` is truthy.

    raises:
      self.PermissionError if ``user`` differs from the current user and the
        effective uid is not 0.
      ValueError if ``platform`` is None.
    """
    self._name, self._cmdline = name, cmdline
    self._pathspec, self._seq = pathspec, sequence

    self._sandbox = sandbox_dir
    if sandbox_dir:
        safe_mkdir(sandbox_dir)

    # Runtime state, filled in later.
    self._pid = self._fork_time = None
    self._stdout = self._stderr = None

    self._user = user
    if user:
        requested_user, current_user = self._getpwuid()  # may raise self.UnknownUserError
        running_as_root = os.geteuid() == 0
        if requested_user != current_user and not running_as_root:
            raise self.PermissionError('Must be root to run processes as other users!')

    self._ckpt = None
    self._ckpt_head = -1

    # `platform` defaults to None only for signature reasons; it is mandatory.
    if platform is None:
        raise ValueError("Platform must be specified")
    self._platform = platform
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:35,代码来源:process.py
示例20: genlang
def genlang(self, lang, targets):
    """Run the protobuf compiler on the sources of ``targets`` for ``lang``.

    Parameters:
      lang: 'java' or 'python'; selects the output dir (created if missing)
        and the matching ``--*_out`` flag.
      targets: the targets whose sources (and proto path bases) are compiled.

    Raises:
      TaskError: if ``lang`` is not recognized or the compiler exits non-zero.
    """
    bases, sources = self._calculate_sources(targets)

    if lang == 'java':
        safe_mkdir(self.java_out)
        gen = '--java_out=%s' % self.java_out
    elif lang == 'python':
        safe_mkdir(self.py_out)
        gen = '--python_out=%s' % self.py_out
    else:
        raise TaskError('Unrecognized protobuf gen lang: %s' % lang)

    args = [
        self.protobuf_binary,
        gen
    ]

    for base in bases:
        args.append('--proto_path=%s' % base)

    args.extend(sources)
    log.debug('Executing: %s' % ' '.join(args))
    process = subprocess.Popen(args)
    result = process.wait()
    if result != 0:
        # Same exception type as before, now naming the binary and its exit code
        # so that the failure can be diagnosed from the error alone.
        raise TaskError('%s ... exited non-zero (%i)' % (self.protobuf_binary, result))
开发者ID:ilsanbao,项目名称:commons,代码行数:26,代码来源:protobuf_gen.py
注：本文中的twitter.common.dirutil.safe_mkdir函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论