This article collects typical usage examples of the wal_e.retries.retry_with_count function in Python. If you have been wondering what retry_with_count is for, how to call it, or what real-world uses look like, the curated code examples below should help.
The following presents 10 code examples of the retry_with_count function, sorted by popularity by default.
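Before the examples, here is a minimal sketch of the pattern they all share: retry_with_count wraps a per-attempt exception handler, and the result is passed to the retry decorator, which keeps calling the wrapped function until it returns without raising. The sketch assumes, as the usage below suggests, that both retry and retry_with_count can be imported from wal_e.retries; the handler and fetch names are hypothetical. Note that the collected snippets are from Python 2-era wal-e code (constructs such as raise typ, value, tb), while the sketch uses Python 3 syntax.

    from wal_e.retries import retry, retry_with_count


    def log_failures_on_error(exc_tup, exc_processor_cxt):
        # exc_tup is a (type, value, traceback) triple; exc_processor_cxt is
        # treated as the number of attempts made so far, as in the examples below.
        typ, value, tb = exc_tup
        print('attempt {0} failed: {1}: {2}'.format(
            exc_processor_cxt, typ.__name__, value))
        # Help the GC by dropping the traceback reference, as the examples do.
        del tb


    @retry(retry_with_count(log_failures_on_error))
    def fetch():
        # Hypothetical flaky operation; raising here triggers another attempt.
        ...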
Example 1: do_lzop_get
def do_lzop_get(access_key, secret_key, url, path, decrypt):
"""
Get and decompress a S3 URL
This streams the content directly to lzop; the compressed version
is never stored on disk.
"""
assert url.endswith('.lzo'), 'Expect an lzop-compressed file'
def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to fetch wal '
'file {url} so far.'.format(n=exc_processor_cxt, url=url))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying fetch because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
elif (issubclass(typ, boto.exception.S3ResponseError) and
value.error_code == 'RequestTimeTooSkewed'):
logger.info(msg='Retrying fetch because of a Request Skew time',
detail=standard_detail_message())
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg='retrying WAL file fetch from unexpected exception',
detail=standard_detail_message(
'The exception type is {etype} and its value is '
'{evalue} and its traceback is {etraceback}'
.format(etype=typ, evalue=value,
etraceback=''.join(traceback.format_tb(tb)))))
# Help Python GC by resolving possible cycles
del tb
@retry(retry_with_count(log_wal_fetch_failures_on_error))
def download():
with open(path, 'wb') as decomp_out:
key = _uri_to_key(access_key, secret_key, url)
pipeline = get_download_pipeline(PIPE, decomp_out, decrypt)
g = gevent.spawn(write_and_return_error, key, pipeline.stdin)
try:
# Raise any exceptions from write_and_return_error
exc = g.get()
if exc is not None:
raise exc
except boto.exception.S3ResponseError, e:
if e.status == 404:
# Do not retry if the key not present, this can happen
# under normal situations.
logger.warning(
msg=('could no longer locate object while performing '
'wal restore'),
detail=('The absolute URI that could not be located '
'is {url}.'.format(url=url)),
hint=('This can be normal when Postgres is trying to '
'detect what timelines are available during '
'restoration.'))
return False
else:
raise
pipeline.finish()
logger.info(
msg='completed download and decompression',
detail='Downloaded and decompressed "{url}" to "{path}"'
.format(url=url, path=path))
return True
Developer ID: runway20, Project: wal-e, Lines of code: 81, Source: s3_util.py
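A design note on the handlers in this collection: the WAL-fetch handlers (Examples 1, 4, 6, 8, 10) log and fall through for any unexpected exception, so the fetch is always retried, while the volume-upload handlers (Examples 2, 3, 9) re-raise anything they do not recognize so the failure propagates. A minimal Python 3 handler of the stricter, upload-style kind might look like this (the handler name is hypothetical; only the (exc_tup, exc_processor_cxt) signature is taken from the examples):

    import socket


    def log_or_reraise(exc_tup, exc_processor_cxt):
        typ, value, tb = exc_tup
        del exc_tup
        if issubclass(typ, socket.error):
            # Known-transient error: log and fall through, so the wrapped
            # call is attempted again.
            print('attempt {0}: retrying after socket error: {1}'.format(
                exc_processor_cxt, value))
        else:
            # Unrecognized error: propagate it with the original traceback.
            raise value.with_traceback(tb)
        del tb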
Example 2: do_partition_put
def do_partition_put(backup_s3_prefix, tpart, rate_limit, gpg_key):
"""
Synchronous version of the s3-upload wrapper
"""
logger.info(msg='beginning volume compression',
detail='Building volume {name}.'.format(name=tpart.name))
with tempfile.NamedTemporaryFile(mode='rwb') as tf:
pipeline = get_upload_pipeline(PIPE, tf,
rate_limit=rate_limit, gpg_key=gpg_key)
tpart.tarfile_write(pipeline.stdin)
pipeline.stdin.flush()
pipeline.stdin.close()
pipeline.finish()
tf.flush()
s3_url = '/'.join([backup_s3_prefix, 'tar_partitions',
'part_{number}.tar.lzo'
.format(number=tpart.name)])
logger.info(
msg='begin uploading a base backup volume',
detail=('Uploading to "{s3_url}".')
.format(s3_url=s3_url))
def log_volume_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to send the '
'volume {name} so far.'.format(n=exc_processor_cxt,
name=tpart.name))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying send because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
elif (issubclass(typ, boto.exception.S3ResponseError) and
value.error_code == 'RequestTimeTooSkewed'):
logger.info(msg='Retrying send because of a Request Skew time',
detail=standard_detail_message())
else:
# This type of error is unrecognized as a retry-able
# condition, so propagate it, original stacktrace and
# all.
raise typ, value, tb
@retry(retry_with_count(log_volume_failures_on_error))
def put_file_helper():
tf.seek(0)
return uri_put_file(s3_url, tf)
# Actually do work, retrying if necessary, and timing how long
# it takes.
clock_start = time.clock()
k = put_file_helper()
clock_finish = time.clock()
kib_per_second = format_kib_per_second(clock_start, clock_finish,
k.size)
logger.info(
msg='finish uploading a base backup volume',
detail=('Uploading to "{s3_url}" complete at '
'{kib_per_second}KiB/s. ')
.format(s3_url=s3_url, kib_per_second=kib_per_second))
Developer ID: riegie, Project: wal-e, Lines of code: 74, Source: s3_worker.py
Example 3: __call__
def __call__(self, tpart):
"""
Synchronous version of the upload wrapper
"""
logger.info(msg="beginning volume compression", detail="Building volume {name}.".format(name=tpart.name))
with tempfile.NamedTemporaryFile(mode="rwb") as tf:
pipeline = get_upload_pipeline(PIPE, tf, rate_limit=self.rate_limit, gpg_key=self.gpg_key)
tpart.tarfile_write(pipeline.stdin)
pipeline.stdin.flush()
pipeline.stdin.close()
pipeline.finish()
tf.flush()
# TODO :: Move arbitray path construction to StorageLayout Object
url = "{0}/tar_partitions/part_{number}.tar.lzo".format(self.backup_prefix.rstrip("/"), number=tpart.name)
logger.info(msg="begin uploading a base backup volume", detail='Uploading to "{url}".'.format(url=url))
def log_volume_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=""):
return prefix + " There have been {n} attempts to send the " "volume {name} so far.".format(
n=exc_processor_cxt, name=tpart.name
)
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg="Retrying send because of a socket error",
detail=standard_detail_message("The socket error's message is '{0}'.".format(socketmsg)),
)
elif issubclass(typ, boto.exception.S3ResponseError) and value.error_code == "RequestTimeTooSkewed":
logger.info(msg="Retrying send because of a Request Skew time", detail=standard_detail_message())
else:
# This type of error is unrecognized as a retry-able
# condition, so propagate it, original stacktrace and
# all.
raise typ, value, tb
@retry(retry_with_count(log_volume_failures_on_error))
def put_file_helper():
tf.seek(0)
return self.blobstore.uri_put_file(self.creds, url, tf)
# Actually do work, retrying if necessary, and timing how long
# it takes.
clock_start = time.time()
k = put_file_helper()
clock_finish = time.time()
kib_per_second = format_kib_per_second(clock_start, clock_finish, k.size)
logger.info(
msg="finish uploading a base backup volume",
detail=(
'Uploading to "{url}" complete at '
"{kib_per_second}KiB/s. ".format(url=url, kib_per_second=kib_per_second)
),
)
return tpart
Developer ID: kitchen, Project: wal-e, Lines of code: 68, Source: upload.py
Example 4: do_lzop_get
def do_lzop_get(creds, url, path, decrypt):
"""
Get and decompress a S3 URL
This streams the content directly to lzop; the compressed version
is never stored on disk.
"""
assert url.endswith('.lzo'), 'Expect an lzop-compressed file'
assert url.startswith('wabs://')
conn = BlobService(creds.account_name, creds.account_key, protocol='https')
def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to fetch wal '
'file {url} so far.'.format(n=exc_processor_cxt, url=url))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying fetch because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg='retrying WAL file fetch from unexpected exception',
detail=standard_detail_message(
'The exception type is {etype} and its value is '
'{evalue} and its traceback is {etraceback}'
.format(etype=typ, evalue=value,
etraceback=''.join(traceback.format_tb(tb)))))
# Help Python GC by resolving possible cycles
del tb
@retry(retry_with_count(log_wal_fetch_failures_on_error))
def download():
with open(path, 'wb') as decomp_out:
pipeline = get_download_pipeline(PIPE, decomp_out, decrypt)
g = gevent.spawn(write_and_return_error, url, conn, pipeline.stdin)
try:
# Raise any exceptions from _write_and_close
g.get()
except WindowsAzureMissingResourceError:
# Short circuit any re-try attempts under certain race
# conditions.
logger.warn(
msg=('could no longer locate object while performing '
'wal restore'),
detail=('The URI at {url} no longer exists.'
.format(url=url)),
hint=('This can be normal when Postgres is trying to '
'detect what timelines are available during '
'restoration.'))
return False
pipeline.finish()
logger.info(
msg='completed download and decompression',
detail='Downloaded and decompressed "{url}" to "{path}"'
.format(url=url, path=path))
return True
return download()
Developer ID: boldfield, Project: wal-e, Lines of code: 76, Source: wabs_util.py
Example 5: uri_put_file
def uri_put_file(creds, uri, fp, content_encoding=None):
assert fp.tell() == 0
assert uri.startswith('wabs://')
def log_upload_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to upload '
'file {url} so far.'.format(n=exc_processor_cxt, url=uri))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying upload because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg='retrying file upload from unexpected exception',
detail=standard_detail_message(
'The exception type is {etype} and its value is '
'{evalue} and its traceback is {etraceback}'
.format(etype=typ, evalue=value,
etraceback=''.join(traceback.format_tb(tb)))))
# Help Python GC by resolving possible cycles
del tb
# Because we're uploading in chunks, catch rate limiting and
# connection errors which occur for each individual chunk instead of
# failing the whole file and restarting.
@retry(retry_with_count(log_upload_failures_on_error))
def upload_chunk(chunk, block_id):
check_sum = base64.encodestring(md5(chunk).digest()).strip('\n')
conn.put_block(url_tup.netloc, url_tup.path, chunk,
block_id, content_md5=check_sum)
url_tup = urlparse(uri)
kwargs = dict(x_ms_blob_type='BlockBlob')
if content_encoding is not None:
kwargs['x_ms_blob_content_encoding'] = content_encoding
conn = BlobService(creds.account_name, creds.account_key, protocol='https')
conn.put_blob(url_tup.netloc, url_tup.path, '', **kwargs)
# WABS requires large files to be uploaded in 4MB chunks
block_ids = []
length, index = 0, 0
pool_size = os.getenv('WABS_UPLOAD_POOL_SIZE', 5)
p = gevent.pool.Pool(size=pool_size)
while True:
data = fp.read(WABS_CHUNK_SIZE)
if data:
length += len(data)
block_id = base64.b64encode(str(index))
p.wait_available()
p.spawn(upload_chunk, data, block_id)
block_ids.append(block_id)
index += 1
else:
p.join()
break
conn.put_block_list(url_tup.netloc, url_tup.path, block_ids)
# To maintain consistency with the S3 version of this function we must
# return an object with a certain set of attributes. Currently, that set
# of attributes consists of only 'size'
return _Key(size=len(data))
Developer ID: boldfield, Project: wal-e, Lines of code: 77, Source: wabs_util.py
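A design note on Example 5: the retry decorator is applied to upload_chunk rather than to the whole upload, so a transient failure only re-sends the affected 4MB block instead of restarting the entire file, and the chunks are dispatched concurrently through a gevent.pool.Pool. A stripped-down Python 3 sketch of that per-chunk pattern follows (the uploader callable, handler, and function names are hypothetical; the wal_e.retries import is assumed as in the earlier sketch):

    import gevent.pool

    from wal_e.retries import retry, retry_with_count

    CHUNK_SIZE = 4 * 1024 * 1024  # WABS block size used in Example 5


    def upload_all_chunks(fp, send_block, on_error):
        # Retry wraps only the per-chunk call, so one failed block does not
        # force re-reading and re-sending blocks that already succeeded.
        @retry(retry_with_count(on_error))
        def upload_chunk(chunk, block_id):
            send_block(chunk, block_id)

        pool = gevent.pool.Pool(size=5)
        block_id = 0
        while True:
            data = fp.read(CHUNK_SIZE)
            if not data:
                break
            pool.wait_available()
            pool.spawn(upload_chunk, data, str(block_id))
            block_id += 1
        pool.join()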
Example 6: do_lzop_get
def do_lzop_get(creds, url, path, decrypt, do_retry=True):
"""
Get and decompress a S3 URL
This streams the content directly to lzop; the compressed version
is never stored on disk.
"""
assert url.endswith(".lzo"), "Expect an lzop-compressed file"
assert url.startswith("wabs://")
conn = BlobService(creds.account_name, creds.account_key, protocol="https")
def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=""):
return prefix + " There have been {n} attempts to fetch wal " "file {url} so far.".format(
n=exc_processor_cxt, url=url
)
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg="Retrying fetch because of a socket error",
detail=standard_detail_message("The socket error's message is '{0}'.".format(socketmsg)),
)
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg="retrying WAL file fetch from unexpected exception",
detail=standard_detail_message(
"The exception type is {etype} and its value is "
"{evalue} and its traceback is {etraceback}".format(
etype=typ, evalue=value, etraceback="".join(traceback.format_tb(tb))
)
),
)
# Help Python GC by resolving possible cycles
del tb
def download():
with files.DeleteOnError(path) as decomp_out:
with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
g = gevent.spawn(write_and_return_error, url, conn, pl.stdin)
try:
# Raise any exceptions guarded by
# write_and_return_error.
exc = g.get()
if exc is not None:
raise exc
except AzureMissingResourceHttpError:
# Short circuit any re-try attempts under certain race
# conditions.
pl.abort()
logger.warning(
msg=("could no longer locate object while " "performing wal restore"),
detail=("The absolute URI that could not be " "located is {url}.".format(url=url)),
hint=(
"This can be normal when Postgres is trying "
"to detect what timelines are available "
"during restoration."
),
)
decomp_out.remove_regardless = True
return False
logger.info(
msg="completed download and decompression",
detail='Downloaded and decompressed "{url}" to "{path}"'.format(url=url, path=path),
)
return True
if do_retry:
download = retry(retry_with_count(log_wal_fetch_failures_on_error))(download)
return download()
Developer ID: sayantanbagchi88, Project: wal-e, Lines of code: 85, Source: wabs_util.py
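Example 6 (and Examples 7 and 8 after it) attaches the retry behaviour differently: instead of decorating the inner function at definition time, it wraps the function conditionally at call time, so a do_retry flag can switch retries off, for instance in tests or when the caller handles failures itself. A minimal Python 3 sketch of that conditional form, using hypothetical names and the same assumed wal_e.retries import:

    from wal_e.retries import retry, retry_with_count


    def do_get(do_retry=True):
        def handler(exc_tup, exc_processor_cxt):
            # Hypothetical handler; logging and returning means "please retry".
            print('attempt', exc_processor_cxt, 'failed:', exc_tup[1])

        def download():
            # Hypothetical flaky body.
            return True

        # Conditional form (Examples 6-8): the retry wrapper is applied only
        # when requested, instead of via a decorator at definition time
        # (Examples 1-5, 9, 10).
        if do_retry:
            download = retry(retry_with_count(handler))(download)
        return download()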
Example 7: retry (partial snippet)
detail=('The absolute URI that could not be '
'located is {url}.'.format(url=url)),
hint=('This can be normal when Postgres is trying '
'to detect what timelines are available '
'during restoration.'))
return False
else:
raise
logger.info(
msg='completed download and decompression',
detail='Downloaded and decompressed "{url}" to "{path}"'
.format(url=url, path=path))
return True
if do_retry:
download = retry(
retry_with_count(log_wal_fetch_failures_on_error))(download)
return download()
def write_and_return_error(key, stream):
try:
key.get_contents_to_file(stream)
stream.flush()
except Exception, e:
return e
finally:
stream.close()
Developer ID: abg, Project: wal-e, Lines of code: 30, Source: s3_util.py
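The write_and_return_error helper at the end of Example 7 deserves a note: it runs inside a gevent greenlet, and returning the trapped exception (rather than letting it escape the greenlet) lets the parent call g.get(), inspect the result, and re-raise inside the retry-wrapped download function, where the retry_with_count handler can see it. A minimal Python 3 rendering of that round trip (the key and stream objects are hypothetical stand-ins for the boto key and pipeline stdin used in Examples 1 and 7):

    import gevent


    def write_and_return_error(key, stream):
        # Mirrors the Example 7 helper: trap any exception and hand it back
        # to the caller instead of letting it die inside the greenlet.
        try:
            key.get_contents_to_file(stream)
            stream.flush()
        except Exception as e:
            return e
        finally:
            stream.close()


    def download(key, stream):
        g = gevent.spawn(write_and_return_error, key, stream)
        exc = g.get()          # returns the trapped exception, or None
        if exc is not None:
            raise exc          # re-raised here, visible to the retry handler
        return True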
Example 8: do_lzop_get
def do_lzop_get(creds, url, path, decrypt, do_retry=True):
"""
Get and decompress a S3 URL
This streams the content directly to lzop; the compressed version
is never stored on disk.
"""
assert url.endswith('.lzo'), 'Expect an lzop-compressed file'
def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to fetch wal '
'file {url} so far.'.format(n=exc_processor_cxt, url=url))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying fetch because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg='retrying WAL file fetch from unexpected exception',
detail=standard_detail_message(
'The exception type is {etype} and its value is '
'{evalue} and its traceback is {etraceback}'
.format(etype=typ, evalue=value,
etraceback=''.join(traceback.format_tb(tb)))))
# Help Python GC by resolving possible cycles
del tb
def download():
with files.DeleteOnError(path) as decomp_out:
blob = _uri_to_blob(creds, url)
with get_download_pipeline(PIPE, decomp_out.f, decrypt) as pl:
signed = blob.generate_signed_url(
datetime.utcnow() + timedelta(minutes=10))
g = gevent.spawn(write_and_return_error, signed, pl.stdin)
try:
# Raise any exceptions from write_and_return_error
exc = g.get()
if exc is not None:
raise exc
except urllib2.HTTPError as e:
if e.code == 404:
# Do not retry if the blob not present, this
# can happen under normal situations.
pl.abort()
logger.warning(
msg=('could no longer locate object while '
'performing wal restore'),
detail=('The absolute URI that could not be '
'located is {url}.'.format(url=url)),
hint=('This can be normal when Postgres is trying '
'to detect what timelines are available '
'during restoration.'))
decomp_out.remove_regardless = True
return False
raise
logger.info(
msg='completed download and decompression',
detail='Downloaded and decompressed "{url}" to "{path}"'
.format(url=url, path=path))
return True
if do_retry:
download = retry(
retry_with_count(log_wal_fetch_failures_on_error))(download)
return download()
Developer ID: PalmStoneGames, Project: wal-e, Lines of code: 84, Source: utils.py
Example 9: __call__
def __call__(self, tpart):
"""
Synchronous version of the upload wrapper
"""
logger.info(msg='beginning volume compression',
detail='Building volume {name}.'.format(name=tpart.name))
with tempfile.NamedTemporaryFile(
mode='r+b', bufsize=pipebuf.PIPE_BUF_BYTES) as tf:
with pipeline.get_upload_pipeline(PIPE, tf,
rate_limit=self.rate_limit,
gpg_key=self.gpg_key) as pl:
tpart.tarfile_write(pl.stdin)
tf.flush()
# TODO :: Move arbitray path construction to StorageLayout Object
url = '{0}/tar_partitions/part_{number:08d}.tar.lzo'.format(
self.backup_prefix.rstrip('/'), number=tpart.name)
manifest_file = StringIO(tpart.format_manifest())
manifest_url = '{0}/manifests/part_{number:08d}.json'.format(
self.backup_prefix.rstrip('/'), number=tpart.name)
logger.info(msg='begin uploading a base backup volume',
detail='Uploading to "{url}".'.format(url=url))
def log_volume_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix +
' There have been {n} attempts to send the '
'volume {name} so far.'.format(n=exc_processor_cxt,
name=tpart.name))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying send because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
elif (issubclass(typ, boto.exception.S3ResponseError) and
value.error_code == 'RequestTimeTooSkewed'):
logger.info(
msg='Retrying send because of a Request Skew time',
detail=standard_detail_message())
else:
# This type of error is unrecognized as a retry-able
# condition, so propagate it, original stacktrace and
# all.
raise typ, value, tb
@retry(retry_with_count(log_volume_failures_on_error))
def put_file_helper(_creds, _url, _file):
_file.seek(0)
return self.blobstore.uri_put_file(_creds, _url, _file)
# Actually do work, retrying if necessary, and timing how long
# it takes.
clock_start = time.time()
k = put_file_helper(self.creds, url, tf)
k2 = put_file_helper(self.creds, manifest_url, manifest_file)
clock_finish = time.time()
kib_per_second = format_kib_per_second(clock_start, clock_finish,
k.size + k2.size)
logger.info(
msg='finish uploading a base backup volume',
detail=('Uploading to "{url}" complete at '
'{kib_per_second}KiB/s. '
.format(url=url, kib_per_second=kib_per_second)))
return tpart
Developer ID: sayantanbagchi88, Project: wal-e, Lines of code: 80, Source: upload.py
Example 10: do_lzop_get
def do_lzop_get(creds, uri, path, decrypt):
"""
Get and decompress a Swift URL
This streams the content directly to lzop; the compressed version
is never stored on disk.
"""
assert uri.endswith('.lzo'), 'Expect an lzop-compressed file'
def log_wal_fetch_failures_on_error(exc_tup, exc_processor_cxt):
def standard_detail_message(prefix=''):
return (prefix + ' There have been {n} attempts to fetch wal '
'file {uri} so far.'.format(n=exc_processor_cxt, uri=uri))
typ, value, tb = exc_tup
del exc_tup
# Screen for certain kinds of known-errors to retry from
if issubclass(typ, socket.error):
socketmsg = value[1] if isinstance(value, tuple) else value
logger.info(
msg='Retrying fetch because of a socket error',
detail=standard_detail_message(
"The socket error's message is '{0}'."
.format(socketmsg)))
else:
# For all otherwise untreated exceptions, report them as a
# warning and retry anyway -- all exceptions that can be
# justified should be treated and have error messages
# listed.
logger.warning(
msg='retrying WAL file fetch from unexpected exception',
detail=standard_detail_message(
'The exception type is {etype} and its value is '
'{evalue} and its traceback is {etraceback}'
.format(etype=typ, evalue=value,
etraceback=''.join(traceback.format_tb(tb)))))
# Help Python GC by resolving possible cycles
del tb
@retry(retry_with_count(log_wal_fetch_failures_on_error))
def download():
with open(path, 'wb') as decomp_out:
pipeline = get_download_pipeline(PIPE, decomp_out, decrypt)
conn = calling_format.connect(creds)
g = gevent.spawn(write_and_return_error, uri, conn, pipeline.stdin)
# Raise any exceptions from write_and_return_error
exc = g.get()
if exc is not None:
raise exc
pipeline.finish()
logger.info(
msg='completed download and decompression',
detail='Downloaded and decompressed "{uri}" to "{path}"'
.format(uri=uri, path=path))
return True
return download()
Developer ID: digvan, Project: wal-e, Lines of code: 65, Source: utils.py
Note: The wal_e.retries.retry_with_count examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The snippets are taken from open-source projects contributed by their respective authors; copyright remains with the original authors, and any distribution or use should follow the corresponding project's License. Do not reproduce without permission.