本文整理汇总了Python中test_base.expectTrue函数的典型用法代码示例。如果您正苦于以下问题:Python expectTrue函数的具体用法?Python expectTrue怎么用?Python expectTrue使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了expectTrue函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_2_daemon_with_option
def test_2_daemon_with_option(self):
logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
daemon = self._run_daemon(
{
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"verbose": True,
})
self.assertTrue(daemon.isAlive())
def info_exists():
info_path = test_base.getLatestInfoLog(logger_path)
return os.path.exists(info_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
# Assign the variable after we have assurances it exists
info_path = test_base.getLatestInfoLog(logger_path)
self.assertTrue(os.path.exists(info_path))
# Lastly, verify that we have permission to read the file
data = ''
with open(info_path, 'r') as fh:
try:
data = fh.read()
except:
pass
self.assertTrue(len(data) > 0)
daemon.kill()
开发者ID:Centurion89,项目名称:osquery,代码行数:35,代码来源:test_osqueryd.py
示例2: test_3_example_extension
def test_3_example_extension(self):
daemon = self._run_daemon({"disable_watchdog": True})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = EXClient(daemon.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# Make sure there are no extensions registered
result = test_base.expect(em.extensions, 0)
self.assertEqual(len(result), 0)
# Make sure the extension process starts
extension = self._run_extension(path=daemon.options["extensions_socket"])
self.assertTrue(extension.isAlive())
# Now that an extension has started, check extension list
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
ex_uuid = result.keys()[0]
ex_data = result[ex_uuid]
self.assertEqual(ex_data.name, "example")
self.assertEqual(ex_data.version, "0.0.1")
self.assertEqual(ex_data.min_sdk_version, "0.0.0")
# Get a python-based thrift client to the extension's service
client2 = EXClient(daemon.options["extensions_socket"], uuid=ex_uuid)
client2.open()
ex = client2.getEX()
self.assertEqual(ex.ping().code, 0)
# Make sure the extension can receive a call
em_time = em.call("table", "time", {"action": "columns"})
ex_time = ex.call("table", "time", {"action": "columns"})
print (em_time)
print (ex_time)
self.assertEqual(ex_time.status.code, 0)
self.assertTrue(len(ex_time.response) > 0)
self.assertTrue("name" in ex_time.response[0])
self.assertEqual(ex_time.status.uuid, ex_uuid)
# Make sure the extension includes a custom registry plugin
result = ex.call("table", "example", {"action": "generate"})
print (result)
self.assertEqual(result.status.code, 0)
self.assertEqual(len(result.response), 1)
self.assertTrue("example_text" in result.response[0])
self.assertTrue("example_integer" in result.response[0])
self.assertEqual(result.response[0]["example_text"], "example")
# Make sure the core can route to the extension
result = em.call("table", "example", {"action": "generate"})
print (result)
client2.close()
client.close()
extension.kill()
daemon.kill()
开发者ID:imaxxs,项目名称:osquery,代码行数:60,代码来源:test_extensions.py
示例3: test_3_daemon_lost_worker
def test_3_daemon_lost_worker(self):
# Test that killed workers are respawned by the watcher
if os.environ.get('SANITIZE') is not None:
return
daemon = self._run_daemon({
"allow_unsafe": True,
"disable_watchdog": False,
"ephemeral": True,
"disable_database": True,
"disable_logging": True,
})
self.assertTrue(daemon.isAlive())
# Check that the daemon spawned a child process
children = daemon.getChildren()
self.assertTrue(len(children) > 0)
# Kill only the child worker
os.kill(children[0], signal.SIGINT)
self.assertTrue(daemon.isDead(children[0]))
self.assertTrue(daemon.isAlive())
# Expect the children of the daemon to be respawned
def waitDaemonChildren():
children = daemon.getChildren()
return len(children) > 0
test_base.expectTrue(waitDaemonChildren)
children = daemon.getChildren()
self.assertTrue(len(children) > 0)
开发者ID:Centurion89,项目名称:osquery,代码行数:29,代码来源:test_osqueryd.py
示例4: test_9_external_config_update
def test_9_external_config_update(self):
# Start an extension without a daemon, with a timeout.
extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
self.assertTrue(extension.isAlive())
# Now start a daemon
daemon = self._run_daemon({
"disable_watchdog": True,
"extensions_timeout": EXTENSION_TIMEOUT,
"extensions_socket": extension.options["extensions_socket"],
})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client to the manager and extension.
client = test_base.EXClient(extension.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
em = client.getEM()
# Need the manager to request the extension's UUID.
result = test_base.expect(em.extensions, 1)
self.assertTrue(result is not None)
ex_uuid = result.keys()[0]
client2 = test_base.EXClient(extension.options["extensions_socket"],
<<<<<<< HEAD
uuid=ex_uuid)
开发者ID:pathcl,项目名称:osquery,代码行数:26,代码来源:test_extensions.py
示例5: test_6_logger_mode
def test_6_logger_mode(self):
logger_path = os.path.join(test_base.CONFIG_DIR, "logger-mode-tests")
os.makedirs(logger_path)
test_mode = 0754 # Strange mode that should never exist
daemon = self._run_daemon(
{"disable_watchdog": True, "disable_extensions": True, "disable_logging": False},
options_only={"logger_path": logger_path, "logger_mode": test_mode, "verbose": True},
)
info_path = os.path.join(logger_path, "osqueryd.INFO")
self.assertTrue(daemon.isAlive())
def info_exists():
return os.path.exists(info_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
# Both log files should exist and have the given mode.
for fname in ["osqueryd.INFO", "osqueryd.results.log"]:
pth = os.path.join(logger_path, fname)
self.assertTrue(os.path.exists(pth))
rpath = os.path.realpath(info_path)
mode = os.stat(rpath).st_mode & 0777
self.assertEqual(mode, test_mode)
daemon.kill()
开发者ID:rgarbina,项目名称:osquery,代码行数:28,代码来源:test_osqueryd.py
示例6: test_2_daemon_api
def test_2_daemon_api(self):
daemon = self._run_daemon({"disable_watchdog": True})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = EXClient(daemon.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# List the number of extensions
print (em.ping())
result = test_base.expect(em.extensions, 0)
self.assertEqual(len(result), 0)
# Try the basic ping API
self.assertEqual(em.ping().code, 0)
# Try a query
response = em.query("select * from time")
self.assertEqual(response.status.code, 0)
self.assertEqual(len(response.response), 1)
self.assertTrue("seconds" in response.response[0].keys())
# Try to get the query columns
response = em.getQueryColumns("select seconds as s from time")
self.assertEqual(response.status.code, 0)
self.assertEqual(len(response.response), 1)
self.assertTrue("s" in response.response[0])
client.close()
daemon.kill()
开发者ID:imaxxs,项目名称:osquery,代码行数:31,代码来源:test_extensions.py
示例7: test_5_extension_timeout
def test_5_extension_timeout(self):
# Start an extension without a daemon, with a timeout.
extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
self.assertTrue(extension.isAlive())
# Now start a daemon
daemon = self._run_daemon({
"disable_watchdog": True,
"extensions_socket": extension.options["extensions_socket"],
"verbose": True,
})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = test_base.EXClient(extension.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
em = client.getEM()
# The waiting extension should have connected to the daemon.
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
client.close()
daemon.kill(True)
extension.kill()
开发者ID:adityavs,项目名称:osquery,代码行数:26,代码来源:test_extensions.py
示例8: test_6_logger_mode
def test_6_logger_mode(self):
logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
test_mode = 0754 # Strange mode that should never exist
daemon = self._run_daemon({
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"logger_mode": test_mode,
"verbose": True,
})
info_path = os.path.join(logger_path, "osqueryd.INFO")
self.assertTrue(daemon.isAlive())
def info_exists():
return os.path.exists(info_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
# Both log files should exist, the results should have the given mode.
for fname in ['osqueryd.INFO', 'osqueryd.results.log']:
pth = os.path.join(logger_path, fname)
self.assertTrue(os.path.exists(pth))
# Only apply the mode checks to .log files.
if fname.find('.log') > 0:
rpath = os.path.realpath(pth)
mode = os.stat(rpath).st_mode & 0777
self.assertEqual(mode, test_mode)
daemon.kill()
开发者ID:defaultnamehere,项目名称:osquery,代码行数:33,代码来源:test_osqueryd.py
示例9: tearDown
def tearDown(self):
if os.path.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)
# Ensure that even if events fail we always remove the services
if len(self.service_list_) > 0:
for s in self.service_list_:
stopService(s)
test_base.expectTrue(serviceDead)
uninstallService(s)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:10,代码来源:test_windows_service.py
示例10: setUp
def setUp(self):
self.daemon = self._run_daemon({
# The set of queries will hammer the daemon process.
"disable_watchdog": True,
})
self.assertTrue(self.daemon.isAlive())
# The sets of example tests will use the extensions API.s
self.client = test_base.EXClient(self.daemon.options["extensions_socket"])
test_base.expectTrue(self.client.open)
self.assertTrue(self.client.open())
self.em = self.client.getEM()
开发者ID:151706061,项目名称:osquery,代码行数:12,代码来源:test_example_queries.py
示例11: test_query_packs
def test_query_packs(self):
query_pack_path = test_base.CONFIG_DIR + "/test_pack.conf"
utils.write_config({
"queries": {
"simple_test": {
"query": "select * from time",
"interval": 60,
},
"simple_test2": {
"query": "select * from time",
"interval": 60,
"platform": "does_not_exist",
}
}
}, path=query_pack_path)
# Get a daemon process, loaded with the default test configuration.
# We'll add a config override (overwrite) for the "packs" key.
# THis will point a single pack at the config written above.
daemon = self._run_daemon({
"disable_watchdog": True,
},
overwrite={
"packs": {
"test_pack": query_pack_path
},
})
self.assertTrue(daemon.isAlive())
# Introspect into the daemon's query packs.
client = test_base.EXClient(daemon.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# Every query from the pack(s) is added to the packs table.
def get_packs():
result = em.query("select * from osquery_packs")
return len(result.response) == 2
# Allow the daemon some lag to parse the pack content.
test_base.expectTrue(get_packs)
result = em.query("select * from osquery_packs")
self.assertEqual(len(result.response), 2)
# Only the applicable queries are added to the schedule.
# There will be len(pack_queries) - 1 since "simple_test2" is bound
# to an unknown/non-existing platform.
result = em.query("select * from osquery_schedule")
self.assertEqual(len(result.response), 1)
daemon.kill()
开发者ID:1514louluo,项目名称:osquery,代码行数:50,代码来源:test_additional.py
示例12: setUp
def setUp(self):
self.daemon = self._run_daemon({
# The set of queries will hammer the daemon process.
"disable_watchdog": True,
# Enable the 'hidden' flag "registry_exceptions" to prevent catching.
"registry_exceptions": True,
})
self.assertTrue(self.daemon.isAlive())
# The sets of example tests will use the extensions APIs.
self.client = test_base.EXClient(self.daemon.options["extensions_socket"])
test_base.expectTrue(self.client.open)
self.assertTrue(self.client.open())
self.em = self.client.getEM()
开发者ID:runt18,项目名称:osquery,代码行数:14,代码来源:test_example_queries.py
示例13: cleanOsqueryServices
def cleanOsqueryServices():
service_args = POWERSHELL_ARGS + ['$(Get-Service osqueryd_test_*).Name']
services = subprocess.check_output(service_args).split()
# No services found on the system
if len(services) == 0:
return
for service in services:
stopService(service)
# Local workaround as we need the service name
def isServiceStopped():
return serviceStopped(service)
test_base.expectTrue(isServiceStopped)
uninstallService(service)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:15,代码来源:test_windows_service.py
示例14: test_4_extension_dies
def test_4_extension_dies(self):
daemon = self._run_daemon({"disable_watchdog": True})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = test_base.EXClient(daemon.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# Make sure there are no extensions registered
result = test_base.expect(em.extensions, 0)
self.assertEqual(len(result), 0)
# Make sure the extension process starts
extension = self._run_extension(
path=daemon.options["extensions_socket"])
self.assertTrue(extension.isAlive())
# Now that an extension has started, check extension list
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
# Kill the extension
extension.kill()
# Make sure the daemon detects the change
result = test_base.expect(em.extensions, 0, timeout=5)
self.assertEqual(len(result), 0)
# Make sure the extension restarts
extension = self._run_extension(
path=daemon.options["extensions_socket"])
self.assertTrue(extension.isAlive())
# With the reset there should be 1 extension again
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
print (em.query("select * from example"))
# Now tear down the daemon
client.close()
daemon.kill()
# The extension should tear down as well
self.assertTrue(extension.isDead(extension.pid))
开发者ID:151706061,项目名称:osquery,代码行数:46,代码来源:test_extensions.py
示例15: test_2_daemon_with_option
def test_2_daemon_with_option(self):
logger_path = os.path.join(test_base.CONFIG_DIR, "logger-tests")
os.makedirs(logger_path)
daemon = self._run_daemon(
{"disable_watchdog": True, "disable_extensions": True, "disable_logging": False},
options_only={"logger_path": logger_path, "verbose": True},
)
info_path = os.path.join(logger_path, "osqueryd.INFO")
self.assertTrue(daemon.isAlive())
def info_exists():
return os.path.exists(info_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
self.assertTrue(os.path.exists(info_path))
daemon.kill()
开发者ID:Ramsey16,项目名称:osquery,代码行数:17,代码来源:test_osqueryd.py
示例16: test_9_external_config_update
def test_9_external_config_update(self):
# Start an extension without a daemon, with a timeout.
extension = self._run_extension(timeout=EXTENSION_TIMEOUT)
self.assertTrue(extension.isAlive())
# Now start a daemon
daemon = self._run_daemon({
"disable_watchdog": True,
"extensions_timeout": EXTENSION_TIMEOUT,
"extensions_socket": extension.options["extensions_socket"],
})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client to the manager and extension.
client = test_base.EXClient(extension.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open(timeout=EXTENSION_TIMEOUT))
em = client.getEM()
# Need the manager to request the extension's UUID.
result = test_base.expect(em.extensions, 1)
self.assertTrue(result is not None)
ex_uuid = result.keys()[0]
client2 = test_base.EXClient(extension.options["extensions_socket"],
uuid=ex_uuid)
test_base.expectTrue(client2.open)
self.assertTrue(client2.open(timeout=EXTENSION_TIMEOUT))
ex = client2.getEX()
# Trigger an async update from the extension.
request = {
"action": "update",
"source": "test",
"data": "{\"options\": {\"config_plugin\": \"update_test\"}}"}
ex.call("config", "example", request)
# The update call in the extension should filter to the core.
options = em.options()
self.assertTrue("config_plugin" in options.keys())
self.assertTrue(options["config_plugin"], "update_test")
# Cleanup thrift connections and subprocesses.
client2.close()
client.close()
extension.kill()
daemon.kill()
开发者ID:adityavs,项目名称:osquery,代码行数:46,代码来源:test_extensions.py
示例17: test_7_extensions_autoload_watchdog
def test_7_extensions_autoload_watchdog(self):
loader = test_base.Autoloader(
[test_base.ARGS.build + "/osquery/example_extension.ext"])
daemon = self._run_daemon({"extensions_autoload": loader.path})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = EXClient(daemon.options["extensions_socket"])
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# The waiting extension should have connected to the daemon.
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
client.close()
daemon.kill(True)
开发者ID:imaxxs,项目名称:osquery,代码行数:18,代码来源:test_extensions.py
示例18: test_6_logger_mode
def test_6_logger_mode(self):
logger_path = test_base.getTestDirectory(test_base.CONFIG_DIR)
test_mode = 0754 # Strange mode that should never exist
daemon = self._run_daemon(
{
"disable_watchdog": True,
"disable_extensions": True,
"disable_logging": False,
},
options_only={
"logger_path": logger_path,
"logger_mode": test_mode,
"verbose": True,
})
results_path = os.path.join(logger_path, "osqueryd.results.log")
self.assertTrue(daemon.isAlive())
# Wait for the daemon to write the info log to disk before continuing
def info_exists():
info_path = test_base.getLatestInfoLog(logger_path)
return os.path.exists(info_path)
info_path = test_base.getLatestInfoLog(logger_path)
def results_exists():
return os.path.exists(results_path)
# Wait for the daemon to flush to GLOG.
test_base.expectTrue(info_exists)
test_base.expectTrue(results_exists)
# Both log files should exist, the results should have the given mode.
for pth in [info_path, results_path]:
self.assertTrue(os.path.exists(pth))
# Only apply the mode checks to .log files.
# TODO: Add ACL checks for Windows logs
if pth.find('.log') > 0 and os.name != "nt":
rpath = os.path.realpath(pth)
mode = os.stat(rpath).st_mode & 0777
self.assertEqual(mode, test_mode)
daemon.kill()
开发者ID:Centurion89,项目名称:osquery,代码行数:43,代码来源:test_osqueryd.py
示例19: test_6_extensions_autoload
def test_6_extensions_autoload(self):
loader = test_base.Autoloader("/tmp/osqueryd-temp-ext.load",
[test_base.ARGS.build + "/osquery/example_extension.ext"])
daemon = self._run_daemon({
"disable_watchdog": True,
"extensions_autoload": loader.path,
})
self.assertTrue(daemon.isAlive())
# Get a python-based thrift client
client = EXClient()
test_base.expectTrue(client.open)
self.assertTrue(client.open())
em = client.getEM()
# The waiting extension should have connected to the daemon.
result = test_base.expect(em.extensions, 1)
self.assertEqual(len(result), 1)
client.close()
daemon.kill(True)
开发者ID:jreese,项目名称:osquery,代码行数:21,代码来源:test_extensions.py
示例20: test_1_install_run_stop_uninstall_windows_service
def test_1_install_run_stop_uninstall_windows_service(self):
name = 'osqueryd_test_{}'.format(self.test_instance)
code, _ = installService(name, self.bin_path)
self.assertEqual(code, 0)
self.service_list_.append(name)
code = startService(name, '--flagfile', self.flagfile)
self.assertEqual(code, 0)
# Ensure the service is online before proceeding
test_base.expectTrue(serviceAlive)
_, output = queryService(name)
self.assertEqual(output, '4RUNNING')
# The daemon should not be able to load if the service is running
_, stderr = self.runDaemon(
'--allow_unsafe', '--verbose', '--config_path', self.config_path,
'--database_path', self.database_path, '--logger_path',
self.log_path, '--pidfile', self.pidfile)
self.assertNotEqual(stderr.find('is already running'), -1)
if code == 0:
code = stopService(name)
self.assertEqual(code, 0)
test_base.expectTrue(serviceDead)
self.assertTrue(serviceDead())
_, output = queryService(name)
self.assertEqual(output, '1STOPPED')
code, _ = uninstallService(name)
self.assertEqual(code, 0)
self.service_list_.remove(name)
# Make sure the service no longer exists, error code 1060
code, _ = queryService(name)
self.assertEqual(code, 1060)
开发者ID:JonathanNogueira,项目名称:osquery,代码行数:40,代码来源:test_windows_service.py
注:本文中的test_base.expectTrue函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论