• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python tools.write_json函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.tools.write_json函数的典型用法代码示例。如果您正苦于以下问题:Python write_json函数的具体用法?Python write_json怎么用?Python write_json使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了write_json函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: chromium_save_isolated

def chromium_save_isolated(isolated, data, path_variables, algo):
    """Writes one or many .isolated files.

  This slightly increases the cold cache cost but greatly reduce the warm cache
  cost by splitting low-churn files off the master .isolated file. It also
  reduces overall isolateserver memcache consumption.
  """
    slaves = []

    def extract_into_included_isolated(prefix):
        new_slave = {"algo": data["algo"], "files": {}, "version": data["version"]}
        for f in data["files"].keys():
            if f.startswith(prefix):
                new_slave["files"][f] = data["files"].pop(f)
        if new_slave["files"]:
            slaves.append(new_slave)

    # Split test/data/ in its own .isolated file.
    extract_into_included_isolated(os.path.join("test", "data", ""))

    # Split everything out of PRODUCT_DIR in its own .isolated file.
    if path_variables.get("PRODUCT_DIR"):
        extract_into_included_isolated(path_variables["PRODUCT_DIR"])

    files = []
    for index, f in enumerate(slaves):
        slavepath = isolated[: -len(".isolated")] + ".%d.isolated" % index
        tools.write_json(slavepath, f, True)
        data.setdefault("includes", []).append(isolated_format.hash_file(slavepath, algo))
        files.append(os.path.basename(slavepath))

    files.extend(isolated_format.save_isolated(isolated, data))
    return files
开发者ID:qlb7707,项目名称:webrtc_src,代码行数:33,代码来源:isolate.py


示例2: write_details

def write_details(logname, outfile, root_dir, blacklist, results):
  """Writes an .test_cases file with all the information about each test
  case.
  """
  api = trace_inputs.get_api()
  logs = dict(
      (i.pop('trace'), i) for i in api.parse_log(logname, blacklist, None))
  results_processed = {}
  exception = None
  for items in results:
    item = items[-1]
    assert item['valid']
    # Load the results;
    log_dict = logs[item['tracename']]
    if log_dict.get('exception'):
      exception = exception or log_dict['exception']
      continue
    trace_result = log_dict['results']
    if root_dir:
      trace_result = trace_result.strip_root(root_dir)
    results_processed[item['test_case']] = {
      'trace': trace_result.flatten(),
      'duration': item['duration'],
      'output': item['output'],
      'returncode': item['returncode'],
    }

  # Make it dense if there is more than 20 results.
  tools.write_json(
      outfile,
      results_processed,
      len(results_processed) > 20)
  if exception:
    raise exception[0], exception[1], exception[2]
开发者ID:WHS-TechOps,项目名称:Aviator,代码行数:34,代码来源:trace_test_cases.py


示例3: collect

def collect(
    url, task_name, shards, timeout, decorate,
    print_status_updates, task_summary_json, task_output_dir):
  """Retrieves results of a Swarming task."""
  # Grab task keys for each shard. Order is important, used to figure out
  # shard index based on the key.
  # TODO(vadimsh): Simplify this once server support is added.
  task_keys = []
  for index in xrange(shards):
    shard_task_name = get_shard_task_name(task_name, shards, index)
    logging.info('Collecting %s', shard_task_name)
    shard_task_keys = get_task_keys(url, shard_task_name)
    if not shard_task_keys:
      raise Failure('No task keys to get results with: %s' % shard_task_name)
    if len(shard_task_keys) != 1:
      raise Failure('Expecting only one shard for a task: %s' % shard_task_name)
    task_keys.append(shard_task_keys[0])

  # Collect summary JSON and output files (if task_output_dir is not None).
  output_collector = TaskOutputCollector(
      task_output_dir, task_name, len(task_keys))

  seen_shards = set()
  exit_codes = []

  try:
    for index, output in yield_results(
        url, task_keys, timeout, None, print_status_updates, output_collector):
      seen_shards.add(index)

      # Grab first non-zero exit code as an overall shard exit code.
      shard_exit_code = 0
      for code in map(int, (output['exit_codes'] or '1').split(',')):
        if code:
          shard_exit_code = code
          break
      exit_codes.append(shard_exit_code)

      if decorate:
        print decorate_shard_output(index, output, shard_exit_code)
      else:
        print(
            '%s/%s: %s' % (
                output['machine_id'],
                output['machine_tag'],
                output['exit_codes']))
        print(''.join('  %s\n' % l for l in output['output'].splitlines()))
  finally:
    summary = output_collector.finalize()
    if task_summary_json:
      tools.write_json(task_summary_json, summary, False)

  if len(seen_shards) != len(task_keys):
    missing_shards = [x for x in range(len(task_keys)) if x not in seen_shards]
    print >> sys.stderr, ('Results from some shards are missing: %s' %
        ', '.join(map(str, missing_shards)))
    return 1

  return int(bool(any(exit_codes)))
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:59,代码来源:swarming.py


示例4: collect

def collect(
    swarming, task_name, task_ids, timeout, decorate, print_status_updates,
    task_summary_json, task_output_dir):
  """Retrieves results of a Swarming task."""
  # Collect summary JSON and output files (if task_output_dir is not None).
  output_collector = TaskOutputCollector(
      task_output_dir, task_name, len(task_ids))

  seen_shards = set()
  exit_code = 0
  total_duration = 0
  try:
    for index, metadata in yield_results(
        swarming, task_ids, timeout, None, print_status_updates,
        output_collector):
      seen_shards.add(index)

      # Default to failure if there was no process that even started.
      shard_exit_code = 1
      if metadata.get('exit_codes'):
        shard_exit_code = metadata['exit_codes'][0]
      if shard_exit_code:
        exit_code = shard_exit_code
      if metadata.get('durations'):
        total_duration += metadata['durations'][0]

      if decorate:
        print(decorate_shard_output(swarming, index, metadata))
        if len(seen_shards) < len(task_ids):
          print('')
      else:
        if metadata.get('exit_codes'):
          exit_code = metadata['exit_codes'][0]
        else:
          exit_code = 'N/A'
        print('%s: %s %s' %
            (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code))
        for output in metadata['outputs']:
          if not output:
            continue
          output = output.rstrip()
          if output:
            print(''.join('  %s\n' % l for l in output.splitlines()))
  finally:
    summary = output_collector.finalize()
    if task_summary_json:
      tools.write_json(task_summary_json, summary, False)

  if decorate and total_duration:
    print('Total duration: %.1fs' % total_duration)

  if len(seen_shards) != len(task_ids):
    missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards]
    print >> sys.stderr, ('Results from some shards are missing: %s' %
        ', '.join(map(str, missing_shards)))
    return 1

  return exit_code
开发者ID:misscache,项目名称:luci-py,代码行数:58,代码来源:swarming.py


示例5: save_response

def save_response(ticker, url):
    """
    Request data from API and save response 
    """

    response = request.urlopen(url)
    response_data = response.read()
    write_json(f"poloniex/input/{ticker}.json", json.loads(response_data))
    print(f"Downloaded: {ticker}")
开发者ID:momor666,项目名称:Ella,代码行数:9,代码来源:market_data.py


示例6: get_all_tickers

def get_all_tickers():
    """
    Fetch tickers from Poloniex API and save to JSON file
    """

    response = urlopen('https://poloniex.com/public?command=returnTicker')
    response_data = response.read()
    tickers = [k for k, _ in json.loads(response_data).items() if k[:3] == 'BTC']
    tickers.sort()
    write_json('poloniex/data/tickers.json', tickers)
    return tickers
开发者ID:momor666,项目名称:Ella,代码行数:11,代码来源:tickers.py


示例7: finalize

 def finalize(self):
     """Writes summary.json, shutdowns underlying Storage."""
     with self._lock:
         # Write an array of shard results with None for missing shards.
         summary = {
             "task_name": self.task_name,
             "shards": [self._per_shard_results.get(i) for i in xrange(self.shard_count)],
         }
         tools.write_json(os.path.join(self.task_output_dir, "summary.json"), summary, False)
         if self._storage:
             self._storage.close()
             self._storage = None
开发者ID:zanxi,项目名称:bitpop,代码行数:12,代码来源:swarming.py


示例8: collect

def collect(swarming, task_name, task_ids, timeout, decorate, print_status_updates, task_summary_json, task_output_dir):
    """Retrieves results of a Swarming task.

  Returns:
    process exit code that should be returned to the user.
  """
    # Collect summary JSON and output files (if task_output_dir is not None).
    output_collector = TaskOutputCollector(task_output_dir, task_name, len(task_ids))

    seen_shards = set()
    exit_code = None
    total_duration = 0
    try:
        for index, metadata in yield_results(swarming, task_ids, timeout, None, print_status_updates, output_collector):
            seen_shards.add(index)

            # Default to failure if there was no process that even started.
            shard_exit_code = metadata.get("exit_code")
            if shard_exit_code:
                # It's encoded as a string, so bool('0') is True.
                shard_exit_code = int(shard_exit_code)
            if shard_exit_code or exit_code is None:
                exit_code = shard_exit_code
            total_duration += metadata.get("duration", 0)

            if decorate:
                print (decorate_shard_output(swarming, index, metadata))
                if len(seen_shards) < len(task_ids):
                    print ("")
            else:
                print ("%s: %s %s" % (metadata.get("bot_id", "N/A"), metadata["task_id"], shard_exit_code))
                if metadata["output"]:
                    output = metadata["output"].rstrip()
                    if output:
                        print ("".join("  %s\n" % l for l in output.splitlines()))
    finally:
        summary = output_collector.finalize()
        if task_summary_json:
            # TODO(maruel): Make this optional.
            for i in summary["shards"]:
                if i:
                    convert_to_old_format(i)
            tools.write_json(task_summary_json, summary, False)

    if decorate and total_duration:
        print ("Total duration: %.1fs" % total_duration)

    if len(seen_shards) != len(task_ids):
        missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards]
        print >> sys.stderr, ("Results from some shards are missing: %s" % ", ".join(map(str, missing_shards)))
        return 1

    return exit_code if exit_code is not None else 1
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:53,代码来源:swarming.py


示例9: finalize

 def finalize(self):
     """Assembles and returns task summary JSON, shutdowns underlying Storage."""
     with self._lock:
         # Write an array of shard results with None for missing shards.
         summary = {"shards": [self._per_shard_results.get(i) for i in xrange(self.shard_count)]}
         # Write summary.json to task_output_dir as well.
         if self.task_output_dir:
             tools.write_json(os.path.join(self.task_output_dir, "summary.json"), summary, False)
         if self._storage:
             self._storage.close()
             self._storage = None
         return summary
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:12,代码来源:swarming.py


示例10: CMDtrigger

def CMDtrigger(parser, args):
  """Triggers a Swarming task.

  Accepts either the hash (sha1) of a .isolated file already uploaded or the
  path to an .isolated file to archive, packages it if needed and sends a
  Swarming manifest file to the Swarming server.

  If an .isolated file is specified instead of an hash, it is first archived.

  Passes all extra arguments provided after '--' as additional command line
  arguments for an isolated command specified in *.isolate file.
  """
  add_trigger_options(parser)
  add_sharding_options(parser)
  args, isolated_cmd_args = extract_isolated_command_extra_args(args)
  parser.add_option(
      '--dump-json',
      metavar='FILE',
      help='Dump details about the triggered task(s) to this file as json')
  options, args = parser.parse_args(args)
  process_trigger_options(parser, options, args)

  auth.ensure_logged_in(options.swarming)
  if file_path.is_url(options.isolate_server):
    auth.ensure_logged_in(options.isolate_server)
  try:
    tasks, task_name = trigger(
        swarming=options.swarming,
        isolate_server=options.isolate_server or options.indir,
        namespace=options.namespace,
        file_hash_or_isolated=args[0],
        task_name=options.task_name,
        extra_args=isolated_cmd_args,
        shards=options.shards,
        dimensions=options.dimensions,
        env=dict(options.env),
        deadline=options.deadline,
        verbose=options.verbose,
        profile=options.profile,
        priority=options.priority)
    if tasks:
      if task_name != options.task_name:
        print('Triggered task: %s' % task_name)
      if options.dump_json:
        data = {
          'base_task_name': task_name,
          'tasks': tasks,
        }
        tools.write_json(options.dump_json, data, True)
    return int(not tasks)
  except Failure:
    on_error.report(None)
    return 1
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:53,代码来源:swarming.py


示例11: run_tha_test

