本文整理汇总了Python中testutils.simple_tcp_packet函数的典型用法代码示例。如果您正苦于以下问题:Python simple_tcp_packet函数的具体用法?Python simple_tcp_packet怎么用?Python simple_tcp_packet使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了simple_tcp_packet函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: runTest
def runTest(self):
    """Send a packet through an indirect group that rewrites its TCP source port.

    Group 1 (indirect) holds a single bucket that sets the TCP source port to
    2000 and outputs on port 2.  A flow built from the ingress packet on
    in_port 1 forwards to that group, so a packet sent with source port 1000
    is expected on port 2 carrying source port 2000.
    """
    self.clear_switch()

    # Build the group bottom-up: actions -> bucket -> group-mod request.
    bucket_actions = [
        create_action(action=ofp.OFPAT_SET_TP_SRC, tp_port=2000),
        create_action(action=ofp.OFPAT_OUTPUT, port=2),
    ]
    only_bucket = create_bucket(0, 0, 0, bucket_actions)
    group_request = create_group_mod_msg(
        ofp.OFPGC_ADD, ofp.OFPGT_INDIRECT, group_id=1, buckets=[only_bucket])
    self.send_ctrl_exp_noerror(group_request, 'group add')

    sent_pkt = testutils.simple_tcp_packet(tcp_sport=1000)
    expected_pkt = testutils.simple_tcp_packet(tcp_sport=2000)

    # Flow: anything matching the ingress packet on port 1 goes to group 1.
    to_group = create_action(action=ofp.OFPAT_GROUP, group_id=1)
    flow_request = create_flow_msg(
        packet=sent_pkt, in_port=1, apply_action_list=[to_group])
    self.send_ctrl_exp_noerror(flow_request, 'flow add')

    self.send_data(sent_pkt, 1)
    self.recv_data(2, expected_pkt)
开发者ID:rrdenicol,项目名称:oftest11,代码行数:26,代码来源:groups.py
示例2: runTest
def runTest(self):
    """Verify that a set-field action rewrites the Ethernet destination address.

    Builds an exact match from a TCP packet arriving on the first port of
    ``exact_port_map``, attaches a set-field(eth_dst=aa:aa:aa:aa:aa:aa)
    action via ``exact_table_output`` towards the second port, sends the
    packet and compares what is polled from the egress port with the
    expected (rewritten) packet.
    """
    table_id = testutils.EX_ACL_TABLE
    port_in = exact_port_map.keys()[0]
    egr_port = exact_port_map.keys()[1]

    # Clear the switch.
    testutils.delete_all_flows(self.controller, self.logger)

    # Ingress packet and the packet expected after the dl_dst rewrite.
    pkt = testutils.simple_tcp_packet(dl_src='00:01:02:03:04:05',
                                      dl_dst='00:06:07:08:09:0a')
    exp_pkt = testutils.simple_tcp_packet(dl_src='00:01:02:03:04:05',
                                          dl_dst='aa:aa:aa:aa:aa:aa')

    # Exact-match field list derived from the ingress packet.
    match_ls = testutils.packet_to_exact_flow_match(pkt=pkt,
                                                    table_id=table_id,
                                                    ing_port=port_in)

    act = action.action_set_field()
    field = match.eth_dst(parse.parse_mac("aa:aa:aa:aa:aa:aa"))
    act.field.add(field)

    exact_table_output(self, table_id, match_ls, actions=[act],
                       egr_port=egr_port)
    testutils.do_barrier(self.controller)

    # Send a packet from port_in.
    self.dataplane.send(port_in, str(pkt))

    # Poll from the egress port.
    (port_rec, pkt_rec, _) = self.dataplane.poll(port_number=egr_port,
                                                 timeout=1)

    # Check the *received* packet; the original tested the sent packet,
    # which is never None, so a missing packet went undetected here.
    self.assertTrue(pkt_rec is not None, "rec none packets")
    self.assertEqual(str(exp_pkt), str(pkt_rec),
                     'retruned pkt not equal to the original pkt')
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:26,代码来源:exact_match.py
示例3: runTest
def runTest(self):
    """Install a fully-masked flow through the nxm helpers and forward one packet.

    Uses the first and fourth sorted ports of ``pa_port_map`` as ingress and
    egress.  After clearing the flow table, a flow with the match string
    below and an ``apply:output`` action is added, a default TCP packet is
    sent on the ingress port and verified on the egress port, flow stats are
    requested and the flows are removed again.
    """
    sorted_ports = sorted(pa_port_map.keys())
    ing_port = sorted_ports[0]
    egr_port = sorted_ports[3]

    # Start from an empty flow table.
    clear_rc = testutils.delete_all_flows(self.controller, pa_logger)
    self.assertEqual(clear_rc, 0, "Failed to delete all flows")

    # Add the flow entry.
    flow_match = ("wildcards=none,"
                  "dl_src_mask=ff:ff:ff:ff:ff:ff,"
                  "dl_dst_mask=ff:ff:ff:ff:ff:ff,"
                  "nw_src_mask=255.255.255.255,"
                  "nw_dst_mask=255.255.255.255,"
                  "meta_mask=0xffffffffffffffff")
    flow_acts = "apply:output=" + str(egr_port)
    add_rc = nxm_send_flow_mod_add(flow_match, flow_acts, pa_logger)
    self.assertEqual(add_rc, 0, "Failed to add flow entry")

    # Send the packet.
    pkt = testutils.simple_tcp_packet()
    pa_logger.info("Sending IPv4 packet to " + str(ing_port))
    pa_logger.debug("Data: " + str(pkt).encode('hex'))
    self.dataplane.send(ing_port, str(pkt))

    # Receive and verify an identical packet on the egress port.
    exp_pkt = testutils.simple_tcp_packet()
    testutils.receive_pkt_verify(self, egr_port, exp_pkt)

    # Inspect the flow table, then clean up.
    request_flow_stats()
    cleanup_rc = nxm_delete_all_flows()
    self.assertEqual(cleanup_rc, 0, "Failed to delete all flows")
开发者ID:rrdenicol,项目名称:oftest11,代码行数:34,代码来源:ipv6.py
示例4: runTest
def runTest(self):
    """Chain two ALL groups and check the packet rewritten by the inner one.

    Group 2 sets the TCP source port to 2000 and outputs on port 2; group 1
    has one bucket that forwards to group 2.  A flow on ingress port 1 sends
    to group 1, so a packet with source port 1000 sent on port 1 is expected
    on port 2 with source port 2000.
    """
    self.clear_switch()

    # Inner group (id 2): rewrite the source port, then output on port 2.
    inner_actions = [
        create_action(action=ofp.OFPAT_SET_FIELD, tcp_sport=2000),
        create_action(action=ofp.OFPAT_OUTPUT, port=2),
    ]
    inner_group = create_group_mod_msg(
        ofp.OFPGC_ADD, ofp.OFPGT_ALL, group_id=2,
        buckets=[create_bucket(0, 0, 0, inner_actions)])
    self.send_ctrl_exp_noerror(inner_group, 'group add')

    # Outer group (id 1): single bucket that hands over to group 2.
    outer_actions = [create_action(action=ofp.OFPAT_GROUP, group_id=2)]
    outer_group = create_group_mod_msg(
        ofp.OFPGC_ADD, ofp.OFPGT_ALL, group_id=1,
        buckets=[create_bucket(0, 0, 0, outer_actions)])
    self.send_ctrl_exp_noerror(outer_group, 'group add')

    sent_pkt = testutils.simple_tcp_packet(tcp_sport=1000)
    expected_pkt = testutils.simple_tcp_packet(tcp_sport=2000)

    flow_actions = [create_action(action=ofp.OFPAT_GROUP, group_id=1)]
    flow_request = testutils.flow_msg_create(self, sent_pkt, ing_port=1,
                                             action_list=flow_actions)
    self.send_ctrl_exp_noerror(flow_request, 'flow add')

    self.send_data(sent_pkt, 1)
    self.recv_data(2, expected_pkt)
开发者ID:CPqD,项目名称:oftest12,代码行数:35,代码来源:groups.py
示例5: runTest
def runTest(self):
    """Add an IPv4 source-address flow through the dpctl helpers and forward a packet.

    Ingress/egress are the first and fourth sorted ports of ``pa_port_map``.
    The flow table is cleared, a flow matching
    ``dl_type=0x0800,nw_src=192.168.0.1`` with an ``apply:output`` action is
    added, a default TCP packet is sent and verified on the egress port,
    flow stats are requested and the table is cleared again.
    """
    sorted_ports = sorted(pa_port_map.keys())
    ing_port = sorted_ports[0]
    egr_port = sorted_ports[3]

    # Start from an empty flow table.
    clear_rc = dpctl.oxm_delete_all_flows()
    self.assertEqual(clear_rc, 0, "Failed to delete all flows")

    # Add the flow entry.
    flow_match = "dl_type=0x0800,nw_src=192.168.0.1"
    flow_acts = "apply:output=" + str(egr_port)
    add_rc = dpctl.oxm_send_flow_mod_add(flow_match, flow_acts, pa_logger)
    self.assertEqual(add_rc, 0, "Failed to add flow entry")

    # Send the packet.
    pkt = testutils.simple_tcp_packet()
    pa_logger.info("Sending IPv4 packet to " + str(ing_port))
    pa_logger.debug("Data: " + str(pkt).encode('hex'))
    self.dataplane.send(ing_port, str(pkt))

    # Receive and verify an identical packet on the egress port.
    exp_pkt = testutils.simple_tcp_packet()
    testutils.receive_pkt_verify(self, egr_port, exp_pkt)

    # Inspect the flow table, then clean up.
    dpctl.request_flow_stats()
    cleanup_rc = dpctl.oxm_delete_all_flows()
    self.assertEqual(cleanup_rc, 0, "Failed to delete all flows")
开发者ID:rrdenicol,项目名称:oftest12,代码行数:33,代码来源:dpctltests.py
示例6: runTest
def runTest(self):
    """Apply one modify-field action in table 0, then match the result in table 1.

    For every key in ``self.start_pkt_params`` the base packet parameters
    are copied with that key replaced by the value from
    ``self.mod_pkt_params``.  Table 0 gets a flow with an apply-actions
    instruction plus a goto-table(1) instruction; table 1 gets a flow that
    matches the modified packet and outputs on the egress port.  The base
    packet is sent and the modified packet is expected on the egress port.

    NOTE(review): the source had lost its indentation; the loop body extent
    below is reconstructed from variable usage (the log line uses the loop
    variable, the final verify uses the per-iteration expected packet) --
    confirm against the upstream file.
    """
    of_ports = testutils.clear_switch(self, pa_port_map.keys(), pa_logger)

    # Keep the test simple: first port in, second port out.
    ing_port, egr_port = of_ports[0], of_ports[1]
    expire_tbl0 = False
    expire_tbl1 = False

    # Ingress packet built from the unmodified parameters.
    pkt = testutils.simple_tcp_packet(**self.base_pkt_params)

    for field_name in self.start_pkt_params:
        # Parameters of the packet after table 0 rewrote `field_name`.
        modified_params = self.base_pkt_params.copy()
        modified_params[field_name] = self.mod_pkt_params[field_name]

        modify_act = testutils.action_generate(self, field_name,
                                               modified_params)

        apply_inst = instruction.instruction_apply_actions()
        goto_inst = instruction.instruction_goto_table()
        goto_inst.table_id = 1

        request0 = testutils.flow_msg_create(
            self, pkt,
            ing_port=ing_port,
            instruction_list=[apply_inst, goto_inst],
            action_list=[modify_act],
            check_expire=expire_tbl0,
            table_id=0)

        exp_pkt = testutils.simple_tcp_packet(**modified_params)
        request1 = testutils.flow_msg_create(
            self, exp_pkt,
            ing_port=ing_port,
            check_expire=expire_tbl1,
            table_id=1,
            egr_port=egr_port)

        # Insert both flows.
        self.logger.debug("Inserting flows: Modify-field: " + field_name)
        testutils.flow_msg_install(self, request0)
        testutils.flow_msg_install(self, request1)

        # Send the packet.
        self.logger.debug("Send packet: " + str(ing_port) +
                          " to " + str(egr_port))
        self.dataplane.send(ing_port, str(pkt))

        # @todo Not all HW supports both pkt and byte counters
        # @todo We shouldn't expect the order of coming response..
        # NOTE(review): `pktlen` / `exp_pktlen` are not defined in this
        # method; both branches are unreachable while the flags above stay
        # False -- confirm where these names come from before enabling.
        if expire_tbl0:
            flow_removed_verify(self, request0, pkt_count=1,
                                byte_count=pktlen)
        if expire_tbl1:
            flow_removed_verify(self, request1, pkt_count=1,
                                byte_count=exp_pktlen)

        # Receive and verify the rewritten packet.
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)
开发者ID:chesteve,项目名称:oftest11,代码行数:58,代码来源:multi-table.py
示例7: runTest
def runTest(self):
    """Check that the set-async packet-in mask controls table-miss packet-ins.

    Phase 1: the packet-in mask has only the OFPR_NO_MATCH bit set; with the
    table-miss behaviour set to "send to controller", a packet sent on each
    port must be verified as a packet-in.
    Phase 2: the mask has every bit except OFPR_NO_MATCH set; the same
    traffic must not produce a packet-in within the poll timeout.
    Finally a default set-async request is sent.

    NOTE(review): the source had lost its indentation; the loop bodies are
    reconstructed (per-port send followed by the per-port check) -- confirm
    against the upstream file.
    """
    # ---- Phase 1: OFPR_NO_MATCH enabled -> packet-in expected ----------
    # Step 1-1: clear all flow entries so nothing matches.
    of_ports = testutils.clear_switch(self, async_port_map.keys(),
                                      async_logger)

    # Step 2-1: controller sends the set_async_request message.
    async_logger.info("Sending set_async_request")
    enabled_mask = 1 << ofp.OFPR_NO_MATCH
    enable_request = create_set_async(pkt_in_mstr=enabled_mask)
    set_async_verify(self, enable_request)

    # Step 3-1: table miss -> send to controller.
    testutils.set_table_config(self,
                               config=ofp.OFPTC_TABLE_MISS_CONTROLLER)

    # Send data on every port and expect a packet-in each time.
    for of_port in of_ports:
        async_logger.info("PKT IN test, port " + str(of_port))
        pkt = testutils.simple_tcp_packet()
        self.dataplane.send(of_port, str(pkt))
        # @todo Check for unexpected messages?
        testutils.packetin_verify(self, pkt)

    # ---- Phase 2: OFPR_NO_MATCH masked out -> no packet-in -------------
    # Step 1-2: clear all flow entries again.
    testutils.clear_switch(self, async_port_map.keys(), async_logger)

    # Step 2-2: controller sends the set_async_request message.
    async_logger.info("Sending set_async_request")
    disabled_mask = 0xffffffff ^ (1 << ofp.OFPR_NO_MATCH)
    disable_request = create_set_async(pkt_in_mstr=disabled_mask)
    set_async_verify(self, disable_request)

    # Step 3-2: table miss -> send to controller.
    testutils.set_table_config(self,
                               config=ofp.OFPTC_TABLE_MISS_CONTROLLER)

    # Send data on every port; no packet-in may show up.
    for of_port in async_port_map.keys():
        async_logger.info("PKT IN test, port " + str(of_port))
        pkt = testutils.simple_tcp_packet()
        self.dataplane.send(of_port, str(pkt))
        # @todo Check for unexpected messages?
        (response, _) = self.controller.poll(ofp.OFPT_PACKET_IN, 2)
        self.assertTrue(response is None,
                        'Packet in message received unexpected')

    # Send a set-async request built with the helper's default arguments.
    default_request = create_set_async()
    set_async_verify(self, default_request)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:58,代码来源:async.py
示例8: runTest
def runTest(self):
    """Run output / push-VLAN / pop-VLAN write-actions across three tables.

    A 104-byte VLAN-tagged TCP packet (vid 0xa5a, pcp 3) is the ingress
    packet; the expected packet is a VLAN-enabled TCP packet of the same
    length built with default tag values.  The per-table actions,
    "next table available" flags and expiry-check flags are handed to
    ``write_action_test_multi_tables``.
    """
    sorted_ports = sorted(pa_port_map.keys())
    self.assertTrue(len(sorted_ports) > 2, "Not enough ports for test")

    # Keep the test simple: first port in, second port out.
    ing_port, egr_port = sorted_ports[0], sorted_ports[1]

    pktlen = 104
    vlan_id = 0xa5a  # no specific meaning
    vlan_pcp = 3
    pkt = testutils.simple_tcp_packet(pktlen=pktlen,
                                      dl_vlan_enable=True,
                                      dl_vlan=vlan_id,
                                      dl_vlan_pcp=vlan_pcp)
    flow_match = parse.packet_to_flow_match(pkt)
    wildcards = 0
    exp_pkt = testutils.simple_tcp_packet(pktlen=pktlen,
                                          dl_vlan_enable=True)

    # Table 0: output to the egress port.
    output_act = action.action_output()
    output_act.port = egr_port

    # Table 1: push a VLAN tag.
    push_act = action.action_push_vlan()
    push_act.ethertype = ETHERTYPE_VLAN

    # Table 2: pop a VLAN tag.
    pop_act = action.action_pop_vlan()

    # One entry per table; only the last table has no successor.
    act_list = [[output_act], [push_act], [pop_act]]
    next_avail = [True, True, False]
    chk_expire = [False, False, False]

    write_action_test_multi_tables(self, ing_port, egr_port,
                                   match=flow_match,
                                   wildcards=wildcards,
                                   act_list=act_list,
                                   next_avail=next_avail,
                                   chk_expire=chk_expire,
                                   pkt=pkt,
                                   exp_pkt=exp_pkt)
开发者ID:chesteve,项目名称:oftest11,代码行数:55,代码来源:multitable_write.py
示例9: runTest
def runTest(self):
    """Run output / set-VLAN-vid / push-VLAN write-actions across three tables.

    The ingress packet carries one VLAN tag with a random vid in
    [1, 0x7ff] and a random pcp.  The expected packet carries two tags: an
    outer one with a random vid in [0x800, 0xfff] and the original inner
    one.  The per-table actions and flags are handed to
    ``write_action_test_multi_tables``.
    """
    sorted_ports = sorted(pa_port_map.keys())
    self.assertTrue(len(sorted_ports) > 2, "Not enough ports for test")

    # Keep the test simple: first port in, second port out.
    ing_port, egr_port = sorted_ports[0], sorted_ports[1]

    # Random draws are kept in their original order.
    inner_vid = random.randint(1, 0x7ff)
    inner_pcp = random.randint(0, 7)
    inner_tag = {'vid': inner_vid, 'pcp': inner_pcp}
    pkt = testutils.simple_tcp_packet(vlan_tags=[inner_tag])
    match_ls = parse.packet_to_flow_match(pkt)
    wildcards = 0

    outer_vid = random.randint(0x800, 0xfff)
    outer_tag = {'type': ETHERTYPE_VLAN, 'vid': outer_vid, 'pcp': inner_pcp}
    exp_pkt = testutils.simple_tcp_packet(
        vlan_tags=[outer_tag, {'vid': inner_vid, 'pcp': inner_pcp}])

    # Table 0: output to the egress port.
    output_act = action.action_output()
    output_act.port = egr_port

    # Table 1: set-field on the VLAN vid (with the "present" bit added).
    set_vid_act = action.action_set_field()
    set_vid_act.field = match.vlan_vid(outer_vid + ofp.OFPVID_PRESENT)

    # Table 2: push a VLAN tag.
    push_act = action.action_push_vlan()
    push_act.ethertype = ETHERTYPE_VLAN

    # One entry per table; only the last table has no successor.
    act_list = [[output_act], [set_vid_act], [push_act]]
    next_avail = [True, True, False]
    chk_expire = [False, False, False]

    write_action_test_multi_tables(self, ing_port, egr_port,
                                   match_fields=match_ls,
                                   wildcards=wildcards,
                                   act_list=act_list,
                                   next_avail=next_avail,
                                   chk_expire=chk_expire,
                                   pkt=pkt,
                                   exp_pkt=exp_pkt)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:55,代码来源:multitable_write.py
示例10: runTest
def runTest(self):
    """Pop the VLAN tag from a tagged TCP packet and expect an untagged one.

    The ingress packet carries a single VLAN tag with vid 2; the expected
    packet is a default TCP packet.  ``flow_match_test`` is run with a
    pop-VLAN apply action and ``max_test = 10``.
    """
    # NOTE: no supported-actions pre-check for OFPAT_POP_VLAN is performed.
    tagged_vid = 2
    tagged_pkt = testutils.simple_tcp_packet(vlan_tags=[{'vid': tagged_vid}])
    untagged_pkt = testutils.simple_tcp_packet()

    pop_act = action.action_pop_vlan()

    testutils.flow_match_test(self, pa_port_map,
                              pkt=tagged_pkt,
                              exp_pkt=untagged_pkt,
                              apply_action_list=[pop_act],
                              max_test=10)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:12,代码来源:pktact.py
示例11: runTest
def runTest(self):
    """Run output / set-VLAN-vid / pop-VLAN write-actions across three tables.

    The ingress packet is double tagged (outer vid 0x18F, inner vid 0xa5a
    with pcp 3).  The expected packet has a single tag with vid 0xfed and
    the same pcp.  The per-table actions and flags are handed to
    ``write_action_test_multi_tables``.
    """
    sorted_ports = sorted(pa_port_map.keys())
    self.assertTrue(len(sorted_ports) > 2, "Not enough ports for test")

    # Keep the test simple: first port in, second port out.
    ing_port, egr_port = sorted_ports[0], sorted_ports[1]

    inner_vid = 0xa5a  # no specific meaning
    inner_pcp = 3
    outer_vid = 0x18F
    pkt = testutils.simple_tcp_packet(
        vlan_tags=[{'type' : ETHERTYPE_VLAN, 'vid': outer_vid},
                   {'vid': inner_vid, 'pcp': inner_pcp}])
    flow_match = parse.packet_to_flow_match(pkt)
    wildcards = 0

    rewritten_vid = 0xfed
    exp_pkt = testutils.simple_tcp_packet(
        vlan_tags=[{'vid': rewritten_vid, 'pcp': inner_pcp}])

    # Table 0: output to the egress port.
    output_act = action.action_output()
    output_act.port = egr_port

    # Table 1: set the VLAN vid.
    set_vid_act = action.action_set_vlan_vid()
    set_vid_act.vlan_vid = rewritten_vid

    # Table 2: pop a VLAN tag.
    pop_act = action.action_pop_vlan()

    # One entry per table; only the last table has no successor.
    act_list = [[output_act], [set_vid_act], [pop_act]]
    next_avail = [True, True, False]
    chk_expire = [False, False, False]

    write_action_test_multi_tables(self, ing_port, egr_port,
                                   match=flow_match,
                                   wildcards=wildcards,
                                   act_list=act_list,
                                   next_avail=next_avail,
                                   chk_expire=chk_expire,
                                   pkt=pkt,
                                   exp_pkt=exp_pkt)
开发者ID:TrafficLab,项目名称:oftest11,代码行数:52,代码来源:multitable_write.py
示例12: runTest
def runTest(self):
    """Install a flow with the external ``dpctl`` binary and forward one packet.

    Ingress/egress are the first and third sorted ports of ``pa_port_map``.
    The flow table is cleared through the controller, a flow is added by
    invoking ``dpctl`` as a subprocess, a default TCP packet is sent and
    verified on the egress port, flow stats are dumped through ``dpctl``
    and the flow table is cleared again.

    The exit status of the flow-add ``dpctl`` call is asserted to be 0 so
    that a failed installation is reported at its source instead of showing
    up later as a missing packet.
    """
    of_ports = sorted(pa_port_map.keys())
    ing_port = of_ports[0]
    egr_port = of_ports[2]

    # Locations of the soft switch binaries, relative to the working dir.
    self.of_dir = os.path.normpath("../../of11softswitch")
    self.ofd = os.path.normpath(self.of_dir + "/udatapath/ofdatapath")
    self.dpctl = os.path.normpath(self.of_dir + "/utilities/dpctl")
    dpctl_switch = "unix:/tmp/ofd"

    # Remove all entries.
    rc = testutils.delete_all_flows(self.controller, pa_logger)
    self.assertEqual(rc, 0, "Failed to delete all flows")

    # Add the flow entry through dpctl.
    flow_cmd1 = "flow-mod"
    flow_cmd2 = "cmd=add,table=0,idle=100"
    flow_match = ("wildcards=+all,"
                  "dl_src_mask=FF:FF:FF:FF:FF:FF,"
                  "dl_dst_mask=FF:FF:FF:FF:FF:FF,"
                  "nw_src_mask=255.255.255.255,"
                  "nw_dst_mask=255.255.255.255")
    flow_acts = "apply:output=" + str(egr_port)
    pcall = [self.dpctl, dpctl_switch, flow_cmd1, flow_cmd2,
             flow_match, flow_acts]
    print(pcall)
    # Argument list (no shell); fail fast if dpctl reports an error.
    rc = subprocess.call(pcall)
    self.assertEqual(rc, 0, "Failed to add flow entry")

    # Send packet.
    pkt = testutils.simple_tcp_packet()
    pa_logger.info("Sending IPv4 packet to " + str(ing_port))
    pa_logger.debug("Data: " + str(pkt).encode('hex'))
    self.dataplane.send(ing_port, str(pkt))

    # Receive packet.
    exp_pkt = testutils.simple_tcp_packet()
    testutils.receive_pkt_verify(self, egr_port, exp_pkt)

    # Dump the flow stats; purely diagnostic, so its status is not checked.
    pa_logger.debug("Request stats-flow")
    pcall = [self.dpctl, dpctl_switch, "stats-flow"]
    subprocess.call(pcall)

    # Remove flows.
    rc = testutils.delete_all_flows(self.controller, pa_logger)
    self.assertEqual(rc, 0, "Failed to delete all flows")
开发者ID:chesteve,项目名称:oftest11,代码行数:50,代码来源:ipv6.py
示例13: runTest
def runTest(self):
    """Rewrite the VLAN vid of a tagged packet from 2 to 3.

    Skips (after emitting a skip message) when the switch's supported
    actions bitmap lacks the OFPAT_SET_VLAN_VID bit; otherwise runs
    ``flow_match_test`` with a set-VLAN-vid apply action.
    """
    old_vid, new_vid = 2, 3

    set_vid_bit = 1 << ofp.OFPAT_SET_VLAN_VID
    supported = supported_actions_get(self)
    if not (supported & set_vid_bit):
        testutils.skip_message_emit(self, "Modify VLAN tag test")
        return

    tagged_pkt = testutils.simple_tcp_packet(vlan_tags=[{'vid': old_vid}])
    retagged_pkt = testutils.simple_tcp_packet(vlan_tags=[{'vid': new_vid}])

    set_vid_act = action.action_set_vlan_vid()
    set_vid_act.vlan_vid = new_vid

    testutils.flow_match_test(self, pa_port_map,
                              pkt=tagged_pkt,
                              exp_pkt=retagged_pkt,
                              apply_action_list=[set_vid_act])
开发者ID:TrafficLab,项目名称:oftest11,代码行数:15,代码来源:pktact.py
示例14: runTest
def runTest(self):
    """MODIFY_STRICT with an identical match must replace the flow's instructions.

    Two flow-mods are built from the same packet and ingress port but with
    different egress ports; the second one is turned into an
    OFPFC_MODIFY_STRICT.  After sending both (each followed by a barrier)
    the flow stats must contain exactly one entry whose match and
    instructions equal those of the second message.
    """
    port_keys = flow_mods_port_map.keys()
    ing_port = port_keys[0]
    out_port1 = port_keys[1]
    out_port2 = port_keys[2]

    pkt = testutils.simple_tcp_packet()
    testutils.delete_all_flows(self.controller, self.logger)

    original_fm = testutils.flow_msg_create(self, pkt,
                                            ing_port=ing_port,
                                            egr_port=out_port1)
    modified_fm = testutils.flow_msg_create(self, pkt,
                                            ing_port=ing_port,
                                            egr_port=out_port2)
    modified_fm.command = ofp.OFPFC_MODIFY_STRICT

    # Install the original flow.
    send_rv = self.controller.message_send(original_fm)
    self.assertEqual(send_rv, 0, "Failed to insert 1st flow_mod")
    testutils.do_barrier(self.controller)

    # Send the strict modification.
    send_rv = self.controller.message_send(modified_fm)
    testutils.do_barrier(self.controller)
    self.assertEqual(send_rv, 0, "Failed to insert 2nd flow_mod")

    flow_stats = testutils.flow_stats_get(self)
    self.assertEqual(len(flow_stats.stats), 1,
                     "Expected only one flow_mod")

    # The single remaining entry must look like the modified flow.
    entry = flow_stats.stats[0]
    self.assertEqual(entry.match, modified_fm.match)
    self.assertEqual(entry.instructions, modified_fm.instructions)
开发者ID:CPqD,项目名称:oftest12,代码行数:25,代码来源:flow_mods.py
示例15: runTest
def runTest(self):
    """Strip the VLAN tag from a 104-byte tagged packet, expecting 100 bytes.

    Skips (after emitting a skip message) when the switch's supported
    actions bitmap lacks the OFPAT_POP_VLAN bit; otherwise runs
    ``flow_match_test`` with a pop-VLAN apply action.
    """
    tagged_vid = 2

    pop_vlan_bit = 1 << ofp.OFPAT_POP_VLAN
    supported = supported_actions_get(self)
    if not (supported & pop_vlan_bit):
        testutils.skip_message_emit(self, "Strip VLAN tag test")
        return

    # Packet lengths with and without the VLAN tag.
    tagged_len = 104
    untagged_len = 100

    tagged_pkt = testutils.simple_tcp_packet(pktlen=tagged_len,
                                             dl_vlan_enable=True,
                                             dl_vlan=tagged_vid)
    untagged_pkt = testutils.simple_tcp_packet(pktlen=untagged_len)

    pop_act = action.action_pop_vlan()

    testutils.flow_match_test(self, pa_port_map,
                              pkt=tagged_pkt,
                              exp_pkt=untagged_pkt,
                              apply_action_list=[pop_act])
开发者ID:chesteve,项目名称:oftest11,代码行数:16,代码来源:pktact.py
示例16: runTest
def runTest(self):
    """Send 1,024,000 TCP packets on the first port of ``sdn_port_map``.

    Every packet has destination MAC 11:22:33:44:55:66 and a TCP source
    port cycling through 1000..5999 (``i % 5000 + 1000``).
    """
    ing_port = sdn_port_map.keys()[0]
    for i in range(1, 1024*1000 + 1):
        # Lazy %-style argument: the original passed `i` with no placeholder
        # in the message, which makes logging report a formatting error for
        # every emitted record instead of the count.
        self.logger.info("packet count: %d", i)
        pkt_send = testutils.simple_tcp_packet(dl_dst='11:22:33:44:55:66',
                                               tcp_sport=i % 5000 + 1000)
        self.dataplane.send(ing_port, str(pkt_send))
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:7,代码来源:sdn.py
示例17: runTest
def runTest(self):
    """MODIFY_STRICT with a different match must leave the existing flow untouched.

    The original flow (table 2) uses ``OFPP_ANY`` as ingress port; the
    OFPFC_MODIFY_STRICT message uses a concrete ingress port and another
    egress port.  After sending both, the flow stats must contain exactly
    one entry whose match and instructions still equal the original's.
    """
    port_keys = flow_mods_port_map.keys()
    ing_port = port_keys[0]
    out_port1 = port_keys[1]
    out_port2 = port_keys[2]

    pkt = testutils.simple_tcp_packet()
    testutils.delete_all_flows(self.controller, self.logger)

    original_fm = testutils.flow_msg_create(self, pkt,
                                            ing_port=ofp.OFPP_ANY,
                                            egr_port=out_port1,
                                            table_id=2)
    modified_fm = testutils.flow_msg_create(self, pkt,
                                            ing_port=ing_port,
                                            egr_port=out_port2,
                                            table_id=2)
    modified_fm.command = ofp.OFPFC_MODIFY_STRICT

    for flow_mod in (original_fm, modified_fm):
        testutils.ofmsg_send(self, flow_mod)

    flow_stats = testutils.flow_stats_get(self)
    self.assertEqual(len(flow_stats.stats), 1,
                     "Expected only one flow_mod")

    # The modify is expected not to apply, so the entry keeps the
    # original match and instructions.
    entry = flow_stats.stats[0]
    self.assertEqual(entry.match, original_fm.match)
    self.assertEqual(entry.instructions, original_fm.instructions)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:28,代码来源:flow_mods.py
示例18: runTest
def runTest(self):
    """Send a packet-out to every port and check it arrives on the dataplane.

    For each sorted port of ``basic_port_map`` a packet-out carrying a
    default TCP packet and a single output action is sent; the dataplane is
    then polled and the received packet must equal the one sent, on the
    targeted port when a port number is reported.
    """
    testutils.clear_switch(self, basic_port_map.keys(), basic_logger)

    outpkt = testutils.simple_tcp_packet()
    outpkt_bytes = str(outpkt)

    for dp_port in sorted(basic_port_map.keys()):
        # Packet-out originating from the controller, one output action.
        msg = message.packet_out()
        msg.in_port = ofp.OFPP_CONTROLLER
        msg.data = outpkt_bytes

        output_act = action.action_output()
        output_act.port = dp_port
        self.assertTrue(msg.actions.add(output_act),
                        'Could not add action to msg')

        basic_logger.info("PacketOut to: " + str(dp_port))
        testutils.ofmsg_send(self, msg)

        (rx_port, rx_pkt, _) = self.dataplane.poll(timeout=1)
        self.assertTrue(rx_pkt is not None, 'Packet not received')
        basic_logger.info("PacketOut: got pkt from " + str(rx_port))

        # The port is only checked when the poll reported one.
        if rx_port is not None:
            self.assertEqual(rx_port, dp_port, "Unexpected receive port")
        self.assertEqual(outpkt_bytes, str(rx_pkt),
                         'Response packet does not match send packet')
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:30,代码来源:basic.py
示例19: handleFlow
def handleFlow(self, pkttype='TCP'):
    """Forward one packet from each port to the next one in sorted order.

    pkttype -- 'ICMP' selects an ICMP packet and ``EX_ICMP_TABLE``; any
               other value selects a TCP packet and ``WC_ACL_TABLE``.

    For every port, all flows are deleted, the table-miss behaviour of the
    chosen table is set to "continue", a flow from that port to the next
    port (wrapping around) is installed, and the packet is sent and
    verified on the egress port.
    """
    of_ports = sorted(pa_port_map.keys())
    port_count = len(of_ports)
    self.assertTrue(port_count > 1, "Not enough ports for test")

    if pkttype == 'ICMP':
        pkt = testutils.simple_icmp_packet()
        table_id = testutils.EX_ICMP_TABLE
    else:
        pkt = testutils.simple_tcp_packet()
        table_id = testutils.WC_ACL_TABLE

    for idx, ingress_port in enumerate(of_ports):
        rv = testutils.delete_all_flows(self.controller, pa_logger)
        self.assertEqual(rv, 0, "Failed to delete all flows")
        testutils.set_table_config(self, table_id,
                                   ofp.OFPTC_TABLE_MISS_CONTINUE)

        # Egress is the next port, wrapping from the last to the first.
        egress_port = of_ports[(idx + 1) % port_count]
        pa_logger.info("Ingress " + str(ingress_port) +
                       " to egress " + str(egress_port))

        # Controller sends the flow mod to the switch.
        request = testutils.flow_msg_create(self, pkt,
                                            ing_port=ingress_port,
                                            egr_port=egress_port,
                                            table_id=table_id)
        testutils.flow_msg_install(self, request)

        # Dataplane sends the packet to the switch.
        pa_logger.info("Sending packet to dp port " + str(ingress_port))
        self.dataplane.send(ingress_port, str(pkt))
        testutils.receive_pkt_verify(self, egress_port, pkt)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:pktact.py
注:本文中的testutils.simple_tcp_packet函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论