本文整理汇总了Python中task.ValidationException类的典型用法代码示例。如果您正苦于以下问题:Python ValidationException类的具体用法?Python ValidationException怎么用?Python ValidationException使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ValidationException类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: verify
def verify(self, id, updated_fields, force=False):
    """Validate an update to an existing NTP server entry.

    The entry is looked up in the ``ntpservers`` collection. When the
    update changes the address, the new address is probed with
    ``ntpdate -q``; a failed probe is recorded as an error unless
    ``force`` is set. The effective poll values (updated value, falling
    back to the stored one) are then checked so that max poll is
    strictly greater than min poll.

    Raises:
        VerifyException: ``ENOENT`` when no entry with ``id`` exists.
        ValidationException: when any of the checks above failed.

    Returns:
        ``['system']`` when validation passes.
    """
    stored = self.datastore.get_by_id('ntpservers', id)
    if stored is None:
        raise VerifyException(errno.ENOENT, 'NTP Server with given ID does not exist')

    errors = ValidationException()

    if 'address' in updated_fields:
        try:
            system('ntpdate', '-q', updated_fields['address'])
        except SubprocessException:
            # An unreachable server is only an error when the caller did not force.
            if not force:
                errors.append((
                    'address',
                    errno.EINVAL,
                    'Server could not be reached. Check "Force" to continue regardless.'))

    minpoll = updated_fields.get('minpoll', stored.get('minpoll'))
    maxpoll = updated_fields.get('maxpoll', stored.get('maxpoll'))
    both_polls_set = minpoll is not None and maxpoll is not None
    if both_polls_set and not maxpoll > minpoll:
        errors.append(('maxpoll', errno.EINVAL, 'Max Poll should be higher than Min Poll'))

    if errors:
        raise ValidationException(errors)

    return ['system']
开发者ID:650elx,项目名称:middleware,代码行数:28,代码来源:NTPPlugin.py
示例2: verify
def verify(self, certificate):
    """Validate a request to import an existing certificate or CA.

    Checks, in order: the name is not already used in
    ``crypto.certificates``; the type is ``CERT_EXISTING`` or
    ``CA_EXISTING``; none of the subject fields (country, state, city,
    organization, email, common) are supplied; a ``CERT_EXISTING``
    import carries both ``privatekey`` and ``passphrase``; and any
    supplied private key can be loaded with the given passphrase.

    Raises:
        VerifyException: for a duplicate name, invalid type, missing key
            material, or a private key that fails to load.
        ValidationException: when subject fields are present.

    Returns:
        ``['system']`` when validation passes.
    """
    name_filter = ('name', '=', certificate['name'])
    if self.datastore.exists('crypto.certificates', name_filter):
        raise VerifyException(errno.EEXIST, 'Certificate with given name already exists')

    cert_type = certificate['type']
    if cert_type not in ('CERT_EXISTING', 'CA_EXISTING'):
        raise VerifyException(errno.EINVAL, 'Invalid certificate type')

    errors = ValidationException()
    forbidden_fields = ('country', 'state', 'city', 'organization', 'email', 'common')
    for field in forbidden_fields:
        if field in certificate:
            errors.add((0, field), '{0} is not valid in certificate import'.format(field))

    if errors:
        raise errors

    if cert_type == 'CERT_EXISTING':
        has_key_material = 'privatekey' in certificate and 'passphrase' in certificate
        if not has_key_material:
            raise VerifyException(
                errno.EINVAL, 'privatekey and passphrase required to import certificate'
            )

    if 'privatekey' in certificate:
        try:
            load_privatekey(certificate['privatekey'], certificate.get('passphrase'))
        except Exception:
            # Any failure while loading the key is reported as a passphrase problem.
            raise VerifyException(errno.EINVAL, 'Invalid passphrase')

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:30,代码来源:CryptoPlugin.py
示例3: verify
def verify(self, path, logs=True, cores=False):
    """Require a non-blank ``path``.

    ``logs`` and ``cores`` are accepted but not inspected here.

    Raises:
        ValidationException: when ``path`` is ``None``, empty, or only
            whitespace.

    Returns:
        An empty list when validation passes.
    """
    errors = ValidationException()

    path_is_blank = path in (None, '') or path.isspace()
    if path_is_blank:
        errors.add((0, 'path'), 'The Path is required', code=errno.EINVAL)

    if errors:
        raise errors

    return []
开发者ID:erinix,项目名称:middleware,代码行数:7,代码来源:DebugPlugin.py
示例4: verify
def verify(self, path):
    """Require a non-blank ``path``.

    Raises:
        ValidationException: when ``path`` is ``None``, empty, or only
            whitespace.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    missing = path in (None, '')
    if missing or path.isspace():
        errors.add((0, 'path'), 'The Path is required', code=errno.EINVAL)

    if errors:
        raise errors

    return ['system']
开发者ID:freenas,项目名称:middleware,代码行数:7,代码来源:DebugPlugin.py
示例5: run
def run(self, id, updated_fields):
# Apply the 'config' part of `updated_fields` to the service definition
# identified by `id`, then regenerate the 'services' etcd group, ask the
# dispatcher to apply the service state and emit a 'service.changed' event.
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the statements after the `else:` below cannot be confirmed from here.
service_def = self.datastore.get_by_id('service_definitions', id)
node = ConfigNode('service.{0}'.format(service_def['name']), self.configstore)
restart = False
reload = False
updated_config = updated_fields.get('config')
# Nothing to do when the update carries no 'config' section.
if updated_config is None:
return
# NOTE(review): raises KeyError if 'type' is absent — presumably callers always send it; confirm.
del updated_config['type']
# Services with a dedicated task: verify and run that task with the config.
if service_def.get('task'):
# 'enable' is removed before the config is handed to the subtask and written to the node afterwards.
enable = updated_config.pop('enable', None)
try:
self.verify_subtask(service_def['task'], updated_config)
except RpcException as err:
# Re-raise the subtask's verification error as a ValidationException.
new_err = ValidationException()
new_err.propagate(err, [0], [1, 'config'])
raise new_err
result = self.join_subtasks(self.run_subtask(service_def['task'], updated_config))
# The first subtask result selects between restart and reload.
restart = result[0] == 'RESTART'
reload = result[0] == 'RELOAD'
if enable is not None:
node['enable'] = enable
else:
# No dedicated task: write the config straight into the service's config node.
node.update(updated_config)
if service_def.get('etcd-group'):
self.dispatcher.call_sync('etcd.generation.generate_group', service_def.get('etcd-group'))
if 'enable' in updated_config:
# Propagate to dependent services
for i in service_def.get('dependencies', []):
svc_dep = self.datastore.get_by_id('service_definitions', i)
self.join_subtasks(self.run_subtask('service.update', i, {
'config': {
'type': 'service-{0}'.format(svc_dep['name']),
'enable': updated_config['enable']
}
}))
if service_def.get('auto_enable'):
# Consult state of services dependent on us
for i in self.datastore.query('service_definitions', ('dependencies', 'in', service_def['name'])):
# NOTE(review): the key template 'service.{0}.enable' is passed unformatted, with i['name']
# as a second argument — looks like a missing .format(); confirm against configstore.get.
enb = self.configstore.get('service.{0}.enable', i['name'])
if enb != updated_config['enable']:
del updated_config['enable']
break
self.dispatcher.call_sync('etcd.generation.generate_group', 'services')
self.dispatcher.call_sync('service.apply_state', service_def['name'], restart, reload, timeout=30)
self.dispatcher.dispatch_event('service.changed', {
'operation': 'update',
'ids': [service_def['id']]
})
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:59,代码来源:ServiceManagePlugin.py
示例6: verify
def verify(self, rsyncmod):
    """Validate a new rsync module definition.

    The module name must not contain ``/`` or ``]``.

    Raises:
        ValidationException: when the name contains a forbidden
            character.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    if re.search(r'[/\]]', rsyncmod['name']):
        # Message typo fixed: "backet" -> "bracket".
        errors.add((0, 'name'), 'The name cannot contain slash or a closing square bracket.')

    if errors:
        raise errors

    return ['system']
开发者ID:freenas,项目名称:middleware,代码行数:10,代码来源:RsyncdPlugin.py
示例7: verify
def verify(self, user):
    """Validate a new user definition.

    The username is normalized in place and checked with
    ``check_unixname``. The user must not already exist, may list at
    most 64 groups, must not have ``:`` in the full name, and any
    supplied email must match ``EMAIL_REGEX``.

    Raises:
        VerifyException: ``EEXIST`` when the username is already taken.
        ValidationException: when any field check failed.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()
    normalize_name(user, 'username')

    for code, message in check_unixname(user['username']):
        errors.add((0, 'username'), message, code=code)

    username_filter = ('username', '=', user['username'])
    if self.datastore.exists('users', username_filter):
        raise VerifyException(errno.EEXIST, 'User with given name already exists')

    too_many_groups = 'groups' in user and len(user['groups']) > 64
    if too_many_groups:
        errors.add(
            (0, 'groups'),
            'User cannot belong to more than 64 auxiliary groups'
        )

    if user.get('full_name') and ':' in user['full_name']:
        errors.add((0, 'full_name'), 'The character ":" is not allowed')

    if 'email' in user and not EMAIL_REGEX.match(user['email']):
        errors.add(
            (0, 'email'),
            "{0} is an invalid email address".format(user['email'])
        )

    if errors:
        raise errors

    return ['system']
开发者ID:abwaters,项目名称:middleware,代码行数:30,代码来源:UserPlugin.py
示例8: verify
def verify(self, group):
    """Validate a new group definition.

    The name is normalized in place, then every problem reported by
    ``check_unixname`` is recorded against the ``name`` field.

    Raises:
        ValidationException: when ``check_unixname`` reported problems.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()
    normalize_name(group, 'name')

    problems = check_unixname(group['name'])
    for code, message in problems:
        errors.add((0, 'name'), message, code=code)

    if errors:
        raise errors

    return ['system']
开发者ID:freenas,项目名称:middleware,代码行数:11,代码来源:UserPlugin.py
示例9: verify
def verify(self, ipfs):
    """Validate an IPFS service configuration update.

    When ``path`` is supplied it must not be ``None``, empty, or only
    whitespace.

    Raises:
        ValidationException: when the supplied path is blank.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    if 'path' in ipfs:
        path = ipfs['path']
        if path in (None, '') or path.isspace():
            # The field name must be the string 'path'; the original passed the
            # undefined bare name `path`, which raised NameError instead of
            # reporting the validation error.
            errors.add((0, 'path'), "The provided path: '{0}' is not valid".format(path))

    if errors:
        raise errors

    return ['system']
开发者ID:650elx,项目名称:middleware,代码行数:11,代码来源:IPFSPlugin.py
示例10: verify
def verify(self, props):
    """Validate general system properties.

    A supplied ``timezone`` must be one of the values returned by the
    ``system.general.timezones`` dispatcher call.

    Raises:
        ValidationException: when the timezone is not in that list.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    if 'timezone' in props:
        known_timezones = self.dispatcher.call_sync('system.general.timezones')
        requested = props['timezone']
        if requested not in known_timezones:
            errors.add((0, 'timezone'), 'Invalid timezone: {0}'.format(requested))

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:11,代码来源:SystemInfoPlugin.py
示例11: verify
def verify(self, afp):
    """Validate an AFP service configuration update.

    A non-empty ``dbpath`` must exist and be a directory.

    Raises:
        ValidationException: when ``dbpath`` does not exist (``ENOENT``)
            or is not a directory.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    dbpath = afp.get('dbpath')
    if dbpath and not os.path.exists(dbpath):
        errors.add((0, 'dbpath'), 'Path does not exist', code=errno.ENOENT)
    elif dbpath and not os.path.isdir(dbpath):
        errors.add((0, 'dbpath'), 'Path is not a directory')

    if errors:
        raise errors

    return ['system']
开发者ID:650elx,项目名称:middleware,代码行数:14,代码来源:AFPPlugin.py
示例12: verify
def verify(self, lldp):
    """Validate an LLDP service configuration update.

    The update is overlaid on the stored ``service.lldp`` configuration;
    a non-empty ``country_code`` in the merged result must be present in
    pycountry's ``alpha2`` index.

    Raises:
        ValidationException: when the country code is not recognised.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    merged = ConfigNode('service.lldp', self.configstore).__getstate__()
    merged.update(lldp)

    # Lazy load pycountry due to extra verbose DEBUG logging
    import pycountry

    country_code = merged['country_code']
    if country_code and country_code not in pycountry.countries.indices['alpha2']:
        errors.add((0, 'country_code'), 'Invalid ISO-3166 alpha 2 code')

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:14,代码来源:LLDPPlugin.py
示例13: verify
def verify(self, mail):
    """Validate a mail configuration update.

    When the update enables ``auth``, a username and a password must be
    available either in the update or in the stored ``mail``
    configuration. Both problems are recorded against the ``auth``
    field.

    Raises:
        ValidationException: when auth is requested without a username
            or without a password.

    Returns:
        An empty list when validation passes.
    """
    errors = ValidationException()
    stored = ConfigNode('mail', self.configstore).__getstate__()

    if mail.get('auth'):
        if not (mail.get('user') or stored['user']):
            errors.add((0, 'auth'), 'Mail authorization requires a username')

        if not (mail.get('pass') or stored['pass']):
            errors.add((0, 'auth'), 'Mail authorization requires a password')

    if errors:
        raise errors

    return []
开发者ID:650elx,项目名称:middleware,代码行数:14,代码来源:MailPlugin.py
示例14: verify
def verify(self, id, updated_fields):
    """Validate an update to an existing tunable.

    A changed ``var`` must not collide with another tunable (one whose
    id differs from ``id``), and a changed ``value`` must not contain
    single or double quotes.

    Raises:
        ValidationException: on a duplicate variable (``EEXIST``) or a
            quoted value.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    if 'var' in updated_fields:
        same_var_other_id = ('and', [('var', '=', updated_fields['var']), ('id', '!=', id)])
        if self.datastore.exists('tunables', same_var_other_id):
            errors.add((1, 'var'), 'This variable already exists.', code=errno.EEXIST)

    if 'value' in updated_fields:
        value = updated_fields['value']
        if any(quote in value for quote in ('"', "'")):
            errors.add((1, 'value'), 'Quotes are not allowed')

    if errors:
        raise errors

    return ['system']
开发者ID:erinix,项目名称:middleware,代码行数:17,代码来源:TunablePlugin.py
示例15: verify
def verify(self, uuid, updated_fields):
    """Validate an update to an existing rsync module.

    The stored module is loaded from ``rsyncd-module`` and overlaid with
    ``updated_fields``; the resulting name must not contain ``/`` or
    ``]``.

    Raises:
        VerifyException: ``ENOENT`` when no module with ``uuid`` exists.
        ValidationException: when the resulting name contains a
            forbidden character.

    Returns:
        ``['system']`` when validation passes.
    """
    rsyncmod = self.datastore.get_by_id('rsyncd-module', uuid)
    if rsyncmod is None:
        raise VerifyException(errno.ENOENT, 'Rsync module {0} does not exist'.format(uuid))

    # Validate the module as it would look after the update is applied.
    rsyncmod.update(updated_fields)

    errors = ValidationException()
    if re.search(r'[/\]]', rsyncmod['name']):
        # Message typo fixed: "backet" -> "bracket".
        errors.add((1, 'name'), 'The name cannot contain slash or a closing square bracket.')

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:17,代码来源:RsyncdPlugin.py
示例16: verify
def verify(self, smb):
    """Validate an SMB service configuration update.

    Every supplied NetBIOS name and the supplied workgroup are checked
    with ``validate_netbios_name``; values missing from the update fall
    back to the stored ``service.smb`` configuration. The workgroup
    must not equal any NetBIOS name, compared case-insensitively.

    Raises:
        ValidationException: on an invalid name or a workgroup that
            clashes with a NetBIOS name.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()
    stored = ConfigNode('service.smb', self.configstore).__getstate__()

    netbiosname = smb.get('netbiosname')
    if netbiosname is None:
        netbiosname = stored['netbiosname']
    else:
        for candidate in netbiosname:
            if not validate_netbios_name(candidate):
                errors.add((0, 'netbiosname'), 'Invalid name {0}'.format(candidate))

    workgroup = smb.get('workgroup')
    if workgroup is None:
        workgroup = stored['workgroup']
    elif not validate_netbios_name(workgroup):
        errors.add((0, 'workgroup'), 'Invalid name')

    workgroup_lower = workgroup.lower()
    netbios_lower = [name.lower() for name in netbiosname]
    if workgroup_lower in netbios_lower:
        errors.add((0, 'netbiosname'), 'NetBIOS and Workgroup must be unique')

    if errors:
        raise errors

    return ['system']
开发者ID:650elx,项目名称:middleware,代码行数:26,代码来源:SMBPlugin.py
示例17: verify
def verify(self, id, updated_fields):
    """Validate an update to an existing group.

    A changed ``name`` is checked with ``check_unixname`` and must not
    match another group (one whose id differs from ``id``).

    Raises:
        ValidationException: on an invalid name or a name collision
            (``EEXIST``).

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    if 'name' in updated_fields:
        new_name = updated_fields['name']

        for code, message in check_unixname(new_name):
            errors.add((1, 'name'), message, code=code)

        # Check if there is another group with same name being renamed to
        name_taken = self.datastore.exists('groups', ('name', '=', new_name), ('id', '!=', id))
        if name_taken:
            errors.add(
                (1, "name"),
                'Group {0} already exists'.format(new_name),
                code=errno.EEXIST
            )

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:19,代码来源:UserPlugin.py
示例18: verify
def verify(self, ups):
    """Validate a UPS service configuration update.

    The update is overlaid on the stored ``service.ups`` configuration
    and the merged result is checked: ``MASTER`` mode needs a driver
    port, ``SLAVE`` mode needs a remote host, the identifier may only
    use alphanumerics, ``.``, ``-`` and ``_``, and the monitor user and
    password may not contain spaces or ``#``.

    Raises:
        ValidationException: when any of the checks above failed.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    merged = ConfigNode('service.ups', self.configstore).__getstate__()
    merged.update(ups)

    mode = merged['mode']
    if mode == 'MASTER' and not merged['driver_port']:
        errors.add((0, 'driver_port'), 'This field is required')

    if mode == 'SLAVE' and not merged['remote_host']:
        errors.add((0, 'remote_host'), 'This field is required')

    identifier_ok = re.search(r'^[a-z0-9\.\-_]+$', merged['identifier'], re.I)
    if not identifier_ok:
        errors.add((0, 'identifier'), 'Use alphanumeric characters, ".", "-" and "_"')

    for field in ('monitor_user', 'monitor_password'):
        if re.search(r'[ #]', merged[field], re.I):
            errors.add((0, field), 'Spaces or number signs are not allowed')

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:22,代码来源:UPSPlugin.py
示例19: verify
def verify(self, webdav):
    """Validate a WebDAV service configuration update.

    The update is overlaid on the stored ``service.webdav``
    configuration and the merged result is checked: HTTP and HTTPS ports
    must differ, an HTTPS protocol needs a certificate, and a configured
    certificate must be found by ``crypto.certificate.query``.

    Raises:
        ValidationException: when any of the checks above failed.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()

    merged = ConfigNode('service.webdav', self.configstore).__getstate__()
    merged.update(webdav)

    if merged['http_port'] == merged['https_port']:
        errors.add((0, 'http_port'), 'HTTP and HTTPS ports cannot be the same')

    certificate_id = merged['certificate']
    if 'HTTPS' in merged['protocol'] and not certificate_id:
        errors.add((0, 'certificate'), 'SSL protocol specified without choosing a certificate')

    if certificate_id:
        matches = self.dispatcher.call_sync('crypto.certificate.query', [('id', '=', certificate_id)])
        if not matches:
            errors.add((0, 'certificate'), 'SSL Certificate not found.')

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:20,代码来源:WebDAVPlugin.py
示例20: verify
def verify(self, ftp):
    """Validate an FTP service configuration update.

    The update is overlaid on the stored ``service.ftp`` configuration.
    Passive port bounds supplied in the update must lie in 1024-65535
    and the maximum must exceed the minimum; anonymous-only login needs
    an anonymous path; TLS needs a certificate, and a configured
    certificate must be found by ``crypto.certificate.query``.

    Raises:
        ValidationException: when any of the checks above failed.

    Returns:
        ``['system']`` when validation passes.
    """
    errors = ValidationException()
    node = ConfigNode('service.ftp', self.configstore).__getstate__()
    node.update(ftp)

    pmin = node['passive_ports_min']
    if 'passive_ports_min' in ftp:
        if pmin and (pmin < 1024 or pmin > 65535):
            errors.add((0, 'passive_ports_min'), 'This value must be between 1024 and 65535, inclusive.')

    pmax = node['passive_ports_max']
    if 'passive_ports_max' in ftp:
        # NOTE(review): the elif is attached to the range check here; the
        # original indentation was lost, confirm this nesting.
        if pmax and (pmax < 1024 or pmax > 65535):
            errors.add((0, 'passive_ports_max'), 'This value must be between 1024 and 65535, inclusive.')
        elif pmax and pmin and pmin >= pmax:
            errors.add((0, 'passive_ports_max'), 'This value must be higher than minimum passive port.')

    if node['only_anonymous'] and not node['anonymous_path']:
        # Use the (path, message, code=...) form like the other add() calls;
        # the original passed one 3-tuple as the only argument.
        errors.add(
            (0, 'anonymous_path'),
            'This field is required for anonymous login.',
            code=errno.EINVAL
        )

    if node['tls'] is True and not node['tls_ssl_certificate']:
        errors.add((0, 'tls_ssl_certificate'), 'TLS specified without certificate.')

    if node['tls_ssl_certificate']:
        cert = self.dispatcher.call_sync('crypto.certificate.query', [('id', '=', node['tls_ssl_certificate'])])
        if not cert:
            errors.add((0, 'tls_ssl_certificate'), 'SSL Certificate not found.')

    if errors:
        raise errors

    return ['system']
开发者ID:jatinder-kumar-calsoft,项目名称:middleware,代码行数:34,代码来源:FTPPlugin.py
注:本文中的task.ValidationException类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论