def run_tha_test(isolated_hash, storage, cache, leak_temp_dir, result_json, root_dir, extra_args):
    """Downloads the dependencies in the cache, hardlinks them into a temporary
  directory and runs the executable from there.

  A temporary directory is created to hold the output files. The content inside
  this directory will be uploaded back to |storage| packaged as a .isolated
  file.

  Arguments:
    isolated_hash: the SHA-1 of the .isolated file that must be retrieved to
                   recreate the tree of files to run the target executable.
    storage: an isolateserver.Storage object to retrieve remote objects. This
             object has a reference to an isolateserver.StorageApi, which does
             the actual I/O.
    cache: an isolateserver.LocalCache to keep from retrieving the same objects
           constantly by caching the objects retrieved. Can be on-disk or
           in-memory.
    leak_temp_dir: if true, the temporary directory will be deliberately leaked
                   for later examination.
    result_json: file path to dump result metadata into. If set, the process
                 exit code is always 0 unless an internal error occured.
    root_dir: directory to the path to use to create the temporary directory. If
              not specified, a random temporary directory is created.
    extra_args: optional arguments to add to the command stated in the .isolate
                file.

  Returns:
    Process exit code that should be used.
  """
    # run_isolated exit code. Depends on if result_json is used or not.
    result = map_and_run(isolated_hash, storage, cache, leak_temp_dir, root_dir, extra_args)
    logging.info("Result:\n%s", tools.format_json(result, dense=True))
    if result_json:
        # We've found tests to delete 'work' when quitting, causing an exception
        # here. Try to recreate the directory if necessary.
        work_dir = os.path.dirname(result_json)
        if not os.path.isdir(work_dir):
            os.mkdir(work_dir)
        tools.write_json(result_json, result, dense=True)
        # Only return 1 if there was an internal error.
        return int(bool(result["internal_failure"]))

    # Marshall into old-style inline output.
    if result["outputs_ref"]:
        data = {
            "hash": result["outputs_ref"]["isolated"],
            "namespace": result["outputs_ref"]["namespace"],
            "storage": result["outputs_ref"]["isolatedserver"],
        }
        sys.stdout.flush()
        print("[run_isolated_out_hack]%s[/run_isolated_out_hack]" % tools.format_json(data, dense=True))
    return result["exit_code"] or int(bool(result["internal_failure"]))
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:52,代码来源:run_isolated.py


示例12: save_isolated

def save_isolated(isolated, data):
  """Writes one or multiple .isolated files.

  Note: this reference implementation does not create child .isolated file so it
  always returns an empty list.

  Returns the list of child isolated files that are included by |isolated|.
  """
  # Make sure the data is valid .isolated data by 'reloading' it.
  algo = SUPPORTED_ALGOS[data['algo']]
  load_isolated(json.dumps(data), algo)
  tools.write_json(isolated, data, True)
  return []
开发者ID:mellowdistrict,项目名称:luci-py,代码行数:13,代码来源:isolated_format.py


示例13: save_files

 def save_files(self):
     """Saves self.saved_state and creates a .isolated file."""
     logging.debug("Dumping to %s" % self.isolated_filepath)
     self.saved_state.child_isolated_files = chromium_save_isolated(
         self.isolated_filepath,
         self.saved_state.to_isolated(),
         self.saved_state.path_variables,
         self.saved_state.algo,
     )
     total_bytes = sum(i.get("s", 0) for i in self.saved_state.files.itervalues())
     if total_bytes:
         # TODO(maruel): Stats are missing the .isolated files.
         logging.debug("Total size: %d bytes" % total_bytes)
     saved_state_file = isolatedfile_to_state(self.isolated_filepath)
     logging.debug("Dumping to %s" % saved_state_file)
     tools.write_json(saved_state_file, self.saved_state.flatten(), True)
开发者ID:qlb7707,项目名称:webrtc_src,代码行数:16,代码来源:isolate.py


示例14: CMDtrigger

def CMDtrigger(parser, args):
  """Triggers a Swarming task.

  Accepts either the hash (sha1) of a .isolated file already uploaded or the
  path to an .isolated file to archive.

  If an .isolated file is specified instead of an hash, it is first archived.

  Passes all extra arguments provided after '--' as additional command line
  arguments for an isolated command specified in *.isolate file.
  """
  add_trigger_options(parser)
  add_sharding_options(parser)
  parser.add_option(
      '--dump-json',
      metavar='FILE',
      help='Dump details about the triggered task(s) to this file as json')
  options, args = parser.parse_args(args)
  task_request = process_trigger_options(parser, options, args)
  try:
    tasks = trigger_task_shards(
        options.swarming, task_request, options.shards)
    if tasks:
      print('Triggered task: %s' % options.task_name)
      tasks_sorted = sorted(
          tasks.itervalues(), key=lambda x: x['shard_index'])
      if options.dump_json:
        data = {
          'base_task_name': options.task_name,
          'tasks': tasks,
          'request': task_request_to_raw_request(task_request),
        }
        tools.write_json(unicode(options.dump_json), data, True)
        print('To collect results, use:')
        print('  swarming.py collect -S %s --json %s' %
            (options.swarming, options.dump_json))
      else:
        print('To collect results, use:')
        print('  swarming.py collect -S %s %s' %
            (options.swarming, ' '.join(t['task_id'] for t in tasks_sorted)))
      print('Or visit:')
      for t in tasks_sorted:
        print('  ' + t['view_url'])
    return int(not tasks)
  except Failure:
    on_error.report(None)
    return 1
开发者ID:mellowdistrict,项目名称:luci-py,代码行数:47,代码来源:swarming.py


示例15: get_tickers

def get_tickers():
    """
    Fetch tickers from Poloniex API and save to JSON file
    """

    tickers = [
        'BTC_BELA',
        'BTC_DASH',
        'BTC_DOGE',
        'BTC_ETH',
        'BTC_LBC',
        'BTC_MAID',
        'BTC_XEM',
        'BTC_XMR',
    ]
    tickers.sort()
    write_json('poloniex/data/tickers.json', tickers)
    return tickers
开发者ID:momor666,项目名称:Ella,代码行数:18,代码来源:tickers.py


示例16: archive_isolated_triggers

def archive_isolated_triggers(isolate_server, tree_isolated, tests):
  """Creates and archives all the .isolated files for the tests at once.

  Archiving them in one batch is faster than archiving each file individually.
  Also the .isolated files can be reused across OSes, reducing the amount of
  I/O.

  Returns:
    list of (test, sha1) tuples.
  """
  logging.info('archive_isolated_triggers(%s, %s)', tree_isolated, tests)
  tempdir = tempfile.mkdtemp(prefix=u'run_swarming_tests_on_swarming_')
  try:
    isolateds = []
    for test in tests:
      test_name = os.path.basename(test)
      # Creates a manual .isolated file. See
      # https://code.google.com/p/swarming/wiki/IsolatedDesign for more details.
      isolated = {
        'algo': 'sha-1',
        'command': ['python', test],
        'includes': [tree_isolated],
        'read_only': 0,
        'version': '1.4',
      }
      v = os.path.join(tempdir, test_name + '.isolated')
      tools.write_json(v, isolated, True)
      isolateds.append(v)
    cmd = [
        'isolateserver.py', 'archive', '--isolate-server', isolate_server,
    ] + isolateds
    if logging.getLogger().isEnabledFor(logging.INFO):
      cmd.append('--verbose')
    items = [i.split() for i in check_output(cmd).splitlines()]
    assert len(items) == len(tests)
    assert all(
        items[i][1].endswith(os.path.basename(tests[i]) + '.isolated')
        for i in xrange(len(tests)))
    return zip(tests, [i[0] for i in items])
  finally:
    shutil.rmtree(tempdir)
开发者ID:nodirt,项目名称:luci-py,代码行数:41,代码来源:run_swarming_tests_on_swarming.py


示例17: CMDtrigger

def CMDtrigger(parser, args):
    """Triggers a Swarming task.

  Accepts either the hash (sha1) of a .isolated file already uploaded or the
  path to an .isolated file to archive.

  If an .isolated file is specified instead of an hash, it is first archived.

  Passes all extra arguments provided after '--' as additional command line
  arguments for an isolated command specified in *.isolate file.
  """
    add_trigger_options(parser)
    add_sharding_options(parser)
    parser.add_option(
        "--dump-json", metavar="FILE", help="Dump details about the triggered task(s) to this file as json"
    )
    options, args = parser.parse_args(args)
    task_request = process_trigger_options(parser, options, args)
    try:
        tasks = trigger_task_shards(options.swarming, task_request, options.shards)
        if tasks:
            print ("Triggered task: %s" % options.task_name)
            tasks_sorted = sorted(tasks.itervalues(), key=lambda x: x["shard_index"])
            if options.dump_json:
                data = {"base_task_name": options.task_name, "tasks": tasks}
                tools.write_json(options.dump_json, data, True)
                print ("To collect results, use:")
                print ("  swarming.py collect -S %s --json %s" % (options.swarming, options.dump_json))
            else:
                print ("To collect results, use:")
                print (
                    "  swarming.py collect -S %s %s" % (options.swarming, " ".join(t["task_id"] for t in tasks_sorted))
                )
            print ("Or visit:")
            for t in tasks_sorted:
                print ("  " + t["view_url"])
        return int(not tasks)
    except Failure:
        on_error.report(None)
        return 1
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:40,代码来源:swarming.py


示例18: chromium_save_isolated

def chromium_save_isolated(isolated, data, path_variables, algo):
  """Writes one or many .isolated files.

  This slightly increases the cold cache cost but greatly reduce the warm cache
  cost by splitting low-churn files off the master .isolated file. It also
  reduces overall isolateserver memcache consumption.
  """
  slaves = []

  def extract_into_included_isolated(prefix):
    new_slave = {
      'algo': data['algo'],
      'files': {},
      'version': data['version'],
    }
    for f in data['files'].keys():
      if f.startswith(prefix):
        new_slave['files'][f] = data['files'].pop(f)
    if new_slave['files']:
      slaves.append(new_slave)

  # Split test/data/ in its own .isolated file.
  extract_into_included_isolated(os.path.join('test', 'data', ''))

  # Split everything out of PRODUCT_DIR in its own .isolated file.
  if path_variables.get('PRODUCT_DIR'):
    extract_into_included_isolated(path_variables['PRODUCT_DIR'])

  files = []
  for index, f in enumerate(slaves):
    slavepath = isolated[:-len('.isolated')] + '.%d.isolated' % index
    tools.write_json(slavepath, f, True)
    data.setdefault('includes', []).append(
        isolated_format.hash_file(slavepath, algo))
    files.append(os.path.basename(slavepath))

  files.extend(isolated_format.save_isolated(isolated, data))
  return files
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:38,代码来源:isolate.py


示例19: test_load_stale_isolated

  def test_load_stale_isolated(self):
    isolate_file = os.path.join(
        ROOT_DIR, 'tests', 'isolate', 'touch_root.isolate')

    # Data to be loaded in the .isolated file. Do not create a .state file.
    input_data = {
      'command': ['python'],
      'files': {
        'foo': {
          "m": 416,
          "h": "invalid",
          "s": 538,
          "t": 1335146921,
        },
        os.path.join('tests', 'isolate', 'touch_root.py'): {
          "m": 488,
          "h": "invalid",
          "s": 538,
          "t": 1335146921,
        },
      },
    }
    options = self._get_option(isolate_file)
    tools.write_json(options.isolated, input_data, False)

    # A CompleteState object contains two parts:
    # - Result instance stored in complete_state.isolated, corresponding to the
    #   .isolated file, is what is read by run_test_from_archive.py.
    # - SavedState instance stored in compelte_state.saved_state,
    #   corresponding to the .state file, which is simply to aid the developer
    #   when re-running the same command multiple times and contain
    #   discardable information.
    complete_state = isolate.load_complete_state(options, self.cwd, None, False)
    actual_isolated = complete_state.saved_state.to_isolated()
    actual_saved_state = complete_state.saved_state.flatten()

    expected_isolated = {
      'algo': 'sha-1',
      'command': ['python', 'touch_root.py'],
      'files': {
        os.path.join(u'tests', 'isolate', 'touch_root.py'): {
          'm': 488,
          'h': hash_file('tests', 'isolate', 'touch_root.py'),
          's': _size('tests', 'isolate', 'touch_root.py'),
        },
        u'isolate.py': {
          'm': 488,
          'h': hash_file('isolate.py'),
          's': _size('isolate.py'),
        },
      },
      'relative_cwd': os.path.join(u'tests', 'isolate'),
      'version': isolate.isolateserver.ISOLATED_FILE_VERSION,
    }
    self._cleanup_isolated(expected_isolated)
    self.assertEqual(expected_isolated, actual_isolated)

    expected_saved_state = {
      'OS': sys.platform,
      'algo': 'sha-1',
      'child_isolated_files': [],
      'command': ['python', 'touch_root.py'],
      'config_variables': {
        'OS': 'linux',
        'chromeos': options.config_variables['chromeos'],
      },
      'extra_variables': {
        'foo': 'bar',
      },
      'files': {
        os.path.join(u'tests', 'isolate', 'touch_root.py'): {
          'm': 488,
          'h': hash_file('tests', 'isolate', 'touch_root.py'),
          's': _size('tests', 'isolate', 'touch_root.py'),
        },
        u'isolate.py': {
          'm': 488,
          'h': hash_file('isolate.py'),
          's': _size('isolate.py'),
        },
      },
      'isolate_file': file_path.safe_relpath(
          file_path.get_native_path_case(isolate_file),
          os.path.dirname(options.isolated)),
      'path_variables': {},
      'relative_cwd': os.path.join(u'tests', 'isolate'),
      'root_dir': file_path.get_native_path_case(ROOT_DIR),
      'version': isolate.SavedState.EXPECTED_VERSION,
    }
    self._cleanup_isolated(expected_saved_state)
    self._cleanup_saved_state(actual_saved_state)
    self.assertEqual(expected_saved_state, actual_saved_state)
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:92,代码来源:isolate_test.py


