• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python pydal.DAL类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pydal.DAL的典型用法代码示例。如果您正苦于以下问题:Python DAL类的具体用法?Python DAL怎么用?Python DAL使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DAL类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: testRun

    def testRun(self):
        db = DAL(DEFAULT_URI, check_reserved=['all'])
        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245', None)
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 12)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245Z', None)
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 12)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245-2:0', None)
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 10)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36+1:0', None)
        self.assertEqual(dt.microsecond, 0)
        self.assertEqual(dt.hour, 13)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.123', None)
        self.assertEqual(dt.microsecond, 123000)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.00123', None)
        self.assertEqual(dt.microsecond, 1230)

        dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.1234567890', None)
        self.assertEqual(dt.microsecond, 123456)
        db.close()
开发者ID:IAPT-Eight,项目名称:trade,代码行数:27,代码来源:base.py


示例2: DyTables

class DyTables(object):
    def __init__(self, uri=None):
        self._uri = uri
        self._schema = uri.split("/")[-1]
        self._dal = DAL(self._uri)
        self._datatapy_dict = datatype_mysql()
        self.get_tables()

    def get_tables(self):
        _tables = GetAllTables(uri="/".join(self._uri.split("/")[:-1])
                                   + "/information_schema",
                               schema=self._schema)
        for numb, table in enumerate(_tables):
            fields = []
            for field in _tables.get(table):
                try:
                    fields.append(Field(field[0], self._datatapy_dict[field[1]]))
                except SyntaxError:
                    fields.append(Field("r_" + field[0],
                                        self._datatapy_dict[field[1]],
                                        rname=field[0]))
            self._dal.define_table(table, *fields, primarykey=[], migrate=False)

    def get_db(self):
        return self._dal
开发者ID:lg31415,项目名称:antitools,代码行数:25,代码来源:DyTables.py


示例3: testRun

    def testRun(self):
        db = DAL(DEFAULT_URI, check_reserved=['all'])

        #: skip for adapters that use drivers for datetime parsing
        if db._adapter.parser.registered.get('datetime') is None:
            return

        parse = lambda v: db._adapter.parser.parse(v, 'datetime', 'datetime')

        dt = parse('2015-09-04t12:33:36.223245')
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 12)

        dt = parse('2015-09-04t12:33:36.223245Z')
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 12)

        dt = parse('2015-09-04t12:33:36.223245-2:0')
        self.assertEqual(dt.microsecond, 223245)
        self.assertEqual(dt.hour, 10)

        dt = parse('2015-09-04t12:33:36+1:0')
        self.assertEqual(dt.microsecond, 0)
        self.assertEqual(dt.hour, 13)

        dt = parse('2015-09-04t12:33:36.123')
        self.assertEqual(dt.microsecond, 123000)

        dt = parse('2015-09-04t12:33:36.00123')
        self.assertEqual(dt.microsecond, 1230)

        dt = parse('2015-09-04t12:33:36.1234567890')
        self.assertEqual(dt.microsecond, 123456)
        db.close()
开发者ID:Pac23,项目名称:book2-exercises,代码行数:34,代码来源:base.py


示例4: testRun

 def testRun(self):
     db = DAL(DEFAULT_URI, check_reserved=["all"], entity_quoting=True)
     db.define_table("tt", Field("aa"), Field("bb", "boolean"))
     sql = db._adapter.dialect.create_index("idx_aa_f", db.tt, [db.tt.aa], where=str(db.tt.bb == False))
     self.assertEqual(sql, 'CREATE INDEX "idx_aa_f" ON "tt" ("aa") WHERE ("tt"."bb" = \'F\');')
     rv = db.tt.create_index("idx_aa_f", db.tt.aa, where=(db.tt.bb == False))
     self.assertTrue(rv)
     rv = db.tt.drop_index("idx_aa_f")
     self.assertTrue(rv)
     drop(db.tt)
开发者ID:topleft,项目名称:book2-exercises,代码行数:10,代码来源:indexes.py


示例5: get_sys_table

def get_sys_table(uri="mysql://lms_test:[email protected]/information_schema"):
    sys_tab = DAL(uri=uri)

    sys_tab.define_table('COLUMNS',
                         Field("TABLE_SCHEMA", ),
                         Field("TABLE_NAME"),
                         Field("COLUMN_NAME"),
                         Field("IS_NULLABLE"),
                         Field("DATA_TYPE"),
                         Field("COLUMN_TYPE"),
                         primarykey=[],
                         migrate=False)
    return sys_tab
开发者ID:lg31415,项目名称:antitools,代码行数:13,代码来源:TabaleSchema.py


示例6: testRun

 def testRun(self):
     db = DAL(DEFAULT_URI, check_reserved=['all'])
     db.define_table('tt', Field('vv'))
     db.define_table('ttt', Field('vv'), Field('tt_id', 'reference tt', notnull=True))
     self.assertRaises(Exception, db.ttt.insert, vv='pydal')
     # The following is mandatory for backends as PG to close the aborted transaction
     db.commit()
     drop(db.ttt)
     drop(db.tt)
     db.close()
开发者ID:leichunxin,项目名称:pydal,代码行数:10,代码来源:base.py


示例7: testRun

 def testRun(self):
     cache = SimpleCache()
     db = DAL(DEFAULT_URI, check_reserved=['all'])
     db.define_table('tt', Field('aa'))
     db.tt.insert(aa='1')
     r0 = db().select(db.tt.ALL)
     r1 = db().select(db.tt.ALL, cache=(cache, 1000))
     self.assertEqual(len(r0), len(r1))
     r2 = db().select(db.tt.ALL, cache=(cache, 1000))
     self.assertEqual(len(r0), len(r2))
     r3 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
     self.assertEqual(len(r0), len(r3))
     r4 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
     self.assertEqual(len(r0), len(r4))
     drop(db.tt)
开发者ID:Gabba-git,项目名称:Assignments,代码行数:15,代码来源:caching.py


示例8: testRun

    def testRun(self):
        db = DAL(DEFAULT_URI, check_reserved=['all'])
        t1 = db.define_table('t1', Field('int_level', requires=IS_INT_IN_RANGE(1, 5)))
        i_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=1)
        u_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=2)
        e_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=6)
        self.assertTrue(i_response.id != None)
        self.assertTrue(u_response.id != None)
        self.assertTrue(e_response.id == None and len(e_response.errors.keys()) != 0)
        self.assertTrue(db(t1).count() == 1)
        self.assertTrue(db(t1.int_level == 1).count() == 0)
        self.assertTrue(db(t1.int_level == 6).count() == 0)
        self.assertTrue(db(t1.int_level == 2).count() == 1)
        db.t1.drop()
	return
开发者ID:matheuscas,项目名称:openshift_web2py,代码行数:15,代码来源:validation.py


示例9: __init__

    def __init__(self, db_user, db_pass, db_host, db_name, migrate=False):
        super(DNSAPI, self).__init__()
        self.db = DAL("postgres://%s:%[email protected]%s/%s" % (db_user, db_pass, db_host, db_name), migrate=migrate)

        if self.db:
            print 'Successfully connected to db "%s" on host "%s"' % (db_name, db_host)

        self.db.define_table(
            "dns_zones",
            Field(
                "name", "string"
            ),  # ends in . (e.g. example.com.); input should probably have a validator to ensure zones end in a .
        )

        self.db.define_table(
            "dns_zone_records",
            Field("zone", "reference dns_zones"),
            Field("record_name", "string"),  # (e.g. ns1.example.com.)
            Field(
                "record_type", "string", default="A", requires=IS_IN_SET(RECORD_TYPES)
            ),  # (e.g. A, AAAA, CNAME, MX, NS)
            Field(
                "record_value", "string"
            ),  # (e.g. an IP for A or AAAA, an address for CNAME, and an address and priority for MX)
            Field(
                "record_ttl", "integer", default=60 * 5
            ),  # A TTL in seconds before a client should check for a new value. Can reasonably set to lower or higher depending on the volatility of the records
        )
开发者ID:Annixa,项目名称:dockerlab-dns,代码行数:28,代码来源:dnsapi.py


示例10: setUp

    def setUp(self):
        db = DAL('sqlite:memory')

        db.define_table('color', Field('name', requires=IS_NOT_IN_DB(db, 'color.name')))
        db.color.insert(name='red')
        db.color.insert(name='green')
        db.color.insert(name='blue')

        db.define_table('thing', Field('name'), Field('color', 'reference color'))
        db.thing.insert(name='Chair', color=1)
        db.thing.insert(name='Chair', color=2)
        db.thing.insert(name='Table', color=1)
        db.thing.insert(name='Table', color=3)
        db.thing.insert(name='Lamp', color=2)
    
        db.define_table('rel', Field('a', 'reference thing'), Field('desc'), Field('b','reference thing'))
        db.rel.insert(a=1, b=2, desc='is like')
        db.rel.insert(a=3, b=4, desc='is like')
        db.rel.insert(a=1, b=3, desc='is under')
        db.rel.insert(a=2, b=4, desc='is under')
        db.rel.insert(a=5, b=4, desc='is above')

        api = DBAPI(db, ALLOW_ALL_POLICY)

        self.db = db
        self.api = api
开发者ID:web2py,项目名称:pydal,代码行数:26,代码来源:dbapi.py


示例11: DBCache

class DBCache(Cache):
    '''An implementation of the c9r.file.cache.FileCache with PyDAL using
    database.
    '''
    defaults = {
        'db': 'sqlite://cache.db',      # Database URL
        }
    def_conf = ['~/.etc/cache-conf.json']

    def clear(self, clear_all=True):
        '''Remove the entire content(s) in the cache.
        '''
        db = self.db
        db((db.vars.id>=0) if clear_all else (db.vars.expires<time())).delete()

    def clear_cache(self, vset, names):
        '''
        '''

    def get(self, vset, name):
        '''Get a named data from this cache.
        '''
        db = self.db
        rows = db((db.vars.name==name)&(db.vars.expires<time())).select()
        return rows[0].value

    def put(self, vset, name, data):
        '''Save given data into the cache with given name.
        '''
        self.db.vars.insert(name=name, value=data, expires=time()+self.window)

    def __init__(self, conf=[], initconf=None):
        '''
        '''
        Cache.__init__(self, conf, initconf)
        self.db = DAL(self.config('db'))
        self.db.define_table('varset', Field('name'))
        self.db.define_table('vars',
                             Field('name'), Field('value', 'json'),
                             Field('varset'),
                             Field('expires', 'integer'),
                             primarykey=['varset', 'name'])
        self.window = int(self.config('window'))
开发者ID:ww9rivers,项目名称:c9r,代码行数:43,代码来源:cache.py


示例12: testRun

 def testRun(self):
     for ref, bigint in [('reference', False), ('big-reference', True)]:
         db = DAL(DEFAULT_URI, check_reserved=['all'], bigint_id=bigint)
         db.define_table('tt', Field('vv'))
         db.define_table('ttt', Field('vv'), Field('tt_id', '%s tt' % ref,
                                                   unique=True))
         id_i = db.tt.insert(vv='pydal')
         # Null tt_id
         db.ttt.insert(vv='pydal')
         # first insert is OK
         db.ttt.insert(tt_id=id_i)
         self.assertRaises(Exception, db.ttt.insert, tt_id=id_i)
         # The following is mandatory for backends as PG to close the aborted transaction
         db.commit()
         drop(db.ttt)
         drop(db.tt)
         db.close()
开发者ID:boa-py,项目名称:pydal,代码行数:17,代码来源:base.py


示例13: __init__

 def __init__(self, conf=[], initconf=None):
     '''
     '''
     Cache.__init__(self, conf, initconf)
     self.db = DAL(self.config('db'))
     self.db.define_table('varset', Field('name'))
     self.db.define_table('vars',
                          Field('name'), Field('value', 'json'),
                          Field('varset'),
                          Field('expires', 'integer'),
                          primarykey=['varset', 'name'])
     self.window = int(self.config('window'))
开发者ID:ww9rivers,项目名称:c9r,代码行数:12,代码来源:cache.py


示例14: DbHelper

class DbHelper(object):
	"""docstring for DbHelper"""
	def __init__(self, arg):
		super(DbHelper, self).__init__()
		self.arg = arg
		self.db = DAL('mongodb://140.143.247.178:27099/spider')
		self.define_table()
		'''
		self.db.thing.insert(name='Chair')
		query = self.db.thing.name.startswith('C')
		rows = self.db(query).select()
		print(rows[0].name)
		self.db.commit()
		'''

	def define_table(self):
		print(self.db._dbname)
		self.db.define_table('douban_topic',Field('title'),Field('title_url'),Field('people'),Field('people_url')
                             ,Field('replay_num'),Field('post_time'))

	def insert_models(self,table_name='',items=[]):
		a = list(map(dict,items))
		self.db.douban_topic.bulk_insert(a)
		self.db.commit()
开发者ID:jianglieshan,项目名称:SpiderProj,代码行数:24,代码来源:DBHelper.py


示例15: DAL

	valid_transitions = [
                  { 'id':1, 'trigger':'begin',     'source':'start',  'dest':'proc',   'after':'increase_processings' },
                  { 'id':2, 'trigger':'end',       'source':'proc',   'dest':'finish', 'after':'noop'            },
                  { 'id':3, 'trigger':'reprocess', 'source':'finish', 'dest':'proc',   'after':'increase_processings' }
                  ]
	db.config_workflow.truncate()
	db.config_wfstate.truncate()
	for i in valid_transitions:
            db.config_workflow.insert(trigger=i['trigger'], source=i['source'], dest=i['dest'], after=i['after'])
        db.config_wfstate.insert(name='start')
        db.config_wfstate.insert(name='proc')
        db.config_wfstate.insert(name='finish')
        db.commit()


db = DAL(uri='sqlite://temp.db', folder='db')
db.define_table('config_workflow', Field('trigger'), Field('source'), Field('dest'), Field('after'))
db.define_table('config_wfstate',  Field('name'))
db_config_init(db)
flow = db(db.config_workflow).select()
#state = db(db.config_wfstate).select()
state=['start','proc','finish']

m = Mincer(valid_states=state, valid_transitions=flow, initial='start')

print state
print flow
print m.state			
m.begin()
m.end()
m.reprocess()
开发者ID:BartGo,项目名称:transition-drafts,代码行数:31,代码来源:main.py


示例16: DAL

This program is a demo of how to use the PyDAL and xtopdf Python libraries 
together to publish database data to PDF.
PyDAL is at: https://github.com/web2py/pydal/blob/master/README.md
xtopdf is at: https://bitbucket.org/vasudevram/xtopdf
and info about xtopdf is at: http://slides.com/vasudevram/xtopdf or 
at: http://slid.es/vasudevram/xtopdf
"""

# imports
from pydal import DAL, Field
from PDFWriter import PDFWriter

SEP = 60

# create the database
db = DAL('sqlite://house_depot.db')

# define the table
db.define_table('furniture', \
    Field('id'), Field('name'), Field('quantity'), Field('unit_price')
)

# insert rows into table
items = ( \
    (1, 'chair', 40, 50),
    (2, 'table', 10, 300),
    (3, 'cupboard', 20, 200),
    (4, 'bed', 30, 400)
)
for item in items:
    db.furniture.insert(id=item[0], name=item[1], quantity=item[2], unit_price=item[3])
开发者ID:jacob-carrier,项目名称:code,代码行数:31,代码来源:recipe-579004.py


示例17: DAL

#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
from core.crud import crud
from pydal import DAL, Field

##db = DAL('mysql://root:[email protected]/fasa', migrate=False)

db = DAL('sqlite://'+os.path.join('modelos','pyfactura.db'), migrate=False)

db.define_table('pagos',
	Field('pago', 'string', length=2, required=True,),
	Field('detalle', 'string', length=40, required=True,),
	Field('dia', 'date', length=8),
	Field('des1', 'decimal(12,2)', length=10),
	primarykey=['pago'],
)

formato = {'pago':{'width':30, 'text':'Pagos', 'id':True},
}

def main():
	abm = crud(tabla=db.pagos, 
		basedatos=db,
		formato=formato,
        )

if __name__ == '__main__':
	main()
	
开发者ID:UniversidadDelEste,项目名称:guiabm,代码行数:30,代码来源:pagos.py


示例18: model

def model():
    db = DAL('sqlite://pin.db',pool_size=1,folder='./',migrate=False)
    Pin=db.define_table('pin',Field('title'),Field('image'))
    return (db,Pin)
开发者ID:assiri,项目名称:Flask_angular_pyDal,代码行数:4,代码来源:pin.py


示例19: DAL

from pydal import DAL, Field

db = DAL('sqlite:memory:')

db.define_table('persons',
	Field('name'),
	Field('age')
)

amy = db.persons.insert( name='Amy', age=52 )
bob = db.persons.insert( name='Bob', age=48 )
cat = db.persons.insert( name='Cat', age=23 )
dan = db.persons.insert( name='Dan', age=17 )
edd = db.persons.insert( name='Edd', age=77 )
fan = db.persons.insert( name='Fan', age=65 )
gin = db.persons.insert( name='Gin', age=27 )
hil = db.persons.insert( name='Hil', age=30 )
iri = db.persons.insert( name='Iri', age=62 )
jac = db.persons.insert( name='Jac', age=18 )
db.commit()

# Export the 'persons' database
with open( 'persons.csv', 'wb' ) as f:
	f.write( str(db(db.persons.id).select()) )

# Export only the young persons
with open( 'young-people.csv', 'wb') as f:
	people = db( db.persons.age <= 30 ).select()
	f.write( str( people ) )

开发者ID:ippo615,项目名称:experiments,代码行数:29,代码来源:export.py


示例20: __init__

	def __init__(self, arg):
		super(DbHelper, self).__init__()
		self.arg = arg
		self.db = DAL('mongodb://140.143.247.178:27099/spider')
		self.define_table()
		'''
开发者ID:jianglieshan,项目名称:SpiderProj,代码行数:6,代码来源:DBHelper.py



注:本文中的pydal.DAL类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python base.BaseAdapter类代码示例发布时间:2022-05-25
下一篇:
Python pydal.dalDataset函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap