本文整理汇总了Python中pwd.getpwuid函数的典型用法代码示例。如果您正苦于以下问题:Python getpwuid函数的具体用法?Python getpwuid怎么用?Python getpwuid使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getpwuid函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: CheckedMakeDir
def CheckedMakeDir(dirname,perms=0,uid=0,gid=0):
if not dirname:
return
if os.path.exists(dirname):
# Directory does already exist
if not os.path.isdir(dirname):
sys.stderr.write('Warning: %s already exists but is no directory.\n' % (dirname))
else:
# Create directory
try:
os.makedirs(dirname)
sys.stdout.write('Created directory %s\n' % (dirname))
except OSError:
sys.stderr.write('Error: Could not create directory %s.\n' % (dirname))
return
# Get current file stat info
fstat = os.stat(dirname)
if perms:
os.chmod(dirname,perms)
sys.stdout.write('Changed permissions of %s to %o\n' % (dirname,perms))
if (uid and fstat[stat.ST_UID]!=uid) or \
(gid and fstat[stat.ST_GID]!=gid):
if not uid:
uid=fstat[stat.ST_UID]
if not gid:
gid=pwd.getpwuid(uid)[3]
os.chown(dirname,uid,gid)
sys.stdout.write('Changed owner/group of %s to %s.%s\n' % (dirname,pwd.getpwuid(uid)[0],grp.getgrgid(gid)[0]))
开发者ID:joshuacoddingyou,项目名称:python,代码行数:33,代码来源:ca-make.py
示例2: to_uid
def to_uid(name): # NOQA
"""Return an uid, given a user name.
If the name is an integer, make sure it's an existing uid.
If the user name is unknown, raises a ValueError.
"""
try:
name = int(name)
except ValueError:
pass
if isinstance(name, int):
try:
pwd.getpwuid(name)
return name
except KeyError:
raise ValueError("%r isn't a valid user id" % name)
from circus.py3compat import string_types # circular import fix
if not isinstance(name, string_types):
raise TypeError(name)
try:
return pwd.getpwnam(name).pw_uid
except KeyError:
raise ValueError("%r isn't a valid user name" % name)
开发者ID:PaulNendick,项目名称:circus,代码行数:27,代码来源:util.py
示例3: check_writable
def check_writable( node, uid=None ):
if uid == None:
user = pwd.getpwuid( get_uid() )
else:
user = pwd.getpwuid( uid )
s = os.stat( node )
groups = [g for g in grp.getgrall() if user.pw_name in g.gr_mem]
groups.append( user.pw_gid )
cmd = ''
if s.st_uid == user.pw_uid:
# user owns the file
if s.st_mode & stat.S_IWUSR == 0:
cmd = 'chmod u+w %s' %(node)
if os.path.isdir( node ) and s.st_mode & stat.S_IXUSR == 0:
cmd = 'chmod u+wx %s' %(node)
elif s.st_gid in groups:
if s.st_mode & stat.S_IWGRP == 0:
cmd = 'chmod g+w %s' %(node)
if os.path.isdir( node ) and s.st_mode & stat.S_IXGRP == 0:
cmd = 'chmod g+wx %s' %(node)
else:
if s.st_mode & stat.S_IWOTH == 0:
cmd = 'chmod o+w %s' %(node)
if os.path.isdir( node ) and s.st_mode & stat.S_IXOTH == 0:
cmd = 'chmod o+wx %s' %(node)
if cmd != '':
raise RuntimeError( node, 'Not writable (fix with %s)' %(cmd) )
开发者ID:Svedrin,项目名称:spectrum,代码行数:29,代码来源:env.py
示例4: is_user
def is_user(value, min=None, max=None):
"""
Check whether username or uid as argument exists.
if this function recieved username, convert uid and exec validation.
"""
if type(value) == str:
try:
entry = pwd.getpwnam(value)
value = entry.pw_uid
except KeyError:
err_message = ('{0}: No such user.'.format(value))
raise validate.VdtValueError(err_message)
return value
elif type(value) == int:
try:
pwd.getpwuid(value)
except KeyError:
err_message = ('{0}: No such user.'.format(value))
raise validate.VdtValueError(err_message)
return value
else:
err_message = ('Please, use str or int to "user" parameter.')
raise validate.VdtTypeError(err_message)
开发者ID:JumpeiArashi,项目名称:blackbird,代码行数:28,代码来源:configread.py
示例5: getattr
def getattr(self, path, fh):
now = time() # FIXME:
uid = pwd.getpwuid(os.getuid()).pw_uid
gid = pwd.getpwuid(os.getuid()).pw_gid
if self.vdb.is_dir(path):
try:
size = self.vdb.size(path)
except:
raise OSError(ENOENT, "")
if platform.system() == "Darwin":
st_nlink = size
elif platform.system() == "Linux":
st_nlink = size + 2
return dict(st_mode=(S_IFDIR|0766), st_ctime=now, st_mtime=now, st_atime=now, st_nlink=st_nlink, st_uid=uid, st_gid=gid)
else:
try:
data = self.vdb.read(path)
except:
raise OSError(ENOENT, "")
if data == "null":
raise OSError(ENOENT, "")
return dict(st_mode=(S_IFREG|0666), st_size=len(data), st_ctime=now, st_mtime=now, st_atime=now, st_nlink=1, st_uid=uid, st_gid=gid)
开发者ID:hassy,项目名称:fuse-vertexdb,代码行数:26,代码来源:vertexdb_fs.py
示例6: demote
def demote(self, uid):
try:
username = pwd.getpwuid(uid).pw_name
gid = pwd.getpwuid(uid).pw_gid
except KeyError:
username = None
gid = uid
if os.getuid() == uid:
return
else:
if os.getuid() != 0:
logging.warn('Running as a limited user, setuid() unavailable!')
return
logging.info(
'Worker %s is demoting to UID %s / GID %s...',
os.getpid(),
uid,
gid
)
groups = [
g.gr_gid
for g in grp.getgrall()
if username in g.gr_mem or g.gr_gid == gid
]
os.setgroups(groups)
os.setgid(gid)
os.setuid(uid)
logging.info(
'...done, new EUID %s EGID %s',
os.geteuid(),
os.getegid()
)
开发者ID:HasClass0,项目名称:ajenti,代码行数:35,代码来源:worker.py
示例7: GET
def GET(self, uid, gid):
web.header("Content-type","text/plain")
uid, gid = map(int, (uid, gid))
# Security
if uid < 500 or gid < 500:
yield "Invalid UID (%d) or GID (%d)\n" % (uid, gid)
return
try: pwd.getpwuid(uid)
except KeyError:
yield "UID (%d) does not exist\n" % (uid, gid)
return
for k in stop_program().GET(): yield k
yield "Starting program with %d/%d\n" % (uid, gid)
#p = subprocess.Popen(EXEC_SH, shell=True,
# preexec_fn=lambda: change_user(uid,gid)) # This fails when running as daemon
rpipe, wpipe = os.pipe() # Reference: http://ameblo.jp/oyasai10/entry-10615215673.html
pid = os.fork()
if pid == 0: # Child
os.close(rpipe)
wpipe = os.fdopen(wpipe, "w")
change_user(uid,gid)
p = subprocess.Popen(EXEC_SH, shell=True)
wpipe.write("%d\n"%p.pid)
sys.exit()
else: # Parent
os.close(wpipe)
rpipe = os.fdopen(rpipe, "r")
pid = int(rpipe.readline().strip())
open(PID_FILE, "w").write("%d"%pid)
os.wait() # Wait child
开发者ID:keitaroyam,项目名称:yamtbx,代码行数:34,代码来源:shika_daemon.py
示例8: init_default_configs
def init_default_configs(self):
conf = config.config()
dic = {}
dic[conf.TEMP_DIR] = "/home/%s/.paralite-tmp" % (pwd.getpwuid(os.getuid())[0])
dic[conf.LOG_DIR] = "/home/%s/.paralite-log" % (pwd.getpwuid(os.getuid())[0])
dic[conf.BLOCK_SIZE] = 0
return dic
开发者ID:PayasR,项目名称:paralite,代码行数:7,代码来源:paraLite.py
示例9: valid_uid
def valid_uid(uid):
"""returns bool of whether uid can be resolved to a user"""
try:
pwd.getpwuid(uid)
return True
except Exception:
return False
开发者ID:wavesaudio,项目名称:instl,代码行数:7,代码来源:dockutil.py
示例10: drop_privileges
def drop_privileges(uid_name="nobody", gid_name="nogroup"):
import pwd, grp
starting_uid = os.getuid()
starting_gid = os.getgid()
starting_uid_name = pwd.getpwuid(starting_uid)[0]
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
if starting_uid == 0:
# Get the uid/gid from the name
running_uid = pwd.getpwnam(uid_name)[2]
# running_gid = grp.getgrnam(gid_name)[2]
# Try setting the new uid/gid
# os.setgid(running_gid)
os.setuid(running_uid)
new_umask = 077
old_umask = os.umask(new_umask)
sys.stderr.write("drop_privileges: Old umask: %s, new umask: %s\n" % (oct(old_umask), oct(new_umask)))
final_uid = os.getuid()
final_gid = os.getgid()
sys.stderr.write("drop_privileges: running as %s/%s\n" % (pwd.getpwuid(final_uid)[0], grp.getgrgid(final_gid)[0]))
开发者ID:bkeep,项目名称:ganglia_contrib,代码行数:29,代码来源:daemon.py
示例11: test_sanity_rfc2307_bis
def test_sanity_rfc2307_bis(ldap_conn, sanity_rfc2307_bis):
passwd_pattern = ent.contains_only(
dict(name="user1", passwd="*", uid=1001, gid=2001, gecos="1001", dir="/home/user1", shell="/bin/bash"),
dict(name="user2", passwd="*", uid=1002, gid=2002, gecos="1002", dir="/home/user2", shell="/bin/bash"),
dict(name="user3", passwd="*", uid=1003, gid=2003, gecos="1003", dir="/home/user3", shell="/bin/bash"),
)
ent.assert_passwd(passwd_pattern)
group_pattern = ent.contains_only(
dict(name="group1", passwd="*", gid=2001, mem=ent.contains_only()),
dict(name="group2", passwd="*", gid=2002, mem=ent.contains_only()),
dict(name="group3", passwd="*", gid=2003, mem=ent.contains_only()),
dict(name="empty_group1", passwd="*", gid=2010, mem=ent.contains_only()),
dict(name="empty_group2", passwd="*", gid=2011, mem=ent.contains_only()),
dict(name="two_user_group", passwd="*", gid=2012, mem=ent.contains_only("user1", "user2")),
dict(name="group_empty_group", passwd="*", gid=2013, mem=ent.contains_only()),
dict(name="group_two_empty_groups", passwd="*", gid=2014, mem=ent.contains_only()),
dict(name="one_user_group1", passwd="*", gid=2015, mem=ent.contains_only("user1")),
dict(name="one_user_group2", passwd="*", gid=2016, mem=ent.contains_only("user2")),
dict(name="group_one_user_group", passwd="*", gid=2017, mem=ent.contains_only("user1")),
dict(name="group_two_user_group", passwd="*", gid=2018, mem=ent.contains_only("user1", "user2")),
dict(name="group_two_one_user_groups", passwd="*", gid=2019, mem=ent.contains_only("user1", "user2")),
)
ent.assert_group(group_pattern)
with pytest.raises(KeyError):
pwd.getpwnam("non_existent_user")
with pytest.raises(KeyError):
pwd.getpwuid(1)
with pytest.raises(KeyError):
grp.getgrnam("non_existent_group")
with pytest.raises(KeyError):
grp.getgrgid(1)
开发者ID:jhrozek,项目名称:sssd,代码行数:33,代码来源:test_enumeration.py
示例12: drop_privileges
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
print("Init: Running as {0}/{1}.".format(pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0]))
wanted_uid = pwd.getpwnam(uid_name)[2]
wanted_gid = grp.getgrnam(gid_name)[2]
pid = gevent.fork()
if pid == 0:
# child
print 'starting child process'
child_process = gevent.spawn(root_process)
child_process.join()
print 'Child done:', child_process.successful()
oschameleon.osfuscation.flush_tables()
print 'Child exit'
else:
# parent
os.setgid(wanted_gid)
os.setuid(wanted_uid)
new_uid_name = pwd.getpwuid(os.getuid())[0]
new_gid_name = grp.getgrgid(os.getgid())[0]
print("Parent: Privileges dropped, running as {0}/{1}.".format(new_uid_name, new_gid_name))
while True:
try:
gevent.sleep(1)
print 'Parent: ping'
except KeyboardInterrupt:
break
开发者ID:4sp1r3,项目名称:oschameleon,代码行数:27,代码来源:root_fork.py
示例13: _unixdomainhandler
def _unixdomainhandler():
unixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove("/var/run/confluent/api.sock")
except OSError: # if file does not exist, no big deal
pass
unixsocket.bind("/var/run/confluent/api.sock")
os.chmod("/var/run/confluent/api.sock",
stat.S_IWOTH | stat.S_IROTH | stat.S_IWGRP |
stat.S_IRGRP | stat.S_IWUSR | stat.S_IRUSR)
atexit.register(removesocket)
unixsocket.listen(5)
while True:
cnn, addr = unixsocket.accept()
creds = cnn.getsockopt(socket.SOL_SOCKET, SO_PEERCRED,
struct.calcsize('iII'))
pid, uid, gid = struct.unpack('iII', creds)
skipauth = False
if uid in (os.getuid(), 0):
#this is where we happily accept the person
#to do whatever. This allows the server to
#start with no configuration whatsoever
#and yet still be configurable by some means
skipauth = True
try:
authname = pwd.getpwuid(uid).pw_name
except:
authname = "UNKNOWN SUPERUSER"
else:
try:
authname = pwd.getpwuid(uid).pw_name
except KeyError:
cnn.close()
return
eventlet.spawn_n(sessionhdl, cnn, authname, skipauth)
开发者ID:chenglch,项目名称:confluent,代码行数:35,代码来源:sockapi.py
示例14: test_set_desired_file_access
def test_set_desired_file_access(self):
#setup
file_path = rc("temp-keyfile")
if os.path.exists(file_path):
os.remove(file_path)
with open(file_path, "w") as file:
file.write("content")
#check assumptions
stat = os.stat(file_path)
self.assertNotEqual('600', oct(stat.st_mode)[-3:])
current_user = pwd.getpwuid(os.getuid())[0]
if current_user != settings.pacemaker_uname:
file_user = pwd.getpwuid(stat.st_uid)[0]
self.assertNotEqual(file_user, settings.pacemaker_uname)
current_group = grp.getgrgid(os.getgid())[0]
if current_group != settings.pacemaker_gname:
file_group = grp.getgrgid(stat.st_gid)[0]
self.assertNotEqual(file_group, settings.pacemaker_gname)
#run tested method
env.set_keyfile_access(file_path)
#check
stat = os.stat(file_path)
self.assertEqual('600', oct(stat.st_mode)[-3:])
file_user = pwd.getpwuid(stat.st_uid)[0]
self.assertEqual(file_user, settings.pacemaker_uname)
file_group = grp.getgrgid(stat.st_gid)[0]
self.assertEqual(file_group, settings.pacemaker_gname)
开发者ID:wyatt88,项目名称:pcs,代码行数:32,代码来源:test_env.py
示例15: test_mc_zero_timeout
def test_mc_zero_timeout(ldap_conn, zero_timeout_rfc2307):
"""
Test that the memory cache is not created at all with memcache_timeout=0
"""
# No memory cache files must be created
assert len(os.listdir(config.MCACHE_PATH)) == 0
ent.assert_passwd_by_name(
'user1',
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_passwd_by_uid(
1001,
dict(name='user1', passwd='*', uid=1001, gid=2001,
gecos='1001', shell='/bin/bash'))
ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
stop_sssd()
# sssd is stopped; so the memory cache should not be used
# in long living clients (py.test in this case)
with pytest.raises(KeyError):
pwd.getpwnam('user1')
with pytest.raises(KeyError):
pwd.getpwuid(1001)
with pytest.raises(KeyError):
grp.getgrnam('group1')
with pytest.raises(KeyError):
grp.getgrgid(2001)
开发者ID:SSSD,项目名称:sssd,代码行数:31,代码来源:test_memory_cache.py
示例16: set_user
def set_user(uid, assign_all_groups):
try:
# Get user's default group and set it to current process to make sure
# file permissions are inherited correctly
# Solves issue with permission denied for JSON files
gid = pwd.getpwuid(uid).pw_gid
import grp
os.setgid(gid)
if assign_all_groups:
# Added lines to assure read/write permission for groups
user = pwd.getpwuid(uid).pw_name
groups = [g.gr_gid for g in grp.getgrall() if user in g.gr_mem]
os.setgroups(groups)
os.setuid(uid)
except OSError as e:
if e.errno == errno.EPERM:
sys.stderr.write("error: setuid(%d) failed: permission denied. Did you setup 'sudo' correctly for this script?\n" % uid)
exit(1)
else:
pass
if os.getuid() == 0:
sys.stderr.write("error: UID is 0 (root) after changing user. This script should not be run as root. aborting.\n")
exit(1)
if os.geteuid() == 0:
sys.stderr.write("error: EUID is 0 (root) after changing user. This script should not be run as root. aborting.\n")
exit(1)
开发者ID:msauria,项目名称:galaxy,代码行数:30,代码来源:drmaa_external_runner.py
示例17: get_home
def get_home():
if "SUDO_USER" in os.environ:
return os.path.expanduser("~" + os.environ["SUDO_USER"])
elif "PKEXEC_UID" in os.environ:
return os.path.expanduser("~" + pwd.getpwuid(int(os.environ["PKEXEC_UID"])).pw_name)
else:
return os.path.expanduser("~" + pwd.getpwuid(os.getuid()).pw_name)
开发者ID:pombreda,项目名称:Cinnamon-Installer,代码行数:7,代码来源:setup.py
示例18: TestOwnerGroupMode
def TestOwnerGroupMode(DestinationPath, SourcePath, fc):
stat_info = os.lstat(DestinationPath)
if SourcePath:
stat_info_src = os.lstat(SourcePath)
if fc.Owner:
Specified_Owner_ID = pwd.getpwnam(fc.Owner)[2]
if Specified_Owner_ID != pwd.getpwuid(stat_info.st_uid)[2]:
return False
elif SourcePath:
# Owner wasn't specified, if SourcePath is specified then check that the Owners match
if pwd.getpwuid(stat_info.st_uid)[2] != pwd.getpwuid(stat_info_src.st_uid)[2]:
return False
if fc.Group:
Specified_Group_ID = grp.getgrnam(fc.Group)[2]
if Specified_Group_ID != grp.getgrgid(stat_info.st_gid)[2]:
return False
elif SourcePath:
# Group wasn't specified, if SourcePath is specified then check that the Groups match
if grp.getgrgid(stat_info.st_gid)[2] != grp.getgrgid(stat_info_src.st_gid)[2]:
return False
# Mode is irrelevant to symlinks
if not os.path.islink(DestinationPath):
if fc.Mode:
if str(oct(stat_info.st_mode))[-3:] != fc.Mode:
return False
elif SourcePath:
# Mode wasn't specified, if SourcePath is specified then check that the Modes match
if str(oct(stat_info.st_mode))[-3:] != str(oct(stat_info_src.st_mode))[-3:]:
return False
return True
开发者ID:40a,项目名称:WPSDSCLinux,代码行数:35,代码来源:nxFile.py
示例19: testStartCopy
def testStartCopy(self):
run_name = '000000_RUNDIR_1234_ABCDEFG'
source_run_root = os.path.join(self.run_root, 'source')
source_rundir = os.path.join(source_run_root, run_name)
os.makedirs(source_rundir)
testfile = 'test.txt'
with open(os.path.join(source_rundir, testfile), 'w') as f:
f.write("Hello")
dest_run_root = os.path.join(self.run_root, 'dest')
dest_host = 'localhost'
dest_group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
dest_user = pwd.getpwuid(os.getuid()).pw_name
os.makedirs(dest_run_root)
config = {
'COPY_DEST_HOST': dest_host,
'COPY_DEST_USER': dest_user,
'COPY_DEST_GROUP': dest_group,
'COPY_DEST_RUN_ROOT': dest_run_root,
'COPY_SOURCE_RUN_ROOTS': [source_run_root],
}
# Initialize autocopy and create the source root
a = Autocopy(log_file=self.tmp_file.name, no_email=True, test_mode_lims=True, config=config, errors_to_terminal=DEBUG)
a.update_rundirs_monitored()
rundir = a.get_rundir(dirname=run_name)
a.start_copy(rundir)
retcode = rundir.copy_proc.wait()
self.assertEqual(retcode, 0)
with open(os.path.join(dest_run_root, run_name, testfile), 'r') as f:
text = f.read()
self.assertTrue(re.search("Hello", text))
a.cleanup()
开发者ID:StanfordBioinformatics,项目名称:autocopy,代码行数:35,代码来源:test_autocopy.py
示例20: test_request_and_spawn
def test_request_and_spawn(capfd, request_and_spawn):
request_and_spawn()
captured = capfd.readouterr()
print('**************\n%s\n**************' % captured.err)
if request_and_spawn.kind != 'running':
assert '%s:%s' % (pwd.getpwuid(os.getuid())[0], os.getpid()) in captured.err
assert 'completed. Passing back results to' in captured.err
assert 'Queues => 0 workspaces' in captured.err
request_and_spawn(wait=False)
request_and_spawn(wait=False)
request_and_spawn(wait=False)
request_and_spawn(wait=False)
request_and_spawn()
# wait for process list to settle (eg: there might be one or two extra processes that will exit because the lock
# is already acquired - see StampedeStub)
start = time.time()
while len(get_children()) > 1 and time.time() - start < TIMEOUT:
time.sleep(0.1)
children = get_children()
assert len(children) == 1
for child in children:
child.kill()
captured = capfd.readouterr()
print('##############\n%s\n##############' % captured.err)
if request_and_spawn.kind != 'running':
assert '%s:%s' % (pwd.getpwuid(os.getuid())[0], os.getpid()) in captured.err
assert 'completed. Passing back results to' in captured.err
assert 'Queues => 0 workspaces' in captured.err
开发者ID:ionelmc,项目名称:python-stampede,代码行数:34,代码来源:test_client.py
注:本文中的pwd.getpwuid函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论