• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python timer.timer函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中timer.timer函数的典型用法代码示例。如果您正苦于以下问题:Python timer函数的具体用法?Python timer怎么用?Python timer使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了timer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: fetch_people_page

def fetch_people_page(conn, username, page = 1):
    url = "/people/{}/answers".format(username)
    url_page = "{}?page={:d}".format(url, page)
    print("\n{}\t".format(url_page), end='')
    sys.stdout.flush()
    timer.timer()
    try:
        conn.request("GET", url_page)
    except socket.timeout as e:
        print('wow! timeout')
        raise e
    response = conn.getresponse()
    t = timer.timer()
    avg = int(get_average(t, 'user page'))
    code = response.status
    print("[{}]\t{} ms\tAvg: {} ms".format(code, t, avg))
    if code == 404:
        slog("user username fetch fail, code code")
        dbhelper.update_user_by_name(username, {'fetch': dbhelper.FETCH_FAIL})
        print( "没有这个用户", username)
        return None
    if code != 200:
        slog("user username fetch fail, code code")
        dbhelper.update_user_by_name(username, {'fetch': dbhelper.FETCH_FAIL})
        print( "奇奇怪怪的返回码", code)
        return None
    content = response.read()
    return content
开发者ID:linzelong,项目名称:zhihu-archive,代码行数:28,代码来源:zhihu.py


示例2: exec_readlines_test_filter_stderr

    def exec_readlines_test_filter_stderr(self):
        """Test execReadlines and filter_stderr."""

        # Test that stderr is normally included
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write("""#!/bin/sh
echo "one"
echo "two" >&2
echo "three"
exit 0
""")
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                self.assertEqual(next(rl_iterator), "one")
                self.assertEqual(next(rl_iterator), "two")
                self.assertEqual(next(rl_iterator), "three")
                self.assertRaises(StopIteration, rl_iterator.__next__)

        # Test that filter stderr removes the middle line
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write("""#!/bin/sh
echo "one"
echo "two" >&2
echo "three"
exit 0
""")
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name], filter_stderr=True)
                self.assertEqual(next(rl_iterator), "one")
                self.assertEqual(next(rl_iterator), "three")
                self.assertRaises(StopIteration, rl_iterator.__next__)
开发者ID:jaymzh,项目名称:anaconda,代码行数:35,代码来源:iutil_test.py


示例3: start_program_reset_handlers_test

    def start_program_reset_handlers_test(self):
        """Test the reset_handlers parameter of startProgram."""

        with tempfile.NamedTemporaryFile() as testscript:
            testscript.write("""#!/bin/sh
# Just hang out and do nothing, forever
while true ; do sleep 1 ; done
""")
            testscript.flush()

            # Start a program with reset_handlers
            proc = iutil.startProgram(["/bin/sh", testscript.name])

            with timer(5):
                # Kill with SIGPIPE and check that the python's SIG_IGN was not inheritted
                # The process should die on the signal.
                proc.send_signal(signal.SIGPIPE)
                proc.communicate()
                self.assertEqual(proc.returncode, -(signal.SIGPIPE))

            # Start another copy without reset_handlers
            proc = iutil.startProgram(["/bin/sh", testscript.name], reset_handlers=False)

            with timer(5):
                # Kill with SIGPIPE, then SIGTERM, and make sure SIGTERM was the one
                # that worked.
                proc.send_signal(signal.SIGPIPE)
                proc.terminate()
                proc.communicate()
                self.assertEqual(proc.returncode, -(signal.SIGTERM))
开发者ID:cyclefusion,项目名称:anaconda,代码行数:30,代码来源:iutil_test.py


示例4: saveAnswer

def saveAnswer(conn, username, answer_link_list, dblock):
    regex = re.compile(r'^/question/(\d+)/answer/(\d+)')

    success_ratio = None
    avg = None
    for url in answer_link_list:
        matches = regex.search(url)
        if matches is None:
            raise Exception('url not good')
        qid = matches.group(1)
        aid = matches.group(2)
        slog("\t{}".format(url))
        sys.stdout.flush()
        timer.timer('saveAnswer')
        content = get_url(url)
        if content is None:
            continue
        success_ratio = get_average(0 if content is None else 1, 'success_ratio')
        t = timer.timer('saveAnswer')
        avg = int(get_average(t))
        slog("\t{} ms".format(t))
        if len(content) == 0:
            slog("content is empty\n")
            slog("url [code] empty")
            return False
        question, descript, content, vote = parse_answer_pure(content)
        slog("{}\t^{}\t{}".format(url, vote, question))

        with dblock:
            dbhelper.saveQuestion(qid, question, descript)
            dbhelper._saveAnswer(aid, qid, username, content, vote)
    if success_ratio is not None and avg is not None:
        success_ratio = int(success_ratio*100)
        print("\tAvg: {} ms\tsuccess_ratio: {}%".format(avg, success_ratio))
开发者ID:linzelong,项目名称:zhihu-archive,代码行数:34,代码来源:zhihu.py


示例5: run

def run(fn, name, imshow=False):
  print 
  print "---", name 
  for (prefix, wrapper) in [('parakeet-', jit), ('numba-', autojit)]:
    try:
      wrapped_fn = wrapper(fn)
      with timer(prefix + name + '-compile', True):
        wrapped_fn(image[:1, :1], k)
      with timer(prefix + name, False):
        result = wrapped_fn(image, k)
      if imshow:
        import pylab
        pylab.imshow(image)
        pylab.figure()
        pylab.imshow(result)
        pylab.figure()
        pylab.imshow(scipy_result)
        pylab.show()
      if not running_pypy:
        assert allclose(result, scipy_result)
    except KeyboardInterrupt:
      raise   
    except:
      print "%s failed" % (prefix+name)
      import sys 
      print sys.exc_info()[1]
开发者ID:cournape,项目名称:parakeet,代码行数:26,代码来源:morphology.py


示例6: exec_readlines_test_signals

    def exec_readlines_test_signals(self):
        """Test execReadlines and signal receipt."""

        # ignored signal
        old_HUP_handler = signal.signal(signal.SIGHUP, signal.SIG_IGN)
        try:
            with tempfile.NamedTemporaryFile(mode="wt") as testscript:
                testscript.write(
                    """#!/bin/sh
echo "one"
kill -HUP $PPID
echo "two"
echo -n "three"
exit 0
"""
                )
                testscript.flush()

                with timer(5):
                    rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                    self.assertEqual(next(rl_iterator), "one")
                    self.assertEqual(next(rl_iterator), "two")
                    self.assertEqual(next(rl_iterator), "three")
                    self.assertRaises(StopIteration, rl_iterator.__next__)
        finally:
            signal.signal(signal.SIGHUP, old_HUP_handler)

        # caught signal
        def _hup_handler(signum, frame):
            pass

        old_HUP_handler = signal.signal(signal.SIGHUP, _hup_handler)
        try:
            with tempfile.NamedTemporaryFile(mode="wt") as testscript:
                testscript.write(
                    """#!/bin/sh
echo "one"
kill -HUP $PPID
echo "two"
echo -n "three"
exit 0
"""
                )
                testscript.flush()

                with timer(5):
                    rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                    self.assertEqual(next(rl_iterator), "one")
                    self.assertEqual(next(rl_iterator), "two")
                    self.assertEqual(next(rl_iterator), "three")
                    self.assertRaises(StopIteration, rl_iterator.__next__)
        finally:
            signal.signal(signal.SIGHUP, old_HUP_handler)
开发者ID:nandakishore1006,项目名称:anaconda,代码行数:53,代码来源:iutil_test.py


示例7: start_qt_app

def start_qt_app(config):
    import sys
    from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMessageBox
    from quamash import QEventLoop
    from ui.main import Window
    from ui.qt_gui_connection import qSignal
    app = QApplication(sys.argv)
    
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)
    
    if not QSystemTrayIcon.isSystemTrayAvailable():
        QMessageBox.critical(None, "Systray",
                             "I couldn't detect any system tray on this system.")
        sys.exit(1)

    QApplication.setQuitOnLastWindowClosed(False)
    
    gui_connection = qSignal()
    window = Window(gui_connection)
    
    def closeApp():
        print("Close app signal")
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
        loop.stop()
    gui_connection.closeApp.connect(closeApp)
    
    with loop:
        #asyncio.run_coroutine_threadsafe(timer(loop, config, gui_connection), loop)
        try:
            loop.run_until_complete(timer(loop, config, gui_connection))
        except asyncio.CancelledError:
            pass
开发者ID:Aksem,项目名称:KoffeeBreak,代码行数:35,代码来源:main.py


示例8: walk

	def walk (self, x, y):
		
		assert(x == 1 or x == -1 or x == 0)
		assert(y == 1 or y == -1 or y == 0)
		
		if x == 0 and y == 0:
			return
		
		elif self.timer is None or self.timer.is_complete():
		
			self.x += x
			self.y += y
			
			self.__x = x
			self.__y = y
			
			self.__offset_x = common.TILE_SIZE * x
			self.__offset_y = common.TILE_SIZE * y
			
			if x < 0:
				self.ori = 'left'
				anim = self.left
			elif x > 0:
				self.ori = 'right'
				anim = self.right
			elif y > 0:
				self.ori = 'down'
				anim = self.down
			else:
				self.ori = 'up'
				anim = self.up
			
			anim.play(WALK_MOVEMENT_TIME)
			self.timer = timer(WALK_MOVEMENT_TIME)
开发者ID:marlosdm,项目名称:Maze-Of-Kindred,代码行数:34,代码来源:player.py


示例9: exec_readlines_auto_kill_test

    def exec_readlines_auto_kill_test(self):
        """Test execReadlines with reading only part of the output"""

        with tempfile.NamedTemporaryFile() as testscript:
            testscript.write("""#!/bin/sh
# Output forever
while true; do
echo hey
done
""")
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])

                # Save the process context
                proc = rl_iterator._proc

                # Read two lines worth
                self.assertEqual(rl_iterator.next(), "hey")
                self.assertEqual(rl_iterator.next(), "hey")

                # Delete the iterator and wait for the process to be killed
                del rl_iterator
                proc.communicate()

            # Check that the process is gone
            self.assertIsNotNone(proc.poll())
开发者ID:cyclefusion,项目名称:anaconda,代码行数:28,代码来源:iutil_test.py


示例10: _stepFade

    def _stepFade(self, final=False):

        #Figure out what step we're on
        secondsSoFar = (datetime.now() - self.fadeStartTime).total_seconds()
        prog = min(secondsSoFar / self.fadeSeconds, 1)

        self.currentStep = int(round(prog * self.numSteps))
        #print ("%3.2f%%, step %d/%d"%(prog, self.currentStep, self.numSteps))

        #set colors to correct output
        self.r = self.last_r + int(prog * self.delta_r)
        self.g = self.last_g + int(prog * self.delta_g)
        self.b = self.last_b + int(prog * self.delta_b)
        self.apply()

        # Are we done yet?
        if self.currentStep < self.numSteps:
            # if not, set up the next timeout
            self.fadeTimer = timer(
                    timeout = datetime.now() + timedelta(seconds=self.secondsPerStep), 
                    callback = self._stepFade, 
                    res = self.secondsPerStep)
            self.fadeTimer.start()

        elif not final:
            #This is the second-to-last step!

            # make sure we get to do the last step (because we
            # don't want rounding or CPU delays to make us stop at 96.7%..
            self._stepFade(final=True)

        else:
            # call the callback if there is one, then reset it so
            # it doens't get called again if it's not set in the future
            self.fadeCallback and self.fadeCallback()
开发者ID:fastfieros,项目名称:sunrise,代码行数:35,代码来源:sunrise.py


示例11: test_still_running

 def test_still_running():
     with timer(5):
         # Run something forever so we can kill it
         proc = iutil.startProgram(["/bin/sh", "-c", "while true; do sleep 1; done"])
         iutil.watchProcess(proc, "test1")
         proc.kill()
         # Wait for the SIGCHLD
         signal.pause()
开发者ID:cyclefusion,项目名称:anaconda,代码行数:8,代码来源:iutil_test.py


示例12: pvalue

def pvalue(log_pmf, s0, L, desired_beta):
    """Compute $log((exp(log_pmf)**L)[s0:])$, such that the relative error
       to the exact answer is less than or equal to $desired_beta$."""
    total_len, _ = utils.iterated_convolution_lengths(len(log_pmf), L)
    if s0 >= total_len:
        return NEG_INF

    _, p_lower_preshift, p_upper_preshift = _bounds(log_pmf, log_pmf, 0, 0.0,
                                                    s0, L, desired_beta)
    sfft_good_preshift, sfft_pval_preshift = _check_sfft_pvalue(p_lower_preshift,
                                                                p_upper_preshift,
                                                                desired_beta)
    if sfft_good_preshift:
        logging.debug(' pre-shift sfft worked %.20f', sfft_pval_preshift)
        return sfft_pval_preshift

    with timer('computing theta'):
        theta = _compute_theta(log_pmf, s0, L)
    logging.debug('raw theta %s', theta)

    # TODO: too-large or negative theta causes numerical instability,
    # so this is a huge hack
    theta = utils.clamp(theta, 0, THETA_LIMIT)
    shifted_pmf, log_mgf = utils.shift(log_pmf, theta)

    beta = desired_beta / 2.0
    with timer('bounds'):
        log_delta, p_lower, p_upper = _bounds(log_pmf, shifted_pmf, theta, log_mgf,
                                              s0, L, desired_beta)

    sfft_good, sfft_pval = _check_sfft_pvalue(p_lower, p_upper, desired_beta)

    logging.debug('theta %s, log_mgf %s, beta %s, log delta %s', theta, log_mgf, beta, log_delta)
    if sfft_good:
        logging.debug(' sfft worked %.20f', sfft_pval)
        return sfft_pval
    delta = np.exp(log_delta)

    conv = conv_power(shifted_pmf, L, beta, delta)

    pval = utils.log_sum(utils.unshift(conv, theta, (log_mgf, L))[s0:])
    logging.debug(' sis pvalue %.20f', pval)
    return pval
开发者ID:huonw,项目名称:sisfft-py,代码行数:43,代码来源:sisfft.py


示例13: main

def main(input, multi, compression):
    if os.path.isfile(input) or os.path.isdir(input):

        start = timer()

        if os.path.isdir(input):
            list = os.listdir(input)
            files = []

            for l in list:
                if os.path.splitext(l)[1] == ".psd":
                    l = os.path.join(input, l)
                    if os.path.isfile(l):
                        files.append(l)
        else:
            files = [input]

        print("Found {} files to convert.".format(len(files)))

        for f in files:

            layers = extract_layers(f)

            i = 0
            for layer in layers.split("\n"):
                i += 1
                layer = layer.strip()
                if layer == "":
                    print("Skipping empty layer name. Likely flattened compatibility layer.")
                else:
                    print("layer {}: {}".format(i, layer))
                    tmpfile = export_layer(i, layer, f, compression)
                    exr_compression(tmpfile, compression)
                    cleanup(tmpfile)

            if multi:
                exr_multipart(layers.split("\n"), f)


            else:
                print("Not a PSD document. Skipping.")

        timer(start, "PSD To EXR Conversion")
开发者ID:AlexKucera,项目名称:tinkertoys,代码行数:43,代码来源:convert_psd_to_exr.py


示例14: consumer

def consumer(index, filename, start, end, u):
    parser = apparser.apparser()
    t = timer.timer()
    #    print('started job %d' % index)
    i = 0
    t2 = timer.timer()
    #    map(parser.proto, iterate_log(filename, start, end))
    #    t.stop('job %d' % index, i)
    #    return (index, i)

    for l in iterate_log(filename, start, end):
        #        if (i % 100001) == 0 and not i == 0:
        #            t2.stop('[%d] parsed %10d lines' % (index, i), i)
        i += 1
        #        r = apparser._parse2(l)
        r = parser.proto(l)
    #        break
    #        print(parser)
    #        u.get(parser.vhost, index)
    t.stop("job %d" % index, i)
    return (index, i)
开发者ID:T0aD,项目名称:pyawstats,代码行数:21,代码来源:para_test.py


示例15: exec_readlines_test_normal_output

    def exec_readlines_test_normal_output(self):
        """Test the output of execReadlines."""

        # Test regular-looking output
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write(
                """#!/bin/sh
echo "one"
echo "two"
echo "three"
exit 0
"""
            )
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                self.assertEqual(next(rl_iterator), "one")
                self.assertEqual(next(rl_iterator), "two")
                self.assertEqual(next(rl_iterator), "three")
                self.assertRaises(StopIteration, rl_iterator.__next__)

        # Test output with no end of line
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write(
                """#!/bin/sh
echo "one"
echo "two"
echo -n "three"
exit 0
"""
            )
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                self.assertEqual(next(rl_iterator), "one")
                self.assertEqual(next(rl_iterator), "two")
                self.assertEqual(next(rl_iterator), "three")
                self.assertRaises(StopIteration, rl_iterator.__next__)
开发者ID:nandakishore1006,项目名称:anaconda,代码行数:40,代码来源:iutil_test.py


示例16: main

def main():
    # Get forecasts and save to DB
    def get_forecasts():
        data = wfs.wfs().get_all_sources()
        dbase = db.db()
        dbase.insert_wfs_data(data)
        dbase.close_connection()

    # Read data from sensors and save to DB
    def read_sensors():
        data = sensors.sensors().read_all()
        dbase = db.db()
        dbase.insert_sensor_data(data)
        dbase.close_connection()

    # Screensaver
    scr = Thread(target = pir.screensaver)
    scr.daemon = True
    scr.start()

    ##test
    ##get_forecasts()
    ##read_sensors()


    # get data from forecasts each hour
    repeat_get_forecasts = timer.timer(3600, get_forecasts)
    repeat_get_forecasts.start()

    # get data from sensors each minute
    repreat_read_sensors = timer.timer(60, read_sensors)
    repreat_read_sensors.start()

    # Send sensor data to openweathermap.com
    #response = wfs.wfs().send_to_owm()
    #print (response)

    # Interface
    root = gui.Interface()
    root.mainloop()
开发者ID:spalk,项目名称:dimeteo,代码行数:40,代码来源:dimeteo.py


示例17: __init__

	def __init__(self,website,query,max_results = 500):
		self.base_url = google_search_url(website,query)
		self.url = self.base_url
		self.max_results = max_results
		self.max_pages = math.ceil(max_results/100.)
		self.page_number = 1
		self.results_list = []
		self.timer = timer.timer(mean = 3.13,sd = 0.62, floor = 15.12, ceiling = 46.1)
		self.website = website
		self.query = query
		self.driver = setup_google_driver()
		self.open_first_url()
		time.sleep(1)
		self.terminated = False
开发者ID:danjump,项目名称:recipe_optimizer,代码行数:14,代码来源:google_search.py


示例18: start_program_stdout_test

    def start_program_stdout_test(self):
        """Test redirecting stdout with startProgram."""

        marker_text = "yo wassup man"
        # Create a temporary file that will be written by the program
        with tempfile.NamedTemporaryFile() as testfile:
            # Open a new copy of the file so that the child doesn't close and
            # delete the NamedTemporaryFile
            stdout = open(testfile.name, 'w')
            with timer(5):
                proc = iutil.startProgram(["/bin/echo", marker_text], stdout=stdout)
                proc.communicate()

            # Rewind testfile and look for the text
            testfile.seek(0, os.SEEK_SET)
            self.assertEqual(testfile.read().strip(), marker_text)
开发者ID:cyclefusion,项目名称:anaconda,代码行数:16,代码来源:iutil_test.py


示例19: __init__

    def __init__(self, cf):
        super(TaskBarIcon, self).__init__()
        self.__config__ = cf
        self.mute = False
        self.SetIcon(res.icon.GetIcon(), self.__config__.result['name']) # Set icon

        try:
            # Choose an startup message
            msg = random.choice(self.__config__.findMessageByCondition({'startup': True}))
        except IndexError:
            msg = []
        if msg != []:
            self.show(msg.chooseMessage())

        # Set a timer
        self.__timer__ = timer.timer(self.__config__, self)
        self.__timer__.start()
开发者ID:LYZhelloworld,项目名称:Meidochan,代码行数:17,代码来源:taskicon.py


示例20: watch_process_test

    def watch_process_test(self):
        """Test watchProcess"""

        def test_still_running():
            with timer(5):
                # Run something forever so we can kill it
                proc = iutil.startProgram(["/bin/sh", "-c", "while true; do sleep 1; done"])
                iutil.watchProcess(proc, "test1")
                proc.kill()
                # Wait for the SIGCHLD
                signal.pause()
        self.assertRaises(iutil.ExitError, test_still_running)

        # Make sure watchProcess checks that the process has not already exited
        with timer(5):
            proc = iutil.startProgram(["true"])
            proc.communicate()
        self.assertRaises(iutil.ExitError, iutil.watchProcess, proc, "test2")
开发者ID:cyclefusion,项目名称:anaconda,代码行数:18,代码来源:iutil_test.py



注:本文中的timer.timer函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python timer.Timer类代码示例发布时间:2022-05-27
下一篇:
Python timer.time_function函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap