本文整理汇总了Python中pydal.DAL类的典型用法代码示例。如果您正苦于以下问题:Python DAL类的具体用法?Python DAL怎么用?Python DAL使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DAL类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: testRun
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245', None)
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 12)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245Z', None)
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 12)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.223245-2:0', None)
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 10)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36+1:0', None)
self.assertEqual(dt.microsecond, 0)
self.assertEqual(dt.hour, 13)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.123', None)
self.assertEqual(dt.microsecond, 123000)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.00123', None)
self.assertEqual(dt.microsecond, 1230)
dt=db._adapter.parsemap['datetime']('2015-09-04t12:33:36.1234567890', None)
self.assertEqual(dt.microsecond, 123456)
db.close()
开发者ID:IAPT-Eight,项目名称:trade,代码行数:27,代码来源:base.py
示例2: DyTables
class DyTables(object):
def __init__(self, uri=None):
self._uri = uri
self._schema = uri.split("/")[-1]
self._dal = DAL(self._uri)
self._datatapy_dict = datatype_mysql()
self.get_tables()
def get_tables(self):
_tables = GetAllTables(uri="/".join(self._uri.split("/")[:-1])
+ "/information_schema",
schema=self._schema)
for numb, table in enumerate(_tables):
fields = []
for field in _tables.get(table):
try:
fields.append(Field(field[0], self._datatapy_dict[field[1]]))
except SyntaxError:
fields.append(Field("r_" + field[0],
self._datatapy_dict[field[1]],
rname=field[0]))
self._dal.define_table(table, *fields, primarykey=[], migrate=False)
def get_db(self):
return self._dal
开发者ID:lg31415,项目名称:antitools,代码行数:25,代码来源:DyTables.py
示例3: testRun
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
#: skip for adapters that use drivers for datetime parsing
if db._adapter.parser.registered.get('datetime') is None:
return
parse = lambda v: db._adapter.parser.parse(v, 'datetime', 'datetime')
dt = parse('2015-09-04t12:33:36.223245')
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 12)
dt = parse('2015-09-04t12:33:36.223245Z')
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 12)
dt = parse('2015-09-04t12:33:36.223245-2:0')
self.assertEqual(dt.microsecond, 223245)
self.assertEqual(dt.hour, 10)
dt = parse('2015-09-04t12:33:36+1:0')
self.assertEqual(dt.microsecond, 0)
self.assertEqual(dt.hour, 13)
dt = parse('2015-09-04t12:33:36.123')
self.assertEqual(dt.microsecond, 123000)
dt = parse('2015-09-04t12:33:36.00123')
self.assertEqual(dt.microsecond, 1230)
dt = parse('2015-09-04t12:33:36.1234567890')
self.assertEqual(dt.microsecond, 123456)
db.close()
开发者ID:Pac23,项目名称:book2-exercises,代码行数:34,代码来源:base.py
示例4: testRun
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=["all"], entity_quoting=True)
db.define_table("tt", Field("aa"), Field("bb", "boolean"))
sql = db._adapter.dialect.create_index("idx_aa_f", db.tt, [db.tt.aa], where=str(db.tt.bb == False))
self.assertEqual(sql, 'CREATE INDEX "idx_aa_f" ON "tt" ("aa") WHERE ("tt"."bb" = \'F\');')
rv = db.tt.create_index("idx_aa_f", db.tt.aa, where=(db.tt.bb == False))
self.assertTrue(rv)
rv = db.tt.drop_index("idx_aa_f")
self.assertTrue(rv)
drop(db.tt)
开发者ID:topleft,项目名称:book2-exercises,代码行数:10,代码来源:indexes.py
示例5: get_sys_table
def get_sys_table(uri="mysql://lms_test:[email protected]/information_schema"):
sys_tab = DAL(uri=uri)
sys_tab.define_table('COLUMNS',
Field("TABLE_SCHEMA", ),
Field("TABLE_NAME"),
Field("COLUMN_NAME"),
Field("IS_NULLABLE"),
Field("DATA_TYPE"),
Field("COLUMN_TYPE"),
primarykey=[],
migrate=False)
return sys_tab
开发者ID:lg31415,项目名称:antitools,代码行数:13,代码来源:TabaleSchema.py
示例6: testRun
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('vv'))
db.define_table('ttt', Field('vv'), Field('tt_id', 'reference tt', notnull=True))
self.assertRaises(Exception, db.ttt.insert, vv='pydal')
# The following is mandatory for backends as PG to close the aborted transaction
db.commit()
drop(db.ttt)
drop(db.tt)
db.close()
开发者ID:leichunxin,项目名称:pydal,代码行数:10,代码来源:base.py
示例7: testRun
def testRun(self):
cache = SimpleCache()
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'))
db.tt.insert(aa='1')
r0 = db().select(db.tt.ALL)
r1 = db().select(db.tt.ALL, cache=(cache, 1000))
self.assertEqual(len(r0), len(r1))
r2 = db().select(db.tt.ALL, cache=(cache, 1000))
self.assertEqual(len(r0), len(r2))
r3 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
self.assertEqual(len(r0), len(r3))
r4 = db().select(db.tt.ALL, cache=(cache, 1000), cacheable=True)
self.assertEqual(len(r0), len(r4))
drop(db.tt)
开发者ID:Gabba-git,项目名称:Assignments,代码行数:15,代码来源:caching.py
示例8: testRun
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
t1 = db.define_table('t1', Field('int_level', requires=IS_INT_IN_RANGE(1, 5)))
i_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=1)
u_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=2)
e_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=6)
self.assertTrue(i_response.id != None)
self.assertTrue(u_response.id != None)
self.assertTrue(e_response.id == None and len(e_response.errors.keys()) != 0)
self.assertTrue(db(t1).count() == 1)
self.assertTrue(db(t1.int_level == 1).count() == 0)
self.assertTrue(db(t1.int_level == 6).count() == 0)
self.assertTrue(db(t1.int_level == 2).count() == 1)
db.t1.drop()
return
开发者ID:matheuscas,项目名称:openshift_web2py,代码行数:15,代码来源:validation.py
示例9: __init__
def __init__(self, db_user, db_pass, db_host, db_name, migrate=False):
super(DNSAPI, self).__init__()
self.db = DAL("postgres://%s:%[email protected]%s/%s" % (db_user, db_pass, db_host, db_name), migrate=migrate)
if self.db:
print 'Successfully connected to db "%s" on host "%s"' % (db_name, db_host)
self.db.define_table(
"dns_zones",
Field(
"name", "string"
), # ends in . (e.g. example.com.); input should probably have a validator to ensure zones end in a .
)
self.db.define_table(
"dns_zone_records",
Field("zone", "reference dns_zones"),
Field("record_name", "string"), # (e.g. ns1.example.com.)
Field(
"record_type", "string", default="A", requires=IS_IN_SET(RECORD_TYPES)
), # (e.g. A, AAAA, CNAME, MX, NS)
Field(
"record_value", "string"
), # (e.g. an IP for A or AAAA, an address for CNAME, and an address and priority for MX)
Field(
"record_ttl", "integer", default=60 * 5
), # A TTL in seconds before a client should check for a new value. Can reasonably set to lower or higher depending on the volatility of the records
)
开发者ID:Annixa,项目名称:dockerlab-dns,代码行数:28,代码来源:dnsapi.py
示例10: setUp
def setUp(self):
db = DAL('sqlite:memory')
db.define_table('color', Field('name', requires=IS_NOT_IN_DB(db, 'color.name')))
db.color.insert(name='red')
db.color.insert(name='green')
db.color.insert(name='blue')
db.define_table('thing', Field('name'), Field('color', 'reference color'))
db.thing.insert(name='Chair', color=1)
db.thing.insert(name='Chair', color=2)
db.thing.insert(name='Table', color=1)
db.thing.insert(name='Table', color=3)
db.thing.insert(name='Lamp', color=2)
db.define_table('rel', Field('a', 'reference thing'), Field('desc'), Field('b','reference thing'))
db.rel.insert(a=1, b=2, desc='is like')
db.rel.insert(a=3, b=4, desc='is like')
db.rel.insert(a=1, b=3, desc='is under')
db.rel.insert(a=2, b=4, desc='is under')
db.rel.insert(a=5, b=4, desc='is above')
api = DBAPI(db, ALLOW_ALL_POLICY)
self.db = db
self.api = api
开发者ID:web2py,项目名称:pydal,代码行数:26,代码来源:dbapi.py
示例11: DBCache
class DBCache(Cache):
'''An implementation of the c9r.file.cache.FileCache with PyDAL using
database.
'''
defaults = {
'db': 'sqlite://cache.db', # Database URL
}
def_conf = ['~/.etc/cache-conf.json']
def clear(self, clear_all=True):
'''Remove the entire content(s) in the cache.
'''
db = self.db
db((db.vars.id>=0) if clear_all else (db.vars.expires<time())).delete()
def clear_cache(self, vset, names):
'''
'''
def get(self, vset, name):
'''Get a named data from this cache.
'''
db = self.db
rows = db((db.vars.name==name)&(db.vars.expires<time())).select()
return rows[0].value
def put(self, vset, name, data):
'''Save given data into the cache with given name.
'''
self.db.vars.insert(name=name, value=data, expires=time()+self.window)
def __init__(self, conf=[], initconf=None):
'''
'''
Cache.__init__(self, conf, initconf)
self.db = DAL(self.config('db'))
self.db.define_table('varset', Field('name'))
self.db.define_table('vars',
Field('name'), Field('value', 'json'),
Field('varset'),
Field('expires', 'integer'),
primarykey=['varset', 'name'])
self.window = int(self.config('window'))
开发者ID:ww9rivers,项目名称:c9r,代码行数:43,代码来源:cache.py
示例12: testRun
def testRun(self):
for ref, bigint in [('reference', False), ('big-reference', True)]:
db = DAL(DEFAULT_URI, check_reserved=['all'], bigint_id=bigint)
db.define_table('tt', Field('vv'))
db.define_table('ttt', Field('vv'), Field('tt_id', '%s tt' % ref,
unique=True))
id_i = db.tt.insert(vv='pydal')
# Null tt_id
db.ttt.insert(vv='pydal')
# first insert is OK
db.ttt.insert(tt_id=id_i)
self.assertRaises(Exception, db.ttt.insert, tt_id=id_i)
# The following is mandatory for backends as PG to close the aborted transaction
db.commit()
drop(db.ttt)
drop(db.tt)
db.close()
开发者ID:boa-py,项目名称:pydal,代码行数:17,代码来源:base.py
示例13: __init__
def __init__(self, conf=[], initconf=None):
'''
'''
Cache.__init__(self, conf, initconf)
self.db = DAL(self.config('db'))
self.db.define_table('varset', Field('name'))
self.db.define_table('vars',
Field('name'), Field('value', 'json'),
Field('varset'),
Field('expires', 'integer'),
primarykey=['varset', 'name'])
self.window = int(self.config('window'))
开发者ID:ww9rivers,项目名称:c9r,代码行数:12,代码来源:cache.py
示例14: DbHelper
class DbHelper(object):
"""docstring for DbHelper"""
def __init__(self, arg):
super(DbHelper, self).__init__()
self.arg = arg
self.db = DAL('mongodb://140.143.247.178:27099/spider')
self.define_table()
'''
self.db.thing.insert(name='Chair')
query = self.db.thing.name.startswith('C')
rows = self.db(query).select()
print(rows[0].name)
self.db.commit()
'''
def define_table(self):
print(self.db._dbname)
self.db.define_table('douban_topic',Field('title'),Field('title_url'),Field('people'),Field('people_url')
,Field('replay_num'),Field('post_time'))
def insert_models(self,table_name='',items=[]):
a = list(map(dict,items))
self.db.douban_topic.bulk_insert(a)
self.db.commit()
开发者ID:jianglieshan,项目名称:SpiderProj,代码行数:24,代码来源:DBHelper.py
示例15: DAL
valid_transitions = [
{ 'id':1, 'trigger':'begin', 'source':'start', 'dest':'proc', 'after':'increase_processings' },
{ 'id':2, 'trigger':'end', 'source':'proc', 'dest':'finish', 'after':'noop' },
{ 'id':3, 'trigger':'reprocess', 'source':'finish', 'dest':'proc', 'after':'increase_processings' }
]
db.config_workflow.truncate()
db.config_wfstate.truncate()
for i in valid_transitions:
db.config_workflow.insert(trigger=i['trigger'], source=i['source'], dest=i['dest'], after=i['after'])
db.config_wfstate.insert(name='start')
db.config_wfstate.insert(name='proc')
db.config_wfstate.insert(name='finish')
db.commit()
db = DAL(uri='sqlite://temp.db', folder='db')
db.define_table('config_workflow', Field('trigger'), Field('source'), Field('dest'), Field('after'))
db.define_table('config_wfstate', Field('name'))
db_config_init(db)
flow = db(db.config_workflow).select()
#state = db(db.config_wfstate).select()
state=['start','proc','finish']
m = Mincer(valid_states=state, valid_transitions=flow, initial='start')
print state
print flow
print m.state
m.begin()
m.end()
m.reprocess()
开发者ID:BartGo,项目名称:transition-drafts,代码行数:31,代码来源:main.py
示例16: DAL
This program is a demo of how to use the PyDAL and xtopdf Python libraries
together to publish database data to PDF.
PyDAL is at: https://github.com/web2py/pydal/blob/master/README.md
xtopdf is at: https://bitbucket.org/vasudevram/xtopdf
and info about xtopdf is at: http://slides.com/vasudevram/xtopdf or
at: http://slid.es/vasudevram/xtopdf
"""
# imports
from pydal import DAL, Field
from PDFWriter import PDFWriter
SEP = 60
# create the database
db = DAL('sqlite://house_depot.db')
# define the table
db.define_table('furniture', \
Field('id'), Field('name'), Field('quantity'), Field('unit_price')
)
# insert rows into table
items = ( \
(1, 'chair', 40, 50),
(2, 'table', 10, 300),
(3, 'cupboard', 20, 200),
(4, 'bed', 30, 400)
)
for item in items:
db.furniture.insert(id=item[0], name=item[1], quantity=item[2], unit_price=item[3])
开发者ID:jacob-carrier,项目名称:code,代码行数:31,代码来源:recipe-579004.py
示例17: DAL
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
from core.crud import crud
from pydal import DAL, Field
##db = DAL('mysql://root:[email protected]/fasa', migrate=False)
db = DAL('sqlite://'+os.path.join('modelos','pyfactura.db'), migrate=False)
db.define_table('pagos',
Field('pago', 'string', length=2, required=True,),
Field('detalle', 'string', length=40, required=True,),
Field('dia', 'date', length=8),
Field('des1', 'decimal(12,2)', length=10),
primarykey=['pago'],
)
formato = {'pago':{'width':30, 'text':'Pagos', 'id':True},
}
def main():
abm = crud(tabla=db.pagos,
basedatos=db,
formato=formato,
)
if __name__ == '__main__':
main()
开发者ID:UniversidadDelEste,项目名称:guiabm,代码行数:30,代码来源:pagos.py
示例18: model
def model():
db = DAL('sqlite://pin.db',pool_size=1,folder='./',migrate=False)
Pin=db.define_table('pin',Field('title'),Field('image'))
return (db,Pin)
开发者ID:assiri,项目名称:Flask_angular_pyDal,代码行数:4,代码来源:pin.py
示例19: DAL
from pydal import DAL, Field
db = DAL('sqlite:memory:')
db.define_table('persons',
Field('name'),
Field('age')
)
amy = db.persons.insert( name='Amy', age=52 )
bob = db.persons.insert( name='Bob', age=48 )
cat = db.persons.insert( name='Cat', age=23 )
dan = db.persons.insert( name='Dan', age=17 )
edd = db.persons.insert( name='Edd', age=77 )
fan = db.persons.insert( name='Fan', age=65 )
gin = db.persons.insert( name='Gin', age=27 )
hil = db.persons.insert( name='Hil', age=30 )
iri = db.persons.insert( name='Iri', age=62 )
jac = db.persons.insert( name='Jac', age=18 )
db.commit()
# Export the 'persons' database
with open( 'persons.csv', 'wb' ) as f:
f.write( str(db(db.persons.id).select()) )
# Export only the young persons
with open( 'young-people.csv', 'wb') as f:
people = db( db.persons.age <= 30 ).select()
f.write( str( people ) )
开发者ID:ippo615,项目名称:experiments,代码行数:29,代码来源:export.py
示例20: __init__
def __init__(self, arg):
super(DbHelper, self).__init__()
self.arg = arg
self.db = DAL('mongodb://140.143.247.178:27099/spider')
self.define_table()
'''
开发者ID:jianglieshan,项目名称:SpiderProj,代码行数:6,代码来源:DBHelper.py
注:本文中的pydal.DAL类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论