本文整理汇总了Python中utils.tools.write_json函数的典型用法代码示例。如果您正苦于以下问题:Python write_json函数的具体用法?Python write_json怎么用?Python write_json使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了write_json函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: chromium_save_isolated
def chromium_save_isolated(isolated, data, path_variables, algo):
    """Writes one or many .isolated files.

    This slightly increases the cold cache cost but greatly reduces the warm
    cache cost by splitting low-churn files off the master .isolated file. It
    also reduces overall isolateserver memcache consumption.

    Arguments:
      isolated: path of the master .isolated file. Child files are written
          next to it as '<path without .isolated>.<index>.isolated'.
      data: dict with at least "algo", "files" and "version" keys. It is
          mutated: extracted entries are removed from data["files"] and the
          hash of each child file is appended to data["includes"].
      path_variables: dict; when it has a non-empty "PRODUCT_DIR" entry, the
          files starting with that prefix get their own child file.
      algo: forwarded to isolated_format.hash_file().

    Returns:
      The base names of the child files written, followed by whatever
      isolated_format.save_isolated() returns for the master file.
    """
    slaves = []

    def extract_into_included_isolated(prefix):
        """Moves every file starting with |prefix| into a new child dict."""
        new_slave = {
            "algo": data["algo"],
            "files": {},
            "version": data["version"],
        }
        # Iterate over a snapshot of the keys: entries are popped from
        # data["files"] inside the loop, which raises RuntimeError on
        # Python 3 when iterating the live keys view.
        for f in list(data["files"]):
            if f.startswith(prefix):
                new_slave["files"][f] = data["files"].pop(f)
        if new_slave["files"]:
            slaves.append(new_slave)

    # Split test/data/ in its own .isolated file.
    extract_into_included_isolated(os.path.join("test", "data", ""))

    # Split everything out of PRODUCT_DIR in its own .isolated file.
    if path_variables.get("PRODUCT_DIR"):
        extract_into_included_isolated(path_variables["PRODUCT_DIR"])

    files = []
    for index, f in enumerate(slaves):
        # NOTE(review): assumes |isolated| ends with ".isolated" -- confirm
        # against callers.
        slavepath = isolated[: -len(".isolated")] + ".%d.isolated" % index
        tools.write_json(slavepath, f, True)
        data.setdefault("includes", []).append(
            isolated_format.hash_file(slavepath, algo))
        files.append(os.path.basename(slavepath))

    files.extend(isolated_format.save_isolated(isolated, data))
    return files
开发者ID:qlb7707,项目名称:webrtc_src,代码行数:33,代码来源:isolate.py
示例2: write_details
def write_details(logname, outfile, root_dir, blacklist, results):
    """Writes an .test_cases file with all the information about each test
    case.

    Arguments:
      logname: trace log handed to the trace API's parse_log().
      outfile: path of the JSON file to write.
      root_dir: when truthy, each trace result is passed through
          strip_root(root_dir) before being flattened.
      blacklist: forwarded to parse_log().
      results: iterable of sequences; only the last item of each sequence is
          used and it must have a truthy 'valid' entry.

    Raises:
      The first exception recorded in the parsed logs, re-raised with its
      original traceback after the output file has been written.
    """
    api = trace_inputs.get_api()
    logs = dict(
        (i.pop('trace'), i) for i in api.parse_log(logname, blacklist, None))
    results_processed = {}
    exception = None
    for items in results:
        item = items[-1]
        assert item['valid']
        # Load the results;
        log_dict = logs[item['tracename']]
        if log_dict.get('exception'):
            # Remember only the first failure; keep processing the others so
            # the output file is as complete as possible.
            exception = exception or log_dict['exception']
            continue
        trace_result = log_dict['results']
        if root_dir:
            trace_result = trace_result.strip_root(root_dir)
        results_processed[item['test_case']] = {
            'trace': trace_result.flatten(),
            'duration': item['duration'],
            'output': item['output'],
            'returncode': item['returncode'],
        }

    # Make it dense if there is more than 20 results.
    tools.write_json(
        outfile,
        results_processed,
        len(results_processed) > 20)

    if exception:
        # presumably a sys.exc_info()-style (type, value, traceback) triple;
        # verify against parse_log(). The three-argument raise statement is
        # Python 2 only syntax, so re-raise through with_traceback().
        exc_type, exc_value, exc_tb = exception[0], exception[1], exception[2]
        if not isinstance(exc_value, BaseException):
            exc_value = exc_type() if exc_value is None else exc_type(exc_value)
        raise exc_value.with_traceback(exc_tb)
