• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python dirutil.safe_open函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twitter.common.dirutil.safe_open函数的典型用法代码示例。如果您正苦于以下问题:Python safe_open函数的具体用法?Python safe_open怎么用?Python safe_open使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了safe_open函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _spawn_nailgun_server

    def _spawn_nailgun_server(self, workunit):
        self.context.log.debug("No ng server found, spawning...")

        with _safe_open(self._ng_out, "w"):
            pass  # truncate

        pid = os.fork()
        if pid != 0:
            # In the parent tine - block on ng being up for connections
            return self._await_nailgun_server(workunit)

        # NOTE: Don't use self.context.log or self.context.new_workunit here.
        # They use threadlocal state, which interacts poorly with fork().
        os.setsid()
        in_fd = open("/dev/null", "w")
        out_fd = safe_open(self._ng_out, "w")
        err_fd = safe_open(self._ng_err, "w")
        args = ["java"]
        if self._ng_server_args:
            args.extend(self._ng_server_args)
        args.append(NailgunTask.PANTS_NG_ARG)
        args.append(self._identifier_arg)
        ng_classpath = os.pathsep.join(binary_util.profile_classpath(self._nailgun_profile))
        args.extend(["-cp", ng_classpath, "com.martiansoftware.nailgun.NGServer", ":0"])
        s = " ".join(args)

        with binary_util.safe_classpath():
            subprocess.Popen(args, stdin=in_fd, stdout=out_fd, stderr=err_fd, close_fds=True, cwd=get_buildroot())
            # Prevents finally blocks being executed, unlike sys.exit(). We don't want to execute finally
            # blocks because we might, e.g., clean up tempfiles that the parent still needs.
            os._exit(0)
开发者ID:ric03uecS,项目名称:commons,代码行数:31,代码来源:nailgun_task.py


示例2: symlink_cachepath

  def symlink_cachepath(ivy_home, inpath, symlink_dir, outpath):
    """Symlinks all paths listed in inpath that are under ivy_home into symlink_dir.

    Preserves all other paths. Writes the resulting paths to outpath.
    Returns a map of path -> symlink to that path.
    """
    safe_mkdir(symlink_dir)
    with safe_open(inpath, 'r') as infile:
      paths = filter(None, infile.read().strip().split(os.pathsep))
    new_paths = []
    for path in paths:
      if not path.startswith(ivy_home):
        new_paths.append(path)
        continue
      symlink = os.path.join(symlink_dir, os.path.relpath(path, ivy_home))
      try:
        os.makedirs(os.path.dirname(symlink))
      except OSError as e:
        if e.errno != errno.EEXIST:
          raise
      # Note: The try blocks cannot be combined. It may be that the dir exists but the link doesn't.
      try:
        os.symlink(path, symlink)
      except OSError as e:
        # We don't delete and recreate the symlink, as this may break concurrently executing code.
        if e.errno != errno.EEXIST:
          raise
      new_paths.append(symlink)
    with safe_open(outpath, 'w') as outfile:
      outfile.write(':'.join(new_paths))
    symlink_map = dict(zip(paths, new_paths))
    return symlink_map
开发者ID:aoen,项目名称:pants,代码行数:32,代码来源:ivy_utils.py


示例3: setUpClass

  def setUpClass(cls):
    cls.origin = safe_mkdtemp()
    with pushd(cls.origin):
      subprocess.check_call(['git', 'init', '--bare'])

    cls.gitdir = safe_mkdtemp()
    cls.worktree = safe_mkdtemp()

    cls.readme_file = os.path.join(cls.worktree, 'README')

    with environment_as(GIT_DIR=cls.gitdir, GIT_WORK_TREE=cls.worktree):
      cls.init_repo('depot', cls.origin)

      touch(cls.readme_file)
      subprocess.check_call(['git', 'add', 'README'])
      subprocess.check_call(['git', 'commit', '-am', 'initial commit with decode -> \x81b'])
      subprocess.check_call(['git', 'tag', 'first'])
      subprocess.check_call(['git', 'push', '--tags', 'depot', 'master'])
      subprocess.check_call(['git', 'branch', '--set-upstream', 'master', 'depot/master'])

      with safe_open(cls.readme_file, 'w') as readme:
        readme.write('Hello World.')
      subprocess.check_call(['git', 'commit', '-am', 'Update README.'])

    cls.clone2 = safe_mkdtemp()
    with pushd(cls.clone2):
      cls.init_repo('origin', cls.origin)
      subprocess.check_call(['git', 'pull', '--tags', 'origin', 'master:master'])

      with safe_open(os.path.realpath('README'), 'a') as readme:
        readme.write('--')
      subprocess.check_call(['git', 'commit', '-am', 'Update README 2.'])
      subprocess.check_call(['git', 'push', '--tags', 'origin', 'master'])

    cls.git = Git(gitdir=cls.gitdir, worktree=cls.worktree)
开发者ID:aoen,项目名称:pants,代码行数:35,代码来源:test_git.py


示例4: execute

  def execute(self, targets):
    java_targets = filter(JavaCompile._is_java, targets)
    if java_targets:
      with self.context.state('classpath', []) as cp:
        for conf in self._confs:
          cp.insert(0, (conf, self._output_dir))

        with self.changed(java_targets, invalidate_dependants=True) as changed:
          bases, sources_by_target, processors, fingerprint = self.calculate_sources(changed)
          if sources_by_target:
            classpath = [jar for conf, jar in cp if conf in self._confs]
            result = self.compile(classpath, bases, sources_by_target, fingerprint)
            if result != 0:
              raise TaskError('%s returned %d' % (self._main, result))

            if processors:
              if os.path.exists(self._processor_service_info_file):
                with safe_open(self._processor_service_info_file, 'r') as f:
                  for processor in f:
                    processors.add(processor.strip())
              with safe_open(self._processor_service_info_file, 'w') as f:
                for processor in processors:
                  f.write('%s\n' % processor)

      if self.context.products.isrequired('classes'):
        genmap = self.context.products.get('classes')
        classes_by_target = SunCompiler.findclasses(self._output_dir, targets)
        for target, classes in classes_by_target.items():
          genmap.add(target, self._output_dir, classes)
开发者ID:billwei,项目名称:commons,代码行数:29,代码来源:java_compile.py


示例5: process

  def process(self, outdir, base, source, standalone, url_builder, get_config, css=None):
    def parse_url(spec):
      match = MarkdownToHtml.PANTS_LINK.match(spec)
      if match:
        page = Target.get(Address.parse(get_buildroot(), match.group(1)))
        if not page:
          raise TaskError('Invalid link %s' % match.group(1))
        alias, url = url_builder(page, config=get_config(page))
        return alias, url
      else:
        return spec, spec

    def build_url(label):
      components = label.split('|', 1)
      if len(components) == 1:
        return parse_url(label.strip())
      else:
        alias, link = components
        _, url = parse_url(link.strip())
        return alias, url

    wikilinks = WikilinksExtension(build_url)

    path, ext = os.path.splitext(source)
    with safe_open(os.path.join(outdir, path + '.html'), 'w') as output:
      with open(os.path.join(get_buildroot(), base, source), 'r') as input:
        md_html = markdown.markdown(
          input.read(),
          extensions=['codehilite', 'extra', 'toc', wikilinks]
        )
        if standalone:
          if css:
            css_relpath = os.path.relpath(css, outdir)
            out_relpath = os.path.dirname(source)
            link_relpath = os.path.relpath(css_relpath, out_relpath)
            css = '<link rel="stylesheet" type="text/css" href="%s"/>' % link_relpath
          html = textwrap.dedent('''
          <html>
            <head>
              %s
            </head>
            <body>
          <!-- generated by pants! -->
          %s
            </body>
          </html>
          ''').strip() % (css or '', md_html)
          output.write(html)
        else:
          if css:
            with safe_open(css) as fd:
              output.write(textwrap.dedent('''
              <style type="text/css">
              %s
              </style>
              ''').strip() % fd.read())
              output.write('\n')
          output.write(md_html)
        return output.name
开发者ID:adamsxu,项目名称:commons,代码行数:59,代码来源:markdown_to_html.py


示例6: _prepare_fork

 def _prepare_fork(self):
   user, current_user = self._getpwuid()
   uid, gid = user.pw_uid, user.pw_gid
   self._fork_time = self._platform.clock().time()
   self._setup_ckpt()
   self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "w")
   self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "w")
   os.chown(self._stdout.name, user.pw_uid, user.pw_gid)
   os.chown(self._stderr.name, user.pw_uid, user.pw_gid)
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:9,代码来源:process.py


示例7: _prepare_fork

 def _prepare_fork(self):
   user, current_user = self._getpwuid()
   if self._user:
     if user != current_user and os.geteuid() != 0:
       raise self.PermissionError('Must be root to run processes as other users!')
   uid, gid = user.pw_uid, user.pw_gid
   self._fork_time = self._platform.clock().time()
   self._setup_ckpt()
   self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "a")
   self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "a")
   os.chown(self._stdout.name, user.pw_uid, user.pw_gid)
   os.chown(self._stderr.name, user.pw_uid, user.pw_gid)
开发者ID:rosmo,项目名称:aurora,代码行数:12,代码来源:process.py


示例8: stage_artifacts

    def stage_artifacts(target, jar, version, changelog, confs=None, synth_target=None):
      def artifact_path(name=None, suffix='', extension='jar', artifact_ext=''):
        return os.path.join(self.outdir, jar.org, jar.name + artifact_ext,
                            '%s%s-%s%s.%s' % (
                              (name or jar.name),
                              artifact_ext if name != 'ivy' else '',
                              version,
                              suffix,
                              extension
                            ))

      def get_pushdb(target):
        return get_db(target)[0]

      with safe_open(artifact_path(suffix='-CHANGELOG', extension='txt'), 'w') as changelog_file:
        changelog_file.write(changelog)
      ivyxml = artifact_path(name='ivy', extension='xml')
      IvyWriter(get_pushdb).write(target, ivyxml, confs)
      PomWriter(get_pushdb).write(target, artifact_path(extension='pom'))

      idl_ivyxml = None
      if synth_target:
        changelog_path = artifact_path(suffix='-CHANGELOG', extension='txt', artifact_ext='-only')
        with safe_open(changelog_path, 'w') as changelog_file:
          changelog_file.write(changelog)
        idl_ivyxml = artifact_path(name='ivy', extension='xml', artifact_ext='-only')
        # use idl publication spec in ivy for idl artifact
        IvyWriter(get_pushdb).write(synth_target, idl_ivyxml, ['idl'], synth=True)
        PomWriter(get_pushdb).write(synth_target,
                                    artifact_path(extension='pom', artifact_ext='-only'),
                                    synth=True)

      def copy(tgt, typename, suffix='', artifact_ext=''):
        genmap = self.context.products.get(typename)
        mapping = genmap.get(tgt)
        if not mapping:
          print('no mapping for %s' % tgt)
        else:
          for basedir, jars in mapping.items():
            for artifact in jars:
              path = artifact_path(suffix=suffix, artifact_ext=artifact_ext)
              shutil.copy(os.path.join(basedir, artifact), path)

      copy(target, typename='jars')
      copy(target, typename='source_jars', suffix='-sources')
      if (synth_target):
        copy(synth_target, typename='idl_jars', suffix='-idl', artifact_ext='-only')

      if is_java(target):
        copy(target, typename='javadoc_jars', suffix='-javadoc')


      return ivyxml, idl_ivyxml
开发者ID:steliokontos,项目名称:commons,代码行数:53,代码来源:jar_publish.py


示例9: generate_reports

          def generate_reports():
            args = [
              'report',
              '-in', self.coverage_metadata_file,
              '-in', self.coverage_file,
              '-exit'
            ]
            source_bases = set(t.target_base for t in targets)
            for source_base in source_bases:
              args.extend(['-sp', source_base])

            sorting = ['-Dreport.sort', '+name,+class,+method,+block']
            if self.coverage_report_console:
              args.extend(['-r', 'txt',
                           '-Dreport.txt.out.file=%s' % self.coverage_console_file] + sorting)
            if self.coverage_report_xml:
              args.extend(['-r', 'xml','-Dreport.xml.out.file=%s' % self.coverage_xml_file])
            if self.coverage_report_html:
              args.extend(['-r', 'html',
                           '-Dreport.html.out.file=%s' % self.coverage_html_file,
                           '-Dreport.out.encoding=UTF-8'] + sorting)

            result = runjava(
              classpath=emma_classpath,
              main='emma',
              args=args
            )
            if result != 0:
              raise TaskError('Failed to emma generate code coverage reports: %d' % result)

            if self.coverage_report_console:
              with safe_open(self.coverage_console_file) as console_report:
                sys.stdout.write(console_report.read())
            if self.coverage_report_html_open:
              binary_utils.open(self.coverage_html_file)
开发者ID:magicbill,项目名称:commons,代码行数:35,代码来源:junit_run.py


示例10: stage_artifacts

    def stage_artifacts(target, jar, version, changelog, confs=None):
      def artifact_path(name=None, suffix='', extension='jar'):
        return os.path.join(self.outdir, jar.org, jar.name,
                            '%s-%s%s.%s' % ((name or jar.name), version, suffix, extension))

      with safe_open(artifact_path(suffix='-CHANGELOG', extension='txt'), 'w') as changelog_file:
        changelog_file.write(changelog)

      def get_pushdb(target):
        return get_db(target)[0]

      PomWriter(get_pushdb).write(target, artifact_path(extension='pom'))

      ivyxml = artifact_path(name='ivy', extension='xml')
      IvyWriter(get_pushdb).write(target, ivyxml, confs)

      def copy(typename, suffix=''):
        genmap = self.context.products.get(typename)
        for basedir, jars in genmap.get(target).items():
          for artifact in jars:
            shutil.copy(os.path.join(basedir, artifact), artifact_path(suffix=suffix))

      copy('jars')
      if is_java(target):
        copy('javadoc_jars', '-javadoc')
      copy('source_jars', '-sources')

      return ivyxml
开发者ID:bonifaido,项目名称:commons,代码行数:28,代码来源:jar_publish.py


示例11: write

  def write(self, target, path, confs=None):
    def as_jar(internal_target):
      jar, _, _, _ = self.get_db(internal_target).as_jar_with_version(internal_target)
      return jar

    # TODO(John Sirois): a dict is used here to de-dup codegen targets which have both the original
    # codegen target - say java_thrift_library - and the synthetic generated target (java_library)
    # Consider reworking codegen tasks to add removal of the original codegen targets when rewriting
    # the graph
    dependencies = OrderedDict()
    internal_codegen = {}
    for dep in target_internal_dependencies(target):
      jar = as_jar(dep)
      dependencies[(jar.org, jar.name)] = self.internaldep(jar, dep)
      if dep.is_codegen:
        internal_codegen[jar.name] = jar.name
    for jar in target.jar_dependencies:
      if jar.rev:
        dependencies[(jar.org, jar.name)] = self.jardep(jar)
    target_jar = self.internaldep(as_jar(target)).extend(dependencies=dependencies.values())

    template_kwargs = self.templateargs(target_jar, confs)
    with safe_open(path, 'w') as output:
      template = pkgutil.get_data(__name__, self.template_relpath)
      Generator(template, **template_kwargs).write(output)
开发者ID:alandge,项目名称:twitter-commons,代码行数:25,代码来源:jar_publish.py


示例12: execute

  def execute(self, targets):
    java_targets = filter(_is_java, targets)
    if java_targets:
      safe_mkdir(self._classes_dir)
      safe_mkdir(self._depfile_dir)

      egroups = self.context.products.get_data('exclusives_groups')
      group_id = egroups.get_group_key_for_target(java_targets[0])
      for conf in self._confs:
        egroups.update_compatible_classpaths(group_id, [(conf, self._resources_dir)])
        egroups.update_compatible_classpaths(group_id, [(conf, self._classes_dir)])


      with self.invalidated(java_targets, invalidate_dependents=True,
                            partition_size_hint=self._partition_size_hint) as invalidation_check:
        for vt in invalidation_check.invalid_vts_partitioned:
          # Compile, using partitions for efficiency.
          exclusives_classpath = egroups.get_classpath_for_group(group_id)
          self.execute_single_compilation(vt, exclusives_classpath)
          if not self.dry_run:
            vt.update()

        for vt in invalidation_check.all_vts:
          depfile = self.create_depfile_path(vt.targets)
          if not self.dry_run and os.path.exists(depfile):
            # Read in the deps created either just now or by a previous run on these targets.
            deps = Dependencies(self._classes_dir)
            deps.load(depfile)
            self._deps.merge(deps)

      if not self.dry_run:
        if self.context.products.isrequired('classes'):
          genmap = self.context.products.get('classes')
          # Map generated classes to the owning targets and sources.
          for target, classes_by_source in self._deps.findclasses(java_targets).items():
            for source, classes in classes_by_source.items():
              genmap.add(source, self._classes_dir, classes)
              genmap.add(target, self._classes_dir, classes)

          # TODO(John Sirois): Map target.resources in the same way
          # 'Map' (rewrite) annotation processor service info files to the owning targets.
          for target in java_targets:
            if is_apt(target) and target.processors:
              basedir = os.path.join(self._resources_dir, Target.maybe_readable_identify([target]))
              processor_info_file = os.path.join(basedir, _PROCESSOR_INFO_FILE)
              self.write_processor_info(processor_info_file, target.processors)
              genmap.add(target, basedir, [_PROCESSOR_INFO_FILE])

        # Produce a monolithic apt processor service info file for further compilation rounds
        # and the unit test classpath.
        all_processors = set()
        for target in java_targets:
          if is_apt(target) and target.processors:
            all_processors.update(target.processors)
        processor_info_file = os.path.join(self._classes_dir, _PROCESSOR_INFO_FILE)
        if os.path.exists(processor_info_file):
          with safe_open(processor_info_file, 'r') as f:
            for processor in f:
              all_processors.add(processor.strip())
        self.write_processor_info(processor_info_file, all_processors)
开发者ID:BabyDuncan,项目名称:commons,代码行数:60,代码来源:java_compile.py


示例13: execute

  def execute(self):
    pages = []
    targets = self.context.targets()
    for target in targets:
      if isinstance(target, Page):
        for wiki_artifact in target.payload.provides:
          pages.append((target, wiki_artifact))

    urls = list()

    genmap = self.context.products.get('wiki_html')
    for page, wiki_artifact in pages:
      html_info = genmap.get((wiki_artifact, page))
      if len(html_info) > 1:
        raise TaskError('Unexpected resources for %s: %s' % (page, html_info))
      basedir, htmls = html_info.items()[0]
      if len(htmls) != 1:
        raise TaskError('Unexpected resources for %s: %s' % (page, htmls))
      with safe_open(os.path.join(basedir, htmls[0])) as contents:
        url = self.publish_page(
          page.address,
          wiki_artifact.config['space'],
          wiki_artifact.config['title'],
          contents.read(),
          # Default to none if not present in the hash.
          parent=wiki_artifact.config.get('parent')
        )
        if url:
          urls.append(url)
          self.context.log.info('Published %s to %s' % (page, url))

    if self.open and urls:
      binary_util.ui_open(*urls)
开发者ID:Docworld,项目名称:pants,代码行数:33,代码来源:confluence_publish.py


示例14: execute_single_compilation

  def execute_single_compilation(self, versioned_targets, cp):
    # TODO: Use the artifact cache.

    depfile = self.create_depfile_path(versioned_targets.targets)

    if not versioned_targets.valid:
      self.merge_depfile(versioned_targets)  # Get what we can from previous builds.
      self.context.log.info('Compiling targets %s' % str(versioned_targets.targets))
      sources_by_target, processors, fingerprint = self.calculate_sources(versioned_targets.targets)
      if sources_by_target:
        sources = reduce(lambda all, sources: all.union(sources), sources_by_target.values())
        if not sources:
          self.context.log.warn('Skipping java compile for targets with no sources:\n  %s' %
                                '\n  '.join(str(t) for t in sources_by_target.keys()))
        else:
          classpath = [jar for conf, jar in cp if conf in self._confs]
          result = self.compile(classpath, sources, fingerprint, depfile)
          if result != 0:
            default_message = 'Unexpected error - %s returned %d' % (_JMAKE_MAIN, result)
            raise TaskError(_JMAKE_ERROR_CODES.get(result, default_message))

        if processors and not self.dry_run:
          # Produce a monolithic apt processor service info file for further compilation rounds
          # and the unit test classpath.
          processor_info_file = os.path.join(self._classes_dir, _PROCESSOR_INFO_FILE)
          if os.path.exists(processor_info_file):
            with safe_open(processor_info_file, 'r') as f:
              for processor in f:
                processors.add(processor.strip())
          self.write_processor_info(processor_info_file, processors)

    self.post_process(versioned_targets)
开发者ID:bonifaido,项目名称:commons,代码行数:32,代码来源:java_compile.py


示例15: _get_nailgun_endpoint

    def _get_nailgun_endpoint(self):
        if os.path.exists(self._pidfile):
            with _safe_open(self._pidfile, "r") as pidfile:
                contents = pidfile.read()

                def invalid_pidfile():
                    log.warn("Invalid ng pidfile %s contained: %s" % (self._pidfile, contents))
                    return None

                endpoint = contents.split(":")
                if len(endpoint) != 2:
                    return invalid_pidfile()
                pid, port = endpoint
                try:
                    return int(pid.strip()), int(port.strip())
                except ValueError:
                    return invalid_pidfile()
        elif NailgunTask._find:
            pid_port = NailgunTask._find(self._pidfile)
            if pid_port:
                self.context.log.info("found ng server @ pid:%d port:%d" % pid_port)
                with safe_open(self._pidfile, "w") as pidfile:
                    pidfile.write("%d:%d\n" % pid_port)
            return pid_port
        return None
开发者ID:ewhauser,项目名称:commons,代码行数:25,代码来源:nailgun_task.py


示例16: select_binary

def select_binary(base_path, version, name, config=None):
  """Selects a binary matching the current os and architecture.

  :raises: :class:`pants.binary_util.BinaryUtil.BinaryNotFound` if no binary of the given version
    and name could be found.
  """
  # TODO(John Sirois): finish doc of the path structure expexcted under base_path
  config = config or Config.load()
  bootstrap_dir = config.getdefault('pants_bootstrapdir')

  binary_path = select_binary_base_path(base_path, version, name)
  bootstrapped_binary_path = os.path.join(bootstrap_dir, binary_path)
  if not os.path.exists(bootstrapped_binary_path):
    downloadpath = bootstrapped_binary_path + '~'
    try:
      with select_binary_stream(base_path, version, name, config) as stream:
        with safe_open(downloadpath, 'wb') as bootstrapped_binary:
          bootstrapped_binary.write(stream())
        os.rename(downloadpath, bootstrapped_binary_path)
        chmod_plus_x(bootstrapped_binary_path)
    finally:
      safe_delete(downloadpath)

  log.debug('Selected {binary} binary bootstrapped to: {path}'
            .format(binary=name, path=bootstrapped_binary_path))
  return bootstrapped_binary_path
开发者ID:Docworld,项目名称:pants,代码行数:26,代码来源:binary_util.py


示例17: _await_nailgun_server

  def _await_nailgun_server(self, stdout, stderr):
    nailgun_timeout_seconds = 5
    max_socket_connect_attempts = 10
    nailgun = None
    port_parse_start = time.time()
    with safe_open(self._ng_out, 'r') as ng_out:
      while not nailgun:
        started = ng_out.readline()
        if started:
          port = self._parse_nailgun_port(started)
          nailgun = self._create_ngclient(port, stdout, stderr)
          log.debug('Detected ng server up on port %d' % port)
        elif time.time() - port_parse_start > nailgun_timeout_seconds:
          raise NailgunClient.NailgunError('Failed to read ng output after'
                                           ' %s seconds' % nailgun_timeout_seconds)

    attempt = 0
    while nailgun:
      sock = nailgun.try_connect()
      if sock:
        sock.close()
        endpoint = self._get_nailgun_endpoint()
        if endpoint:
          log.debug('Connected to ng server with fingerprint %s pid: %d @ port: %d' % endpoint)
        else:
          raise NailgunClient.NailgunError('Failed to connect to ng server.')
        return nailgun
      elif attempt > max_socket_connect_attempts:
        raise nailgun.NailgunError('Failed to connect to ng output after %d connect attempts'
                                   % max_socket_connect_attempts)
      attempt += 1
      log.debug('Failed to connect on attempt %d' % attempt)
      time.sleep(0.1)
开发者ID:aoen,项目名称:pants,代码行数:33,代码来源:nailgun_executor.py


示例18: execute

    def execute(self, targets):
        try:
            wiki = Confluence.login(self.url)
        except ConfluenceError as e:
            raise TaskError("Failed to login to confluence: %s" % e)

        urls = list()

        genmap = self.context.products.get("markdown_html")
        for page in filter(lambda t: isinstance(t, Page), targets):
            wikiconfig = page.wiki_config(self.wiki())
            if wikiconfig:
                html_info = genmap.get((self.wiki(), page))
                if len(html_info) > 1:
                    raise TaskError("Unexpected resources for %s: %s" % (page, html_info))
                basedir, htmls = html_info.items()[0]
                if len(htmls) != 1:
                    raise TaskError("Unexpected resources for %s: %s" % (page, htmls))
                with safe_open(os.path.join(basedir, htmls[0])) as contents:
                    url = self.publish_page(
                        wiki, wikiconfig["space"], wikiconfig["title"], contents.read(), parent=wikiconfig.get("parent")
                    )
                    if url:
                        urls.append(url)
                        self.context.log.info("Published %s to %s" % (page, url))

        if self.open and urls:
            binary_utils.open(*urls)
开发者ID:satifanie,项目名称:commons,代码行数:28,代码来源:confluence_publish.py


示例19: execute

  def execute(self, targets):
    pages = []
    for target in targets:
      if isinstance(target, Page):
        wikiconfig = target.wiki_config(self.wiki())
        if wikiconfig:
          pages.append((target, wikiconfig))

    urls = list()

    genmap = self.context.products.get('wiki_html')
    for page, wikiconfig in pages:
      html_info = genmap.get((self.wiki(), page))
      if len(html_info) > 1:
        raise TaskError('Unexpected resources for %s: %s' % (page, html_info))
      basedir, htmls = html_info.items()[0]
      if len(htmls) != 1:
        raise TaskError('Unexpected resources for %s: %s' % (page, htmls))
      with safe_open(os.path.join(basedir, htmls[0])) as contents:
        url = self.publish_page(
          page.address,
          wikiconfig['space'],
          wikiconfig['title'],
          contents.read(),
          parent=wikiconfig.get('parent')
        )
        if url:
          urls.append(url)
          self.context.log.info('Published %s to %s' % (page, url))

    if self.open and urls:
      binary_util.ui_open(*urls)
开发者ID:FernandoG26,项目名称:commons,代码行数:32,代码来源:confluence_publish.py


示例20: from_file

 def from_file(filename, **kw):
   try:
     with safe_open(filename) as fp:
       task = Task.json_load(fp)
     return ThermosTaskWrapper(task, **kw)
   except Exception as e:
     return None
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:7,代码来源:loader.py



注:本文中的twitter.common.dirutil.safe_open函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dirutil.safe_rmtree函数代码示例发布时间:2022-05-27
下一篇:
Python dirutil.safe_mkdtemp函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap