• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python minio.Minio类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 minio.Minio 类的典型用法代码示例。如果您正苦于以下问题:Python Minio 类的具体用法是什么?Python Minio 怎么用?有哪些 Python Minio 的使用示例?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



下文一共展示了 Minio 类的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_empty_list_parts_works

    def test_empty_list_parts_works(self, mock_connection):
        """Check that a ListPartsResult without parts yields zero items.

        A canned XML document containing no ``Part`` elements is registered
        on the mock server for the expected GET request; everything produced
        by ``client._list_object_parts`` is then collected and counted.

        ``mock_connection`` is presumably injected by a patch decorator that
        is outside this view -- confirm against the enclosing test class.
        """
        # Canned response body: metadata only, no <Part> entries.
        mock_data = '''<?xml version="1.0"?>
                       <ListPartsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
                         <Bucket>bucket</Bucket>
                         <Key>go1.4.2</Key>
                         <UploadId>ntWSjzBytPT2xKLaMRonzXncsO10EH4Fc-Iq2-4hG-ulRYB</UploadId>
                         <Initiator>
                           <ID>minio</ID>
                           <DisplayName>minio</DisplayName>
                         </Initiator>
                         <Owner>
                           <ID>minio</ID>
                           <DisplayName>minio</DisplayName>
                         </Owner>
                         <StorageClass>STANDARD</StorageClass>
                         <PartNumberMarker>0</PartNumberMarker>
                         <NextPartNumberMarker>0</NextPartNumberMarker>
                         <MaxParts>1000</MaxParts>
                         <IsTruncated>false</IsTruncated>
                       </ListPartsResult>
                    '''
        mock_server = MockConnection()
        mock_connection.return_value = mock_server
        expected_request = MockResponse(
            'GET',
            'https://localhost:9000/bucket/key?max-parts=1000&uploadId=upload_id',
            {'User-Agent': _DEFAULT_USER_AGENT},
            200,
            content=mock_data,
        )
        mock_server.mock_add_request(expected_request)

        client = Minio('localhost:9000')
        # Drain the iterator in one step so the number of parts can be counted.
        collected = list(client._list_object_parts('bucket', 'key', 'upload_id'))
        eq_(0, len(collected))
开发者ID:bacongobbler,项目名称:minio-py,代码行数:34,代码来源:list_uploaded_parts_test.py


示例2: test_public_read

 def test_public_read(self, mock_connection):
     """Check that the canned ACL document is reported as ``Acl.public_read()``.

     The XML below carries two grants (READ and FULL_CONTROL). It is
     registered on the mock server for ``GET /hello/?acl`` and the value
     returned by ``client.get_bucket_acl('hello')`` is compared with
     ``Acl.public_read()``.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     # Each line ends in a backslash, so the literal is joined without newlines.
     acl_xml = '''
               <AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> \
                 <Owner> \
                   <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID> \
                   <DisplayName>[email protected]</DisplayName> \
                 </Owner> \
                 <AccessControlList> \
                   <Grant> \
                     <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"> \
                       <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID> \
                       <DisplayName>[email protected]</DisplayName> \
                       <URI>http://acs.amazonaws.com/groups/global/AllUsers</URI> \
                     </Grantee> \
                     <Permission>READ</Permission> \
                   </Grant> \
                   <Grant> \
                     <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"> \
                       <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID> \
                       <DisplayName>[email protected]</DisplayName> \
                     </Grantee> \
                     <Permission>FULL_CONTROL</Permission> \
                   </Grant> \
                 </AccessControlList> \
               </AccessControlPolicy>
               '''
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'GET',
         'https://localhost:9000/hello/?acl',
         {'User-Agent': _DEFAULT_USER_AGENT},
         200,
         content=acl_xml,
     )
     mock_server.mock_add_request(expected_request)
     client = Minio('localhost:9000')
     reported_acl = client.get_bucket_acl('hello')
     eq_(Acl.public_read(), reported_acl)
开发者ID:bacongobbler,项目名称:minio-py,代码行数:35,代码来源:get_bucket_acl_test.py


示例3: test_empty_list_uploads_test

 def test_empty_list_uploads_test(self, mock_connection):
     """Check that a result document without ``Upload`` entries yields nothing.

     The canned ``ListMultipartUploadsResult`` is registered on the mock
     server for the expected GET request, and the items produced by
     ``client._list_incomplete_uploads`` are collected and counted.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     # Canned response body: no <Upload> elements at all.
     mock_data = '''<?xml version="1.0"?>
                    <ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
                      <Bucket>golang</Bucket>
                      <KeyMarker/>
                      <UploadIdMarker/>
                      <NextKeyMarker/>
                      <NextUploadIdMarker/>
                      <EncodingType/>
                      <MaxUploads>1000</MaxUploads>
                      <IsTruncated>false</IsTruncated>
                      <Prefix/>
                      <Delimiter/>
                    </ListMultipartUploadsResult>
                 '''
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'GET',
         'https://localhost:9000/bucket/?max-uploads=1000&uploads',
         {'User-Agent': _DEFAULT_USER_AGENT},
         200,
         content=mock_data,
     )
     mock_server.mock_add_request(expected_request)
     client = Minio('localhost:9000')
     # Drain the iterator in one step so the uploads can be counted.
     collected = list(client._list_incomplete_uploads('bucket', '', True, False))
     eq_(0, len(collected))
开发者ID:bacongobbler,项目名称:minio-py,代码行数:27,代码来源:list_incomplete_uploads_test.py


示例4: test_empty_list_objects_works

    def test_empty_list_objects_works(self, mock_connection):
        mock_data = """<?xml version="1.0"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Name>bucket</Name>
  <Prefix/>
  <Marker/>
  <IsTruncated>false</IsTruncated>
  <MaxKeys>1000</MaxKeys>
  <Delimiter/>
</ListBucketResult>
        """
        mock_server = MockConnection()
        mock_connection.return_value = mock_server
        mock_server.mock_add_request(
            MockResponse(
                "GET",
                "https://localhost:9000/bucket/?max-keys=1000",
                {"User-Agent": _DEFAULT_USER_AGENT},
                200,
                content=mock_data,
            )
        )
        client = Minio("localhost:9000")
        bucket_iter = client.list_objects("bucket", recursive=True)
        buckets = []
        for bucket in bucket_iter:
            buckets.append(bucket)
        eq_(0, len(buckets))
开发者ID:bacongobbler,项目名称:minio-py,代码行数:28,代码来源:list_objects_test.py


示例5: test_notification_config_id_key_is_optional

 def test_notification_config_id_key_is_optional(self, mock_connection):
     """Call ``set_bucket_notification`` with a queue entry that has no 'Id'.

     A PUT to ``/my-test-bucket/?notification=`` with fixed Content-Md5 and
     Content-Length headers is registered on the mock server, then the
     notification configuration is submitted through the client.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_headers = {
         'Content-Md5': 'f+TfVp/A4pNnI7S4S+MkFg==',
         'Content-Length': '196',
         'User-Agent': _DEFAULT_USER_AGENT,
     }
     expected_request = MockResponse(
         'PUT',
         'https://localhost:9000/my-test-bucket/?notification=',
         expected_headers,
         200, content=""
     )
     mock_server.mock_add_request(expected_request)
     client = Minio('localhost:9000')
     # The single queue entry deliberately carries only 'Arn' and 'Events'.
     queue_entry = {
         'Arn': 'arn1',
         'Events': ['s3:ObjectCreated:*'],
     }
     notification = {'QueueConfigurations': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:27,代码来源:set_bucket_notification_test.py


示例6: get_presigned_get_url

def get_presigned_get_url(filename, config, secrets):
    """Build a presigned GET URL for ``filename`` with the Minio S3 client.

    Args:
        filename: Object name passed to ``presigned_get_object``.
        config: Dictionary whose ``'StartdLogging'`` entry supplies the
            ``'url'`` and ``'bucket'`` values.
        secrets: Dictionary whose ``'StartdLogging'`` entry supplies the
            ``'access_key'`` and ``'secret_key'`` values.

    Returns:
        Whatever ``Minio.presigned_get_object`` returns, or ``None`` when a
        ``ResponseError`` is caught (the error is printed in that case).
    """
    from minio import Minio
    from minio.error import ResponseError

    logging_config = config['StartdLogging']
    logging_secrets = secrets['StartdLogging']

    client = Minio(
        logging_config['url'],
        access_key=logging_secrets['access_key'],
        secret_key=logging_secrets['secret_key'],
        secure=True,
    )

    try:
        url = client.presigned_get_object(logging_config['bucket'], filename)
    except ResponseError as err:
        # Report the failure and fall through to a None result.
        print(err)
        return None
    return url
开发者ID:IIHE,项目名称:pyglidein,代码行数:29,代码来源:client_util.py


示例7: test_remove_bucket_works

 def test_remove_bucket_works(self, mock_connection):
     """Call ``remove_bucket('hello')`` against a mocked DELETE with status 204.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'DELETE',
         'http://localhost:9000/hello/',
         {'User-Agent': _DEFAULT_USER_AGENT},
         204,
     )
     mock_server.mock_add_request(expected_request)
     Minio('http://localhost:9000').remove_bucket('hello')
开发者ID:koolhead17,项目名称:minio-py,代码行数:7,代码来源:remove_bucket_test.py


示例8: test_set_bucket_acl_works

 def test_set_bucket_acl_works(self, mock_connection):
     """Call ``set_bucket_acl`` with ``Acl.private()`` against a mocked PUT.

     The registered request expects an ``x-amz-acl: private`` header on
     ``/hello/?acl`` and carries status 200.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_headers = {
         'x-amz-acl': 'private',
         'User-Agent': _DEFAULT_USER_AGENT,
     }
     expected_request = MockResponse(
         'PUT', 'http://localhost:9000/hello/?acl', expected_headers, 200)
     mock_server.mock_add_request(expected_request)
     client = Minio('http://localhost:9000')
     client.set_bucket_acl('hello', Acl.private())
开发者ID:koolhead17,项目名称:minio-py,代码行数:8,代码来源:set_bucket_acl_test.py


示例9: test_bucket_exists_works

 def test_bucket_exists_works(self, mock_connection):
     """Call ``bucket_exists('hello')`` against a mocked HEAD with status 400.

     The return value of ``bucket_exists`` is not inspected here.

     NOTE(review): the registered status is 400 although the test name says
     "works"; any decorator asserting an expected exception is outside this
     view -- confirm against the enclosing test class. ``mock_connection`` is
     presumably injected by a patch decorator as well.
     """
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'HEAD',
         'https://localhost:9000/hello/',
         {'User-Agent': _DEFAULT_USER_AGENT},
         400,
     )
     mock_server.mock_add_request(expected_request)
     client = Minio('localhost:9000')
     client.bucket_exists('hello')
开发者ID:minio,项目名称:minio-py,代码行数:9,代码来源:bucket_exist_test.py


示例10: test_list_uploads_works

    def test_list_uploads_works(self, mock_connection):
        """Check that two ``Upload`` entries in the reply yield two items.

        The canned ``ListMultipartUploadsResult`` lists uploads for the keys
        ``go1.4.2`` and ``go1.5.0``. It is registered on the mock server for
        the expected GET request, and the items produced by
        ``client._list_incomplete_uploads`` are collected and counted.

        ``mock_connection`` is presumably injected by a patch decorator that
        is outside this view -- confirm against the enclosing test class.
        """
        # Canned response body containing exactly two <Upload> elements.
        mock_data = '''<?xml version="1.0"?>
                       <ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
                         <Bucket>golang</Bucket>
                         <KeyMarker/>
                         <UploadIdMarker/>
                         <NextKeyMarker>keymarker</NextKeyMarker>
                         <NextUploadIdMarker>uploadidmarker</NextUploadIdMarker>
                         <EncodingType/>
                         <MaxUploads>1000</MaxUploads>
                         <IsTruncated>false</IsTruncated>
                         <Upload>
                           <Key>go1.4.2</Key>
                           <UploadId>uploadid</UploadId>
                           <Initiator>
                             <ID/>
                             <DisplayName/>
                           </Initiator>
                           <Owner>
                             <ID/>
                             <DisplayName/>
                           </Owner>
                           <StorageClass/>
                           <Initiated>2015-05-30T14:43:35.349Z</Initiated>
                         </Upload>
                         <Upload>
                           <Key>go1.5.0</Key>
                           <UploadId>uploadid2</UploadId>
                           <Initiator>
                             <ID/>
                             <DisplayName/>
                           </Initiator>
                           <Owner>
                             <ID/>
                             <DisplayName/>
                           </Owner>
                           <StorageClass/>
                           <Initiated>2015-05-30T15:00:07.759Z</Initiated>
                         </Upload>
                         <Prefix/>
                         <Delimiter/>
                       </ListMultipartUploadsResult>
                    '''
        mock_server = MockConnection()
        mock_connection.return_value = mock_server
        expected_request = MockResponse(
            'GET',
            'https://localhost:9000/bucket/?delimiter=%2F&max-uploads=1000&uploads',
            {'User-Agent': _DEFAULT_USER_AGENT},
            200,
            content=mock_data,
        )
        mock_server.mock_add_request(expected_request)

        client = Minio('localhost:9000')
        # Drain the iterator in one step so the uploads can be counted.
        collected = list(client._list_incomplete_uploads('bucket', '', False, False))
        eq_(2, len(collected))
开发者ID:bacongobbler,项目名称:minio-py,代码行数:57,代码来源:list_incomplete_uploads_test.py


示例11: test_make_bucket_throws_fail

 def test_make_bucket_throws_fail(self, mock_connection):
     """Call ``make_bucket('hello')`` against a mocked PUT with status 409.

     The response body is the error document built by ``generate_error``.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm. ``mock_connection`` is presumably injected the same way.
     """
     error_body = generate_error(
         'code', 'message', 'request_id', 'host_id', 'resource')
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'PUT',
         'http://localhost:9000/hello/',
         {'User-Agent': _DEFAULT_USER_AGENT},
         409,
         content=error_body,
     )
     mock_server.mock_add_request(expected_request)
     Minio('http://localhost:9000').make_bucket('hello')
开发者ID:koolhead17,项目名称:minio-py,代码行数:11,代码来源:make_bucket_test.py


示例12: test_get_object_throws_fail

 def test_get_object_throws_fail(self, mock_connection):
     """Call ``get_object('hello', 'key')`` against a mocked GET with status 404.

     The response body is the error document built by ``generate_error``.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm. ``mock_connection`` is presumably injected the same way.
     """
     error_body = generate_error(
         'code', 'message', 'request_id', 'host_id',
         'resource', 'bucket', 'object')
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'GET',
         'https://localhost:9000/hello/key',
         {'User-Agent': _DEFAULT_USER_AGENT},
         404,
         content=error_body,
     )
     mock_server.mock_add_request(expected_request)
     Minio('localhost:9000').get_object('hello', 'key')
开发者ID:minio,项目名称:minio-py,代码行数:12,代码来源:get_object_test.py


示例13: test_notification_config_events_key_is_present

 def test_notification_config_events_key_is_present(self):
     """Submit a queue configuration entry that has no 'Events' key.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm against the enclosing test class.
     """
     client = Minio('localhost:9000')
     # The entry carries 'Id' and 'Arn' only.
     queue_entry = {
         'Id': '1',
         'Arn': 'arn1',
     }
     notification = {'QueueConfigurations': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:13,代码来源:set_bucket_notification_test.py


示例14: test_notification_config_arn_key_is_present

 def test_notification_config_arn_key_is_present(self):
     """Submit a queue configuration entry that has no 'Arn' key.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm against the enclosing test class.
     """
     client = Minio('localhost:9000')
     # The entry carries 'Id' and 'Events' only.
     queue_entry = {
         'Id': '1',
         'Events': ['s3:ObjectCreated:*'],
     }
     notification = {'QueueConfigurations': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:13,代码来源:set_bucket_notification_test.py


示例15: test_can_include_response_headers

 def test_can_include_response_headers(self):
     """Check that response header overrides show up in the presigned URL.

     ``_get_bucket_region`` is replaced by a mock returning ``'us-east-1'``,
     then ``presigned_get_object`` is called with two response headers and
     the result is checked for three expected fragments.
     """
     client = Minio('localhost:9000', 'my_access_key', 'my_secret_key',
                    secure=True)
     client._get_bucket_region = mock.Mock(return_value='us-east-1')
     overrides = {
         'Response-Content-Type': 'application/pdf',
         'Response-Content-Disposition': 'inline;  filename="test.pdf"'
     }
     presigned = client.presigned_get_object(
         'mybucket', 'myfile.pdf', response_headers=overrides)
     # Checked in the same order as before; the first failure stops the test.
     for fragment in ('inline', 'test.pdf', 'application%2Fpdf'):
         self.assertIn(fragment, presigned)
开发者ID:balamurugana,项目名称:minio-py,代码行数:13,代码来源:presigned_get_object_test.py


示例16: test_stat_object_works

 def test_stat_object_works(self, mock_connection):
     """Call ``stat_object('hello', 'world')`` against a mocked HEAD request.

     The registered response has status 200 and carries content-type,
     last-modified, content-length and etag headers.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     response_headers = {
         'content-type': 'application/octet-stream',
         'last-modified': 'Fri, 26 Jun 2015 19:05:37 GMT',
         'content-length': 11,
         'etag': '5eb63bbbe01eeed093cb22bb8f5acdc3'
     }
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'HEAD',
         'http://localhost:9000/hello/world',
         {'User-Agent': _DEFAULT_USER_AGENT},
         200,
         response_headers=response_headers,
     )
     mock_server.mock_add_request(expected_request)
     Minio('http://localhost:9000').stat_object('hello', 'world')
开发者ID:koolhead17,项目名称:minio-py,代码行数:14,代码来源:stat_object_test.py


示例17: test_notification_config_has_valid_keys

 def test_notification_config_has_valid_keys(self):
     """Submit a notification whose top-level key is 'QueueConfiguration'.

     The key is spelled in the singular here, on purpose as far as this
     block shows.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm against the enclosing test class.
     """
     client = Minio('localhost:9000')
     queue_entry = {
         'Id': '1',
         'Arn': 'arn1',
         'Events': ['s3:ObjectCreated:*'],
     }
     notification = {'QueueConfiguration': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:14,代码来源:set_bucket_notification_test.py


示例18: test_notification_config_has_valid_event_names

 def test_notification_config_has_valid_event_names(self):
     """Submit a queue entry whose only event name is 'object_created'.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm against the enclosing test class.
     """
     client = Minio('localhost:9000')
     queue_entry = {
         'Id': '1',
         'Arn': 'arn1',
         'Events': ['object_created'],
     }
     notification = {'QueueConfigurations': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:14,代码来源:set_bucket_notification_test.py


示例19: test_empty_list_buckets_works

 def test_empty_list_buckets_works(self, mock_connection):
     """Check that an empty ``Buckets`` element yields zero buckets.

     The canned ``ListAllMyBucketsResult`` is registered on the mock server
     for ``GET /`` and the items produced by ``client.list_buckets()`` are
     counted.

     ``mock_connection`` is presumably injected by a patch decorator that is
     outside this view -- confirm against the enclosing test class.
     """
     # Adjacent literals are concatenated into one XML document.
     mock_data = (
         '<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Buckets>'
         '</Buckets><Owner><ID>minio</ID><DisplayName>minio</DisplayName></Owner></ListAllMyBucketsResult>'
     )
     mock_server = MockConnection()
     mock_connection.return_value = mock_server
     expected_request = MockResponse(
         'GET',
         'https://localhost:9000/',
         {'User-Agent': _DEFAULT_USER_AGENT},
         200,
         content=mock_data,
     )
     mock_server.mock_add_request(expected_request)
     client = Minio('localhost:9000')
     # Count by iteration only; the result is not assumed to support len().
     bucket_count = sum(1 for _ in client.list_buckets())
     eq_(0, bucket_count)
开发者ID:minio,项目名称:minio-py,代码行数:14,代码来源:list_buckets_test.py


示例20: test_notification_config_id_key_is_string

 def test_notification_config_id_key_is_string(self):
     """Submit a queue entry whose 'Id' is the integer 1 rather than a string.

     NOTE(review): no assertion is visible in this block; presumably a
     decorator outside this view checks for the expected exception --
     confirm against the enclosing test class.
     """
     client = Minio('localhost:9000')
     queue_entry = {
         'Id': 1,
         'Arn': 'abc',
         'Events': ['s3:ObjectCreated:*'],
     }
     notification = {'QueueConfigurations': [queue_entry]}
     client.set_bucket_notification('my-test-bucket', notification)
开发者ID:balamurugana,项目名称:minio-py,代码行数:14,代码来源:set_bucket_notification_test.py



注:本文中的 minio.Minio 类示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python minisom.MiniSom类代码示例发布时间:2022-05-27
下一篇:
Python util.quietRun函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap