本文整理汇总了Python中tshark.run_tshark函数的典型用法代码示例。如果您正苦于以下问题:Python run_tshark函数的具体用法?Python run_tshark怎么用?Python run_tshark使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了run_tshark函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _test_p2p_ext_vendor_elem_assoc
def _test_p2p_ext_vendor_elem_assoc(dev, apdev, params):
    """Verify vendor elements show up in Association Request/Response frames.

    Registers three vendor elements through VENDOR_ELEM_ADD (IDs 11 and 13 on
    dev[0], ID 12 on dev[1]), runs GO negotiation with dev[1] using
    go_intent=15 and dev[0] using go_intent=0, removes the group and then
    checks the tshark dump of params['logdir']/hwsim0.pcapng.

    Raises:
        Exception: when a VENDOR_ELEM_ADD command does not report OK or when
            an expected "Vendor Specific Data" entry is missing.
    """
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    # (device index, control interface command), issued in this order.
    vendor_elem_cmds = (
        (0, "VENDOR_ELEM_ADD 11 dd050011223308"),
        (1, "VENDOR_ELEM_ADD 12 dd050011223309"),
        (0, "VENDOR_ELEM_ADD 13 dd05001122330a"),
    )
    for idx, cmd in vendor_elem_cmds:
        if "OK" not in dev[idx].request(cmd):
            raise Exception("VENDOR_ELEM_ADD failed")

    dev[0].p2p_listen()
    dev[1].p2p_listen()
    dev[1].p2p_go_neg_auth(addr0, "12345670", "enter", go_intent=15)
    dev[0].p2p_go_neg_init(addr1, "12345670", "display", go_intent=0,
                           timeout=15)
    dev[1].p2p_go_neg_auth_result()
    dev[1].remove_group()
    dev[0].wait_go_ending_session()

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")

    # Subtype 0x00: Association Request frames.
    assoc_req = run_tshark(capture, "wlan.fc.type_subtype == 0x00",
                           wait=False)
    if "Vendor Specific Data: 3308" not in assoc_req:
        raise Exception("Vendor element (P2P) not found from Association Request frame")
    if "Vendor Specific Data: 330a" not in assoc_req:
        raise Exception("Vendor element (non-P2P) not found from Association Request frame")

    # Subtype 0x01: Association Response frames.
    assoc_resp = run_tshark(capture, "wlan.fc.type_subtype == 0x01",
                            wait=False)
    if "Vendor Specific Data: 3309" not in assoc_resp:
        raise Exception("Vendor element not found from Association Response frame")
开发者ID:cococorp,项目名称:hostap,代码行数:29,代码来源:test_p2p_ext.py
示例2: test_cfg80211_tx_frame
def test_cfg80211_tx_frame(dev, apdev, params):
    """cfg80211 offchannel TX frame command"""
    # Sends one Action frame offchannel (2422 MHz) and one on 2412 MHz via
    # nl80211_frame(), then checks the frequencies recorded for Action frames
    # (subtype 13) in params['logdir']/hwsim0.pcapng. The frequency check is
    # skipped when run_tshark() returns None.
    ifindex = int(dev[0].get_driver_status_field("ifindex"))
    frame = binascii.unhexlify("d000000002000000010002000000000002000000010000000409506f9a090001dd5e506f9a0902020025080401001f0502006414060500585804510b0906000200000000000b1000585804510b0102030405060708090a0b0d1d000200000000000108000000000000000000101100084465766963652041110500585804510bdd190050f204104a0001101012000200011049000600372a000120")

    dev[0].request("P2P_GROUP_ADD freq=2412")
    res = nl80211_frame(dev[0], ifindex, frame, freq=2422, duration=500,
                        offchannel_tx_ok=True)
    time.sleep(0.1)

    # note: Uncommenting this seems to remove the incorrect channel issue
    #nl80211_frame_wait_cancel(dev[0], ifindex, res[nl80211_attr['COOKIE']])

    # note: this Action frame ends up getting sent incorrectly on 2422 MHz
    nl80211_frame(dev[0], ifindex, frame, freq=2412)
    time.sleep(1.5)
    # note: also the Deauthenticate frame sent by the GO going down ends up
    # being transmitted incorrectly on 2422 MHz.

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture, "wlan.fc.type_subtype == 13",
                     ["radiotap.channel.freq"])
    if out is None:
        # No tshark output available; nothing to verify.
        return

    freqs = out.splitlines()
    if len(freqs) != 2:
        raise Exception("Unexpected number of Action frames (%d)" % len(freqs))
    first, second = freqs
    if first != "2422":
        raise Exception("First Action frame on unexpected channel: %s MHz" % first)
    if second != "2412":
        raise Exception("Second Action frame on unexpected channel: %s MHz" % second)
开发者ID:9A9A,项目名称:wpa_supplicant-fork,代码行数:30,代码来源:test_cfg80211.py
示例3: test_ap_max_num_sta_no_probe_resp
def test_ap_max_num_sta_no_probe_resp(dev, apdev, params):
    """Maximum STA count and limit on Probe Response frames"""
    # Starts an AP with max_num_sta=1 and no_probe_resp_if_max_sta=1, connects
    # dev[1] to fill the single slot and then lets dev[0] scan. If dev[0]
    # still has a BSS entry for the AP, the capture is consulted to tell
    # whether it came from a Probe Response frame (subtype 5).
    #
    # Raises Exception if no Probe Response frame addressed to dev[1] is in
    # the capture, or if dev[0] found the AP through a Probe Response frame.
    logdir = params['logdir']
    dev[0].flush_scan_cache()
    ssid = "max"
    # Separate name for the AP configuration so the 'params' argument is not
    # rebound.
    ap_params = {
        'ssid': ssid,
        'beacon_int': "2000",
        'max_num_sta': "1",
        'no_probe_resp_if_max_sta': "1",
    }
    hostapd.add_ap(apdev[0], ap_params)
    dev[1].connect(ssid, key_mgmt="NONE", scan_freq="2412")

    dev[0].scan(freq=2412, type="ONLY")
    dev[0].scan(freq=2412, type="ONLY")
    # Identity comparison is the correct way to test against None.
    seen = dev[0].get_bss(apdev[0]['bssid']) is not None
    dev[1].scan(freq=2412, type="ONLY")

    if seen:
        out = run_tshark(os.path.join(logdir, "hwsim0.pcapng"),
                         "wlan.fc.type_subtype == 5", ["wlan.da"])
        if out:
            if dev[0].own_addr() not in out:
                # Discovery happened through Beacon frame reception. That's
                # not an error case.
                seen = False
            if dev[1].own_addr() not in out:
                raise Exception("No Probe Response frames to dev[1] seen")
    if seen:
        raise Exception("AP found unexpectedly")
开发者ID:greearb,项目名称:hostap-ct,代码行数:28,代码来源:test_ap_params.py
示例4: test_ap_open_drop_duplicate
def test_ap_open_drop_duplicate(dev, apdev, params):
    """AP dropping duplicate management frames"""
    # Feeds pairs of Authentication, Association Request, Reassociation
    # Request and Action frames to hostapd through MGMT_RX_PROCESS and then
    # counts management frames per subtype in the capture: exactly one frame
    # of each of the subtypes 11, 1, 3 and 13 is expected.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open",
                                     "interworking": "1"})
    hapd.set("ext_mgmt_frame_handling", "1")
    bssid = hapd.own_addr().replace(':', '')
    addr = "020304050607"
    # Address fields shared by every injected frame.
    addrs = bssid + addr + bssid
    ies = "00046f70656e010802040b160c12182432043048606c2d1a3c101bffff0000000000000000000001000000000000000000007f0a04000a020140004000013b155151525354737475767778797a7b7c7d7e7f808182dd070050f202000100"

    def rx_process(frame):
        # Hand one hex-encoded frame to hostapd; anything but OK is fatal.
        cmd = "MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % frame
        if "OK" not in hapd.request(cmd):
            raise Exception("MGMT_RX_PROCESS failed")

    # Frames are processed strictly in this order. Within each pair the second
    # frame differs only in the second header byte (00 -> 08), presumably the
    # Retry flag - TODO confirm. The reassociation frame is sent twice as-is.
    reassoc_req = "20083a01" + addrs + "2000" + "21040500" + ies
    frames = [
        "b0003a01" + addrs + '1000000001000000',
        "b0083a01" + addrs + '1000000001000000',
        "00003a01" + addrs + "2000" + "21040500" + ies,
        "00083a01" + addrs + "2000" + "21040500" + ies,
        reassoc_req,
        reassoc_req,
        "d0003a01" + addrs + "1000" + "040a006c0200000600000102000101",
        "d0083a01" + addrs + "1000" + "040a006c0200000600000102000101",
    ]
    for frame in frames:
        rx_process(frame)

    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan.fc.type == 0", ["wlan.fc.subtype"])

    # Management subtype -> number of frames seen in the capture.
    seen = {11: 0, 1: 0, 3: 0, 13: 0}
    for line in out.splitlines():
        subtype = int(line)
        if subtype in seen:
            seen[subtype] += 1

    if seen[11] != 1:
        raise Exception("Unexpected number of Authentication frames: %d" % seen[11])
    if seen[1] != 1:
        raise Exception("Unexpected number of association frames: %d" % seen[1])
    if seen[3] != 1:
        raise Exception("Unexpected number of reassociation frames: %d" % seen[3])
    if seen[13] != 1:
        raise Exception("Unexpected number of Action frames: %d" % seen[13])
开发者ID:michael-dev,项目名称:hostapd,代码行数:59,代码来源:test_ap_open.py
示例5: test_mbo_assoc_disallow
def test_mbo_assoc_disallow(dev, apdev, params):
    """MBO and association disallowed"""
    # Two APs share the SSID "MBO". Association is first disallowed on AP1 and
    # then on AP2; after each connection the capture is checked to make sure
    # no Association Request was addressed to the AP that disallowed it.
    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    assoc_req_filter = "wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00"

    hapd1 = hostapd.add_ap(apdev[0], {"ssid": "MBO", "mbo": "1"})
    hapd2 = hostapd.add_ap(apdev[1], {"ssid": "MBO", "mbo": "1"})

    logger.debug("Set mbo_assoc_disallow with invalid value")
    if "FAIL" not in hapd1.request("SET mbo_assoc_disallow 2"):
        raise Exception("Set mbo_assoc_disallow for AP1 succeeded unexpectedly with value 2")

    logger.debug("Disallow associations to AP1 and allow association to AP2")
    if "OK" not in hapd1.request("SET mbo_assoc_disallow 1"):
        raise Exception("Failed to set mbo_assoc_disallow for AP1")
    if "OK" not in hapd2.request("SET mbo_assoc_disallow 0"):
        raise Exception("Failed to set mbo_assoc_disallow for AP2")

    dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")

    out = run_tshark(capture, assoc_req_filter, wait=False)
    if "Destination address: " + hapd1.own_addr() in out:
        raise Exception("Association request sent to disallowed AP")

    # Capture time of the Association Request seen so far; used below to
    # restrict the second check to later frames.
    timestamp = run_tshark(capture, "wlan.fc.type_subtype == 0x00",
                           display=['frame.time'], wait=False)

    logger.debug("Allow associations to AP1 and disallow associations to AP2")
    if "OK" not in hapd1.request("SET mbo_assoc_disallow 0"):
        raise Exception("Failed to set mbo_assoc_disallow for AP1")
    if "OK" not in hapd2.request("SET mbo_assoc_disallow 1"):
        raise Exception("Failed to set mbo_assoc_disallow for AP2")

    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()

    # Force new scan, so the assoc_disallowed indication is updated
    dev[0].request("FLUSH")

    dev[0].connect("MBO", key_mgmt="NONE", scan_freq="2412")

    later_assoc_req = (assoc_req_filter + ' && frame.time > "' +
                       timestamp.rstrip() + '"')
    out = run_tshark(capture, later_assoc_req, wait=False)
    if "Destination address: " + hapd2.own_addr() in out:
        raise Exception("Association request sent to disallowed AP 2")
开发者ID:greearb,项目名称:hostap-ct,代码行数:46,代码来源:test_mbo.py
示例6: test_kernel_unknown_action_frame_rejection_sta
def test_kernel_unknown_action_frame_rejection_sta(dev, apdev, params):
    """mac80211 and unknown Action frame rejection in STA mode"""
    # The AP transmits Action frames with category codes 0x70, 0xf3 and 0x74
    # to the associated station. The capture is then searched for Action
    # frames sent by the station: categories 0xf0 and 0xf4 must be present,
    # 0xf2 and 0xf3 must not.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "unknown-action"})
    dev[0].connect("unknown-action", key_mgmt="NONE", scan_freq="2412")
    bssid = hapd.own_addr()
    addr = dev[0].own_addr()

    hapd.set("ext_mgmt_frame_handling", "1")

    # Unicast Action frame with unknown category (response expected)
    msg = {
        'fc': MGMT_SUBTYPE_ACTION << 4,
        'da': addr,
        'sa': bssid,
        'bssid': bssid,
        'payload': struct.pack("<BB", 0x70, 0),
    }
    hapd.mgmt_tx(msg)
    expect_ack(hapd)

    # Note: mac80211 does not allow group-addressed Action frames in unknown
    # categories to be transmitted in AP mode, so for now, these steps are
    # commented out.

    # Multicast Action frame with unknown category (no response expected)
    #msg['da'] = "01:ff:ff:ff:ff:ff"
    #msg['payload'] = struct.pack("<BB", 0x71, 1)
    #hapd.mgmt_tx(msg)
    #expect_no_ack(hapd)

    # Broadcast Action frame with unknown category (no response expected)
    #msg['da'] = "ff:ff:ff:ff:ff:ff"
    #msg['payload'] = struct.pack("<BB", 0x72, 2)
    #hapd.mgmt_tx(msg)
    #expect_no_ack(hapd)

    # Unicast Action frame with error indication category (no response expected)
    msg['da'] = addr
    msg['payload'] = struct.pack("<BB", 0xf3, 3)
    hapd.mgmt_tx(msg)
    expect_ack(hapd)

    # Unicast Action frame with unknown category (response expected)
    msg['da'] = addr
    msg['payload'] = struct.pack("<BB", 0x74, 4)
    hapd.mgmt_tx(msg)
    expect_ack(hapd)

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture,
                     "wlan.sa == %s && wlan.fc.type_subtype == 0x0d" % addr,
                     display=["wlan_mgt.fixed.category_code"])
    # Category codes of all Action frames sent by the station.
    categ = [int(code) for code in out.splitlines()]

    rejected_unexpectedly = 0xf2 in categ or 0xf3 in categ
    if rejected_unexpectedly:
        raise Exception("Unexpected Action frame rejection: " + str(categ))
    rejections_seen = 0xf0 in categ and 0xf4 in categ
    if not rejections_seen:
        raise Exception("Action frame rejection missing: " + str(categ))
开发者ID:greearb,项目名称:hostap-ct,代码行数:57,代码来源:test_kernel.py
示例7: test_ieee8021x_reauth_wep
def test_ieee8021x_reauth_wep(dev, apdev, params):
    """IEEE 802.1X and EAPOL_REAUTH request with WEP"""
    # Connects with IEEE 802.1X (EAP-PSK) and dynamic WEP keys, triggers
    # EAPOL_REAUTH from the AP and then counts EAPOL-Key frames and EAP
    # Request/Response frames in the capture.
    #
    # Raises Exception if reauthentication does not start or succeed, if
    # run_tshark() returns None, or if fewer than 4 EAPOL-Key, 6 EAP-Request
    # or 6 EAP-Response frames are found.
    logdir = params['logdir']
    # Separate name for the AP configuration so the 'params' argument is not
    # rebound.
    ap_params = hostapd.radius_params()
    ap_params["ssid"] = "ieee8021x-open"
    ap_params["ieee8021x"] = "1"
    ap_params["wep_key_len_broadcast"] = "13"
    ap_params["wep_key_len_unicast"] = "13"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    # NOTE(review): the identity had been replaced by an e-mail obfuscation
    # placeholder ("[email protected]"); restored to the EAP-PSK test identity
    # - confirm against the authentication server's user database.
    dev[0].connect("ieee8021x-open", key_mgmt="IEEE8021X",
                   eap="PSK", identity="psk.user@example.com",
                   password_hex="0123456789abcdef0123456789abcdef",
                   scan_freq="2412")
    hwsim_utils.test_connectivity(dev[0], hapd)

    hapd.request("EAPOL_REAUTH " + dev[0].own_addr())
    ev = dev[0].wait_event(["CTRL-EVENT-EAP-STARTED"], timeout=5)
    if ev is None:
        raise Exception("EAP authentication did not start")
    ev = dev[0].wait_event(["CTRL-EVENT-EAP-SUCCESS"], timeout=5)
    if ev is None:
        raise Exception("EAP authentication did not succeed")
    time.sleep(0.1)
    hwsim_utils.test_connectivity(dev[0], hapd)

    out = run_tshark(os.path.join(logdir, "hwsim0.pcapng"),
                     "llc.type == 0x888e", ["eapol.type", "eap.code"])
    if out is None:
        raise Exception("Could not find EAPOL frames in capture")

    num_eapol_key = 0
    num_eap_req = 0
    num_eap_resp = 0
    for line in out.splitlines():
        vals = line.split()
        if not vals:
            # A line without any field value carries nothing to count and
            # would otherwise raise IndexError below.
            continue
        if vals[0] == '3':
            # eapol.type 3 is counted as an EAPOL-Key frame.
            num_eapol_key += 1
        elif vals[0] == '0' and len(vals) == 2:
            # eapol.type 0 with an eap.code column: 1 is counted as a
            # Request, 2 as a Response.
            if vals[1] == '1':
                num_eap_req += 1
            elif vals[1] == '2':
                num_eap_resp += 1

    logger.info("num_eapol_key: %d" % num_eapol_key)
    logger.info("num_eap_req: %d" % num_eap_req)
    logger.info("num_eap_resp: %d" % num_eap_resp)
    if num_eapol_key < 4:
        raise Exception("Did not see four unencrypted EAPOL-Key frames")
    if num_eap_req < 6:
        raise Exception("Did not see six unencrypted EAP-Request frames")
    if num_eap_resp < 6:
        raise Exception("Did not see six unencrypted EAP-Response frames")
开发者ID:gxk,项目名称:hostap,代码行数:52,代码来源:test_ieee8021x.py
示例8: test_owe_limited_group_set_pmf
def test_owe_limited_group_set_pmf(dev, apdev, params):
    """Opportunistic Wireless Encryption and limited group set (PMF)"""
    # The AP only accepts OWE group 21. Connection attempts with groups 19
    # and 20 must be rejected with status code 77; group 21 must connect. The
    # capture is then checked for exactly three Association Response frames
    # with status codes 77, 77 and 0.
    if "OWE" not in dev[0].get_capability("key_mgmt"):
        raise HwsimSkip("OWE not supported")
    pcapng = os.path.join(params['logdir'], "hwsim0.pcapng")
    ap_params = {"ssid": "owe",
                 "wpa": "2",
                 "ieee80211w": "2",
                 "wpa_key_mgmt": "OWE",
                 "rsn_pairwise": "CCMP",
                 "owe_groups": "21"}
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    dev[0].scan_for_bss(bssid, freq="2412")

    # (OWE group, error if not rejected, error prefix for a wrong reason)
    rejected_attempts = (
        ("19", "Association not rejected",
         "Unexpected rejection reason: "),
        ("20", "Association not rejected (2)",
         "Unexpected rejection reason (2): "),
    )
    for group, not_rejected, wrong_reason in rejected_attempts:
        dev[0].connect("owe", key_mgmt="OWE", owe_group=group,
                       ieee80211w="2", scan_freq="2412", wait_connect=False)
        ev = dev[0].wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10)
        dev[0].request("DISCONNECT")
        if ev is None:
            raise Exception(not_rejected)
        if "status_code=77" not in ev:
            raise Exception(wrong_reason + ev)
        dev[0].dump_monitor()

    dev[0].connect("owe", key_mgmt="OWE", owe_group="21", ieee80211w="2",
                   scan_freq="2412")
    dev[0].request("REMOVE_NETWORK all")
    dev[0].wait_disconnected()
    dev[0].dump_monitor()

    out = run_tshark(pcapng,
                     "wlan.fc.type_subtype == 1",
                     display=['wlan_mgt.fixed.status_code'])
    status = out.splitlines()
    logger.info("Association Response frame status codes: " + str(status))
    if len(status) != 3:
        raise Exception("Unexpected number of Association Response frames")
    codes_ok = (int(status[0]) == 77 and int(status[1]) == 77 and
                int(status[2]) == 0)
    if not codes_ok:
        raise Exception("Unexpected Association Response frame status code")
开发者ID:michael-dev,项目名称:hostapd,代码行数:51,代码来源:test_owe.py
示例9: _test_p2p_ext_vendor_elem_go_neg_conf
def _test_p2p_ext_vendor_elem_go_neg_conf(dev, apdev, params):
    """Verify a vendor element is carried in the GO Negotiation Confirm frame.

    dev[0] registers a vendor element with VENDOR_ELEM_ADD 8 and starts GO
    negotiation using a PIN that differs from the one dev[1] authorized, so
    dev[1] is expected to report a failure. The capture is then searched for
    the vendor element payload.

    Raises:
        Exception: if VENDOR_ELEM_ADD fails or the payload is not found.
    """
    initiator, responder = dev[0], dev[1]
    addr0 = initiator.p2p_dev_addr()
    addr1 = responder.p2p_dev_addr()

    reply = initiator.request("VENDOR_ELEM_ADD 8 dd050011223305")
    if "OK" not in reply:
        raise Exception("VENDOR_ELEM_ADD failed")

    initiator.p2p_listen()
    responder.p2p_listen()
    responder.p2p_go_neg_auth(addr0, "12345670", "enter")
    initiator.p2p_go_neg_init(addr1, "12345678", "display")
    responder.p2p_go_neg_auth_result(expect_failure=True)

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture, "wifi_p2p.public_action.subtype == 2")
    if "Vendor Specific Data: 3305" not in out:
        raise Exception("Vendor element not found from GO Negotiation Confirm frame")
开发者ID:cococorp,项目名称:hostap,代码行数:15,代码来源:test_p2p_ext.py
示例10: _test_scan_random_mac
def _test_scan_random_mac(dev, apdev, params):
    """Exercise MAC_RAND_SCAN and check transmitter addresses of Probe Requests.

    First confirms that a set of malformed MAC_RAND_SCAN arguments is
    rejected, then (if the driver reports support) scans with three valid
    configurations and inspects the transmitter addresses of Probe Request
    frames in the capture. The capture check is skipped when run_tshark()
    returns None.

    Raises:
        HwsimSkip: if the driver does not report random MAC scan support.
        Exception: if invalid arguments are accepted or the captured
            addresses do not match expectations.
    """
    hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-scan" })
    bssid = apdev[0]['bssid']

    invalid_args = [ "",
                     "addr=foo",
                     "mask=foo",
                     "enable=1",
                     "all enable=1 mask=00:11:22:33:44:55",
                     "all enable=1 addr=00:11:22:33:44:55",
                     "all enable=1 addr=01:11:22:33:44:55 mask=ff:ff:ff:ff:ff:ff",
                     "all enable=1 addr=00:11:22:33:44:55 mask=fe:ff:ff:ff:ff:ff",
                     "enable=2 scan sched pno all",
                     "pno enable=1",
                     "all enable=2",
                     "foo" ]
    for args in invalid_args:
        if "FAIL" not in dev[0].request("MAC_RAND_SCAN " + args):
            raise Exception("Invalid MAC_RAND_SCAN accepted: " + args)

    supported = dev[0].get_driver_status_field('capa.mac_addr_rand_scan_supported')
    if supported != '1':
        raise HwsimSkip("Driver does not support random MAC address for scanning")

    valid_args = [ "all enable=1",
                   "all enable=1 addr=f2:11:22:33:44:55 mask=ff:ff:ff:ff:ff:ff",
                   "all enable=1 addr=f2:11:33:00:00:00 mask=ff:ff:ff:00:00:00" ]
    for args in valid_args:
        # One forced scan per configuration so each one shows up in the capture.
        dev[0].request("MAC_RAND_SCAN " + args)
        dev[0].scan_for_bss(bssid, freq=2412, force_scan=True)

    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan.fc.type_subtype == 4", ["wlan.ta" ])
    if out is None:
        return

    addr = out.splitlines()
    logger.info("Probe Request frames seen from: " + str(addr))
    if dev[0].own_addr() in addr:
        raise Exception("Real address used to transmit Probe Request frame")
    if "f2:11:22:33:44:55" not in addr:
        raise Exception("Fully configured random address not seen")
    if not any(a.startswith('f2:11:33') for a in addr):
        raise Exception("Fixed OUI random address not seen")
开发者ID:NAM-IL,项目名称:HostAP_2_4,代码行数:46,代码来源:test_scan.py
示例11: test_p2p_channel_random_social_with_op_class_change
def test_p2p_channel_random_social_with_op_class_change(dev, apdev, params):
    """P2P group formation using random social channel with oper class change needed"""
    # Forms a group that must land on 5 GHz, then disallows 5000-6000 MHz
    # while keeping a 5 GHz operating channel preference and forms a group
    # again; the second group must use 2412, 2437 or 2462 MHz. The last
    # "Operating Channel:" line in the GO Negotiation Request dump must show
    # Operating Class 81. Settings are restored in the finally clause.
    try:
        set_country("US", dev[0])

        logger.info("Start group on 5 GHz")
        [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=15,
                                               r_dev=dev[1], r_intent=0,
                                               test_data=False)
        check_grpform_results(i_res, r_res)
        freq = int(i_res['freq'])
        if freq < 5000:
            raise Exception("Unexpected channel %d MHz - did not pick 5 GHz preference" % freq)
        remove_group(dev[0], dev[1])

        logger.info("Disable 5 GHz and try to re-start group based on 5 GHz preference")
        for cmd in ("SET p2p_oper_reg_class 115",
                    "SET p2p_oper_channel 36",
                    "P2P_SET disallow_freq 5000-6000"):
            dev[0].global_request(cmd)
        [i_res, r_res] = go_neg_pin_authorized(i_dev=dev[0], i_intent=15,
                                               r_dev=dev[1], r_intent=0,
                                               test_data=False)
        check_grpform_results(i_res, r_res)
        freq = int(i_res['freq'])
        if freq not in [ 2412, 2437, 2462 ]:
            raise Exception("Unexpected channel %d MHz - did not pick random social channel" % freq)
        remove_group(dev[0], dev[1])

        out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                         "wifi_p2p.public_action.subtype == 0")
        if out is not None:
            # Only the most recent Operating Channel line is of interest.
            op_chan_lines = [line for line in out.splitlines()
                             if "Operating Channel:" in line]
            last = op_chan_lines[-1] if op_chan_lines else None
            if last is None:
                raise Exception("Could not find GO Negotiation Request")
            if "Operating Class 81" not in last:
                raise Exception("Unexpected operating class: " + last.strip())
    finally:
        set_country("00")
        for cmd in ("P2P_SET disallow_freq ",
                    "SET p2p_oper_reg_class 0",
                    "SET p2p_oper_channel 0"):
            dev[0].global_request(cmd)
        dev[1].flush_scan_cache()
开发者ID:cococorp,项目名称:hostap-upstream,代码行数:45,代码来源:test_p2p_channel.py
示例12: test_peerkey_sniffer_check
def test_peerkey_sniffer_check(dev, apdev, params):
    """RSN AP and PeerKey between two STAs with sniffer check"""
    # Connects two stations with peerkey enabled, starts an STK handshake from
    # dev[0] towards dev[1] and dumps the EAPOL-Key frames (eapol.type == 3)
    # from the capture using the older key_info field name.
    #
    # NOTE(review): this excerpt ends right after the first run_tshark() call;
    # out, try_other, bssid, addr0 and addr1 are not consumed within it.
    ssid = "test-peerkey"
    passphrase = "12345678"
    hparams = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hparams['peerkey'] = "1"
    hapd = hostapd.add_ap(apdev[0], hparams)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True)
    dev[1].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True)
    hwsim_utils.test_connectivity_sta(dev[0], dev[1])

    dev[0].request("STKSTART " + dev[1].p2p_interface_addr())
    time.sleep(1)
    # NOTE: Actual use of the direct link (DLS) is not supported in
    # mac80211_hwsim, so this operation fails at setting the keys after
    # successfully completed 4-way handshake. This test case does allow the
    # key negotiation part to be tested for coverage, though. Use sniffer to
    # verify that all the SMK and STK handshake messages were transmitted.

    bssid = hapd.own_addr()
    addr0 = dev[0].own_addr()
    addr1 = dev[1].own_addr()

    # Wireshark renamed the EAPOL-Key key_info field, so need to try both the
    # new and the old name to work with both versions.
    try_other = False
    try:
        out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                         "eapol.type == 3",
                         display=["wlan.sa", "wlan.da",
                                  "wlan_rsna_eapol.keydes.key_info"])
    except Exception as e:
        # "except X as e" is valid on Python 2.6+ and Python 3, unlike the
        # comma form. Only the unknown-field error selects the fallback field
        # name; anything else propagates unchanged.
        if "Unknown tshark field" not in str(e):
            raise
        try_other = True
开发者ID:gxk,项目名称:hostap,代码行数:43,代码来源:test_peerkey.py
示例13: test_ap_open_disconnect_in_ps
def test_ap_open_disconnect_in_ps(dev, apdev, params):
    """Disconnect with the client in PS to regression-test a kernel bug"""
    # With power save enabled on the station, the AP queues a data frame for
    # it and the station then disconnects. Afterwards the TIM partial virtual
    # bitmap values in the capture must go from non-zero back to zero. The
    # capture check is skipped when run_tshark() returns None.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")
    if hapd.wait_event(["AP-STA-CONNECTED"], timeout=5) is None:
        raise Exception("No connection event received from hostapd")

    time.sleep(0.2)
    # enable power save mode
    hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_ENABLED)
    time.sleep(0.1)
    try:
        # inject some traffic
        sa = hapd.own_addr()
        da = dev[0].own_addr()
        hapd.request('DATA_TEST_CONFIG 1')
        hapd.request('DATA_TEST_TX {} {} 0'.format(da, sa))
        hapd.request('DATA_TEST_CONFIG 0')

        # let the AP send couple of Beacon frames
        time.sleep(0.3)

        # disconnect - with traffic pending - shouldn't cause kernel warnings
        dev[0].request("DISCONNECT")
    finally:
        hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_DISABLED)

    time.sleep(0.2)
    tim_field = "wlan_mgt.tim.partial_virtual_bitmap"
    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     tim_field, [tim_field])
    if out is None:
        return

    # 0: no bit seen yet, 1: bitmap became non-zero, 2: bitmap cleared again
    state = 0
    for line in out.splitlines():
        pvb = int(line, 16)
        if state == 0 and pvb > 0:
            state = 1
        elif state == 1 and pvb == 0:
            state = 2
    if state != 2:
        raise Exception("Didn't observe TIM bit getting set and unset (state=%d)" % state)
开发者ID:michael-dev,项目名称:hostapd,代码行数:43,代码来源:test_ap_open.py
示例14: run_discovery_while_cli
def run_discovery_while_cli(wpas, dev, params):
    """Run provision discovery towards wpas while it is a P2P client.

    wpas joins a group started by dev[1] on 2412 MHz; pd_test() is then run
    from dev[0], and again from dev[2] after wpas has started listening. After
    the group is terminated, the capture must contain exactly three frames
    matching wifi_p2p.public_action.subtype == 8.

    Raises:
        Exception: if the number of destination addresses is not three.
    """
    go = dev[1]
    wpas.request("P2P_SET listen_channel 1")
    go.p2p_start_go(freq="2412")
    addr = wpas.p2p_dev_addr()
    pin = wpas.wps_read_pin()
    go.p2p_go_authorize_client(pin)
    wpas.p2p_connect_group(go.p2p_dev_addr(), pin, freq=2412, timeout=30)

    pd_test(dev[0], addr)
    wpas.p2p_listen()
    pd_test(dev[2], addr)

    wpas.p2p_stop_find()
    terminate_group(go, wpas)

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture, "wifi_p2p.public_action.subtype == 8",
                     [ "wlan.da" ])
    da = out.splitlines()
    logger.info("PD Response DAs: " + str(da))
    if len(da) != 3:
        raise Exception("Unexpected DA count for PD Response")
开发者ID:janetuk,项目名称:mech_eap,代码行数:21,代码来源:test_p2p_discovery.py
示例15: _test_p2p_ext_vendor_elem_go_neg_conf
def _test_p2p_ext_vendor_elem_go_neg_conf(dev, apdev, params):
    """Verify a vendor element is carried in the GO Negotiation Confirm frame.

    dev[0] registers a vendor element with VENDOR_ELEM_ADD 8 and starts GO
    negotiation using a PIN that differs from the one dev[1] authorized. GO
    negotiation is expected to succeed and group formation to fail; the
    capture is then searched for the vendor element payload.

    Raises:
        Exception: if VENDOR_ELEM_ADD fails, an expected event does not
            arrive within 15 seconds, or the payload is not found.
    """
    initiator, responder = dev[0], dev[1]
    addr0 = initiator.p2p_dev_addr()
    addr1 = responder.p2p_dev_addr()

    reply = initiator.request("VENDOR_ELEM_ADD 8 dd050011223305")
    if "OK" not in reply:
        raise Exception("VENDOR_ELEM_ADD failed")

    initiator.p2p_listen()
    responder.p2p_go_neg_auth(addr0, "12345670", "enter")
    responder.p2p_listen()
    initiator.p2p_go_neg_init(addr1, "12345678", "display")

    # Events are awaited in this order; a missing one aborts the test.
    expected_events = (
        ("P2P-GO-NEG-SUCCESS", "GO negotiation timed out"),
        ("P2P-GROUP-FORMATION-FAILURE",
         "Group formation failure not indicated"),
    )
    for event, error in expected_events:
        if initiator.wait_global_event([event], timeout=15) is None:
            raise Exception(error)

    initiator.dump_monitor()
    responder.p2p_go_neg_auth_result(expect_failure=True)
    responder.dump_monitor()

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture, "wifi_p2p.public_action.subtype == 2")
    if "Vendor Specific Data: 3305" not in out:
        raise Exception("Vendor element not found from GO Negotiation Confirm frame")
开发者ID:gxk,项目名称:hostap,代码行数:23,代码来源:test_p2p_ext.py
示例16: _test_scan_dfs
def _test_scan_dfs(dev, apdev, params):
    """Check that no Probe Request frames are sent on DFS or disallowed channels.

    Sets the regulatory domain to US with iw, waits on dev[0] and dev[1] for
    up to five regdom change events each (stopping early on "alpha2=US"),
    runs a full scan and a scan of selected frequencies on dev[0], and then
    inspects the frequencies of Probe Request frames in the capture. The
    capture check is skipped when run_tshark() returns None.

    Raises:
        Exception: on a missing regdom event, a failed or incomplete scan, or
            a Probe Request seen on 5260-5320, 5500-5700, 2467 or 2472 MHz.
    """
    subprocess.call(['iw', 'reg', 'set', 'US'])
    for i in range(2):
        for _ in range(5):
            ev = dev[i].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=5)
            if ev is None:
                raise Exception("No regdom change event")
            if "alpha2=US" in ev:
                break
        dev[i].dump_monitor()

    for scan_cmd in ("SCAN", "SCAN freq=2412,5180,5260,5500,5600,5745"):
        if "OK" not in dev[0].request(scan_cmd):
            raise Exception("SCAN command failed")
        if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"]) is None:
            raise Exception("Scan did not complete")

    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan.fc.type_subtype == 4", [ "radiotap.channel.freq" ])
    if out is None:
        return

    # Unique frequencies in ascending order.
    freq = sorted(set([int(f) for f in out.splitlines()]))
    logger.info("Active scan seen on channels: " + str(freq))
    for f in freq:
        if 5260 <= f <= 5320 or 5500 <= f <= 5700:
            raise Exception("Active scan on DFS channel: %d" % f)
        if f in [ 2467, 2472 ]:
            raise Exception("Active scan on US-disallowed channel: %d" % f)
开发者ID:cococorp,项目名称:hostap-upstream,代码行数:36,代码来源:test_scan.py
示例17: test_ap_vlan_without_station
def _require_protected_frames(pcap, da, missing_msg, unprotected_msg):
    """Raise unless every captured frame sent to *da* is marked protected.

    Raises Exception(missing_msg) when run_tshark() returns None or no lines,
    and Exception(unprotected_msg) when any wlan.fc.protected value is not 1.
    """
    out = run_tshark(pcap, "wlan.da == " + da, ["wlan.fc.protected"])
    lines = out.splitlines() if out is not None else []
    if not lines:
        raise Exception(missing_msg)
    # Parse every line first so a malformed value is reported regardless of
    # where it appears in the output.
    flags = [int(value, 16) for value in lines]
    if any(flag != 1 for flag in flags):
        raise Exception(unprotected_msg)


def test_ap_vlan_without_station(dev, apdev, p):
    """AP VLAN with WPA2-PSK and no station"""
    # Injects one broadcast-style data frame through bridge brvlan1 before any
    # station is connected and one after dev[0] has connected, then checks
    # that all frames to either destination address are protected in the
    # capture at p['logdir']/hwsim0.pcapng. The bridge and the wlan3.1
    # interface are cleaned up in the finally clause.
    try:
        subprocess.call(['brctl', 'addbr', 'brvlan1'])
        subprocess.call(['brctl', 'setfd', 'brvlan1', '0'])
        subprocess.call(['ifconfig', 'brvlan1', 'up'])
        # use a passphrase wlantest does not know, so it cannot
        # inject decrypted frames into pcap
        params = hostapd.wpa2_params(ssid="test-vlan",
                                     passphrase="12345678x")
        params['dynamic_vlan'] = "1"
        params['vlan_file'] = 'hostapd.wlan3.vlan'
        params['accept_mac_file'] = "hostapd.accept"
        hapd = hostapd.add_ap(apdev[0], params)

        # inject some traffic
        sa = hapd.own_addr()
        da = "ff:ff:ff:ff:ff:00"
        hapd.request('DATA_TEST_CONFIG 1 ifname=brvlan1')
        hapd.request('DATA_TEST_TX {} {} 0'.format(da, sa))
        hapd.request('DATA_TEST_CONFIG 0')
        time.sleep(.1)

        dev[0].connect("test-vlan", psk="12345678x", scan_freq="2412")

        # inject some traffic
        sa = hapd.own_addr()
        da = "ff:ff:ff:ff:ff:01"
        hapd.request('DATA_TEST_CONFIG 1 ifname=brvlan1')
        hapd.request('DATA_TEST_TX {} {} 0'.format(da, sa))
        hapd.request('DATA_TEST_CONFIG 0')

        # let the AP send couple of Beacon frames
        time.sleep(1)

        pcap = os.path.join(p['logdir'], "hwsim0.pcapng")
        _require_protected_frames(
            pcap, "ff:ff:ff:ff:ff:00",
            "first frame not observed",
            "Broadcast packets were not encrypted when no station was connected")
        _require_protected_frames(
            pcap, "ff:ff:ff:ff:ff:01",
            "second frame not observed",
            "Broadcast packets were not encrypted when station was connected")

        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected()
    finally:
        subprocess.call(['ip', 'link', 'set', 'dev', 'brvlan1', 'down'])
        # Errors from the wlan3.1 cleanup commands are discarded; the null
        # device is closed as soon as both commands have run.
        with open(os.devnull, 'w') as devnull:
            subprocess.call(['ip', 'link', 'set', 'dev', 'wlan3.1', 'down'],
                            stderr=devnull)
            subprocess.call(['brctl', 'delif', 'brvlan1', 'wlan3.1'],
                            stderr=devnull)
        subprocess.call(['brctl', 'delbr', 'brvlan1'])
开发者ID:cococorp,项目名称:hostap-upstream,代码行数:80,代码来源:test_ap_vlan.py
示例18: xrange
#!/usr/bin/python
import time
import runspark
import tshark
spark_dir = '/home/zyang/spark/'
tasklogs_dir = '%s/tasklogs/' % spark_dir
slaves_file = '%s/conf/slaves' % spark_dir
tshark.run_tshark(slaves_file)
st = time.time()
print "Run Exp @%s" % st
process_list = []
process_list.append(runspark.run_wc_21(spark_dir))
process_list.append(runspark.run_wc_40(spark_dir))
process_list.append(runspark.run_wiki_13(spark_dir))
process_list.append(runspark.run_wiki_26(spark_dir))
for i in xrange(len(process_list)):
ret_code = process_list[i].wait()
print "ret_code %d = " % i, ret_code
et = time.time()
print "task completed"
开发者ID:anzigly,项目名称:pycoflow,代码行数:31,代码来源:run-exp.py
示例19: run_tshark
pass
else:
raise
if not try_other:
found = False
for pkt in out.splitlines():
sa, da, key_info = pkt.split('\t')
if key_info != '':
found = True
break
if not found:
try_other = True
if try_other:
out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
"eapol.type == 3",
display=["wlan.sa", "wlan.da",
"eapol.keydes.key_info"],
wait=False)
smk = [ False, False, False, False, False ]
stk = [ False, False, False, False ]
for pkt in out.splitlines():
sa, da, key_info = pkt.split('\t')
key_info = int(key_info, 16)
if sa == addr0 and da == bssid and key_info == 0x2b02:
# Initiator -> AP: MIC+Secure+Request+SMK = SMK 1
smk[0] = True
elif sa == bssid and da == addr1 and key_info == 0x2382:
# AP -> Responder: ACK+MIC+Secure+SMK = SMK 2
smk[1] = True
开发者ID:gxk,项目名称:hostap,代码行数:31,代码来源:test_peerkey.py
注:本文中的tshark.run_tshark函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论