本文整理汇总了Python中watchdog.observers.Observer类的典型用法代码示例。如果您正苦于以下问题：Python Observer类的具体用法？Python Observer怎么用？Python Observer使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Observer类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: serve
def serve(host='localhost', port=8000):
    """Run the development server and rebuild the site when the project changes.

    The project root is the current working directory at call time.  The site
    is built into its ``www`` subdirectory and served from there on
    ``host``:``port``.

    Returns 1 when a build did not leave a ``www`` directory behind.
    Otherwise the build/serve cycle repeats every time ``serve_forever()``
    returns.
    """
    address = (host, port)
    httpd = BaseHTTPServer.HTTPServer(
        address, SimpleHTTPServer.SimpleHTTPRequestHandler)

    # Event to be set when the project has changes and needs to be rebuilt
    rebuild_needed = threading.Event()

    # Per the original author, both helper threads are daemon threads
    stopper = Shutdowner(httpd, rebuild_needed)
    stopper.start()

    watcher = Observer()
    watcher.start()

    root = os.getcwd()
    site_dir = os.path.join(root, 'www')
    watcher.schedule(EventHandler(root, rebuild_needed),
                     path=root, recursive=True)

    from carcade.cli import build  # To resolve a circular import

    while True:
        # Build from the project root, then serve from the build output.
        os.chdir(root)
        build(to=site_dir, atomically=True)
        if not os.path.exists(site_dir):
            return 1
        os.chdir(site_dir)
        httpd.serve_forever()
开发者ID:0x0all,项目名称:carcade,代码行数:31,代码来源:server.py
示例2: RoleBasedAuthorizationProvider
class RoleBasedAuthorizationProvider(AbstractAuthorizationProvider,
                                     FileSystemEventHandler):
    """Authorize requests from a YAML roles file that is reloaded on change.

    The instance registers itself as a watchdog handler on the directory
    containing the roles file and reloads the file whenever it is modified.
    """

    def __init__(self, role_loader, roles_config_file_path):
        """Load the roles file once and start watching its directory.

        Raises:
            ValueError: if the initial load of the roles file fails.
        """
        self.lgr = logging.getLogger(FLASK_SECUREST_LOGGER_NAME)
        self.role_loader = role_loader
        self.permissions_by_roles = None
        self.roles_config_file_path = os.path.abspath(roles_config_file_path)
        self.observer = Observer()
        self.observer.schedule(
            self,
            path=os.path.dirname(self.roles_config_file_path),
            recursive=False)
        self.load_roles_config()
        self.observer.start()

    def load_roles_config(self):
        """(Re)load the permissions mapping from the roles file.

        The current mapping is replaced only after a successful parse, so a
        failed reload leaves the previously loaded permissions in place.

        Raises:
            ValueError: if the file cannot be read or is not valid YAML.
        """
        try:
            with open(self.roles_config_file_path, 'r') as config_file:
                loaded = yaml.safe_load(config_file.read())
        # yaml.YAMLError is the base of ParserError, ScannerError, etc., so
        # every YAML syntax problem is reported the same way.
        except (yaml.YAMLError, IOError) as e:
            err = 'Failed parsing {role_config_file} file. Error: {error}.' \
                .format(role_config_file=self.roles_config_file_path, error=e)
            self.lgr.warning(err)
            raise ValueError(err)
        # An empty YAML document parses to None; treat it as "no roles".
        self.permissions_by_roles = {} if loaded is None else loaded
        self.lgr.info('Loading of roles configuration ended '
                      'successfully')

    def on_modified(self, event):
        """Reload the roles file when watchdog reports it was modified."""
        if os.path.abspath(event.src_path) != self.roles_config_file_path:
            return
        try:
            self.load_roles_config()
        except ValueError:
            # This runs on the observer thread.  The failure was already
            # logged by load_roles_config(); letting it propagate would only
            # disturb the watcher, so keep the last good configuration.
            self.lgr.warning('Keeping previously loaded roles configuration')

    def authorize(self):
        """Return True when the current request is allowed and not denied."""
        target_endpoint = rest_security.get_endpoint()
        target_method = rest_security.get_http_method()
        roles = self.role_loader.get_roles()
        return self._is_allowed(target_endpoint, target_method, roles) and \
            not self._is_denied(target_endpoint, target_method, roles)

    def _is_allowed(self, target_endpoint, target_method, user_roles):
        """Return True if any role has a matching 'allow' permission."""
        return self._evaluate_permission_by_type(target_endpoint,
                                                 target_method, user_roles,
                                                 'allow')

    def _is_denied(self, target_endpoint, target_method, user_roles):
        """Return True if any role has a matching 'deny' permission."""
        return self._evaluate_permission_by_type(target_endpoint,
                                                 target_method, user_roles,
                                                 'deny')

    def _evaluate_permission_by_type(self, target_endpoint, target_method,
                                     user_roles, permission_type):
        """Return True if any of `user_roles` matches under `permission_type`.

        Roles missing from the configuration contribute no permissions.
        """
        permissions_by_roles = self.permissions_by_roles or {}
        for role in user_roles:
            role_permissions = permissions_by_roles.get(role,
                                                        {'allow': {},
                                                         'deny': {}})
            relevant_permissions = role_permissions.get(permission_type, {})
            if _is_permission_matching(target_endpoint, target_method,
                                       relevant_permissions):
                return True
        return False
开发者ID:carriercomm,项目名称:flask-securest,代码行数:60,代码来源:role_based_authorization_provider.py
示例3: __init__
def __init__(self, zkconn, root_node_path, conf):
    """Register this host under `root_node_path` and watch its config file.

    Creates an ephemeral node named after ``self.myip()`` holding the
    serialized configuration, starts a watchdog observer on
    ``conf.file_path`` and installs ``self.node_watcher`` on the node.

    This constructor never returns: it ends in an endless wait loop.
    """
    super(ZkFarmJoiner, self).__init__()
    self.update_remote_timer = None
    self.update_local_timer = None
    self.zkconn = zkconn
    self.conf = conf
    self.node_path = "%s/%s" % (root_node_path, self.myip())

    # force the hostname info key
    local_info = conf.read()
    local_info["hostname"] = gethostname()
    conf.write(local_info)

    payload = serialize(conf.read())
    zkconn.create(self.node_path, payload, zc.zk.OPEN_ACL_UNSAFE, EPHEMERAL)

    conf_watcher = Observer()
    conf_watcher.schedule(self, path=conf.file_path, recursive=True)
    conf_watcher.start()

    zkconn.get(self.node_path, self.node_watcher)

    # NOTE(review): self.cv and self.wait are not defined in this snippet;
    # presumably they come from the base class - confirm.
    while True:
        with self.cv:
            self.wait()
开发者ID:sectioneight,项目名称:zkfarmer,代码行数:25,代码来源:watcher.py
示例4: __init__
def __init__(self, dirpath='.', output='output'):
    """Load job configs from `dirpath` and watch it for JSON changes.

    Args:
        dirpath: local folder holding the job config files.  If it is not an
            existing directory, the default ``'.'`` is used instead.
        output: folder for output; created when it does not exist.
    """
    logger.info("Initialing BOSS")
    if os.path.isdir(dirpath):
        self.dirpath = dirpath
        logger.info("loading job config folder: " + dirpath)
    else:
        logger.info(dirpath + " is invalid, use default path instead")
        # Actually fall back to the default, as the log message promises.
        # Without this, self.dirpath was never set and scheduling the
        # observer below failed with AttributeError.
        self.dirpath = '.'

    self.output = output
    logger.info("Setup output folder: " + output)
    if not os.path.isdir(output):
        logger.info("target directory "
                    + output
                    + " doesn't exist, creating..")
        os.makedirs(output)

    self.scheduler = BackgroundScheduler()
    self.scheduler.start()

    # Load from the validated folder, not the raw argument.
    self.load_dir(self.dirpath)

    event_handler = JsonHandler(patterns=["*.json"],
                                ignore_directories=True)
    event_handler.set_handler(oncreated=self.load,
                              onmodified=self.load,
                              ondeleted=self.remove)
    observer = Observer()
    observer.schedule(event_handler, self.dirpath, recursive=True)
    observer.start()
开发者ID:Mozilla-TWQA,项目名称:mozMinions,代码行数:31,代码来源:boss.py
示例5: _setup_watchdog
def _setup_watchdog(self):
    """Start watchdog observers on the base folder and on its parent.

    One observer watches ``self.client.base_folder`` recursively.  A second
    one watches the parent directory non-recursively with the root event
    handler.  On Windows, watchdog module constants are patched first.
    """
    # Monkey-patch Watchdog to
    # - Set the Windows hack delay to 0 in WindowsApiEmitter, otherwise we might miss some events
    # - Increase the ReadDirectoryChangesW buffer size for Windows
    if self._windows:
        try:
            import watchdog.observers
            watchdog.observers.read_directory_changes.WATCHDOG_TRAVERSE_MOVED_DIR_DELAY = 0
            watchdog.observers.winapi.BUFFER_SIZE = self._windows_watchdog_event_buffer
        except Exception:
            # Was a bare ``except:``, which also swallowed
            # KeyboardInterrupt and SystemExit.
            log.trace('read_directory_changes import error', exc_info=True)
            log.warn('Cannot import read_directory_changes, probably under Windows XP'
                     ', watchdog will fall back on polling')
    from watchdog.observers import Observer
    log.debug("Watching FS modification on : %s", self.client.base_folder)
    self._event_handler = DriveFSEventHandler(self)
    self._root_event_handler = DriveFSRootEventHandler(self, os.path.basename(self.client.base_folder))
    self._observer = Observer()
    self._observer.schedule(self._event_handler, self.client.base_folder, recursive=True)
    self._observer.start()
    self._check_watchdog()

    self._root_observer = Observer()
    self._root_observer.schedule(self._root_event_handler, os.path.dirname(self.client.base_folder), recursive=False)
    self._root_observer.start()
开发者ID:hendrix513,项目名称:nuxeo-drive,代码行数:25,代码来源:local_watcher.py
示例6: server
def server(options, info):
    """Serve ``build/html`` on port 8888 and rebuild the docs on any change.

    A watchdog observer watches the ``source`` directory recursively; every
    file-system event triggers ``paver.doctools.html()``.
    """

    class RebuildOnChange(FileSystemEventHandler):
        """Rebuild the HTML output for every file-system event."""

        def on_any_event(self, event):
            print("Changes detected")
            print("Building html")
            paver.doctools.html()

    app_settings = {"debug": True}
    handler = RebuildOnChange()

    routes = [
        (r"/", tornado.web.RedirectHandler, dict(url="/index.html")),
        (r"/(.*)", tornado.web.StaticFileHandler, {"path": "build/html"}),
    ]
    application = tornado.web.Application(routes, **app_settings)

    print("Running server on port 8888")
    watcher = Observer()
    watcher.schedule(handler, "source", recursive=True)
    watcher.start()

    application.listen(8888)
    while True:
        print("Starting the server")
        tornado.ioloop.IOLoop.instance().start()
开发者ID:alvistar,项目名称:sphinx-paver,代码行数:32,代码来源:pavement.py
示例7: PlexTVDaemon
class PlexTVDaemon(Daemon):
    """Daemon that watches ``tv.source`` recursively with a PlexTVEventHandler."""

    def __init__(self, pidfile, tv, patterns):
        """Create the handler and schedule (but do not start) the observer."""
        super(PlexTVDaemon, self).__init__(pidfile)
        self._tv = tv
        self._handler = PlexTVEventHandler(tv, patterns)
        self._observer = Observer()
        self._observer.schedule(self.handler, self.tv.source, recursive=True)

    @property
    def handler(self):
        """The file-system event handler."""
        return self._handler

    @property
    def observer(self):
        """The watchdog observer."""
        return self._observer

    @property
    def tv(self):
        """The TV object passed to the constructor."""
        return self._tv

    def run(self):
        """Refresh the links, start the observer and sleep forever."""
        # create_all_links() only runs when clean_broken_links() returns a
        # truthy value (short-circuit kept from the original).
        self.tv.clean_broken_links() and self.tv.create_all_links()
        # Was ``observer.start()``: a NameError, no such local/global name.
        self.observer.start()
        while True:
            time.sleep(1)

    def stop(self):
        """Stop and join the observer, then stop the daemon itself."""
        self.observer.stop()
        self.observer.join()
        # Was ``super.stop()``: the builtin ``super`` type has no ``stop``.
        super(PlexTVDaemon, self).stop()
开发者ID:ChadBurggraf,项目名称:plex-tv,代码行数:31,代码来源:plex_tv.py
示例8: FSWatcherBehavior
class FSWatcherBehavior(object):
    """File System Watcher behavior.

    :Events:
        `on_operations`
            Fired when there is any event on watched dir/file.
    """

    # Contains the list of operations/events on the watched folder.
    operations = ListProperty(None)

    # Watched path.
    path = StringProperty()

    def __init__(self, **kwargs):
        super(FSWatcherBehavior, self).__init__(**kwargs)
        # The observer is created here but only started once a path is set.
        self._observer = Observer()
        self._event_handler = MyFileEventHandler(self)

    def on_path(self, *args):
        """Watch the new value of ``path`` in place of the previous one."""
        # Drop the watch on the previous path; ``path`` is a single value.
        self._observer.unschedule_all()
        self._observer.schedule(self._event_handler, self.path)
        # The observer is a thread and can be started only once; a second
        # path change used to raise RuntimeError here.
        if not self._observer.is_alive():
            self._observer.start()
        print('Start Watching')
开发者ID:opqopq,项目名称:BoardGameMaker,代码行数:25,代码来源:watched_directory.py
示例9: WatchedFolderTree
class WatchedFolderTree(TreeView):
    """Tree view listing the entries of ``path`` and watching it for changes."""

    path = StringProperty()
    updated = BooleanProperty(False)

    def __init__(self, *args, **kwargs):
        TreeView.__init__(self, *args, **kwargs)
        # The observer is created here but only started once a path is set.
        self.observer = Observer()
        self.event_handler = MyFileEventHandler(self)

    def on_path(self, *args):
        """Rebuild the tree and watch the new ``path`` in place of the old one."""
        self.rebuilt()
        # Drop the watch on the previous path; ``path`` is a single value.
        self.observer.unschedule_all()
        self.observer.schedule(self.event_handler, self.path)
        # The observer is a thread and can be started only once; a second
        # path change used to raise RuntimeError here.
        if not self.observer.is_alive():
            self.observer.start()
        print('starting %s' % (self.updated,))

    def rebuilt(self):
        """Clear the tree and add one label node per entry of ``path``."""
        print('rebuilt called %s' % (self.updated,))
        self.clear_widgets()
        self.root.nodes = []
        print("Nodes: %s" % (self.root.nodes,))
        from os import listdir
        for f in listdir(self.path):
            self.add_node(TreeViewLabel(text=f, color_selected=(.6, .6, .6, .8)))

    def on_updated(self, *args):
        """Reset the ``updated`` flag and rebuild the tree when it is set."""
        print('on updated %s' % (args,))
        if self.updated:
            self.updated = False
            self.rebuilt()
开发者ID:opqopq,项目名称:BoardGameMaker,代码行数:30,代码来源:watched_directory.py
示例10: watch
def watch(self):
    """Publish ``ramdisk.files_added`` messages for changes under ``self.path``.

    The observer watches ``self.path`` recursively and is stored in
    ``self.watch_timer``.

    Raises:
        Exception: if a watch is already active on this ramdisk.
    """
    if self.watch_timer:
        raise Exception("only one watch can be active on a ramdisk")

    class DelayedEventHandler(FileSystemEventHandler):
        """
        Event handler that sends file change messages only if no other event
        occurs for that file within 0.5 seconds.
        """
        # Most recent event seen per path.
        previous_events = {}

        def dispatch(self, event):
            self.previous_events[event.src_path] = event
            threading.Timer(0.5, self.check_time, args=[event]).start()

        def check_time(self, event):
            # Compare by identity: watchdog events compare equal by value,
            # so ``==`` let every one of several identical events through
            # and defeated the debounce.
            if self.previous_events.get(event.src_path) is event:
                wx.CallAfter(
                    pub.sendMessage,
                    "ramdisk.files_added",
                    event_type=event.event_type,
                    path=event.src_path,
                    is_directory=event.is_directory,
                )

    observer = Observer()
    observer.schedule(DelayedEventHandler(), self.path, recursive=True)
    observer.start()
    self.watch_timer = observer
开发者ID:jcushman,项目名称:password_change_tool,代码行数:32,代码来源:ramdisk.py
示例11: _register_observers
def _register_observers(self):
    """Start observers that call ``self.start()`` when watched files change.

    The content path is always watched; the theme folder is watched as well
    when ``self.debug`` is set.  Both watches are recursive and share one
    handler instance.
    """
    site = self

    class ContentHandler(FileSystemEventHandler):
        """Log each modification and restart the site."""

        def on_modified(self, event):
            kind = 'Directory' if event.is_directory else "File"
            logging.debug('%s "%s" was "%s"',
                          kind,
                          event.src_path,
                          event.event_type)
            site.start()

    handler = ContentHandler()

    # Listen for content changes
    content_path = self.config['CONTENT_PATH']
    self.content_observer = Observer()
    self.content_observer.schedule(handler, content_path, recursive=True)
    self.content_observer.start()

    # Theme changes are only watched while debugging
    if not self.debug:
        return
    self.theme_observer = Observer()
    self.theme_observer.schedule(handler,
                                 self.config['THEME_FOLDER'],
                                 recursive=True)
    self.theme_observer.start()
开发者ID:crempp,项目名称:mdweb,代码行数:32,代码来源:MDSite.py
示例12: __init__
def __init__(self):
    """Create one (not yet started) observer per configured project.

    Raises:
        BaboonException: if scheduling a watch fails with OSError.
    """
    from baboon.baboon.plugins.git.monitor_git import EventHandlerGit

    self.dancer = Dancer(sleeptime=1)

    # Observers are stored in this dict, keyed by the expanded project path.
    self.monitors = {}

    try:
        # Iterating over the sorted keys avoids iteritems/items, so the
        # loop works on both Python 2.x and 3.x.
        for name in sorted(config['projects']):
            attrs = config['projects'][name]
            path = os.path.expanduser(attrs['path'])

            # Note: self.handler ends up holding the last project's handler.
            self.handler = EventHandlerGit(path)

            watcher = Observer()
            watcher.schedule(self.handler, path, recursive=True)
            self.monitors[path] = watcher
    except OSError as err:
        self.logger.error(err)
        raise BaboonException(err)
开发者ID:SeyZ,项目名称:baboon,代码行数:28,代码来源:monitor.py
示例13: tricks_from
def tricks_from(args):
    """Start one observer per tricks file and block until interrupted.

    Raises:
        IOError: if a tricks file does not exist.
        KeyError: if a tricks file has no tricks key.
    """
    from watchdog.observers import Observer

    add_to_sys_path(path_split(args.python_path))

    running = []
    for tricks_file in args.files:
        watcher = Observer(timeout=args.timeout)

        if not os.path.exists(tricks_file):
            raise IOError('cannot find tricks file: %s' % tricks_file)

        config = load_config(tricks_file)
        try:
            tricks = config[CONFIG_KEY_TRICKS]
        except KeyError:
            raise KeyError("No `%s' key specified in %s." % (CONFIG_KEY_TRICKS, tricks_file))

        if CONFIG_KEY_PYTHON_PATH in config:
            add_to_sys_path(config[CONFIG_KEY_PYTHON_PATH])

        # A bare file name has no directory part: use the current directory.
        base_dir = os.path.dirname(tricks_file) or os.path.relpath(os.getcwd())

        schedule_tricks(watcher, tricks, base_dir, args.recursive)
        watcher.start()
        running.append(watcher)

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        for watcher in running:
            watcher.unschedule_all()
            watcher.stop()

    for watcher in running:
        watcher.join()
开发者ID:connoryang,项目名称:dec-eve-serenity,代码行数:34,代码来源:watchmedo.py
示例14: _watchdog
def _watchdog(self):
    """Runs Watchdog to listen to filesystem events.

    When first run, the `Cakefile` is touched to trigger the
    initial build.
    """
    if not hasattr(self.app, "static_url_path"):
        from warnings import warn
        warn(DeprecationWarning("static_path is called static_url_path since Flask 0.7"), stacklevel=2)
        static_url_path = self.app.static_path
    else:
        static_url_path = self.app.static_url_path

    static_dir = self.app.root_path + static_url_path
    cakedir = os.path.join(static_dir, self.cakeparent)

    # Setup Watchdog
    handler = Events(cakedir=cakedir, tasks=self.tasks)
    observer = Observer(timeout=5000)
    observer.schedule(handler, path=cakedir, recursive=True)
    observer.start()

    # "Touch" the Cakefile to signal the initial build
    cakefile = os.path.join(cakedir, "Cakefile")
    # ``open`` replaces the Python 2-only ``file`` builtin.
    with open(cakefile, "a"):
        os.utime(cakefile, None)
开发者ID:zenweasel,项目名称:Flask-Cake,代码行数:30,代码来源:cake.py
示例15: _watch
def _watch(self, app, arguments, builder):
    """Return an observer (not started) watching content, themes and settings.

    The content directory and every existing theme directory are watched
    recursively, ignoring the settings file.  When the settings file exists,
    its directory gets a separate watcher that recreates the app.
    """
    # Content is always watched; existing theme directories are added to it.
    watch_paths = [app.conf['paths.content']]
    watch_paths.extend(
        theme for theme in app._themes if os.path.exists(theme))

    observer = Observer()
    for watched in watch_paths:
        content_watcher = _ChangeWatcher(
            builder, ignore=[os.path.abspath(arguments.conf)])
        observer.schedule(content_watcher, watched, recursive=True)

    # The user's settings are watched explicitly, because they may be
    # located outside the content directory.
    if os.path.exists(arguments.conf):
        settings_file = os.path.abspath(arguments.conf)
        settings_watcher = _ChangeWatcher(
            builder, recreate_app=True, watch_for=[settings_file])
        observer.schedule(
            settings_watcher,
            os.path.abspath(os.path.dirname(arguments.conf)))

    return observer
开发者ID:olha-kurkaiedova,项目名称:holocron,代码行数:29,代码来源:serve.py
示例16: watch_and_spawn
def watch_and_spawn(self, conf):
    """Run the server subprocess and respawn it when watched files change.

    One observer is started per path returned by ``self.paths_to_monitor``.
    The call blocks until interrupted from the keyboard.
    """
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileModifiedEvent, DirModifiedEvent

    print("Monitoring for changes...")
    self.create_subprocess()
    parent = self

    reload_event_types = (FileSystemMovedEvent, FileModifiedEvent, DirModifiedEvent)

    class AggressiveEventHandler(FileSystemEventHandler):
        """Kill and respawn the server on relevant modification events."""

        def should_reload(self, event):
            return isinstance(event, reload_event_types)

        def on_modified(self, event):
            if not self.should_reload(event):
                return
            parent.server_process.kill()
            parent.create_subprocess()

    # Determine a list of file paths to monitor
    monitored = self.paths_to_monitor(conf)

    handler = AggressiveEventHandler()
    for path, recurse in monitored:
        watcher = Observer()
        watcher.schedule(handler, path=path, recursive=recurse)
        watcher.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
开发者ID:keitheis,项目名称:pecan,代码行数:35,代码来源:serve.py
示例17: Monitor
class Monitor(object):
    """Schedule a watchdog observer with the event handler for the configured SCM."""

    def __init__(self, transport):
        """ Watches file change events (creation, modification) in the
        watched project.

        Raises:
            BaboonException: if no EventHandler subclass matches the
                configured SCM, or if scheduling the watch fails with OSError.
        """
        self.config = Config()
        self.transport = transport

        # Initialize the event handler class to use depending on the SCM to use
        handler = None
        scm_classes = EventHandler.__subclasses__()
        for cls in scm_classes:
            tmp_inst = cls(self.transport)
            if tmp_inst.scm_name == self.config.scm:
                self.logger.debug("Uses the %s class for the monitoring of FS " "changes" % tmp_inst.scm_name)
                handler = tmp_inst
                break
        else:
            # Raises this BaboonException if no plugin has found
            # according to the scm entry in the config file
            raise BaboonException(
                "Cannot get a valid FS event handler" " class for your SCM written in your" " baboonrc file"
            )

        self.monitor = Observer()
        try:
            self.monitor.schedule(handler, self.config.path, recursive=True)
        # ``except OSError, err`` is Python 2-only syntax; the ``as`` form
        # works on Python 2.6+ and 3.
        except OSError as err:
            self.logger.error(err)
            raise BaboonException(err)
开发者ID:PiWhy,项目名称:baboon,代码行数:32,代码来源:monitor.py
示例18: __init__
def __init__(self, watchpath=CONDOR_WATCHDIR):
    """Watch `watchpath` (non-recursively) with this object as the handler.

    This constructor never returns: it sleeps in an endless loop after
    starting the observer.
    """
    # Initialise state before the observer thread starts, so a handler
    # callback can never run before this attribute exists.
    self.job_watchers = {}

    observer = Observer()
    observer.schedule(self, watchpath, recursive=False)
    observer.start()
    while True:
        time.sleep(1)
开发者ID:bgharper,项目名称:daspos-work,代码行数:7,代码来源:jobwatcher.py
示例19: _setup_watchdog
def _setup_watchdog(self):
    """Start the folder observers and wait for a first watchdog event.

    After starting the recursive observer on the base folder, a marker file
    is touched once per second until the watchdog queue is non-empty.  An
    exception is raised if that takes more than the allowed attempts.  A
    second, non-recursive observer is then started on the parent directory.
    """
    from watchdog.observers import Observer
    log.debug("Watching FS modification on : %s", self.client.base_folder)
    self._event_handler = DriveFSEventHandler(self)
    self._root_event_handler = DriveFSRootEventHandler(self, os.path.basename(self.client.base_folder))

    self._observer = Observer()
    self._observer.schedule(self._event_handler, self.client.base_folder, recursive=True)
    self._observer.start()

    # Be sure to have at least one watchdog event
    attempts_left = 30
    lock = self.client.unlock_ref('/', False)
    try:
        marker = self.client._abspath('/.watchdog_setup')
        while self._watchdog_queue.empty():
            # Touch the marker file to provoke a file-system event.
            with open(marker, 'a'):
                os.utime(marker, None)
            sleep(1)
            attempts_left -= 1
            if attempts_left < 0:
                log.debug("Can't have watchdog setup. Fallback to full scan mode ?")
                os.remove(marker)
                raise Exception
            os.remove(marker)
        if os.path.exists(marker):
            os.remove(marker)
    finally:
        self.client.lock_ref('/', lock)

    self._root_observer = Observer()
    self._root_observer.schedule(self._root_event_handler, os.path.dirname(self.client.base_folder), recursive=False)
    self._root_observer.start()
开发者ID:Bindupriya,项目名称:nuxeo-drive,代码行数:30,代码来源:local_watcher.py
示例20: run
def run(self):
    '''Main running function for a process watching a particular condor
    job. Creates its own logfile, watches for changes and then exits.

    Once per second, until the exit event is set, a CSV row of
    (timestamp, file count, total size in bytes) is written for the paths in
    ``self.stat_monitors``.  Returns early, after writing to stderr, if the
    log file cannot be created.
    '''
    observer = Observer()
    observer.schedule(self, self.__watchdir, recursive=True)
    files = 0
    file_space = 0

    # Close the job ad file once parsed (the handle used to be leaked).
    with open(self.__watchdir + "/.job.ad", "r") as job_ad_file:
        job_ad = classad.parseOne(job_ad_file)
    jobdate = datetime.datetime.fromtimestamp(
        int(job_ad['JobStartDate'])).strftime('%Y-%m-%d %H:%M:%S')

    try:
        logname = ''.join([LOG_DIR, job_ad['Owner'], ".",
                           job_ad['UidDomain'], ".", str(job_ad['QDate']),
                           ".", str(job_ad['ClusterId']),
                           ".", str(job_ad['ProcId']), ".log"])
        logfile = open(logname, "wb")
    except IOError:
        sys.stderr.write("Problem creating logfile {0}".format(logname))
        return

    # try/finally guarantees the log file is closed even if the loop raises.
    try:
        logwriter = csv.writer(logfile)
        logwriter.writerow([job_ad['User'], jobdate])
        observer.start()
        while not self.__exit.is_set():
            time.sleep(1)
            # Iterate over a copy of the monitored paths.
            for item in self.stat_monitors.copy():
                try:
                    file_space += os.path.getsize(item)
                except OSError:
                    pass  # File has been deleted during our loop
                files += 1
            logwriter.writerow([int(time.time()), files, file_space])
            files = 0
            file_space = 0
    finally:
        logfile.close()
开发者ID:bgharper,项目名称:daspos-work,代码行数:35,代码来源:jobwatcher.py
注：本文中的watchdog.observers.Observer类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论