开发者ID:WHS-TechOps,项目名称:Aviator,代码行数:34,代码来源:trace_test_cases.py
示例3: collect
def collect(
        url, task_name, shards, timeout, decorate,
        print_status_updates, task_summary_json, task_output_dir):
    """Retrieves results of a Swarming task.

    Arguments:
      url: forwarded to get_task_keys() and yield_results().
      task_name: base task name; one shard task name is derived per shard.
      shards: number of shards to collect.
      timeout: forwarded to yield_results().
      decorate: when truthy, print each shard through decorate_shard_output();
          otherwise print a short header followed by the indented output.
      print_status_updates: forwarded to yield_results().
      task_summary_json: when truthy, path where the summary is written.
      task_output_dir: forwarded to TaskOutputCollector.

    Returns:
      1 when results from some shards are missing, otherwise 1 if any shard
      reported a non-zero exit code and 0 if none did.

    Raises:
      Failure: when a shard has no task key or more than one.
    """
    # Grab task keys for each shard. Order is important, used to figure out
    # shard index based on the key.
    # TODO(vadimsh): Simplify this once server support is added.
    task_keys = []
    for index in range(shards):
        shard_task_name = get_shard_task_name(task_name, shards, index)
        logging.info('Collecting %s', shard_task_name)
        shard_task_keys = get_task_keys(url, shard_task_name)
        if not shard_task_keys:
            raise Failure(
                'No task keys to get results with: %s' % shard_task_name)
        if len(shard_task_keys) != 1:
            raise Failure(
                'Expecting only one shard for a task: %s' % shard_task_name)
        task_keys.append(shard_task_keys[0])

    # Collect summary JSON and output files (if task_output_dir is not None).
    output_collector = TaskOutputCollector(
        task_output_dir, task_name, len(task_keys))

    seen_shards = set()
    exit_codes = []

    try:
        for index, output in yield_results(
                url, task_keys, timeout, None, print_status_updates,
                output_collector):
            seen_shards.add(index)

            # Grab first non-zero exit code as an overall shard exit code.
            # An empty 'exit_codes' value is treated as the failure code 1.
            shard_exit_code = 0
            for code in map(int, (output['exit_codes'] or '1').split(',')):
                if code:
                    shard_exit_code = code
                    break
            exit_codes.append(shard_exit_code)

            if decorate:
                print(decorate_shard_output(index, output, shard_exit_code))
            else:
                print(
                    '%s/%s: %s' % (
                        output['machine_id'],
                        output['machine_tag'],
                        output['exit_codes']))
                print(''.join(
                    ' %s\n' % l for l in output['output'].splitlines()))
    finally:
        # Always finalize the collector, even when yield_results() raises.
        summary = output_collector.finalize()
        if task_summary_json:
            tools.write_json(task_summary_json, summary, False)

    if len(seen_shards) != len(task_keys):
        missing_shards = [
            x for x in range(len(task_keys)) if x not in seen_shards]
        sys.stderr.write(
            'Results from some shards are missing: %s\n' %
            ', '.join(map(str, missing_shards)))
        return 1

    return int(bool(any(exit_codes)))
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:59,代码来源:swarming.py
示例4: collect
def collect(
        swarming, task_name, task_ids, timeout, decorate, print_status_updates,
        task_summary_json, task_output_dir):
    """Retrieves results of a Swarming task.

    Arguments:
      swarming: forwarded to yield_results() and decorate_shard_output().
      task_name: forwarded to TaskOutputCollector.
      task_ids: ids of the shards to collect, one per shard.
      timeout: forwarded to yield_results().
      decorate: when truthy, print each shard through decorate_shard_output()
          and print the total duration at the end; otherwise print a short
          header followed by the indented outputs.
      print_status_updates: forwarded to yield_results().
      task_summary_json: when truthy, path where the summary is written.
      task_output_dir: forwarded to TaskOutputCollector.

    Returns:
      1 when results from some shards are missing; otherwise the exit code of
      the last shard that failed, or 0 when every shard succeeded. A shard
      without any exit code counts as a failure with code 1.
    """
    # Collect summary JSON and output files (if task_output_dir is not None).
    output_collector = TaskOutputCollector(
        task_output_dir, task_name, len(task_ids))

    seen_shards = set()
    exit_code = 0
    total_duration = 0
    try:
        for index, metadata in yield_results(
                swarming, task_ids, timeout, None, print_status_updates,
                output_collector):
            seen_shards.add(index)

            # Default to failure if there was no process that even started.
            shard_exit_code = 1
            if metadata.get('exit_codes'):
                shard_exit_code = metadata['exit_codes'][0]
            if shard_exit_code:
                exit_code = shard_exit_code
            if metadata.get('durations'):
                total_duration += metadata['durations'][0]

            if decorate:
                print(decorate_shard_output(swarming, index, metadata))
                if len(seen_shards) < len(task_ids):
                    print('')
            else:
                # Use a dedicated name for the value being displayed: reusing
                # |exit_code| here would overwrite the overall result, making
                # the function return 'N/A' or forget an earlier failure.
                if metadata.get('exit_codes'):
                    displayed_exit_code = metadata['exit_codes'][0]
                else:
                    displayed_exit_code = 'N/A'
                print('%s: %s %s' %
                      (metadata.get('bot_id') or 'N/A', metadata['id'],
                       displayed_exit_code))
                for output in metadata['outputs']:
                    if not output:
                        continue
                    output = output.rstrip()
                    if output:
                        print(''.join(
                            ' %s\n' % l for l in output.splitlines()))
    finally:
        # Always finalize the collector, even when yield_results() raises.
        summary = output_collector.finalize()
        if task_summary_json:
            tools.write_json(task_summary_json, summary, False)

    if decorate and total_duration:
        print('Total duration: %.1fs' % total_duration)

    if len(seen_shards) != len(task_ids):
        missing_shards = [
            x for x in range(len(task_ids)) if x not in seen_shards]
        sys.stderr.write(
            'Results from some shards are missing: %s\n' %
            ', '.join(map(str, missing_shards)))
        return 1

    return exit_code
开发者ID:misscache,项目名称:luci-py,代码行数:58,代码来源:swarming.py
示例5: save_response
def save_response(ticker, url):
    """
    Request data from API and save response

    The body fetched from `url` is parsed as JSON and written to
    poloniex/input/<ticker>.json.
    """
    # The context manager closes the HTTP response (and its socket) even if
    # read() raises, instead of leaving it to the garbage collector.
    with request.urlopen(url) as response:
        response_data = response.read()
    write_json(f"poloniex/input/{ticker}.json", json.loads(response_data))
    print(f"Downloaded: {ticker}")
开发者ID:momor666,项目名称:Ella,代码行数:9,代码来源:market_data.py
示例6: get_all_tickers
def get_all_tickers():
    """
    Fetch tickers from Poloniex API and save to JSON file

    Only the tickers whose name starts with 'BTC' are kept. The sorted list is
    written to poloniex/data/tickers.json and returned.
    """
    # The context manager closes the HTTP response (and its socket) even if
    # read() raises, instead of leaving it to the garbage collector.
    with urlopen('https://poloniex.com/public?command=returnTicker') as response:
        response_data = response.read()
    # Only the keys of the decoded mapping are needed, so iterate it directly.
    tickers = sorted(
        k for k in json.loads(response_data) if k.startswith('BTC'))
    write_json('poloniex/data/tickers.json', tickers)
    return tickers
开发者ID:momor666,项目名称:Ella,代码行数:11,代码来源:tickers.py
示例7: finalize
def finalize(self):
    """Writes summary.json, shutdowns underlying Storage.

    The summary holds the task name and one entry per shard index, with None
    for shards that have no recorded result.
    """
    with self._lock:
        # Write an array of shard results with None for missing shards.
        # range() replaces the Python 2 only xrange().
        summary = {
            "task_name": self.task_name,
            "shards": [
                self._per_shard_results.get(i)
                for i in range(self.shard_count)
            ],
        }
        tools.write_json(
            os.path.join(self.task_output_dir, "summary.json"),
            summary,
            False)
        if self._storage:
            self._storage.close()
            # Drop the reference so a second call does not close it again.
            self._storage = None
开发者ID:zanxi,项目名称:bitpop,代码行数:12,代码来源:swarming.py
示例8: collect
def collect(swarming, task_name, task_ids, timeout, decorate,
            print_status_updates, task_summary_json, task_output_dir):
    """Retrieves results of a Swarming task.

    Arguments:
      swarming: forwarded to yield_results() and decorate_shard_output().
      task_name: forwarded to TaskOutputCollector.
      task_ids: ids of the shards to collect, one per shard.
      timeout: forwarded to yield_results().
      decorate: when truthy, print each shard through decorate_shard_output()
          and print the total duration at the end; otherwise print a short
          header followed by the indented output.
      print_status_updates: forwarded to yield_results().
      task_summary_json: when truthy, path where the summary is written after
          each non-empty shard entry went through convert_to_old_format().
      task_output_dir: forwarded to TaskOutputCollector.

    Returns:
      process exit code that should be returned to the user.
    """
    # Collect summary JSON and output files (if task_output_dir is not None).
    output_collector = TaskOutputCollector(
        task_output_dir, task_name, len(task_ids))

    seen_shards = set()
    exit_code = None
    total_duration = 0
    try:
        for index, metadata in yield_results(
                swarming, task_ids, timeout, None, print_status_updates,
                output_collector):
            seen_shards.add(index)

            # Default to failure if there was no process that even started.
            shard_exit_code = metadata.get("exit_code")
            if shard_exit_code:
                # It's encoded as a string, so bool('0') is True.
                shard_exit_code = int(shard_exit_code)
            # Keep the first value seen, then only overwrite it with a
            # non-zero one.
            if shard_exit_code or exit_code is None:
                exit_code = shard_exit_code
            total_duration += metadata.get("duration", 0)

            if decorate:
                print(decorate_shard_output(swarming, index, metadata))
                if len(seen_shards) < len(task_ids):
                    print("")
            else:
                print("%s: %s %s" % (
                    metadata.get("bot_id", "N/A"),
                    metadata["task_id"],
                    shard_exit_code))
                if metadata["output"]:
                    output = metadata["output"].rstrip()
                    if output:
                        print("".join(
                            " %s\n" % l for l in output.splitlines()))
    finally:
        # Always finalize the collector, even when yield_results() raises.
        summary = output_collector.finalize()
        if task_summary_json:
            # TODO(maruel): Make this optional.
            for i in summary["shards"]:
                if i:
                    convert_to_old_format(i)
            tools.write_json(task_summary_json, summary, False)

    if decorate and total_duration:
        print("Total duration: %.1fs" % total_duration)

    if len(seen_shards) != len(task_ids):
        missing_shards = [
            x for x in range(len(task_ids)) if x not in seen_shards]
        sys.stderr.write(
            "Results from some shards are missing: %s\n" %
            ", ".join(map(str, missing_shards)))
        return 1

    return exit_code if exit_code is not None else 1
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:53,代码来源:swarming.py
示例9: finalize
def finalize(self):
    """Assembles and returns task summary JSON, shutdowns underlying Storage.

    Returns:
      dict with a single "shards" key: one entry per shard index, None for
      the shards that have no recorded result.
    """
    with self._lock:
        # Write an array of shard results with None for missing shards.
        # range() replaces the Python 2 only xrange().
        summary = {
            "shards": [
                self._per_shard_results.get(i)
                for i in range(self.shard_count)
            ],
        }
        # Write summary.json to task_output_dir as well.
        if self.task_output_dir:
            tools.write_json(
                os.path.join(self.task_output_dir, "summary.json"),
                summary,
                False)
        if self._storage:
            self._storage.close()
            # Drop the reference so a second call does not close it again.
            self._storage = None
        return summary
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:12,代码来源:swarming.py
示例10: CMDtrigger
def CMDtrigger(parser, args):
    """Triggers a Swarming task.

    Accepts either the hash (sha1) of a .isolated file already uploaded or the
    path to an .isolated file to archive, packages it if needed and sends a
    Swarming manifest file to the Swarming server.

    If an .isolated file is specified instead of an hash, it is first archived.

    Passes all extra arguments provided after '--' as additional command line
    arguments for an isolated command specified in *.isolate file.

    Returns 0 when at least one task was triggered, 1 when none was or when
    a Failure was raised.
    """
    add_trigger_options(parser)
    add_sharding_options(parser)
    args, isolated_cmd_args = extract_isolated_command_extra_args(args)
    parser.add_option(
        '--dump-json',
        metavar='FILE',
        help='Dump details about the triggered task(s) to this file as json')
    options, args = parser.parse_args(args)
    process_trigger_options(parser, options, args)

    auth.ensure_logged_in(options.swarming)
    if file_path.is_url(options.isolate_server):
        auth.ensure_logged_in(options.isolate_server)

    try:
        trigger_kwargs = {
            'swarming': options.swarming,
            'isolate_server': options.isolate_server or options.indir,
            'namespace': options.namespace,
            'file_hash_or_isolated': args[0],
            'task_name': options.task_name,
            'extra_args': isolated_cmd_args,
            'shards': options.shards,
            'dimensions': options.dimensions,
            'env': dict(options.env),
            'deadline': options.deadline,
            'verbose': options.verbose,
            'profile': options.profile,
            'priority': options.priority,
        }
        tasks, task_name = trigger(**trigger_kwargs)
        if not tasks:
            # Nothing was triggered: report failure to the caller.
            return 1
        # Only mention the name when it differs from the requested one.
        if task_name != options.task_name:
            print('Triggered task: %s' % task_name)
        if options.dump_json:
            dump = {
                'base_task_name': task_name,
                'tasks': tasks,
            }
            tools.write_json(options.dump_json, dump, True)
        return 0
    except Failure:
        on_error.report(None)
        return 1
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:53,代码来源:swarming.py
示例11: run_tha_test
def run_tha_test(isolated_hash, storage, cache, leak_temp_dir, result_json,
                 root_dir, extra_args):
    """Downloads the dependencies in the cache, hardlinks them into a temporary
    directory and runs the executable from there.

    A temporary directory is created to hold the output files. The content
    inside this directory will be uploaded back to |storage| packaged as a
    .isolated file.

    Arguments:
      isolated_hash: the SHA-1 of the .isolated file that must be retrieved to
                     recreate the tree of files to run the target executable.
      storage: an isolateserver.Storage object to retrieve remote objects. This
               object has a reference to an isolateserver.StorageApi, which
               does the actual I/O.
      cache: an isolateserver.LocalCache to keep from retrieving the same
             objects constantly by caching the objects retrieved. Can be
             on-disk or in-memory.
      leak_temp_dir: if true, the temporary directory will be deliberately
                     leaked for later examination.
      result_json: file path to dump result metadata into. If set, the process
                   exit code is always 0 unless an internal error occurred.
      root_dir: directory to the path to use to create the temporary directory.
                If not specified, a random temporary directory is created.
      extra_args: optional arguments to add to the command stated in the
                  .isolate file.

    Returns:
      Process exit code that should be used.
    """
    # run_isolated exit code. Depends on if result_json is used or not.
    result = map_and_run(
        isolated_hash, storage, cache, leak_temp_dir, root_dir, extra_args)
    logging.info("Result:\n%s", tools.format_json(result, dense=True))
    if result_json:
        # We've found tests to delete 'work' when quitting, causing an
        # exception here. Try to recreate the directory if necessary.
        work_dir = os.path.dirname(result_json)
        # dirname() is '' when result_json is a bare file name; there is
        # nothing to recreate then and os.mkdir('') would raise.
        if work_dir and not os.path.isdir(work_dir):
            os.mkdir(work_dir)
        tools.write_json(result_json, result, dense=True)
        # Only return 1 if there was an internal error.
        return int(bool(result["internal_failure"]))

    # Marshall into old-style inline output.
    if result["outputs_ref"]:
        data = {
            "hash": result["outputs_ref"]["isolated"],
            "namespace": result["outputs_ref"]["namespace"],
            "storage": result["outputs_ref"]["isolatedserver"],
        }
        # Flush whatever is pending so the marker line is printed after it.
        sys.stdout.flush()
        print(
            "[run_isolated_out_hack]%s[/run_isolated_out_hack]" %
            tools.format_json(data, dense=True))
    return result["exit_code"] or int(bool(result["internal_failure"]))
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:52,代码来源:run_isolated.py
示例12: save_isolated
def save_isolated(isolated, data):
    """Writes one or multiple .isolated files.

    Note: this reference implementation does not create child .isolated file
    so it always returns an empty list.

    Returns the list of child isolated files that are included by |isolated|.
    """
    # Round-trip the data through the loader before touching the disk, so
    # that invalid .isolated data is caught by load_isolated() first.
    hash_algo = SUPPORTED_ALGOS[data['algo']]
    serialized = json.dumps(data)
    load_isolated(serialized, hash_algo)

    tools.write_json(isolated, data, True)
    child_files = []
    return child_files
开发者ID:mellowdistrict,项目名称:luci-py,代码行数:13,代码来源:isolated_format.py
示例13: save_files
def save_files(self):
    """Saves self.saved_state and creates a .isolated file.

    The child files reported by chromium_save_isolated() are recorded in
    self.saved_state.child_isolated_files before the state itself is dumped.
    """
    # Pass the values as logging arguments so they are only formatted when
    # the debug level is enabled.
    logging.debug("Dumping to %s", self.isolated_filepath)
    self.saved_state.child_isolated_files = chromium_save_isolated(
        self.isolated_filepath,
        self.saved_state.to_isolated(),
        self.saved_state.path_variables,
        self.saved_state.algo,
    )
    # values() replaces the Python 2 only itervalues(). Entries without an
    # "s" key count as 0 bytes.
    total_bytes = sum(i.get("s", 0) for i in self.saved_state.files.values())
    if total_bytes:
        # TODO(maruel): Stats are missing the .isolated files.
        logging.debug("Total size: %d bytes", total_bytes)
    saved_state_file = isolatedfile_to_state(self.isolated_filepath)
    logging.debug("Dumping to %s", saved_state_file)
    tools.write_json(saved_state_file, self.saved_state.flatten(), True)
开发者ID:qlb7707,项目名称:webrtc_src,代码行数:16,代码来源:isolate.py
示例14: CMDtrigger
def CMDtrigger(parser, args):
    """Triggers a Swarming task.

    Accepts either the hash (sha1) of a .isolated file already uploaded or the
    path to an .isolated file to archive.

    If an .isolated file is specified instead of an hash, it is first archived.

    Passes all extra arguments provided after '--' as additional command line
    arguments for an isolated command specified in *.isolate file.

    Returns 0 when at least one task was triggered, 1 when none was or when
    a Failure was raised.
    """
    add_trigger_options(parser)
    add_sharding_options(parser)
    parser.add_option(
        '--dump-json',
        metavar='FILE',
        help='Dump details about the triggered task(s) to this file as json')
    options, args = parser.parse_args(args)
    task_request = process_trigger_options(parser, options, args)
    try:
        tasks = trigger_task_shards(
            options.swarming, task_request, options.shards)
        if tasks:
            print('Triggered task: %s' % options.task_name)
            # values() replaces the Python 2 only itervalues().
            tasks_sorted = sorted(
                tasks.values(), key=lambda x: x['shard_index'])
            if options.dump_json:
                data = {
                    'base_task_name': options.task_name,
                    'tasks': tasks,
                    'request': task_request_to_raw_request(task_request),
                }
                # str() is the Python 3 spelling of the former unicode() call.
                tools.write_json(str(options.dump_json), data, True)
                print('To collect results, use:')
                print(' swarming.py collect -S %s --json %s' %
                      (options.swarming, options.dump_json))
            else:
                print('To collect results, use:')
                print(' swarming.py collect -S %s %s' %
                      (options.swarming,
                       ' '.join(t['task_id'] for t in tasks_sorted)))
            print('Or visit:')
            for t in tasks_sorted:
                print(' ' + t['view_url'])
        return int(not tasks)
    except Failure:
        on_error.report(None)
        return 1
开发者ID:mellowdistrict,项目名称:luci-py,代码行数:47,代码来源:swarming.py
示例15: get_tickers
def get_tickers():
    """
    Save the fixed list of BTC tickers to a JSON file and return it

    The tickers are hard-coded here; nothing is requested from the Poloniex
    API. The sorted list is written to poloniex/data/tickers.json.
    """
    tickers = sorted([
        'BTC_BELA',
        'BTC_DASH',
        'BTC_DOGE',
        'BTC_ETH',
        'BTC_LBC',
        'BTC_MAID',
        'BTC_XEM',
        'BTC_XMR',
    ])
    write_json('poloniex/data/tickers.json', tickers)
    return tickers
开发者ID:momor666,项目名称:Ella,代码行数:18,代码来源:tickers.py
示例16: archive_isolated_triggers
def archive_isolated_triggers(isolate_server, tree_isolated, tests):
    """Creates and archives all the .isolated files for the tests at once.

    Archiving them in one batch is faster than archiving each file
    individually. Also the .isolated files can be reused across OSes, reducing
    the amount of I/O.

    Arguments:
      isolate_server: value passed to 'isolateserver.py archive
          --isolate-server'.
      tree_isolated: entry listed in the 'includes' of every generated
          .isolated file.
      tests: sequence of test script paths; it is indexed, so it must not be
          a one-shot iterator.

    Returns:
      list of (test, sha1) tuples.
    """
    logging.info('archive_isolated_triggers(%s, %s)', tree_isolated, tests)
    tempdir = tempfile.mkdtemp(prefix=u'run_swarming_tests_on_swarming_')
    try:
        isolateds = []
        for test in tests:
            test_name = os.path.basename(test)
            # Creates a manual .isolated file. See
            # https://code.google.com/p/swarming/wiki/IsolatedDesign for more
            # details.
            isolated = {
                'algo': 'sha-1',
                'command': ['python', test],
                'includes': [tree_isolated],
                'read_only': 0,
                'version': '1.4',
            }
            v = os.path.join(tempdir, test_name + '.isolated')
            tools.write_json(v, isolated, True)
            isolateds.append(v)
        cmd = [
            'isolateserver.py', 'archive', '--isolate-server', isolate_server,
        ] + isolateds
        if logging.getLogger().isEnabledFor(logging.INFO):
            cmd.append('--verbose')
        # NOTE(review): assumes check_output() returns text with one
        # '<hash> <path>' line per archived file -- confirm.
        items = [i.split() for i in check_output(cmd).splitlines()]
        assert len(items) == len(tests)
        # range() replaces the Python 2 only xrange().
        assert all(
            items[i][1].endswith(os.path.basename(tests[i]) + '.isolated')
            for i in range(len(tests)))
        # Materialize the pairs: on Python 3 zip() is a one-shot iterator
        # while the documented return value is a list.
        return list(zip(tests, [i[0] for i in items]))
    finally:
        # The temporary .isolated files are not needed once archived.
        shutil.rmtree(tempdir)
开发者ID:nodirt,项目名称:luci-py,代码行数:41,代码来源:run_swarming_tests_on_swarming.py
示例17: CMDtrigger
def CMDtrigger(parser, args):
    """Triggers a Swarming task.

    Accepts either the hash (sha1) of a .isolated file already uploaded or the
    path to an .isolated file to archive.

    If an .isolated file is specified instead of an hash, it is first archived.

    Passes all extra arguments provided after '--' as additional command line
    arguments for an isolated command specified in *.isolate file.

    Returns 0 when at least one task was triggered, 1 when none was or when
    a Failure was raised.
    """
    add_trigger_options(parser)
    add_sharding_options(parser)
    parser.add_option(
        "--dump-json",
        metavar="FILE",
        help="Dump details about the triggered task(s) to this file as json",
    )
    options, args = parser.parse_args(args)
    task_request = process_trigger_options(parser, options, args)
    try:
        tasks = trigger_task_shards(
            options.swarming, task_request, options.shards)
        if tasks:
            print("Triggered task: %s" % options.task_name)
            # values() replaces the Python 2 only itervalues().
            tasks_sorted = sorted(
                tasks.values(), key=lambda x: x["shard_index"])
            if options.dump_json:
                data = {"base_task_name": options.task_name, "tasks": tasks}
                tools.write_json(options.dump_json, data, True)
                print("To collect results, use:")
                print(" swarming.py collect -S %s --json %s" %
                      (options.swarming, options.dump_json))
            else:
                print("To collect results, use:")
                print(" swarming.py collect -S %s %s" %
                      (options.swarming,
                       " ".join(t["task_id"] for t in tasks_sorted)))
            print("Or visit:")
            for t in tasks_sorted:
                print(" " + t["view_url"])
        return int(not tasks)
    except Failure:
        on_error.report(None)
        return 1
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:40,代码来源:swarming.py
示例18: chromium_save_isolated
def chromium_save_isolated(isolated, data, path_variables, algo):
    """Writes one or many .isolated files.

    This slightly increases the cold cache cost but greatly reduces the warm
    cache cost by splitting low-churn files off the master .isolated file. It
    also reduces overall isolateserver memcache consumption.

    Arguments:
      isolated: path of the master .isolated file. Child files are written
          next to it as '<path without .isolated>.<index>.isolated'.
      data: dict with at least 'algo', 'files' and 'version' keys. It is
          mutated: extracted entries are removed from data['files'] and the
          hash of each child file is appended to data['includes'].
      path_variables: dict; when it has a non-empty 'PRODUCT_DIR' entry, the
          files starting with that prefix get their own child file.
      algo: forwarded to isolated_format.hash_file().

    Returns:
      The base names of the child files written, followed by whatever
      isolated_format.save_isolated() returns for the master file.
    """
    slaves = []

    def extract_into_included_isolated(prefix):
        """Moves every file starting with |prefix| into a new child dict."""
        new_slave = {
            'algo': data['algo'],
            'files': {},
            'version': data['version'],
        }
        # Iterate over a snapshot of the keys: entries are popped from
        # data['files'] inside the loop, which raises RuntimeError on
        # Python 3 when iterating the live keys view.
        for f in list(data['files']):
            if f.startswith(prefix):
                new_slave['files'][f] = data['files'].pop(f)
        if new_slave['files']:
            slaves.append(new_slave)

    # Split test/data/ in its own .isolated file.
    extract_into_included_isolated(os.path.join('test', 'data', ''))

    # Split everything out of PRODUCT_DIR in its own .isolated file.
    if path_variables.get('PRODUCT_DIR'):
        extract_into_included_isolated(path_variables['PRODUCT_DIR'])

    files = []
    for index, f in enumerate(slaves):
        # NOTE(review): assumes |isolated| ends with '.isolated' -- confirm
        # against callers.
        slavepath = isolated[:-len('.isolated')] + '.%d.isolated' % index
        tools.write_json(slavepath, f, True)
        data.setdefault('includes', []).append(
            isolated_format.hash_file(slavepath, algo))
        files.append(os.path.basename(slavepath))

    files.extend(isolated_format.save_isolated(isolated, data))
    return files
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:38,代码来源:isolate.py
示例19: test_load_stale_isolated
def test_load_stale_isolated(self):
    """Loads a stale .isolated file and checks the regenerated state.

    The .isolated file written up front has "invalid" hashes and an extra
    'foo' entry; the state returned by isolate.load_complete_state() is
    expected to list only touch_root.py and isolate.py with freshly computed
    hashes and sizes.
    """
    isolate_file = os.path.join(
        ROOT_DIR, 'tests', 'isolate', 'touch_root.isolate')

    # Data to be loaded in the .isolated file. Do not create a .state file.
    stale_entry_foo = {
        "m": 416,
        "h": "invalid",
        "s": 538,
        "t": 1335146921,
    }
    stale_entry_touch_root = {
        "m": 488,
        "h": "invalid",
        "s": 538,
        "t": 1335146921,
    }
    input_data = {
        'command': ['python'],
        'files': {
            'foo': stale_entry_foo,
            os.path.join('tests', 'isolate', 'touch_root.py'):
                stale_entry_touch_root,
        },
    }
    options = self._get_option(isolate_file)
    tools.write_json(options.isolated, input_data, False)

    # A CompleteState object contains two parts:
    # - Result instance stored in complete_state.isolated, corresponding to
    #   the .isolated file, is what is read by run_test_from_archive.py.
    # - SavedState instance stored in complete_state.saved_state,
    #   corresponding to the .state file, which is simply to aid the
    #   developer when re-running the same command multiple times and contain
    #   discardable information.
    complete_state = isolate.load_complete_state(
        options, self.cwd, None, False)
    actual_isolated = complete_state.saved_state.to_isolated()
    actual_saved_state = complete_state.saved_state.flatten()

    def build_expected_files():
        # Returns a brand new dict on every call: the two expected
        # structures must not share nested objects, since the _cleanup_*
        # helpers below receive them separately.
        return {
            os.path.join(u'tests', 'isolate', 'touch_root.py'): {
                'm': 488,
                'h': hash_file('tests', 'isolate', 'touch_root.py'),
                's': _size('tests', 'isolate', 'touch_root.py'),
            },
            u'isolate.py': {
                'm': 488,
                'h': hash_file('isolate.py'),
                's': _size('isolate.py'),
            },
        }

    expected_isolated = {
        'algo': 'sha-1',
        'command': ['python', 'touch_root.py'],
        'files': build_expected_files(),
        'relative_cwd': os.path.join(u'tests', 'isolate'),
        'version': isolate.isolateserver.ISOLATED_FILE_VERSION,
    }
    self._cleanup_isolated(expected_isolated)
    self.assertEqual(expected_isolated, actual_isolated)

    expected_saved_state = {
        'OS': sys.platform,
        'algo': 'sha-1',
        'child_isolated_files': [],
        'command': ['python', 'touch_root.py'],
        'config_variables': {
            'OS': 'linux',
            'chromeos': options.config_variables['chromeos'],
        },
        'extra_variables': {
            'foo': 'bar',
        },
        'files': build_expected_files(),
        'isolate_file': file_path.safe_relpath(
            file_path.get_native_path_case(isolate_file),
            os.path.dirname(options.isolated)),
        'path_variables': {},
        'relative_cwd': os.path.join(u'tests', 'isolate'),
        'root_dir': file_path.get_native_path_case(ROOT_DIR),
        'version': isolate.SavedState.EXPECTED_VERSION,
    }
    self._cleanup_isolated(expected_saved_state)
    self._cleanup_saved_state(actual_saved_state)
    self.assertEqual(expected_saved_state, actual_saved_state)
开发者ID:bpsinc-native,项目名称:src_tools_swarming_client,代码行数:92,代码来源:isolate_test.py
示例20: CMDquery
def CMDquery(parser, args):
    """Returns raw JSON information via an URL endpoint. Use 'query-list' to
    gather the list of API methods from the server.

    Examples:
      Listing all bots:
        swarming.py query -S https://server-url bots/list

      Listing last 10 tasks on a specific bot named 'swarm1':
        swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks

    Returns 0 on success and 1 when a request to the server failed.
    """
    # urllib.quote() only exists on Python 2; prefer its Python 3 location
    # and fall back to the old one.
    try:
        from urllib.parse import quote
    except ImportError:
        from urllib import quote

    # Largest number of items requested from the server in one call.
    CHUNK_SIZE = 250

    parser.add_option(
        "-L",
        "--limit",
        type="int",
        default=200,
        help="Limit to enforce on limitless items (like number of tasks); "
             "default=%default",
    )
    parser.add_option(
        "--json",
        help="Path to JSON output file (otherwise prints to stdout)")
    parser.add_option(
        "--progress",
        action="store_true",
        help="Prints a dot at each request to show progress")
    options, args = parser.parse_args(args)
    if len(args) != 1:
        parser.error(
            "Must specify only method name and optionally query args properly "
            "escaped.")
    base_url = options.swarming + "/_ah/api/swarming/v1/" + args[0]
    url = base_url
    if options.limit:
        # Check check, change if not working out.
        merge_char = "&" if "?" in url else "?"
        url += "%slimit=%d" % (merge_char, min(CHUNK_SIZE, options.limit))
    data = net.url_read_json(url)
    if data is None:
        # TODO(maruel): Do basic diagnostic.
        sys.stderr.write("Failed to access %s\n" % url)
        return 1

    # Some items support cursors. Try to get automatically if cursors are
    # needed by looking at the 'cursor' items.
    while data.get("cursor") and (
            not options.limit or len(data["items"]) < options.limit):
        merge_char = "&" if "?" in base_url else "?"
        url = base_url + "%scursor=%s" % (merge_char, quote(data["cursor"]))
        if options.limit:
            # Never ask for more than what is still missing.
            url += "&limit=%d" % min(
                CHUNK_SIZE, options.limit - len(data["items"]))
        if options.progress:
            sys.stdout.write(".")
            sys.stdout.flush()
        new = net.url_read_json(url)
        if new is None:
            if options.progress:
                # Terminate the line of progress dots.
                print("")
            sys.stderr.write("Failed to access %s\n" % options.swarming)
            return 1
        data["items"].extend(new["items"])
        data["cursor"] = new.get("cursor")

    if options.progress:
        # Terminate the line of progress dots.
        print("")
    if options.limit and len(data.get("items", [])) > options.limit:
        data["items"] = data["items"][: options.limit]
    # The cursor is an implementation detail of the paging above.
    data.pop("cursor", None)

    if options.json:
        tools.write_json(options.json, data, True)
    else:
        try:
            tools.write_json(sys.stdout, data, False)
            sys.stdout.write("\n")
        except IOError:
            # Deliberate best effort: a failure to write to stdout does not
            # change the exit code.
            pass
    return 0
开发者ID:Teamxrtc,项目名称:webrtc-streaming-node,代码行数:71,代码来源:swarming.py
注:本文中的utils.tools.write_json函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论