• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.mkdirs函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift.common.utils.mkdirs函数的典型用法代码示例。如果您正苦于以下问题:Python mkdirs函数的具体用法?Python mkdirs怎么用?Python mkdirs使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了mkdirs函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_hash_suffix_multi_file_two

    def test_hash_suffix_multi_file_two(self):
        # Populates one object directory with .meta/.data files at four
        # different ages (plus a .ts file for the ages above 50), runs
        # diskfile.hash_suffix() with 99 as its second argument and checks
        # that exactly two files remain in the hash directory.
        df = diskfile.DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
                               FakeLogger())
        mkdirs(df.datadir)
        for tdiff in [1, 50, 100, 500]:
            suffs = ['.meta', '.data']
            if tdiff > 50:
                suffs.append('.ts')
            for suff in suffs:
                file_name = normalize_timestamp(int(time()) - tdiff) + suff
                # The context manager closes the handle even if the write
                # raises, so a failing test does not leak descriptors.
                with open(os.path.join(df.datadir, file_name), 'wb') as f:
                    f.write('1234567890')

        ohash = hash_path('a', 'c', 'o')
        data_dir = ohash[-3:]
        whole_path_from = os.path.join(self.objects, '0', data_dir)
        hsh_path = os.listdir(whole_path_from)[0]
        whole_hsh_path = os.path.join(whole_path_from, hsh_path)

        diskfile.hash_suffix(whole_path_from, 99)
        # only the meta and data should be left
        self.assertEqual(len(os.listdir(whole_hsh_path)), 2)
开发者ID:atulshridhar,项目名称:swift,代码行数:26,代码来源:test_diskfile.py


示例2: initialize

 def initialize(self):
     """Create the crawler database file and its two tables.

     Ensures the parent directory of ``self.db_file`` exists, opens a
     sqlite3 connection (stored on ``self.conn``) and creates the
     ``data_crawlers`` and ``data_crawler_runtime`` tables. Progress is
     printed to stdout before and after.
     """
     print "initialize start"
     utils.mkdirs(os.path.dirname(self.db_file))
     # check_same_thread=False lets the connection be used from threads
     # other than the one that created it.
     self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
     # NOTE(review): plain CREATE TABLE fails if the tables already exist,
     # so this presumably runs only against a fresh database -- confirm.
     self.conn.executescript(
         """
         CREATE TABLE data_crawlers (
             dev_path TEXT,
             datatype TEXT,
             slowdown FLOAT DEFAULT 0.001,
             cycle INTEGER DEFAULT 0,
             UNIQUE (dev_path, datatype)  
         );
         CREATE TABLE data_crawler_runtime (
             dev_path TEXT,
             datatype TEXT,
             cycle INTEGER,
             interval FLOAT,
             slowdown FLOAT, 
             parts INTEGER, 
             timestamp TEXT,
             UNIQUE (dev_path, datatype)  
         );
         """
     )
     self.conn.commit()
     print "initialize DONE"
开发者ID:shaolinjr,项目名称:SwiftCrawler,代码行数:27,代码来源:crawl.py


示例3: test_delete_partition_with_handoff_delete_fail_in_other_region

 def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
     # Writes one object into partition '1' on 'sda', makes every rsync
     # aimed at a node whose region is not 1 report failure, runs
     # replicate() and checks that the partition directory still exists.
     with mock.patch('swift.obj.replicator.http_connect',
                     mock_http_connect(200)):
         df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
         mkdirs(df._datadir)
         data_path = os.path.join(
             df._datadir, normalize_timestamp(time.time()) + '.data')
         # The context manager closes the handle even if the write raises.
         with open(data_path, 'wb') as f:
             f.write('1234567890')
         ohash = hash_path('a', 'c', 'o')
         data_dir = ohash[-3:]
         whole_path_from = os.path.join(self.objects, '1', data_dir)
         part_path = os.path.join(self.objects, '1')
         self.assertTrue(os.access(part_path, os.F_OK))
         ring = self.replicator.get_object_ring(0)
         nodes = [node for node in
                  ring.get_part_nodes(1)
                  if node['ip'] not in _ips()]
         process_arg_checker = []
         for node in nodes:
             rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
             # Force the rsync calls towards any other region to fail.
             ret_code = 1 if node['region'] != 1 else 0
             process_arg_checker.append(
                 (ret_code, '', ['rsync', whole_path_from, rsync_mod]))
         with _mock_process(process_arg_checker):
             self.replicator.replicate()
         # The partition directory should still exist
         self.assertTrue(os.access(part_path, os.F_OK))
开发者ID:gayana06,项目名称:Thesis,代码行数:33,代码来源:test_replicator.py


示例4: test_quarantine_same_file

    def test_quarantine_same_file(self):
        # Quarantines the same object twice. The first quarantine must land
        # in <testdir>/sda1/quarantined/objects/<object dir name>; the
        # second must land in a different directory whose basename
        # contains a '-'.
        df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                               FakeLogger())
        mkdirs(df.datadir)
        data_path = os.path.join(df.datadir,
                                 normalize_timestamp(time()) + '.data')
        # Close the handle once the xattr is written instead of leaking it.
        with open(data_path, 'wb') as f:
            setxattr(f.fileno(), diskfile.METADATA_KEY,
                     pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
        df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                               FakeLogger())
        new_dir = df.quarantine()
        quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                                'objects', os.path.basename(os.path.dirname(
                                    df.data_file)))
        self.assertTrue(os.path.isdir(quar_dir))
        self.assertEqual(quar_dir, new_dir)
        # have to remake the datadir and file
        mkdirs(df.datadir)
        data_path = os.path.join(df.datadir,
                                 normalize_timestamp(time()) + '.data')
        with open(data_path, 'wb') as f:
            setxattr(f.fileno(), diskfile.METADATA_KEY,
                     pickle.dumps({}, diskfile.PICKLE_PROTOCOL))

        df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                               FakeLogger(), keep_data_fp=True)
        double_uuid_path = df.quarantine()
        self.assertTrue(os.path.isdir(double_uuid_path))
        self.assertTrue('-' in os.path.basename(double_uuid_path))
开发者ID:Awingu,项目名称:swift,代码行数:28,代码来源:test_diskfile.py


示例5: setup_bad_zero_byte

    def setup_bad_zero_byte(self, with_ts=False):
        """Create an object whose metadata claims 10 bytes but has no data.

        A fresh ObjectAuditor is stored on ``self.auditor``. When
        ``with_ts`` is true an empty ``99999.ts`` file is also created in
        the object's storage directory.

        :param with_ts: also create the ``99999.ts`` file
        :returns: ``self.disk_file.data_file`` when it is set, otherwise
                  the path of the .ts file ('' if none was created)
        """
        self.auditor = auditor.ObjectAuditor(self.conf)
        self.auditor.log_time = 0

        ts_file_path = ""
        if with_ts:
            obj_dir = os.path.join(
                self.devices, "sda",
                storage_directory(DATADIR, "0", hash_path("a", "c", "o")))
            ts_file_path = os.path.join(obj_dir, "99999.ts")
            if not os.path.exists(obj_dir):
                mkdirs(obj_dir)
            # Touch the file: create it empty and close it straight away.
            open(ts_file_path, "w").close()

        empty_digest = md5().hexdigest()
        with self.disk_file.mkstemp() as (fd, tmppath):
            metadata = {
                "ETag": empty_digest,
                "X-Timestamp": str(normalize_timestamp(time.time())),
                "Content-Length": 10,
            }
            self.disk_file.put(fd, tmppath, metadata)
            # Rewrite the metadata on the descriptor with a freshly
            # computed digest of no data.
            metadata["ETag"] = md5().hexdigest()
            write_metadata(fd, metadata)
        return self.disk_file.data_file or ts_file_path
开发者ID:jness,项目名称:python-swift,代码行数:26,代码来源:test_auditor.py


示例6: test_run_once_1

 def test_run_once_1(self):
     # Runs a single replication pass for an object stored under policy
     # index 1 and expects no entries in process_errors.
     conf = dict(swift_dir=self.testdir, devices=self.devices,
                 mount_check='false', timeout='300', stats_interval='1')
     replicator = object_replicator.ObjectReplicator(conf)
     was_connector = object_replicator.http_connect
     object_replicator.http_connect = mock_http_connect(200)
     # Restore the patched module attribute even when an assertion fails,
     # so the mock cannot leak into other tests.
     try:
         cur_part = '0'
         df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
                                       policy_idx=1)
         mkdirs(df._datadir)
         data_path = os.path.join(
             df._datadir, normalize_timestamp(time.time()) + '.data')
         # The context manager closes the handle even if the write raises.
         with open(data_path, 'wb') as f:
             f.write('1234567890')
         ohash = hash_path('a', 'c', 'o')
         data_dir = ohash[-3:]
         whole_path_from = os.path.join(self.objects_1, cur_part, data_dir)
         process_arg_checker = []
         ring = replicator.get_object_ring(1)
         nodes = [node for node in
                  ring.get_part_nodes(int(cur_part))
                  if node['ip'] not in _ips()]
         rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
                             (node['ip'], cur_part) for node in nodes])
         # One expected rsync invocation per remote node.
         for node in nodes:
             process_arg_checker.append(
                 (0, '', ['rsync', whole_path_from, rsync_mods]))
         with _mock_process(process_arg_checker):
             replicator.run_once()
         self.assertFalse(process_errors)
     finally:
         object_replicator.http_connect = was_connector
开发者ID:gayana06,项目名称:Thesis,代码行数:32,代码来源:test_replicator.py


示例7: setUp

 def setUp(self):
     """Prepare a scratch tree with two devices and two gzipped rings.

     Saves the current hash path prefix/suffix in ``self.orig_hp`` and
     sets both to 'info', creates ``sda1``/``sdb1`` (each with a ``tmp``
     sub-directory) under a fresh temp dir, then writes pickled RingData
     to ``account.ring.gz`` and ``container.ring.gz``.
     """
     self.orig_hp = (utils.HASH_PATH_PREFIX, utils.HASH_PATH_SUFFIX)
     utils.HASH_PATH_PREFIX = utils.HASH_PATH_SUFFIX = 'info'

     self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_info')
     # Create and immediately remove the directory so it starts empty.
     utils.mkdirs(self.testdir)
     rmtree(self.testdir)
     for device in ('sda1', 'sdb1'):
         device_dir = os.path.join(self.testdir, device)
         utils.mkdirs(device_dir)
         utils.mkdirs(os.path.join(device_dir, 'tmp'))

     def write_ring(ring_path, ip_a, ip_b):
         # Pickle a two-device ring into a gzip file at ring_path.
         devs = [{'id': 0, 'zone': 0, 'device': 'sda1',
                  'ip': ip_a, 'port': 42},
                 {'id': 1, 'zone': 1, 'device': 'sdb1',
                  'ip': ip_b, 'port': 43}]
         replica2part2dev = [[0, 1, 0, 1], [1, 0, 1, 0]]
         with closing(GzipFile(ring_path, 'wb')) as f:
             pickle.dump(ring.RingData(replica2part2dev, devs, 30), f)

     self.account_ring_path = os.path.join(self.testdir, 'account.ring.gz')
     write_ring(self.account_ring_path, '127.0.0.1', '127.0.0.2')

     self.container_ring_path = os.path.join(self.testdir,
                                             'container.ring.gz')
     write_ring(self.container_ring_path, '127.0.0.3', '127.0.0.4')
开发者ID:HugoKuo,项目名称:swift,代码行数:28,代码来源:test_info.py


示例8: setUp

    def setUp(self):
        """Create sender/receiver directories and start a receiver server.

        The receiving ObjectController is served through
        ``eventlet.wsgi.server`` on 127.0.0.1; the listening socket is
        bound with port 0 and the resulting port is recorded in
        ``self.rx_port`` and ``self.rx_node``.
        """
        super(TestBaseSsync, self).setUp()
        self.device = 'dev'
        self.partition = '9'

        # Sender side.
        tx_root = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
        self.tx_testdir = tx_root
        utils.mkdirs(os.path.join(tx_root, self.device))
        self.daemon = FakeReplicator(tx_root)

        # Receiver side.
        rx_root = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
        self.rx_testdir = rx_root
        utils.mkdirs(os.path.join(rx_root, self.device))
        rx_conf = dict(devices=rx_root,
                       mount_check='false',
                       replication_one_per_device='false',
                       log_requests='false')
        self.rx_controller = server.ObjectController(rx_conf)

        # Endless stream of Timestamps counting up from the current second.
        first_ts = int(time.time())
        self.ts_iter = (Timestamp(t) for t in itertools.count(first_ts))

        self.rx_ip = '127.0.0.1'
        listener = eventlet.listen((self.rx_ip, 0))
        self.rx_server = eventlet.spawn(
            eventlet.wsgi.server, listener, self.rx_controller,
            utils.NullLogger())
        self.rx_port = listener.getsockname()[1]
        self.rx_node = dict(replication_ip=self.rx_ip,
                            replication_port=self.rx_port,
                            device=self.device)
开发者ID:BjoernT,项目名称:swift,代码行数:28,代码来源:test_ssync.py


示例9: setup_partition

 def setup_partition(self, pool, partition):
     """Create the on-disk location for ``partition`` inside ``pool``.

     With ``self.fs_per_part`` set, ``zfs_create`` is called with a
     filesystem name built from ``self.topfs``, ``self.srvdir`` and the
     partition; otherwise the path is simply created with ``mkdirs``.
     """
     path = os.path.join(self.root, pool, self.srvdir, partition)
     if not self.fs_per_part:
         mkdirs(path)
         return
     fs_name = '%s/%s/%s' % (self.topfs, self.srvdir, partition)
     zfs_create(pool, fs_name, path)
开发者ID:vineethrp,项目名称:swift,代码行数:7,代码来源:lfszfs.py


示例10: add_synced_container

    def add_synced_container(self, broker):
        """
        Adds the container db represented by broker to the list of synced
        containers.

        The entry is a symlink at the path returned by
        ``_container_to_synced_container_path`` pointing at
        ``broker.db_file``. Nothing is done when that path already exists.

        :param broker: An instance of ContainerBroker representing the
                       container to add.
        :raises OSError: if checking for the entry fails for any reason
                         other than it not existing, or if creating the
                         symlink fails for any reason other than a symlink
                         already being there.
        """
        sync_file = self._container_to_synced_container_path(broker.db_file)
        try:
            os.stat(sync_file)
        except OSError as oserr:
            if oserr.errno != errno.ENOENT:
                # Bare raise keeps the original traceback intact.
                raise
        else:
            # Something already exists at the entry path; nothing to do.
            return

        mkdirs(os.path.dirname(sync_file))

        try:
            os.symlink(broker.db_file, sync_file)
        except OSError as oserr:
            # A symlink that already exists at sync_file is tolerated;
            # anything else is an error.
            if oserr.errno != errno.EEXIST or not os.path.islink(sync_file):
                raise
开发者ID:unibg-seclab,项目名称:swift,代码行数:27,代码来源:sync_store.py


示例11: find_and_process

    def find_and_process(self):
        # Collects one line of stats per ".db" file found under every
        # device's data directory into a temporary file, hashing the lines
        # as it goes, then moves the file into self.target_dir with the
        # hex digest appended to its name.
        # NOTE(review): this excerpt is truncated -- the handler that
        # closes the ``try`` below is not visible here.
        src_filename = time.strftime(self.filename_format)
        # NOTE(review): "%-s" is parsed as a left-justified %s conversion,
        # so this evaluates to ".<stats_type>tats_tmp"; presumably
        # ".%s-stats_tmp" was intended -- confirm before changing.
        working_dir = os.path.join(self.target_dir, ".%-stats_tmp" % self.stats_type)
        # Start from an empty working directory on every run.
        shutil.rmtree(working_dir, ignore_errors=True)
        mkdirs(working_dir)
        tmp_filename = os.path.join(working_dir, src_filename)
        hasher = hashlib.md5()
        try:
            with open(tmp_filename, "wb") as statfile:
                statfile.write(self.get_header())
                for device in os.listdir(self.devices):
                    if self.mount_check and not check_mount(self.devices, device):
                        self.logger.error(_("Device %s is not mounted, skipping.") % device)
                        continue
                    db_dir = os.path.join(self.devices, device, self.data_dir)
                    if not os.path.exists(db_dir):
                        self.logger.debug(_("Path %s does not exist, skipping.") % db_dir)
                        continue
                    for root, dirs, files in os.walk(db_dir, topdown=False):
                        for filename in files:
                            if filename.endswith(".db"):
                                db_path = os.path.join(root, filename)
                                try:
                                    line_data = self.get_data(db_path)
                                except sqlite3.Error, err:
                                    # An unreadable database is logged and
                                    # skipped rather than aborting the run.
                                    self.logger.info(_("Error accessing db %s: %s") % (db_path, err))
                                    continue
                                if line_data:
                                    statfile.write(line_data)
                                    # The header is not part of the digest;
                                    # only the data lines are hashed.
                                    hasher.update(line_data)

            src_filename += hasher.hexdigest()
            renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
开发者ID:VenkataSeshadri,项目名称:slogging,代码行数:33,代码来源:db_stats_collector.py


示例12: test_run_once_recover_from_failure

 def test_run_once_recover_from_failure(self):
     # Writes an object into partition '1', runs one replication pass with
     # mocked rsync/http, and checks which partitions still have a hashes
     # file afterwards ('1' must not, '0', '2' and '3' must).
     replicator = object_replicator.ObjectReplicator(
         dict(swift_dir=self.testdir, devices=self.devices,
              mount_check="false", timeout="300", stats_interval="1")
     )
     was_connector = object_replicator.http_connect
     try:
         object_replicator.http_connect = mock_http_connect(200)
         # Write some files into '1' and run replicate- they should be moved
         # to the other partitions and then node should get deleted.
         cur_part = "1"
         df = DiskFile(self.devices, "sda", cur_part, "a", "c", "o",
                       FakeLogger())
         mkdirs(df.datadir)
         data_path = os.path.join(
             df.datadir, normalize_timestamp(time.time()) + ".data")
         # The context manager closes the handle even if the write raises.
         with open(data_path, "wb") as f:
             f.write("1234567890")
         ohash = hash_path("a", "c", "o")
         data_dir = ohash[-3:]
         whole_path_from = os.path.join(self.objects, cur_part, data_dir)
         process_arg_checker = []
         nodes = [node for node in self.ring.get_part_nodes(int(cur_part))
                  if node["ip"] not in _ips()]
         for node in nodes:
             rsync_mod = "%s::object/sda/objects/%s" % (node["ip"], cur_part)
             process_arg_checker.append(
                 (0, "", ["rsync", whole_path_from, rsync_mod]))
         self.assertTrue(os.access(
             os.path.join(self.objects, "1", data_dir, ohash), os.F_OK))
         with _mock_process(process_arg_checker):
             replicator.run_once()
         self.assertFalse(process_errors)
         for i, result in [("0", True), ("1", False),
                           ("2", True), ("3", True)]:
             hash_file = os.path.join(
                 self.objects, i, object_replicator.HASH_FILE)
             self.assertEqual(os.access(hash_file, os.F_OK), result)
     finally:
         object_replicator.http_connect = was_connector
开发者ID:mygoda,项目名称:openstack,代码行数:33,代码来源:test_replicator.py


示例13: test_run_once

    def test_run_once(self):
        # Writes one object into partition '0', runs a single replication
        # pass with mocked rsync/http and expects no entries in
        # process_errors.
        replicator = object_replicator.ObjectReplicator(
            dict(swift_dir=self.testdir, devices=self.devices,
                 mount_check="false", timeout="300", stats_interval="1")
        )
        was_connector = object_replicator.http_connect
        object_replicator.http_connect = mock_http_connect(200)
        # Restore the patched module attribute even when an assertion
        # fails, so the mock cannot leak into other tests.
        try:
            cur_part = "0"
            df = DiskFile(self.devices, "sda", cur_part, "a", "c", "o",
                          FakeLogger())
            mkdirs(df.datadir)
            data_path = os.path.join(
                df.datadir, normalize_timestamp(time.time()) + ".data")
            # The context manager closes the handle even if the write
            # raises.
            with open(data_path, "wb") as f:
                f.write("1234567890")
            ohash = hash_path("a", "c", "o")
            data_dir = ohash[-3:]
            whole_path_from = os.path.join(self.objects, cur_part, data_dir)
            process_arg_checker = []
            nodes = [node
                     for node in self.ring.get_part_nodes(int(cur_part))
                     if node["ip"] not in _ips()]
            for node in nodes:
                rsync_mod = "%s::object/sda/objects/%s" % (node["ip"],
                                                           cur_part)
                process_arg_checker.append(
                    (0, "", ["rsync", whole_path_from, rsync_mod]))
            with _mock_process(process_arg_checker):
                replicator.run_once()
            self.assertFalse(process_errors)
        finally:
            object_replicator.http_connect = was_connector
开发者ID:mygoda,项目名称:openstack,代码行数:25,代码来源:test_replicator.py


示例14: setUp

    def setUp(self):
        """Build the device tree and diskfiles used by the auditor tests.

        Creates ``node/sda`` and ``node/sdb`` under a fresh temp dir, the
        data directories for POLICIES[0] and POLICIES[1] on ``sda``, and
        partitions '0'-'3' inside each. ``self.parts`` maps partition name
        to its policy-0 path and ``self.parts_p1`` to its policy-1 path.
        """
        self.testdir = os.path.join(mkdtemp(), "tmp_test_object_auditor")
        self.devices = os.path.join(self.testdir, "node")
        self.rcache = os.path.join(self.testdir, "object.recon")
        self.logger = FakeLogger()
        rmtree(self.testdir, ignore_errors=1)
        mkdirs(os.path.join(self.devices, "sda"))
        os.mkdir(os.path.join(self.devices, "sdb"))

        # policy 0
        self.objects = os.path.join(self.devices, "sda", get_data_dir(POLICIES[0]))
        self.objects_2 = os.path.join(self.devices, "sdb", get_data_dir(POLICIES[0]))
        os.mkdir(self.objects)
        # policy 1
        self.objects_p1 = os.path.join(self.devices, "sda", get_data_dir(POLICIES[1]))
        self.objects_2_p1 = os.path.join(self.devices, "sdb", get_data_dir(POLICIES[1]))
        os.mkdir(self.objects_p1)

        # Two separate dicts: a chained ``a = b = {}`` would bind both
        # names to one dict, letting the policy-1 paths overwrite the
        # policy-0 paths.
        self.parts = {}
        self.parts_p1 = {}
        for part in ["0", "1", "2", "3"]:
            self.parts[part] = os.path.join(self.objects, part)
            self.parts_p1[part] = os.path.join(self.objects_p1, part)
            os.mkdir(os.path.join(self.objects, part))
            os.mkdir(os.path.join(self.objects_p1, part))

        self.conf = dict(devices=self.devices, mount_check="false", object_size_stats="10,100,1024,10240")
        self.df_mgr = DiskFileManager(self.conf, self.logger)

        # diskfiles for policy 0, 1
        self.disk_file = self.df_mgr.get_diskfile("sda", "0", "a", "c", "o", policy=POLICIES[0])
        self.disk_file_p1 = self.df_mgr.get_diskfile("sda", "0", "a", "c", "o", policy=POLICIES[1])
开发者ID:renanalan,项目名称:swift,代码行数:31,代码来源:test_auditor.py


示例15: setup_bad_zero_byte

    def setup_bad_zero_byte(self, with_ts=False):
        """Create an object whose metadata claims 10 bytes but has no data.

        A fresh ObjectAuditor is stored on ``self.auditor``. When
        ``with_ts`` is true a ``99999.ts`` file carrying its own metadata
        is also created in the object's storage directory.

        :param with_ts: also create the ``99999.ts`` file
        :returns: path of the .ts file, or '' if none was created
        """
        self.auditor = auditor.ObjectAuditor(self.conf)
        self.auditor.log_time = 0

        ts_file_path = ''
        if with_ts:
            obj_dir = os.path.join(
                self.devices, 'sda',
                storage_directory(get_data_dir(0), '0',
                                  hash_path('a', 'c', 'o')))
            ts_file_path = os.path.join(obj_dir, '99999.ts')
            if not os.path.exists(obj_dir):
                mkdirs(obj_dir)
            ts_metadata = {'X-Timestamp': '99999', 'name': '/a/c/o'}
            fp = open(ts_file_path, 'w')
            write_metadata(fp, ts_metadata)
            fp.close()

        empty_digest = md5().hexdigest()
        with self.disk_file.create() as writer:
            metadata = {
                'ETag': empty_digest,
                'X-Timestamp': str(normalize_timestamp(time.time())),
                'Content-Length': 10,
            }
            writer.put(metadata)
            # Rewrite the metadata on the writer's descriptor with a
            # freshly computed digest of no data.
            metadata['ETag'] = md5().hexdigest()
            write_metadata(writer._fd, metadata)
        return ts_file_path
开发者ID:AsherBond,项目名称:swift,代码行数:30,代码来源:test_auditor.py


示例16: create

    def create(self, size=None):
        """
        Context manager to create a file. A temporary file is made in the
        tmp directory first and a DiskFileWriter wrapping it is yielded.
        On exit the descriptor is closed and the temporary path unlinked,
        ignoring OSError from either step.

        .. note::

            An implementation is not required to perform on-disk
            preallocations even if the parameter is specified. But if it does
            and it fails, it must raise a `DiskFileNoSpace` exception.

        :param size: optional initial size of file to explicitly allocate on
                     disk
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        if not exists(self._tmpdir):
            mkdirs(self._tmpdir)
        fd, tmppath = mkstemp(dir=self._tmpdir)
        try:
            wants_prealloc = size is not None and size > 0
            if wants_prealloc:
                try:
                    fallocate(fd, size)
                except OSError:
                    raise DiskFileNoSpace()
            yield DiskFileWriter(self._name, self._datadir, fd, tmppath,
                                 self._bytes_per_sync, self._threadpool)
        finally:
            # Best-effort cleanup: each step is attempted independently.
            for release, target in ((os.close, fd), (os.unlink, tmppath)):
                try:
                    release(target)
                except OSError:
                    pass
开发者ID:yohsuke,项目名称:swift,代码行数:35,代码来源:diskfile.py


示例17: writer

    def writer(self, size=None):
        """
        Context manager to write a file. A temporary file is made in the
        tmp directory first and a DiskWriter wrapping it is yielded. On
        exit the descriptor is closed and the temporary path unlinked,
        ignoring OSError from either step.

        :param size: optional initial size of file to explicitly allocate on
                     disk
        :raises DiskFileNoSpace: if a size is specified and allocation fails
        """
        if not os.path.exists(self.tmpdir):
            mkdirs(self.tmpdir)
        fd, tmppath = mkstemp(dir=self.tmpdir)
        try:
            wants_prealloc = size is not None and size > 0
            if wants_prealloc:
                try:
                    fallocate(fd, size)
                except OSError:
                    raise DiskFileNoSpace()
            yield DiskWriter(self, fd, tmppath, self.threadpool)
        finally:
            # Best-effort cleanup: each step is attempted independently.
            for release, target in ((os.close, fd), (os.unlink, tmppath)):
                try:
                    release(target)
                except OSError:
                    pass
开发者ID:iBeacons,项目名称:swift,代码行数:28,代码来源:diskfile.py


示例18: setUp

    def setUp(self):
        """Build the device tree and diskfiles used by the auditor tests.

        Creates ``node/sda`` and ``node/sdb`` under a fresh temp dir, the
        data directories for policies 0 and 1 on ``sda``, and partitions
        '0'-'3' inside each. ``self.parts`` maps partition name to its
        policy-0 path and ``self.parts_p1`` to its policy-1 path.
        """
        self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
        self.devices = os.path.join(self.testdir, 'node')
        self.rcache = os.path.join(self.testdir, 'object.recon')
        self.logger = FakeLogger()
        rmtree(self.testdir, ignore_errors=1)
        mkdirs(os.path.join(self.devices, 'sda'))
        os.mkdir(os.path.join(self.devices, 'sdb'))

        # policy 0
        self.objects = os.path.join(self.devices, 'sda', get_data_dir(0))
        self.objects_2 = os.path.join(self.devices, 'sdb', get_data_dir(0))
        os.mkdir(self.objects)
        # policy 1
        self.objects_p1 = os.path.join(self.devices, 'sda', get_data_dir(1))
        self.objects_2_p1 = os.path.join(self.devices, 'sdb', get_data_dir(1))
        os.mkdir(self.objects_p1)

        # Two separate dicts: a chained ``a = b = {}`` would bind both
        # names to one dict, letting the policy-1 paths overwrite the
        # policy-0 paths.
        self.parts = {}
        self.parts_p1 = {}
        for part in ['0', '1', '2', '3']:
            self.parts[part] = os.path.join(self.objects, part)
            self.parts_p1[part] = os.path.join(self.objects_p1, part)
            os.mkdir(os.path.join(self.objects, part))
            os.mkdir(os.path.join(self.objects_p1, part))

        self.conf = dict(
            devices=self.devices,
            mount_check='false',
            object_size_stats='10,100,1024,10240')
        self.df_mgr = DiskFileManager(self.conf, self.logger)

        # diskfiles for policy 0, 1
        self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', 0)
        self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
                                                     'o', 1)
开发者ID:AsherBond,项目名称:swift,代码行数:35,代码来源:test_auditor.py


示例19: test_delete_partition

 def test_delete_partition(self):
     # Partition '1' must exist before replicate() runs and be gone
     # afterwards; an object directory is created in partition '0' first.
     datadir = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
                        FakeLogger()).datadir
     mkdirs(datadir)
     part_path = os.path.join(self.objects, '1')

     def part_exists():
         return os.access(part_path, os.F_OK)

     self.assertTrue(part_exists())
     self.replicator.replicate()
     self.assertFalse(part_exists())
开发者ID:ChenZhengtongnju,项目名称:swift,代码行数:7,代码来源:test_replicator.py


示例20: test_run_once_recover_from_timeout

    def test_run_once_recover_from_timeout(self):
        # Makes the third call to get_hashes raise Timeout during a single
        # replication pass and checks that nothing is recorded in
        # process_errors and that no 'Error syncing partition' exception
        # is logged.
        replicator = object_replicator.ObjectReplicator(
            dict(swift_dir=self.testdir, devices=self.devices,
                 mount_check='false', timeout='300', stats_interval='1'))
        was_connector = object_replicator.http_connect
        was_get_hashes = object_replicator.get_hashes
        was_execute = tpool.execute
        self.get_hash_count = 0
        try:

            def fake_get_hashes(*args, **kwargs):
                self.get_hash_count += 1
                if self.get_hash_count == 3:
                    # raise timeout on last call to get hashes
                    raise Timeout()
                return 2, {'abc': 'def'}

            def fake_exc(tester, *args, **kwargs):
                # Flag the test as failed if the replicator logs a
                # partition sync error.
                if 'Error syncing partition' in args[0]:
                    tester.i_failed = True

            self.i_failed = False
            object_replicator.http_connect = mock_http_connect(200)
            object_replicator.get_hashes = fake_get_hashes
            replicator.logger.exception = \
                lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
            # Write some files into '1' and run replicate- they should be moved
            # to the other partitions and then node should get deleted.
            cur_part = '1'
            df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
                          FakeLogger())
            mkdirs(df.datadir)
            data_path = os.path.join(
                df.datadir, normalize_timestamp(time.time()) + '.data')
            # The context manager closes the handle even if the write
            # raises.
            with open(data_path, 'wb') as f:
                f.write('1234567890')
            ohash = hash_path('a', 'c', 'o')
            data_dir = ohash[-3:]
            whole_path_from = os.path.join(self.objects, cur_part, data_dir)
            process_arg_checker = []
            nodes = [node for node in
                     self.ring.get_part_nodes(int(cur_part))
                     if node['ip'] not in _ips()]
            for node in nodes:
                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
                                                           cur_part)
                process_arg_checker.append(
                    (0, '', ['rsync', whole_path_from, rsync_mod]))
            self.assertTrue(os.access(os.path.join(self.objects,
                                                   '1', data_dir, ohash),
                                      os.F_OK))
            with _mock_process(process_arg_checker):
                replicator.run_once()
            self.assertFalse(process_errors)
            self.assertFalse(self.i_failed)
        finally:
            object_replicator.http_connect = was_connector
            object_replicator.get_hashes = was_get_hashes
            tpool.execute = was_execute
开发者ID:ChenZhengtongnju,项目名称:swift,代码行数:60,代码来源:test_replicator.py



注:本文中的swift.common.utils.mkdirs函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.normalize_delete_at_timestamp函数代码示例发布时间:2022-05-27
下一篇:
Python utils.lock_path函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap