本文整理汇总了Python中waterbutler.core.path.WaterButlerPath类的典型用法代码示例。如果您正苦于以下问题：Python WaterButlerPath类的具体用法？Python WaterButlerPath怎么用？Python WaterButlerPath使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了WaterButlerPath类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: create_folder
def create_folder(self, path, **kwargs):
    """Create the folder described by ``path`` and return its serialized metadata.

    Generator-based coroutine (``yield from``). POSTs the folder name and the
    parent's identifier to the URL built from ``'folders'``.

    :param path: folder path to create; checked with ``WaterButlerPath.validate_folder``
    :returns: ``BoxFolderMetadata(<response json>, path).serialized()``
    :raises exceptions.FolderNamingConflict: if ``path`` already has an identifier,
        or the request answers with status 409
    :raises exceptions.CreateFolderError: handed to ``make_request`` as ``throws``
    """
    WaterButlerPath.validate_folder(path)

    # A path that already carries an identifier is reported as a conflict
    # before any request is made.
    if path.identifier is not None:
        raise exceptions.FolderNamingConflict(str(path))

    url = self.build_url('folders')
    payload = {
        'name': path.name,
        'parent': {'id': path.parent.identifier},
    }
    resp = yield from self.make_request(
        'POST',
        url,
        data=payload,
        expects=(201, 409),
        throws=exceptions.CreateFolderError,
    )

    # Catch 409s to avoid race conditions
    if resp.status == 409:
        raise exceptions.FolderNamingConflict(str(path))

    body = yield from resp.json()
    return BoxFolderMetadata(body, path).serialized()
开发者ID:bdyetton,项目名称:waterbutler,代码行数:27,代码来源:provider.py
示例2: create_folder
async def create_folder(self, path: WaterButlerPath, folder_precheck: bool=True,
                        **kwargs) -> BoxFolderMetadata:
    """Create the folder described by ``path`` and return its metadata.

    :param path: folder path to create; checked with ``WaterButlerPath.validate_folder``
    :param folder_precheck: when true, a path that already has an identifier is
        rejected before any request is sent
    :returns: ``BoxFolderMetadata`` built from the response JSON and ``path``
    :raises exceptions.FolderNamingConflict: on a failed precheck or a 409 response
    :raises exceptions.CreateFolderError: handed to ``self.request`` as ``throws``
    """
    WaterButlerPath.validate_folder(path)

    if folder_precheck and path.identifier is not None:
        raise exceptions.FolderNamingConflict(path.name)

    url = self.build_url('folders')
    payload = {
        'name': path.name,
        'parent': {'id': path.parent.identifier},
    }
    async with self.request(
        'POST',
        url,
        data=payload,
        expects=(201, 409),
        throws=exceptions.CreateFolderError,
    ) as resp:
        # Catch 409s to avoid race conditions
        if resp.status == 409:
            raise exceptions.FolderNamingConflict(path.name)
        resp_json = await resp.json()

    # save new folder's id into the WaterButlerPath object. logs will need it later.
    path._parts[-1]._id = resp_json['id']
    return BoxFolderMetadata(resp_json, path)
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:27,代码来源:provider.py
示例3: validate_path
def validate_path(self, path, **kwargs):
    """Turn a string path into a ``WaterButlerPath`` rooted at the project.

    Generator-based coroutine (``yield from``). The first path segment is
    looked up as an article; any remaining segments are delegated to the
    provider created for that article.

    :param str path: slash-separated path; a trailing slash is ignored
    :returns: the resolved ``WaterButlerPath``; when a lookup fails with
        ``ValueError`` or a 404/401 ``ProviderError``, a file child named after
        the segment being resolved is returned instead
    :raises exceptions.ProviderError: re-raised when its code is not 404 or 401
    """
    segments = path.rstrip('/').split('/')[1:]
    wbpath = WaterButlerPath('/', _ids=(self.settings['project_id'], ), folder=True)

    # Nothing below the root was requested.
    if not segments:
        return wbpath

    name_or_id = segments.pop(0)
    try:
        article = yield from self._assert_contains_article(name_or_id)
    except ValueError:
        return wbpath.child(name_or_id, folder=False)
    except exceptions.ProviderError as e:
        if e.code in (404, 401):
            return wbpath.child(name_or_id, folder=False)
        raise

    wbpath = wbpath.child(article['title'], article['id'], folder=True)

    # Only the article itself was requested.
    if not segments:
        return wbpath

    provider = yield from self._make_article_provider(article['id'], check_parent=False)
    try:
        return (yield from provider.validate_path('/'.join([''] + segments), parent=wbpath))
    except exceptions.ProviderError as e:
        if e.code in (404, 401):
            return wbpath.child(segments.pop(0), folder=False)
        raise
开发者ID:pattisdr,项目名称:waterbutler,代码行数:27,代码来源:provider.py
示例4: handle_name_conflict
async def handle_name_conflict(self,
                               path: wb_path.WaterButlerPath,
                               conflict: str='replace',
                               **kwargs) -> typing.Tuple[wb_path.WaterButlerPath, bool]:
    """Check a WaterButlerPath for a naming conflict and resolve it.

    Given a path and a conflict-resolution mode, determine the path to upload
    to and report whether something already exists there.

    :param path: ( :class:`.WaterButlerPath` ) Desired path to check for conflict
    :param conflict: ( :class:`str` ) ``'replace'`` returns the path unchanged,
        ``'warn'`` raises on conflict, any other value renames until the name is free
    :rtype: (:class:`.WaterButlerPath`, result of ``self.exists`` or ``False``)
    :raises: :class:`.NamingConflict`
    """
    exists = await self.exists(path, **kwargs)

    # An empty list from ``exists`` is treated as "present", not as "absent".
    occupied = exists or exists == []
    if not occupied or conflict == 'replace':
        return path, exists  # type: ignore

    if conflict == 'warn':
        raise exceptions.NamingConflict(path.name)

    # Keep incrementing the name (mutates ``path``) until the revalidated
    # candidate is reported as not existing.
    while True:
        path.increment_name()
        candidate = await self.revalidate_path(
            path.parent,
            path.name,
            folder=path.is_dir
        )
        exists = await self.exists(candidate, **kwargs)
        if not (exists or exists == []):
            return path, False
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:33,代码来源:provider.py
示例5: create_folder
def create_folder(self, path, **kwargs):
    """Create the folder described by ``path`` and return its metadata object.

    Generator-based coroutine (``yield from``). After a successful request the
    new folder's id is stored on the last part of ``path``.

    :param path: folder path to create; checked with ``WaterButlerPath.validate_folder``
    :returns: ``BoxFolderMetadata`` built from the response JSON and ``path``
    :raises exceptions.FolderNamingConflict: if ``path`` already has an identifier,
        or the request answers with status 409
    :raises exceptions.CreateFolderError: handed to ``make_request`` as ``throws``
    """
    WaterButlerPath.validate_folder(path)

    if path.identifier is not None:
        raise exceptions.FolderNamingConflict(str(path))

    url = self.build_url('folders')
    payload = {
        'name': path.name,
        'parent': {'id': path.parent.identifier},
    }
    resp = yield from self.make_request(
        'POST',
        url,
        data=payload,
        expects=(201, 409),
        throws=exceptions.CreateFolderError,
    )

    # Catch 409s to avoid race conditions
    if resp.status == 409:
        raise exceptions.FolderNamingConflict(str(path))

    resp_json = yield from resp.json()

    # save new folder's id into the WaterButlerPath object. logs will need it later.
    path._parts[-1]._id = resp_json['id']
    return BoxFolderMetadata(resp_json, path)
开发者ID:cslzchen,项目名称:waterbutler,代码行数:27,代码来源:provider.py
示例6: test_intra_move_folder_replace
async def test_intra_move_folder_replace(self, provider, intra_fixtures, root_provider_fixtures):
    """Moving a folder onto a destination that has an id issues a DELETE for it.

    Registers the PUT, DELETE and listing GET responses, then checks that
    ``intra_move`` returns ``(folder metadata with children, False)`` and
    that the DELETE call was made.
    """
    item = intra_fixtures['intra_folder_metadata']
    list_metadata = root_provider_fixtures['folder_list_metadata']

    src_path = WaterButlerPath('/name/', _ids=(provider, item['id']))
    dest_path = WaterButlerPath('/charmander/name/', _ids=(provider, item['id'], item['id']))

    file_url = provider.build_url('folders', src_path.identifier)
    delete_url = provider.build_url('folders', dest_path.identifier, recursive=True)
    list_url = provider.build_url(
        'folders', item['id'], 'items',
        fields='id,name,size,modified_at,etag,total_count',
        offset=0, limit=1000,
    )

    aiohttpretty.register_json_uri('PUT', file_url, body=item)
    aiohttpretty.register_uri('DELETE', delete_url, status=204)
    aiohttpretty.register_json_uri('GET', list_url, body=list_metadata)

    # Build the expected folder with one serialized child per listed entry.
    expected_folder = BoxFolderMetadata(item, dest_path)
    expected_folder._children = [
        provider._serialize_item(
            entry,
            dest_path.child(entry['name'], folder=(entry['type'] == 'folder')),
        )
        for entry in list_metadata['entries']
    ]

    result = await provider.intra_move(provider, src_path, dest_path)

    assert result == (expected_folder, False)
    assert aiohttpretty.has_call(method='DELETE', uri=delete_url)
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:29,代码来源:test_provider.py
示例7: test_intra_copy_folder
async def test_intra_copy_folder(self, provider, intra_fixtures, root_provider_fixtures):
    """Copying a folder returns the folder metadata with its children and ``True``.

    Registers the listing GET and the copy POST responses, then compares the
    result of ``intra_copy`` against the expected metadata.
    """
    item = intra_fixtures['intra_folder_metadata']
    list_metadata = root_provider_fixtures['folder_list_metadata']

    src_path = WaterButlerPath('/name/', _ids=(provider, item['id']))
    dest_path = WaterButlerPath('/charmander/name/', _ids=(provider, item['id']))

    file_url = provider.build_url('folders', src_path.identifier, 'copy')
    list_url = provider.build_url(
        'folders', item['id'], 'items',
        fields='id,name,size,modified_at,etag,total_count',
        offset=0, limit=1000,
    )

    aiohttpretty.register_json_uri('GET', list_url, body=list_metadata)
    aiohttpretty.register_json_uri('POST', file_url, body=item)

    # Build the expected folder with one serialized child per listed entry.
    expected_folder = BoxFolderMetadata(item, dest_path)
    expected_folder._children = [
        provider._serialize_item(
            entry,
            dest_path.child(entry['name'], folder=(entry['type'] == 'folder')),
        )
        for entry in list_metadata['entries']
    ]

    result = await provider.intra_copy(provider, src_path, dest_path)

    assert result == (expected_folder, True)
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:26,代码来源:test_provider.py
示例8: create_folder
async def create_folder(self, path, **kwargs):
    """Create a folder at ``path`` via the ``fileops/create_folder`` endpoint.

    :param path: path object of the folder to create (its ``full_path`` is
        sent); checked with ``WaterButlerPath.validate_folder``
    :returns: ``DropboxFolderMetadata`` built from the response JSON and ``self.folder``
    :raises exceptions.FolderNamingConflict: on a 403 whose error text says a
        file or folder already exists at the path
    :raises exceptions.CreateFolderError: on any other 403, or as ``throws``
        of ``make_request``
    """
    WaterButlerPath.validate_folder(path)

    response = await self.make_request(
        'POST',
        self.build_url('fileops', 'create_folder'),
        params={
            'root': 'auto',
            'path': path.full_path
        },
        expects=(200, 403),
        throws=exceptions.CreateFolderError
    )

    data = await response.json()

    if response.status == 403:
        # A 403 body without an 'error' entry must not blow up with a
        # TypeError on the ``in`` test; fall through to CreateFolderError.
        error = data.get('error') or ''
        if 'because a file or folder already exists at path' in error:
            raise exceptions.FolderNamingConflict(str(path))
        raise exceptions.CreateFolderError(data, code=403)

    return DropboxFolderMetadata(data, self.folder)
开发者ID:DataConservancy,项目名称:waterbutler,代码行数:25,代码来源:provider.py
示例9: test_rename
def test_rename(self):
    """``rename`` replaces the final component's name."""
    wb_path = WaterButlerPath('/this/is/a/long/path')
    assert wb_path.name == 'path'

    wb_path.rename('journey')

    assert wb_path.name == 'journey'
开发者ID:erinspace,项目名称:waterbutler,代码行数:8,代码来源:test_path.py
示例10: create_folder
async def create_folder(self, path, **kwargs):
    """Create a folder at ``path`` via the ``files/create_folder`` endpoint.

    :param path: path object of the folder to create; its ``full_path`` is
        sent with any trailing slash removed
    :returns: ``DropboxFolderMetadata`` built from the response data and ``self.folder``
    :raises exceptions.CreateFolderError: handed to ``dropbox_request`` as ``throws``
    """
    WaterButlerPath.validate_folder(path)

    url = self.build_url('files', 'create_folder')
    body = {'path': path.full_path.rstrip('/')}
    data = await self.dropbox_request(
        url,
        body,
        throws=exceptions.CreateFolderError,
    )

    return DropboxFolderMetadata(data, self.folder)
开发者ID:felliott,项目名称:waterbutler,代码行数:11,代码来源:provider.py
示例11: move
async def move(src_bundle, dest_bundle, start_time=None, **kwargs):
    """Move from the source bundle to the destination bundle and log the result.

    Both bundles are mutated: their ``'path'`` and ``'provider'`` entries are
    popped. The ``'move'`` callback log is sent whether or not the move
    succeeded; on failure the exception is re-raised afterwards.

    :param dict src_bundle: needs ``'path'``, ``'provider'`` and ``'nid'``
    :param dict dest_bundle: needs ``'path'``, ``'provider'`` and ``'nid'``
    :param start_time: start timestamp; defaults to ``time.time()`` when falsy
    :returns: ``(metadata, created)`` as returned by ``src_provider.move``
    """
    start_time = start_time or time.time()

    src_path = src_bundle.pop('path')
    src_provider = utils.make_provider(**src_bundle.pop('provider'))
    dest_path = dest_bundle.pop('path')
    dest_provider = utils.make_provider(**dest_bundle.pop('provider'))

    logger.info('Starting moving {!r}, {!r} to {!r}, {!r}'.format(src_path, src_provider, dest_path, dest_provider))

    metadata = None
    errors = []
    try:
        metadata, created = await src_provider.move(dest_provider, src_path, dest_path, **kwargs)
    except Exception as e:
        logger.error('Move failed with error {!r}'.format(e))
        errors = [repr(e)]
        raise  # Ensure sentry sees this
    else:
        logger.info('Move succeeded')
        # Log against the path derived from the resulting metadata.
        dest_path = WaterButlerPath.from_metadata(metadata)
    finally:
        source = LogPayload(src_bundle['nid'], src_provider, path=src_path)
        destination = LogPayload(
            dest_bundle['nid'], dest_provider, path=dest_path, metadata=metadata
        )
        await utils.log_to_callback(
            'move',
            source=source,
            destination=destination,
            start_time=start_time,
            errors=errors
        )

    return metadata, created
开发者ID:ccfair,项目名称:waterbutler,代码行数:33,代码来源:move.py
示例12: copy
async def copy(src_bundle, dest_bundle, request=None, start_time=None, **kwargs):
    """Copy from the source bundle to the destination bundle and log the result.

    Both bundles are mutated: their ``'path'`` and ``'provider'`` entries are
    popped. The ``'copy'`` log is sent whether or not the copy succeeded; on
    failure the exception is re-raised afterwards.

    :param dict src_bundle: needs ``'path'``, ``'provider'`` and ``'nid'``
    :param dict dest_bundle: needs ``'path'``, ``'provider'`` and ``'nid'``
    :param dict request: forwarded to the remote log call; a fresh empty dict
        is used when omitted
    :param start_time: start timestamp; defaults to ``time.time()`` when falsy
    :returns: ``(metadata, created)`` as returned by ``src_provider.copy``
    """
    # Avoid a mutable default argument: a ``{}`` default would be one dict
    # shared by every call that omits ``request``.
    if request is None:
        request = {}
    start_time = start_time or time.time()

    src_path, src_provider = src_bundle.pop('path'), utils.make_provider(**src_bundle.pop('provider'))
    dest_path, dest_provider = dest_bundle.pop('path'), utils.make_provider(**dest_bundle.pop('provider'))

    logger.info('Starting copying {!r}, {!r} to {!r}, {!r}'
                .format(src_path, src_provider, dest_path, dest_provider))

    metadata, errors = None, []
    try:
        metadata, created = await src_provider.copy(dest_provider, src_path, dest_path, **kwargs)
    except Exception as e:
        logger.error('Copy failed with error {!r}'.format(e))
        errors = [repr(e)]
        raise  # Ensure sentry sees this
    else:
        logger.info('Copy succeeded')
        # Log against the path derived from the resulting metadata.
        dest_path = WaterButlerPath.from_metadata(metadata)
    finally:
        source = LogPayload(src_bundle['nid'], src_provider, path=src_path)
        destination = LogPayload(
            dest_bundle['nid'], dest_provider, path=dest_path, metadata=metadata
        )
        await remote_logging.wait_for_log_futures(
            'copy', source=source, destination=destination, start_time=start_time,
            errors=errors, request=request, api_version='celery',
        )

    return metadata, created
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:31,代码来源:copy.py
示例13: revalidate_path
async def revalidate_path(self, base: WaterButlerPath, path: str,
                          folder: bool=None) -> WaterButlerPath:
    """Look up ``path`` among the items of ``base`` and return it as a child path.

    The folder listing is fetched once (``limit=1000``) and names are compared
    case-insensitively. When ``folder`` is not ``None`` only entries of the
    matching kind are considered.

    :param base: parent folder whose ``identifier`` is listed
    :param path: name of the child to look for
    :param folder: restrict the match to folders (``True``) or non-folders
        (``False``); ``None`` accepts either
    :returns: ``base.child(...)`` named ``path``, carrying the matched entry's
        id and kind, or ``_id=None`` and the given ``folder`` when nothing matched
    """
    # TODO Research the search api endpoint
    async with self.request(
        'GET',
        self.build_url('folders', base.identifier, 'items',
                       fields='id,name,type', limit=1000),
        expects=(200,),
        throws=exceptions.ProviderError
    ) as resp:
        data = await resp.json()
        lower_name = path.lower()

    matches = (
        entry for entry in data['entries']
        if entry['name'].lower() == lower_name and (
            folder is None or
            (entry['type'] == 'folder') == folder
        )
    )
    found = next(matches, None)

    if found is None:
        _id = None
    else:
        _id = found['id']
        folder = found['type'] == 'folder'

    # Use path over the entry's own name because of casing issues
    return base.child(path, _id=_id, folder=folder)
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:29,代码来源:provider.py
示例14: create_folder
def create_folder(self, path, **kwargs):
    """Create a folder at ``path`` by PUTting to a generated key URL.

    Generator-based coroutine (``yield from``).

    :param path: path object of the folder to create (its ``path`` attribute
        is used as the key); checked with ``WaterButlerPath.validate_folder``
    :returns: ``S3FolderMetadata({'Prefix': path.path})``
    :raises exceptions.FolderNamingConflict: if ``self.exists(path)`` is truthy
    :raises exceptions.CreateFolderError: handed to ``make_request`` as ``throws``
    """
    WaterButlerPath.validate_folder(path)

    already_exists = yield from self.exists(path)
    if already_exists:
        raise exceptions.FolderNamingConflict(str(path))

    put_url = self.bucket.new_key(path.path).generate_url(settings.TEMP_URL_SECS, 'PUT')
    yield from self.make_request(
        'PUT',
        put_url,
        expects=(200, 201),
        throws=exceptions.CreateFolderError
    )

    return S3FolderMetadata({'Prefix': path.path})
开发者ID:rafaeldelucena,项目名称:waterbutler,代码行数:17,代码来源:provider.py
示例15: __init__
def __init__(self, resource, provider, metadata=None, path=None):
    """Store the pieces of a log payload.

    :param resource: stored as ``self.resource``
    :param provider: stored as ``self.provider``
    :param metadata: stored as ``self.metadata``; used to derive the path
        when ``path`` is falsy
    :param path: stored as ``self.path`` when truthy
    :raises Exception: if both ``path`` and ``metadata`` are ``None``
    """
    nothing_given = path is None and metadata is None
    if nothing_given:
        raise Exception("Log payload needs either a path or metadata.")

    self.resource = resource
    self.provider = provider
    self.metadata = metadata
    # Fall back to the path derived from the metadata.
    self.path = path if path else WaterButlerPath.from_metadata(metadata)
开发者ID:ccfair,项目名称:waterbutler,代码行数:8,代码来源:log_payload.py
示例16: copy
async def copy(self,
               dest_provider: 'BaseProvider',
               src_path: wb_path.WaterButlerPath,
               dest_path: wb_path.WaterButlerPath,
               rename: str=None, conflict: str='replace',
               handle_naming: bool=True) \
        -> typing.Tuple[wb_metadata.BaseMetadata, bool]:
    """Copy ``src_path`` from this provider to ``dest_path`` on ``dest_provider``.

    Tries, in order: an intra-provider copy when ``can_intra_copy`` allows
    it, a recursive folder operation when the source is a directory, and
    otherwise a download from here followed by an upload to the destination.

    :param dest_provider: provider receiving the copy
    :param src_path: path to copy from
    :param dest_path: path to copy to
    :param rename: forwarded to ``dest_provider.handle_naming``
    :param conflict: forwarded to ``dest_provider.handle_naming``
    :param handle_naming: when true, ``dest_path`` is first resolved through
        ``dest_provider.handle_naming``
    :returns: the result of ``intra_copy``, ``_folder_file_op`` or
        ``dest_provider.upload``
    :raises exceptions.OverwriteSelfError: if both providers share a storage
        root and source and destination have the same materialized path
    """
    self.provider_metrics.add('copy', {
        'got_handle_naming': handle_naming,
        'conflict': conflict,
        'got_rename': rename is not None,
    })

    if handle_naming:
        dest_path = await dest_provider.handle_naming(
            src_path,
            dest_path,
            rename=rename,
            conflict=conflict,
        )
        # Naming is settled, so nested calls get no naming options.
        kwargs = {}
    else:
        kwargs = {'rename': rename, 'conflict': conflict, 'handle_naming': handle_naming}
    args = (dest_provider, src_path, dest_path)

    # files and folders shouldn't overwrite themselves
    same_storage = self.shares_storage_root(dest_provider)
    if same_storage and src_path.materialized_path == dest_path.materialized_path:
        raise exceptions.OverwriteSelfError(src_path)

    self.provider_metrics.add('copy.can_intra_copy', False)
    if self.can_intra_copy(dest_provider, src_path):
        self.provider_metrics.add('copy.can_intra_copy', True)
        return await self.intra_copy(*args)

    if src_path.is_dir:
        return await self._folder_file_op(self.copy, *args, **kwargs)  # type: ignore

    download_stream = await self.download(src_path)

    # A stream that carries a name decides the destination's name.
    stream_name = getattr(download_stream, 'name', None)
    if stream_name:
        dest_path.rename(download_stream.name)

    return await dest_provider.upload(download_stream, dest_path)
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:46,代码来源:provider.py
示例17: test_metadata
def test_metadata(self, provider, folder_object_metadata, folder_list_metadata):
    """Listing the provider's root yields one metadata object per listed entry.

    Generator-based test (``yield from``): entries of type ``"file"`` are
    expected as ``BoxFileMetadata``, everything else as ``BoxFolderMetadata``.
    """
    path = WaterButlerPath("/", _ids=(provider.folder,))

    list_url = provider.build_url("folders", provider.folder, "items", fields="id,name,size,modified_at,etag")
    aiohttpretty.register_json_uri("GET", list_url, body=folder_list_metadata)

    result = yield from provider.metadata(path)

    expected = [
        (BoxFileMetadata if entry["type"] == "file" else BoxFolderMetadata)(
            entry, path.child(entry["name"])
        )
        for entry in folder_list_metadata["entries"]
    ]

    assert result == expected
开发者ID:cslzchen,项目名称:waterbutler,代码行数:18,代码来源:test_provider.py
示例18: test_metadata
def test_metadata(self, provider, folder_object_metadata, folder_list_metadata):
    """Listing the provider's root yields one metadata object per listed entry.

    Generator-based test (``yield from``): entries of type ``'file'`` are
    expected as ``BoxFileMetadata``, everything else as ``BoxFolderMetadata``.
    """
    path = WaterButlerPath('/', _ids=(provider.folder, ))

    list_url = provider.build_url('folders', provider.folder, 'items', fields='id,name,size,modified_at,etag')
    aiohttpretty.register_json_uri('GET', list_url, body=folder_list_metadata)

    result = yield from provider.metadata(path)

    expected = [
        (BoxFileMetadata if entry['type'] == 'file' else BoxFolderMetadata)(
            entry, path.child(entry['name'])
        )
        for entry in folder_list_metadata['entries']
    ]

    assert result == expected
开发者ID:kwierman,项目名称:waterbutler,代码行数:18,代码来源:test_provider.py
示例19: test_metadata
async def test_metadata(self, provider, root_provider_fixtures):
    """Listing the provider's root yields one metadata object per listed entry.

    Entries of type ``'file'`` are expected as ``BoxFileMetadata`` on a plain
    child path; everything else as ``BoxFolderMetadata`` on a folder child path.
    """
    path = WaterButlerPath('/', _ids=(provider.folder, ))

    list_url = provider.build_url(
        'folders', provider.folder, 'items',
        fields='id,name,size,modified_at,etag,total_count',
        offset=0, limit=1000,
    )
    list_metadata = root_provider_fixtures['folder_list_metadata']
    aiohttpretty.register_json_uri('GET', list_url, body=list_metadata)

    result = await provider.metadata(path)

    def _expected_for(entry):
        # Files use a plain child path; folders need ``folder=True``.
        if entry['type'] == 'file':
            return BoxFileMetadata(entry, path.child(entry['name']))
        return BoxFolderMetadata(entry, path.child(entry['name'], folder=True))

    expected = [_expected_for(entry) for entry in list_metadata['entries']]

    assert result == expected
开发者ID:CenterForOpenScience,项目名称:waterbutler,代码行数:21,代码来源:test_provider.py
示例20: validate_path
async def validate_path(self, path, revision=None, **kwargs):
    """Build a ``WaterButlerPath`` for ``path``, matching it against known file ids.

    :param str path: ``'/'`` for the root, otherwise a file id with optional
        surrounding slashes
    :param revision: passed as ``version`` when fetching metadata and stored
        on the returned path as ``revision``
    :returns: the root path for ``'/'``; otherwise a path named after the
        metadata item whose ``extra['fileId']`` equals ``path`` (the last such
        item wins), or a path named ``path`` itself when none matches
    """
    if path == '/':
        root = WaterButlerPath('/')
        root.revision = revision
        return root

    path = path.strip('/')

    wbpath = None
    items = await self._maybe_fetch_metadata(version=revision)
    for item in items:
        file_id = item.extra['fileId']
        if path != file_id:
            continue
        wbpath = WaterButlerPath('/' + item.name, _ids=(None, item.extra['fileId']))

    # No item matched: fall back to a path built from the raw id.
    wbpath = wbpath or WaterButlerPath('/' + path)
    wbpath.revision = revision
    return wbpath
开发者ID:dev-alex-alex2006hw,项目名称:waterbutler,代码行数:21,代码来源:provider.py
注：本文中的waterbutler.core.path.WaterButlerPath类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论