本文整理汇总了Python中telemetry.util.timer.delta_sec函数的典型用法代码示例。如果您正苦于以下问题:Python delta_sec函数的具体用法?Python delta_sec怎么用?Python delta_sec使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了delta_sec函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: fetch_remotes
def fetch_remotes(self, remotes):
    """Download every input of type "remote" into ``<work_dir>/cache``.

    Args:
        remotes: iterable of dict-like records; each one is read for its
            "name" and "type" keys.

    Returns:
        The number of downloads that reported an error (0 when everything
        succeeded or when there was nothing to fetch).
    """
    # TODO: fetch remotes inside Mappers, and process each one as it becomes available.
    remote_names = [r["name"] for r in remotes if r["type"] == "remote"]

    # TODO: check cache first.
    result = 0
    if not remote_names:
        return result

    fetch_cwd = os.path.join(self._work_dir, "cache")
    if not os.path.isdir(fetch_cwd):
        os.makedirs(fetch_cwd)

    loader = s3util.Loader(fetch_cwd, self._bucket_name, aws_key=self._aws_key, aws_secret_key=self._aws_secret_key)
    start = datetime.now()
    downloaded_bytes = 0
    for local, remote, err in loader.get_list(remote_names):
        if err is None:
            print("Downloaded %s" % (remote,))
            downloaded_bytes += os.path.getsize(local)
        else:
            print("Failed to download %s" % (remote,))
            result += 1

    duration_sec = timer.delta_sec(start)
    downloaded_mb = float(downloaded_bytes) / 1024.0 / 1024.0
    # A zero elapsed time (coarse clock, everything failing instantly) must
    # not turn the summary line into a ZeroDivisionError.
    mb_per_sec = downloaded_mb / duration_sec if duration_sec > 0 else 0.0
    print("Downloaded %.2fMB in %.2fs (%.2fMB/s)" % (downloaded_mb, duration_sec, mb_per_sec))
    return result
开发者ID:bsmedberg,项目名称:telemetry-server,代码行数:26,代码来源:job.py
示例2: export
def export(self, uploadables):
    """Upload files through ``self.s3loader`` and clean up the successes.

    Each successfully uploaded file is either renamed to ``<name>.uploaded``
    (when ``self.keep_backups`` is set) or removed, and its remote name is
    passed to ``self.enqueue_incoming``.

    Args:
        uploadables: sized collection handed to ``self.s3loader.put_list``.

    Returns:
        Zero for overall success, otherwise the number of failed uploads.
    """
    if len(uploadables) == 0:
        print("Nothing to do!")
        return 0
    print("Found %s files" % (len(uploadables),))

    fail_count = 0
    start = datetime.now()
    total_size = 0
    for local, remote, err in self.s3loader.put_list(uploadables):
        if err is None:
            # Great Success! Delete it locally.
            total_size += os.path.getsize(local)
            if self.keep_backups:
                # Keep a copy of the original, just in case.
                os.rename(local, local + ".uploaded")
            else:
                os.remove(local)
            # Send a message to SQS
            # TODO: verify that it succeeded.
            self.enqueue_incoming(remote)
        else:
            fail_count += 1
            print("Failed to upload '{0}' to bucket {1} as '{2}': {3}".format(local, self.bucket, remote, err))

    sec = timer.delta_sec(start)
    total_mb = float(total_size) / 1024.0 / 1024.0
    # Guard the rate: a zero elapsed time would raise ZeroDivisionError.
    mb_per_sec = total_mb / sec if sec > 0 else 0.0
    print("Transferred %.2fMB in %.2fs (%.2fMB/s)" % (total_mb, sec, mb_per_sec))
    # TODO: log the transfer stats properly.
    # Return zero for overall success or the number of failures.
    return fail_count
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:32,代码来源:export.py
示例3: run_mr
def run_mr(filter, output_file, local_only):
    """Run the ``../uitour.py`` MapReduce job with a fixed configuration.

    Args:
        filter: value passed to the job as its "input_filter" setting.
        output_file: value passed to the job as its "output" setting.
        local_only: when false, the boto library must be available.

    Returns:
        ``-2`` if remote processing was requested but boto is unavailable;
        otherwise the tuple ``(exit_code, output_file)``, where ``exit_code``
        is 0 if ``job.mapreduce()`` returned normally and 2 if it raised.
    """
    job_settings = {
        "job_script" : "../uitour.py",
        "input_filter": filter,
        "num_mappers" : 16,
        "num_reducers" : 4,
        "data_dir" : "../work/cache",
        "work_dir" : "../work",
        "output" : output_file,
        "bucket" : "telemetry-published-v2",
        "local_only" : local_only
    }

    if not job_settings["local_only"] and not BOTO_AVAILABLE:
        print("ERROR: The 'boto' library is required except in 'local-only' mode.")
        print(" You can install it using `sudo pip install boto`")
        # NOTE(review): `parser` is not defined inside this function; it is
        # presumably a module-level name - confirm.
        parser.print_help()
        return -2

    mr_job = Job(job_settings)
    started_at = datetime.now()
    try:
        mr_job.mapreduce()
    except:
        # Bare except: every exception (SystemExit and KeyboardInterrupt
        # included) is reported on stderr and mapped to exit code 2.
        traceback.print_exc(file=sys.stderr)
        exit_code = 2
    else:
        exit_code = 0

    elapsed = timer.delta_sec(started_at)
    print("All done in %.2fs" % (elapsed))
    return (exit_code, output_file)
开发者ID:bwinton,项目名称:AustralisTelemetry,代码行数:32,代码来源:australis_report_gen.py
示例4: export_batch
def export_batch(self, data_dir, conn, bucket, files):
    """Upload one batch of files with s3funnel and verify their checksums.

    Args:
        data_dir: directory the upload command runs in; ``files`` are
            resolved relative to it.
        conn: not used by this method.
        bucket: object providing ``get_key(name)`` whose result exposes a
            quoted MD5 in ``etag`` (presumably a boto Bucket - confirm).
        files: list of key names, relative to ``data_dir``.

    Returns:
        0 on success (or in dry-run mode), the command's non-zero exit code
        if the upload failed, or -1 if any file failed checksum verification.
    """
    print("%s Uploading %s" % (self.label, ",".join(files)))
    if self.dry_run:
        return 0

    # Time the s3funnel call:
    start = datetime.now()
    result = subprocess.call(self.s3f_cmd + files, cwd=data_dir)
    sec = timer.delta_sec(start)

    total_size = 0
    if result == 0:
        # Success! Verify each file's checksum, then truncate it.
        for f in files:
            # Verify checksum and track cumulative size so we can figure out MB/s
            full_filename = os.path.join(data_dir, f)
            md5, size = fileutil.md5file(full_filename)
            total_size += size
            # f is the key name - it does not include the full path to the
            # data dir.
            key = bucket.get_key(f)
            # A key that cannot be found comes back as None; treat that as a
            # checksum mismatch instead of crashing on `None.etag`.
            # Strip quotes from md5
            remote_md5 = key.etag[1:-1] if key is not None else None
            if md5 != remote_md5:
                # TODO: add it to a "failed" queue.
                print("ERROR: %s failed checksum verification: Local=%s, Remote=%s" % (f, md5, remote_md5))
                self.bad_records += 1
                result = -1
            # TODO: else add it to a "succeeded" queue and remove it locally.
    else:
        print("Failed to upload one or more files in the current batch. Error code was %s" % (result,))

    total_mb = float(total_size) / 1024.0 / 1024.0
    # Guard the rate: a zero elapsed time would raise ZeroDivisionError.
    mb_per_sec = total_mb / sec if sec > 0 else 0.0
    print("Transferred %.2fMB in %.2fs (%.2fMB/s)" % (total_mb, sec, mb_per_sec))
    return result
开发者ID:darchons,项目名称:telemetry-server,代码行数:35,代码来源:process_incoming_mp.py
示例5: main
def main():
    """Parse the command line, run the requested MapReduce job and print
    how long it took.

    Exits with status -2 when remote data is requested but boto is not
    available, and with -1 when a remote-only required option is missing.
    """
    parser = argparse.ArgumentParser(description='Run a MapReduce Job.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    add_option = parser.add_argument
    add_option("job_script", help="The MapReduce script to run")
    add_option("-l", "--local-only", help="Only process local files (exclude S3 data)", action="store_true")
    add_option("-m", "--num-mappers", metavar="N", help="Start N mapper processes", type=int, default=4)
    add_option("-r", "--num-reducers", metavar="N", help="Start N reducer processes", type=int, default=1)
    add_option("-d", "--data-dir", help="Base data directory", required=True)
    add_option("-b", "--bucket", help="S3 Bucket name")
    add_option("-k", "--aws-key", help="AWS Key", default=None)
    add_option("-s", "--aws-secret-key", help="AWS Secret Key", default=None)
    add_option("-w", "--work-dir", help="Location to put temporary work files", default="/tmp/telemetry_mr")
    add_option("-o", "--output", help="Filename to use for final job output", required=True)
    #TODO: make the input filter optional, default to "everything valid" and generate dims intelligently.
    add_option("-f", "--input-filter", help="File containing filter spec", required=True)
    add_option("-v", "--verbose", help="Print verbose output", action="store_true")
    options = parser.parse_args()

    if not options.local_only:
        if not BOTO_AVAILABLE:
            print("ERROR: The 'boto' library is required except in 'local-only' mode.")
            print(" You can install it using `sudo pip install boto`")
            parser.print_help()
            sys.exit(-2)

        # If we want to process remote data, some more arguments are required.
        for required_name in ["bucket"]:
            if getattr(options, required_name, None) is None:
                print(" ".join(["ERROR:", required_name, "is a required option"]))
                parser.print_help()
                sys.exit(-1)

    mr_job = Job(options)
    started_at = datetime.now()
    mr_job.mapreduce()
    elapsed = timer.delta_sec(started_at)
    print("All done in %.2fs" % (elapsed))
开发者ID:jonasfj,项目名称:telemetry-server,代码行数:35,代码来源:job.py
示例6: run_mapper
def run_mapper(self, mapper_id, inputs, work_dir, module, partition_count, delete_files, aws_key, aws_secret_key, s3_bucket):
    """Feed every line of every input file to ``module.map``.

    Each line is stripped of its trailing EOL, split on the first tab into
    ``key`` and ``value``, and passed to the map function together with the
    input's ``dimensions`` and a ``Context`` built from
    ``<work_dir>/mapper_<mapper_id>`` and ``partition_count``.  Lines that
    raise ``ValueError`` are reported and skipped.  A progress line is
    printed each time another 10% of the total input bytes has been handled.

    Args:
        mapper_id: identifier used in log lines and the output file name.
        inputs: sequence of objects read for ``size``, ``remote``, ``name``
            and ``dimensions``.
        work_dir: directory for the mapper output and the download cache.
        module: object expected to expose a callable ``map``; the process
            exits with status 1 if it does not.
        partition_count: passed through to ``Context``.
        delete_files: when true, each input file is removed after processing.
        aws_key, aws_secret_key, s3_bucket: used to build the S3 loader for
            remote inputs.
    """
    self.work_dir = work_dir

    print("I am mapper %s , and I'm mapping %s inputs. 0%% complete." % (mapper_id, len(inputs)))

    bytes_total = sum(f.size for f in inputs)
    bytes_completed = 0
    next_notice_pct = 10
    start = datetime.now()
    loader = None
    output_file = os.path.join(work_dir, "mapper_" + str(mapper_id))
    mapfunc = getattr(module, 'map', None)
    context = Context(output_file, partition_count)
    if not callable(mapfunc):
        print("No map function!!!")
        sys.exit(1)

    # TODO: Stream/decompress the files directly.
    for input_file in inputs:
        if input_file.remote:
            # TODO: check if the file already exists locally.
            # Lazy load the loader (so we don't do it on "local only" jobs).
            if loader is None:
                loader = s3util.Loader(os.path.join(self.work_dir, "cache"), s3_bucket, aws_key=aws_key, aws_secret_key=aws_secret_key, poolsize=1)
            for local, remote, err in loader.get_list([input_file.name]):
                if err is not None:
                    print("Failed to download %s : %s" % (remote, err))

        try:
            handle = self.open_input_file(input_file)
        except Exception:
            # Narrowed from a bare except so Ctrl-C / SystemExit are not
            # silently turned into "skip this file".
            print("Error opening %s (skipping)" % (input_file.name,))
            traceback.print_exc(file=sys.stderr)
            continue

        line_num = 0
        for line in handle:
            line_num += 1
            try:
                # Remove the trailing EOL character(s) before passing to
                # the map function.
                key, value = line.rstrip('\r\n').split("\t", 1)
                mapfunc(key, input_file.dimensions, value, context)
            except ValueError as e:
                # TODO: increment "bad line" metrics.
                print("Bad line: %s : %s %s" % (input_file.name, line_num, e))
        handle.close()

        if delete_files:
            print("Removing %s" % (input_file.name,))
            os.remove(handle.filename)

        bytes_completed += input_file.size
        # When every input is empty bytes_total is 0; report 100% rather than
        # dividing by zero.
        if bytes_total > 0:
            completed_pct = (float(bytes_completed) / bytes_total) * 100
        else:
            completed_pct = 100.0
        if completed_pct >= next_notice_pct:
            next_notice_pct += 10
            duration_sec = timer.delta_sec(start)
            completed_mb = float(bytes_completed) / 1024.0 / 1024.0
            # Same guard for the throughput figure.
            mb_per_sec = completed_mb / duration_sec if duration_sec > 0 else 0.0
            print("Mapper %d: %.2f%% complete. Processed %.2fMB in %.2fs (%.2fMB/s)" % (mapper_id, completed_pct, completed_mb, duration_sec, mb_per_sec))
开发者ID:ilanasegall,项目名称:AustralisTelemetry,代码行数:58,代码来源:job.py
示例7: dump_stats
def dump_stats(self):
    """Print record and byte throughput for the span start_time..end_time.

    Reads ``records_read``, ``bytes_read``, ``records_written``,
    ``bytes_written``, ``bad_records`` and ``label`` from ``self``.
    """
    duration = timer.delta_sec(self.start_time, self.end_time)
    mb_read = self.bytes_read / 1024.0 / 1024.0
    mb_written = self.bytes_written / 1024.0 / 1024.0
    if duration > 0:
        read_rate = self.records_read / duration
        mb_read_rate = mb_read / duration
        write_rate = self.records_written / duration
        mb_write_rate = mb_written / duration
    else:
        # No measurable time has elapsed (e.g. start_time == end_time);
        # report zero rates instead of raising ZeroDivisionError.
        read_rate = mb_read_rate = write_rate = mb_write_rate = 0.0
    print("%s: Read %d records or %.2fMB (%.2fr/s, %.2fMB/s), wrote %d or %.2f MB (%.2fr/s, %.2fMB/s). Found %d bad records" % (self.label, self.records_read, mb_read, read_rate, mb_read_rate, self.records_written, mb_written, write_rate, mb_write_rate, self.bad_records))
开发者ID:darchons,项目名称:telemetry-server,代码行数:9,代码来源:process_incoming_mp.py
示例8: import_files
def import_files(self, input_directory):
    """Run one master and ``self._n_workers`` workers, wait for all of them,
    then print the elapsed time.

    Args:
        input_directory: bound as the argument of ``self._master``.
    """
    started_at = datetime.now()
    spawned = []

    # One master bound to the input directory ...
    master_task = partial(self._master, input_directory)
    self._enqueue_process(master_task, spawned)

    # ... plus the configured number of workers.
    for _ in range(self._n_workers):
        self._enqueue_process(partial(self._worker), spawned)

    for proc in spawned:
        proc.join()

    print("Files imported in", timer.delta_sec(started_at), "seconds.")
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:12,代码来源:importer.py
示例9: handle
def handle(self, record):
    """Compress one raw ``*.log*`` file with the external compressor.

    The raw file is renamed to ``<compressed name>.compressing``, piped
    through ``self.compress_cmd`` into a uniquely named output file, and
    removed once compression has succeeded.

    Args:
        record: path of the raw file; it must contain ".log".
    """
    filename = record
    base_ends = filename.find(".log") + 4
    if base_ends < 4:
        self.log("Bad filename encountered, skipping: " + filename)
        self.stats.increment(records_read=1, bad_records=1,
                             bad_record_type="bad_filename")
        return
    basename = filename[0:base_ends]

    # Get a unique name for the compressed file:
    comp_name = basename + "." + uuid.uuid4().hex + StorageLayout.COMPRESSED_SUFFIX

    # reserve it!
    f_comp = open(comp_name, "wb")
    # TODO: open f_comp with same buffer size as below?
    try:
        # Rename uncompressed file to a temp name
        tmp_name = comp_name + ".compressing"
        os.rename(filename, tmp_name)

        # Read input file as text (line-buffered)
        f_raw = open(tmp_name, "r", 1)
        try:
            start = now()

            # Now set up our processing pipe:
            # - read from f_raw, compress, write to comp_name
            p_compress = Popen(self.compress_cmd, bufsize=65536, stdin=f_raw,
                               stdout=f_comp, stderr=sys.stderr)

            # Note: it looks like p_compress.wait() is what we want, but the docs
            # warn of a deadlock, so we use communicate() instead.
            p_compress.communicate()

            raw_bytes = f_raw.tell()
            comp_bytes = f_comp.tell()
        finally:
            # Close both handles even if the rename, open or Popen raised.
            f_raw.close()
    finally:
        f_comp.close()

    if p_compress.returncode != 0:
        # The compressor failed: keep the raw data (still at tmp_name)
        # instead of deleting it, and count the record as bad.
        self.stats.increment(records_read=1, bad_records=1,
                             bad_record_type="compression_error")
        self.log("Error compressing file {0}: compressor exited with status {1}".format(
            filename, p_compress.returncode))
        return

    raw_mb = float(raw_bytes) / 1024.0 / 1024.0
    comp_mb = float(comp_bytes) / 1024.0 / 1024.0
    self.stats.increment(records_read=1, records_written=1,
                         bytes_read=raw_bytes, bytes_written=comp_bytes)

    # Remove raw file
    os.remove(tmp_name)
    sec = timer.delta_sec(start, now())
    # Guard the rates: a zero elapsed time would raise ZeroDivisionError.
    if sec > 0:
        read_rate = raw_mb / sec
        write_rate = comp_mb / sec
    else:
        read_rate = write_rate = 0.0
    self.log(("Compressed %s as %s in %.2fs. Size before: %.2fMB, after:"
              " %.2fMB (r: %.2fMB/s, w: %.2fMB/s)") % (filename, comp_name,
              sec, raw_mb, comp_mb, read_rate, write_rate))
开发者ID:bsmedberg,项目名称:telemetry-server,代码行数:49,代码来源:process_incoming_standalone.py
示例10: save_map
def save_map(self, channel_name, chan_stats):
    """Append one channel's stats to ``self.stats_file`` as a JSON line.

    Does nothing when ``self.stats_file`` is None.  Otherwise ``chan_stats``
    is updated in place with "task", "channel", "start_time", "end_time" and
    "duration" before being serialised.  Write failures are logged through
    ``self.logger`` and not re-raised.

    Args:
        channel_name: stored under "channel" and used in the error message.
        chan_stats: mutable mapping of stats; modified in place.
    """
    if self.stats_file is None:
        return

    chan_stats["task"] = self.task
    chan_stats["channel"] = channel_name
    chan_stats["start_time"] = datetime_to_json(self.start_time)
    chan_stats["end_time"] = datetime_to_json(self.end_time)
    chan_stats["duration"] = timer.delta_sec(self.start_time, self.end_time)
    try:
        with io.open(self.stats_file, "a") as fout:
            fout.write(unicode(json.dumps(chan_stats) + u"\n"))
    except Exception:
        # Narrowed from a bare except: stats are best-effort, but
        # KeyboardInterrupt/SystemExit should still propagate.
        self.logger.log("Error writing '{}' stats".format(channel_name))
        self.logger.log(traceback.format_exc())
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:15,代码来源:process_incoming_standalone.py
示例11: get_summary
def get_summary(self):
    """Return a one-line throughput summary built from ``self.overall``.

    Rates are computed over the span ``self.start_time``..``self.end_time``.

    Returns:
        The formatted summary string.
    """
    duration = timer.delta_sec(self.start_time, self.end_time)
    mb_read = self.overall["bytes_read"] / 1024.0 / 1024.0
    mb_written = self.overall["bytes_written"] / 1024.0 / 1024.0
    if duration > 0:
        read_rate = self.overall["records_read"] / duration
        mb_read_rate = mb_read / duration
        write_rate = self.overall["records_written"] / duration
        mb_write_rate = mb_written / duration
    else:
        # No measurable time has elapsed (e.g. start_time == end_time);
        # report zero rates instead of raising ZeroDivisionError.
        read_rate = mb_read_rate = write_rate = mb_write_rate = 0.0
    summary = "Read %d records or %.2fMB (%.2fr/s, %.2fMB/s), " \
              "wrote %d or %.2f MB (%.2fr/s, %.2fMB/s). " \
              "Found %d bad records" % (self.overall["records_read"],
              mb_read, read_rate, mb_read_rate,
              self.overall["records_written"], mb_written, write_rate,
              mb_write_rate, self.overall["bad_records"])
    return summary
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:15,代码来源:process_incoming_standalone.py
示例12: fetch_s3_files
def fetch_s3_files(incoming_files, fetch_cwd, bucket, aws_key, aws_secret_key):
    """Download the given keys into ``fetch_cwd`` with s3funnel.

    Files that already exist locally with an MD5 matching the remote etag
    are skipped.  The rest are fetched in batches of 8; fetching stops at
    the first batch whose command exits non-zero.

    Args:
        incoming_files: list of key names (relative to ``fetch_cwd``).
        fetch_cwd: download directory; created if missing.
        bucket: object providing ``name`` and ``get_key(name)`` whose result
            exposes a quoted MD5 in ``etag`` (presumably a boto Bucket -
            confirm).
        aws_key: passed to s3funnel with ``-a``.
        aws_secret_key: passed to s3funnel with ``-s``.

    Returns:
        0 on success (or when there is nothing to fetch), otherwise the exit
        code of the failing s3funnel call.
    """
    result = 0
    if len(incoming_files) == 0:
        return result

    if not os.path.isdir(fetch_cwd):
        os.makedirs(fetch_cwd)

    files = []
    for f in incoming_files:
        full_filename = os.path.join(fetch_cwd, f)
        if os.path.isfile(full_filename):
            md5, size = fileutil.md5file(full_filename)
            # f is the key name - it does not include the full path to the
            # data dir.
            key = bucket.get_key(f)
            # A key that cannot be found comes back as None; treat it as a
            # mismatch rather than crashing on `None.etag`.
            # Strip quotes from md5
            remote_md5 = key.etag[1:-1] if key is not None else None
            if md5 != remote_md5:
                files.append(f)
            else:
                print("Already downloaded %s" % (f,))
        else:
            files.append(f)

    # NOTE(review): the AWS secret is passed as a command-line argument.
    fetch_cmd = [S3FUNNEL_PATH, bucket.name, "get",
                 "-a", aws_key, "-s", aws_secret_key, "-t", "8"]

    # Fetch in batches of 8 files at a time
    for batch_start in range(0, len(files), 8):
        current_files = files[batch_start:batch_start + 8]
        start = datetime.now()
        result = subprocess.call(fetch_cmd + current_files, cwd=fetch_cwd)
        duration_sec = timer.delta_sec(start)
        # TODO: verify MD5s
        # Only count files that actually arrived: after a failed fetch some
        # may be missing, and getsize() would raise before the failure
        # check below is reached.
        local_paths = [os.path.join(fetch_cwd, f) for f in current_files]
        downloaded_bytes = sum(os.path.getsize(p) for p in local_paths if os.path.isfile(p))
        downloaded_mb = downloaded_bytes / 1024.0 / 1024.0
        # Guard the rate: a zero elapsed time would raise ZeroDivisionError.
        mb_per_sec = downloaded_mb / duration_sec if duration_sec > 0 else 0.0
        print("Downloaded %.2fMB in %.2fs (%.2fMB/s)" % (downloaded_mb, duration_sec, mb_per_sec))
        if result != 0:
            break
    return result
开发者ID:darchons,项目名称:telemetry-server,代码行数:45,代码来源:process_incoming_mp.py
示例13: work
def work(self):
    """Consume records from ``self.q_in`` until the sentinel arrives.

    Each record is passed to ``self.handle``.  When ``self.print_stats`` is
    set, ``self.dump_stats()`` is called at most once every 10 seconds.
    """
    print("%s Starting up" % (self.label,))
    while True:
        try:
            raw = self.q_in.get()
            if raw == PipeStep.SENTINEL:
                break
            self.handle(raw)
            self.records_read += 1
            # Refresh end_time before any stats dump so the reported rates
            # cover the record that was just handled rather than lagging
            # one record behind.
            self.end_time = datetime.now()
            if self.print_stats:
                this_update = datetime.now()
                if timer.delta_sec(self.last_update, this_update) > 10.0:
                    self.last_update = this_update
                    self.dump_stats()
        except Q.Empty:
            break
    print("%s Received stop message... all done" % (self.label,))
开发者ID:darchons,项目名称:telemetry-server,代码行数:18,代码来源:process_incoming_mp.py
示例14: work
def work(self):
    """Process queue items until the sentinel is received.

    For every item the stats are reset, the item is handled, and the stats
    are end-stamped and saved.  When ``self.print_stats`` is set, a summary
    is logged at most once every 10 seconds.
    """
    self.log("Starting up")
    while True:
        try:
            item = self.q_in.get()
            if item == PipeStep.SENTINEL:
                break

            stats = self.stats
            stats.reset()
            self.handle(item)
            stats.update_end_time()
            stats.save()

            if not self.print_stats:
                continue
            checked_at = now()
            if timer.delta_sec(self.last_update, checked_at) > 10.0:
                self.last_update = checked_at
                self.log(self.stats.get_summary())
        except Q.Empty:
            break
    self.log("Received stop message... work done")
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:19,代码来源:process_incoming_standalone.py
示例15: handle
def handle(self, record):
    """Compress one raw ``*.log*`` file into a uniquely named output file.

    The raw file is renamed to ``<compressed name>.compressing``, compressed
    via ``CompressedFile.compress_from`` and then removed.  If compression
    raises, the record is counted as a "compression_error" and the method
    returns without removing the renamed raw file.

    Args:
        record: path of the raw file; it must contain ".log".
    """
    filename = record
    base_ends = filename.find(".log") + 4
    if base_ends < 4:
        self.log("Bad filename encountered, skipping: " + filename)
        self.stats.increment(records_read=1, bad_records=1,
                             bad_record_type="bad_filename")
        return
    basename = filename[0:base_ends]

    # Get a unique name for the compressed file:
    comp_name = basename + "." + uuid.uuid4().hex + StorageLayout.COMPRESSED_SUFFIX
    comp_file = CompressedFile(comp_name, mode="w", open_now=True, compression_level=1)

    # Rename uncompressed file to a temp name
    tmp_name = comp_name + ".compressing"
    os.rename(filename, tmp_name)

    start = now()
    try:
        comp_file.compress_from(tmp_name, remove_original=False)
        comp_file.close()
    except Exception as e:
        self.stats.increment(records_read=1, bad_records=1,
                             bad_record_type="compression_error")
        self.log("Error compressing file {0}: {1}".format(filename, e))
        return

    raw_bytes = os.stat(tmp_name).st_size
    comp_bytes = os.stat(comp_name).st_size
    raw_mb = float(raw_bytes) / 1024.0 / 1024.0
    comp_mb = float(comp_bytes) / 1024.0 / 1024.0
    self.stats.increment(records_read=1, records_written=1,
                         bytes_read=raw_bytes, bytes_written=comp_bytes)

    # Remove raw file
    os.remove(tmp_name)
    sec = timer.delta_sec(start, now())
    # Guard the rates: a zero elapsed time would raise ZeroDivisionError.
    if sec > 0:
        read_rate = raw_mb / sec
        write_rate = comp_mb / sec
    else:
        read_rate = write_rate = 0.0
    self.log(("Compressed %s as %s in %.2fs. Size before: %.2fMB, after:"
              " %.2fMB (r: %.2fMB/s, w: %.2fMB/s)") % (filename, comp_name,
              sec, raw_mb, comp_mb, read_rate, write_rate))
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:40,代码来源:process_incoming_standalone.py
示例16: fetch_s3_files
def fetch_s3_files(files, fetch_cwd, bucket_name, aws_key, aws_secret_key):
    """Download the given keys into ``fetch_cwd`` with one s3funnel call.

    Args:
        files: list of key names (relative to ``fetch_cwd``).
        fetch_cwd: download directory; created if missing.
        bucket_name: name of the bucket to fetch from.
        aws_key: passed to s3funnel with ``-a``.
        aws_secret_key: passed to s3funnel with ``-s``.

    Returns:
        0 when there is nothing to fetch, otherwise the exit code of the
        s3funnel command.
    """
    result = 0
    if len(files) == 0:
        return result

    if not os.path.isdir(fetch_cwd):
        os.makedirs(fetch_cwd)

    # NOTE(review): the AWS secret is passed as a command-line argument.
    fetch_cmd = ["/usr/local/bin/s3funnel", bucket_name, "get",
                 "-a", aws_key, "-s", aws_secret_key, "-t", "8"]
    start = datetime.now()
    result = subprocess.call(fetch_cmd + files, cwd=fetch_cwd)
    duration_sec = timer.delta_sec(start)
    # TODO: verify MD5s
    # Only count files that actually arrived: after a failed fetch some may
    # be missing, and getsize() would raise instead of letting the caller
    # see the non-zero result.
    local_paths = [os.path.join(fetch_cwd, f) for f in files]
    downloaded_bytes = sum(os.path.getsize(p) for p in local_paths if os.path.isfile(p))
    downloaded_mb = downloaded_bytes / 1024.0 / 1024.0
    # Guard the rate: a zero elapsed time would raise ZeroDivisionError.
    mb_per_sec = downloaded_mb / duration_sec if duration_sec > 0 else 0.0
    print("Downloaded %.2fMB in %.2fs (%.2fMB/s)" % (downloaded_mb, duration_sec, mb_per_sec))
    return result
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:22,代码来源:process_incoming_serial.py
示例17: get_filtered_files_s3
def get_filtered_files_s3(self):
    """Yield the S3 entries that match ``self._input_filter``.

    Yields nothing when ``self._local_only`` is set.  Otherwise an S3
    connection is opened (with explicit credentials when ``self._aws_key``
    is not None) and every item produced by ``s3util.list_partitions`` for
    the configured bucket is yielded.  A progress line is printed for the
    first item and every 1000th item, and a summary once listing completes.
    """
    if self._local_only:
        return

    print("Fetching file list from S3...")
    # Plain boto should be fast enough to list bucket contents.
    if self._aws_key is not None:
        conn = S3Connection(self._aws_key, self._aws_secret_key)
    else:
        conn = S3Connection()

    # try/finally so the connection is closed even when the consumer stops
    # iterating early or listing raises part-way through.
    try:
        bucket = conn.get_bucket(self._bucket_name)
        start = datetime.now()
        count = 0
        # Filter input files by partition. If the filter is reasonably
        # selective, this can be much faster than listing all files in the
        # bucket.
        for f in s3util.list_partitions(bucket, schema=self._input_filter, include_keys=True):
            count += 1
            if count == 1 or count % 1000 == 0:
                print("Listed %s so far" % (count,))
            yield f
    finally:
        conn.close()

    duration = timer.delta_sec(start)
    print("Listed %s files in %s seconds" % (count, duration))
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:22,代码来源:job.py
示例18: list_files
def list_files(bucket_name, output_file, output_func=s3obj_to_string, prefix=''):
    """Write one line per key in an S3 bucket to ``output_file``.

    Listing is retried after a ``socket.error``, resuming from the last key
    seen.  A key whose line cannot be written is reported and skipped.  A
    progress line is printed every 5000 keys.

    Args:
        bucket_name: name of the bucket to list.
        output_file: object with a ``write`` method.
        output_func: callable applied to each key; ``str()`` of its result
            is written followed by a newline.
        prefix: only keys with this prefix are listed.
    """
    s3 = S3Connection()
    bucket = s3.get_bucket(bucket_name)
    total_count = 0
    start_time = datetime.now()
    done = False
    last_key = ''
    while not done:
        try:
            for k in bucket.list(prefix=prefix, marker=last_key):
                last_key = k.name
                total_count += 1
                if total_count % 5000 == 0:
                    print("Looked at %s total records in %s seconds. Last key was %s" % (
                        total_count, timer.delta_sec(start_time), last_key))
                try:
                    output_file.write(str(output_func(k)) + "\n")
                except Exception as e:
                    print("Error writing key %s : %s" % (k.name, e))
                    traceback.print_exc()
            # Only reached when the listing ran to completion without a
            # socket error.
            done = True
        except socket.error as e:
            print("Error listing keys: %s" % (e,))
            traceback.print_exc()
            print("Continuing from last seen key: %s" % (last_key,))
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:24,代码来源:bucket_list.py
示例19: main
def main():
    """Delete expired ``flash_video`` keys from S3 and from the
    ``published_files`` table.

    Keys under ``args.prefix`` are listed; every key for which
    ``should_expire`` returns true is deleted from the bucket and removed
    from the database, each step subject to ``should_run`` (which receives
    ``args.dry_run``).  Listing is retried after a ``socket.error``,
    resuming from the last key seen.
    """
    args = get_args()
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    logger.info("Expiring `flash_video` data older than {}.".format(args.expiry_date))
    logger.debug("Connecting to S3...")
    conn = S3Connection(args.aws_key, args.aws_secret_key)
    bucket = conn.get_bucket(args.bucket)

    # Build the connection string from whichever db_* options are present.
    connection_string = ""
    db_options = (
        ("db_name", "dbname"),
        ("db_host", "host"),
        ("db_port", "port"),
        ("db_user", "user"),
        ("db_pass", "password"),
    )
    for attr_name, keyword in db_options:
        if hasattr(args, attr_name):
            connection_string += "{0}={1} ".format(keyword, getattr(args, attr_name))

    db_conn = None
    db_cursor = None
    if should_run(args.dry_run, logger, "Connecting to database"):
        db_conn = psycopg2.connect(connection_string)
        db_cursor = db_conn.cursor()

    prefix = args.prefix
    last_key = ''
    done = False
    total_count = 0
    exp_count = 0
    total_bytes = 0
    start_time = datetime.now()
    while not done:
        try:
            for k in bucket.list(prefix=prefix, marker=last_key):
                # Advance the resume marker for every key seen, not only for
                # expired ones, so a retry after a socket error does not
                # re-list and re-count keys that were already examined.
                last_key = k.name
                if k.name.endswith('/'):
                    logger.debug("Skipping directory '{}'".format(k.name))
                    continue
                total_count += 1
                if not should_expire(k.name, args.expiry_date, logger):
                    continue
                exp_count += 1
                total_bytes += k.size
                if total_count % 100 == 0:
                    logger.debug("Expired {} of {} total files in {}s. Last key was {}".format(
                        exp_count, total_count, timer.delta_sec(start_time), last_key))
                logger.info("Deleting {} from S3 bucket".format(k.name))
                if should_run(args.dry_run, logger, "Deleting from S3 bucket"):
                    k.delete()
                if should_run(args.dry_run, logger, "Notifying coordinator"):
                    # Pass the key name as a bound parameter: key names are
                    # external data and must not be spliced into the SQL.
                    db_cursor.execute(
                        "DELETE FROM published_files WHERE file_name = %s;",
                        (k.name,))
                    db_conn.commit()
                    logger.debug("Coordinator notified")
            done = True
        except socket.error as e:
            logger.error("Error listing keys: {}".format(e))
            logger.error(traceback.format_exc())
            logger.debug("Continuing from last seen key: {}".format(last_key))
开发者ID:SamPenrose,项目名称:telemetry-server,代码行数:68,代码来源:expire_flash_video.py
示例20: write_bad_record
# We don't need to write these bad records out - we know
# why they are being skipped.
if err_message != "Missing in payload: info.revision":
# TODO: recognize other common failure modes and handle them gracefully.
self.write_bad_record(key, dims, data, err_message, "Conversion Error:")
traceback.print_exc()
if self.print_stats:
this_update = datetime.now()
sec = timer.delta_sec(self.last_update, this_update)
if sec > 10.0:
self.last_update = this_update
self.end_time = datetime.now()
self.dump_stats()
duration = timer.delta_sec(start)
mb_read = bytes_read / 1024.0 / 1024.0
# Stats for the current file:
print self.label, "- Read %d records %.2fMB in %.2fs (%.2fMB/s)" % (record_count, mb_read, duration, mb_read / duration)
except Exception, e:
# Corrupted data, let's skip this record.
print self.label, "- Error reading raw data from ", raw_file, e
traceback.print_exc()
def write_bad_record(self, key, dims, data, error, message=None):
self.bad_records += 1
if message is not None:
print self.label, message, error
if self.bad_filename is not None:
try:
path = u"/".join([key] + dims)
开发者ID:darchons,项目名称:telemetry-server,代码行数:31,代码来源:process_incoming_mp.py
注:本文中的telemetry.util.timer.delta_sec函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论