示例20: CMDquery

def CMDquery(parser, args):
    """Returns raw JSON information via an URL endpoint. Use 'query-list' to
  gather the list of API methods from the server.

  Examples:
    Listing all bots:
      swarming.py query -S https://server-url bots/list

    Listing last 10 tasks on a specific bot named 'swarm1':
      swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
  """
    CHUNK_SIZE = 250

    parser.add_option(
        "-L",
        "--limit",
        type="int",
        default=200,
        help="Limit to enforce on limitless items (like number of tasks); " "default=%default",
    )
    parser.add_option("--json", help="Path to JSON output file (otherwise prints to stdout)")
    parser.add_option("--progress", action="store_true", help="Prints a dot at each request to show progress")
    options, args = parser.parse_args(args)
    if len(args) != 1:
        parser.error("Must specify only method name and optionally query args properly " "escaped.")
    base_url = options.swarming + "/_ah/api/swarming/v1/" + args[0]
    url = base_url
    if options.limit:
        # Check check, change if not working out.
        merge_char = "&" if "?" in url else "?"
        url += "%slimit=%d" % (merge_char, min(CHUNK_SIZE, options.limit))
    data = net.url_read_json(url)
    if data is None:
        # TODO(maruel): Do basic diagnostic.
        print >> sys.stderr, "Failed to access %s" % url
        return 1

    # Some items support cursors. Try to get automatically if cursors are needed
    # by looking at the 'cursor' items.
    while data.get("cursor") and (not options.limit or len(data["items"]) < options.limit):
        merge_char = "&" if "?" in base_url else "?"
        url = base_url + "%scursor=%s" % (merge_char, urllib.quote(data["cursor"]))
        if options.limit:
            url += "&limit=%d" % min(CHUNK_SIZE, options.limit - len(data["items"]))
        if options.progress:
            sys.stdout.write(".")
            sys.stdout.flush()
        new = net.url_read_json(url)
        if new is None:
            if options.progress:
                print ("")
            print >> sys.stderr, "Failed to access %s" % options.swarming
            return 1
        data["items"].extend(new["items"])
        data["cursor"] = new.get("cursor")

    if options.progress:
        print ("")
    if options.limit and len(data.get("items", [])) > options.limit:
        data["items"] = data["items"][: options.limit]
    data.pop("cursor", None)

    if options.json:
        tools.write_json(options.json, data, True)
    else:
        try:
            tools.write_json(sys.stdout, data, False)
            sys.stdout.write("\n")
        except IOError:
            pass
    return 0
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:71,代码来源:swarming.py



注:本文中的utils.tools.write_json函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python update.update函数代码示例发布时间:2022-05-26
下一篇:
Python timer.Timer类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap