本文整理汇总了Python中pyinotify.Notifier类的典型用法代码示例。如果您正苦于以下问题:Python Notifier类的具体用法?Python Notifier怎么用?Python Notifier使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Notifier类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: enabled
def enabled(self):
if not self.running:
wm = WatchManager()
self.event_handler = LibraryEvent(app.library)
# Choose event types to watch for
# FIXME: watch for IN_CREATE or for some reason folder copies
# are missed, --nickb
FLAGS = ['IN_DELETE', 'IN_CLOSE_WRITE',# 'IN_MODIFY',
'IN_MOVED_FROM', 'IN_MOVED_TO', 'IN_CREATE']
mask = reduce(lambda x, s: x | EventsCodes.ALL_FLAGS[s], FLAGS, 0)
if self.USE_THREADS:
print_d("Using threaded notifier")
self.notifier = ThreadedNotifier(wm, self.event_handler)
# Daemonize to ensure thread dies on exit
self.notifier.daemon = True
self.notifier.start()
else:
self.notifier = Notifier(wm, self.event_handler, timeout=100)
GLib.timeout_add(1000, self.unthreaded_callback)
for path in get_scan_dirs():
print_d('Watching directory %s for %s' % (path, FLAGS))
# See https://github.com/seb-m/pyinotify/wiki/
# Frequently-Asked-Questions
wm.add_watch(path, mask, rec=True, auto_add=True)
self.running = True
开发者ID:MikeiLL,项目名称:quodlibet,代码行数:29,代码来源:auto_library_update.py
示例2: watch_path
def watch_path(path, add_watch_opt=None, watcher_opt=None):
"""Tail all the files specify by path.
path: By default all files under the path, which should be a directory, are tailed.
Path could also be list of paths or a glob pattern.
The behavior can be modified by add_watch_opt.
See pyinotify.WatchManager.add_watch.
output: defaults to stdout.
can be diverted any callable with:
watcher_opt=dict(out=got_log_line)
where got_log_line() takes a tuple (log_path, log_line)
*_opt: Are pass-through to pyinotify.WatchManager.add_watch and tailall.Monitor.
See respective functions for detail.
"""
wm = WatchManager()
notifier = Notifier(wm, default_proc_fun=FsEvent())
#mask=ALL_EVENTS
#mask=IN_MOVED_TO|IN_CREATE|IN_MODIFY
mask=IN_MODIFY|IN_CLOSE_WRITE
kw=dict(rec=True, auto_add=False)
kw.update(add_watch_opt or {})
wm.add_watch(path, mask, **kw)
monitor=Monitor(watcher_opt=watcher_opt)
notifier.loop(callback=monitor.got_event)
开发者ID:tengu,项目名称:tailall,代码行数:29,代码来源:tailall.py
示例3: fsMonitor
def fsMonitor(path="/data"):
wm = WatchManager()
mask = IN_DELETE | IN_MODIFY | IN_CREATE
notifier = Notifier(wm, EventHandler(), read_freq=10)
notifier.coalesce_events()
wm.add_watch(path, mask, rec=True, auto_add=True)
notifier.loop()
开发者ID:soone,项目名称:docker-rsync,代码行数:7,代码来源:pyrsync.py
示例4: enabled
def enabled(self):
if not self.running:
wm = WatchManager()
self.event_handler = LibraryEvent(library=app.library)
FLAGS = ['IN_DELETE', 'IN_CLOSE_WRITE', # 'IN_MODIFY',
'IN_MOVED_FROM', 'IN_MOVED_TO', 'IN_CREATE']
masks = [EventsCodes.FLAG_COLLECTIONS['OP_FLAGS'][s]
for s in FLAGS]
mask = reduce(operator.or_, masks, 0)
if self.USE_THREADS:
print_d("Using threaded notifier")
self.notifier = ThreadedNotifier(wm, self.event_handler)
# Daemonize to ensure thread dies on exit
self.notifier.daemon = True
self.notifier.start()
else:
self.notifier = Notifier(wm, self.event_handler, timeout=100)
GLib.timeout_add(1000, self.unthreaded_callback)
for path in get_scan_dirs():
real_path = os.path.realpath(path)
print_d('Watching directory %s for %s (mask: %x)'
% (real_path, FLAGS, mask))
# See https://github.com/seb-m/pyinotify/wiki/
# Frequently-Asked-Questions
wm.add_watch(real_path, mask, rec=True, auto_add=True)
self.running = True
开发者ID:Muges,项目名称:quodlibet,代码行数:31,代码来源:auto_library_update.py
示例5: run
def run(self):
"""Create the inotify WatchManager and generate FileSysEvents"""
if utils.is_win32():
self.run_win32()
return
# Only capture events that git cares about
self._wmgr = WatchManager()
if self._is_pyinotify_08x():
notifier = Notifier(self._wmgr, FileSysEvent(), timeout=self._timeout)
else:
notifier = Notifier(self._wmgr, FileSysEvent())
self._watch_directory(self._path)
# Register files/directories known to git
for filename in core.decode(self._git.ls_files()).splitlines():
filename = os.path.realpath(filename)
directory = os.path.dirname(filename)
self._watch_directory(directory)
# self._running signals app termination. The timeout is a tradeoff
# between fast notification response and waiting too long to exit.
while self._running:
if self._is_pyinotify_08x():
check = notifier.check_events()
else:
check = notifier.check_events(timeout=self._timeout)
if not self._running:
break
if check:
notifier.read_events()
notifier.process_events()
notifier.stop()
开发者ID:rakoo,项目名称:git-cola,代码行数:35,代码来源:inotify.py
示例6: run
def run(self):
p = PTmp()
notifier = Notifier(Settings.wm, p)
# TODO: Not necessary to watch the directories that already have
# a procedures.xml
Settings.wdd = Settings.wm.add_watch(Settings.monitor_path, Settings.mask, rec=True)
notifier.loop()
开发者ID:masums,项目名称:uia2atk,代码行数:7,代码来源:qamon.py
示例7: watch
def watch(pathes, extensions):
manager = WatchManager()
handler = Handler(extensions=extensions)
notifier = Notifier(manager, default_proc_fun=handler)
for path in pathes:
manager.add_watch(path, IN_MODIFY, rec=True, auto_add=True)
notifier.loop()
开发者ID:zweifisch,项目名称:wsgi-supervisor,代码行数:7,代码来源:supervisor.py
示例8: main
def main():
vm = WatchManager()
vm.add_watch(monitor_dirs,ALL_EVENTS,rec = True)
en = MyEvent()
notifier = Notifier(vm,en)
notifier.loop()
开发者ID:weekday,项目名称:jiankong,代码行数:7,代码来源:pyinotify_file.py
示例9: run
def run(self):
"""Create the inotify WatchManager and generate FileSysEvents"""
# Only capture events that git cares about
self._wmgr = WatchManager()
if self._is_pyinotify_08x():
notifier = Notifier(self._wmgr, FileSysEvent(self),
timeout=self._timeout)
else:
notifier = Notifier(self._wmgr, FileSysEvent(self))
dirs_seen = set()
added_flag = False
# self._abort signals app termination. The timeout is a tradeoff
# between fast notification response and waiting too long to exit.
while not self._abort:
if not added_flag:
self._watch_directory(self._path)
# Register files/directories known to git
for filename in self._git.ls_files().splitlines():
directory = os.path.dirname(filename)
self._watch_directory(directory)
added_flag = True
notifier.process_events()
if self._is_pyinotify_08x():
check = notifier.check_events()
else:
check = notifier.check_events(timeout=self._timeout)
if check:
notifier.read_events()
notifier.stop()
开发者ID:johnpaulett,项目名称:git-cola,代码行数:29,代码来源:inotify.py
示例10: test_update_conf
def test_update_conf(self, default_config, show_notification):
conf_time_1 = path.getmtime(self.tmp.conf.join("config.py"))
out_file = self.tmp_output.join("out.log")
command_args = [
"-c", self.config_file,
"-r", "bash -c 'echo a | tee -a {}'".format(out_file),
"-d", unicode(self.tmp.src),
]
events = [
(self.tmp.conf, "config.py", "# some new data"),
(self.tmp.conf, "config.py", "# some new data"),
]
self._copy_default_config(default_config)
default_config.RUNNER_DELAY = -1
wm = WatchManager()
config = Config(watch_manager=wm, command_args=command_args)
handler = FileChangeHandler(config=config)
notifier = Notifier(wm, handler, timeout=1000)
notifier.loop(callback=partial(self._event_generator, events))
# There are some stupid race conditions (possibly due to the callbacks)
# Sleep time allows to execute all needed code
sleep(0.2)
conf_time_2 = path.getmtime(self.tmp.conf.join("config.py"))
self.assertNotEqual(conf_time_1, conf_time_2)
self.assertTrue(path.exists(out_file))
self.assertEqual(show_notification.call_count, 2)
开发者ID:mrfuxi,项目名称:testrunner,代码行数:32,代码来源:test_functional.py
示例11: __init__
def __init__(
self,
watch_manager,
default_proc_fun=None,
read_freq=0,
threshold=0,
timeout=None,
airtime_config=None,
api_client=None,
bootstrap=None,
mmc=None,
):
Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, threshold, timeout)
self.logger = logging.getLogger()
self.config = airtime_config
self.api_client = api_client
self.bootstrap = bootstrap
self.md_manager = AirtimeMetadata()
self.import_processes = {}
self.watched_folders = []
self.mmc = mmc
self.wm = watch_manager
self.mask = pyinotify.ALL_EVENTS
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
开发者ID:RadioCampusFrance,项目名称:airtime,代码行数:28,代码来源:airtimenotifier.py
示例12: test_update_filtered
def test_update_filtered(self, default_config, show_notification):
out_file = self.tmp_output.join("out.log")
command_args = [
"-c", self.config_file,
"-r", "bash -c 'echo a | tee -a {}'".format(out_file),
"-d", unicode(self.tmp.src),
]
events = [
(self.tmp.src, "filtered_1.pyc", "some new data"),
(self.tmp.src, "filtered_2.tmp", "some new data"),
(self.tmp.src, ".hidden", "some new data"),
]
self._copy_default_config(default_config)
default_config.RUNNER_DELAY = -1
wm = WatchManager()
config = Config(watch_manager=wm, command_args=command_args)
handler = FileChangeHandler(config=config)
notifier = Notifier(wm, handler)
notifier.loop(callback=partial(self._event_generator, events))
# There are some stupid race conditions (possibly due to the callbacks)
# Sleep time allows to execute all needed code
sleep(0.2)
self.assertTrue(path.exists(self.tmp.src.join("filtered_1.pyc")))
self.assertTrue(path.exists(self.tmp.src.join("filtered_2.tmp")))
self.assertTrue(path.exists(self.tmp.src.join(".hidden")))
self.assertFalse(path.exists(out_file))
self.assertFalse(show_notification.called)
开发者ID:mrfuxi,项目名称:testrunner,代码行数:32,代码来源:test_functional.py
示例13: startSentry
def startSentry(sentry_paths):
"""
Sentry Runner
"""
notifier = Notifier(wm, do_event())
for path in sentry_paths:
wdd = wm.add_watch(sentry_paths[path].path, mask, rec=True)
print 'watching %s ' % sentry_paths[path].path
#wdd = wm.add_watch('/home/cmdln/sandbox/permissionminder/test', mask, rec=True)
while True:
try:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
notifier.stop()
break
except ConfigChange:
notifier.stop()
raise ConfigChange('Config Changed')
break
except:
#from pdb import set_trace;set_trace()
pass
开发者ID:nickanderson,项目名称:psentry,代码行数:25,代码来源:psentry.py
示例14: FSMonitor
def FSMonitor(path='/root/wpf'):
wm = WatchManager()
mask = IN_DELETE | IN_MODIFY | IN_CREATE
notifier = Notifier(wm, EventHandler(),read_freq=10)
notifier.coalesce_events()
# 设置受监视的事件,这里只监视文件创建事件,(rec=True, auto_add=True)为递归处理
wm.add_watch(path,mask,rec=True, auto_add=True)
notifier.loop()
开发者ID:ading2016,项目名称:My_Repo,代码行数:8,代码来源:notify.py
示例15: tailwatch
def tailwatch(dir):
FLAGS = EventsCodes.ALL_FLAGS
mask = FLAGS['IN_CREATE'] |FLAGS['IN_DELETE'] | FLAGS['IN_MODIFY']
wm = WatchManager()
p = PTmp()
notifier = Notifier(wm, p)
wdd = wm.add_watch(dir, mask, rec=True)
notifier.loop()
开发者ID:danohuiginn,项目名称:doh,代码行数:8,代码来源:logwatch.py
示例16: builder_process
def builder_process():
logger.debug(' - Watched static files for changes to rebuild')
wm = WatchManager()
notifier = Notifier(wm, default_proc_fun=_build)
wm.add_watch(
watched_dir,
IN_MODIFY, # | IN_CREATE | IN_DELETE,
rec=True, auto_add=True
)
notifier.loop()
开发者ID:wreckah,项目名称:statika,代码行数:10,代码来源:dev.py
示例17: QNotifier
class QNotifier(QThread):
def __init__(self, wm, processor):
self.event_queue = list()
self._processor = processor
self.notifier = Notifier(wm,
NinjaProcessEvent(self.event_queue.append))
self.notifier.coalesce_events(True)
self.keep_running = True
QThread.__init__(self)
def run(self):
while self.keep_running:
try:
self.notifier.process_events()
except OSError:
pass # OSError: [Errno 2] No such file or directory happens
e_dict = {}
while len(self.event_queue):
e_type, e_path = self.event_queue.pop(0)
e_dict.setdefault(e_path, []).append(e_type)
keys = e_dict.keys()
while len(keys):
key = keys.pop(0)
event = e_dict.pop(key)
if (ADDED in event) and (DELETED in event):
event = [e for e in event if e not in (ADDED, DELETED)]
for each_event in event:
self._processor(each_event, key)
if self.notifier.check_events():
self.notifier.read_events()
self.notifier.stop()
开发者ID:dead5,项目名称:ninja-ide,代码行数:33,代码来源:linux.py
示例18: create_monitor
def create_monitor(to_watch, name):
"Create and start a new directory monitor."
messenger = NetworkSender(name)
p = Monitor(messenger)
wm = WatchManager() # Watch Manager
notifier = Notifier(wm, p) # Notifier
try:
wdd = wm.add_watch(to_watch, IN_DELETE | IN_CREATE | IN_MODIFY)
notifier.loop()
except WatchManagerError, err:
print err, err.wmd
开发者ID:abachman,项目名称:watch-me-work,代码行数:11,代码来源:watch_me.py
示例19: monitor
def monitor(watch_path, callback):
watch_path = os.path.abspath(watch_path)
if os.path.isfile(watch_path):
path_for_manager = os.path.dirname(watch_path)
else:
path_for_manager = watch_path
manager = WatchManager()
notifier = Notifier(manager,
AutoRunner(watch_path, callback))
manager.add_watch(path_for_manager, IN_MODIFY)
notifier.loop()
开发者ID:fisadev,项目名称:ponytor,代码行数:13,代码来源:ponytor.py
示例20: start_queue
def start_queue():
dir_queue = vmcheckerpaths.dir_queue()
# register for inotify envents before processing stale jobs
wm = WatchManager()
notifier = Notifier(wm, _QueueManager())
wm.add_watch(dir_queue, EventsCodes.ALL_FLAGS['IN_CLOSE_WRITE'])
process_stale_jobs(dir_queue)
# set callback to receive notifications (includes queued jobs after
# setting up inotify but before we finished processing stale jobs)
notifier.loop(callback=_callback)
开发者ID:MihaiTabara,项目名称:vmchecker,代码行数:13,代码来源:queue_manager.py
注:本文中的pyinotify.Notifier类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论