本文整理汇总了Python中mavutil.mavlink_connection函数的典型用法代码示例。如果您正苦于以下问题:Python mavlink_connection函数的具体用法?Python mavlink_connection怎么用?Python mavlink_connection使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了mavlink_connection函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: refresh
def refresh(self):
# open port and request all parameters
inmaster = mavutil.mavlink_connection(self.indevice, input=False)
inmaster.target_system = 20
inmaster.param_fetch_all()
inmaster.close()
# wait until all parameters received
outmaster = mavutil.mavlink_connection(self.outdevice)
outmaster.port.settimeout(2)
t = time.time()
while True:
try:
m = outmaster.recv_msg()
except socket.timeout:
outmaster.close()
return TIMEOUT
if t + 2 < time.time():
outmaster.close()
print "timeout"
return TIMEOUT
if m is not None:
if type(m) == mavlink.MAVLink_param_value_message:
t = time.time()
self.param_value_received.emit(m)
# print m.param_count, "/", m.param_index, m.param_id, m.param_value
# print m.param_id.split('_')[0]
if m.param_count == m.param_index + 1:
outmaster.close()
return LAST_VALUE
开发者ID:barthess,项目名称:volat3,代码行数:32,代码来源:main.py
示例2: __init__
def __init__(self, filename):
self.root = Tkinter.Tk()
self.filesize = os.path.getsize(filename)
self.filepos = 0.0
self.mlog = mavutil.mavlink_connection(filename, planner_format=opts.planner,
robust_parsing=True)
self.mout = []
for m in opts.out:
self.mout.append(mavutil.mavlink_connection(m, input=False, baud=opts.baudrate))
self.fgout = []
for f in opts.fgout:
self.fgout.append(mavutil.mavudp(f, input=False))
self.fdm = fgFDM.fgFDM()
self.msg = self.mlog.recv_match(condition=opts.condition)
if self.msg is None:
sys.exit(1)
self.last_timestamp = getattr(self.msg, '_timestamp')
self.paused = False
self.topframe = Tkinter.Frame(self.root)
self.topframe.pack(side=Tkinter.TOP)
self.frame = Tkinter.Frame(self.root)
self.frame.pack(side=Tkinter.LEFT)
self.slider = Tkinter.Scale(self.topframe, from_=0, to=1.0, resolution=0.01,
orient=Tkinter.HORIZONTAL, command=self.slew)
self.slider.pack(side=Tkinter.LEFT)
self.clock = Tkinter.Label(self.topframe,text="")
self.clock.pack(side=Tkinter.RIGHT)
self.playback = Tkinter.Spinbox(self.topframe, from_=0, to=20, increment=0.1, width=3)
self.playback.pack(side=Tkinter.BOTTOM)
self.playback.delete(0, "end")
self.playback.insert(0, 1)
self.buttons = {}
self.button('quit', 'gtk-quit.gif', self.frame.quit)
self.button('pause', 'media-playback-pause.gif', self.pause)
self.button('rewind', 'media-seek-backward.gif', self.rewind)
self.button('forward', 'media-seek-forward.gif', self.forward)
self.button('status', 'Status', self.status)
self.flightmode = Tkinter.Label(self.frame,text="")
self.flightmode.pack(side=Tkinter.RIGHT)
self.next_message()
self.root.mainloop()
开发者ID:PX4,项目名称:Flow,代码行数:54,代码来源:mavplayback.py
示例3: playback
def playback(filename, images):
'''playback one file'''
mlog = mavutil.mavlink_connection(filename, robust_parsing=True)
mout = mavutil.mavlink_connection(opts.out, input=False, baud=opts.baudrate)
# get first message
msg = mlog.recv_match(condition=opts.condition)
last_timestamp = msg._timestamp
last_print = time.time()
# skip any older images
while len(images) and images[0].frame_time < msg._timestamp:
images.pop(0)
params = []
param_send = []
while True:
msg = mlog.recv_match(condition=opts.condition)
if msg is None:
return
if msg.get_type() == 'PARAM_VALUE':
params.append(msg)
mout.write(msg.get_msgbuf())
deltat = msg._timestamp - last_timestamp
if len(images) == 0 or images[0].frame_time > msg._timestamp + 2:
# run at high speed except for the portions where we have images
deltat /= 60
time.sleep(max(min(deltat/opts.speedup, 5), 0))
last_timestamp = msg._timestamp
if time.time() - last_print > 2.0:
print('%s' % (time.asctime(time.localtime(msg._timestamp))))
last_print = time.time()
if len(images) and msg._timestamp > images[0].frame_time:
img = images.pop(0)
try:
os.unlink('fake_chameleon.tmp')
except Exception:
pass
os.symlink(img.filename, 'fake_chameleon.tmp')
os.rename('fake_chameleon.tmp', 'fake_chameleon.pgm')
print(img.filename)
# check for parameter fetch messages
msg = mout.recv_msg()
if msg and msg.get_type() == 'PARAM_REQUEST_LIST':
print("Sending %u parameters" % len(params))
param_send = params[:]
if len(param_send) != 0:
p = param_send.pop(0)
mout.write(p.get_msgbuf())
开发者ID:tjhowse,项目名称:cuav,代码行数:53,代码来源:playback.py
示例4: __init__
def __init__(self, parent, out_queue, sysid=39):
threading.Thread.__init__(self)
self.sysid = sysid
# use dest port 32002, sysid 39
# use dest port 32001, sysid 43
self.mavs = mavutil.mavlink_connection("udp:127.0.0.1:17779", input=True, source_system=self.sysid)
self.mavo = mavutil.mavlink_connection("udp:127.0.0.1:32001", input=False, source_system=self.sysid)
# self.mav2 = mavutil.mavlink_connection("udp:127.0.0.1:32002", input=False, source_system=self.sysid)
self.mavqgc = mavutil.mavlink_connection("udp:127.0.0.1:14550", input=False, source_system=self.sysid)
# self.mavs = mavutil.mavlink_connection("udp:127.0.0.1:17779", input=True, source_system=self.sysid)
# self.mavo = mavutil.mavlink_connection("udp:127.0.0.1:32002", input=False, source_system=self.sysid)
self.out_queue = out_queue
self.running = True
self.daemon = True
self.parent = parent
print "DS isDaemon", self.isDaemon()
开发者ID:koro,项目名称:python-multiwii,代码行数:16,代码来源:DataSource.py
示例5: flight_time
def flight_time(logfile):
'''work out flight time for a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename)
in_air = False
start_time = 0.0
total_time = 0.0
t = None
while True:
m = mlog.recv_match(type='VFR_HUD')
if m is None:
if in_air:
total_time += time.mktime(t) - start_time
if total_time > 0:
print("Flight time : %u:%02u" % (int(total_time)/60, int(total_time)%60))
return total_time
t = time.localtime(m._timestamp)
if m.groundspeed > 3.0 and not in_air:
print("In air at %s" % time.asctime(t))
in_air = True
start_time = time.mktime(t)
elif m.groundspeed < 3.0 and in_air:
print("On ground at %s" % time.asctime(t))
in_air = False
total_time += time.mktime(t) - start_time
return total_time
开发者ID:PX4,项目名称:Flow,代码行数:28,代码来源:flighttime.py
示例6: sigloss
def sigloss(logfile):
'''work out signal loss times for a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename,
planner_format=opts.planner,
notimestamps=opts.notimestamps,
robust_parsing=opts.robust)
last_t = 0
types = opts.types
if types is not None:
types = types.split(',')
while True:
m = mlog.recv_match(condition=opts.condition)
if m is None:
return
if types is not None and m.get_type() not in types:
continue
if opts.notimestamps:
if not 'usec' in m._fieldnames:
continue
t = m.usec / 1.0e6
else:
t = m._timestamp
if last_t != 0:
if t - last_t > opts.deltat:
print("Sig lost for %.1fs at %s" % (t-last_t, time.asctime(time.localtime(t))))
last_t = t
开发者ID:kd0aij,项目名称:matrixpilot_old,代码行数:30,代码来源:sigloss.py
示例7: drive_APMrover2
def drive_APMrover2(viewerip=None):
'''drive APMrover2 in SIL
you can pass viewerip as an IP address to optionally send fg and
mavproxy packets too for local viewing of the mission in real time
'''
global homeloc
options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --streamrate=10'
if viewerip:
options += " --out=%s:14550" % viewerip
sil = util.start_SIL('APMrover2', wipe=True)
mavproxy = util.start_MAVProxy_SIL('APMrover2', options=options)
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
mavproxy.send('param set SYSID_THISMAV %u\n' % random.randint(100, 200))
mavproxy.send("param load %s/Rover.parm\n" % testdir)
mavproxy.expect('Loaded [0-9]+ parameters')
# restart with new parms
util.pexpect_close(mavproxy)
util.pexpect_close(sil)
sim_cmd = util.reltopdir('Tools/autotest/pysim/sim_rover.py') + ' --rate=50 --home=%f,%f,%u,%u' % (
HOME.lat, HOME.lng, HOME.alt, HOME.heading)
runsim = pexpect.spawn(sim_cmd, logfile=sys.stdout, timeout=10)
runsim.delaybeforesend = 0
util.pexpect_autoclose(runsim)
runsim.expect('Starting at lat')
sil = util.start_SIL('APMrover2')
mavproxy = util.start_MAVProxy_SIL('APMrover2', options=options)
mavproxy.expect('Logging to (\S+)')
logfile = mavproxy.match.group(1)
print("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/APMrover2-test.mavlog")
print("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
os.link(logfile, buildlog)
mavproxy.expect('Received [0-9]+ parameters')
util.expect_setup_callback(mavproxy, expect_callback)
expect_list_clear()
expect_list_extend([runsim, sil, mavproxy])
print("Started simulator")
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception, msg:
print("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
开发者ID:DaveCrone,项目名称:ardupilot,代码行数:60,代码来源:apmrover2.py
示例8: process
def process(filename):
'''process one logfile'''
print("Processing %s" % filename)
mlog = mavutil.mavlink_connection(filename, notimestamps=opts.notimestamps,
robust_parsing=opts.robust)
output = None
count = 1
dirname = os.path.dirname(filename)
while True:
m = mlog.recv_match(condition=opts.condition)
if m is None:
break
if mlog.flightmode.upper() == opts.mode.upper():
if output is None:
path = os.path.join(dirname, "%s%u.log" % (opts.mode, count))
count += 1
print("Creating %s" % path)
output = mavutil.mavlogfile(path, write=True)
else:
if output is not None:
output.close()
output = None
if output and m.get_type() != 'BAD_DATA':
timestamp = getattr(m, '_timestamp', None)
output.write(struct.pack('>Q', timestamp*1.0e6))
output.write(m.get_msgbuf())
开发者ID:kd0aij,项目名称:matrixpilot_old,代码行数:30,代码来源:mavextract.py
示例9: transmit
def transmit(self):
inmaster = mavutil.mavlink_connection(self.indevice, input=False)
inmaster.target_system = 20
i = 0
for an in self.grid:
param_id = "AN_ch" + str(i + 1) + "_c1"
v = int(self.grid[i].lineedit_c1.text())
inmaster.mav.param_set_send(20, 0, param_id, v, 6)
time.sleep(0.05)
param_id = "AN_ch" + str(i + 1) + "_c2"
v = int(self.grid[i].lineedit_c2.text())
inmaster.mav.param_set_send(20, 0, param_id, v, 6)
time.sleep(0.05)
param_id = "AN_ch" + str(i + 1) + "_c3"
v = int(self.grid[i].lineedit_c3.text())
inmaster.mav.param_set_send(20, 0, param_id, v, 6)
time.sleep(0.05)
i += 1
print "saved AN", i
# param_id = "AN_ch" + str(i + 1) + "_c3"
# v = int(self.grid[i].lineedit_c3.text())
# inmaster.mav.param_set_send(20, 0, param_id, v, 6)
# time.sleep(0.05)
inmaster.close()
开发者ID:barthess,项目名称:volat3,代码行数:30,代码来源:analog.py
示例10: input
def input(q, e_pause, e_kill, device):#{{{
""" Менеджер входящих сообщений.
q -- очередь сообщений, в которую надо складывать успешно принятые пакеты
device -- сетевой сокет, из которого сыпятся байты, в идеале содержащие пакеты """
# ждем, пока нас снимут с паузы
dbgprint("**** link input thread ready")
e_pause.wait()
dbgprint("**** link input thread run")
master = mavutil.mavlink_connection(device)
master.port.settimeout(1)
m = None
while True:
if e_kill.is_set():
dbgprint("**** Link input thread. Sigterm received. Exiting")
return
try: m = master.recv_msg()
except socket.timeout: pass
if m is not None:
# print type(m)
try: q.put_nowait(m)
except Full: pass
开发者ID:barthess,项目名称:volat3,代码行数:27,代码来源:link.py
示例11: flight_time
def flight_time(logfile):
'''work out flight time for a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename)
in_air = False
start_time = 0.0
total_time = 0.0
t = None
while True:
m = mlog.recv_match(type='VFR_HUD', condition=opts.condition)
if m is None:
if in_air:
total_time += time.mktime(t) - start_time
if total_time > 0:
print("Flight time : %u:%02u" % (int(total_time)/60, int(total_time)%60))
return total_time
t = time.localtime(m._timestamp)
if m.groundspeed > opts.groundspeed and not in_air:
print("In air at %s (percent %.0f%% groundspeed %.1f)" % (time.asctime(t), mlog.percent, m.groundspeed))
in_air = True
start_time = time.mktime(t)
elif m.groundspeed < opts.groundspeed and in_air:
print("On ground at %s (percent %.1f%% groundspeed %.1f time=%.1f seconds)" % (
time.asctime(t), mlog.percent, m.groundspeed, time.mktime(t) - start_time))
in_air = False
total_time += time.mktime(t) - start_time
return total_time
开发者ID:CptMacHammer,项目名称:au_uav_pkg,代码行数:29,代码来源:flighttime.py
示例12: init_mavlink
def init_mavlink(self):
# create a mavlink instance
self.mav1 = mavutil.mavlink_connection("COM21", 57600,10)
print("Waiting for HEARTBEAT")
#self.mav1.wait_heartbeat()
#print("Heartbeat from APM (system %u component %u)" % (self.mav1.target_system, self.mav1.target_component))
self.mav1.target_system = 1
self.mav1.target_component = 1
开发者ID:pattawong,项目名称:mavlink_gateway,代码行数:8,代码来源:qt.py
示例13: write_rom
def write_rom():
inmaster = mavutil.mavlink_connection(indevice, input=False)
inmaster.target_system = 20
v = 32767
param_id = "REL_Z_0"
inmaster.mav.param_set_send(20, 0, param_id, v, 5)
time.sleep(0.1)
开发者ID:barthess,项目名称:volat3,代码行数:9,代码来源:hack_discrete_tune.py
示例14: mavflightview
def mavflightview(filename):
print("Loading %s ..." % filename)
mlog = mavutil.mavlink_connection(filename)
wp = mavwp.MAVWPLoader()
if opts.mission is not None:
wp.load(opts.mission)
path = []
while True:
m = mlog.recv_match(type=['MISSION_ITEM', 'GLOBAL_POSITION_INT'])
if m is None:
break
if m.get_type() == 'GLOBAL_POSITION_INT' and mlog.check_condition(opts.condition):
if opts.mode is not None and mlog.flightmode.lower() != opts.mode.lower():
continue
lat = m.lat * 1.0e-7
lng = m.lon * 1.0e-7
if lat != 0 or lng != 0:
if mlog.flightmode in colourmap:
point = (lat, lng, colourmap[mlog.flightmode])
else:
point = (lat, lng)
path.append(point)
if m.get_type() == 'MISSION_ITEM':
wp.set(m, m.seq)
if len(path) == 0:
print("No points to plot")
return
bounds = mp_util.polygon_bounds(path)
(lat, lon) = (bounds[0]+bounds[2], bounds[1])
(lat, lon) = mp_util.gps_newpos(lat, lon, -45, 50)
ground_width = mp_util.gps_distance(lat, lon, lat-bounds[2], lon+bounds[3])
while (mp_util.gps_distance(lat, lon, bounds[0], bounds[1]) >= ground_width-20 or
mp_util.gps_distance(lat, lon, lat, bounds[1]+bounds[3]) >= ground_width-20):
ground_width += 10
path_obj = mp_slipmap.SlipPolygon('FlightPath', path, layer='FlightPath',
linewidth=2, colour=(255,0,180))
mission = wp.polygon()
if len(mission) > 1:
mission_obj = mp_slipmap.SlipPolygon('Mission', wp.polygon(), layer='Mission',
linewidth=2, colour=(255,255,255))
else:
mission_obj = None
if opts.imagefile:
create_imagefile(opts.imagefile, (lat,lon), ground_width, path_obj, mission_obj)
else:
map = mp_slipmap.MPSlipMap(title=filename,
service=opts.service,
elevation=True,
width=600,
height=600,
ground_width=ground_width,
lat=lat, lon=lon)
map.add_object(path_obj)
if mission_obj is not None:
map.add_object(mission_obj)
开发者ID:OpenGelo,项目名称:MAVProxy,代码行数:57,代码来源:mavflightview.py
示例15: process
def process(logfile):
"""display accel cal for a log file"""
mlog = mavutil.mavlink_connection(
filename, planner_format=opts.planner, notimestamps=opts.notimestamps, robust_parsing=opts.robust
)
m = mlog.recv_match(type="SENSOR_OFFSETS")
if m is not None:
z_sensor = (m.accel_cal_z - 9.805) * (4096 / 9.81)
print("accel cal %5.2f %5.2f %5.2f %6u %s" % (m.accel_cal_x, m.accel_cal_y, m.accel_cal_z, z_sensor, logfile))
开发者ID:jimbofagan,项目名称:au_uav_pkg,代码行数:10,代码来源:mav_accel.py
示例16: fly_ArduCopter_scripted
def fly_ArduCopter_scripted(testname):
'''fly ArduCopter in SIL
'''
global homeloc
sim_cmd = util.reltopdir('Tools/autotest/pysim/sim_multicopter.py') + ' --frame=%s --rate=400 --home=%f,%f,%u,%u' % (
FRAME, HOME.lat, HOME.lng, HOME.alt, HOME.heading)
sim_cmd += ' --wind=6,45,.3'
sil = util.start_SIL('ArduCopter', wipe=True)
mavproxy = util.start_MAVProxy_SIL('ArduCopter', options='--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --quadcopter')
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
mavproxy.send('param set SYSID_THISMAV %u\n' % random.randint(100, 200))
mavproxy.send("param load %s/ArduCopter.parm\n" % testdir)
mavproxy.expect('Loaded [0-9]+ parameters')
# reboot with new parameters
util.pexpect_close(mavproxy)
util.pexpect_close(sil)
sil = util.start_SIL('ArduCopter', height=HOME.alt)
sim = pexpect.spawn(sim_cmd, logfile=sys.stdout, timeout=10)
sim.delaybeforesend = 0
util.pexpect_autoclose(sim)
options = '--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --quadcopter --streamrate=5'
mavproxy = util.start_MAVProxy_SIL('ArduCopter', options=options)
mavproxy.expect('Logging to (\S+)')
logfile = mavproxy.match.group(1)
print("LOGFILE %s" % logfile)
buildlog = util.reltopdir("../buildlogs/ArduCopter-test.tlog")
print("buildlog=%s" % buildlog)
if os.path.exists(buildlog):
os.unlink(buildlog)
os.link(logfile, buildlog)
# the received parameters can come before or after the ready to fly message
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
util.expect_setup_callback(mavproxy, arducopter.expect_callback)
expect_list_clear()
expect_list_extend([sim, sil, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception, msg:
print("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
开发者ID:AlexeyKrapotkin,项目名称:ardupilot-mpng,代码行数:55,代码来源:autotest_jenkins.py
示例17: __init__
def __init__(self):
'''
Constructor
'''
QtCore.QThread.__init__(self)
self.exiting = False
udp_ip = ground_config.get("remote_udp", "ip")
udp_port = ground_config.get("remote_udp", "port")
self._link = mavutil.mavlink_connection("udp:{}:{}".format(udp_ip, udp_port), input=False, source_system=1)
# send initial heartbeat to get things rolling
self.send_heartbeat()
开发者ID:TimJay,项目名称:RaspberryPylot,代码行数:11,代码来源:UDPSender.py
示例18: run_mission
def run_mission(mission_path, frame, home, viewerip=None):
sim_cmd = util.reltopdir('Tools/autotest/pysim/sim_multicopter.py')
sim_cmd += ' --frame=%s --rate=400 --home=%f,%f,%u,%u' % (
frame, home.lat, home.lng, home.alt, home.heading)
sim_cmd += ' --wind=6,45,.3'
if viewerip:
sim_cmd += ' --fgout=%s:5503' % viewerip
sil = util.start_SIL('ArduCopter', wipe=True)
mavproxy = util.start_MAVProxy_SIL(
'ArduCopter',
options='--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --quadcopter')
mavproxy.expect('Received [0-9]+ parameters')
# setup test parameters
mavproxy.send('param set SYSID_THISMAV %u\n' % random.randint(100, 200))
mavproxy.send("param load %s/autotest/ArduCopter.parm\n" % testdir)
mavproxy.expect('Loaded [0-9]+ parameters')
mavproxy.send('module load mmap\n')
# reboot with new parameters
util.pexpect_close(mavproxy)
util.pexpect_close(sil)
sil = util.start_SIL('ArduCopter', height=home.alt)
print 'Executing command %s' % (sim_cmd,)
sim = pexpect.spawn(sim_cmd, logfile=sys.stdout, timeout=10)
sim.delaybeforesend = 0
util.pexpect_autoclose(sim)
options = ('--sitl=127.0.0.1:5501 --out=127.0.0.1:19550 --quadcopter '
'--streamrate=5')
if viewerip:
options += ' --out=%s:14550' % viewerip
mavproxy = util.start_MAVProxy_SIL('ArduCopter', options=options)
mavproxy.expect('Logging to (\S+)')
logfile = mavproxy.match.group(1)
print 'Saving log %s' % (logfile,)
# the received parameters can come before or after the ready to fly message
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
mavproxy.expect(['Received [0-9]+ parameters', 'Ready to FLY'])
mavproxy.send('module load mmap\n')
util.expect_setup_callback(mavproxy, common.expect_callback)
common.expect_list_clear()
common.expect_list_extend([sim, sil, mavproxy])
# get a mavlink connection going
try:
mav = mavutil.mavlink_connection('127.0.0.1:19550', robust_parsing=True)
except Exception, msg:
error("Failed to start mavlink connection on 127.0.0.1:19550" % msg)
raise
开发者ID:363546178,项目名称:ardupilot-mega,代码行数:54,代码来源:run_sim_mission.py
示例19: mavparms
def mavparms(logfile):
'''extract mavlink parameters'''
mlog = mavutil.mavlink_connection(filename)
while True:
m = mlog.recv_match(type='PARAM_VALUE')
if m is None:
return
pname = str(m.param_id).strip()
if len(pname) > 0:
parms[pname] = m.param_value
开发者ID:CptMacHammer,项目名称:PiDeCAF,代码行数:11,代码来源:mavparms.py
示例20: magfit
def magfit(logfile):
'''find best magnetometer offset fit to a log file'''
print("Processing log %s" % filename)
mlog = mavutil.mavlink_connection(filename, notimestamps=opts.notimestamps)
data = []
last_t = 0
offsets = Vector3(0,0,0)
# now gather all the data
while True:
m = mlog.recv_match(condition=opts.condition)
if m is None:
break
if m.get_type() == "SENSOR_OFFSETS":
# update current offsets
offsets = Vector3(m.mag_ofs_x, m.mag_ofs_y, m.mag_ofs_z)
if m.get_type() == "RAW_IMU":
mag = Vector3(m.xmag, m.ymag, m.zmag)
# add data point after subtracting the current offsets
data.append(mag - offsets + noise())
print("Extracted %u data points" % len(data))
print("Current offsets: %s" % offsets)
data = select_data(data)
# remove initial outliers
data.sort(lambda a,b : radius_cmp(a,b,offsets))
data = data[len(data)/16:-len(data)/16]
# do an initial fit
(offsets, field_strength) = fit_data(data)
for count in range(3):
# sort the data by the radius
data.sort(lambda a,b : radius_cmp(a,b,offsets))
print("Fit %u : %s field_strength=%6.1f to %6.1f" % (
count, offsets,
radius(data[0], offsets), radius(data[-1], offsets)))
# discard outliers, keep the middle 3/4
data = data[len(data)/8:-len(data)/8]
# fit again
(offsets, field_strength) = fit_data(data)
print("Final : %s field_strength=%6.1f to %6.1f" % (
offsets,
radius(data[0], offsets), radius(data[-1], offsets)))
开发者ID:CptMacHammer,项目名称:au_uav_pkg,代码行数:53,代码来源:magfit.py
注:本文中的mavutil.mavlink_connection函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论