本文整理汇总了Python中pyasm.search.SearchType类的典型用法代码示例。如果您正苦于以下问题:Python SearchType类的具体用法?Python SearchType怎么用?Python SearchType使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SearchType类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(my, search_type, config_base, input_prefix='', config=None):
if type(search_type) in types.StringTypes:
my.search_type_obj = SearchType.get(search_type)
my.search_type = search_type
elif isinstance(search_type, SearchType):
my.search_type_obj = search_type
my.search_type = my.search_type_obj.get_base_key()
elif inspect.isclass(search_type) and issubclass(search_type, SObject):
my.search_type_obj = SearchType.get(search_type.SEARCH_TYPE)
my.search_type = my.search_type_obj.get_base_key()
else:
raise LayoutException('search_type must be a string or an sobject')
my.config = config
my.config_base = config_base
my.input_prefix = input_prefix
my.element_names = []
my.element_titles = []
from pyasm.web import DivWdg
my.top = DivWdg()
# Layout widgets compartmentalize their widgets in sections for drawing
my.sections = {}
super(BaseConfigWdg,my).__init__()
开发者ID:0-T-0,项目名称:TACTIC,代码行数:26,代码来源:base_config_wdg.py
示例2: create
def create(self):
project = Project.get_by_code(self.project_code)
if project:
self.delete()
print "Setting up a basic Sample3d project"
# create the project
create_cmd = CreateProjectCmd(project_code=self.project_code, project_title="Sample 3D") #, project_type="unittest")
create_cmd.execute()
# install the unittest plugin
installer = PluginInstaller(relative_dir="TACTIC/internal/sample3d", verbose=False)
installer.execute()
# add 30 shots
for x in xrange(30):
shot = SearchType.create("prod/shot")
shot.set_value('name','shot%s'%x)
shot.set_value('sequence_code','SEQ_01')
shot.commit(triggers=False)
if not Search.eval("@SOBJECT(prod/sequence['code','SEQ_01'])"):
seq = SearchType.create("prod/sequence")
seq.set_value('code','SEQ_01')
seq.commit(triggers=False)
开发者ID:mincau,项目名称:TACTIC,代码行数:28,代码来源:environment.py
示例3: copy_sobject
def copy_sobject(my, sobject, dst_search_type, context=None, checkin_mode='inplace'):
new_sobject = SearchType.create(dst_search_type)
search_type = SearchType.get(dst_search_type)
columns = SearchType.get_columns(dst_search_type)
data = sobject.get_data()
for name, value in data.items():
if name in ['id','pipeline_code']:
continue
if name not in columns:
continue
if not value:
continue
if name == "code":
value = Common.get_next_sobject_code(sobject, 'code')
if not value:
continue
new_sobject.set_value(name, value)
if SearchType.column_exists(dst_search_type, "project_code"):
project_code = Project.get_project_code()
new_sobject.set_value("project_code", project_code)
new_sobject.commit()
# get all of the current snapshots and file paths associated
if not context:
snapshots = Snapshot.get_all_current_by_sobject(sobject)
else:
snapshots = [Snapshot.get_current_by_sobject(sobject, context)]
if not snapshots:
return
msgs = []
for snapshot in snapshots:
#file_paths = snapshot.get_all_lib_paths()
file_paths_dict = snapshot.get_all_paths_dict()
file_types = file_paths_dict.keys()
if not file_types:
continue
# make sure the paths match the file_types
file_paths = [file_paths_dict.get(x)[0] for x in file_types]
mode = checkin_mode
# checkin the files (inplace)
try:
context = snapshot.get_value('context')
checkin = FileCheckin(new_sobject, context=context, file_paths=file_paths, file_types=file_types, mode=mode)
checkin.execute()
#print "done: ", context, new_sobject.get_related_sobjects("sthpw/snapshot")
except CheckinException, e:
msgs.append('Post-process Check-in Error for %s: %s ' %(context, e.__str__()))
开发者ID:0-T-0,项目名称:TACTIC,代码行数:60,代码来源:sobject_copy_cmd.py
示例4: _test_base_dir_alias
def _test_base_dir_alias(my):
Config.set_value("checkin", "asset_base_dir", {
'default': '/tmp/tactic/default',
'alias': '/tmp/tactic/alias',
'alias2': '/tmp/tactic/alias2',
});
asset_dict = Environment.get_asset_dirs()
default_dir = asset_dict.get("default")
my.assertEquals( "/tmp/tactic/default", default_dir)
aliases = asset_dict.keys()
# "plugins" is assumed in some branch
if 'plugins' in aliases:
my.assertEquals( 4, len(aliases))
else:
my.assertEquals( 3, len(aliases))
my.assertNotEquals( None, "alias" in aliases )
# create a naming
naming = SearchType.create("config/naming")
naming.set_value("search_type", "unittest/person")
naming.set_value("context", "alias")
naming.set_value("dir_naming", "alias")
naming.set_value("file_naming", "text.txt")
naming.set_value("base_dir_alias", "alias")
naming.commit()
# create 2nd naming where
naming = SearchType.create("config/naming")
naming.set_value("search_type", "unittest/person")
naming.set_value("context", "alias2")
naming.set_value("dir_naming", "alias2")
naming.set_value("base_dir_alias", "alias2")
naming.set_value("file_naming", "text.txt")
naming.set_value("checkin_type", "auto")
naming.commit()
my.clear_naming()
# create a new test.txt file
for context in ['alias', 'alias2']:
file_path = "./test.txt"
file = open(file_path, 'w')
file.write("whatever")
file.close()
checkin = FileCheckin(my.person, file_path, context=context)
checkin.execute()
snapshot = checkin.get_snapshot()
lib_dir = snapshot.get_lib_dir()
expected = "/tmp/tactic/%s/%s" % (context, context)
my.assertEquals(expected, lib_dir)
path = "%s/text.txt" % (lib_dir)
exists = os.path.exists(path)
my.assertEquals(True, exists)
开发者ID:hellios78,项目名称:TACTIC,代码行数:58,代码来源:checkin_test.py
示例5: get_message
def get_message(my):
search_type_obj = my.sobject.get_search_type_obj()
title = search_type_obj.get_title()
subject = my.get_subject()
notification_message = my.notification.get_value("message")
if notification_message:
# parse it through the expression
sudo = Sudo()
parser = ExpressionParser()
snapshot = my.input.get('snapshot')
env_sobjects = {}
# turn prev_data and update_data from input into sobjects
prev_data = SearchType.create("sthpw/virtual")
id_col = prev_data.get_id_col()
if id_col:
del prev_data.data[id_col]
prev_dict = my.input.get("prev_data")
if prev_dict:
for name, value in prev_dict.items():
if value != None:
prev_data.set_value(name, value)
update_data = SearchType.create("sthpw/virtual")
id_col = update_data.get_id_col()
if id_col:
del update_data.data[id_col]
update_dict = my.input.get("update_data")
if update_dict:
for name, value in update_dict.items():
if value != None:
update_data.set_value(name, value)
if snapshot:
env_sobjects = {
'snapshot': snapshot
}
env_sobjects['prev_data'] = prev_data
env_sobjects['update_data'] = update_data
notification_message = parser.eval(notification_message, my.sobject, env_sobjects=env_sobjects, mode='string')
del sudo
return notification_message
message = "%s %s" % (title, my.sobject.get_name())
message = '%s\n\nReport from transaction:\n%s\n' % (message, subject)
return message
开发者ID:0-T-0,项目名称:TACTIC,代码行数:57,代码来源:email_handler.py
示例6: execute
def execute(my):
collection_key = my.kwargs.get("collection_key")
search_keys = my.kwargs.get("search_keys")
collection = Search.get_by_search_key(collection_key)
if not collection:
raise Exception("Collection does not exist")
search_type = collection.get_base_search_type()
parts = search_type.split("/")
collection_type = "%s/%s_in_%s" % (parts[0], parts[1], parts[1])
search = Search(collection_type)
search.add_filter("parent_code", collection.get_code())
items = search.get_sobjects()
search_codes = [x.get_value("search_code") for x in items]
search_codes = set(search_codes)
has_keywords = SearchType.column_exists(search_type, "keywords")
if has_keywords:
collection_keywords = collection.get_value("keywords", no_exception=True)
collection_keywords = collection_keywords.split(" ")
collection_keywords = set(collection_keywords)
# create new items
sobjects = Search.get_by_search_keys(search_keys)
for sobject in sobjects:
if sobject.get_code() in search_codes:
continue
new_item = SearchType.create(collection_type)
new_item.set_value("parent_code", collection.get_code())
new_item.set_value("search_code", sobject.get_code())
new_item.commit()
# copy the metadata of the collection
if has_keywords:
keywords = sobject.get_value("keywords")
keywords = keywords.split(" ")
keywords = set(keywords)
keywords = keywords.union(collection_keywords)
keywords = " ".join(keywords)
sobject.set_value("keywords", keywords)
sobject.commit()
开发者ID:asmboom,项目名称:TACTIC,代码行数:57,代码来源:collection_wdg.py
示例7: execute
def execute(my):
search_type = my.kwargs.get("search_type")
column_info = SearchType.get_column_info(search_type)
values = my.kwargs.get("values")
# get the definition config for this search_type
from pyasm.search import WidgetDbConfig
config = WidgetDbConfig.get_by_search_type(search_type, "definition")
if not config:
config = SearchType.create("config/widget_config")
config.set_value("search_type", search_type)
config.set_value("view", "definition")
config.commit()
config._init()
for data in values:
name = data.get("name")
name = name.strip()
if name == '':
continue
try:
name.encode('ascii')
except UnicodeEncodeError:
raise TacticException('Column name needs to be in English. Non-English characters can be used in Title when performing [Edit Column Definition] afterwards.')
if column_info.get(name):
raise CommandException("Column [%s] is already defined" % name)
format = data.get("format")
fps = data.get("fps")
data_type = data.get("data_type")
from pyasm.command import ColumnAddCmd
cmd = ColumnAddCmd(search_type, name, data_type)
cmd.execute()
#(my, search_type, attr_name, attr_type, nullable=True):
class_name = 'tactic.ui.table.FormatElementWdg'
options = {
'format': format,
'type': data_type,
'fps': fps
}
# add a new widget to the definition
config.append_display_element(name, class_name, options=options)
config.commit_config()
开发者ID:0-T-0,项目名称:TACTIC,代码行数:55,代码来源:column_edit_wdg.py
示例8: set_templates
def set_templates(self):
project_code = WebContainer.get_web().get_full_context_name()
if project_code == "default":
project_code = Project.get_default_project()
try:
SearchType.set_global_template("project", project_code)
except SecurityException as e:
print("WARNING: ", e)
开发者ID:mincau,项目名称:TACTIC,代码行数:11,代码来源:top_wdg.py
示例9: execute
def execute(my):
import types
transaction_xml = my.kwargs.get("transaction_xml")
file_mode = my.kwargs.get("file_mode")
if not file_mode:
file_mode = 'delayed'
# if the first argument is a dictionary, then the whole
# transaction sobject was passed through
# NOTE: this is now the default
if type(transaction_xml) == types.DictType:
transaction_dict = transaction_xml
transaction_xml = transaction_dict.get("transaction")
timestamp = transaction_dict.get("timestamp")
login = transaction_dict.get("login")
# recreate the transaction
transaction = SearchType.create("sthpw/transaction_log")
for name, value in transaction_dict.items():
if name.startswith("__"):
continue
if name == 'id':
continue
if value == None:
continue
transaction.set_value(name, value)
elif isinstance(transaction_xml, SObject):
transaction = transaction_xml
else:
# Create a fake transaction.
# This is only used for test purposes
transaction = SearchType.create("sthpw/transaction_log")
if transaction_xml:
transaction.set_value("transaction", transaction_xml)
else:
print "WARNING: transaction xml is empty"
transaction.set_value("login", "admin")
# commit the new transaction. This is the only case where
# a transaction will not have a code, so it has to be committed.
# The other case do not need to be committed because they will
# already have codes and the transaction is committed in
# RedoCmd
#
try:
transaction.commit()
except Exception, e:
print "Failed to commit transaction [%s]: It may already exist. Skipping." % transaction.get_code()
print str(e)
return
开发者ID:0-T-0,项目名称:TACTIC,代码行数:54,代码来源:run_transaction_cmd.py
示例10: set_templates
def set_templates(my):
if my.context:
context = my.context
else:
context = WebContainer.get_web().get_full_context_name()
try:
SearchType.set_global_template("project", context)
except SecurityException, e:
print "WARNING: ", e
开发者ID:blezek,项目名称:TACTIC,代码行数:12,代码来源:top_wdg.py
示例11: check
def check(my):
my.search_type = my.kwargs.get("search_type")
my.values = my.kwargs.get("values")
my.db_resource = SearchType.get_db_resource_by_search_type(my.search_type)
my.database = my.db_resource.get_database()
my.search_type_obj = SearchType.get(my.search_type)
if my.database != Project.get_project_code() and my.database !='sthpw':
raise TacticException('You are not allowed to delete the sType [%s] from another project [%s].' %(my.search_type, my.database))
return False
return True
开发者ID:blezek,项目名称:TACTIC,代码行数:12,代码来源:delete_wdg.py
示例12: check
def check(self):
self.search_type = self.kwargs.get("search_type")
self.values = self.kwargs.get("values")
self.db_resource = SearchType.get_db_resource_by_search_type(self.search_type)
self.database = self.db_resource.get_database()
self.search_type_obj = SearchType.get(self.search_type)
if self.database != Project.get_project_code() and self.database !='sthpw':
raise TacticException('You are not allowed to delete the sType [%s] from another project [%s].' %(self.search_type, self.database))
return False
return True
开发者ID:mincau,项目名称:TACTIC,代码行数:12,代码来源:delete_wdg.py
示例13: do_search
def do_search(my):
'''this widget has its own search mechanism'''
web = WebContainer.get_web()
# get the sobject that is to be edited
id = my.search_id
# if no id is given, then create a new one for insert
search = None
sobject = None
search_type_base = SearchType.get(my.search_type).get_base_key()
if my.mode == "insert":
sobject = SearchType.create(my.search_type)
my.current_id = -1
# prefilling default values if available
value_keys = web.get_form_keys()
if value_keys:
for key in value_keys:
value = web.get_form_value(key)
sobject.set_value(key, value)
else:
search = Search(my.search_type)
# figure out which id to search for
if web.get_form_value("do_edit") == "Edit/Next":
search_ids = web.get_form_value("%s_search_ids" %search_type_base)
if search_ids == "":
my.current_id = id
else:
search_ids = search_ids.split("|")
next = search_ids.index(str(id)) + 1
if next == len(search_ids):
next = 0
my.current_id = search_ids[next]
last_search = Search(my.search_type)
last_search.add_id_filter( id )
my.last_sobject = last_search.get_sobject()
else:
my.current_id = id
search.add_id_filter( my.current_id )
sobject = search.get_sobject()
if not sobject and my.current_id != -1:
raise EditException("No SObject found")
# set all of the widgets to contain this sobject
my.set_sobjects( [sobject], search )
开发者ID:funic,项目名称:TACTIC,代码行数:52,代码来源:edit_wdg.py
示例14: get_pipeline
def get_pipeline(my, pipeline_xml, add_tasks=False):
pipeline = SearchType.create("sthpw/pipeline")
pipeline.set_pipeline(pipeline_xml)
pipeline_id = random.randint(0, 10000000)
#pipeline.set_value("code", "test%s" % pipeline_id)
#pipeline.set_id(pipeline_id)
#pipeline.set_value("id", pipeline_id)
pipeline.set_value("pipeline", pipeline_xml)
pipeline.commit()
process_names = pipeline.get_process_names()
# delete the processes
search = Search("config/process")
search.add_filters("process", process_names)
processes = search.get_sobjects()
for process in processes:
process.delete()
# create new processes
processes_dict = {}
for process_name in process_names:
# define the process nodes
process = SearchType.create("config/process")
process.set_value("process", process_name)
process.set_value("pipeline_code", pipeline.get_code())
process.set_json_value("workflow", {
'on_complete': '''
sobject.set_value('%s', "complete")
''' % process_name,
'on_approve': '''
sobject.set_value('%s', "approve")
''' % process_name,
} )
process.commit()
processes_dict[process_name] = process
# Note: we don't have an sobject yet
if add_tasks:
task = SaerchType.create("sthpw/task")
task.set_parent(sobject)
task.set_value("process", process_name)
task.commit()
return pipeline, processes_dict
开发者ID:nuxping,项目名称:TACTIC,代码行数:51,代码来源:workflow_test.py
示例15: get_logins_by_id
def get_logins_by_id(note_id):
login_in_group = SearchType.get(LoginInGroup.SEARCH_TYPE)
group_note = SearchType.get(GroupNotification.SEARCH_TYPE)
search = Search(Login.SEARCH_TYPE)
query_str = ''
if isinstance(note_id, list):
query_str = "in (%s)" %",".join([str(id) for id in note_id])
else:
query_str = "= %d" %note_id
search.add_where('''"login" in (select "login" from "%s" where "login_group" in (select "login_group" from "%s" where "notification_id" %s)) ''' % (login_in_group.get_table(), group_note.get_table(), query_str))
return search.get_sobjects()
开发者ID:mincau,项目名称:TACTIC,代码行数:14,代码来源:notification.py
示例16: _test_sobject_hierarchy
def _test_sobject_hierarchy(my):
# FIXME: this functionality has been disabled until further notice
return
snapshot_type = SearchType.create("sthpw/snapshot_type")
snapshot_type.set_value("code", "maya_model")
snapshot_type.commit()
snapshot_type = SearchType.create("prod/snapshot_type")
snapshot_type.set_value("code", "maya_model")
snapshot_type.commit()
snapshot_type = SnapshotType.get_by_code("maya_model")
开发者ID:funic,项目名称:TACTIC,代码行数:14,代码来源:biz_test.py
示例17: get_display
def get_display(my):
widget = Widget()
div = DivWdg(css="filter_box")
show_span = SpanWdg(css="med")
show_span.add("Show All Types: ")
checkbox = FilterCheckboxWdg("show_all_types")
checkbox.set_persistence()
show_span.add(checkbox)
show_all_types = checkbox.get_value()
div.add(show_span)
span = SpanWdg(css="med")
span.add("Search Type: ")
select = SelectWdg("filter|search_type")
select.add_empty_option("-- Select --")
project = Project.get()
project_type = project.get_base_type()
search = Search("sthpw/search_object")
if show_all_types:
search.add_where(
"""
namespace = '%s' or namespace = '%s' or search_type in ('sthpw/task')
"""
% (project_type, project.get_code())
)
else:
# show only the custom ones
search.add_filter("namespace", project.get_code())
search.add_order_by("title")
sobjects = search.get_sobjects()
select.set_sobjects_for_options(sobjects, "search_type", "title")
# select.set_option("query", "sthpw/search_object|search_type|title")
select.set_persistence()
select.add_event("onchange", "document.form.submit()")
search_type = select.get_value()
span.add(select)
div.add(span)
# make sure the current selection exists
try:
SearchType.get(search_type)
except SearchException, e:
return div
开发者ID:hellios78,项目名称:TACTIC,代码行数:49,代码来源:custom_view_app_wdg.py
示例18: _test_time
def _test_time(my):
''' test timezone related behavior'''
sobject = SearchType.create('sthpw/task')
sobject.set_value('project_code','unittest')
sobject.set_value('bid_start_date', '2014-11-11 05:00:00')
time = sobject.get_value('bid_start_date')
my.assertEquals(time, '2014-11-11 05:00:00')
sobject.commit()
time = sobject.get_value('bid_start_date')
my.assertEquals(time, '2014-11-11 05:00:00')
from pyasm.search import DbContainer
sql = DbContainer.get('sthpw')
db_value = sql.do_query('SELECT bid_start_date from task where id = %s'%sobject.get_id())
# 2014-11-11 00:00:00 is actually written to the database
my.assertEquals(db_value[0][0].strftime('%Y-%m-%d %H:%M:%S %Z'), '2014-11-11 00:00:00 ')
# an sType specified without a project but with an id could be a common human error
# but it should handle that fine
obj1 = Search.eval('@SOBJECT(unittest/person?project=unittest["id", "%s"])'%sobject.get_id(), single=True)
obj2= Search.eval('@SOBJECT(unittest/person?id=2["id", "%s"])'%sobject.get_id(), single=True)
obj3 = Search.eval('@SOBJECT(sthpw/task?id=2["id", "%s"])'%sobject.get_id(), single=True)
task = Search.eval('@SOBJECT(sthpw/task["id", "%s"])'%sobject.get_id(), single=True)
# EST and GMT diff is 5 hours
my.assertEquals(task.get_value('bid_start_date'), '2014-11-11 05:00:00')
# test NOW() auto conversion
sobj = SearchType.create('sthpw/note')
sobj.set_value('process','TEST')
sobj.set_value('note','123')
my.assertEquals(sobj.get_value('timestamp'), "")
sobj.commit()
# this is local commited time converted back to GMT
committed_time = sobj.get_value('timestamp')
from dateutil import parser
committed_time = parser.parse(committed_time)
from pyasm.common import SPTDate
now = SPTDate.now()
diff = now - committed_time
# should be roughly the same minute, not hours apart
my.assertEquals(diff.seconds < 60, True)
开发者ID:0-T-0,项目名称:TACTIC,代码行数:48,代码来源:biz_test.py
示例19: _test_trigger
def _test_trigger(my):
# create a dummy sobject
sobject = SearchType.create("unittest/person")
pipeline_xml = '''
<pipeline>
<process type="action" name="a"/>
</pipeline>
'''
pipeline, processes = my.get_pipeline(pipeline_xml)
process = processes.get("a")
process.set_value("workflow", "")
process.commit()
folder = Common.generate_alphanum_key()
Trigger.clear_db_cache()
event = "process|action"
trigger = SearchType.create("config/trigger")
trigger.set_value("event", event)
trigger.set_value("process", process.get_code())
trigger.set_value("mode", "same process,same transaction")
trigger.set_value("script_path", "%s/process_trigger" % folder)
trigger.commit()
script = SearchType.create("config/custom_script")
script.set_value("folder", folder)
script.set_value("title", "process_trigger")
script.set_value("script", '''
print "---"
for key, value in input.items():
print key, value
print "---"
print "process: ", input.get("process")
''')
script.commit()
# Run the pipeline
process = "a"
output = {
"pipeline": pipeline,
"sobject": sobject,
"process": process
}
Trigger.call(my, "process|pending", output)
开发者ID:jayvdb,项目名称:TACTIC,代码行数:48,代码来源:workflow_test.py
示例20: execute
def execute(my):
assert my.db_resource
assert my.table
database = my.db_resource.get_database()
from pyasm.search import Insert, Select, DbContainer, Search, Sql
# get the data
if not my.sobjects:
search = Search("sthpw/search_object")
# BAD assumption
#search.add_filter("table", my.table)
# create a search_type. This is bad assumption cuz it assumes project-specific search_type
# should call set_search_type()
if not my.search_type:
my.search_type = "%s/%s" % (my.database, my.table)
search.add_filter("search_type", my.search_type)
my.search_type_obj = search.get_sobject()
if not my.search_type_obj:
if my.no_exception == False:
raise SqlException("Table [%s] does not have a corresponding search_type" % my.table)
else:
return
search_type = my.search_type_obj.get_base_key()
search = Search(my.search_type)
search.set_show_retired(True)
my.sobjects = search.get_sobjects()
# get the info for the table
column_info = SearchType.get_column_info(my.search_type)
for sobject in my.sobjects:
print my.delimiter
insert = Insert()
insert.set_database(my.database)
insert.set_table(my.table)
data = sobject.data
for name, value in data.items():
if name in my.ignore_columns:
continue
if not my.include_id and name == "id":
insert.set_value("id", '"%s_id_seq".nextval' % table, quoted=False)
#insert.set_value(name, value, quoted=False)
elif value == None:
continue
else:
# replace all of the \ with double \\
insert.set_value(name, value)
print "%s" % insert.get_statement()
print my.end_delimiter
print
开发者ID:CeltonMcGrath,项目名称:TACTIC,代码行数:60,代码来源:sql_dumper.py
注:本文中的pyasm.search.SearchType类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论