• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python fake_session.FakeSession类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.unit.customizations.s3.fake_session.FakeSession的典型用法代码示例。如果您正苦于以下问题：Python FakeSession类的具体用法？Python FakeSession怎么用？Python FakeSession使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了FakeSession类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

 def setUp(self):
     """Prepare a fake S3 session, its service/endpoint, and default params.

     No bucket exists yet, so ``self.bucket`` starts out as ``None``.
     """
     super(S3HandlerTestBucket, self).setUp()
     region = 'us-east-1'
     fake_session = FakeSession()
     s3_service = fake_session.get_service('s3')
     self.session = fake_session
     self.service = s3_service
     self.endpoint = s3_service.get_endpoint(region)
     self.params = {'region': region}
     self.bucket = None
开发者ID:SydOps,项目名称:aws-cli,代码行数:7,代码来源:test_s3handler.py


示例2: setUp

    def setUp(self):
        """Build download fixtures: handlers, a seeded bucket, local paths.

        Two sessions are created: a plain fake session, and one constructed
        with ``connection_error=True`` that shares the first session's ``s3``
        attribute and backs ``self.s3_handler_multi_except``.
        """
        super(S3HandlerTestDownload, self).setUp()
        region = 'us-east-1'
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint(region)
        params = {'region': region}
        # Both multipart handlers use the same threshold/chunksize values;
        # each one still receives its own runtime_config() result.
        multipart_settings = {'multipart_threshold': 10,
                              'multipart_chunksize': 2}

        self.s3_handler = S3Handler(session, params)
        self.s3_handler_multi = S3Handler(
            session, params,
            runtime_config=runtime_config(**multipart_settings))

        bucket = make_s3_files(session)
        self.bucket = bucket
        self.s3_files = [bucket + suffix for suffix in
                         ('/text1.txt', '/another_directory/text2.txt')]

        # Local paths live under <cwd>/some_directory/.
        base = os.path.abspath('.')
        self.loc_files = [
            os.sep.join([base, 'some_directory', 'text1.txt']),
            os.sep.join([base, 'some_directory', 'another_directory',
                         'text2.txt']),
        ]

        failing_session = FakeSession(connection_error=True)
        failing_session.s3 = session.s3
        self.fail_session = failing_session
        self.s3_handler_multi_except = S3Handler(
            failing_session, params,
            runtime_config=runtime_config(**multipart_settings))
开发者ID:hardiku,项目名称:aws-cli,代码行数:27,代码来源:test_s3handler.py


示例3: S3HandlerTestURLEncodeDeletes

class S3HandlerTestURLEncodeDeletes(S3HandlerBaseTest):
    """Delete test for an S3 key that contains a ``+`` character."""

    def setUp(self):
        """Create the handler and a bucket seeded with key ``a+b/foo``."""
        super(S3HandlerTestURLEncodeDeletes, self).setUp()
        region = 'us-east-1'
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint(region)
        self.s3_handler = S3Handler(session, {'region': region})
        self.bucket = make_s3_files(session, key1='a+b/foo', key2=None)

    def tearDown(self):
        """Run the base teardown, then clean up the bucket from setUp."""
        super(S3HandlerTestURLEncodeDeletes, self).tearDown()
        s3_cleanup(self.bucket, self.session)

    def test_s3_delete_url_encode(self):
        """Deleting ``a+b/foo`` leaves the bucket with no listed objects."""
        delete_task = FileInfo(
            src=self.bucket + '/a+b/foo',
            src_type='s3',
            dest_type='local',
            operation_name='delete',
            size=0,
            service=self.service,
            endpoint=self.endpoint,
        )
        # Exactly one object is listed before the delete runs ...
        self.assertEqual(len(list_contents(self.bucket, self.session)), 1)
        self.s3_handler.call([delete_task])
        # ... and none afterwards.
        self.assertEqual(len(list_contents(self.bucket, self.session)), 0)
开发者ID:SydOps,项目名称:aws-cli,代码行数:27,代码来源:test_s3handler.py


示例4: setUp

 def setUp(self):
     """Seed a fake bucket; record its two object paths and S3 handles."""
     session = FakeSession()
     self.session = session
     bucket = make_s3_files(session)
     self.bucket = bucket
     self.file1 = '/'.join([bucket, 'text1.txt'])
     self.file2 = '/'.join([bucket, 'another_directory/text2.txt'])
     s3_service = session.get_service('s3')
     self.service = s3_service
     self.endpoint = s3_service.get_endpoint('us-east-1')
开发者ID:HackedByChinese,项目名称:aws-cli,代码行数:7,代码来源:test_filegenerator.py


示例5: TestThrowsWarning

class TestThrowsWarning(unittest.TestCase):
    """Tests for ``FileGenerator.triggers_warning`` on local paths.

    Each test builds a generator, probes one path, and checks both the
    boolean result and any warning message placed on ``result_queue``.
    """

    def setUp(self):
        """Create a scratch file area and fake S3 service/endpoint handles."""
        self.files = FileCreator()
        self.root = self.files.rootdir
        self.session = FakeSession()
        self.service = self.session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')

    def tearDown(self):
        """Remove everything created through ``self.files``."""
        self.files.remove_all()

    def test_no_warning(self):
        """A file created through ``FileCreator`` triggers no warning."""
        file_gen = FileGenerator(self.service, self.endpoint, '', False)
        self.files.create_file("foo.txt", contents="foo")
        full_path = os.path.join(self.root, "foo.txt")
        return_val = file_gen.triggers_warning(full_path)
        self.assertFalse(return_val)
        self.assertTrue(file_gen.result_queue.empty())

    def test_no_exists(self):
        """A path that was never created yields the 'does not exist' warning."""
        file_gen = FileGenerator(self.service, self.endpoint, '', False)
        filename = os.path.join(self.root, 'file')
        return_val = file_gen.triggers_warning(filename)
        self.assertTrue(return_val)
        warning_message = file_gen.result_queue.get()
        self.assertEqual(warning_message.message,
                         ("warning: Skipping file %s. File does not exist." %
                          filename))

    def test_no_read_access(self):
        """An ``OSError`` from the patched ``_open`` yields 'not readable'."""
        file_gen = FileGenerator(self.service, self.endpoint, '', False)
        self.files.create_file("foo.txt", contents="foo")
        full_path = os.path.join(self.root, "foo.txt")
        open_function = 'awscli.customizations.s3.filegenerator._open'
        # Simulate an unreadable file by making the module's _open raise.
        with mock.patch(open_function) as mock_class:
            mock_class.side_effect = OSError()
            return_val = file_gen.triggers_warning(full_path)
            self.assertTrue(return_val)
        warning_message = file_gen.result_queue.get()
        self.assertEqual(warning_message.message,
                         ("warning: Skipping file %s. File/Directory is "
                          "not readable." % full_path))

    @unittest.skipIf(platform.system() not in ['Darwin', 'Linux'],
                     'Special files only supported on mac/linux')
    def test_is_special_file_warning(self):
        """A bound UNIX socket path yields the 'special file' warning."""
        file_gen = FileGenerator(self.service, self.endpoint, '', False)
        file_path = os.path.join(self.files.rootdir, 'foo')
        # Use socket for special file.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Close the socket when the test finishes (pass or fail) so its
        # file descriptor is not leaked into later tests.
        self.addCleanup(sock.close)
        sock.bind(file_path)
        return_val = file_gen.triggers_warning(file_path)
        self.assertTrue(return_val)
        warning_message = file_gen.result_queue.get()
        self.assertEqual(warning_message.message,
                         ("warning: Skipping file %s. File is character "
                          "special device, block special device, FIFO, or "
                          "socket." % file_path))
开发者ID:emyphan,项目名称:aws-cli,代码行数:58,代码来源:test_filegenerator.py


示例6: setUp

 def setUp(self):
     """Patch ``os.environ`` with an empty dict and build S3/local fixtures.

     NOTE(review): the environ patch is started here; presumably a
     ``tearDown`` outside this view stops it -- confirm.
     """
     fake_environ = {}
     self.environ = fake_environ
     self.environ_patch = patch('os.environ', fake_environ)
     self.environ_patch.start()
     self.session = FakeSession()
     # Mock whose get_config() reports no configured region.
     config_mock = MagicMock()
     config_mock.get_config = MagicMock(return_value={'region': None})
     self.mock = config_mock
     self.loc_files = make_loc_files()
     self.bucket = make_s3_files(self.session)
开发者ID:2mind,项目名称:aws-cli,代码行数:9,代码来源:test_s3.py


示例7: TestIgnoreFilesLocally

class TestIgnoreFilesLocally(unittest.TestCase):
    """
    This class tests the ability to ignore particular files.  This includes
    skipping symlink when desired.

    NOTE(review): the fourth ``FileGenerator`` argument appears to control
    whether symlinks are followed (``True`` in the "no skip" tests,
    ``False`` in the "skip" test) -- confirm against ``FileGenerator``.
    """
    def setUp(self):
        """Create fake S3 service/endpoint handles and a scratch file area."""
        self.session = FakeSession()
        self.service = self.session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        self.files = FileCreator()

    def tearDown(self):
        """Remove everything created through ``self.files``."""
        self.files.remove_all()

    def test_warning(self):
        """A symlink pointing at a non-existent target is ignored."""
        path = os.path.join(self.files.rootdir, 'badsymlink')
        os.symlink('non-existent-file', path)
        filegenerator = FileGenerator(self.service, self.endpoint,
                                      '', True)
        self.assertTrue(filegenerator.should_ignore_file(path))

    def test_skip_symlink(self):
        """With the flag ``False``, a symlink to a real file is ignored."""
        filename = 'foo.txt'
        self.files.create_file(os.path.join(self.files.rootdir,
                               filename),
                               contents='foo.txt contents')
        sym_path = os.path.join(self.files.rootdir, 'symlink')
        # The link target is relative, so it resolves next to the link.
        os.symlink(filename, sym_path)
        filegenerator = FileGenerator(self.service, self.endpoint,
                                      '', False)
        self.assertTrue(filegenerator.should_ignore_file(sym_path))

    def test_no_skip_symlink(self):
        """With the flag ``True``, neither the link nor its file is ignored."""
        filename = 'foo.txt'
        path = self.files.create_file(os.path.join(self.files.rootdir,
                                                   filename),
                                      contents='foo.txt contents')
        sym_path = os.path.join(self.files.rootdir, 'symlink')
        os.symlink(path, sym_path)
        filegenerator = FileGenerator(self.service, self.endpoint,
                                      '', True)
        self.assertFalse(filegenerator.should_ignore_file(sym_path))
        self.assertFalse(filegenerator.should_ignore_file(path))

    def test_no_skip_symlink_dir(self):
        """With the flag ``True``, a directory and a link to it are kept."""
        path = os.path.join(self.files.rootdir, 'dir/')
        os.mkdir(path)
        sym_path = os.path.join(self.files.rootdir, 'symlink')
        os.symlink(path, sym_path)
        filegenerator = FileGenerator(self.service, self.endpoint,
                                      '', True)
        self.assertFalse(filegenerator.should_ignore_file(sym_path))
        self.assertFalse(filegenerator.should_ignore_file(path))
开发者ID:emyphan,项目名称:aws-cli,代码行数:54,代码来源:test_filegenerator.py


示例8: S3HandlerTestMvLocalS3

class S3HandlerTestMvLocalS3(S3HandlerBaseTest):
    """
    Tests moving objects into S3 with the ``move`` operation, including a
    unicode-keyed object moved between two buckets and local files moved
    into a bucket.
    """
    def setUp(self):
        """Create the handler, an empty bucket, and the local source files."""
        super(S3HandlerTestMvLocalS3, self).setUp()
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        handler_params = {'region': 'us-east-1', 'acl': ['private'],
                          'quiet': True}
        self.s3_handler = S3Handler(session, handler_params)
        bucket = create_bucket(session)
        self.bucket = bucket
        self.loc_files = make_loc_files()
        self.s3_files = [bucket + '/text1.txt',
                         bucket + '/another_directory/text2.txt']

    def tearDown(self):
        """Run the base teardown, then drop local files and the bucket."""
        super(S3HandlerTestMvLocalS3, self).tearDown()
        clean_loc_files(self.loc_files)
        s3_cleanup(self.bucket, self.session)

    def test_move_unicode(self):
        """Move a unicode-named key from a second bucket into the main one."""
        check_mark = u'\u2713'
        self.bucket2 = make_s3_files(self.session, key1=check_mark)
        move_task = FileInfo(
            src=self.bucket2 + '/' + check_mark,
            src_type='s3',
            dest=self.bucket + '/' + check_mark,
            dest_type='s3', operation_name='move',
            size=0,
            service=self.service,
            endpoint=self.endpoint,
        )
        self.s3_handler.call([move_task])
        self.assertEqual(len(list_contents(self.bucket, self.session)), 1)

    def test_move(self):
        """Move both local files to S3 and check they are gone locally."""
        local_paths = [self.loc_files[0], self.loc_files[1]]
        # One move task per (local source, S3 destination) pair.
        tasks = [
            FileInfo(
                src=local_path, src_type='local',
                dest=s3_path, dest_type='s3',
                operation_name='move', size=0,
                service=self.service,
                endpoint=self.endpoint)
            for local_path, s3_path in zip(local_paths, self.s3_files)
        ]
        self.s3_handler.call(tasks)
        # Both files must now be listed in the bucket ...
        self.assertEqual(len(list_contents(self.bucket, self.session)), 2)
        # ... and neither may remain on the local file system.
        for local_path in local_paths:
            self.assertFalse(os.path.exists(local_path))
开发者ID:SydOps,项目名称:aws-cli,代码行数:54,代码来源:test_s3handler.py


示例9: setUp

 def setUp(self):
     """Create upload fixtures: two handlers, a bucket, and local files.

     ``self.s3_handler_multi`` is built with ``multi_threshold=10`` and
     ``chunksize=2``; ``self.s3_handler`` uses the handler defaults.
     """
     super(S3HandlerTestUpload, self).setUp()
     session = FakeSession()
     self.session = session
     self.service = session.get_service('s3')
     self.endpoint = self.service.get_endpoint('us-east-1')
     params = {'region': 'us-east-1', 'acl': ['private']}
     self.s3_handler = S3Handler(session, params)
     self.s3_handler_multi = S3Handler(
         session, multi_threshold=10, chunksize=2, params=params)
     bucket = create_bucket(session)
     self.bucket = bucket
     self.loc_files = make_loc_files()
     self.s3_files = [bucket + suffix for suffix in
                      ('/text1.txt', '/another_directory/text2.txt')]
开发者ID:AsherBond,项目名称:aws-cli,代码行数:14,代码来源:test_s3handler.py


示例10: S3HandlerTestMvS3Local

class S3HandlerTestMvS3Local(S3HandlerBaseTest):
    """
    Tests moving S3 objects to the local file system with the ``move``
    operation and checks both the local results and the bucket contents.
    """
    def setUp(self):
        """Create the handler, a seeded bucket, and local target paths."""
        super(S3HandlerTestMvS3Local, self).setUp()
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        self.s3_handler = S3Handler(session, {'region': 'us-east-1'})
        bucket = make_s3_files(session)
        self.bucket = bucket
        self.s3_files = [bucket + '/text1.txt',
                         bucket + '/another_directory/text2.txt']
        # Local targets live under <cwd>/some_directory/.
        base = os.path.abspath('.')
        self.loc_files = [
            os.sep.join([base, 'some_directory', 'text1.txt']),
            os.sep.join([base, 'some_directory', 'another_directory',
                         'text2.txt']),
        ]

    def tearDown(self):
        """Run the base teardown, then drop local files and the bucket."""
        super(S3HandlerTestMvS3Local, self).tearDown()
        clean_loc_files(self.loc_files)
        s3_cleanup(self.bucket, self.session)

    def test_move(self):
        """Move two S3 objects locally and verify files, contents, bucket."""
        timestamp = datetime.datetime.now()
        tasks = [
            FileInfo(
                src=s3_path, src_type='s3',
                dest=local_path, dest_type='local',
                last_update=timestamp, operation_name='move',
                size=0,
                service=self.service,
                endpoint=self.endpoint)
            for s3_path, local_path in zip(self.s3_files, self.loc_files)
        ]
        self.s3_handler.call(tasks)
        # Every destination file must exist after the move.
        for local_path in self.loc_files:
            self.assertTrue(os.path.exists(local_path))
        # Each destination must hold the contents of its source object.
        expected_contents = [b'This is a test.', b'This is another test.']
        for local_path, expected in zip(self.loc_files, expected_contents):
            with open(local_path, 'rb') as handle:
                self.assertEqual(handle.read(), expected)
        # The moved objects are gone; one object is still listed.
        self.assertEqual(len(list_contents(self.bucket, self.session)), 1)
开发者ID:SydOps,项目名称:aws-cli,代码行数:50,代码来源:test_s3handler.py


示例11: S3HandlerTestBucket

class S3HandlerTestBucket(S3HandlerBaseTest):
    """
    Checks that a bucket can be created and then removed via the handler.
    """
    def setUp(self):
        """Create the handler; no bucket exists until the test makes one."""
        super(S3HandlerTestBucket, self).setUp()
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        self.s3_handler = S3Handler(session, {'region': 'us-east-1'})
        self.bucket = None

    def tearDown(self):
        """Run the base teardown, then clean up ``self.bucket``."""
        super(S3HandlerTestBucket, self).tearDown()
        s3_cleanup(self.bucket, self.session)

    def _run_bucket_operation(self, operation_name):
        """Run *operation_name* on ``self.bucket``; return the bucket count."""
        file_info = FileInfo(
            src=self.bucket,
            operation_name=operation_name,
            size=0,
            service=self.service,
            endpoint=self.endpoint)
        self.s3_handler.call([file_info])
        return len(list_buckets(self.session))

    def test_bucket(self):
        """Making a bucket adds one to the listing; removing it undoes that."""
        prefix = random.randrange(5000)
        suffix = random.randrange(5000)
        self.bucket = '%dmybucket%d/' % (prefix, suffix)
        baseline = len(list_buckets(self.session))

        self.assertEqual(baseline + 1,
                         self._run_bucket_operation('make_bucket'))
        self.assertEqual(baseline,
                         self._run_bucket_operation('remove_bucket'))
开发者ID:AsherBond,项目名称:aws-cli,代码行数:42,代码来源:test_s3handler.py


示例12: S3HandlerTestMvS3S3

class S3HandlerTestMvS3S3(S3HandlerBaseTest):
    """
    Tests moving S3 objects from one bucket to another with the ``move``
    operation.
    """
    def setUp(self):
        """Create the handler, a seeded source bucket, and an empty target."""
        super(S3HandlerTestMvS3S3, self).setUp()
        session = FakeSession()
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        handler_params = {'region': 'us-east-1', 'acl': ['private']}
        self.s3_handler = S3Handler(session, handler_params)
        self.bucket = make_s3_files(session)
        self.bucket2 = create_bucket(session)
        key_suffixes = ('/text1.txt', '/another_directory/text2.txt')
        self.s3_files = [self.bucket + suffix for suffix in key_suffixes]
        self.s3_files2 = [self.bucket2 + suffix for suffix in key_suffixes]

    def tearDown(self):
        """Run the base teardown, then clean up both buckets."""
        super(S3HandlerTestMvS3S3, self).tearDown()
        for bucket in (self.bucket, self.bucket2):
            s3_cleanup(bucket, self.session)

    def test_move(self):
        """Move two objects across buckets and check both listings."""
        # The destination bucket starts out empty.
        self.assertEqual(len(list_contents(self.bucket2, self.session)), 0)
        tasks = [
            FileInfo(
                src=source, src_type='s3',
                dest=destination, dest_type='s3',
                operation_name='move', size=0,
                service=self.service,
                endpoint=self.endpoint)
            for source, destination in zip(self.s3_files, self.s3_files2)
        ]
        self.s3_handler.call(tasks)
        # The original bucket had three objects and only two were moved,
        # so one is left behind and two show up in the destination.
        self.assertEqual(len(list_contents(self.bucket, self.session)), 1)
        self.assertEqual(len(list_contents(self.bucket2, self.session)), 2)
开发者ID:SydOps,项目名称:aws-cli,代码行数:42,代码来源:test_s3handler.py


示例13: S3HandlerExceptionMultiTaskTest

class S3HandlerExceptionMultiTaskTest(S3HandlerBaseTest):
    """
    Exercises multipart uploads when errors occur: an operation on a
    nonexistent bucket, a connection error, and an md5 error.
    """
    def setUp(self):
        """Create a multipart handler on an error-injecting fake session."""
        super(S3HandlerExceptionMultiTaskTest, self).setUp()
        session = FakeSession(True, True)
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        handler_params = {'region': 'us-east-1', 'quiet': True}
        self.s3_handler_multi = S3Handler(
            session, handler_params,
            runtime_config=runtime_config(
                multipart_threshold=10, multipart_chunksize=2))
        bucket = create_bucket(session)
        self.bucket = bucket
        self.loc_files = make_loc_files()
        self.s3_files = [bucket + '/text1.txt',
                         bucket + '/another_directory/text2.txt']

    def tearDown(self):
        """Run the base teardown, then drop local files and the bucket."""
        super(S3HandlerExceptionMultiTaskTest, self).tearDown()
        clean_loc_files(self.loc_files)
        s3_cleanup(self.bucket, self.session)

    def test_multi_upload(self):
        """Run two uploads, one aimed at a truncated bucket name."""
        sources = [self.loc_files[0], self.loc_files[1]]
        # The second destination drops the bucket name's last character.
        destinations = [
            self.bucket + '/text1.txt',
            self.bucket[:-1] + '/another_directory/text2.txt',
        ]
        tasks = [
            FileInfo(
                src=source,
                dest=destination, size=15,
                operation_name='upload',
                service=self.service,
                endpoint=self.endpoint)
            for source, destination in zip(sources, destinations)
        ]
        self.s3_handler_multi.call(tasks)
开发者ID:hardiku,项目名称:aws-cli,代码行数:39,代码来源:test_s3handler.py


示例14: S3HandlerExceptionSingleTaskTest

class S3HandlerExceptionSingleTaskTest(S3HandlerBaseTest):
    """
    Exercises single-part uploads (a put) when connection and md5
    exceptions occur.
    """
    def setUp(self):
        """Create a handler on an error-injecting fake session."""
        super(S3HandlerExceptionSingleTaskTest, self).setUp()
        session = FakeSession(True, True)
        self.session = session
        self.service = session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        self.s3_handler = S3Handler(session, {'region': 'us-east-1'})
        bucket = create_bucket(session)
        self.bucket = bucket
        self.loc_files = make_loc_files()
        self.s3_files = [bucket + '/text1.txt',
                         bucket + '/another_directory/text2.txt']

    def tearDown(self):
        """Run the base teardown, then drop local files and the bucket."""
        super(S3HandlerExceptionSingleTaskTest, self).tearDown()
        clean_loc_files(self.loc_files)
        s3_cleanup(self.bucket, self.session)

    def test_upload(self):
        """Both files end up in the bucket even though exceptions occur."""
        # The bucket starts out empty.
        self.assertEqual(len(list_contents(self.bucket, self.session)), 0)
        sources = [self.loc_files[0], self.loc_files[1]]
        tasks = [
            FileInfo(src=source,
                     dest=destination,
                     operation_name='upload', size=0,
                     service=self.service,
                     endpoint=self.endpoint)
            for source, destination in zip(sources, self.s3_files)
        ]
        self.s3_handler.call(tasks)
        # Despite the exceptions, both uploads must be listed.
        self.assertEqual(len(list_contents(self.bucket, self.session)), 2)
开发者ID:SydOps,项目名称:aws-cli,代码行数:38,代码来源:test_s3handler.py


示例15: setUp

 def setUp(self):
     """Lay out real files and three symlinks under a scratch directory.

     Creates ``foo.txt`` and ``realfiles/bar.txt``, then ``symlink_1``
     (to ``foo.txt``), ``symlink_2`` (to a non-existent target) and
     ``symlink_3`` (to the ``realfiles`` directory).
     """
     session = FakeSession()
     self.session = session
     self.service = session.get_service('s3')
     self.endpoint = self.service.get_endpoint('us-east-1')
     self.files = FileCreator()
     self.root = self.files.rootdir
     self.bucket = 'bucket/'

     # Real files: one at the root, one inside a nested directory.
     foo_path = self.files.create_file('foo.txt',
                                       contents='foo.txt contents')
     realfiles_dir = os.path.join(self.root, 'realfiles')
     os.mkdir(realfiles_dir)
     bar_path = self.files.create_file(
         os.path.join(realfiles_dir, 'bar.txt'),
         contents='bar.txt contents')
     # List of local filenames.
     self.filenames = [foo_path, bar_path]

     # Symlink to the file foo.txt.
     link_to_file = os.path.join(self.root, 'symlink_1')
     os.symlink(foo_path, link_to_file)
     # Symlink to a file that does not exist.
     dangling_link = os.path.join(self.root, 'symlink_2')
     os.symlink('non-existent-file', dangling_link)
     # Symlink to the directory realfiles.
     link_to_dir = os.path.join(self.root, 'symlink_3')
     os.symlink(realfiles_dir, link_to_dir)

     # Names of symlinks.
     self.symlinks = [link_to_file, dangling_link, link_to_dir]
     # Names of files if symlinks are followed (the dangling one has none).
     self.symlink_files = [link_to_file,
                           os.path.join(link_to_dir, 'bar.txt')]
开发者ID:emyphan,项目名称:aws-cli,代码行数:36,代码来源:test_filegenerator.py


示例16: LocalFileGeneratorTest

class LocalFileGeneratorTest(unittest.TestCase):
    """Runs ``FileGenerator`` over a local file and a local directory."""

    def setUp(self):
        """Record the local paths under test and create the local files."""
        base_dir = os.path.abspath(".") + os.sep + "some_directory" + os.sep
        self.local_file = base_dir + "text1.txt"
        self.local_dir = base_dir
        self.session = FakeSession()
        self.service = self.session.get_service("s3")
        self.endpoint = self.service.get_endpoint("us-east-1")
        self.files = make_loc_files()

    def tearDown(self):
        """Remove the local files created in setUp."""
        clean_loc_files(self.files)

    def _generate(self, file_spec):
        """Return the list of items the generator yields for *file_spec*."""
        params = {"region": "us-east-1"}
        generator = FileGenerator(self.service, self.endpoint, "", params)
        return list(generator.call(file_spec))

    def _expected_info(self, src, dest, compare_key):
        """Build the reference local-to-S3 ``FileInfo`` for *src*."""
        size, last_update = get_file_stat(src)
        return FileInfo(
            src=src,
            dest=dest,
            compare_key=compare_key,
            size=size,
            last_update=last_update,
            src_type="local",
            dest_type="s3",
            operation_name="",
            service=None,
            endpoint=None,
        )

    def _assert_same_files(self, results, references):
        """Check equal length, then compare the two lists pairwise."""
        self.assertEqual(len(results), len(references))
        for actual, reference in zip(results, references):
            compare_files(self, actual, reference)

    def test_local_file(self):
        """
        Generate a single local file.
        """
        results = self._generate({
            "src": {"path": self.local_file, "type": "local"},
            "dest": {"path": "bucket/text1.txt", "type": "s3"},
            "dir_op": False,
            "use_src_name": False,
        })
        reference = self._expected_info(
            self.local_file, "bucket/text1.txt", "text1.txt")
        self._assert_same_files(results, [reference])

    def test_local_directory(self):
        """
        Generate an entire local directory.
        """
        results = self._generate({
            "src": {"path": self.local_dir, "type": "local"},
            "dest": {"path": "bucket/", "type": "s3"},
            "dir_op": True,
            "use_src_name": True,
        })
        top_level = self._expected_info(
            self.local_file, "bucket/text1.txt", "text1.txt")
        nested_path = (self.local_dir + "another_directory" + os.sep +
                       "text2.txt")
        nested = self._expected_info(
            nested_path,
            "bucket/another_directory/text2.txt",
            "another_directory/text2.txt")
        # The nested file is expected ahead of the top-level one.
        self._assert_same_files(results, [nested, top_level])
开发者ID:CitizenB,项目名称:aws-cli,代码行数:91,代码来源:test_filegenerator.py


示例17: S3FileGeneratorTest

class S3FileGeneratorTest(unittest.TestCase):
    def setUp(self):
        """Seed a fake bucket; record its two object paths and S3 handles."""
        session = FakeSession()
        self.session = session
        bucket = make_s3_files(session)
        self.bucket = bucket
        self.file1 = "/".join([bucket, "text1.txt"])
        self.file2 = "/".join([bucket, "another_directory/text2.txt"])
        s3_service = session.get_service("s3")
        self.service = s3_service
        self.endpoint = s3_service.get_endpoint("us-east-1")

    def tearDown(self):
        """Clean up the bucket that setUp created on the fake session."""
        bucket, session = self.bucket, self.session
        s3_cleanup(bucket, session)

    def test_nonexist_s3_file(self):
        """
        A source path that is only a prefix of a real key (the key minus
        its last character) must yield no files at all.
        """
        truncated_key = self.file1[:-1]
        file_spec = {
            "src": {"path": truncated_key, "type": "s3"},
            "dest": {"path": "text1.txt", "type": "local"},
            "dir_op": False,
            "use_src_name": False,
        }
        generator = FileGenerator(
            self.service, self.endpoint, "", {"region": "us-east-1"})
        yielded = list(generator.call(file_spec))
        self.assertEqual(len(yielded), 0)

    def test_s3_file(self):
        """
        Generate a single s3 file.
        Note: size and last update are copied from the generated result
        rather than tested, because s3 generates them.
        """
        file_spec = {
            "src": {"path": self.file1, "type": "s3"},
            "dest": {"path": "text1.txt", "type": "local"},
            "dir_op": False,
            "use_src_name": False,
        }
        generator = FileGenerator(
            self.service, self.endpoint, "", {"region": "us-east-1"})
        results = list(generator.call(file_spec))
        first = results[0]
        reference = FileInfo(
            src=self.file1,
            dest="text1.txt",
            compare_key="text1.txt",
            size=first.size,
            last_update=first.last_update,
            src_type="s3",
            dest_type="local",
            operation_name="",
            service=None,
            endpoint=None,
        )

        references = [reference]
        self.assertEqual(len(results), len(references))
        for actual, expected in zip(results, references):
            compare_files(self, actual, expected)

    def test_s3_directory(self):
        """
        Generates s3 files under a common prefix. Also it ensures that
        zero size files are ignored.
        Note: Size and last update are not tested because s3 generates them.
        """
        input_s3_file = {
            "src": {"path": self.bucket + "/", "type": "s3"},
            "dest": {"path": "", "type": "local"},
            "dir_op": True,
            "use_src_name": True,
        }
        params = {"region": "us-east-1"}
        files = FileGenerator(self.service, self.endpoint, "", params).call(input_s3_file)
        result_list = []
        for filename in files:
            result_list.append(filename)
        file_info = FileInfo(
            src=self.file2,
            dest="another_directory" + os.sep + "text2.txt",
            compare_key="another_directory/text2.txt",
            size=result_list[0].size,
            last_update=result_list[0].last_update,
            src_type="s3",
            dest_type="local",
            operation_name="",
            service=None,
            endpoint=None,
        )
        file_info2 = FileInfo(
            src=self.file1,
            dest="text1.txt",
            compare_key="text1.txt",
            size=result_list[1].size,
            last_update=result_list[1].last_update,
            src_type="s3",
            dest_type="local",
            operation_name="",
            service=None,
#.........这里部分代码省略.........
开发者ID:CitizenB,项目名称:aws-cli,代码行数:101,代码来源:test_filegenerator.py


示例18: TestStreams

class TestStreams(S3HandlerBaseTest):
    def setUp(self):
        """Prepare the fake session, S3 service/endpoint and stream params.

        ``self.params`` flags the operation as a stream and pins the region
        to ``us-east-1``, the same region the endpoint is resolved for.
        """
        super(TestStreams, self).setUp()
        region = 'us-east-1'
        fake_session = FakeSession()
        s3_service = fake_session.get_service('s3')
        self.session = fake_session
        self.service = s3_service
        self.endpoint = s3_service.get_endpoint(region)
        self.params = {'is_stream': True, 'region': region}

    def test_pull_from_stream(self):
        """Exercise ``_pull_from_stream`` with full, empty and partial reads.

        Every call returns a ``(payload, flag)`` pair. The assertions expect
        the flag to be true when the requested number of bytes was available
        on stdin and false when stdin ran out first.
        """
        handler = S3StreamHandler(self.session, self.params, chunksize=2)
        stdin_bytes = b'This is a test'
        total = len(stdin_bytes)
        head, tail = stdin_bytes[:-2], stdin_bytes[-2:]

        # Requesting exactly the amount available returns everything.
        with MockStdIn(stdin_bytes):
            stream, got_requested = handler._pull_from_stream(total)
            contents = stream.read()
            self.assertTrue(got_requested)
            self.assertEqual(contents, stdin_bytes)

        # With nothing on stdin the call must still return (empty payload).
        with MockStdIn():
            stream, got_requested = handler._pull_from_stream(total)
            contents = stream.read()
            self.assertFalse(got_requested)
            self.assertEqual(contents, b'')

        # A smaller request must not consume more than was asked for.
        with MockStdIn(stdin_bytes):
            stream, got_requested = handler._pull_from_stream(total - 2)
            contents = stream.read()
            self.assertTrue(got_requested)
            self.assertEqual(contents, head)
            # The follow-up pull drains whatever is left on stdin.
            stream, got_requested = handler._pull_from_stream(total)
            contents = stream.read()
            self.assertFalse(got_requested)
            self.assertEqual(contents, tail)

    def test_upload_stream_not_multipart_task(self):
        """A short stream on a default handler is submitted as one task.

        The executor is replaced by a mock so the submitted task can be
        inspected instead of executed.
        """
        handler = S3StreamHandler(self.session, self.params)
        handler.executor = mock.Mock()
        stream_info = FileInfo('filename', operation_name='upload',
                               is_stream=True, size=0)
        with MockStdIn(b'bar'):
            handler._enqueue_tasks([stream_info])
        submit_calls = handler.executor.submit.call_args_list
        # Exactly one submission: no multipart upload was started.
        self.assertEqual(len(submit_calls), 1)
        # First positional argument of the only submit() call is the task.
        only_task = submit_calls[0][0][0]
        self.assertEqual(only_task.payload.read(), b'bar')

    def test_upload_stream_is_multipart_task(self):
        """With ``multi_threshold=1`` the three-byte stream goes multipart.

        Four submissions are expected; the tasks at index 1 and 2 must carry
        the payload pieces ``b'b'`` and ``b'ar'`` respectively.
        """
        handler = S3StreamHandler(self.session, self.params,
                                  multi_threshold=1)
        handler.executor = mock.Mock()
        stream_info = FileInfo('filename', operation_name='upload',
                               is_stream=True, size=0)
        with MockStdIn(b'bar'):
            handler._enqueue_tasks([stream_info])
        submit_calls = handler.executor.submit.call_args_list
        # A multipart upload shows up as several submitted tasks.
        self.assertEqual(len(submit_calls), 4)
        # The first positional argument of each submit() call is the task.
        first_part = submit_calls[1][0][0]
        self.assertEqual(first_part._payload.read(), b'b')
        second_part = submit_calls[2][0][0]
        self.assertEqual(second_part._payload.read(), b'ar')

    def test_upload_stream_with_expected_size(self):
        """A large ``expected_size`` must force the chunksize above 2.

        The handler is created with ``chunksize=2`` and an ``expected_size``
        of 100000. The chunk size recorded on the task at index 1 of the
        submitted tasks must keep ``expected_size / chunk_size`` below 1000.
        """
        expected_size = 100000
        self.params['expected_size'] = expected_size
        # With this large of expected size, the chunksize of 2 will have
        # to change.
        s3handler = S3StreamHandler(self.session, self.params, chunksize=2)
        s3handler.executor = mock.Mock()
        fileinfo = FileInfo('filename', operation_name='upload',
                            is_stream=True)
        with MockStdIn(b'bar'):
            s3handler._enqueue_multipart_upload_tasks(fileinfo, b'')
        submitted_tasks = s3handler.executor.submit.call_args_list
        # Determine what the chunksize was changed to from one of the
        # UploadPartTasks.
        changed_chunk_size = submitted_tasks[1][0][0]._chunk_size
        # New chunksize should have a total parts under 1000.
        # assertLess (rather than assertTrue on a comparison) reports both
        # operands when the check fails.
        self.assertLess(expected_size / changed_chunk_size, 1000)

    def test_upload_stream_enqueue_upload_task(self):
        s3handler = S3StreamHandler(self.session, self.params)
        s3handler.executor = mock.Mock()
        fileinfo = FileInfo('filename', operation_name='upload',
                            is_stream=True)
        stdin_input = b'This is a test'
        with MockStdIn(stdin_input):
            num_parts = s3handler._enqueue_upload_tasks(None, 2, mock.Mock(),
                                                        fileinfo,
                                                        UploadPartTask)
        submitted_tasks = s3handler.executor.submit.call_args_list
        # Ensure the returned number of parts is correct.
        self.assertEqual(num_parts, len(submitted_tasks) + 1)
        # Ensure the number of tasks uploaded are as expected
        self.assertEqual(len(submitted_tasks), 8)
        index = 0
        for i in range(len(submitted_tasks)-1):
#.........这里部分代码省略.........
开发者ID:SydOps,项目名称:aws-cli,代码行数:101,代码来源:test_s3handler.py


示例19: S3HandlerTestDownload

class S3HandlerTestDownload(S3HandlerBaseTest):
    """
    This class tests the ability to download s3 objects locally as well
    as using multipart downloads
    """
    def setUp(self):
        """Create the handlers, the S3 fixture files and local target paths.

        Three handlers are built: a default one, one with
        ``multi_threshold=10`` / ``chunksize=2``, and one with the same
        settings bound to a session created with ``connection_error=True``.
        The failing session reuses the ``s3`` attribute of the regular
        session.
        """
        super(S3HandlerTestDownload, self).setUp()
        self.session = FakeSession()
        self.service = self.session.get_service('s3')
        self.endpoint = self.service.get_endpoint('us-east-1')
        params = {'region': 'us-east-1'}
        multipart_kwargs = {'multi_threshold': 10, 'chunksize': 2}
        self.s3_handler = S3Handler(self.session, params)
        self.s3_handler_multi = S3Handler(self.session, params,
                                          **multipart_kwargs)

        # make_s3_files returns the bucket used as prefix for the S3 paths.
        self.bucket = make_s3_files(self.session)
        s3_keys = ('/text1.txt', '/another_directory/text2.txt')
        self.s3_files = [self.bucket + key for key in s3_keys]

        # Local paths mirror the S3 layout under ./some_directory/.
        base_dir = os.sep.join([os.path.abspath('.'), 'some_directory', ''])
        nested_dir = base_dir + 'another_directory' + os.sep
        self.loc_files = [base_dir + "text1.txt", nested_dir + "text2.txt"]

        self.fail_session = FakeSession(connection_error=True)
        self.fail_session.s3 = self.session.s3
        self.s3_handler_multi_except = S3Handler(self.fail_session, params,
                                                 **multipart_kwargs)

    def tearDown(self):
        """Run the base teardown, then clean local files and the bucket."""
        super(S3HandlerTestDownload, self).tearDown()
        local_paths = self.loc_files
        clean_loc_files(local_paths)
        bucket, session = self.bucket, self.session
        s3_cleanup(bucket, session)

    def test_download(self):
        # Confirm that the files do not exist.
        for filename in self.loc_files:
            self.assertFalse(os.path.exists(filename))
        # Create file info objects to perform download.
        tasks = []
        time = datetime.datetime.now()
        for i in range(len(self.s3_files)):
            tasks.append(FileInfo(
                src=self.s3_files[i], src_type='s3',
         

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.number_of_errors函数代码示例 发布时间:2022-05-27
下一篇:
Python s3.s3_cleanup函数代码示例 发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap