本文整理汇总了Python中watchdog.utils.has_attribute函数的典型用法代码示例。如果您正苦于以下问题:Python has_attribute函数的具体用法?Python has_attribute怎么用?Python has_attribute使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了has_attribute函数的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: dispatch
def dispatch(self, event):
    """Route a file system event to the matching ``on_*`` handler.

    Directory events are dropped when ``self.ignore_directories`` is set.
    The event is forwarded only when its source and/or destination path
    passes the include/exclude pattern filter; ``on_any_event`` is called
    first, followed by the handler registered for the event type.

    :param event:
        The event object representing the file system event.
    :type event:
        :class:`FileSystemEvent`
    """
    if self.ignore_directories and event.is_directory:
        return

    # Collect whichever of the two paths the event actually carries,
    # source first, destination second.
    paths = [
        getattr(event, name)
        for name in ('src_path', 'dest_path')
        if has_attribute(event, name)
    ]

    path_matches = match_any_paths(
        paths,
        included_patterns=self.patterns,
        excluded_patterns=self.ignore_patterns,
        case_sensitive=self.case_sensitive,
    )
    if not path_matches:
        return

    self.on_any_event(event)
    handlers = {
        EVENT_TYPE_MODIFIED: self.on_modified,
        EVENT_TYPE_MOVED: self.on_moved,
        EVENT_TYPE_CREATED: self.on_created,
        EVENT_TYPE_DELETED: self.on_deleted,
    }
    handler = handlers[event.event_type]
    handler(event)
开发者ID:edevil,项目名称:watchdog,代码行数:33,代码来源:events.py
示例2: on_any_event
def on_any_event(self, event):
    """Run a shell command in response to *event*.

    When ``self.drop_during_process`` is set and a previously started
    process is still running, the event is ignored. Otherwise the command
    template (``self.shell_command``, or a default ``echo`` when it is
    ``None``) is expanded with the placeholders ``${watch_src_path}``,
    ``${watch_dest_path}``, ``${watch_event_type}`` and ``${watch_object}``
    and started through the shell. When ``self.wait_for_process`` is true
    the call blocks until the process exits.

    :param event:
        The file system event; ``dest_path`` is optional on it.
    """
    from string import Template

    if self.drop_during_process and self.process and self.process.poll() is None:
        return

    object_type = 'directory' if event.is_directory else 'file'
    context = {
        'watch_src_path': event.src_path,
        'watch_dest_path': '',
        'watch_event_type': event.event_type,
        'watch_object': object_type,
    }

    # The destination must be stored under 'watch_dest_path': that is the
    # placeholder the templates reference. Storing it under 'dest_path'
    # left "${watch_dest_path}" empty in the default echo command.
    has_dest = has_attribute(event, 'dest_path')
    if has_dest:
        context['watch_dest_path'] = event.dest_path

    if self.shell_command is not None:
        command = self.shell_command
    elif has_dest:
        command = 'echo "${watch_event_type} ${watch_object} from ${watch_src_path} to ${watch_dest_path}"'
    else:
        command = 'echo "${watch_event_type} ${watch_object} ${watch_src_path}"'

    # NOTE(review): paths are interpolated unescaped into a shell string and
    # run with shell=True; a crafted file name can inject shell syntax.
    # Left as-is because user templates may rely on doing their own quoting.
    command = Template(command).safe_substitute(**context)
    self.process = subprocess.Popen(command, shell=True)
    if self.wait_for_process:
        self.process.wait()
开发者ID:Amber-Creative,项目名称:ambererpnext,代码行数:33,代码来源:__init__.py
示例3: on_any_event
def on_any_event(self, event):
    """Run a shell command in response to *event*.

    The command template (``self.shell_command``, or a default ``echo``
    when it is ``None``) is expanded with the placeholders
    ``${watch_src_path}``, ``${watch_dest_path}``, ``${watch_event_type}``
    and ``${watch_object}`` and started through the shell. When
    ``self.wait_for_process`` is true the call blocks until it exits.

    :param event:
        The file system event; ``dest_path`` is optional on it.
    """
    from string import Template

    object_type = "directory" if event.is_directory else "file"
    context = {
        "watch_src_path": event.src_path,
        "watch_dest_path": "",
        "watch_event_type": event.event_type,
        "watch_object": object_type,
    }

    # Store the destination under the key the templates actually read;
    # the former "dest_path" key left "${watch_dest_path}" empty in the
    # default echo command.
    has_dest = has_attribute(event, "dest_path")
    if has_dest:
        context["watch_dest_path"] = event.dest_path

    if self.shell_command is not None:
        command = self.shell_command
    elif has_dest:
        command = 'echo "${watch_event_type} ${watch_object} from ${watch_src_path} to ${watch_dest_path}"'
    else:
        command = 'echo "${watch_event_type} ${watch_object} ${watch_src_path}"'

    # NOTE(review): paths are interpolated unescaped into a shell string and
    # run with shell=True; a crafted file name can inject shell syntax.
    command = Template(command).safe_substitute(**context)
    process = subprocess.Popen(command, shell=True)
    if self.wait_for_process:
        process.wait()
开发者ID:kaushik94,项目名称:watchdog,代码行数:30,代码来源:__init__.py
示例4: get_file_event_paths
def get_file_event_paths(events):
    """Yield the relative paths of non-directory files named by *events*.

    For every event the destination path is considered before the source
    path. Paths that start with ``.autocheck.`` or that currently refer to
    a directory are skipped.
    """
    attribute_names = ('dest_path', 'src_path')
    for event in events:
        for name in attribute_names:
            if not has_attribute(event, name):
                continue
            decoded = unicode_paths.decode(getattr(event, name))
            if decoded.startswith('.autocheck.') or os.path.isdir(decoded):
                continue
            yield os.path.relpath(decoded)
开发者ID:htmue,项目名称:python-autocheck,代码行数:7,代码来源:autorunner.py
示例5: dispatch
def dispatch(self, event):
    """Route a file system event to the matching ``on_*`` handler.

    Directory events are dropped when ``self.ignore_directories`` is set.
    An event is discarded if any of its paths matches one of
    ``self.ignore_regexes``; otherwise it is forwarded when any path
    matches one of ``self.regexes``.

    :param event:
        The event object representing the file system event.
    :type event:
        :class:`FileSystemEvent`
    """
    if self.ignore_directories and event.is_directory:
        return

    # Destination (when present) first, then a non-empty source.
    candidates = []
    if has_attribute(event, 'dest_path'):
        candidates.append(unicode_paths.decode(event.dest_path))
    if event.src_path:
        candidates.append(unicode_paths.decode(event.src_path))

    for regex in self.ignore_regexes:
        for candidate in candidates:
            if regex.match(candidate):
                return

    wanted = any(
        regex.match(candidate)
        for regex in self.regexes
        for candidate in candidates
    )
    if not wanted:
        return

    self.on_any_event(event)
    handlers = {
        EVENT_TYPE_MODIFIED: self.on_modified,
        EVENT_TYPE_ATTR_MODIFIED: self.on_attr_modified,
        EVENT_TYPE_MOVED: self.on_moved,
        EVENT_TYPE_CREATED: self.on_created,
        EVENT_TYPE_DELETED: self.on_deleted,
    }
    handler = handlers[event.event_type]
    handler(event)
开发者ID:ContinuumIO,项目名称:watchdog,代码行数:31,代码来源:events.py
示例6: on_any_event
def on_any_event(self, event):
    """Run a shell command with the event details exported as environment variables.

    The child process receives a copy of the current environment extended
    with ``WATCH_SRC_PATH``, ``WATCH_EVENT_TYPE``, ``WATCH_OBJECT`` and,
    when the event has a destination, ``WATCH_DEST_PATH``. If
    ``self.shell_command`` is ``None`` a default ``echo`` command is used.
    The event is ignored when ``self.drop_during_process`` is set and the
    previous process has not finished yet.
    """
    if self.drop_during_process and self.process and self.process.poll() is None:
        return

    object_type = 'directory' if event.is_directory else 'file'

    child_env = os.environ.copy()
    child_env['WATCH_SRC_PATH'] = event.src_path
    child_env['WATCH_EVENT_TYPE'] = event.event_type
    child_env['WATCH_OBJECT'] = object_type
    if has_attribute(event, 'dest_path'):
        child_env['WATCH_DEST_PATH'] = event.dest_path

    command = self.shell_command
    if command is None:
        # The membership test is against the full child environment, so an
        # inherited WATCH_DEST_PATH also selects the "from ... to" form.
        if 'WATCH_DEST_PATH' in child_env:
            command = 'echo "$WATCH_EVENT_TYPE $WATCH_OBJECT from $WATCH_SRC_PATH to $WATCH_DEST_PATH"'
        else:
            command = 'echo "$WATCH_EVENT_TYPE $WATCH_OBJECT $WATCH_SRC_PATH"'

    self.process = subprocess.Popen(command, shell=True, env=child_env)
    if self.wait_for_process:
        self.process.wait()
开发者ID:mouadino,项目名称:watchdog,代码行数:29,代码来源:__init__.py
示例7: assert_patterns
def assert_patterns(event):
    """Assert that at least one path of *event* survives the pattern filter.

    ``patterns`` and ``ignore_patterns`` come from the enclosing scope.
    """
    event_paths = [event.src_path]
    if has_attribute(event, 'dest_path'):
        event_paths.append(event.dest_path)
    assert_true(filter_paths(event_paths, patterns, ignore_patterns))
开发者ID:mclamb,项目名称:watchdog,代码行数:7,代码来源:test_watchdog_events.py
示例8: dispatch
def dispatch(self, event):
    """Forward *event* to ``on_any_event`` when one of its paths matches.

    The source path, plus the destination path when the event has one, is
    checked against ``self.patterns`` and ``self.ignore_patterns``.
    """
    candidates = [event.src_path]
    if has_attribute(event, 'dest_path'):
        candidates.append(event.dest_path)

    accepted = match_any_paths(
        candidates,
        included_patterns=self.patterns,
        excluded_patterns=self.ignore_patterns,
    )
    if accepted:
        self.on_any_event(event)
开发者ID:FighterCZY,项目名称:Pandoras-Box,代码行数:9,代码来源:observer.py
示例9: assert_patterns
def assert_patterns(event):
    """Assert that some path of *event* passes the case-insensitive filter.

    ``patterns``, ``ignore_patterns`` and ``self`` come from the enclosing
    scope.
    """
    src_only = [event.src_path]
    paths = src_only + [event.dest_path] if has_attribute(event, 'dest_path') else src_only
    surviving = filter_paths(
        paths,
        included_patterns=patterns,
        excluded_patterns=ignore_patterns,
        case_sensitive=False,
    )
    self.assertTrue(surviving)
开发者ID:AnkurDedania,项目名称:watchdog,代码行数:10,代码来源:test_watchdog_events.py
示例10: assert_regexes
def assert_regexes(handler, event):
    """Assert that at least one path of *event* matches a regex of *handler*.

    ``self`` comes from the enclosing scope.
    """
    event_paths = [event.src_path]
    if has_attribute(event, 'dest_path'):
        event_paths.append(event.dest_path)

    matching = {
        path
        for path in event_paths
        if any(regex.match(path) for regex in handler.regexes)
    }
    self.assertTrue(matching)
开发者ID:AnkurDedania,项目名称:watchdog,代码行数:10,代码来源:test_watchdog_events.py
示例11: assert_patterns
def assert_patterns(event):
    """Assert that some path of *event* is a ``*.py``/``*.txt`` file and not ``*.pyc``.

    Matching is case-insensitive.
    """
    candidates = [event.src_path]
    if has_attribute(event, 'dest_path'):
        candidates.append(event.dest_path)

    filter_options = dict(
        included_patterns=['*.py', '*.txt'],
        excluded_patterns=["*.pyc"],
        case_sensitive=False,
    )
    filtered_paths = filter_paths(candidates, **filter_options)
    assert filtered_paths
开发者ID:gorakhargosh,项目名称:watchdog,代码行数:11,代码来源:test_pattern_matching_event_handler.py
示例12: dispatch
def dispatch(self, event):
    """Route *event* to the matching ``on_*`` handler if a path matches.

    Directory events are dropped when ``self.ignore_directories`` is set.
    The decoded destination path (when present) and the non-empty source
    path are checked against the include/exclude patterns; on a match
    ``on_any_event`` runs first, then the handler for the event type.
    """
    if self.ignore_directories and event.is_directory:
        return

    decoded_paths = []
    if has_attribute(event, 'dest_path'):
        decoded_paths.append(unicode_paths.decode(event.dest_path))
    if event.src_path:
        decoded_paths.append(unicode_paths.decode(event.src_path))

    matched = match_any_paths(
        decoded_paths,
        included_patterns=self.patterns,
        excluded_patterns=self.ignore_patterns,
        case_sensitive=self.case_sensitive,
    )
    if not matched:
        return

    self.on_any_event(event)
    handlers = {
        EVENT_TYPE_MODIFIED: self.on_modified,
        EVENT_TYPE_MOVED: self.on_moved,
        EVENT_TYPE_CREATED: self.on_created,
        EVENT_TYPE_DELETED: self.on_deleted,
    }
    handler = handlers[event.event_type]
    handler(event)
开发者ID:connoryang,项目名称:dec-eve-serenity,代码行数:16,代码来源:events.py
示例13: dispatch
def dispatch(self, event):
    """Queue *event* unless one of its paths matches an ignore pattern.

    The source path, and the destination path when present, is rewritten
    relative to ``self.source_base_path`` and checked against
    ``self.source_ignore_patterns``. The first match drops the event;
    otherwise the event is put on ``self.queue``.

    Errors are reported with a traceback and not propagated to the caller.
    ``SystemExit`` and ``KeyboardInterrupt`` are deliberately not caught.
    """
    try:
        paths = [event.src_path]
        if has_attribute(event, 'dest_path'):
            paths.append(event.dest_path)
        paths = [
            os.path.relpath(os.path.normpath(os.path.abspath(path)),
                            start=self.source_base_path)
            for path in paths
        ]

        event_type_to_name = {
            EVENT_TYPE_MOVED: "move",
            EVENT_TYPE_CREATED: "create",
            EVENT_TYPE_MODIFIED: "modify",
            EVENT_TYPE_DELETED: "delete",
        }
        vprint("local change type: {} paths: {}",
               event_type_to_name[event.event_type], paths)

        for path in paths:
            pattern = find_matching_pattern(path, self.source_ignore_patterns,
                                            match_subpath=True)
            if pattern:
                vprint("ignoring change for path {}, pattern: {}", path, pattern)
                return
        self.queue.put(event)
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt; ordinary errors are still only logged.
        import traceback
        traceback.print_exc()
开发者ID:Infinidat,项目名称:infi.pysync,代码行数:20,代码来源:__init__.py
示例14: dispatch
def dispatch(self, event):
    """Only dispatch if the event does not correspond to an ignored file.

    Directory events are never dispatched. A path counts when its resolved
    form does not start with the resolved repository directory and the VCS
    does not report it as ignored.

    Args:
        event (watchdog.events.FileSystemEvent)
    """
    if event.is_directory:
        return

    resolved = []
    if has_attribute(event, 'dest_path'):
        decoded_dest = unicode_paths.decode(event.dest_path)
        resolved.append(os.path.realpath(decoded_dest))
    if event.src_path:
        decoded_src = unicode_paths.decode(event.src_path)
        resolved.append(os.path.realpath(decoded_src))

    relevant = []
    for path in resolved:
        repository_dir = os.path.realpath(self.vcs.repository_dir())
        if path.startswith(repository_dir):
            continue
        if self.vcs.path_is_ignored(path):
            continue
        relevant.append(path)

    if relevant:
        super(VcsEventHandler, self).dispatch(event)
开发者ID:naphatkrit,项目名称:easyci,代码行数:20,代码来源:vcs_event_handler.py
示例15: dispatch
def dispatch(self, event):
    """Dispatch an event after filtering. We handle
    creation and move events only.

    Directory events are dropped when ``self.ignore_directories`` is set.
    An event whose path matches any of ``self.ignore_regexes`` is dropped;
    one whose path matches any of ``self.regexes`` has
    ``self._process_file(event)`` scheduled on ``self._loop``.

    :param event: watchdog event.
    :returns: None
    """
    if event.event_type not in (EVENT_TYPE_CREATED, EVENT_TYPE_MOVED):
        return
    if self.ignore_directories and event.is_directory:
        return

    paths = []
    if has_attribute(event, 'dest_path'):
        paths.append(unicode_paths.decode(event.dest_path))
    if event.src_path:
        paths.append(unicode_paths.decode(event.src_path))

    if any(r.match(p) for r in self.ignore_regexes for p in paths):
        return

    if any(r.match(p) for r in self.regexes for p in paths):
        # ``asyncio.async`` cannot even be parsed on Python 3.7+ because
        # ``async`` is a reserved keyword; ``asyncio.ensure_future`` is
        # its replacement. call_soon_threadsafe is kept because this
        # method presumably runs outside the loop's thread.
        self._loop.call_soon_threadsafe(
            asyncio.ensure_future, self._process_file(event))
开发者ID:vineeth-s,项目名称:pomoxis,代码行数:24,代码来源:watcher.py
示例16: ctypes_find_library
DirModifiedEvent,\
DirMovedEvent,\
DirCreatedEvent,\
FileDeletedEvent,\
FileModifiedEvent,\
FileMovedEvent,\
FileCreatedEvent,\
EVENT_TYPE_MODIFIED,\
EVENT_TYPE_CREATED,\
EVENT_TYPE_DELETED,\
EVENT_TYPE_MOVED
# Locate and load the C library. use_errno=True makes ctypes keep a private
# copy of errno for calls made through this handle.
libc_string = ctypes_find_library('c', 'libc.so.6')
libc = ctypes.CDLL(libc_string, use_errno=True)

# All three inotify entry points are required; refuse to import otherwise.
if not all(has_attribute(libc, symbol)
           for symbol in ('inotify_init',
                          'inotify_add_watch',
                          'inotify_rm_watch')):
    raise ImportError("Unsupported libc version found: %s" % libc_string)

# #include <sys/inotify.h>
# char *strerror(int errnum);
# (binding currently disabled)
#strerror = ctypes.CFUNCTYPE(c_char_p, c_int)(
#    ("strerror", libc))

# #include <sys/inotify.h>
# int inotify_init(void);
inotify_init = ctypes.CFUNCTYPE(
    c_int, use_errno=True
)(("inotify_init", libc))
# #include <sys/inotify.h>
开发者ID:edevil,项目名称:watchdog,代码行数:31,代码来源:inotify.py
示例17: ctypes_find_library
from __future__ import with_statement
import os
import errno
import struct
import threading
import ctypes
from functools import reduce
from ctypes import c_int, c_char_p, c_uint32
from watchdog.utils import has_attribute, ctypes_find_library
# Locate and load the C library. use_errno=True makes ctypes keep a private
# copy of errno for calls made through this handle.
libc_string = ctypes_find_library("c", "libc.so")
libc = ctypes.CDLL(libc_string, use_errno=True)

# All three inotify entry points are required; refuse to import otherwise.
if not all(
    has_attribute(libc, symbol)
    for symbol in ("inotify_init", "inotify_add_watch", "inotify_rm_watch")
):
    raise ImportError("Unsupported libc version found: %s" % libc_string)

# Foreign-function bindings. Each prototype lists the return type first,
# then the argument types.
inotify_add_watch = ctypes.CFUNCTYPE(
    c_int, c_int, c_char_p, c_uint32, use_errno=True
)(("inotify_add_watch", libc))
inotify_rm_watch = ctypes.CFUNCTYPE(
    c_int, c_int, c_uint32, use_errno=True
)(("inotify_rm_watch", libc))
inotify_init = ctypes.CFUNCTYPE(
    c_int, use_errno=True
)(("inotify_init", libc))
try:
inotify_init1 = ctypes.CFUNCTYPE(c_int, c_int, use_errno=True)(("inotify_init1", libc))
except AttributeError:
开发者ID:bfoxtrot,项目名称:foxtroterp,代码行数:30,代码来源:inotify_c.py
示例18: except
# errors, e.g.on android IOError "No usable temporary directory found"
# will be raised.
pass
if libc_path is not None:
return ctypes.CDLL(libc_path)
# Fallbacks
try:
return ctypes.CDLL('libc.so')
except (OSError, IOError):
return ctypes.CDLL('libc.so.6')
libc = _load_libc()

# All three inotify entry points are required; refuse to continue otherwise.
if not all(has_attribute(libc, symbol)
           for symbol in ('inotify_init',
                          'inotify_add_watch',
                          'inotify_rm_watch')):
    raise UnsupportedLibc("Unsupported libc version found: %s" % libc._name)

# Foreign-function bindings. Each prototype lists the return type first,
# then the argument types.
inotify_add_watch = ctypes.CFUNCTYPE(
    c_int, c_int, c_char_p, c_uint32, use_errno=True
)(("inotify_add_watch", libc))

inotify_rm_watch = ctypes.CFUNCTYPE(
    c_int, c_int, c_uint32, use_errno=True
)(("inotify_rm_watch", libc))

inotify_init = ctypes.CFUNCTYPE(
    c_int, use_errno=True
)(("inotify_init", libc))
class InotifyConstants(object):
开发者ID:Jamesleons,项目名称:watchdog,代码行数:31,代码来源:inotify_c.py
注:本文中的watchdog.utils.has_attribute函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论