• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python db.execute函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中nailgun.db.db.execute函数的典型用法代码示例。如果您正苦于以下问题:Python execute函数的具体用法?Python execute怎么用?Python execute使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了execute函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_new_fields_exists_and_empty

    def test_new_fields_exists_and_empty(self):
        # check attributes_metadata field exists
        result = db.execute(
            sa.select([self.meta.tables['plugins'].c.attributes_metadata]))
        # check attributes_metadata value is empty
        self.assertEqual(
            jsonutils.loads(result.fetchone()[0]), {})

        result = db.execute(
            sa.select([self.meta.tables['plugins'].c.volumes_metadata]))
        self.assertEqual(
            jsonutils.loads(result.fetchone()[0]), {})

        result = db.execute(
            sa.select([self.meta.tables['plugins'].c.roles_metadata]))
        self.assertEqual(
            jsonutils.loads(result.fetchone()[0]), {})

        result = db.execute(
            sa.select([self.meta.tables['plugins'].c.deployment_tasks]))
        self.assertEqual(
            jsonutils.loads(result.fetchone()[0]), [])

        result = db.execute(
            sa.select([self.meta.tables['plugins'].c.tasks]))
        self.assertEqual(
            jsonutils.loads(result.fetchone()[0]), [])
开发者ID:blkart,项目名称:fuel-web,代码行数:27,代码来源:test_migration_fuel_7_0.py


示例2: test_new_columns_exist

    def test_new_columns_exist(self):
        deployment_graphs_table = self.meta.tables['deployment_graphs']
        db.execute(
            deployment_graphs_table.insert(),
            {
                'name': 'test',
                'node_filter': '$.status == ready',
                'on_success': '{"node_attributes": {"status": "new"}}',
                'on_error': '{"node_attributes": {"status": "error"}}',
                'on_stop': '{}'
            }
        )

        result = db.execute(
            sa.select([
                deployment_graphs_table.c.node_filter,
                deployment_graphs_table.c.on_success,
                deployment_graphs_table.c.on_error,
                deployment_graphs_table.c.on_stop,
            ]).where(deployment_graphs_table.c.name == 'test')
        ).first()

        self.assertEqual('$.status == ready', result['node_filter'])
        self.assertEqual(
            '{"node_attributes": {"status": "new"}}', result['on_success']
        )
        self.assertEqual(
            '{"node_attributes": {"status": "error"}}', result['on_error']
        )
        self.assertEqual('{}', result['on_stop'])
开发者ID:openstack,项目名称:fuel-web,代码行数:30,代码来源:test_migration_fuel_9_1.py


示例3: test_cascade_release_deletion

 def test_cascade_release_deletion(self):
     release_component_table = self.meta.tables['release_components']
     release_table = self.meta.tables['releases']
     release_id = insert_table_row(
         release_table,
         {
             'name': 'release_with_components',
             'version': '2014.2.2-6.1',
             'operating_system': 'ubuntu',
             'state': 'available'
         }
     )
     component_id = insert_table_row(
         self.meta.tables['components'],
         {'name': six.text_type(uuid.uuid4()), 'type': 'hypervisor'}
     )
     insert_table_row(
         release_component_table,
         {'release_id': release_id, 'component_id': component_id}
     )
     db.execute(
         sa.delete(release_table).where(release_table.c.id == release_id))
     deleted_plugin_components = db.execute(
         sa.select([sa.func.count(release_component_table.c.id)]).
         where(release_component_table.c.release_id == release_id)
     ).fetchone()[0]
     self.assertEqual(deleted_plugin_components, 0)
开发者ID:gdyuldin,项目名称:fuel-web,代码行数:27,代码来源:test_migration_fuel_8_0.py


示例4: prepare

    def prepare(cls, meta, cluster_id):
        cls.test_link['cluster_id'] = cluster_id

        db.execute(
            meta.tables['cluster_plugin_links'].insert(),
            [cls.test_link],
        )
开发者ID:sebrandon1,项目名称:fuel-web,代码行数:7,代码来源:test_downgrade_fuel_10_0.py


示例5: test_network_group_name_is_string

 def test_network_group_name_is_string(self):
     db.execute(
         self.meta.tables["network_groups"].insert(),
         [{"id": 3, "name": "custom_name", "vlan_start": None, "cidr": "10.20.0.0/24", "gateway": "10.20.0.200"}],
     )
     names = db.execute(sa.select([self.meta.tables["network_groups"].c.name])).fetchall()
     self.assertIn(("custom_name",), names)
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:7,代码来源:test_migration_fuel_7_0.py


示例6: test_tun_segment_type_added

 def test_tun_segment_type_added(self):
     result = db.execute(
         self.meta.tables["networking_configs"].insert(),
         [
             {
                 "cluster_id": None,
                 "dns_nameservers": ["8.8.8.8"],
                 "floating_ranges": [],
                 "configuration_template": None,
             }
         ],
     )
     db.execute(
         self.meta.tables["neutron_config"].insert(),
         [
             {
                 "id": result.inserted_primary_key[0],
                 "vlan_range": [],
                 "gre_id_range": [],
                 "base_mac": "00:00:00:00:00:00",
                 "internal_cidr": "10.10.10.00/24",
                 "internal_gateway": "10.10.10.01",
                 "segmentation_type": "tun",
                 "net_l23_provider": "ovs",
             }
         ],
     )
     types = db.execute(sa.select([self.meta.tables["neutron_config"].c.segmentation_type])).fetchall()
     self.assertIn(("tun",), types)
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:29,代码来源:test_migration_fuel_7_0.py


示例7: test_new_fields_exists_and_empty

 def test_new_fields_exists_and_empty(self):
     # check node_nic_interfaces fields
     result = db.execute(sa.select([self.meta.tables["node_nic_interfaces"].c.offloading_modes]))
     self.assertEqual(jsonutils.loads(result.fetchone()[0]), [])
     # the same for bond interfaces
     result = db.execute(sa.select([self.meta.tables["node_bond_interfaces"].c.offloading_modes]))
     self.assertEqual(jsonutils.loads(result.fetchone()[0]), [])
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:7,代码来源:test_migration_fuel_7_0.py


示例8: prepare

def prepare():
    meta = base.reflect_db_metadata()

    result = db.execute(
        meta.tables['releases'].insert(),
        [{
            'name': 'test_name',
            'version': '2015.1-8.0',
            'operating_system': 'ubuntu',
            'state': 'available',
            'networks_metadata': jsonutils.dumps({
                'neutron': {
                    'networks': [],
                    'config': {}
                }
            })
        }]
    )
    releaseid = result.inserted_primary_key[0]

    db.execute(
        meta.tables['clusters'].insert(),
        [{
            'name': 'test_env',
            'release_id': releaseid,
            'mode': 'ha_compact',
            'status': 'new',
            'net_provider': 'neutron',
            'grouping': 'roles',
            'fuel_version': '8.0',
            'deployment_tasks': '[]',
            'replaced_deployment_info': '{}'
        }])

    db.commit()
开发者ID:ekorekin,项目名称:fuel-web,代码行数:35,代码来源:test_migration_fuel_9_0_1.py


示例9: prepare

def prepare():
    meta = base.reflect_db_metadata()

    releaseid = insert_table_row(
        meta.tables["releases"],
        {"name": "test_name", "version": "2014.2.2-6.1", "operating_system": "ubuntu", "state": "available"},
    )

    clusterid = insert_table_row(
        meta.tables["clusters"],
        {
            "name": "test_env",
            "release_id": releaseid,
            "mode": "ha_compact",
            "status": "new",
            "net_provider": "neutron",
            "grouping": "roles",
            "fuel_version": "6.1",
        },
    )

    db.execute(
        meta.tables["nodegroups"].insert(),
        [
            {"cluster_id": clusterid, "name": "test_nodegroup_a"},
            {"cluster_id": clusterid, "name": "test_nodegroup_a"},
            {"cluster_id": clusterid, "name": "test_nodegroup_b"},
            {"cluster_id": clusterid, "name": "test_nodegroup_b"},
        ],
    )

    db.commit()
开发者ID:ibelikov,项目名称:fuel-web,代码行数:32,代码来源:test_migration_fuel_8_0.py


示例10: test_tun_segment_type_added

 def test_tun_segment_type_added(self):
     result = db.execute(
         self.meta.tables['networking_configs'].insert(),
         [{
             'cluster_id': None,
             'dns_nameservers': ['8.8.8.8'],
             'floating_ranges': [],
             'configuration_template': None,
         }])
     db.execute(
         self.meta.tables['neutron_config'].insert(),
         [{
             'id': result.inserted_primary_key[0],
             'vlan_range': [],
             'gre_id_range': [],
             'base_mac': '00:00:00:00:00:00',
             'internal_cidr': '10.10.10.00/24',
             'internal_gateway': '10.10.10.01',
             'segmentation_type': 'tun',
             'net_l23_provider': 'ovs'
         }])
     types = db.execute(
         sa.select(
             [self.meta.tables['neutron_config'].c.segmentation_type])).\
         fetchall()
     self.assertIn(('tun',), types)
开发者ID:linglong0820,项目名称:fuel-web,代码行数:26,代码来源:test_migration_fuel_7_0.py


示例11: test_openstack_configs_table_saving

    def test_openstack_configs_table_saving(self):
        result = db.execute(
            sa.select([self.meta.tables['clusters'].c.id]))
        cluster_id = result.fetchone()[0]

        db.execute(
            self.meta.tables['openstack_configs'].insert(),
            [{
                'cluster_id': cluster_id,
                'is_active': True,
                'config_type': 'cluster',
                'node_id': None,
                'node_role': None,
                'created_at': datetime.now(),
                'configuration': jsonutils.dumps({
                    'config_a': {},
                    'config_b': {},
                }),
            }]
        )

        result = db.execute(
            sa.select([self.meta.tables['openstack_configs'].c.cluster_id]))
        config = result.fetchone()
        self.assertEqual(config[0], cluster_id)
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:25,代码来源:test_migration_fuel_8_0.py


示例12: test_baremetal_fields_saving

 def test_baremetal_fields_saving(self):
     baremetal_gateway = '192.168.3.51'
     baremetal_range = jsonutils.dumps(['192.168.3.52', '192.168.3.254'])
     result = db.execute(
         self.meta.tables['networking_configs'].insert(),
         [{
             'cluster_id': None,
             'dns_nameservers': ['8.8.8.8'],
             'floating_ranges': [],
             'configuration_template': None,
         }])
     db.execute(
         self.meta.tables['neutron_config'].insert(),
         [{
             'id': result.inserted_primary_key[0],
             'vlan_range': [],
             'gre_id_range': [],
             'base_mac': '00:00:00:00:00:00',
             'internal_cidr': '10.10.10.00/24',
             'internal_gateway': '10.10.10.01',
             'internal_name': 'my_internal_name',
             'floating_name': 'my_floating_name',
             'baremetal_gateway': baremetal_gateway,
             'baremetal_range': baremetal_range,
             'segmentation_type': 'vlan',
             'net_l23_provider': 'ovs'
         }])
     result = db.execute(
         sa.select(
             [self.meta.tables['neutron_config'].c.baremetal_gateway,
              self.meta.tables['neutron_config'].c.baremetal_range])).\
         fetchall()
     self.assertIn((baremetal_gateway, baremetal_range), result)
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:33,代码来源:test_migration_fuel_8_0.py


示例13: test_moving_plugin_attributes

    def test_moving_plugin_attributes(self):
        clusters = self.meta.tables['clusters']
        attributes = self.meta.tables['attributes']
        plugins = self.meta.tables['plugins']
        cluster_plugins = self.meta.tables['cluster_plugins']

        query = sa.select([attributes.c.editable])\
            .select_from(
                sa.join(
                    attributes, clusters,
                    attributes.c.cluster_id == clusters.c.id))
        result = jsonutils.loads(db.execute(query).fetchone()[0])
        self.assertItemsEqual(result, {})

        query = sa.select([cluster_plugins.c.attributes])\
            .select_from(
                sa.join(
                    cluster_plugins, plugins,
                    cluster_plugins.c.plugin_id == plugins.c.id))\
            .where(plugins.c.name == 'test_plugin_a')
        result = jsonutils.loads(db.execute(query).fetchone()[0])
        self.assertNotIn('metadata', result)
        self.assertItemsEqual(result['attribute'], {
            'value': 'value',
            'type': 'text',
            'description': 'description',
            'weight': 25,
            'label': 'label'
        })
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:29,代码来源:test_migration_fuel_8_0.py


示例14: test_unique_name_fields_insert_unique

 def test_unique_name_fields_insert_unique(self):
     nodegroup = db.execute(
         sa.select([self.meta.tables['nodegroups'].c.cluster_id,
                    self.meta.tables['nodegroups'].c.name])).fetchone()
     db.execute(self.meta.tables['nodegroups'].insert(),
                [{'cluster_id': nodegroup['cluster_id'],
                  'name': six.text_type(uuid.uuid4())}])
开发者ID:ansumanbebarta,项目名称:fuel-web,代码行数:7,代码来源:test_migration_fuel_8_0.py


示例15: test_unique_name_fields_insert_unique

 def test_unique_name_fields_insert_unique(self):
     nodegroup = db.execute(
         sa.select([self.meta.tables["nodegroups"].c.cluster_id, self.meta.tables["nodegroups"].c.name])
     ).fetchone()
     db.execute(
         self.meta.tables["nodegroups"].insert(),
         [{"cluster_id": nodegroup["cluster_id"], "name": six.text_type(uuid.uuid4())}],
     )
开发者ID:ibelikov,项目名称:fuel-web,代码行数:8,代码来源:test_migration_fuel_8_0.py


示例16: test_extensions_field_with_default_data

    def test_extensions_field_with_default_data(self):
        cluster_result = db.execute(
            sa.select([self.meta.tables['clusters'].c.extensions])).fetchone()
        release_result = db.execute(
            sa.select([self.meta.tables['releases'].c.extensions])).fetchone()

        self.assertEqual(list(cluster_result)[0], ['volume_manager'])
        self.assertEqual(list(release_result)[0], ['volume_manager'])
开发者ID:linglong0820,项目名称:fuel-web,代码行数:8,代码来源:test_migration_fuel_7_0.py


示例17: test_cluster_status_upgraded

    def test_cluster_status_upgraded(self):
        clusters_table = self.meta.tables['clusters']
        columns = [clusters_table.c.id, clusters_table.c.status]
        cluster = db.execute(sa.select(columns)).fetchone()

        db.execute(clusters_table.update().where(
            clusters_table.c.id == cluster.id
        ).values(status='partially_deployed'))
开发者ID:ekorekin,项目名称:fuel-web,代码行数:8,代码来源:test_migration_fuel_9_0.py


示例18: test_new_field_exists_and_filled

 def test_new_field_exists_and_filled(self):
     nic_table = self.meta.tables["node_nic_interfaces"]
     result = db.execute(sa.select([nic_table.c.pxe]).where(nic_table.c.id == 1))
     # check 'pxe' property is true for admin interfaces
     self.assertTrue(result.fetchone()[0])
     result = db.execute(sa.select([nic_table.c.pxe]).where(nic_table.c.id != 1))
     # and 'false' for any others
     for res in result.fetchall():
         self.assertFalse(res[0])
开发者ID:mmalchuk,项目名称:openstack-fuel-web,代码行数:9,代码来源:test_migration_fuel_7_0.py


示例19: test_new_fields_exists_and_empty

    def test_new_fields_exists_and_empty(self):
        result = db.execute(
            sa.select([self.meta.tables['networking_configs']
                       .c.configuration_template]))
        self.assertIsNone(result.fetchone()[0])

        result = db.execute(
            sa.select([self.meta.tables['nodes']
                       .c.network_template]))
        self.assertIsNone(result.fetchone()[0])
开发者ID:linglong0820,项目名称:fuel-web,代码行数:10,代码来源:test_migration_fuel_7_0.py


示例20: test_minimal_task_creation_success

 def test_minimal_task_creation_success(self):
     deployment_graph_id = self._insert_deployment_graph()
     db.execute(
         self.meta.tables['deployment_graph_tasks'].insert(),
         {
             'deployment_graph_id': deployment_graph_id,
             'task_name': 'minimal task',
             'type': 'puppet'
         },
     )
开发者ID:ekorekin,项目名称:fuel-web,代码行数:10,代码来源:test_migration_fuel_9_0.py



注:本文中的nailgun.db.db.execute函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python db.query函数代码示例发布时间:2022-05-27
下一篇:
Python db.commit函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap