• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python test_sftp.get_sftp函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test_sftp.get_sftp函数的典型用法代码示例。如果您正苦于以下问题:Python get_sftp函数的具体用法?Python get_sftp怎么用?Python get_sftp使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_sftp函数的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_2_big_file

    def test_2_big_file(self):
        """
        write a 1MB file with no buffering.
        """
        sftp = get_sftp()
        kblob = (1024 * 'x')
        start = time.time()
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
            end = time.time()
            sys.stderr.write('%ds ' % round(end - start))
            
            start = time.time()
            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
            for n in range(1024):
                data = f.read(1024)
                self.assertEqual(data, kblob)
            f.close()

            end = time.time()
            sys.stderr.write('%ds ' % round(end - start))
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:31,代码来源:test_sftp_big.py


示例2: test_A_big_file_renegotiate

 def test_A_big_file_renegotiate(self):
     """
     write a 1MB file, forcing key renegotiation in the middle.
     """
     sftp = get_sftp()
     t = sftp.sock.get_transport()
     t.packetizer.REKEY_BYTES = 512 * 1024
     k32blob = (32 * 1024 * 'x')
     try:
         f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
         for i in xrange(32):
             f.write(k32blob)
         f.close()
         
         self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
         self.assertNotEquals(t.H, t.session_id)
         
         # try to read it too.
         f = sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024)
         f.prefetch()
         total = 0
         while total < 1024 * 1024:
             total += len(f.read(32 * 1024))
         f.close()
     finally:
         sftp.remove('%s/hongry.txt' % FOLDER)
         t.packetizer.REKEY_BYTES = pow(2, 30)
开发者ID:j-craig,项目名称:paramiko,代码行数:27,代码来源:test_sftp_big.py


示例3: test_1_lots_of_files

    def test_1_lots_of_files(self):
        """
        create a bunch of files over the same session.
        """
        sftp = get_sftp()
        numfiles = 100
        try:
            for i in range(numfiles):
                f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1)
                f.write('this is file #%d.\n' % i)
                f.close()
                sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660)

            # now make sure every file is there, by creating a list of filenmes
            # and reading them in random order.
            numlist = range(numfiles)
            while len(numlist) > 0:
                r = numlist[random.randint(0, len(numlist) - 1)]
                f = sftp.open('%s/file%d.txt' % (FOLDER, r))
                self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
                f.close()
                numlist.remove(r)
        finally:
            for i in range(numfiles):
                try:
                    sftp.remove('%s/file%d.txt' % (FOLDER, i))
                except:
                    pass
开发者ID:j-craig,项目名称:paramiko,代码行数:28,代码来源:test_sftp_big.py


示例4: test_8_large_readv

    def test_8_large_readv(self):
        """
        verify that a very large readv is broken up correctly and still
        returned as a single blob.
        """
        sftp = get_sftp()
        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            f.set_pipelined(True)
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
            
            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
            data = list(f.readv([(23 * 1024, 128 * 1024)]))
            self.assertEqual(1, len(data))
            data = data[0]
            self.assertEqual(128 * 1024, len(data))
            
            f.close()
            sys.stderr.write(' ')
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:29,代码来源:test_sftp_big.py


示例5: test_6_lots_of_prefetching

    def test_6_lots_of_prefetching(self):
        """
        prefetch a 1MB file a bunch of times, discarding the file object
        without using it, to verify that paramiko doesn't get confused.
        """
        sftp = get_sftp()
        kblob = (1024 * 'x')
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            f.set_pipelined(True)
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)

            for i in range(10):
                f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
                f.prefetch()
            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
            f.prefetch()
            for n in range(1024):
                data = f.read(1024)
                self.assertEqual(data, kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:33,代码来源:test_sftp_big.py


示例6: setUp

 def setUp(self):
     global FOLDER
     sftp = get_sftp()
     for i in xrange(1000):
         FOLDER = FOLDER[:-3] + '%03d' % i
         try:
             sftp.mkdir(FOLDER)
             break
         except (IOError, OSError):
             pass
开发者ID:j-craig,项目名称:paramiko,代码行数:10,代码来源:test_sftp_big.py


示例7: test_9_big_file_big_buffer

    def test_9_big_file_big_buffer(self):
        """
        write a 1MB file, with no linefeeds, and a big buffer.
        """
        sftp = get_sftp()
        mblob = (1024 * 1024 * 'x')
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
            f.write(mblob)
            f.close()

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:14,代码来源:test_sftp_big.py


示例8: test_3_big_file_pipelined

    def test_3_big_file_pipelined(self):
        """
        write a 1MB file, with no linefeeds, using pipelining.
        """
        sftp = get_sftp()
        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
        start = time.time()
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            f.set_pipelined(True)
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
            end = time.time()
            sys.stderr.write('%ds ' % round(end - start))
            
            start = time.time()
            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
            f.prefetch()

            # read on odd boundaries to make sure the bytes aren't getting scrambled
            n = 0
            k2blob = kblob + kblob
            chunk = 629
            size = 1024 * 1024
            while n < size:
                if n + chunk > size:
                    chunk = size - n
                data = f.read(chunk)
                offset = n % 1024
                self.assertEqual(data, k2blob[offset:offset + chunk])
                n += chunk
            f.close()

            end = time.time()
            sys.stderr.write('%ds ' % round(end - start))
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:43,代码来源:test_sftp_big.py


示例9: test_5_readv_seek

    def test_5_readv_seek(self):
        sftp = get_sftp()
        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            f.set_pipelined(True)
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')

            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)

            start = time.time()
            k2blob = kblob + kblob
            chunk = 793
            for i in xrange(10):
                f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
                base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
                # make a bunch of offsets and put them in random order
                offsets = [base_offset + j * chunk for j in xrange(100)]
                readv_list = []
                for j in xrange(100):
                    o = offsets[random.randint(0, len(offsets) - 1)]
                    offsets.remove(o)
                    readv_list.append((o, chunk))
                ret = f.readv(readv_list)
                for i in xrange(len(readv_list)):
                    offset = readv_list[i][0]
                    n_offset = offset % 1024
                    self.assertEqual(ret.next(), k2blob[n_offset:n_offset + chunk])
                f.close()
            end = time.time()
            sys.stderr.write('%ds ' % round(end - start))
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:38,代码来源:test_sftp_big.py


示例10: test_7_prefetch_readv

    def test_7_prefetch_readv(self):
        """
        verify that prefetch and readv don't conflict with each other.
        """
        sftp = get_sftp()
        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
        try:
            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
            f.set_pipelined(True)
            for n in range(1024):
                f.write(kblob)
                if n % 128 == 0:
                    sys.stderr.write('.')
            f.close()
            sys.stderr.write(' ')
            
            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)

            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
            f.prefetch()
            data = f.read(1024)
            self.assertEqual(data, kblob)
            
            chunk_size = 793
            base_offset = 512 * 1024
            k2blob = kblob + kblob
            chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
            for data in f.readv(chunks):
                offset = base_offset % 1024
                self.assertEqual(chunk_size, len(data))
                self.assertEqual(k2blob[offset:offset + chunk_size], data)
                base_offset += chunk_size

            f.close()
            sys.stderr.write(' ')
        finally:
            sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:37,代码来源:test_sftp_big.py


示例11: test_4_prefetch_seek

 def test_4_prefetch_seek(self):
     sftp = get_sftp()
     kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
     try:
         f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
         f.set_pipelined(True)
         for n in range(1024):
             f.write(kblob)
             if n % 128 == 0:
                 sys.stderr.write('.')
         f.close()
         sys.stderr.write(' ')
         
         self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
         
         start = time.time()
         k2blob = kblob + kblob
         chunk = 793
         for i in xrange(10):
             f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
             f.prefetch()
             base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
             offsets = [base_offset + j * chunk for j in xrange(100)]
             # randomly seek around and read them out
             for j in xrange(100):
                 offset = offsets[random.randint(0, len(offsets) - 1)]
                 offsets.remove(offset)
                 f.seek(offset)
                 data = f.read(chunk)
                 n_offset = offset % 1024
                 self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
                 offset += chunk
             f.close()
         end = time.time()
         sys.stderr.write('%ds ' % round(end - start))
     finally:
         sftp.remove('%s/hongry.txt' % FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:37,代码来源:test_sftp_big.py


示例12: tearDown

 def tearDown(self):
     sftp = get_sftp()
     sftp.rmdir(FOLDER)
开发者ID:j-craig,项目名称:paramiko,代码行数:3,代码来源:test_sftp_big.py



注:本文中的test_sftp.get_sftp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_shunt.register_called函数代码示例发布时间:2022-05-27
下一篇:
Python test_screen.run函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap