Python utils.get_time_in_millisecs Function Code Examples

This article collects typical usage examples of the Python function utils.get_time_in_millisecs. If you are wondering how get_time_in_millisecs is used in practice, how to call it, or what real calls to it look like, the selected code examples below may help.



A total of 20 code examples of the get_time_in_millisecs function are shown below, sorted by popularity by default.
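The examples call the function but never show its body. As a rough sketch, assuming it converts a naive UTC `datetime.datetime` into milliseconds since the Unix epoch (which matches how the results are used below, as `*_msec` values and sort keys), it could look like the following. This is an illustration, not the project's actual implementation; the use of `calendar.timegm` is an assumption made here.

```python
import calendar
import datetime


def get_time_in_millisecs(datetime_obj):
    """Return milliseconds since the Unix epoch.

    Assumption: datetime_obj is a naive datetime expressed in UTC.
    """
    # calendar.timegm interprets the time tuple as UTC and returns
    # whole seconds since the epoch.
    whole_seconds = calendar.timegm(datetime_obj.utctimetuple())
    # Add the sub-second part, converted from microseconds.
    return whole_seconds * 1000.0 + datetime_obj.microsecond / 1000.0


# One and a half seconds after the epoch.
sample = datetime.datetime(1970, 1, 1, 0, 0, 1, 500000)
print(get_time_in_millisecs(sample))  # 1500.0
```

Returning a plain number rather than a datetime keeps the value JSON-serializable, which is presumably why the handlers below convert before calling `render_json`.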

Example 1: get

    def get(self):
        """Handles GET requests."""
        if self.user_id is None:
            raise self.PageNotFoundException

        editable_exp_summaries = (
            exp_services.get_at_least_editable_exploration_summaries(
                self.user_id))

        def _get_intro_card_color(category):
            return (
                feconf.CATEGORIES_TO_COLORS[category] if
                category in feconf.CATEGORIES_TO_COLORS else
                feconf.DEFAULT_COLOR)

        self.values.update({
            'explorations_list': [{
                'id': exp_summary.id,
                'title': exp_summary.title,
                'category': exp_summary.category,
                'objective': exp_summary.objective,
                'language_code': exp_summary.language_code,
                'last_updated': utils.get_time_in_millisecs(
                    exp_summary.exploration_model_last_updated),
                'created_on': utils.get_time_in_millisecs(
                    exp_summary.exploration_model_created_on),
                'status': exp_summary.status,
                'community_owned': exp_summary.community_owned,
                'is_editable': True,
                'thumbnail_image_url': (
                    '/images/gallery/exploration_background_%s_small.png' %
                    _get_intro_card_color(exp_summary.category)),
            } for exp_summary in editable_exp_summaries.values()],
        })
        self.render_json(self.values)
Developer ID: Cgruppo, Project: oppia, Lines of code: 35, Source file: home.py


Example 2: get

    def get(self):
        """Handles GET requests."""
        if self.user_id is None:
            raise self.PageNotFoundException

        subscribed_summaries = (
            exp_services.get_exploration_summaries_matching_ids(
                subscription_services.get_exploration_ids_subscribed_to(
                    self.user_id)))

        def _get_intro_card_color(category):
            return (
                feconf.CATEGORIES_TO_COLORS[category] if
                category in feconf.CATEGORIES_TO_COLORS else
                feconf.DEFAULT_COLOR)

        explorations_list = []

        for exp_summary in subscribed_summaries:
            if exp_summary is None:
                continue

            feedback_thread_analytics = feedback_services.get_thread_analytics(
                exp_summary.id)
            explorations_list.append({
                'id': exp_summary.id,
                'title': exp_summary.title,
                'category': exp_summary.category,
                'objective': exp_summary.objective,
                'language_code': exp_summary.language_code,
                'last_updated': utils.get_time_in_millisecs(
                    exp_summary.exploration_model_last_updated),
                'created_on': utils.get_time_in_millisecs(
                    exp_summary.exploration_model_created_on),
                'status': exp_summary.status,
                'community_owned': exp_summary.community_owned,
                'is_editable': True,
                'thumbnail_icon_url': (
                    utils.get_thumbnail_icon_url_for_category(
                        exp_summary.category)),
                'thumbnail_bg_color': utils.get_hex_color_for_category(
                    exp_summary.category),
                'ratings': exp_summary.ratings,
                'num_open_threads': (
                    feedback_thread_analytics.num_open_threads),
                'num_total_threads': (
                    feedback_thread_analytics.num_total_threads),
            })

        explorations_list = sorted(
            explorations_list,
            key=lambda x: (x['num_open_threads'], x['last_updated']),
            reverse=True)

        self.values.update({
            'explorations_list': explorations_list,
        })
        self.render_json(self.values)
Developer ID: MaryamZi, Project: oppia, Lines of code: 58, Source file: home.py
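The sort at the end of the example above orders entries by a tuple key in descending order, so explorations with more open feedback threads come first and ties are broken by the most recent `last_updated` value in milliseconds. A minimal sketch of that ordering, using made-up entries that carry only the fields relevant to the sort:

```python
# Hypothetical dashboard entries; real entries have many more fields.
explorations_list = [
    {'id': 'a', 'num_open_threads': 0, 'last_updated': 3000.0},
    {'id': 'b', 'num_open_threads': 2, 'last_updated': 1000.0},
    {'id': 'c', 'num_open_threads': 2, 'last_updated': 2000.0},
]

# Tuples compare element by element, so num_open_threads is the primary
# key and last_updated only matters when the thread counts are equal.
explorations_list = sorted(
    explorations_list,
    key=lambda x: (x['num_open_threads'], x['last_updated']),
    reverse=True)

ordered_ids = [entry['id'] for entry in explorations_list]
print(ordered_ids)  # ['c', 'b', 'a']
```

Because the timestamps are plain numbers, they can sit in the key tuple next to the integer thread count without any extra conversion.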


Example 3: map

    def map(item):
        user_id = item.id
        job_queued_msec = RecentUpdatesMRJobManager._get_job_queued_msec()
        reducer_key = '%s@%s' % (user_id, job_queued_msec)

        activity_ids_list = item.activity_ids
        feedback_thread_ids_list = item.feedback_thread_ids

        activities = exp_models.ExplorationModel.get_multi(
            activity_ids_list, include_deleted=True)

        for ind, activity in enumerate(activities):
            if activity is None:
                logging.error(
                    'Could not find activity %s' % activity_ids_list[ind])
                continue

            metadata_obj = exp_models.ExplorationModel.get_snapshots_metadata(
                activity.id, [activity.version], allow_deleted=True)[0]
            yield (reducer_key, {
                'type': feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                'activity_id': activity.id,
                'activity_title': activity.title,
                'author_id': metadata_obj['committer_id'],
                'last_updated_ms': utils.get_time_in_millisecs(
                    activity.last_updated),
                'subject': (
                    feconf.COMMIT_MESSAGE_EXPLORATION_DELETED
                    if activity.deleted else metadata_obj['commit_message']
                ),
            })

            # If the user subscribes to this activity, he/she is automatically
            # subscribed to all feedback threads for this activity.
            if not activity.deleted:
                threads = feedback_services.get_threadlist(activity.id)
                for thread in threads:
                    if thread['thread_id'] not in feedback_thread_ids_list:
                        feedback_thread_ids_list.append(thread['thread_id'])

        for feedback_thread_id in feedback_thread_ids_list:
            last_message = (
                feedback_models.FeedbackMessageModel.get_most_recent_message(
                    feedback_thread_id))

            yield (reducer_key, {
                'type': feconf.UPDATE_TYPE_FEEDBACK_MESSAGE,
                'activity_id': last_message.exploration_id,
                'activity_title': exp_models.ExplorationModel.get_by_id(
                    last_message.exploration_id).title,
                'author_id': last_message.author_id,
                'last_updated_ms': utils.get_time_in_millisecs(
                    last_message.created_on),
                'subject': last_message.get_thread_subject(),
            })
Developer ID: Cgruppo, Project: oppia, Lines of code: 55, Source file: user_jobs.py


Example 4: get

    def get(self):
        """Handles GET requests."""
        if self.user_id is None:
            raise self.PageNotFoundException

        subscribed_summaries = exp_services.get_exploration_summaries_matching_ids(
            subscription_services.get_activity_ids_subscribed_to(self.user_id)
        )

        def _get_intro_card_color(category):
            return (
                feconf.CATEGORIES_TO_COLORS[category]
                if category in feconf.CATEGORIES_TO_COLORS
                else feconf.DEFAULT_COLOR
            )

        explorations_list = []

        for exp_summary in subscribed_summaries:
            if exp_summary is None:
                continue

            feedback_thread_analytics = feedback_services.get_thread_analytics(exp_summary.id)
            explorations_list.append(
                {
                    "id": exp_summary.id,
                    "title": exp_summary.title,
                    "category": exp_summary.category,
                    "objective": exp_summary.objective,
                    "language_code": exp_summary.language_code,
                    "last_updated": utils.get_time_in_millisecs(exp_summary.exploration_model_last_updated),
                    "created_on": utils.get_time_in_millisecs(exp_summary.exploration_model_created_on),
                    "status": exp_summary.status,
                    "community_owned": exp_summary.community_owned,
                    "is_editable": True,
                    "thumbnail_image_url": (
                        "/images/gallery/exploration_background_%s_small.png"
                        % _get_intro_card_color(exp_summary.category)
                    ),
                    "ratings": exp_summary.ratings,
                    "num_open_threads": (feedback_thread_analytics["num_open_threads"]),
                    "num_total_threads": (feedback_thread_analytics["num_total_threads"]),
                }
            )

        explorations_list = sorted(
            explorations_list, key=lambda x: (x["num_open_threads"], x["last_updated"]), reverse=True
        )

        self.values.update({"explorations_list": explorations_list})
        self.render_json(self.values)
Developer ID: directorlive, Project: oppia, Lines of code: 51, Source file: home.py


Example 5: get_displayable_exp_summary_dicts

def get_displayable_exp_summary_dicts(exploration_summaries):
    """Given a list of exploration summary domain objects, returns a list,
    with the same number of elements, of the corresponding human-readable
    exploration summary dicts.

    This assumes that all the exploration summary domain objects passed in are
    valid (i.e., none of them are None).
    """
    exploration_ids = [
        exploration_summary.id
        for exploration_summary in exploration_summaries]

    view_counts = (
        stats_jobs_continuous.StatisticsAggregator.get_views_multi(
            exploration_ids))
    displayable_exp_summaries = []

    for ind, exploration_summary in enumerate(exploration_summaries):
        if not exploration_summary:
            continue

        summary_dict = {
            'id': exploration_summary.id,
            'title': exploration_summary.title,
            'activity_type': feconf.ACTIVITY_TYPE_EXPLORATION,
            'category': exploration_summary.category,
            'created_on_msec': utils.get_time_in_millisecs(
                exploration_summary.exploration_model_created_on),
            'objective': exploration_summary.objective,
            'language_code': exploration_summary.language_code,
            'last_updated_msec': utils.get_time_in_millisecs(
                exploration_summary.exploration_model_last_updated
            ),
            'human_readable_contributors_summary': (
                get_human_readable_contributors_summary(
                    exploration_summary.contributors_summary)
            ),
            'status': exploration_summary.status,
            'ratings': exploration_summary.ratings,
            'community_owned': exploration_summary.community_owned,
            'tags': exploration_summary.tags,
            'thumbnail_icon_url': utils.get_thumbnail_icon_url_for_category(
                exploration_summary.category),
            'thumbnail_bg_color': utils.get_hex_color_for_category(
                exploration_summary.category),
            'num_views': view_counts[ind],
        }

        displayable_exp_summaries.append(summary_dict)

    return displayable_exp_summaries
Developer ID: hemanthsavasere, Project: oppia, Lines of code: 51, Source file: summary_services.py


Example 6: get

    def get(self):
        """Handles GET requests."""
        # TODO(sll): Figure out what to do about explorations in categories
        # other than those explicitly listed.

        query_string = self.request.get('q')
        search_cursor = self.request.get('cursor', None)
        exp_summaries_list, search_cursor = (
            exp_services.get_exploration_summaries_matching_query(
                query_string, cursor=search_cursor))

        # TODO(msl): Store 'is_editable' in exploration summary to avoid O(n)
        # individual lookups. Note that this will depend on user_id.
        explorations_list = [{
            'id':
            exp_summary.id,
            'title':
            exp_summary.title,
            'category':
            exp_summary.category,
            'objective':
            exp_summary.objective,
            'language_code':
            exp_summary.language_code,
            'last_updated':
            utils.get_time_in_millisecs(
                exp_summary.exploration_model_last_updated),
            'status':
            exp_summary.status,
            'community_owned':
            exp_summary.community_owned,
            'thumbnail_image_url':
            exp_summary.thumbnail_image_url,
            'is_editable':
            exp_services.is_exp_summary_editable(
                exp_summary, user_id=self.user_id),
            'ratings':
            exp_summary.ratings
        } for exp_summary in exp_summaries_list]

        if len(explorations_list) == feconf.DEFAULT_QUERY_LIMIT:
            logging.error(
                '%s explorations were fetched to load the gallery page. '
                'You may be running up against the default query limits.' %
                feconf.DEFAULT_QUERY_LIMIT)

        preferred_language_codes = [feconf.DEFAULT_LANGUAGE_CODE]
        if self.user_id:
            user_settings = user_services.get_user_settings(self.user_id)
            preferred_language_codes = user_settings.preferred_language_codes

        self.values.update({
            'explorations_list':
            explorations_list,
            'preferred_language_codes':
            preferred_language_codes,
            'search_cursor':
            search_cursor,
        })
        self.render_json(self.values)
Developer ID: maitbayev, Project: oppia, Lines of code: 60, Source file: galleries.py


Example 7: get

    def get(self):
        """Handles GET requests."""
        try:
            exp_ids = json.loads(self.request.get('stringified_exp_ids'))
        except Exception:
            raise self.PageNotFoundException

        if (not isinstance(exp_ids, list) or not all([
                isinstance(exp_id, basestring) for exp_id in exp_ids])):
            raise self.PageNotFoundException

        exp_summaries = exp_services.get_exploration_summaries_matching_ids(
            exp_ids)

        self.values.update({
            'summaries': [(None if exp_summary is None else {
                'id': exp_summary.id,
                'title': exp_summary.title,
                'category': exp_summary.category,
                'objective': exp_summary.objective,
                'language_code': exp_summary.language_code,
                'last_updated': utils.get_time_in_millisecs(
                    exp_summary.exploration_model_last_updated),
                'status': exp_summary.status,
                'community_owned': exp_summary.community_owned,
                'thumbnail_image_url': exp_summary.thumbnail_image_url,
            }) for exp_summary in exp_summaries]
        })
        self.render_json(self.values)
Developer ID: makerscraft, Project: oppia, Lines of code: 29, Source file: galleries.py


Example 8: test_basic_computation

    def test_basic_computation(self):
        with self.swap(
                jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
                self.ALL_CONTINUOUS_COMPUTATION_MANAGERS_FOR_TESTS):
            EXP_ID = 'eid'
            EXP_TITLE = 'Title'
            USER_ID = 'user_id'

            self.save_new_valid_exploration(
                EXP_ID, USER_ID, title=EXP_TITLE, category='Category')
            expected_last_updated_ms = utils.get_time_in_millisecs(
                exp_services.get_exploration_by_id(EXP_ID).last_updated)

            ModifiedRecentUpdatesAggregator.start_computation()
            self.assertEqual(
                self.count_jobs_in_taskqueue(
                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
                1)
            self.process_and_flush_pending_tasks()

            self.assertEqual(
                ModifiedRecentUpdatesAggregator.get_recent_notifications(
                    USER_ID)[1],
                [self._get_expected_exploration_created_dict(
                    USER_ID, EXP_ID, EXP_TITLE, expected_last_updated_ms)])
Developer ID: danieljjh, Project: oppia, Lines of code: 25, Source file: user_jobs_test.py


Example 9: get_snapshots_metadata

    def get_snapshots_metadata(cls, model_instance_id, version_numbers, allow_deleted=False):
        """Returns a list of dicts, each representing a model snapshot.

        One dict is returned for each version number in the list of version
        numbers requested. If any of the version numbers does not exist, an
        error is raised.

        If `allow_deleted` is False, an error is raised if the current model
        has been deleted.
        """
        if not allow_deleted:
            cls.get(model_instance_id)._require_not_marked_deleted()

        snapshot_ids = [cls._get_snapshot_id(model_instance_id, version_number) for version_number in version_numbers]
        metadata_keys = [ndb.Key(cls.SNAPSHOT_METADATA_CLASS, snapshot_id) for snapshot_id in snapshot_ids]
        returned_models = ndb.get_multi(metadata_keys)

        for ind, model in enumerate(returned_models):
            if model is None:
                raise Exception(
                    "Invalid version number %s for model %s with id %s"
                    % (version_numbers[ind], cls.__name__, model_instance_id)
                )

        return [
            {
                "committer_id": model.committer_id,
                "commit_message": model.commit_message,
                "commit_cmds": model.commit_cmds,
                "commit_type": model.commit_type,
                "version_number": version_numbers[ind],
                "created_on_ms": utils.get_time_in_millisecs(model.created_on),
            }
            for (ind, model) in enumerate(returned_models)
        ]
Developer ID: dpaparis, Project: oppia, Lines of code: 35, Source file: gae_models.py


Example 10: test_basic_computation_with_an_update_after_creation

    def test_basic_computation_with_an_update_after_creation(self):
        with self.swap(
                jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
                self.ALL_CONTINUOUS_COMPUTATION_MANAGERS_FOR_TESTS):
            EXP_ID = 'eid'
            EXP_TITLE = 'Title'
            USER_ID = 'user_id'
            ANOTHER_USER_ID = 'another_user_id'

            self.save_new_valid_exploration(
                EXP_ID, USER_ID, title=EXP_TITLE, category='Category')
            # Another user makes a commit; this, too, shows up in the
            # original user's dashboard.
            exp_services.update_exploration(
                ANOTHER_USER_ID, EXP_ID, [], 'Update exploration')
            expected_last_updated_ms = utils.get_time_in_millisecs(
                exp_services.get_exploration_by_id(EXP_ID).last_updated)

            ModifiedRecentUpdatesAggregator.start_computation()
            self.assertEqual(
                self.count_jobs_in_taskqueue(
                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
                1)
            self.process_and_flush_pending_tasks()

            recent_updates = (
                ModifiedRecentUpdatesAggregator.get_recent_updates(USER_ID)[1])
            self.assertEqual([{
                'type': feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                'last_updated_ms': expected_last_updated_ms,
                'activity_id': EXP_ID,
                'activity_title': EXP_TITLE,
                'author_id': ANOTHER_USER_ID,
                'subject': 'Update exploration',
            }], recent_updates)
Developer ID: Cgruppo, Project: oppia, Lines of code: 35, Source file: user_jobs_test.py


Example 11: map

    def map(item):
        max_start_time_msec = JobCleanupManager.get_mapper_param(MAPPER_PARAM_MAX_START_TIME_MSEC)

        if isinstance(item, mapreduce_model.MapreduceState):
            if item.result_status == "success" and utils.get_time_in_millisecs(item.start_time) < max_start_time_msec:
                item.delete()
                yield ("mr_state_deleted", 1)
            else:
                yield ("mr_state_remaining", 1)

        if isinstance(item, mapreduce_model.ShardState):
            if item.result_status == "success" and utils.get_time_in_millisecs(item.update_time) < max_start_time_msec:
                item.delete()
                yield ("shard_state_deleted", 1)
            else:
                yield ("shard_state_remaining", 1)
Developer ID: oppia, Project: oppia, Lines of code: 16, Source file: jobs.py
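The mapper above deletes an item only when it finished successfully and its timestamp, in milliseconds, is earlier than a cutoff passed in as a mapper parameter. The sketch below shows one way such a cutoff could be computed and applied. The helper `is_old_enough_to_delete`, the seven-day age limit, and the local `get_time_in_millisecs` stand-in are all assumptions for illustration and do not come from the project.

```python
import calendar
import datetime


def get_time_in_millisecs(datetime_obj):
    """Stand-in for utils.get_time_in_millisecs (assumed behavior)."""
    whole_seconds = calendar.timegm(datetime_obj.utctimetuple())
    return whole_seconds * 1000.0 + datetime_obj.microsecond / 1000.0


def is_old_enough_to_delete(result_status, start_time, max_start_time_msec):
    """Hypothetical helper mirroring the condition used in the mapper."""
    return (
        result_status == 'success' and
        get_time_in_millisecs(start_time) < max_start_time_msec)


# Assumed policy: anything started more than seven days before `now`.
now = datetime.datetime(2016, 1, 8)
max_age = datetime.timedelta(days=7)
max_start_time_msec = get_time_in_millisecs(now - max_age)

old_job = datetime.datetime(2015, 12, 25)
recent_job = datetime.datetime(2016, 1, 5)

print(is_old_enough_to_delete('success', old_job, max_start_time_msec))
print(is_old_enough_to_delete('success', recent_job, max_start_time_msec))
```

Computing the cutoff once and passing it to every mapper call keeps the comparison consistent across all items, even if the job runs for a long time.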


Example 12: get

    def get(self):
        """Handles GET requests."""
        # TODO(sll): Figure out what to do about explorations in categories
        # other than those explicitly listed.

        language_codes_to_short_descs = {
            lc['code']: utils.get_short_language_description(lc['description'])
            for lc in feconf.ALL_LANGUAGE_CODES
        }

        query_string = self.request.get('q')
        search_cursor = self.request.get('cursor', None)
        exp_summaries_list, search_cursor = (
            exp_services.get_exploration_summaries_matching_query(
                query_string, cursor=search_cursor))

        def _get_intro_card_color(category):
            return (
                feconf.CATEGORIES_TO_COLORS[category] if
                category in feconf.CATEGORIES_TO_COLORS else
                feconf.DEFAULT_COLOR)

        # TODO(msl): Store 'is_editable' in exploration summary to avoid O(n)
        # individual lookups. Note that this will depend on user_id.
        explorations_list = [{
            'id': exp_summary.id,
            'title': exp_summary.title,
            'category': exp_summary.category,
            'objective': exp_summary.objective,
            'language_code': exp_summary.language_code,
            'last_updated': utils.get_time_in_millisecs(
                exp_summary.exploration_model_last_updated),
            'status': exp_summary.status,
            'community_owned': exp_summary.community_owned,
            'thumbnail_image_url': (
                '/images/gallery/exploration_background_%s_small.png' %
                _get_intro_card_color(exp_summary.category)),
            'is_editable': exp_services.is_exp_summary_editable(
                exp_summary,
                user_id=self.user_id)
        } for exp_summary in exp_summaries_list]

        if len(explorations_list) == feconf.DEFAULT_QUERY_LIMIT:
            logging.error(
                '%s explorations were fetched to load the gallery page. '
                'You may be running up against the default query limits.'
                % feconf.DEFAULT_QUERY_LIMIT)

        preferred_language_codes = [feconf.DEFAULT_LANGUAGE_CODE]
        if self.user_id:
            user_settings = user_services.get_user_settings(self.user_id)
            preferred_language_codes = user_settings.preferred_language_codes

        self.values.update({
            'explorations_list': explorations_list,
            'preferred_language_codes': preferred_language_codes,
            'search_cursor': search_cursor,
        })
        self.render_json(self.values)
Developer ID: Cgruppo, Project: oppia, Lines of code: 59, Source file: galleries.py


Example 13: test_making_feedback_thread_does_not_subscribe_to_exploration

    def test_making_feedback_thread_does_not_subscribe_to_exploration(self):
        with self._get_test_context():
            self.signup(USER_A_EMAIL, USER_A_USERNAME)
            user_a_id = self.get_user_id_from_email(USER_A_EMAIL)
            self.signup(USER_B_EMAIL, USER_B_USERNAME)
            user_b_id = self.get_user_id_from_email(USER_B_EMAIL)

            # User A creates an exploration.
            self.save_new_valid_exploration(
                EXP_ID, user_a_id, title=EXP_TITLE, category='Category')
            exp_last_updated_ms = (
                self._get_most_recent_exp_snapshot_created_on_ms(EXP_ID))

            # User B starts a feedback thread.
            feedback_services.create_thread(
                EXP_ID, None, user_b_id, FEEDBACK_THREAD_SUBJECT, 'text')
            thread_id = feedback_services.get_all_threads(
                EXP_ID, False)[0].get_thread_id()

            message = feedback_services.get_messages(
                EXP_ID, thread_id)[0]

            ModifiedRecentUpdatesAggregator.start_computation()
            self.assertEqual(
                self.count_jobs_in_taskqueue(
                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
                1)
            self.process_and_flush_pending_tasks()

            recent_notifications_for_user_a = (
                ModifiedRecentUpdatesAggregator.get_recent_notifications(
                    user_a_id)[1])
            recent_notifications_for_user_b = (
                ModifiedRecentUpdatesAggregator.get_recent_notifications(
                    user_b_id)[1])
            expected_thread_notification = {
                'activity_id': EXP_ID,
                'activity_title': EXP_TITLE,
                'author_id': user_b_id,
                'last_updated_ms': utils.get_time_in_millisecs(
                    message.created_on),
                'subject': FEEDBACK_THREAD_SUBJECT,
                'type': feconf.UPDATE_TYPE_FEEDBACK_MESSAGE,
            }
            expected_creation_notification = (
                self._get_expected_activity_created_dict(
                    user_a_id, EXP_ID, EXP_TITLE, 'exploration',
                    feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                    exp_last_updated_ms))

            # User A sees A's commit and B's feedback thread.
            self.assertEqual(recent_notifications_for_user_a, [
                expected_thread_notification,
                expected_creation_notification
            ])
            # User B sees only her feedback thread, but no commits.
            self.assertEqual(recent_notifications_for_user_b, [
                expected_thread_notification,
            ])
Developer ID: MinhHuong, Project: oppia, Lines of code: 59, Source file: user_jobs_continuous_test.py


Example 14: test_multiple_exploration_commits_and_feedback_messages

    def test_multiple_exploration_commits_and_feedback_messages(self):
        with self._get_test_context():
            self.signup(self.EDITOR_EMAIL, self.EDITOR_USERNAME)
            editor_id = self.get_user_id_from_email(self.EDITOR_EMAIL)

            # User creates an exploration.
            self.save_new_valid_exploration(
                EXP_1_ID, editor_id, title=EXP_1_TITLE,
                category='Category')

            exp1_last_updated_ms = (
                self._get_most_recent_exp_snapshot_created_on_ms(EXP_1_ID))

            # User gives feedback on it.
            feedback_services.create_thread(
                EXP_1_ID, None, editor_id, FEEDBACK_THREAD_SUBJECT,
                'text')
            thread_id = feedback_services.get_all_threads(
                EXP_1_ID, False)[0].get_thread_id()
            message = feedback_services.get_messages(EXP_1_ID, thread_id)[0]

            # User creates another exploration.
            self.save_new_valid_exploration(
                EXP_2_ID, editor_id, title=EXP_2_TITLE,
                category='Category')
            exp2_last_updated_ms = (
                self._get_most_recent_exp_snapshot_created_on_ms(EXP_2_ID))

            ModifiedRecentUpdatesAggregator.start_computation()
            self.assertEqual(
                self.count_jobs_in_taskqueue(
                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
                1)
            self.process_and_flush_pending_tasks()

            recent_notifications = (
                ModifiedRecentUpdatesAggregator.get_recent_notifications(
                    editor_id)[1])
            self.assertEqual([(
                self._get_expected_activity_created_dict(
                    editor_id, EXP_2_ID, EXP_2_TITLE, 'exploration',
                    feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                    exp2_last_updated_ms)
            ), {
                'activity_id': EXP_1_ID,
                'activity_title': EXP_1_TITLE,
                'author_id': editor_id,
                'last_updated_ms': utils.get_time_in_millisecs(
                    message.created_on),
                'subject': FEEDBACK_THREAD_SUBJECT,
                'type': feconf.UPDATE_TYPE_FEEDBACK_MESSAGE,
            }, (
                self._get_expected_activity_created_dict(
                    editor_id, EXP_1_ID, EXP_1_TITLE, 'exploration',
                    feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                    exp1_last_updated_ms)
            )], recent_notifications)
Developer ID: MinhHuong, Project: oppia, Lines of code: 57, Source file: user_jobs_continuous_test.py


Example 15: _get_thread_dict_from_model_instance

def _get_thread_dict_from_model_instance(thread):
    return {
        'last_updated': utils.get_time_in_millisecs(thread.last_updated),
        'original_author_username': user_services.get_username(
            thread.original_author_id) if thread.original_author_id else None,
        'state_name': thread.state_name,
        'status': thread.status,
        'subject': thread.subject,
        'summary': thread.summary,
        'thread_id': get_thread_id_from_full_thread_id(thread.id)}
Developer ID: mvalenza, Project: oppia, Lines of code: 10, Source file: feedback_services.py


Example 16: map

    def map(item):
        user_id = item.id
        job_queued_msec = RecentUpdatesMRJobManager._get_job_queued_msec()
        reducer_key = "%s@%s" % (user_id, job_queued_msec)

        exploration_ids_list = item.activity_ids
        collection_ids_list = item.collection_ids
        feedback_thread_ids_list = item.feedback_thread_ids

        (
            most_recent_activity_commits,
            tracked_exp_models_for_feedback,
        ) = RecentUpdatesMRJobManager._get_most_recent_activity_commits(
            exp_models.ExplorationModel,
            exploration_ids_list,
            "exploration",
            feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
            feconf.COMMIT_MESSAGE_EXPLORATION_DELETED,
        )

        for exp_model in tracked_exp_models_for_feedback:
            threads = feedback_services.get_threadlist(exp_model.id)
            for thread in threads:
                if thread["thread_id"] not in feedback_thread_ids_list:
                    feedback_thread_ids_list.append(thread["thread_id"])

        # TODO(bhenning): Implement a solution to having feedback threads for
        # collections.
        most_recent_activity_commits += (
            RecentUpdatesMRJobManager._get_most_recent_activity_commits(
                collection_models.CollectionModel,
                collection_ids_list,
                "collection",
                feconf.UPDATE_TYPE_COLLECTION_COMMIT,
                feconf.COMMIT_MESSAGE_COLLECTION_DELETED,
            )
        )[0]

        for recent_activity_commit_dict in most_recent_activity_commits:
            yield (reducer_key, recent_activity_commit_dict)

        for feedback_thread_id in feedback_thread_ids_list:
            last_message = feedback_models.FeedbackMessageModel.get_most_recent_message(feedback_thread_id)

            yield (
                reducer_key,
                {
                    "type": feconf.UPDATE_TYPE_FEEDBACK_MESSAGE,
                    "activity_id": last_message.exploration_id,
                    "activity_title": exp_models.ExplorationModel.get_by_id(last_message.exploration_id).title,
                    "author_id": last_message.author_id,
                    "last_updated_ms": utils.get_time_in_millisecs(last_message.created_on),
                    "subject": last_message.get_thread_subject(),
                },
            )
Developer ID: JenBroness, Project: oppia, Lines of code: 55, Source file: user_jobs.py


Example 17: get_statistics

    def get_statistics(cls, exploration_id, exploration_version):
        """Gets the statistics for the specified exploration and version.

        Args:
            exploration_id: str. The id of the exploration to get statistics
                for.
            exploration_version: str. Which version of the exploration to
                get statistics for. This can be a version number, the string
                'all', or the string 'none'.

        Returns:
            dict. The keys of the dict are:
                'start_exploration_count': # of times exploration was started.
                'complete_exploration_count': # of times exploration was
                    completed.
                'state_hit_counts': a dict containing the hit counts for the
                    states in the exploration. It is formatted as follows:
                    {
                        state_name: {
                            'first_entry_count': # of sessions which hit
                                this state.
                            'total_entry_count': # of total hits for this
                                state.
                            'no_answer_count': # of hits with no answer for
                                this state.
                        }
                    }
        """
        num_starts = 0
        num_completions = 0
        state_hit_counts = {}
        last_updated = None

        entity_id = stats_models.ExplorationAnnotationsModel.get_entity_id(
            exploration_id, exploration_version)
        mr_model = stats_models.ExplorationAnnotationsModel.get(
            entity_id, strict=False)
        if mr_model is not None:
            num_starts += mr_model.num_starts
            num_completions += mr_model.num_completions
            state_hit_counts = mr_model.state_hit_counts
            last_updated = utils.get_time_in_millisecs(mr_model.last_updated)

        realtime_model = cls._get_realtime_datastore_class().get(
            cls.get_active_realtime_layer_id(exploration_id), strict=False)
        if realtime_model is not None:
            num_starts += realtime_model.num_starts
            num_completions += realtime_model.num_completions

        return {
            'start_exploration_count': num_starts,
            'complete_exploration_count': num_completions,
            'state_hit_counts': state_hit_counts,
            'last_updated': last_updated,
        }
Developer ID: hemanthsavasere, Project: oppia, Lines of code: 55, Source: stats_jobs_continuous.py
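
The method above adds counters from a realtime layer on top of a batch-computed snapshot, while the per-state hit counts and the timestamp come from the batch result only. The sketch below isolates that merge step. It assumes plain dicts in place of the datastore models; combine_layers and the key last_updated_msec are hypothetical names introduced for illustration.

```python
def combine_layers(batch_stats, realtime_stats):
    """Merge a batch snapshot with counters from a realtime layer.

    Both arguments are plain dicts or None (hypothetical stand-ins for
    the datastore models used in the example).
    """
    num_starts = 0
    num_completions = 0
    state_hit_counts = {}
    last_updated = None

    if batch_stats is not None:
        num_starts += batch_stats['num_starts']
        num_completions += batch_stats['num_completions']
        # Per-state counts and the timestamp come from the batch layer only.
        state_hit_counts = batch_stats['state_hit_counts']
        last_updated = batch_stats['last_updated_msec']

    if realtime_stats is not None:
        # The realtime layer contributes only the two counters.
        num_starts += realtime_stats['num_starts']
        num_completions += realtime_stats['num_completions']

    return {
        'start_exploration_count': num_starts,
        'complete_exploration_count': num_completions,
        'state_hit_counts': state_hit_counts,
        'last_updated': last_updated,
    }
```

When neither layer has data, the function returns zero counts, an empty dict, and None for the timestamp, which matches the defaults set at the top of get_statistics.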


Example 18: to_dict

    def to_dict(self):
        return {
            'last_updated': utils.get_time_in_millisecs(self.last_updated),
            'original_author_username': (
                user_services.get_username(self.original_author_id)
                if self.original_author_id else None),
            'state_name': self.state_name,
            'status': self.status,
            'subject': self.subject,
            'summary': self.summary,
            'thread_id': self.get_thread_id()
        }
Developer ID: 526avijitgupta, Project: oppia, Lines of code: 11, Source: feedback_domain.py


Example 19: map

    def map(item):
        user_id = item.id
        job_queued_msec = RecentUpdatesMRJobManager._get_job_queued_msec()
        reducer_key = '%s@%s' % (user_id, job_queued_msec)

        exploration_ids_list = item.activity_ids
        collection_ids_list = item.collection_ids
        feedback_thread_ids_list = item.feedback_thread_ids

        (most_recent_activity_commits, tracked_exp_models_for_feedback) = (
            RecentUpdatesMRJobManager._get_most_recent_activity_commits(
                exp_models.ExplorationModel, exploration_ids_list,
                'exploration', feconf.UPDATE_TYPE_EXPLORATION_COMMIT,
                feconf.COMMIT_MESSAGE_EXPLORATION_DELETED))

        for exp_model in tracked_exp_models_for_feedback:
            threads = feedback_services.get_all_threads(exp_model.id, False)
            for thread in threads:
                full_thread_id = (
                    feedback_models.FeedbackThreadModel.generate_full_thread_id(
                        exp_model.id, thread['thread_id']))
                if full_thread_id not in feedback_thread_ids_list:
                    feedback_thread_ids_list.append(full_thread_id)

        # TODO(bhenning): Implement a solution to having feedback threads for
        # collections.
        most_recent_activity_commits += (
            RecentUpdatesMRJobManager._get_most_recent_activity_commits(
                collection_models.CollectionModel, collection_ids_list,
                'collection', feconf.UPDATE_TYPE_COLLECTION_COMMIT,
                feconf.COMMIT_MESSAGE_COLLECTION_DELETED))[0]

        for recent_activity_commit_dict in most_recent_activity_commits:
            yield (reducer_key, recent_activity_commit_dict)

        for feedback_thread_id in feedback_thread_ids_list:
            exp_id = feedback_services.get_exp_id_from_full_thread_id(
                feedback_thread_id)
            thread_id = feedback_services.get_thread_id_from_full_thread_id(
                feedback_thread_id)
            last_message = (
                feedback_models.FeedbackMessageModel.get_most_recent_message(
                    exp_id, thread_id))

            yield (reducer_key, {
                'type': feconf.UPDATE_TYPE_FEEDBACK_MESSAGE,
                'activity_id': last_message.exploration_id,
                'activity_title': exp_models.ExplorationModel.get_by_id(
                    last_message.exploration_id).title,
                'author_id': last_message.author_id,
                'last_updated_ms': utils.get_time_in_millisecs(
                    last_message.created_on),
                'subject': last_message.get_thread_subject(),
            })
Developer ID: CuriousLearner, Project: oppia, Lines of code: 54, Source: user_jobs_continuous.py
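
The mapper above builds a "full thread ID" from an exploration ID and a thread ID, appends it only if it is not already tracked, and later splits it back into its two parts. The helpers that do this are not shown, so the sketch below is an illustration under stated assumptions: the '.' separator, the function names, and the rule that exploration IDs never contain the separator are all hypothetical and may differ from the real code.

```python
THREAD_ID_SEPARATOR = '.'  # Assumption made for this sketch only.


def make_full_thread_id(exploration_id, thread_id):
    """Join an exploration ID and a thread ID into one string."""
    return '%s%s%s' % (exploration_id, THREAD_ID_SEPARATOR, thread_id)


def split_full_thread_id(full_thread_id):
    """Return (exploration_id, thread_id) for a full thread ID.

    Assumes the exploration ID itself never contains the separator.
    """
    exploration_id, thread_id = full_thread_id.split(THREAD_ID_SEPARATOR, 1)
    return exploration_id, thread_id


def track_threads(tracked_full_ids, exploration_id, thread_ids):
    """Append full IDs that are not tracked yet, preserving order."""
    for thread_id in thread_ids:
        full_id = make_full_thread_id(exploration_id, thread_id)
        if full_id not in tracked_full_ids:
            tracked_full_ids.append(full_id)
    return tracked_full_ids


tracked = track_threads(['exp1.t1'], 'exp1', ['t1', 't2'])
print(tracked)  # ['exp1.t1', 'exp1.t2']
print(split_full_thread_id(tracked[1]))  # ('exp1', 't2')
```

The membership test against a list is linear in the number of tracked threads; for long lists, keeping a parallel set of seen IDs would avoid repeated scans while still preserving order.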


Example 20: _entity_created_before_job_queued

    def _entity_created_before_job_queued(entity):
        """Checks that the given entity was created before the MR job was queued.

        Mapper methods may want to use this as a precomputation check,
        especially if the datastore classes being iterated over are append-only
        event logs.
        """

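The docstring describes a check that an entity was created no later than the moment the MapReduce job was queued. A minimal sketch of such a comparison follows. It assumes the queue time is passed in explicitly as milliseconds since the epoch and that created_on is a naive UTC datetime; the function name and both parameters are hypothetical, and how a real job obtains its queue time is not shown here.

```python
import calendar
import datetime


def created_before_job_queued(created_on, job_queued_msec):
    """Return True if created_on is not later than the job's queue time.

    created_on: naive datetime assumed to be in UTC.
    job_queued_msec: queue time in milliseconds since the Unix epoch
        (hypothetical parameter; a real job would read it from its own
        configuration).
    """
    created_on_msec = (
        calendar.timegm(created_on.timetuple()) * 1000
        + created_on.microsecond / 1000.0)
    return job_queued_msec >= created_on_msec


queued_at_msec = 2000.0
print(created_before_job_queued(
    datetime.datetime(1970, 1, 1, 0, 0, 1), queued_at_msec))  # True
print(created_before_job_queued(
    datetime.datetime(1970, 1, 1, 0, 0, 3), queued_at_msec))  # False
```

A mapper over an append-only event log could call a check like this first and skip entities that arrived after the job was queued, so that a batch run does not count events a realtime layer is also counting.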