• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python monotonic.monotonic函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中monotonic.monotonic函数的典型用法代码示例。如果您正苦于以下问题:Python monotonic函数的具体用法?Python monotonic怎么用?Python monotonic使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了monotonic函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: next

    def next(self):
        """Return the next batch of items to upload."""
        queue = self.queue
        items = []

        start_time = monotonic.monotonic()
        total_size = 0

        while len(items) < self.upload_size:
            elapsed = monotonic.monotonic() - start_time
            if elapsed >= self.upload_interval:
                break
            try:
                item = queue.get(block=True, timeout=self.upload_interval - elapsed)
                item_size = len(json.dumps(item, cls=DatetimeSerializer).encode())
                if item_size > MAX_MSG_SIZE:
                    self.log.error('Item exceeds 32kb limit, dropping. (%s)', str(item))
                    continue
                items.append(item)
                total_size += item_size
                if total_size >= BATCH_SIZE_LIMIT:
                    self.log.debug('hit batch size limit (size: %d)', total_size)
                    break
            except Empty:
                break

        return items
开发者ID:segmentio,项目名称:analytics-python,代码行数:27,代码来源:consumer.py


示例2: run_server

def run_server(command, response, port, report):
    """Run the frame drawing service.

    The control protocol for the server's command queue is as follows:
    (command, payload)
    Examples are
    (FRAME, update_number, frame_time, mixer) -> data payload to draw a frame
    (QUIT, _) -> quit the server thread

    The server communicates with the control thread over the response queue.
    It requests a frame with
    (FRAME_REQ, _)
    and reports a fatal, thread-death error with
    (FATAL_ERROR, err)
    """
    try:
        socket = create_pub_socket(port)

        # we're ready to render
        response.put((RUNNING, None))

        log_time = monotonic()

        while 1:
            # ready to draw a frame
            response.put((FRAME_REQ, None))

            # wait for a reply
            action, payload = command.get()

            # check if time to quit
            if action == QUIT:
                return
            # no other valid commands besides FRAME
            elif action != FRAME:
                # blow up with fatal error
                # we could try again, but who knows how we even got here
                raise RenderServerError("Unrecognized command: {}".format(action))

            frame_number, frame_time, mixer, clocks = payload

            # render the payload we received
            video_outs = mixer.draw_layers(clocks)

            for video_chan, draw_commands in enumerate(video_outs):
                serialized = msgpack.dumps(
                    (frame_number, frame_time, draw_commands),
                    use_single_float=True)
                socket.send_multipart((str(video_chan), serialized))

            if report:# and frame_number % 1 == 0:
                now = monotonic()
                log.debug("Framerate: {}".format(1 / (now - log_time)))
                log_time = now

    except Exception as err:
        # some exception we didn't catch
        _, _, tb = sys.exc_info()
        response.put((FATAL_ERROR, (err, traceback.format_tb(tb))))
        return
开发者ID:generalelectrix,项目名称:pytunnel,代码行数:60,代码来源:render_server.py


示例3: handle

    def handle(self, comment, *args, **options):
        items = {}
        created_count = 0
        from django.conf import settings
        translation.activate(settings.LANGUAGE_CODE)
        for key, import_path in STAT_METRICS.items():
            f = import_string(import_path)
            name = getattr(f, 'name', key)
            description = getattr(f, 'description', import_path)
            item, created = Item.objects.get_or_create(key=key, defaults={'name': name, 'description': description})
            if created:
                created_count += 1
            items[key] = item
        self.stdout.write("Registered {} new items.".format(created_count))

        values = []
        start = monotonic()
        time = now()
        for key, import_path in STAT_METRICS.items():
            f = import_string(import_path)
            values.append(Value(item=items[key], time=time, value=f()))
        end = int(monotonic() - start)
        desc = _("Time (seconds) in which metric statistical information was collected.")
        system_item, created = Item.objects.get_or_create(key='stats.collect_time',
                                                          defaults={'name': _("Time to calculate statistics"),
                                                                    'description': desc})
        values.append(Value(item=system_item, time=time, value=end))

        with transaction.atomic():
            Value.objects.bulk_create(values)
            Item.objects.filter(pk__in=[value.item.pk for value in values]).update(last_updated=now())
        self.stdout.write("Registered {} values.".format(len(values)))
        translation.deactivate()
开发者ID:rwakulszowa,项目名称:poradnia,代码行数:33,代码来源:update_stats.py


示例4: update_latency

    def update_latency(self, latency):
        self.smoothed_latency = (7 * self.smoothed_latency + latency) / 8
        self.smoothed_variability = (7 * self.smoothed_variability + abs(latency - self.smoothed_latency)) / 8
        self.max_latency = self.smoothed_latency + (self.NUM_DEV * self.smoothed_variability)
        self.adjust_count -= 1
        seconds_since_last_update = monotonic() - self.last_adjustment_time

        if (self.adjust_count <= 0) and (seconds_since_last_update >= self.SECONDS_BEFORE_ADJUSTMENT):
            # This algorithm is based on the Welsh and Culler "Adaptive Overload
            # Control for Busy Internet Servers" paper, although based on a smoothed
            # mean latency, rather than the 90th percentile as per the paper.
            # Also, the additive increase is scaled as a proportion of the maximum
            # bucket size, rather than an absolute number as per the paper.
            accepted_percent = 100
            if (self.accepted + self.rejected) != 0:
                accepted_percent = 100 * (float(self.accepted) / float(self.accepted + self.rejected))

            err = (self.smoothed_latency - self.target_latency) / self.target_latency
            hss_overloads = penaltycounter.get_hss_penalty_count()
            if ((err > self.DECREASE_THRESHOLD) or (hss_overloads > 0)):
                # latency is above where we want it to be, or we are getting overload responses from the HSS,
                # so adjust the rate downwards by a multiplicative factor
                new_rate = self.bucket.rate / self.DECREASE_FACTOR
                if new_rate < self.min_token_rate:
                    new_rate = self.min_token_rate
                _log.info("Accepted %.2f%% of requests, latency error = %f, HSS overloads = %d, decrease rate %f to %f" %
                                 (accepted_percent, err, hss_overloads, self.bucket.rate, new_rate))
                self.bucket.update_rate(new_rate)
            elif err < self.INCREASE_THRESHOLD:
                # latency is sufficiently below the target, so increasing the permitted
                # request rate would be sensible; but first check that we are using a
                # significant proportion of the current rate - if we're allowing 100
                # requests/sec, and we get 1 request/sec during a quiet period, we will
                # handle that well, but it is not sufficient evidence that we can increase the rate.
                max_permitted_requests = self.bucket.rate * seconds_since_last_update

                # Arbitrary threshold of at least 50% of maximum permitted requests
                minimum_threshold = max_permitted_requests * 0.5

                if (self.accepted > minimum_threshold):
                    new_rate = self.bucket.rate + (-err) * self.bucket.max_size * self.INCREASE_FACTOR
                    _log.info("Accepted %.2f%% of requests, latency error = %f, increase rate %f to %f"
                              " based on %d accepted requests in last %.2f seconds" %
                              (accepted_percent, err, self.bucket.rate, new_rate,
                               self.accepted, seconds_since_last_update))
                    self.bucket.update_rate(new_rate)
                else:
                    _log.info("Only handled %d requests in the last %.2f seconds, rate remains unchanged."
                              " Minimum threshold for change is %f" %
                              (self.accepted, seconds_since_last_update, minimum_threshold))
            else:
                _log.info("Accepted %f%% of requests, latency error = %f, rate %f unchanged" %
                                (accepted_percent, err, self.bucket.rate))

            self.accepted = 0
            self.rejected = 0
            self.adjust_count = self.REQUESTS_BEFORE_ADJUSTMENT
            self.last_adjustment_time = monotonic()

        penaltycounter.reset_hss_penalty_count()
开发者ID:ClearwaterCore,项目名称:crest,代码行数:60,代码来源:base.py


示例5: send_sms

    def send_sms(self, to, content, reference, sender=None):

        data = {
            "apiKey": self.api_key,
            "from": self.from_number if sender is None else sender,
            "to": to.replace('+', ''),
            "message": content,
            "reference": reference
        }

        start_time = monotonic()
        try:
            response = request(
                "POST",
                self.url,
                data=data
            )
            response.raise_for_status()
            try:
                json.loads(response.text)
                if response.json()['code'] != 0:
                    raise ValueError()
            except (ValueError, AttributeError) as e:
                self.record_outcome(False, response)
                raise FiretextClientResponseException(response=response, exception=e)
            self.record_outcome(True, response)
        except RequestException as e:
            self.record_outcome(False, e.response)
            raise FiretextClientResponseException(response=e.response, exception=e)
        finally:
            elapsed_time = monotonic() - start_time
            self.current_app.logger.info("Firetext request finished in {}".format(elapsed_time))
            self.statsd_client.timing("clients.firetext.request-time", elapsed_time)
        return response
开发者ID:alphagov,项目名称:notifications-api,代码行数:34,代码来源:firetext.py


示例6: _sleep

    def _sleep(self, sec, critical_section=False):
        until = monotonic() + sec

        while monotonic() < until:
            if not critical_section:
                self._interrupt_point()
            sleep(1)
开发者ID:Sanji-IO,项目名称:sanji-bundle-cellular,代码行数:7,代码来源:management.py


示例7: poll

    def poll(self, timeout=None):
        """Polls for events while handling timers.

        poll() will wait up to timeout seconds for sockets or files
        registered with self.reactor to become ready.  A timeout of None
        will cause poll to wait an infinite amount of time.  While
        waiting for poll events, scheduled events will be handled,
        potentially causing the wait time to slip a bit.
        """
        elapsed = 0.0
        mono_time = clock.monotonic()
        while True:
            wall_time = time_mod.time()
            self._mono.execute(mono_time)
            self._wall.execute(wall_time)
            delays = [
                self.LOOP_INTERVAL if timeout is None else min(timeout - elapsed, self.LOOP_INTERVAL),
                self._mono.delay(mono_time),
                self._wall.delay(wall_time),
            ]
            delay = min(d for d in delays if d is not None)
            events = self.reactor.poll(delay)
            if events:
                return events
            last_time, mono_time = mono_time, clock.monotonic()
            elapsed += mono_time - last_time
            if timeout is not None and elapsed >= timeout:
                return []
开发者ID:techieshark,项目名称:volttron,代码行数:28,代码来源:base.py


示例8: test_sensor_watch_queue_gets_deleted_on_stop

    def test_sensor_watch_queue_gets_deleted_on_stop(self):

        def create_handler(sensor_db):
            pass

        def update_handler(sensor_db):
            pass

        def delete_handler(sensor_db):
            pass

        sensor_watcher = SensorWatcher(create_handler, update_handler, delete_handler,
                                       queue_suffix='covfefe')
        sensor_watcher.start()
        sw_queues = self._get_sensor_watcher_amqp_queues(queue_name='st2.sensor.watch.covfefe')

        start = monotonic()
        done = False
        while not done:
            eventlet.sleep(0.01)
            sw_queues = self._get_sensor_watcher_amqp_queues(queue_name='st2.sensor.watch.covfefe')
            done = len(sw_queues) > 0 or ((monotonic() - start) < 5)

        sensor_watcher.stop()
        sw_queues = self._get_sensor_watcher_amqp_queues(queue_name='st2.sensor.watch.covfefe')
        self.assertTrue(len(sw_queues) == 0)
开发者ID:nzlosh,项目名称:st2,代码行数:26,代码来源:test_sensor_watcher.py


示例9: token

    def token(self):
        access_token, expire_time = getattr(self, "bing_cached_access_token", None), \
                                    getattr(self, "bing_cached_access_token_expiry", None)

        if expire_time is None or monotonic() > expire_time:  # first credential request, or the access token from the previous one expired
            # get an access token using OAuth
            credential_url = "https://oxford-speech.cloudapp.net/token/issueToken"
            credential_request = Request(credential_url, data=urlencode({
                "grant_type": "client_credentials",
                "client_id": "python",
                "client_secret": self.key,
                "scope": "https://speech.platform.bing.com"
            }).encode("utf-8"))
            start_time = monotonic()
            try:
                credential_response = urlopen(credential_request)
            except HTTPError as e:
                raise RequestError("recognition request failed: {0}".format(
                    getattr(e, "reason", "status {0}".format(e.code))))  # use getattr to be compatible with Python 2.6
            except URLError as e:
                raise RequestError("recognition connection failed: {0}".format(e.reason))
            credential_text = credential_response.read().decode("utf-8")
            credentials = json.loads(credential_text)
            access_token, expiry_seconds = credentials["access_token"], float(credentials["expires_in"])

            # save the token for the duration it is valid for
            self.bing_cached_access_token = access_token
            self.bing_cached_access_token_expiry = start_time + expiry_seconds

        return access_token
开发者ID:KillingJacky,项目名称:respeaker_hi,代码行数:30,代码来源:bing_base.py


示例10: send_email

    def send_email(self,
                   source,
                   to_addresses,
                   subject,
                   body,
                   html_body='',
                   reply_to_address=None):
        try:
            if isinstance(to_addresses, str):
                to_addresses = [to_addresses]

            reply_to_addresses = [reply_to_address] if reply_to_address else []

            body = {
                'Text': {'Data': body}
            }

            if html_body:
                body.update({
                    'Html': {'Data': html_body}
                })

            start_time = monotonic()
            response = self._client.send_email(
                Source=source,
                Destination={
                    'ToAddresses': to_addresses,
                    'CcAddresses': [],
                    'BccAddresses': []
                },
                Message={
                    'Subject': {
                        'Data': subject,
                    },
                    'Body': body
                },
                ReplyToAddresses=reply_to_addresses
            )
        except botocore.exceptions.ClientError as e:
            self.statsd_client.incr("clients.ses.error")

            # http://docs.aws.amazon.com/ses/latest/DeveloperGuide/api-error-codes.html
            if e.response['Error']['Code'] == 'InvalidParameterValue':
                raise InvalidEmailError('email: "{}" message: "{}"'.format(
                    to_addresses[0],
                    e.response['Error']['Message']
                ))
            else:
                self.statsd_client.incr("clients.ses.error")
                raise AwsSesClientException(str(e))
        except Exception as e:
            self.statsd_client.incr("clients.ses.error")
            raise AwsSesClientException(str(e))
        else:
            elapsed_time = monotonic() - start_time
            current_app.logger.info("AWS SES request finished in {}".format(elapsed_time))
            self.statsd_client.timing("clients.ses.request-time", elapsed_time)
            self.statsd_client.incr("clients.ses.success")
            return response['MessageId']
开发者ID:alphagov,项目名称:notifications-api,代码行数:59,代码来源:aws_ses.py


示例11: _request

    def _request(self, method, url, data=None, params=None):
        if not self.enabled:
            return None

        url = url.lstrip('/')
        url = urlparse.urljoin(self.base_url, url)

        logger.debug("API request {method} {url}",
                     extra={
                         'method': method,
                         'url': url
                     })
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer {}".format(self.auth_token),
            "User-agent": "DM-API-Client/{}".format(__version__),
        }
        headers = self._add_request_id_header(headers)
        headers = self._add_zipkin_tracing_headers(headers)

        start_time = monotonic()
        try:
            response = requests.request(
                method, url,
                headers=headers, json=data, params=params)
            response.raise_for_status()
        except requests.RequestException as e:
            api_error = HTTPError.create(e)
            elapsed_time = monotonic() - start_time
            logger.log(
                logging.INFO if api_error.status_code == 404 else logging.WARNING,
                "API {api_method} request on {api_url} failed with {api_status} '{api_error}'",
                extra={
                    'api_method': method,
                    'api_url': url,
                    'api_status': api_error.status_code,
                    'api_error': api_error.message,
                    'api_time': elapsed_time
                })
            raise api_error
        else:
            elapsed_time = monotonic() - start_time
            logger.info(
                "API {api_method} request on {api_url} finished in {api_time}",
                extra={
                    'api_method': method,
                    'api_url': url,
                    'api_status': response.status_code,
                    'api_time': elapsed_time
                })
        try:
            return response.json()
        except ValueError as e:
            raise InvalidResponse(response,
                                  message="No JSON object could be decoded")
开发者ID:AusDTO,项目名称:dto-digitalmarketplace-apiclient,代码行数:55,代码来源:base.py


示例12: request

    def request(self, method, url, data=None, params=None):

        logger.debug("API request {} {}".format(method, url))

        payload = json.dumps(data)

        api_token = create_jwt_token(
            self.api_key,
            self.service_id
        )

        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer {}".format(api_token),
            "User-agent": "NOTIFY-API-PYTHON-CLIENT/{}".format(__version__),
        }

        url = urlparse.urljoin(self.base_url, url)

        start_time = monotonic()
        try:
            response = requests.request(
                method,
                url,
                headers=headers,
                data=payload,
                params=params
            )
            response.raise_for_status()
        except requests.RequestException as e:
            api_error = HTTPError.create(e)
            logger.error(
                "API {} request on {} failed with {} '{}'".format(
                    method,
                    url,
                    api_error.status_code,
                    api_error.message
                )
            )
            raise api_error
        finally:
            elapsed_time = monotonic() - start_time
            logger.debug("API {} request on {} finished in {}".format(method, url, elapsed_time))

        try:
            if response.status_code == 204:
                return
            return response.json()
        except ValueError:
            raise InvalidResponse(
                response,
                message="No JSON response object could be decoded"
            )
开发者ID:alphagov,项目名称:notifications-python-client,代码行数:53,代码来源:base.py


示例13: listen_and_reply

def listen_and_reply(sock, compute_func):
    while True:
        req = protocol.from_encoded_message(sock.recv())
        start = monotonic()
        # TODO: try/catch
        ret = compute_func(req)
        end = monotonic()
        t = end - start
        diff = []
        diff.append(math.floor(t))
        diff.append((t - diff[0]) * math.pow(10, 9))
        sock.send(protocol.to_encoded_message(ret, diff))
开发者ID:redsift,项目名称:sandbox-python,代码行数:12,代码来源:run.py


示例14: ping

    def ping(self):
        """Ping the server, returning the round-trip latency in milliseconds

        The A2A_PING request is deprecated so this actually sends a A2S_INFO
        request and times that. The time difference between the two should
        be negligble.
        """

        time_sent = monotonic.monotonic()
        self.request(messages.InfoRequest())
        messages.InfoResponse.decode(self.get_response())
        time_received = monotonic.monotonic()
        return (time_received - time_sent) * 1000.0
开发者ID:Holiverh,项目名称:python-valve,代码行数:13,代码来源:a2s.py


示例15: _try_connect

    def _try_connect(self, pdpc_apn, pdpc_type, retry_timeout):
        retry = monotonic() + retry_timeout
        while True:
            self._interrupt_point()

            self._status = Manager.Status.connecting
            if not self._connect(pdpc_apn, pdpc_type):
                self._status = Manager.Status.connect_failure

                if monotonic() >= retry:
                    break

                self._sleep(10)
            else:
                return True
开发者ID:Sanji-IO,项目名称:sanji-bundle-cellular,代码行数:15,代码来源:management.py


示例16: run

    def run(self):
        try:
            monotime = 0.0

            self.SBrickPeripheral = Peripheral()
            self.SBrickPeripheral.connect(self.sBrickAddr)
            service = self.SBrickPeripheral.getServiceByUUID('4dc591b0-857c-41de-b5f1-15abda665b0c')
            characteristics = service.getCharacteristics('02b8cbcc-0e25-4bda-8790-a15f53e6010f')
            for characteristic in characteristics:
                if characteristic.uuid == '02b8cbcc-0e25-4bda-8790-a15f53e6010f':
                    self.characteristicRemote = characteristic

            if self.characteristicRemote is None:
                return

            self.emit('sbrick_connected')

            self.need_authentication = self.get_need_authentication()
            self.authenticated = not self.need_authentication
            if self.need_authentication:
                if self.password_owner is not None:
                    self.authenticate_owner(self.password_owner)

            while not self.stopFlag:
                if self.authenticated:
                    if monotonic.monotonic() - monotime >= 0.05:
                        self.send_command()
                        monotime = monotonic.monotonic()
                    self.eventSend.wait(0.01)
                    for channel in self.brickChannels:
                        if channel.decrement_run_timer():
                            monotime = 0.0
                            self.drivingLock.release()
                            # print("stop run normal")
                            self.emit("sbrick_channel_stop", channel.channel)
                        if channel.decrement_brake_timer():
                            self.drivingLock.release()
                            # print("stop brake timer")
                            monotime = 0.0
                            self.emit("sbrick_channel_stop", channel.channel)

            if self.authenticated:
                self.stop_all()
                self.send_command()
            self.SBrickPeripheral.disconnect()
            self.emit('sbrick_disconnected_ok')
        except BTLEException as ex:
            self.emit("sbrick_disconnected_error", ex.message)
开发者ID:wintersandroid,项目名称:sbrick-controller,代码行数:48,代码来源:SBrickCommunications.py


示例17: _timer

    def _timer(timeout):
        """Iterable timeout timer.

        :param timeout: the number of seconds to wait before timing out.
            If ``None`` then the timer will never timeout.

        :raises RCONTimeoutError: once the timeout is reached.

        :returns: an iterable that will yield items until the timeout
            is reached.
        """
        time_start = monotonic.monotonic()
        while (timeout is None
               or monotonic.monotonic() - time_start < timeout):
            yield
        raise RCONTimeoutError
开发者ID:algrn912005,项目名称:python-valve,代码行数:16,代码来源:rcon.py


示例18: intend

    def intend(self, ident):
        # preconditions
        assert(self.quota > 0)
        assert(self.window > 0)

        # init
        now = monotonic()
        if (ident not in self.state):
            self.state[ident] = (deque(), 0)
        (queue, failed_intents) = self.state[ident]

        # flush expired intents
        while (queue and ((now - queue[0]) > self.window)):
            queue.popleft()

        # work out what to do with current intent
        if (len(queue) < self.quota):
            queue.append(now)
            failed_intents = 0
        else:
            failed_intents += 1

        # postconditions
        assert(len(queue) <= self.quota)

        # save state
        self.state[ident] = (queue, failed_intents)

        # generate bean counters
        quota_left = self.quota - len(queue) - failed_intents
        window_left = self.window - (now - queue[0])
        return (quota_left, window_left)
开发者ID:Tmplt,项目名称:Toothless,代码行数:32,代码来源:util.py


示例19: wrapper

 def wrapper(*args, **kwargs):
     start_time = monotonic()
     res = func(*args, **kwargs)
     elapsed_time = monotonic() - start_time
     current_app.logger.info(
         "{namespace} call {func} took {time}".format(
             namespace=namespace, func=func.__name__, time="{0:.4f}".format(elapsed_time)
         )
     )
     statsd_client.incr('{namespace}.{func}'.format(
         namespace=namespace, func=func.__name__)
     )
     statsd_client.timing('{namespace}.{func}'.format(
         namespace=namespace, func=func.__name__), elapsed_time
     )
     return res
开发者ID:alphagov,项目名称:notifications-api,代码行数:16,代码来源:statsd_decorators.py


示例20: __init__

    def __init__(
        self,
        incoming_response,
        swagger_result,
        start_time,
        request_end_time,
        handled_exception_info,
        request_config,
    ):
        """
        :param incoming_response: a subclass of bravado_core.response.IncomingResponse.
        :param swagger_result: the unmarshalled result that is being returned to the user.
        :param start_time: monotonic timestamp indicating when the HTTP future was created. Depending on the
            internal operation of the HTTP client used, this is either before the HTTP request was initiated
            (default client) or right after the HTTP request was sent (e.g. bravado-asyncio / fido).
        :param request_end_time: monotonic timestamp indicating when we received the incoming response,
            excluding unmarshalling, validation or potential fallback result processing.
        :param handled_exception_info: sys.exc_info() data if an exception was caught and handled as
            part of a fallback response; note that the third element in the list is a string representation
            of the traceback, not a traceback object.
        :param RequestConfig request_config: namedtuple containing the request options that were used
            for making this request.
        """
        self._incoming_response = incoming_response
        self.start_time = start_time
        self.request_end_time = request_end_time
        self.processing_end_time = monotonic.monotonic()
        self.handled_exception_info = handled_exception_info
        self.request_config = request_config

        # we expose the result to the user through the BravadoResponse object;
        # we're passing it in to this object in case custom implementations need it
        self._swagger_result = swagger_result
开发者ID:sjaensch,项目名称:bravado,代码行数:33,代码来源:response.py



注:本文中的monotonic.monotonic函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python monotonic.now函数代码示例发布时间:2022-05-27
下一篇:
Python utility.Logger类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap