本文整理汇总了 Python 中 pydbus.SessionBus 类的典型用法代码示例。如果您正苦于以下问题：Python SessionBus 类的具体用法？Python SessionBus 怎么用？Python SessionBus 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 SessionBus 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: post_config_hook
def post_config_hook(self):
    """Look up the configured systemd unit and keep a proxy for it.

    The session bus is used when ``self.user`` is truthy, the system bus
    otherwise. The value returned by the manager's ``LoadUnit(self.unit)``
    call is passed on as the second argument of ``bus.get`` and the
    resulting proxy is stored in ``self.systemd_unit``.
    """
    connection = SessionBus() if self.user else SystemBus()
    manager = connection.get("org.freedesktop.systemd1")
    self.systemd_unit = connection.get(
        ".systemd1",
        manager.LoadUnit(self.unit),
    )
开发者ID:ultrabug,项目名称:py3status,代码行数:7,代码来源:systemd.py
示例2: info
def info(self):
    """Pop up a desktop notification summarising the server settings.

    Reads ``wheel_circumference``, ``start_time`` and ``channel`` (names
    defined outside this function), formats them into a three-line message
    and hands it to the ``.Notifications`` object on the session bus.
    """
    started = time.localtime(start_time)
    tod_time = time.strftime("%a, %d %b %Y %H:%M:%S", started)
    template = ("Wheel circumference: {} meters\n"
                "Start time: {}\n"
                "GPIO Channel Number: {}")
    message_string = template.format(wheel_circumference, tod_time, channel)

    notifier = SessionBus().get('.Notifications')
    # Positional arguments are passed in the order the original call used;
    # presumably the org.freedesktop.Notifications.Notify signature
    # (app name, replaces id, icon, summary, body, actions, hints, timeout).
    notifier.Notify(
        'test',
        0,
        'dialog-information',
        "Pydbus Server Information",
        message_string,
        [],
        {},
        5000,
    )
开发者ID:HamPUG,项目名称:meetings,代码行数:15,代码来源:server.py
示例3: bus
def bus():
    """Yield a session bus on which ``org.scarlett`` has been requested.

    This is a generator: everything before ``yield`` is the set-up phase,
    everything after it only runs once the generator is resumed. The
    teardown just drops the local reference and prints progress messages.
    """
    from pydbus import SessionBus

    session_bus = SessionBus()
    time.sleep(.5)
    print("\n[initalize] pydbus.SessionBus ...")
    session_bus.own_name(name="org.scarlett")
    # bus.dbus
    time.sleep(.5)

    yield session_bus

    print("\n[teardown] pydbus.SessionBus ...")
    # del bus._dbus
    # NOTE(review): the next message is printed although the statement it
    # names is commented out above.
    print("ran: del bus._dbus")
    del session_bus
    print("ran: del bus")
开发者ID:bossjones,项目名称:scarlett_os,代码行数:15,代码来源:test_mpris.py
示例4: _init_dbus
def _init_dbus(self):
    """Obtain the device proxy from the session bus.

    When ``self.device_id`` is still ``None`` it is looked up through
    ``self._get_device_id``. On success the proxy is stored in
    ``self._dev``.

    Returns:
        ``True`` when the proxy was obtained; ``False`` when no device id is
        available or the bus lookup raised an exception.
    """
    session = SessionBus()

    if self.device_id is None:
        self.device_id = self._get_device_id(session)
    if self.device_id is None:
        # Lookup did not produce an id: nothing to connect to.
        return False

    try:
        self._dev = session.get(SERVICE_BUS, DEVICE_PATH + "/%s" % self.device_id)
    except Exception:
        return False
    else:
        return True
开发者ID:ultrabug,项目名称:py3status,代码行数:17,代码来源:kdeconnector.py
示例5: _connect_to_dbus
def _connect_to_dbus(self):
    """Connect to the ``org.scarlett`` listener object on the session bus.

    Stores the bus in ``self.bus`` and the ``/org/scarlett/Listener`` proxy
    in ``self.dbus_proxy``, calls ``emitConnectedToListener`` on it, sleeps
    two seconds and then subscribes ``self.cancel_listening`` to
    ``ListenerCancelSignal``.
    """
    self.bus = SessionBus()
    self.dbus_proxy = self.bus.get("org.scarlett", object_path='/org/scarlett/Listener')  # NOQA
    self.dbus_proxy.emitConnectedToListener('ScarlettListener')
    sleep(2)
    logger.info('_connect_to_dbus')

    subscription_options = dict(
        sender=None,
        iface="org.scarlett.Listener",
        signal="ListenerCancelSignal",
        object="/org/scarlett/Listener",
        arg0=None,
        flags=0,
        signal_fired=self.cancel_listening,
    )
    # The returned subscription handle is not used afterwards.
    cancel_subscription = self.bus.subscribe(**subscription_options)
开发者ID:bossjones,项目名称:scarlett-dbus-poc,代码行数:13,代码来源:generator_listener.py
示例6: __init__
def __init__(self, use_terminal=True,
             use_libnotify=True,
             debug=False,
             device_id=None,
             device_name=None,
             ignore=None):
    """Store the output options and connect to the device on the session bus.

    Args:
        use_terminal: stored as ``self.terminal``.
        use_libnotify: stored as ``self.libnotify``; when truthy,
            ``Notify.init`` is called.
        debug: stored as ``self.debug``.
        device_id: id of the device to use; looked up through
            ``self._get_device_id`` when ``None``.
        device_name: forwarded to ``self._get_device_id``.
        ignore: optional sequence of application names; stored lower-cased
            in ``self.ignore``.

    On failure to find or reach the device the constructor returns early,
    leaving ``self.device_id`` and/or ``self._dev`` set to ``None``.
    """
    self.terminal = use_terminal
    self.libnotify = use_libnotify
    self.debug = debug
    # Always define these attributes so later reads cannot hit an
    # AttributeError when no ignore list was given or a return below is taken.
    self.ignore = [app.lower() for app in ignore] if ignore else []
    self._dev = None

    if not self.terminal:
        self._debug('hide terminal messages')
    if not self.libnotify:
        self._debug('hide notifications')
    if use_libnotify:
        Notify.init('KDEConnect Notify')
    if ignore:
        self._debug('ignore ' + ', '.join(ignore))

    self._bus = SessionBus()

    if device_id is None:
        self.device_id = self._get_device_id(device_id, device_name)
        if self.device_id is None:
            self._debug('No device id found')
            return
    else:
        self.device_id = device_id
    self._debug('Device id is %s' % self.device_id)

    try:
        self._dev = self._bus.get(SERVICE_BUS,
                                  DEVICE_PATH + '/%s' % self.device_id)
    except Exception:
        # Device object could not be obtained: mark the instance as unbound.
        self.device_id = None
        return
开发者ID:ritze,项目名称:kdeconnect-notify,代码行数:48,代码来源:kdeconnect-notify.py
示例7: _connect_to_dbus
def _connect_to_dbus(self):
    """Connect to the ``org.scarlett`` listener object on the session bus.

    Keeps the bus in ``self.bus`` and the listener proxy in
    ``self.dbus_proxy``, announces this client via
    ``emitConnectedToListener``, waits two seconds and registers
    ``self.on_cancel_listening`` for ``ListenerCancelSignal``.
    """
    listener_path = "/org/scarlett/Listener"

    self.bus = SessionBus()
    self.dbus_proxy = self.bus.get("org.scarlett", object_path=listener_path)  # NOQA
    self.dbus_proxy.emitConnectedToListener("ScarlettListener")
    time.sleep(2)
    logger.info("_connect_to_dbus")

    # TODO: Add a ss_cancel_signal.disconnect() function later
    subscription_options = dict(
        sender=None,
        iface="org.scarlett.Listener",
        signal="ListenerCancelSignal",
        object=listener_path,
        arg0=None,
        flags=0,
        signal_fired=self.on_cancel_listening,
    )
    ss_cancel_signal = self.bus.subscribe(**subscription_options)
开发者ID:bossjones,项目名称:scarlett_os,代码行数:18,代码来源:listener.py
示例8: post_config_hook
def post_config_hook(self):
    """Initialise the module state, then connect to the session bus.

    All attributes, including the ``self._states`` control table, are
    assigned before ``self._start_listener()`` is called, so nothing started
    by the listener can observe a half-initialised instance.
    """
    self._dbus = None
    self._data = {}
    self._control_states = {}
    self._kill = False
    self._mpris_players = {}
    self._mpris_names = {}
    self._mpris_name_index = {}
    self._player = None
    self._player_details = {}
    self._tries = 0
    # One entry per control: the action name, the name of the player
    # attribute consulted for 'clickable', and the icon to display.
    self._states = {
        'pause': {
            'action': 'Pause',
            'clickable': 'CanPause',
            'icon': self.icon_pause
        },
        'play': {
            'action': 'Play',
            'clickable': 'CanPlay',
            'icon': self.icon_play
        },
        'stop': {
            'action': 'Stop',
            'clickable': 'True',  # The MPRIS API lacks 'CanStop' function.
            'icon': self.icon_stop
        },
        'next': {
            'action': 'Next',
            'clickable': 'CanGoNext',
            'icon': self.icon_next
        },
        'previous': {
            'action': 'Previous',
            'clickable': 'CanGoPrevious',
            'icon': self.icon_previous
        }
    }
    # start last
    self._dbus = SessionBus()
    self._start_listener()
开发者ID:mrt-prodz,项目名称:py3status,代码行数:41,代码来源:mpris.py
示例9: post_config_hook
def post_config_hook(self):
    """Initialise the module state, then connect to the session bus.

    Raises:
        Exception: with ``STRING_GEVENT`` when ``self.py3.is_gevent()`` is
            true.

    All attributes, including the ``self._states`` control table, are
    assigned before ``self._start_listener()`` is called, so nothing started
    by the listener can observe a half-initialised instance.
    """
    if self.py3.is_gevent():
        raise Exception(STRING_GEVENT)
    self._dbus = None
    self._data = {}
    self._control_states = {}
    self._kill = False
    self._mpris_players = {}
    self._mpris_names = {}
    self._mpris_name_index = {}
    self._player = None
    self._player_details = {}
    self._tries = 0
    # One entry per control: the action name, the name of the player
    # attribute consulted for "clickable", and the icon to display.
    self._states = {
        "pause": {
            "action": "Pause",
            "clickable": "CanPause",
            "icon": self.icon_pause,
        },
        "play": {"action": "Play", "clickable": "CanPlay", "icon": self.icon_play},
        "stop": {
            "action": "Stop",
            "clickable": "True",  # The MPRIS API lacks 'CanStop' function.
            "icon": self.icon_stop,
        },
        "next": {
            "action": "Next",
            "clickable": "CanGoNext",
            "icon": self.icon_next,
        },
        "previous": {
            "action": "Previous",
            "clickable": "CanGoPrevious",
            "icon": self.icon_previous,
        },
    }
    # start last
    self._dbus = SessionBus()
    self._start_listener()
开发者ID:ultrabug,项目名称:py3status,代码行数:39,代码来源:mpris.py
示例10: __init__
def __init__(self, bucket, loop, *args, **kargs):
self.bucket = bucket
self.loop = loop
self.running = True
self._stop = threading.Event()
self.queue = Queue.Queue(10)
# NOTE: enumerate req to iterate through tuple and find GVariant
@trace
def player_cb(*args, **kwargs):
    """Log the (msg, scarlett_sound) payload found in a signal callback.

    Every positional argument is inspected; arguments that are GLib
    Variants are unpacked into two values and logged at warning level.
    Extra debug output is produced when ``SCARLETT_DEBUG`` is truthy.
    """
    if SCARLETT_DEBUG:
        logger.debug("player_cb PrettyPrinter: ")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(args)
    for i, v in enumerate(args):
        if SCARLETT_DEBUG:
            logger.debug("Type v: {}".format(type(v)))
            logger.debug("Type i: {}".format(type(i)))
        # isinstance (rather than an exact type() match) also accepts
        # subclasses of Variant.
        if isinstance(v, gi.overrides.GLib.Variant):
            if SCARLETT_DEBUG:
                logger.debug(
                    "THIS SHOULD BE A Tuple now: {}".format(v))
            msg, scarlett_sound = v
            logger.warning(" msg: {}".format(msg))
            logger.warning(
                " scarlett_sound: {}".format(scarlett_sound))
            # NOTE: Create something like test_gdbus_player.ScarlettPlayer('pi-listening')
            # NOTE: test_gdbus_player.ScarlettPlayer
            # NOTE: self.bucket.put()
            # NOTE: ADD self.queue.put(v)
# NOTE: enumerate req to iterate through tuple and find GVariant
@trace
def command_cb(*args, **kwargs):
    """Log the (msg, scarlett_sound, command) payload of a signal callback.

    Every positional argument is inspected; arguments that are GLib
    Variants are unpacked into three values and logged at warning level.
    Extra debug output is produced when ``SCARLETT_DEBUG`` is truthy.
    """
    if SCARLETT_DEBUG:
        # Label names this callback (it previously said "player_cb").
        logger.debug("command_cb PrettyPrinter: ")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(args)
    for i, v in enumerate(args):
        if SCARLETT_DEBUG:
            logger.debug("Type v: {}".format(type(v)))
            logger.debug("Type i: {}".format(type(i)))
        # isinstance (rather than an exact type() match) also accepts
        # subclasses of Variant.
        if isinstance(v, gi.overrides.GLib.Variant):
            if SCARLETT_DEBUG:
                logger.debug(
                    "THIS SHOULD BE A Tuple now: {}".format(v))
            msg, scarlett_sound, command = v
            logger.warning(" msg: {}".format(msg))
            logger.warning(
                " scarlett_sound: {}".format(scarlett_sound))
            logger.warning(" command: {}".format(command))
            # NOTE: Create something like test_gdbus_player.ScarlettPlayer('pi-listening')
            # NOTE: test_gdbus_player.ScarlettPlayer
            # NOTE: self.bucket.put()
            # NOTE: ADD self.queue.put(v)
# with SessionBus() as bus:
bus = SessionBus()
ss = bus.get("org.scarlett", object_path='/org/scarlett/Listener')
# SttFailedSignal / player_cb
ss_failed_signal = bus.con.signal_subscribe(None,
"org.scarlett.Listener",
"SttFailedSignal",
'/org/scarlett/Listener',
None,
0,
player_cb)
# ListenerReadySignal / player_cb
ss_rdy_signal = bus.con.signal_subscribe(None,
"org.scarlett.Listener",
"ListenerReadySignal",
'/org/scarlett/Listener',
None,
0,
player_cb)
# KeywordRecognizedSignal / player_cb
ss_kw_rec_signal = bus.con.signal_subscribe(None,
"org.scarlett.Listener",
"KeywordRecognizedSignal",
'/org/scarlett/Listener',
None,
0,
player_cb)
# CommandRecognizedSignal /command_cb
ss_cmd_rec_signal = bus.con.signal_subscribe(None,
"org.scarlett.Listener",
"CommandRecognizedSignal",
'/org/scarlett/Listener',
None,
0,
command_cb)
# ListenerCancelSignal / player_cb
ss_cancel_signal = bus.con.signal_subscribe(None,
"org.scarlett.Listener",
"ListenerCancelSignal",
#.........这里部分代码省略.........
开发者ID:bossjones,项目名称:scarlett-dbus-poc,代码行数:101,代码来源:run_tasker.py
示例11: SessionBus
#!/usr/bin/python3
import os
import sys
import datetime
import json
import subprocess
from time import sleep
from pydbus import SessionBus
from glob import glob
# Session-bus connection created when this script is loaded.
BUS = SessionBus()
# Proxy for the way-cooler layout object; main() calls Debug() and
# ActiveWorkspace() on it.
LAYOUT = BUS.get(bus_name='org.way-cooler', object_path='/org/way_cooler/Layout')
def main():
while True:
layout = json.loads(LAYOUT.Debug())
workspaces = get_workspaces(layout)
workspaces.sort()
active_workspace = ""
try:
active_workspace = LAYOUT.ActiveWorkspace()
except Exception:
pass
workspaces = " ".join(workspaces)
workspaces = format_workspaces(layout, workspaces, active_workspace)
funcs = [workspaces + "%{c}",
lambda: get_time() + "%{r}",
my_get_temp,
开发者ID:matejc,项目名称:helper_scripts,代码行数:31,代码来源:bar.py
示例12: __init__
class Py3status:
"""
"""
# available configuration parameters
button_next = 4
button_previous = 5
button_stop = None
button_toggle = 1
format = '{previous}{toggle}{next} {state} [{artist} - ][{title}]'
format_none = 'no player running'
icon_next = u'»'
icon_pause = u'▮'
icon_play = u'▶'
icon_previous = u'«'
icon_stop = u'◾'
player_priority = []
state_pause = u'▮'
state_play = u'▶'
state_stop = u'◾'
def __init__(self):
    """Create the instance with empty state and no bus connection."""
    # Connection and current player start out unset.
    self._dbus = None
    self._player = None

    # Flag and counter.
    self._kill = False
    self._tries = 0

    # Displayed data and per-player lookup tables, each its own dict.
    self._data = {}
    self._player_details = {}
    self._mpris_players = {}
    self._mpris_names = {}
    self._mpris_name_index = {}
def post_config_hook(self):
    """Open the session bus, keep it in ``self._dbus`` and start listening."""
    session = SessionBus()
    self._dbus = session
    self._start_listener()
def _init_data(self):
    """Reset ``self._data`` and refill it from the current player.

    With no player the defaults are kept. Any exception raised while
    reading the player or updating the metadata only sets
    ``error_occurred`` to ``True``.
    """
    self._data = dict(
        album=None,
        artist=None,
        error_occurred=False,
        length=None,
        player=None,
        state=STOPPED,
        title=None,
    )

    if self._player is None:
        return

    try:
        self._data['player'] = self._player.Identity
        status = self._player.PlaybackStatus
        self._data['state'] = self._get_state(status)
        self._update_metadata(self._player.Metadata)
    except Exception:
        self._data['error_occurred'] = True
def _get_button_state(self, control_state):
    """Return whether the given control should currently be clickable.

    Args:
        control_state: mapping with at least the keys ``'clickable'`` (name
            of the player attribute to read) and ``'action'``.

    Returns:
        The value read from the player (``True`` when the attribute does not
        exist, ``False`` when reading it raises), overridden with ``False``
        when the action makes no sense in the current playback state.
    """
    try:
        # Workaround: the getattr default makes the Stop button, whose
        # 'clickable' entry is not a real player attribute, come out True.
        allowed = getattr(self._player, control_state['clickable'], True)
    except Exception:
        allowed = False

    current = self._data.get('state')
    action = control_state['action']
    pointless = (
        (action == 'Play' and current == PLAYING)
        or (action == 'Pause' and current in [STOPPED, PAUSED])
        or (action == 'Stop' and current == STOPPED)
    )
    return False if pointless else allowed
def _get_state(self, playback_status):
    """Translate a playback status string into PLAYING, PAUSED or STOPPED.

    Anything other than ``'Playing'`` or ``'Paused'`` maps to ``STOPPED``.
    """
    if playback_status == 'Playing':
        return PLAYING
    if playback_status == 'Paused':
        return PAUSED
    return STOPPED
def _get_text(self):
"""
Get the current metadata
"""
if self._data.get('state') == PLAYING:
color = self.py3.COLOR_PLAYING or self.py3.COLOR_GOOD
state_symbol = self.state_play
elif self._data.get('state') == PAUSED:
color = self.py3.COLOR_PAUSED or self.py3.COLOR_DEGRADED
state_symbol = self.state_pause
else:
color = self.py3.COLOR_STOPPED or self.py3.COLOR_BAD
state_symbol = self.state_stop
if self._data.get('error_occurred'):
color = self.py3.COLOR_BAD
#.........这里部分代码省略.........
开发者ID:Andrwe,项目名称:py3status,代码行数:101,代码来源:mpris.py
示例13: SessionBus
#!/usr/bin/env python
# Based on http://stackoverflow.com/questions/22390064/use-dbus-to-just-send-a-message-in-python
# Python script to call the methods of the DBUS Test Server
from pydbus import SessionBus
# Connect to the session bus.
bus = SessionBus()
# Proxy for the object published under the example server's bus name.
the_object = bus.get("net.lew21.pydbus.ClientServerExample")
# Call each remote method in turn, printing the replies that carry a value.
print(the_object.Hello())
print(the_object.EchoString("test 123"))
the_object.Quit()
开发者ID:LEW21,项目名称:pydbus,代码行数:21,代码来源:client.py
示例14: sleep
from __future__ import print_function
from pydbus import SessionBus
from gi.repository import GLib
import subprocess
from time import sleep
loop = GLib.MainLoop()

subprocess.Popen("gnome-music")
# Announce the wait before sleeping, so the message is visible while the
# script is actually waiting rather than after the wait is over.
print("Waiting for GNOME Music to start...")
sleep(5)

b = SessionBus()
m = b.get("org.mpris.MediaPlayer2.GnomeMusic", "/org/mpris/MediaPlayer2")

# Print every property change reported by the player.
m.PropertiesChanged.connect(print)

# Activate the first playlist returned by GetPlaylists and start playback.
m.ActivatePlaylist(m.GetPlaylists(0, 5, "Alphabetical", 0)[0][0])

m.Play()
sleep(1)
assert(m.PlaybackStatus == "Playing")

m.Pause()
assert(m.PlaybackStatus == "Paused")

m.Play()
assert(m.PlaybackStatus == "Playing")

t = m.Metadata["xesam:title"]
开发者ID:LEW21,项目名称:pydbus,代码行数:30,代码来源:gnome_music.py
示例15: Mpvplayer
from playthread import Mpvplayer
# Main loop that is run at the end of this script and stopped by
# DBUSPlayerService.exit().
loop = GLib.MainLoop()
# Player instance that the D-Bus service methods forward to.
player = Mpvplayer()
player.start()
class DBUSPlayerService(object):
    # NOTE: the docstring below is the interface description (XML) for the
    # published object, presumably read by pydbus when the object is
    # published, so its text is kept exactly as it was (including the
    # column-0 layout).
    """
<node>
<interface name='github.zhangn1985.dbplay'>
<method name='play'>
<arg type='s' name='playinfo' direction='in'/>
</method>
<method name='stop'/>
<method name='exit'/>
</interface>
</node>
"""

    def play(self, playinfo):
        # Forward the request to the module-level player.
        player.play(playinfo)

    def stop(self):
        # Forward the request to the module-level player.
        player.stop()

    def exit(self):
        # Shut the player down first, then stop the main loop.
        player.exit()
        loop.quit()
# Publish the service object under its bus name on the session bus.
bus = SessionBus()
bus.publish("github.zhangn1985.dbplay", DBUSPlayerService())
# Runs until DBUSPlayerService.exit() calls loop.quit().
loop.run()
开发者ID:wwqgtxx,项目名称:ykdl,代码行数:30,代码来源:dbusplayer.py
示例16: Hello
"""
<node>
<interface name='net.lew21.pydbus.ClientServerExample'>
<method name='Hello'>
<arg type='s' name='response' direction='out'/>
</method>
<method name='EchoString'>
<arg type='s' name='a' direction='in'/>
<arg type='s' name='response' direction='out'/>
</method>
<method name='Quit'/>
</interface>
</node>
"""
def Hello(self):
    """Return the fixed greeting 'Hello, World!'."""
    greeting = "Hello, World!"
    return greeting
def EchoString(self, s):
    """Return the argument unchanged."""
    echoed = s
    return echoed
def Quit(self):
    """Stop the main loop by calling ``loop.quit()``.

    NOTE(review): the earlier docstring also said the object is removed
    from the D-Bus connection; this method itself only quits the loop.
    """
    loop.quit()
# Publish the service object under the example bus name on the session bus.
bus = SessionBus()
bus.publish("net.lew21.pydbus.ClientServerExample", MyDBUSService())
# Runs until Quit() calls loop.quit().
loop.run()
开发者ID:akruis,项目名称:pydbus,代码行数:30,代码来源:server.py
示例17: Flask
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask
from flask import request
app = Flask(__name__)
from pydbus import SessionBus
bus = SessionBus()
try:
    # Prefer a player that is already published on the session bus.
    player = bus.get("github.zhangn1985.dbplay")
except Exception:
    # No remote player could be obtained: fall back to an in-process one.
    # Catching Exception instead of using a bare except lets
    # KeyboardInterrupt and SystemExit propagate.
    from playthread import Mpvplayer
    player = Mpvplayer()
    player.start()
import json
import types
from ykdl.common import url_to_module
@app.route('/play', methods=['POST', 'GET'])
def play():
if request.method == 'POST':
url = request.form['url']
try:
islist = request.form['list']
islist = islist == "True"
except:
islist = False
m,u = url_to_module(url)
开发者ID:wwqgtxx,项目名称:ykdl,代码行数:31,代码来源:webykdl.py
示例18: __init__
class Py3status:
"""
"""
# available configuration parameters
button_stop = None
button_toggle = 1
button_next = 4
button_previous = 5
format = "{previous}{toggle}{next} {state} [{artist} - ][{title}]"
format_none = "no player running"
icon_pause = u"▮"
icon_play = u"▶"
icon_stop = u"◾"
icon_next = u"»"
icon_previous = u"«"
state_pause = u"▮"
state_play = u"▶"
state_stop = u"◾"
player_priority = []
def __init__(self):
    """Create the instance with empty state and no bus connection."""
    # Connection and current player start out unset.
    self._dbus = None
    self._player = None

    # Flag and counter.
    self._kill = False
    self._tries = 0

    # Displayed data and per-player lookup tables, each its own dict.
    self._data = {}
    self._player_details = {}
    self._mpris_players = {}
    self._mpris_names = {}
    self._mpris_name_index = {}
def post_config_hook(self):
    """Open the session bus, keep it in ``self._dbus`` and start listening."""
    session = SessionBus()
    self._dbus = session
    self._start_listener()
def _init_data(self):
    """Reset ``self._data`` and refill it from the current player.

    With no player the defaults are kept. Any exception raised while
    reading the player or updating the metadata only sets
    ``error_occurred`` to ``True``.
    """
    self._data = dict(
        album=None,
        artist=None,
        error_occurred=False,
        length=None,
        player=None,
        state=STOPPED,
        title=None,
    )

    if self._player is None:
        return

    try:
        self._data["player"] = self._player.Identity
        status = self._player.PlaybackStatus
        self._data["state"] = self._get_state(status)
        self._update_metadata(self._player.Metadata)
    except Exception:
        self._data["error_occurred"] = True
def _get_button_state(self, control_state):
    """Return whether the given control should currently be clickable.

    Args:
        control_state: mapping with at least the keys ``"clickable"`` (name
            of the player attribute to read) and ``"action"``.

    Returns:
        The value read from the player (``True`` when the attribute does not
        exist, ``False`` when reading it raises), overridden with ``False``
        when the action makes no sense in the current playback state.
    """
    try:
        # Workaround: the getattr default makes the Stop button, whose
        # "clickable" entry is not a real player attribute, come out True.
        allowed = getattr(self._player, control_state["clickable"], True)
    except Exception:
        allowed = False

    current = self._data.get("state")
    action = control_state["action"]
    pointless = (
        (action == "Play" and current == PLAYING)
        or (action == "Pause" and current in [STOPPED, PAUSED])
        or (action == "Stop" and current == STOPPED)
    )
    return False if pointless else allowed
def _get_state(self, playback_status):
    """Translate a playback status string into PLAYING, PAUSED or STOPPED.

    Anything other than ``"Playing"`` or ``"Paused"`` maps to ``STOPPED``.
    """
    if playback_status == "Playing":
        return PLAYING
    if playback_status == "Paused":
        return PAUSED
    return STOPPED
def _get_text(self):
"""
Get the current metadata
"""
if self._data.get("state") == PLAYING:
color = self.py3.COLOR_PLAYING or self.py3.COLOR_GOOD
state_symbol = self.state_play
elif self._data.get("state") == PAUSED:
color = self.py3.COLOR_PAUSED or self.py3.COLOR_DEGRADED
state_symbol = self.state_pause
else:
color = self.py3.COLOR_STOPPED or self.py3.COLOR_BAD
state_symbol = self.state_stop
if self._data.get("error_occurred"):
color = self.py3.COLOR_BAD
#.........这里部分代码省略.........
开发者ID:btall,项目名称:py3status,代码行数:101,代码来源:mpris.py
示例19: ScarlettListenerI
#.........这里部分代码省略.........
"tee name=tee",
"queue name=appsink_queue silent=false leaky=2 max-size-buffers=0 max-size-time=0 max-size-bytes=0",
# caps=audio/x-raw,format=(string)S16LE,rate=(int)16000,channels=(int)1,layout=(string)interleaved # NOQA
"appsink name=appsink drop=false max-buffers=10 sync=false emit-signals=true tee.",
"queue leaky=2 name=fakesink_queue",
"fakesink",
]
return _gst_launch
def cancel(self):
    """Request cancellation by setting ``self.cancelled`` to ``True``.

    Python threads cannot be cancelled from outside, so cancellation is
    signalled through this flag instead.
    """
    setattr(self, "cancelled", True)
@abort_on_exception
def run(self, event=None):
    """Connect to D-Bus, set up GStreamer, start playing and emit signals.

    ``event`` is accepted but not used by this method.
    """
    # TODO: WE NEED TO USE A THREADING EVENT OR A RLOCK HERE TO WAIT TILL DBUS SERVICE IS RUNNING TO CONNECT
    # TODO: SIGNALS TO THE DBUS PROXY METHODS WE WANT TO USE
    # TODO: lock.acquire() / event / condition
    # TODO: self.connect_to_dbus()
    # TODO: self.setup_dbus_callbacks_handlers()
    self._connect_to_dbus()
    self.init_gst()

    description = str(self)
    print("Running {}".format(description))

    self.play()
    for signal_name in ("playback-status-changed", "playing-changed"):
        self.emit(signal_name)
    # FIXME: is this needed? # self.mainloop.run()
def _connect_to_dbus(self):
    """Connect to the ``org.scarlett`` listener object on the session bus.

    Keeps the bus in ``self.bus`` and the listener proxy in
    ``self.dbus_proxy``, announces this client via
    ``emitConnectedToListener``, waits two seconds and registers
    ``self.on_cancel_listening`` for ``ListenerCancelSignal``.
    """
    listener_path = "/org/scarlett/Listener"

    self.bus = SessionBus()
    self.dbus_proxy = self.bus.get("org.scarlett", object_path=listener_path)  # NOQA
    self.dbus_proxy.emitConnectedToListener("ScarlettListener")
    time.sleep(2)
    logger.info("_connect_to_dbus")

    # TODO: Add a ss_cancel_signal.disconnect() function later
    subscription_options = dict(
        sender=None,
        iface="org.scarlett.Listener",
        signal="ListenerCancelSignal",
        object=listener_path,
        arg0=None,
        flags=0,
        signal_fired=self.on_cancel_listening,
    )
    ss_cancel_signal = self.bus.subscribe(**subscription_options)
# NOTE: This function generates the dot file, checks that graphviz in installed and
# then finally generates a png file, which it then displays
def on_debug_activate(self):
# FIXME: This needs to use dynamic paths, it's possible that we're having issues because of order of operations
# FIXME: STATIC PATH 7/3/2018
# dotfile = (
# "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-listener.dot"
# )
# pngfile = "/home/pi/dev/bossjones-github/scarlett_os/_debug/generator-listener-pipeline.png" # NOQA
dotfile = self._dotfile_listener
pngfile = self._pngfile_listener
if os.access(dotfile, os.F_OK):
os.remove(dotfile)
if os.access(pngfile, os.F_OK):
开发者ID:bossjones,项目名称:scarlett_os,代码行数:67,代码来源:listener.py
示例20: command_cb
# Signal callback (the sample kwargs below show signal='CommandRecognizedSignal').
# For every tuple found in *args it unpacks (msg, scarlett_sound, command),
# plays the matching sound, runs the command, speaks the result and finally
# calls emitListenerCancelSignal() on the listener proxy.
# NOTE(review): indentation was lost in this copy, so the nesting of the helper
# functions and of the try/finally blocks below is assumed, not shown - confirm
# against the upstream file before restructuring anything.
def command_cb(*args, **kwargs):
if SCARLETT_DEBUG:
logger.debug("command_cb PrettyPrinter: ")
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(args)
# MAR 13 2016
logger.debug("command_cb kwargs")
print_keyword_args(**kwargs)
# (con=<DBusConnection object at 0x7f3fba21f0f0 (GDBusConnection at 0x2ede000)>,
# sender=':1.0',
# object='/org/scarlett/Listener',
# iface='org.scarlett.Listener',
# signal='CommandRecognizedSignal',
# params=GLib.Variant('(sss)', (' ScarlettListener caugh...ommand match', 'pi-response', 'what time is it')))
# NOTE: THIS IS WHAT FIXED THE GENERATOR NONSENSE
# source: https://www.python.org/dev/peps/pep-0343/
# NOTE(review): iterates `wavefile`, which is only assigned further down
# (step 1) - presumably a closure over command_cb's local; confirm.
def player_generator_func():
for path in wavefile:
path = os.path.abspath(os.path.expanduser(path))
yield True
print("for path in wavefile")
p = generator_player.ScarlettPlayer(path, False)
while True:
try:
yield p.next()
finally:
time.sleep(p.duration)
p.close(force=True)
yield False
# Creates the generator and schedules a single next() call on it through
# GObject.idle_add at high priority; the idle_add result is not returned.
def run_player(function):
gen = function()
GObject.idle_add(lambda: next(gen, False), priority=GLib.PRIORITY_HIGH)
# NOTE(review): iterates `tts_list`, which is only assigned further down
# (step 4) - presumably a closure over command_cb's local; confirm.
# NOTE(review): the wav path below is a hard-coded absolute path.
def speaker_generator_func():
for scarlett_text in tts_list:
yield True
print("scarlett_text in tts_list")
_wavepath = "/home/pi/dev/bossjones-github/scarlett-dbus-poc/espeak_tmp.wav"
s = generator_speaker.ScarlettSpeaker(text_to_speak=scarlett_text,
wavpath=_wavepath,
skip_player=True)
p = generator_player.ScarlettPlayer(_wavepath, False)
logger.error("Duration: p.duration: {}".format(p.duration))
while True:
try:
yield p.next()
finally:
time.sleep(p.duration)
p.close(force=True)
s.close(force=True)
yield False
# Same scheduling as run_player, for the speaker generator.
def run_speaker(function):
gen = function()
GObject.idle_add(lambda: next(gen, False), priority=GLib.PRIORITY_HIGH)
for i, v in enumerate(args):
if SCARLETT_DEBUG:
logger.debug("Type v: {}".format(type(v)))
logger.debug("Type i: {}".format(type(i)))
if isinstance(v, tuple):
if SCARLETT_DEBUG:
logger.debug(
"THIS SHOULD BE A Tuple now: {}".format(v))
msg, scarlett_sound, command = v
logger.warning(" msg: {}".format(msg))
logger.warning(
" scarlett_sound: {}".format(scarlett_sound))
logger.warning(" command: {}".format(command))
# 1. play sound first
wavefile = SoundType.get_path(scarlett_sound)
# run_player has no return statement, so run_player_result is always None.
run_player_result = run_player(player_generator_func)
# 2. Perform command
command_run_results = generator_commands.Command.check_cmd(command_tuple=v)
# 3. Verify it is not a command NO_OP
if command_run_results == '__SCARLETT_NO_OP__':
logger.error("__SCARLETT_NO_OP__")
# check_cmd returned the no-op marker: skip speaking and the cancel signal.
return False
# 4. Scarlett Speaks
tts_list = SpeakerType.speaker_to_array(command_run_results)
run_speaker_result = run_speaker(speaker_generator_func)
# 5. Emit signal to reset keyword match ( need to implement this )
bus = SessionBus()
ss = bus.get("org.scarlett", object_path='/org/scarlett/Listener') # NOQA
time.sleep(1)
ss.emitListenerCancelSignal()
# 6. Finished call back
# NOTE(review): the check above is isinstance(v, tuple), yet the message
# below talks about GLib.Variant.
else:
logger.debug("THIS IS NOT A GLib.Variant: {} - TYPE {}".format(v, type(v)))
开发者ID:bossjones,项目名称:scarlett-dbus-poc,代码行数:97,代码来源:generator_tasker.py
注：本文中的 pydbus.SessionBus 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论