• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python pulp_node.pathlib.join函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_node.pathlib.join函数的典型用法代码示例。如果您正苦于以下问题：Python join函数的具体用法？Python join怎么用？Python join使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了join函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: publish

 def publish(self, units):
     """
     Stage the specified units for publishing.
     Creates the publish directory, allocates a temporary staging directory
     beneath it, publishes every unit through publish_unit() while recording
     it in the units file, and finally writes the manifest.  Nothing is made
     permanent here; commit() must be called afterwards.
     :param units: The units to publish.
     :type units: iterable
     :return: Whatever Manifest.write() returns (documented upstream as the
         absolute path to the manifest).
     :rtype: str
     """
     pathlib.mkdir(self.publish_dir)
     self.tmp_dir = mkdtemp(dir=self.publish_dir)
     staged_units_file = pathlib.join(self.tmp_dir, UNITS_FILE_NAME)
     staged_manifest_file = pathlib.join(self.tmp_dir, MANIFEST_FILE_NAME)
     with UnitWriter(staged_units_file) as unit_writer:
         for each in units:
             self.publish_unit(each)
             unit_writer.add(each)
     # The writer is handed to the manifest only after it has been closed.
     manifest = Manifest(str(uuid4()))
     manifest.set_units(unit_writer)
     written_path = manifest.write(staged_manifest_file)
     self.staged = True
     return written_path
开发者ID:cliffy94,项目名称:pulp,代码行数:25,代码来源:publisher.py


示例2: _url_and_destination

 def _url_and_destination(self, base_url, unit):
     """
     Work out where a unit is downloaded from and where it is stored.
     :param base_url: The base URL.
     :type base_url: str
     :param unit: A content unit.
     :type unit: dict
     :return: (url, destination)
     :rtype: tuple(2)
     """
     destination = unit[constants.STORAGE_PATH]
     tarball = unit.get(constants.TARBALL_PATH)
     if tarball:
         # Tarballs are fetched relative to the base URL and stored next to
         # the unit's storage path under the tarball's own file name.
         tarball_url = pathlib.url_join(base_url, pathlib.quote(tarball))
         tarball_destination = pathlib.join(
             os.path.dirname(destination), os.path.basename(tarball))
         return tarball_url, tarball_destination
     # Everything else is served by the pulp/nodes/content endpoint, which
     # replaced the publishing of individual links for each unit.  Only the
     # path component of the base URL is swapped out.
     parts = urlparse(base_url)
     relative = unit[constants.RELATIVE_PATH]
     content_path = pathlib.join(constants.CONTENT_PATH, pathlib.quote(relative))
     content_url = ParseResult(
         scheme=parts.scheme,
         netloc=parts.netloc,
         path=content_path,
         params=parts.params,
         query=parts.query,
         fragment=parts.fragment)
     return content_url.geturl(), destination
开发者ID:maxamillion,项目名称:pulp,代码行数:29,代码来源:strategies.py


示例3: fetch_units

    def fetch_units(self):
        """
        Download the units file that sits next to the manifest.
        The units URL is the manifest URL with its last path segment replaced
        by the units file name; the file is stored beside self.path.
        Errors are whatever the downloader raises (the upstream docs mention
        HTTPError on URL errors).
        """
        url_dir = self.url.rsplit('/', 1)[0]
        units_url = pathlib.join(url_dir, UNITS_FILE_NAME)
        local_dir = os.path.dirname(self.path)
        units_file = pathlib.join(local_dir, UNITS_FILE_NAME)
        units_request = DownloadRequest(str(units_url), units_file)
        self.downloader.download([units_request])
开发者ID:dhajoshi,项目名称:pulp,代码行数:11,代码来源:manifest.py


示例4: test_publisher

 def test_publisher(self):
     # End-to-end check: publish units with HttpPublisher, fetch the result
     # back through RemoteManifest and verify what landed on disk.
     # setup
     units = self.populate()
     # test
     # publish
     repo_id = 'test_repo'
     base_url = 'file://'
     publish_dir = os.path.join(self.tmpdir, 'nodes/repos')
     repo_publish_dir = os.path.join(publish_dir, repo_id)
     # The same directory is passed for both elements of the virtual host.
     virtual_host = (publish_dir, publish_dir)
     with HttpPublisher(base_url, virtual_host, repo_id, repo_publish_dir) as p:
         p.publish(units)
         p.commit()
     # verify
     conf = DownloaderConfig()
     downloader = LocalFileDownloader(conf)
     # `p` is still referenced after its `with` block has exited.
     manifest_path = p.manifest_path()
     working_dir = os.path.join(self.tmpdir, 'working_dir')
     os.makedirs(working_dir)
     url = pathlib.url_join(base_url, manifest_path)
     manifest = RemoteManifest(url, downloader, working_dir)
     manifest.fetch()
     manifest.fetch_units()
     self.assertTrue(manifest.has_valid_units())
     units = manifest.get_units()
     # `n` is the running unit index; it is also compared with unit_key['n'].
     n = 0
     for unit, ref in units:
         self.assertEqual(
             manifest.publishing_details[constants.BASE_URL],
             pathlib.url_join(base_url, publish_dir, repo_id))
         # The first unit is expected to be a tarball (regular file); every
         # other unit is expected to be a symlink of the recorded size.
         if n == 0:  # TARBALL
             path = pathlib.join(publish_dir, repo_id, unit[constants.TARBALL_PATH])
             self.assertTrue(os.path.isfile(path))
         else:
             path = pathlib.join(publish_dir, repo_id, unit[constants.RELATIVE_PATH])
             self.assertTrue(os.path.islink(path))
             self.assertEqual(unit[constants.FILE_SIZE], os.path.getsize(path))
         # NOTE(review): this second if/else recomputes the same `path` as the
         # chain above; the two chains could be merged.
         if n == 0:  # TARBALL
             path = pathlib.join(publish_dir, repo_id, unit[constants.TARBALL_PATH])
             tb = tarfile.open(path)
             try:
                 files = sorted(tb.getnames())
             finally:
                 tb.close()
             self.assertEqual(len(files), self.NUM_TARED_FILES)
         else:
             path = pathlib.join(publish_dir, repo_id, unit[constants.RELATIVE_PATH])
             with open(path, 'rb') as fp:
                 unit_content = fp.read()
                 # NOTE(review): compares the value with itself, so it can
                 # never fail; presumably meant to compare against the
                 # original file content - confirm and fix.
                 self.assertEqual(unit_content, unit_content)
         self.assertEqual(unit['unit_key']['n'], n)
         n += 1
开发者ID:credativ,项目名称:pulp,代码行数:52,代码来源:test_publishers.py


示例5: manifest_path

 def manifest_path(self):
     """
     Build the relative URL path to the manifest from the first element of
     the alias, the repository id and the manifest file name.
     :return: The path component of the URL.
     :rtype: str
     """
     url_prefix = self.alias[0]
     segments = (url_prefix, self.repo_id, MANIFEST_FILE_NAME)
     return pathlib.join(*segments)
开发者ID:cliffy94,项目名称:pulp,代码行数:7,代码来源:publisher.py


示例6: fetch_units

 def fetch_units(self):
     """
     Download the units file that sits next to the manifest.
     The units URL is the manifest URL with its last path segment replaced
     by the units file name; the file is stored beside self.path.  Failures
     collected by the event listener are turned into an exception.
     :raise ManifestDownloadError: when the listener reports a failed download.
     """
     url_dir = self.url.rsplit('/', 1)[0]
     units_url = pathlib.join(url_dir, UNITS_FILE_NAME)
     units_file = pathlib.join(os.path.dirname(self.path), UNITS_FILE_NAME)
     units_request = DownloadRequest(str(units_url), units_file)
     listener = AggregatingEventListener()
     self.downloader.event_listener = listener
     self.downloader.download([units_request])
     if not listener.failed_reports:
         return
     # Only the first failure is surfaced to the caller.
     first_failure = listener.failed_reports[0]
     raise ManifestDownloadError(self.url, first_failure.error_msg)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:17,代码来源:manifest.py


示例7: commit

 def commit(self):
     """
     Commit publishing.
     Replace whatever currently lives at <publish_dir>/<repo_id> with the
     staged tmp_dir.  Does nothing unless publish() has staged content.
     :raise OSError: when the existing entry cannot be unlinked or the
         staged directory cannot be renamed into place.
     """
     # Local import so this method does not depend on the module's import block.
     import shutil

     if not self.staged:
         # nothing to commit
         return
     dir_path = pathlib.join(self.publish_dir, self.repo_id)
     # Remove the previous publication without going through a shell: the
     # old 'rm -rf %s' command broke on paths containing spaces and allowed
     # shell metacharacters in repo_id to be executed.
     if os.path.islink(dir_path) or os.path.isfile(dir_path):
         # rmtree() refuses symlinks and plain files; 'rm -rf' removed them.
         os.unlink(dir_path)
     else:
         # A missing directory is fine, just like with 'rm -rf'.
         shutil.rmtree(dir_path, ignore_errors=True)
     os.rename(self.tmp_dir, dir_path)
     self.staged = False
开发者ID:fdammeke,项目名称:pulp,代码行数:12,代码来源:publisher.py


示例8: commit

 def commit(self):
     """
     Make staged publishing permanent.
     When something has been staged, the previous <publish_dir>/<repo_id>
     tree is removed (removal errors are ignored) and tmp_dir is renamed
     into its place.  Without staged content this is a no-op.
     """
     if self.staged:
         final_dir = pathlib.join(self.publish_dir, self.repo_id)
         rmtree(final_dir, ignore_errors=True)
         os.rename(self.tmp_dir, final_dir)
         self.staged = False
开发者ID:cliffy94,项目名称:pulp,代码行数:12,代码来源:publisher.py


示例9: get_units

    def get_units(self):
        """
        Get the content units referenced in the manifest.
        An empty list is returned when total_units is falsy; otherwise a
        UnitIterator over the units file located beside self.path.
        :return: An iterator used to read downloaded content units.
        :rtype: iterable
        """
        if not self.total_units:
            return []
        units_file = pathlib.join(os.path.dirname(self.path), UNITS_FILE_NAME)
        return UnitIterator(units_file, self.total_units)
开发者ID:dhajoshi,项目名称:pulp,代码行数:13,代码来源:manifest.py


示例10: fetch

    def fetch(self, url, dir_path, downloader):
        """
        Download the manifest and load its content into this object.
        The downloaded file is decompressed when compressed() says so, parsed
        as JSON and merged into the instance attributes.  units_path is then
        re-pointed at dir_path, keeping only its file name.
        :param url: The URL to the manifest.
        :type url: str
        :param dir_path: The absolute path to a directory for the downloaded manifest.
        :type dir_path: str
        :param downloader: The nectar downloader to be used.
        :type downloader: nectar.downloaders.base.Downloader
        :raise ValueError: on json decoding errors
        """
        manifest_file = pathlib.join(dir_path, MANIFEST_FILE_NAME)
        manifest_request = DownloadRequest(str(url), manifest_file)
        downloader.download([manifest_request])
        if compressed(manifest_file):
            manifest_file = decompress(manifest_file)
        with open(manifest_file) as stream:
            loaded = json.load(stream)
        # The loaded document becomes instance state, so units_path below
        # reads the value that was just loaded.
        self.__dict__.update(loaded)
        self.units_path = pathlib.join(dir_path, os.path.basename(self.units_path))
开发者ID:cliffy94,项目名称:pulp,代码行数:22,代码来源:manifest.py


示例11: __init__

 def __init__(self, path):
     """
     Open the units file for writing and reset the counters.
     :param path: The absolute path to a file or directory.
         When a directory is specified, the standard file name is appended.
     :type path: str
     :raise IOError: on I/O errors
     """
     target = pathlib.join(path, UNITS_FILE_NAME) if os.path.isdir(path) else path
     self.path = target
     # The file is written gzip-compressed in binary mode.
     self.fp = gzip.open(target, 'wb')
     self.total_units = self.bytes_written = 0
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:13,代码来源:manifest.py


示例12: has_valid_units

 def has_valid_units(self):
     """
     Validate the associated units file by comparing the size of the
     units file to units_size in the manifest.
     A missing units file counts as invalid rather than as an error.
     :return: True if valid.
     :rtype: bool
     :raise OSError: when the size cannot be read for any reason other
         than the file not existing.
     """
     path = pathlib.join(os.path.dirname(self.path), UNITS_FILE_NAME)
     try:
         size = os.path.getsize(path)
     except OSError as e:
         # 'as' replaces the Python 2-only 'except OSError, e' form.
         if e.errno != errno.ENOENT:
             raise
         # Return an explicit bool instead of falling through to None.
         return False
     return size == self.units_size
开发者ID:dhajoshi,项目名称:pulp,代码行数:14,代码来源:manifest.py


示例13: _update_storage_path

 def _update_storage_path(self, unit):
     """
     Re-base the unit's storage_path onto the storage_dir configured in
     server.conf, using the relative_path stored on the unit.  Units
     without a storage_path are left untouched.
     :param unit: A published unit.
     :type unit: dict
     """
     if not unit.get(constants.STORAGE_PATH):
         return
     storage_dir = pulp_conf.get("server", "storage_dir")
     relative_path = unit[constants.RELATIVE_PATH]
     unit[constants.STORAGE_PATH] = pathlib.join(storage_dir, relative_path)
开发者ID:kbotc,项目名称:pulp,代码行数:14,代码来源:strategies.py


示例14: _path_and_destination

 def _path_and_destination(self, unit):
     """
     Work out the URL path component a unit is downloaded from and the
     location it is downloaded to.
     :param unit: A content unit.
     :type unit: dict
     :return: (path, destination)
     :rtype: tuple(2)
     """
     destination = unit[constants.STORAGE_PATH]
     tarball = unit.get(constants.TARBALL_PATH)
     if tarball:
         # Tarballs land next to the storage path, under their own file name.
         tarball_destination = pathlib.join(
             os.path.dirname(destination), os.path.basename(tarball))
         return pathlib.quote(tarball), tarball_destination
     relative = unit[constants.RELATIVE_PATH]
     return pathlib.quote(relative), destination
开发者ID:credativ,项目名称:pulp,代码行数:15,代码来源:strategies.py


示例15: __init__

 def __init__(self, url, downloader, destination):
     """
     Set up a manifest that is fetched from a remote URL.
     :param url: The URL to the remote manifest.
     :type url: str
     :param downloader: The downloader used for fetch methods.
     :type downloader: nectar.downloaders.base.Downloader
     :param destination: An absolute path to a file or directory.
         When a directory is given, the manifest file name is appended.
     :type destination: str
     """
     target = destination
     if os.path.isdir(target):
         target = pathlib.join(target, MANIFEST_FILE_NAME)
     # The base initializer runs before the attributes below are assigned.
     Manifest.__init__(self, target)
     self.url = str(url)
     self.downloader = downloader
     self.destination = target
开发者ID:fdammeke,项目名称:pulp,代码行数:15,代码来源:manifest.py


示例16: _reset_storage_path

 def _reset_storage_path(self, unit):
     """
     Reset the storage_path using the storage_dir defined in
     server.conf and the relative_path injected when the unit was published.
     The unit is updated in place.
     :param unit: A published unit.
     :type unit: dict
     :return: The re-oriented storage path, or None when the unit has no
         storage path to reset.
     :rtype: str
     """
     if not unit.get(constants.STORAGE_PATH):
         return None
     storage_dir = pulp_conf.get('server', 'storage_dir')
     relative_path = unit[constants.RELATIVE_PATH]
     storage_path = pathlib.join(storage_dir, relative_path)
     unit[constants.STORAGE_PATH] = storage_path
     # The documented contract promises the new path; previously the method
     # fell off the end and returned None.
     return storage_path
开发者ID:maxamillion,项目名称:pulp,代码行数:16,代码来源:strategies.py


示例17: test_publisher

 def test_publisher(self):
     # End-to-end check: publish units with HttpPublisher, fetch the manifest
     # and units back through Manifest and verify what landed on disk.
     # setup
     units = self.populate()
     # test
     # publish
     repo_id = 'test_repo'
     base_url = 'file://'
     publish_dir = os.path.join(self.tmpdir, 'nodes/repos')
     # The same directory is passed for both elements of the virtual host.
     virtual_host = (publish_dir, publish_dir)
     with HttpPublisher(base_url, virtual_host, repo_id) as p:
         p.publish(units)
         p.commit()
     # verify
     conf = DownloaderConfig()
     downloader = HTTPSCurlDownloader(conf)
     # `p` is still referenced after its `with` block has exited.
     manifest_path = p.manifest_path()
     working_dir = os.path.join(self.tmpdir, 'working_dir')
     os.makedirs(working_dir)
     manifest = Manifest()
     url = pathlib.url_join(base_url, manifest_path)
     manifest.fetch(url, working_dir, downloader)
     manifest.fetch_units(url, downloader)
     units = manifest.get_units()
     # `n` is the running unit index; it is also compared with unit_key['n'].
     n = 0
     for unit, ref in units:
         # Only the first unit is expected to be flagged as a tarball.
         if n == 0:
             self.assertTrue(unit[constants.PUBLISHED_AS_TARBALL])
         else:
             self.assertFalse(unit.get(constants.PUBLISHED_AS_TARBALL, False))
         path = pathlib.join(publish_dir, repo_id, unit[constants.RELATIVE_PATH])
         self.assertEqual(
             manifest.publishing_details[constants.BASE_URL],
             pathlib.url_join(base_url, publish_dir, repo_id))
         # Tarball -> regular file on disk; everything else -> symlink.
         if n == 0:
             self.assertTrue(os.path.isfile(path))
         else:
             self.assertTrue(os.path.islink(path))
         if n == 0:
             with tarfile.open(path) as tb:
                 files = sorted(tb.getnames())
             # NOTE(review): the +1 presumably accounts for the archive's
             # root directory entry - confirm.
             self.assertEqual(len(files), self.NUM_TARED_FILES + 1)
         else:
             with open(path, 'rb') as fp:
                 unit_content = fp.read()
                 # NOTE(review): compares the value with itself, so it can
                 # never fail; presumably meant to compare against the
                 # original file content - confirm and fix.
                 self.assertEqual(unit_content, unit_content)
         self.assertEqual(unit['unit_key']['n'], n)
         n += 1
开发者ID:kbotc,项目名称:pulp,代码行数:47,代码来源:test_publishers.py


示例18: publish_unit

 def publish_unit(self, unit):
     """
     Publish the file associated with the unit into the staging directory.
     The file size is recorded on the unit.  Directories are published as
     a tarball, whose relative path is recorded on the unit as well; plain
     files are published as symlinks.
     :param unit: A content unit.
     :type unit: dict
     :return: (unit, None) when the unit has no storage path; otherwise
         nothing is returned.
     """
     source = unit.get(constants.STORAGE_PATH)
     if not source:
         # not all units have associated files.
         return unit, None
     relative = unit[constants.RELATIVE_PATH]
     target = pathlib.join(self.tmp_dir, relative)
     pathlib.mkdir(os.path.dirname(target))
     unit[constants.FILE_SIZE] = os.path.getsize(source)
     if not os.path.isdir(source):
         os.symlink(source, target)
         return
     tar_dir(source, tar_path(target))
     unit[constants.TARBALL_PATH] = tar_path(relative)
开发者ID:fdammeke,项目名称:pulp,代码行数:19,代码来源:publisher.py


示例19: publish_unit

 def publish_unit(self, unit):
     """
     Publish the file associated with the unit into the staging directory.
     Directories are published through self.tar_dir() and plain files as
     symlinks; the unit is flagged accordingly.
     :param unit: A content unit.
     :type unit: dict
     :return: (unit, None) when the unit has no storage path; otherwise
         nothing is returned.
     """
     source = unit.get('storage_path')
     if not source:
         # not all units have associated files.
         return unit, None
     target = pathlib.join(self.tmp_dir, unit['relative_path'])
     pathlib.mkdir(os.path.dirname(target))
     if not os.path.isdir(source):
         os.symlink(source, target)
         unit[constants.PUBLISHED_AS_FILE] = True
         return
     self.tar_dir(source, target)
     unit[constants.PUBLISHED_AS_TARBALL] = True
开发者ID:cliffy94,项目名称:pulp,代码行数:19,代码来源:publisher.py


示例20: units_path

 def units_path(self):
     """
     Get the path to the associated units file.
     The path recorded under UNITS_PATH in self.units is used when it is
     truthy; otherwise the units file name beside self.path is returned.
     """
     recorded = self.units[UNITS_PATH]
     if recorded:
         return recorded
     return pathlib.join(os.path.dirname(self.path), UNITS_FILE_NAME)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:6,代码来源:manifest.py



注:本文中的pulp_node.pathlib.join函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pathlib.url_join函数代码示例发布时间:2022-05-25
下一篇:
Python manifest.Manifest类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap