• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python context_managers.switch_db函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mongoengine.context_managers.switch_db函数的典型用法代码示例。如果您正苦于以下问题：Python switch_db函数的具体用法？Python switch_db怎么用？Python switch_db使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了switch_db函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_undoes_db_changes_when_error_2

 def test_undoes_db_changes_when_error_2(self):
     """
     Tests that load_from_config.run() removes only the documents it saved to the database and
     leaves alone any documents that were already there when it comes across an unexpected error.
     """
     # Seed the test database with the documents that must survive the failed run
     expected_experi_model.switch_db(TEST_DB_ALIAS)
     expected_experi_model.save()
     expected_ds_model.switch_db(TEST_DB_ALIAS)
     expected_ds_model.save()
     # Write the config that makes run() fail; the context manager closes the
     # file even if the write itself raises
     with open(path_string_json_full, 'w') as json_config:
         json_config.write(breaking_json)
     with self.assertRaises(KeyError):
         load_from_config.run()
     # The pre-existing Experiment and DataSource documents are still there, alone
     with switch_db(Experiment, TEST_DB_ALIAS) as TestEx:
         query = TestEx.objects.all()
         self.assertEqual(len(query), 1)
         self.document_compare(query[0], expected_experi_model)
     with switch_db(DataSource, TEST_DB_ALIAS) as TestDs:
         query = TestDs.objects.all()
         self.assertEqual(len(query), 1)
         self.document_compare(query[0], expected_ds_model)
     # Nothing created by the failed run is left behind
     with switch_db(Genotype, TEST_DB_ALIAS) as TestGen:
         query = TestGen.objects.all()
         self.assertEqual(len(query), 0)
开发者ID:hdzierz,项目名称:Kaka,代码行数:25,代码来源:tests.py


示例2: create_update_footnotes

def create_update_footnotes(data, db_alias):
    """
    Create or update one Footnote document per row of ``data``.

    Rows are parsed with ``csv.DictReader`` using ``^`` as delimiter and ``~`` as
    quote character. Each footnote is looked up by food, footnote number and
    nutrient; when no match exists a new one is built. Its type and text are
    then set from the row and it is saved. Totals are logged at the end.

    :param data: sized iterable of lines (``len(data)`` is logged) fed to ``csv.DictReader``
    :param db_alias: alias passed to ``switch_db`` for the Footnote, Nutrient and Food models
    """
    total_created = 0
    total_updated = 0

    logging.info('Processing %d footnotes', len(data))

    from usda_mongo.models import Footnote, Nutrient, Food

    with switch_db(Footnote, db_alias) as Footnote, \
            switch_db(Nutrient, db_alias) as Nutrient, \
            switch_db(Food, db_alias) as Food:

        for row in csv.DictReader(
            data, fieldnames=(
                'ndb_no', 'footnt_no', 'footnt_typ', 'nutr_no', 'footnt_txt'
            ),
            delimiter='^', quotechar='~'
        ):
            # SR22 definition indicates that `footnt_no` and `footnt_typ` are required,
            # but on occasion either one is blank.  To compensate, a blank `footnt_no`
            # becomes 1 and any unrecognised `footnt_typ` falls back to FOOTNOTE_NUTR.
            if row['footnt_no'] == '':
                row['footnt_no'] = 1
            if row['footnt_typ'] not in (FOOTNOTE_DESC, FOOTNOTE_MEAS, FOOTNOTE_NUTR):
                row['footnt_typ'] = FOOTNOTE_NUTR

            if row.get('nutr_no'):
                nutrient = Nutrient.objects.get(number=int(row['nutr_no']))
            else:
                nutrient = None

            # Resolve the lookup key once so the Food query is not repeated
            # when the footnote has to be created
            food = Food.objects.get(ndb_number=int(row['ndb_no']))
            number = int(row['footnt_no'])

            try:
                footnote = Footnote.objects.get(
                    food=food, number=number, nutrient=nutrient
                )
                created = False
                total_updated += 1
            except Footnote.DoesNotExist:
                footnote = Footnote(food=food, number=number, nutrient=nutrient)
                created = True
                total_created += 1

            footnote.type = row['footnt_typ']
            footnote.text = row['footnt_txt']
            footnote.save()

            if created:
                logging.debug('Created %s', footnote)
            else:
                logging.debug('Updated %s', footnote)

    logging.info('Created %d new footnotes', total_created)
    logging.info('Updated %d footnotes', total_updated)
开发者ID:javipalanca,项目名称:django-usda-mongo,代码行数:60,代码来源:import_sr22.py


示例3: create_document

    def create_document(row, test=False):
        """
        Build the keyword arguments for a Genotype from ``row`` and pass them to ``fetch_or_save``.

        Keys ending in 'date' (or equal to 'dtt') are parsed as timestamps, keys
        containing 'datasource' or 'study' are resolved through ``fetch_or_save``
        on the matching model, 'obs' is evaluated as a Python literal, and every
        other value is copied unchanged.

        :param row: mapping of field name to string value
        :param test: when true, TEST_DB_ALIAS is used instead of 'default'
        :returns: the first element of the pair returned by ``fetch_or_save`` for Genotype
        """
        if test:
            db_alias = TEST_DB_ALIAS
        else:
            db_alias = 'default'

        fields = {}
        for key in row:
            value = row[key]
            if key.endswith('date') or key == 'dtt':
                # Values under keys ending in 'date' are assumed to be timestamps
                fields[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
            elif 'datasource' in key:
                # Reference fields are looked up in their own collections
                with switch_db(DataSource, db_alias) as TestDat:
                    datasource, _ = fetch_or_save(
                        TestDat, db_alias=db_alias, name=value
                    )
                fields['datasource'] = datasource
            elif 'study' in key:
                with switch_db(Experiment, db_alias) as TestEx:
                    study, _ = fetch_or_save(
                        TestEx, db_alias=db_alias, name=value
                    )
                fields['study'] = study
            elif key == 'obs':
                # The obs column holds the text form of a dictionary
                fields[key] = ast.literal_eval(value)
            else:
                fields[key] = value

        with switch_db(Genotype, db_alias) as TestGen:
            genotype, _ = fetch_or_save(TestGen, db_alias=db_alias, **fields)
        return genotype
开发者ID:hdzierz,项目名称:Kaka,代码行数:32,代码来源:csv_to_doc_strategy.py


示例4: document_compare

 def document_compare(self, doc1, doc2):
     """
     Assert that two mongoengine documents carry equal field values.

     The 'id', 'dtt' and 'lastupdateddate' fields and every field whose name
     starts with an underscore are skipped. DBRef values under 'study' and
     'datasource' are fetched from the test database and compared recursively;
     a DBRef under any other key fails the test.
     """
     skipped = ('id', 'dtt', 'lastupdateddate')
     for key in doc1._fields_ordered:
         # metadata fields and timestamps defaulting to datetime.now() are not compared
         if key in skipped or key[0] == '_':
             continue
         with self.subTest(key=key):
             val = doc1[key]
             if isinstance(val, dict):
                 self.assertDictEqual(val, doc2[key])
             elif not isinstance(val, DBRef):
                 self.assertEqual(val, doc2[key])
             elif key == 'study':
                 with switch_db(Experiment, TEST_DB_ALIAS) as TestEx:
                     study = TestEx.objects.get(id=val.id)
                     self.document_compare(study, doc2[key])
             elif key == 'datasource':
                 with switch_db(DataSource, TEST_DB_ALIAS) as TestDs:
                     ds = TestDs.objects.get(id=val.id)
                     self.document_compare(ds, doc2[key])
             else:
                 self.fail("Unexpected reference field: " + key)
开发者ID:hdzierz,项目名称:Kaka,代码行数:25,代码来源:tests.py


示例5: tearDown

 def tearDown(self):
     """
     Delete every Experiment, DataSource and Genotype document from the test database.
     """
     for model in (Experiment, DataSource, Genotype):
         with switch_db(model, TEST_DB_ALIAS) as TestModel:
             TestModel.objects.all().delete()
开发者ID:hdzierz,项目名称:Kaka,代码行数:10,代码来源:tests.py


示例6: load

    def load(self):
        """
        Save the OPAC model instance under the OPAC webapp alias, then update the load record.

        The second step stamps ``self.metadata`` with the finish time and status
        flags, stores the JSON form of the saved instance (with every ``$``
        removed) under 'loaded_data', and writes the metadata to
        ``self.load_model_instance``.
        """
        logger.debug(u"iniciando metodo load() (uuid: %s)" % self.metadata['uuid'])

        opac_info = (self.opac_model_name, self.opac_model_instance._id)
        logger.debug(u"salvando modelo %s no opac (_id: %s)" % opac_info)

        with \
            switch_db(OpacCollection, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacJournal, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacIssue, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacArticle, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacSponsor, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacNews, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacPages, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacPressRelease, OPAC_WEBAPP_DB_NAME):

            self.opac_model_instance.switch_db(OPAC_WEBAPP_DB_NAME)
            self.opac_model_instance.save()
            self.opac_model_instance.reload()

        # update the data of the LOAD record
        with switch_db(self.load_model_class, OPAC_PROC_DB_NAME):
            self.metadata['process_finish_at'] = datetime.now()
            self.metadata['process_completed'] = True
            self.metadata['must_reprocess'] = False
            # the '$' characters are stripped from the JSON before it is parsed
            self.metadata['loaded_data'] = json.loads(
                self.opac_model_instance.to_json().replace('$', ''))
            self.load_model_instance.update(**self.metadata)
            self.load_model_instance.save()
            self.load_model_instance.reload()
            load_info = (self.load_model_name, self.metadata['uuid'])
            logger.debug(u"modelo %s no opac_proc (uuid: %s) foi atualizado" % load_info)

        logger.debug(u"finalizando metodo load() (uuid: %s)" % self.metadata['uuid'])
开发者ID:jamilatta,项目名称:opac_proc,代码行数:35,代码来源:base.py


示例7: query_genotype_by_experiment

def query_genotype_by_experiment(experi_id):
    """
    Fetch an Experiment by id together with the Genotype query set that references it.

    :param experi_id: id of the Experiment to look up
    :returns: ``(genotype_queryset, experiment)``
    :raises Http404: when no Experiment has that id
    """
    if testing:
        db_alias = TEST_DB_ALIAS
    else:
        db_alias = 'default'

    try:
        with switch_db(Experiment, db_alias) as Exper:
            experiment = Exper.objects.get(id=experi_id)
    except Experiment.DoesNotExist:
        raise Http404("Experiment does not exist")
    else:
        with switch_db(Genotype, db_alias) as Gen:
            matches = Gen.objects(study=experiment)
        return matches, experiment
开发者ID:hdzierz,项目名称:Kaka,代码行数:11,代码来源:views.py


示例8: prepare

    def prepare(self):
        """
        Build or refresh ``self.opac_model_instance`` from the transformed model data.

        The data returned by ``transform_model_instance_to_python()`` gets its
        '_id' set to ``self._uuid_str``. When no OPAC instance exists yet, one is
        created from that data and switched to the OPAC webapp alias; otherwise
        the existing instance is switched, updated key by key and saved.

        :returns: ``self.opac_model_instance``
        """
        logger.debug(u"iniciando metodo prepare (uuid: %s)" % self.metadata['uuid'])
        obj_dict = self.transform_model_instance_to_python()
        obj_dict['_id'] = self._uuid_str

        with \
            switch_db(OpacCollection, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacJournal, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacIssue, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacArticle, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacSponsor, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacNews, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacPages, OPAC_WEBAPP_DB_NAME), \
            switch_db(OpacPressRelease, OPAC_WEBAPP_DB_NAME):

            if self.opac_model_instance is None:
                # create a new instance
                # NOTE(review): the new instance is not saved here - presumably a
                # later step saves it; confirm against the caller
                self.opac_model_instance = self.opac_model_class(**obj_dict)
                self.opac_model_instance.switch_db(OPAC_WEBAPP_DB_NAME)
            else:  # an instance already exists in the database
                self.opac_model_instance.switch_db(OPAC_WEBAPP_DB_NAME)
                # items() instead of iteritems(): works on Python 2 and 3 alike
                for k, v in obj_dict.items():
                    self.opac_model_instance[k] = v
                self.opac_model_instance.save()

        logger.debug(u"modelo opac (_id: %s) encontrado. atualizando registro" % obj_dict['_id'])

        logger.debug(u"finalizando metodo prepare(uuid: %s)" % self.metadata['uuid'])
        logger.debug(u'opac_model_instance SALVO: %s' % self.opac_model_instance.to_json())
        return self.opac_model_instance
开发者ID:jamilatta,项目名称:opac_proc,代码行数:30,代码来源:base.py


示例9: create_update_weights

def create_update_weights(data, encoding, db_alias):
    """
    Create or update one Weight document per row of ``data``.

    Rows are parsed with ``UnicodeDictReader`` using ``^`` as delimiter and ``~``
    as quote character. Each weight is looked up by food and sequence number;
    when no match exists a new one is built. Its measurements are then set from
    the row and it is saved. Totals are logged at the end.

    :param data: sized iterable of lines (``len(data)`` is logged) fed to ``UnicodeDictReader``
    :param encoding: encoding passed through to ``UnicodeDictReader``
    :param db_alias: alias passed to ``switch_db`` for the Weight and Food models
    """
    total_created = 0
    total_updated = 0

    logging.info('Processing %d weights', len(data))

    from usda_mongo.models import Weight, Food

    with switch_db(Weight, db_alias) as Weight, \
            switch_db(Food, db_alias) as Food:

        for row in UnicodeDictReader(
            data, fieldnames=(
                'ndb_no', 'seq', 'amount', 'msre_desc', 'gm_wgt', 'num_data_pts', 'std_dev'
            ),
            delimiter='^', quotechar='~',
            encoding=encoding
        ):
            # Resolve the lookup key once so the Food query is not repeated
            # when the weight has to be created
            food = Food.objects.get(ndb_number=int(row['ndb_no']))
            sequence = int(row['seq'])

            try:
                weight = Weight.objects.get(food=food, sequence=sequence)
                created = False
                total_updated += 1
            except Weight.DoesNotExist:
                weight = Weight(food=food, sequence=sequence)
                created = True
                total_created += 1

            weight.amount = float(row.get('amount'))
            weight.description = row.get('msre_desc')
            weight.gram_weight = float(row.get('gm_wgt'))
            # the two statistics columns are only set when the row has a value for them
            if row.get('num_data_pts'):
                weight.number_of_data_points = float(row['num_data_pts'])
            if row.get('std_dev'):
                weight.standard_deviation = float(row['std_dev'])
            weight.save()

            if created:
                logging.debug('Created %s', weight)
            else:
                logging.debug('Updated %s', weight)

    logging.info('Created %d new weights', total_created)
    logging.info('Updated %d weights', total_updated)
开发者ID:javipalanca,项目名称:django-usda-mongo,代码行数:50,代码来源:import_sr22.py


示例10: get_ref_fields

def get_ref_fields(gen, testing):
    """
    Return the 'study' and 'datasource' references of ``gen`` as a dictionary.

    When ``testing`` is true the two values are treated as database references:
    their "$id" is read via ``as_doc()`` and the matching documents are fetched
    from the test database.

    :param gen: object exposing ``study`` and ``datasource`` attributes
    :param testing: whether to resolve the references against TEST_DB_ALIAS
    :returns: dict with the keys "study" and "datasource"
    """
    study, datasource = gen.study, gen.datasource
    if not testing:
        return {"study": study, "datasource": datasource}

    # Both references are converted before the first query is issued
    study_son = study.as_doc()
    datasource_son = datasource.as_doc()

    study_id = study_son.get("$id")
    with switch_db(Experiment, TEST_DB_ALIAS) as Exper:
        study_doc = Exper.objects.get(id=study_id)

    datasource_id = datasource_son.get("$id")
    with switch_db(DataSource, TEST_DB_ALIAS) as Dat:
        datasource_doc = Dat.objects.get(id=datasource_id)

    return {"study": study_doc, "datasource": datasource_doc}
开发者ID:hdzierz,项目名称:Kaka,代码行数:14,代码来源:query_set_helpers.py


示例11: get_document_by_id

 def get_document_by_id(self, object_id):
     """
     Return the document whose id is ``object_id``.

     Connections are registered first; the lookup then runs with the model
     class switched to the OPAC_PROC_LOGS_DB_NAME alias.
     """
     register_connections()
     with switch_db(self.model_class, OPAC_PROC_LOGS_DB_NAME) as model:
         found = model.objects.get(id=object_id)
     return found
开发者ID:jamilatta,项目名称:opac_proc,代码行数:7,代码来源:detail_views.py


示例12: task_delete_selected_collections

def task_delete_selected_collections(selected_uuids):
    """
        Task that deletes loaded Collections.
        @param:
        - selected_uuids: list of UUIDs of the documents to be removed

        If `selected_uuids` is longer than SLICE_SIZE
            the list is sliced into lists of size SLICE_SIZE and this task is
            enqueued again for each slice
        Otherwise
            the delete is run directly on the queryset
    """

    get_db_connection()
    r_queues = RQueues()
    SLICE_SIZE = 1000

    if len(selected_uuids) > SLICE_SIZE:
        for batch in list(chunks(selected_uuids, SLICE_SIZE)):
            batch_as_strings = [str(uuid) for uuid in batch]
            r_queues.enqueue(
                'load', 'collection', task_delete_selected_collections, batch_as_strings)
        return

    # remove the LoadCollection documents identified by the uuids
    LoadCollection.objects.filter(uuid__in=selected_uuids).delete()

    # convert the uuids to _id values and delete the matching documents in OPAC
    register_connections()
    opac_pks = [str(uuid).replace('-', '') for uuid in selected_uuids]
    with switch_db(opac_models.Collection, OPAC_WEBAPP_DB_NAME) as opac_model:
        opac_model.objects.filter(pk__in=opac_pks).delete()
开发者ID:scieloorg,项目名称:opac_proc,代码行数:35,代码来源:jobs.py


示例13: switch_db

    def switch_db(self, db_alias):
        """
        Temporarily switch the database for a document instance.

        Only really useful for archiving off data and calling `save()`::

            user = User.objects.get(id=user_id)
            user.switch_db('archive-db')
            user.save()

        If you need to read from another database see
        :class:`~mongoengine.context_managers.switch_db`

        The overrides are stored on the instance; nothing in this method
        restores the previous database afterwards.

        :param db_alias: The database alias to use for saving the document
        :returns: this same instance, so calls can be chained
        """
        # Resolve the collection and database while the class is switched to db_alias
        with switch_db(self.__class__, db_alias) as cls:
            collection = cls._get_collection()
            db = cls._get_db()
        # Shadow the accessors on this instance so they keep returning the
        # objects captured above
        self._get_collection = lambda: collection
        self._get_db = lambda: db
        self._collection = collection
        # NOTE(review): presumably marks the instance as new so that save()
        # inserts it into the target database - confirm against save()
        self._created = True
        # The double-underscore attribute name is mangled with the enclosing class name
        self.__objects = self._qs
        self.__objects._collection_obj = collection
        return self
开发者ID:kozdowy,项目名称:picgridder,代码行数:25,代码来源:document.py


示例14: run

def run(*args):
    """
    Process every DataDir, undoing this run's database writes if one of them fails.

    Each directory path is handed to ``look_for_config_dir``. When that raises,
    the error is logged, every document listed in the module-level
    ``created_doc_ids`` is deleted, and the exception is re-raised.

    :param args: optional flags; 'override' sets the module-level MODE to "OVERRIDE"
    :raises Exception: whatever ``look_for_config_dir`` raised, after the clean-up
    """
    global MODE, db_alias, created_doc_ids
    if 'override' in args:
        Logger.Warning("OVERRIDE MODE!")
        MODE = "OVERRIDE"

    Logger.Message("Loading process in mode: " + MODE + " started.")
    if testing:
        db_alias = TEST_DB_ALIAS
    # for keeping track of documents saved to db by this run of script
    created_doc_ids = []

    dirs = DataDir.objects.all()
    for d in dirs:
        Logger.Message("Processing data dir: " + d.path)
        path = Path(d.path)
        try:
            look_for_config_dir(path)
        except Exception as e:
            Logger.Error(str(e))
            # 'Cancels' the script, by removing from db all documents saved to db in this script run-through
            for doc_type, doc_id in created_doc_ids:
                with switch_db(doc_type, db_alias) as Col:
                    Col.objects.get(id=doc_id).delete()
            # bare raise re-raises the caught exception with its original traceback
            raise
开发者ID:hdzierz,项目名称:Kaka,代码行数:27,代码来源:load_from_config.py


示例15: test_yaml_marked_loaded_no_load

 def test_yaml_marked_loaded_no_load(self):
     """
     Check that nothing is loaded from a directory whose config.yaml has been marked as loaded.
     """
     configuration_parser.YamlConfigParser(path_string_yaml_full).mark_loaded()
     load_from_config.load_in_dir(path_string_yaml)
     # None of the three collections may contain a document afterwards
     for model in (Experiment, DataSource, Genotype):
         with switch_db(model, TEST_DB_ALIAS) as TestModel:
             stored = TestModel.objects.all()
             self.assertEqual(len(stored), 0)
开发者ID:hdzierz,项目名称:Kaka,代码行数:16,代码来源:tests.py


示例16: switch_db

    def switch_db(self, db_alias, keep_created=True):
        """
        Temporarily switch the database for a document instance.

        Only really useful for archiving off data and calling `save()`::

            user = User.objects.get(id=user_id)
            user.switch_db('archive-db')
            user.save()

        The overrides are stored on the instance; nothing in this method
        restores the previous database afterwards.

        :param str db_alias: The database alias to use for saving the document

        :param bool keep_created: keep self._created value after switching db, else is reset to True

        :returns: this same instance, so calls can be chained

        .. seealso::
            Use :class:`~mongoengine.context_managers.switch_collection`
            if you need to read from another collection
        """
        # Resolve the collection and database while the class is switched to db_alias
        with switch_db(self.__class__, db_alias) as cls:
            collection = cls._get_collection()
            db = cls._get_db()
        # Shadow the accessors on this instance so they keep returning the
        # objects captured above
        self._get_collection = lambda: collection
        self._get_db = lambda: db
        self._collection = collection
        # With keep_created the flag is re-assigned its current value; otherwise it becomes True
        self._created = True if not keep_created else self._created
        # The double-underscore attribute name is mangled with the enclosing class name
        self.__objects = self._qs
        self.__objects._collection_obj = collection
        return self
开发者ID:HiroIshikawa,项目名称:21playground,代码行数:29,代码来源:document.py


示例17: query_by_pi

 def query_by_pi(self):
     """
     Narrow ``self.search_list`` with a raw "pi" query built from the 'search_pi' GET parameter.

     The stripped parameter is stored in ``self.search_term``. When no search
     list exists yet, the filter starts from all Experiment objects in the
     database registered under ``self.db_alias``.
     """
     self.search_term = self.request.GET['search_pi'].strip()
     with switch_db(Experiment, self.db_alias) as db:
         if self.search_list is None:
             candidates = db.objects
         else:
             candidates = self.search_list
         self.search_list = candidates.filter(
             __raw__=self.raw_query_dict("pi", self.search_term)
         )
开发者ID:hdzierz,项目名称:Kaka,代码行数:7,代码来源:query_from_request.py


示例18: create_update_sources

def create_update_sources(data, db_alias):
    """
    Create or update one Source document per row of ``data``.

    Rows are parsed with ``csv.DictReader`` using ``^`` as delimiter and ``~`` as
    quote character. A source is looked up by its integer code and created when
    missing; its description is then set from the row and it is saved.

    :param data: sized iterable of lines (``len(data)`` is logged) fed to ``csv.DictReader``
    :param db_alias: alias passed to ``switch_db`` for the Source model
    """
    total_created = 0
    total_updated = 0

    logging.info('Processing %d sources' % len(data))

    from usda_mongo.models import Source

    with switch_db(Source, db_alias) as Source:

        reader = csv.DictReader(
            data, fieldnames=('src_cd', 'srccd_desc'),
            delimiter='^', quotechar='~'
        )
        for row in reader:
            code = int(row['src_cd'])

            try:
                source = Source.objects.get(code=code)
            except Source.DoesNotExist:
                source = Source(code=code)
                total_created += 1
                created = True
            else:
                total_updated += 1
                created = False

            source.description = row['srccd_desc']
            source.save()

            template = 'Created %s' if created else 'Updated %s'
            logging.debug(template % source)

    logging.info('Created %d new sources' % total_created)
    logging.info('Updated %d sources' % total_updated)
开发者ID:javipalanca,项目名称:django-usda-mongo,代码行数:34,代码来源:import_sr22.py


示例19: test_connection_alias

    def test_connection_alias(self):
        """
        Save a document through the "test_alias" connection and verify it with pymongo.

        The document must end up in the collection of the second test database,
        which is dropped again at the end when a connection is available.
        """
        second_db_name = TESTDB_NAME+"_2"
        try:
            aliased_data = DatabaseData(second_db_name, collection="test_collection")
            aliased_data.connect_mongoengine(alias="test_alias")

            default_data = DatabaseData(TESTDB_NAME, collection="test_collection")
            default_data.connect_mongoengine()

            class TestDocument(Document):
                test = StringField()

            with switch_db(TestDocument, "test_alias") as AliasedDocument, \
                    aliased_data.switch_collection(AliasedDocument) as StoredDocument:
                StoredDocument(test="abc").save()

            # check the collection with pymongo
            stored = MongoClient()[second_db_name][aliased_data.collection].find()
            assert stored.count() == 1
            assert stored[0]['test'] == "abc"
        finally:
            if self._connection:
                self._connection.drop_database(second_db_name)
开发者ID:gmatteo,项目名称:abiflows,代码行数:27,代码来源:test_utils.py


示例20: create_update_food_groups

def create_update_food_groups(data, db_alias):
    """
    Create or update one FoodGroup document per row of ``data``.

    Rows are parsed with ``csv.DictReader`` using ``^`` as delimiter and ``~`` as
    quote character. A food group is looked up by its integer code and created
    when missing; its description is then set from the row and it is saved.
    Totals are logged once the database switch has been left.

    :param data: sized iterable of lines (``len(data)`` is logged) fed to ``csv.DictReader``
    :param db_alias: alias passed to ``switch_db`` for the FoodGroup model
    """
    total_created = 0
    total_updated = 0

    logging.info('Processing %d food groups', len(data))

    from usda_mongo.models import FoodGroup

    with switch_db(FoodGroup, db_alias) as FoodGroup:

        for row in csv.DictReader(
            data, fieldnames=('fdgrp_cd', 'fdgrp_desc'),
            delimiter='^', quotechar='~'
        ):
            # Parse the code once; it is needed for both the lookup and the create path
            code = int(row['fdgrp_cd'])

            try:
                food_group = FoodGroup.objects.get(code=code)
                created = False
                total_updated += 1
            except FoodGroup.DoesNotExist:
                food_group = FoodGroup(code=code)
                created = True
                total_created += 1

            food_group.description = row['fdgrp_desc']
            food_group.save()

            if created:
                logging.debug('Created %s', food_group)
            else:
                logging.debug('Updated %s', food_group)

    # The summary does not need the switched model, so it is logged outside the with block
    logging.info('Created %d new food groups', total_created)
    logging.info('Updated %d food groups', total_updated)
开发者ID:javipalanca,项目名称:django-usda-mongo,代码行数:34,代码来源:import_sr22.py



注:本文中的mongoengine.context_managers.switch_db函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python auth.User类代码示例发布时间:2022-05-27
下一篇:
Python context_managers.query_counter函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap