本文整理汇总了Python中util.ppp函数的典型用法代码示例。如果您正苦于以下问题:Python ppp函数的具体用法?Python ppp怎么用?Python ppp使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ppp函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: verify_tunneled_l2o4
def verify_tunneled_l2o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
try:
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
rx_gre = rx[GRE]
rx_l2 = rx_gre[Ether]
rx_ip = rx_l2[IP]
tx_gre = tx[GRE]
tx_l2 = tx_gre[Ether]
tx_ip = tx_l2[IP]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
# bridged, not L3 forwarded, so no TTL decrement
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
self.logger.error(ppp("Rx:", rx))
self.logger.error(ppp("Tx:", tx))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:31,代码来源:test_gre.py
示例2: verify_capture
def verify_capture(self, capture):
"""Verify captured packet stream.
:param list capture: Captured packet stream.
"""
info = None
seen = set()
for packet in capture:
try:
self.logger.debug(ppp("Got packet:", packet))
ip = packet[IP]
icmp = packet[ICMP]
payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
self.assertEqual(payload_info.dst, self.src_dst_if.sw_if_index)
info = self._packet_infos[packet_index]
self.assertIsNotNone(info)
self.assertEqual(packet_index, info.index)
saved_packet = info.data
self.assertEqual(ip.src, saved_packet[IP].dst)
self.assertEqual(ip.dst, saved_packet[IP].src)
self.assertEqual(icmp.type, 0) # echo reply
self.assertEqual(icmp.id, saved_packet[ICMP].id)
self.assertEqual(icmp.payload, saved_packet[ICMP].payload)
except Exception:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
self.assertIn(index, seen,
"Packet with packet_index %d not received" % index)
开发者ID:chrisy,项目名称:vpp,代码行数:33,代码来源:test_reassembly.py
示例3: verify_tunneled_4o4
def verify_tunneled_4o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
try:
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
rx_gre = rx[GRE]
rx_ip = rx_gre[IP]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
# IP processing post pop has decremented the TTL
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
self.logger.error(ppp("Rx:", rx))
self.logger.error(ppp("Tx:", tx))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:28,代码来源:test_gre.py
示例4: verify_tunneled_6o4
def verify_tunneled_6o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
try:
tx = sent[i]
rx = capture[i]
rx_ip = rx[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
rx_gre = GRE(scapy.compat.raw(rx_ip[IP].payload))
rx_ip = rx_gre[IPv6]
tx_ip = tx[IPv6]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
except:
self.logger.error(ppp("Rx:", rx))
self.logger.error(ppp("Tx:", tx))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:26,代码来源:test_gre.py
示例5: verify_tun_44
def verify_tun_44(self, p, count=1):
self.vapi.cli("clear errors")
try:
config_tun_params(p, self.encryption_type, self.tun_if)
send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=count)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
for recv_pkt in recv_pkts:
self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
self.assert_packet_checksums_valid(recv_pkt)
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
dst=p.remote_tun_if_host, count=count)
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
self.assert_packet_checksums_valid(decrypt_pkt)
except:
self.logger.debug(ppp("Unexpected packet:", recv_pkt))
try:
self.logger.debug(
ppp("Decrypted packet:", decrypt_pkt))
except:
pass
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
if (hasattr(p, "spd_policy_in_any")):
pkts = p.spd_policy_in_any.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SPD any policy: expected %d != %d" %
(count, pkts))
if (hasattr(p, "tun_sa_in")):
pkts = p.tun_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA in counts: expected %d != %d" %
(count, pkts))
pkts = p.tun_sa_out.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
开发者ID:vpp-dev,项目名称:vpp,代码行数:54,代码来源:template_ipsec.py
示例6: verify_capture
def verify_capture(self, src_loc, dst_loc, capture):
"""
Verify captured packet
:param src_loc: source locator address
:param dst_loc: destination locator address
:param capture: list of captured packets
"""
self.test.assertEqual(len(capture), 1, "Unexpected number of "
"packets! Expected 1 but {} received"
.format(len(capture)))
packet = capture[0]
try:
ip_hdr = packet[IP]
# assert the values match
self.test.assertEqual(ip_hdr.src, src_loc, "IP source address")
self.test.assertEqual(ip_hdr.dst, dst_loc,
"IP destination address")
gpe_hdr = packet[LISP_GPE_Header]
self.test.assertEqual(gpe_hdr.next_proto, 1,
"next_proto is not ipv4!")
ih = gpe_hdr[IP]
self.test.assertEqual(ih.src, self.test.pg0.remote_ip4,
"unexpected source EID!")
self.test.assertEqual(ih.dst, self.test.deid_ip4,
"unexpected dest EID!")
except:
self.test.logger.error(ppp("Unexpected or invalid packet:",
packet))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:30,代码来源:test_lisp.py
示例7: test_tra_basic
def test_tra_basic(self, count=1):
""" ipsec v4 transport basic test """
self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET]
send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip4,
dst=self.tra_if.local_ip4,
count=count)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IP])
self.assert_packet_checksums_valid(decrypted)
except:
self.logger.debug(ppp("Unexpected packet:", rx))
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA in counts: expected %d != %d" %
(count, pkts))
pkts = p.tra_sa_out.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
开发者ID:vpp-dev,项目名称:vpp,代码行数:33,代码来源:template_ipsec.py
示例8: verify_capture_plain
def verify_capture_plain(self, capture):
for packet in capture:
try:
self.assert_packet_checksums_valid(packet)
self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
"decrypted packet source address")
self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
"decrypted packet destination address")
if packet.haslayer(TCP):
self.assertFalse(
packet.haslayer(UDP),
"unexpected UDP header in decrypted packet")
self.assert_equal(packet[TCP].dport, self.tcp_port_in,
"decrypted packet TCP destination port")
elif packet.haslayer(UDP):
if packet[UDP].payload:
self.assertFalse(
packet[UDP][1].haslayer(UDP),
"unexpected UDP header in decrypted packet")
self.assert_equal(packet[UDP].dport, self.udp_port_in,
"decrypted packet UDP destination port")
else:
self.assertFalse(
packet.haslayer(UDP),
"unexpected UDP header in decrypted packet")
self.assert_equal(packet[ICMP].id, self.icmp_id_in,
"decrypted packet ICMP ID")
except Exception:
self.logger.error(
ppp("Unexpected or invalid plain packet:", packet))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:31,代码来源:test_ipsec_nat.py
示例9: verify_tra_basic6
def verify_tra_basic6(self, count=1):
self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET6]
send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip6,
dst=self.tra_if.local_ip6,
count=count)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
rx[IPv6].plen)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
self.assert_packet_checksums_valid(decrypted)
except:
self.logger.debug(ppp("Unexpected packet:", rx))
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA in counts: expected %d != %d" %
(count, pkts))
pkts = p.tra_sa_out.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
开发者ID:chrisy,项目名称:vpp,代码行数:33,代码来源:template_ipsec.py
示例10: verify_decapped_pppoe
def verify_decapped_pppoe(self, src_if, capture, sent):
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
try:
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
except:
self.logger.error(ppp("Rx:", rx))
self.logger.error(ppp("Tx:", tx))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:18,代码来源:test_pppoe.py
示例11: resolve_ndp
def resolve_ndp(self, pg_interface=None, timeout=1):
"""Resolve NDP using provided packet-generator interface
:param pg_interface: interface used to resolve, if None then this
interface is used
:param timeout: how long to wait for response before giving up
"""
if pg_interface is None:
pg_interface = self
self.test.logger.info("Sending NDP request for %s on port %s" %
(self.local_ip6, pg_interface.name))
ndp_req = self.create_ndp_req()
pg_interface.add_stream(ndp_req)
pg_interface.enable_capture()
self.test.pg_start()
now = time.time()
deadline = now + timeout
# Enabling IPv6 on an interface can generate more than the
# ND reply we are looking for (namely MLD). So loop through
# the replies to look for want we want.
while now < deadline:
try:
captured_packet = pg_interface.wait_for_packet(
deadline - now, filter_out_fn=None)
except:
self.test.logger.error(
"Timeout while waiting for NDP response")
raise
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
self._test.logger.info(
"Replacing EtherType: 0x88a8 with "
"0x8100 and regenerating Ethernet header. ")
ndp_reply.type = 0x8100
ndp_reply = Ether(scapy.compat.raw(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]
self.test.logger.info("VPP %s MAC address is %s " %
(self.name, opt.lladdr))
self._local_mac = opt.lladdr
self.test.logger.debug(self.test.vapi.cli("show trace"))
# we now have the MAC we've been after
return
except:
self.test.logger.info(
ppp("Unexpected response to NDP request:",
captured_packet))
now = time.time()
self.test.logger.debug(self.test.vapi.cli("show trace"))
raise Exception("Timeout while waiting for NDP response")
开发者ID:chrisy,项目名称:vpp,代码行数:54,代码来源:vpp_pg_interface.py
示例12: verify_encrypted
def verify_encrypted(self, p, sa, rxs):
for rx in rxs:
try:
pkt = sa.decrypt(rx[IP])
if not pkt.haslayer(IP):
pkt = IP(pkt[Raw].load)
self.assert_packet_checksums_valid(pkt)
self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
self.assertTrue(pkt.haslayer(GRE))
e = pkt[Ether]
self.assertEqual(e[Ether].dst, self.omac)
self.assertEqual(e[IP].dst, "1.1.1.2")
except (IndexError, AssertionError):
self.logger.debug(ppp("Unexpected packet:", rx))
try:
self.logger.debug(ppp("Decrypted packet:", pkt))
except:
pass
raise
开发者ID:chrisy,项目名称:vpp,代码行数:20,代码来源:test_ipsec_tun_if_esp.py
示例13: verify_capture
def verify_capture(self, pg_if, capture):
"""
Verify captured input packet stream for defined interface.
:param object pg_if: Interface to verify captured packet stream for.
:param list capture: Captured packet stream.
"""
last_info = dict()
for i in self.pg_interfaces:
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
payload_info = self.payload_to_info(packet[Raw])
src_sw_if_index = payload_info.src
src_if = None
for ifc in self.pg_interfaces:
if ifc != pg_if:
if ifc.sw_if_index == src_sw_if_index:
src_if = ifc
break
if hasattr(src_if, 'sub_if'):
# Check VLAN tags and Ethernet header
packet = src_if.sub_if.remove_dot1_layer(packet)
self.assertTrue(Dot1Q not in packet)
try:
ip = packet[IP]
udp = packet[UDP]
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(pg_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
saved_packet = next_info.data
# Check standard fields
self.assertEqual(ip.src, saved_packet[IP].src)
self.assertEqual(ip.dst, saved_packet[IP].dst)
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i, dst_sw_if_index, last_info[i.sw_if_index])
self.assertTrue(
remaining_packet is None,
"Port %u: Packet expected from source %u didn't arrive" %
(dst_sw_if_index, i.sw_if_index))
开发者ID:chrisy,项目名称:vpp,代码行数:53,代码来源:test_l2bd.py
示例14: verify_tun_66
def verify_tun_66(self, p, count=1):
""" ipsec 6o6 tunnel basic test """
self.vapi.cli("clear errors")
try:
config_tun_params(p, self.encryption_type, self.tun_if)
send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
for recv_pkt in recv_pkts:
self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
self.assert_packet_checksums_valid(recv_pkt)
send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
dst=p.remote_tun_if_host,
count=count)
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()),
recv_pkt[IPv6].plen)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
if not decrypt_pkt.haslayer(IPv6):
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
self.assert_packet_checksums_valid(decrypt_pkt)
except:
self.logger.debug(ppp("Unexpected packet:", recv_pkt))
try:
self.logger.debug(
ppp("Decrypted packet:", decrypt_pkt))
except:
pass
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
self.verify_counters(p, count)
开发者ID:chrisy,项目名称:vpp,代码行数:40,代码来源:template_ipsec.py
示例15: verify_encrypted
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
self.assert_packet_checksums_valid(rx)
self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
decrypt_pkts.append(decrypt_pkt)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
except:
self.logger.debug(ppp("Unexpected packet:", rx))
try:
self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
except:
pass
raise
pkts = reassemble4(decrypt_pkts)
for pkt in pkts:
self.assert_packet_checksums_valid(pkt)
开发者ID:chrisy,项目名称:vpp,代码行数:22,代码来源:template_ipsec.py
示例16: verify_tcp_checksum
def verify_tcp_checksum(self):
self.vapi.cli("test http server")
p = self.params[socket.AF_INET]
config_tun_params(p, self.encryption_type, self.tun_if)
send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
dst=self.tun_if.local_ip4) /
TCP(flags='S', dport=80)))
self.logger.debug(ppp("Sending packet:", send))
recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
recv = recv[0]
decrypted = p.vpp_tun_sa.decrypt(recv[IP])
self.assert_packet_checksums_valid(decrypted)
开发者ID:chrisy,项目名称:vpp,代码行数:13,代码来源:template_ipsec.py
示例17: verify_tunneled_vlano4
def verify_tunneled_vlano4(self, src_if, capture, sent,
tunnel_src, tunnel_dst, vlan):
try:
self.assertEqual(len(capture), len(sent))
except:
ppc("Unexpected packets captured:", capture)
raise
for i in range(len(capture)):
try:
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
rx_gre = rx[GRE]
rx_l2 = rx_gre[Ether]
rx_vlan = rx_l2[Dot1Q]
rx_ip = rx_l2[IP]
self.assertEqual(rx_vlan.vlan, vlan)
tx_gre = tx[GRE]
tx_l2 = tx_gre[Ether]
tx_ip = tx_l2[IP]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
# bridged, not L3 forwarded, so no TTL decrement
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
self.logger.error(ppp("Rx:", rx))
self.logger.error(ppp("Tx:", tx))
raise
开发者ID:chrisy,项目名称:vpp,代码行数:39,代码来源:test_gre.py
示例18: verify_capture
def verify_capture(self, rx_if, capture, ip_l=IP):
"""Verify captured input packet stream for defined interface.
:param VppInterface rx_if: Interface to verify captured packet stream.
:param list capture: Captured packet stream.
:param Scapy ip_l: Required IP layer - IP or IPv6. (Default is IP.)
"""
self.logger.info("Verifying capture on interface %s" % rx_if.name)
count = 0
host_counters = {}
for host_mac in rx_if._hosts_by_mac:
host_counters[host_mac] = 0
for packet in capture:
try:
ip_received = packet[ip_l]
payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
ip_sent = self._packet_infos[packet_index].data[ip_l]
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(rx_if.name, payload_info.src, packet_index))
# Check standard fields
self.assertIn(packet.dst, rx_if._hosts_by_mac,
"Destination MAC address %s shouldn't be routed "
"via interface %s" % (packet.dst, rx_if.name))
self.assertEqual(packet.src, rx_if.local_mac)
self.assertEqual(ip_received.src, ip_sent.src)
self.assertEqual(ip_received.dst, ip_sent.dst)
host_counters[packet.dst] += 1
self._packet_infos.pop(packet_index)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# We expect packet routed via all host of pg interface
for host_mac in host_counters:
nr = host_counters[host_mac]
self.assertNotEqual(
nr, 0, "No packet routed via host %s" % host_mac)
self.logger.info("%u packets routed via host %s of %s interface" %
(nr, host_mac, rx_if.name))
count += nr
self.logger.info("Total amount of %u packets routed via %s interface" %
(count, rx_if.name))
return count
开发者ID:chrisy,项目名称:vpp,代码行数:48,代码来源:test_ip_ecmp.py
示例19: verify_capture
def verify_capture(self, dst_if, capture, proto_l=UDP):
"""Verify captured input packet stream for defined interface.
:param VppInterface dst_if: Interface to verify captured packet stream.
:param list capture: Captured packet stream.
:param Scapy proto_l: Required IP protocol. Default protocol is UDP.
"""
self.logger.info("Verifying capture on interface %s" % dst_if.name)
last_info = dict()
for i in self.interfaces:
last_info[i.sw_if_index] = None
dst_sw_if_index = dst_if.sw_if_index
for packet in capture:
try:
ip_received = packet[IP]
proto_received = packet[proto_l]
payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
"Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
last_info[payload_info.src] = next_info
self.assertTrue(next_info is not None)
self.assertEqual(packet_index, next_info.index)
saved_packet = next_info.data
ip_saved = saved_packet[IP]
proto_saved = saved_packet[proto_l]
# Check standard fields
self.assertEqual(ip_received.src, ip_saved.src)
self.assertEqual(ip_received.dst, ip_saved.dst)
self.assertEqual(proto_received.sport, proto_saved.sport)
self.assertEqual(proto_received.dport, proto_saved.dport)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
self.assertTrue(remaining_packet is None,
"Interface %s: Packet expected from interface %s "
"didn't arrive" % (dst_if.name, i.name))
开发者ID:vpp-dev,项目名称:vpp,代码行数:45,代码来源:test_classifier.py
示例20: create_stream
def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
count):
"""Create SRv6 input packet stream for defined interface.
:param VppInterface src_if: Interface to create packet stream for
:param VppInterface dst_if: destination interface of packet stream
:param packet_header: Layer3 scapy packet headers,
L2 is added when not provided,
Raw(payload) with packet_info is added
:param list packet_sizes: packet stream pckt sizes,sequentially applied
to packets in stream have
:param int count: number of packets in packet stream
:return: list of packets
"""
self.logger.info("Creating packets")
pkts = []
for i in range(0, count-1):
payload_info = self.create_packet_info(src_if, dst_if)
self.logger.debug(
"Creating packet with index %d" % (payload_info.index))
payload = self.info_to_payload(payload_info)
# add L2 header if not yet provided in packet_header
if packet_header.getlayer(0).name == 'Ethernet':
p = (packet_header /
Raw(payload))
else:
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
packet_header /
Raw(payload))
size = packet_sizes[i % len(packet_sizes)]
self.logger.debug("Packet size %d" % (size))
self.extend_packet(p, size)
# we need to store the packet with the automatic fields computed
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
self.logger.info("Done creating packets")
return pkts
开发者ID:chrisy,项目名称:vpp,代码行数:42,代码来源:test_srv6_as.py
注:本文中的util.ppp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论