• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python usocket.socket函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中usocket.socket函数的典型用法代码示例。如果您正苦于以下问题:Python socket函数的具体用法?Python socket怎么用?Python socket使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了socket函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: open_http_socket

def open_http_socket(method, url, json=None, timeout=None, headers=None, urlencoded=None):
	"""Open a socket to the host in ``url`` and send an HTTP/1.0 request on it.

	The request line, the ``Host`` header, any extra ``headers`` and the
	optional body are written to the socket. The response is NOT read here:
	the caller owns the returned socket and must read from and close it.

	Args:
		method: HTTP method text placed at the start of the request line.
		url: ``http://`` or ``https://`` URL, optionally with ``host:port``.
		json: If not None, serialised with ``ujson.dumps`` and sent as the body.
		timeout: Accepted for interface compatibility; not used by this function.
		headers: Optional dict of extra header name/value pairs.
		urlencoded: Pre-encoded form body; used only when ``json`` is None.

	Returns:
		The connected socket with the request already written to it.

	Raises:
		OSError: If the URL scheme is neither ``http:`` nor ``https:``.
	"""
	urlparts = url.split('/', 3)
	proto = urlparts[0]
	host = urlparts[2]
	urlpath = '' if len(urlparts) < 4 else urlparts[3]

	if proto == 'http:':
		port = 80
	elif proto == 'https:':
		port = 443
	else:
		raise OSError('Unsupported protocol: %s' % proto[:-1])

	# An explicit "host:port" overrides the scheme's default port.
	if ':' in host:
		host, port = host.split(':', 1)
		port = int(port)

	if json is not None:
		content = ujson.dumps(json)
		content_type = CONTENT_TYPE_JSON
	elif urlencoded is not None:
		content = urlencoded
		content_type = "application/x-www-form-urlencoded"
	else:
		content = None

	# ToDo: Handle IPv6 addresses
	if is_ipv4_address(host):
		# Literal IPv4 addresses need no DNS lookup.
		addr = (host, port)
	else:
		ai = usocket.getaddrinfo(host, port)
		addr = ai[0][4]

	is_https = proto == 'https:'
	if is_https:
		sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
	else:
		sock = usocket.socket()

	# From here on the socket is open: release it if anything fails, because
	# the caller never receives a handle it could close itself.
	try:
		sock.connect(addr)
		if is_https:
			sock.settimeout(0) # Actually make timeouts working properly with ssl

		sock.send('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

		if headers is not None:
			for header in headers.items():
				sock.send('%s: %s\r\n' % header)

		if content is not None:
			sock.send('content-length: %s\r\n' % len(content))
			sock.send('content-type: %s\r\n' % content_type)
			sock.send('\r\n')
			sock.send(content)
		else:
			sock.send('\r\n')
	except Exception:
		sock.close()
		raise

	return sock
开发者ID:farragar,项目名称:Mk3-Firmware,代码行数:58,代码来源:http_client.py


示例2: start

    def start(self):
        """
        Starts the LoRaWAN nano gateway.

        Steps, in order: connect WiFi in station mode, block until the RTC
        reports an NTP sync, resolve the server address and open a
        non-blocking UDP socket, push an initial stat packet, arm the
        periodic stat/pull alarms, start the UDP receive thread, configure
        the LoRa radio, open a non-blocking raw LoRa socket, and register
        the RX/TX packet callback.
        """

        self._log('Starting LoRaWAN nano gateway with id: {}', self.id)

        # setup WiFi as a station and connect
        self.wlan = WLAN(mode=WLAN.STA)
        self._connect_to_wifi()

        # get a time sync; poll every 50 ms until the RTC reports it is synced
        # (there is no upper bound on this wait)
        self._log('Syncing time with {} ...', self.ntp_server)
        self.rtc.ntp_sync(self.ntp_server, update_period=self.ntp_period)
        while not self.rtc.synced():
            utime.sleep_ms(50)
        self._log("RTC NTP sync complete")

        # get the server IP and create an UDP socket
        # ([0][-1] is the address entry of the first getaddrinfo result)
        self.server_ip = usocket.getaddrinfo(self.server, self.port)[0][-1]
        self._log('Opening UDP socket to {} ({}) port {}...', self.server, self.server_ip[0], self.server_ip[1])
        self.sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM, usocket.IPPROTO_UDP)
        self.sock.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
        self.sock.setblocking(False)

        # push the first stat packet immediately
        self._push_data(self._make_stat_packet())

        # create the alarms: a stat packet every 60 s and a pull request every 25 s
        # (the lambda arguments t/u are supplied by the alarm and are unused)
        self.stat_alarm = Timer.Alarm(handler=lambda t: self._push_data(self._make_stat_packet()), s=60, periodic=True)
        self.pull_alarm = Timer.Alarm(handler=lambda u: self._pull_data(), s=25, periodic=True)

        # start the UDP receive thread
        # (udp_stop is cleared first; presumably _udp_thread polls it — confirm)
        self.udp_stop = False
        _thread.start_new_thread(self._udp_thread, ())

        # initialize the LoRa radio in LORA mode
        self._log('Setting up the LoRa radio at {:.1f} Mhz using {}', self._freq_to_float(self.frequency), self.datarate)
        self.lora = LoRa(
            mode=LoRa.LORA,
            frequency=self.frequency,
            bandwidth=self.bw,
            sf=self.sf,
            preamble=8,
            coding_rate=LoRa.CODING_4_5,
            tx_iq=True
        )

        # create a raw, non-blocking LoRa socket
        self.lora_sock = usocket.socket(usocket.AF_LORA, usocket.SOCK_RAW)
        self.lora_sock.setblocking(False)
        self.lora_tx_done = False

        # register _lora_cb for both received-packet and transmitted-packet events
        self.lora.callback(trigger=(LoRa.RX_PACKET_EVENT | LoRa.TX_PACKET_EVENT), handler=self._lora_cb)
        self._log('LoRaWAN nano gateway online')
开发者ID:H-LK,项目名称:pycom-libraries,代码行数:55,代码来源:nanogateway.py


示例3: updateStats

def updateStats():
    """Fetch the uplink statistics as JSON and show them on the display.

    Connects to ``connectto[0][4]``, sends ``GET /api/`` and reads up to
    4096 bytes. On a network error the display shows "NET" / "GONE?" and the
    function returns after a 5 second pause; on a decode error it shows
    "JSON" / "ERROR". Otherwise ``uplink_in`` and ``uplink_out`` are shown
    in Mbps.
    """
    # Download the JSON file
    s = usocket.socket()
    try:
        try:
            s.connect(connectto[0][4])

            s.send("GET /api/\r\n")
            output = s.recv(4096)
        finally:
            # Always release the socket, including on connect/send/recv
            # failure; previously it was only closed on the success path.
            s.close()
    except Exception as e:
        sys.print_exception(e)
        curIn.text("NET")
        curOut.text("GONE?")
        time.sleep(5)
        return

    # Decode the JSON
    print(output)
    try:
        data = ujson.loads(output)
    except Exception as e:
        sys.print_exception(e)
        curIn.text("JSON")
        curOut.text("ERROR")
        return

    # Update the display (values are divided by 1,000,000 and shown as Mbps)
    curIn.text("%.0f Mbps" % (data['uplink_in'] / 1000000))
    curOut.text("%.0f Mbps" % (data['uplink_out'] / 1000000))
开发者ID:emfcamp,项目名称:emfnoc,代码行数:30,代码来源:main.py


示例4: main

def main(use_stream=False):
    """Run a minimal HTTP server on port 8080 that answers every request.

    Each accepted connection has up to 4096 bytes of its request printed and
    is answered with ``CONTENT`` formatted with a running request counter.
    With ``use_stream`` set, the client socket's stream methods
    (``read``/``write``) are used instead of ``recv``/``send``.
    The loop never returns.
    """
    listener = socket.socket()

    # Binding to all interfaces - server will be accessible to other hosts!
    bind_info = socket.getaddrinfo("0.0.0.0", 8080)
    print("Bind address info:", bind_info)
    bind_addr = bind_info[0][-1]

    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(bind_addr)
    listener.listen(5)
    print("Listening, connect your browser to http://<this_host>:8080/")

    served = 0
    while True:
        accepted = listener.accept()
        conn = accepted[0]
        peer = accepted[1]
        print("Client address:", peer)
        print("Client socket:", conn)
        print("Request:")
        if not use_stream:
            print(conn.recv(4096))
            conn.send(CONTENT % served)
        else:
            # MicroPython socket objects support stream (aka file) interface
            # directly.
            print(conn.read(4096))
            conn.write(CONTENT % served)
        conn.close()
        served += 1
        print()
开发者ID:DiegoAltamirano,项目名称:micropython,代码行数:32,代码来源:http_server.py


示例5: connect

 def connect(self, clean_session=True):
     """Open the socket and perform the MQTT CONNECT exchange.

     Connects to ``self.addr`` (wrapping the socket with ``ussl`` when
     ``self.ssl`` is set), writes a 12-byte packet header followed by the
     client id, optional last-will topic/message and optional user/password,
     then reads a 4-byte reply.

     Returns ``resp[2] & 1`` from the reply (the "session present" flag
     according to the MQTT 3.1.1 CONNACK layout).

     Raises ``MQTTException`` carrying ``resp[3]`` when that byte is
     non-zero. Uses ``assert`` for the keepalive range and for the reply
     header, so those checks disappear when assertions are disabled.
     """
     self.sock = socket.socket()
     self.sock.connect(self.addr)
     if self.ssl:
         import ussl
         self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
     # Packet template: byte 0 = 0x10, byte 1 = length (filled in below),
     # then a length-prefixed "MQTT", 0x04, flags byte (index 9) and two
     # keepalive bytes (indexes 10-11).
     msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
     # Length of everything after byte 1: the 10 remaining template bytes
     # plus 2 + len() per string sent afterwards (assumes _send_str writes a
     # 2-byte length prefix — TODO confirm).
     # NOTE(review): msg[1] is a single byte, so a large total cannot be
     # represented here — confirm the expected maximum payload size.
     msg[1] = 10 + 2 + len(self.client_id)
     msg[9] = clean_session << 1
     if self.user is not None:
         msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
         # Set the two top flag bits (user name and password present).
         msg[9] |= 0xC0
     if self.keepalive:
         assert self.keepalive < 65536
         # Keepalive is stored as two bytes, high byte first.
         msg[10] |= self.keepalive >> 8
         msg[11] |= self.keepalive & 0x00FF
     if self.lw_topic:
         msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
         # Bit 2 marks a last-will as present; the two QoS bits land in
         # bits 3-4 and the retain flag in bit 5.
         msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
         msg[9] |= self.lw_retain << 5
     self.sock.write(msg)
     #print(hex(len(msg)), hexlify(msg, ":"))
     # Payload strings follow the header in this fixed order.
     self._send_str(self.client_id)
     if self.lw_topic:
         self._send_str(self.lw_topic)
         self._send_str(self.lw_msg)
     if self.user is not None:
         self._send_str(self.user)
         self._send_str(self.pswd)
     resp = self.sock.read(4)
     # Expect reply type 0x20 with a length byte of 2.
     assert resp[0] == 0x20 and resp[1] == 0x02
     if resp[3] != 0:
         raise MQTTException(resp[3])
     return resp[2] & 1
开发者ID:stinos,项目名称:micropython-lib,代码行数:34,代码来源:mqtt.py


示例6: test

def test(peer_addr):
    """Connect to ``peer_addr`` over SSL and print facts about its certificate.

    Prints the type of the object returned by ``getpeercert(True)`` and
    whether it is longer than 100 items, then closes the wrapped socket.
    """
    plain_sock = socket.socket()
    plain_sock.connect(peer_addr)
    secure_sock = ssl.wrap_socket(plain_sock)
    peer_cert = secure_sock.getpeercert(True)
    is_plausible_size = len(peer_cert) > 100
    print(type(peer_cert), is_plausible_size)
    secure_sock.close()
开发者ID:AriZuu,项目名称:micropython,代码行数:7,代码来源:ssl_getpeercert.py


示例7: open_connection

def open_connection(host, port, ssl=False):
    """Coroutine: open a non-blocking stream connection to ``host:port``.

    Starts a non-blocking connect, yields ``IOWrite`` for the socket so the
    scheduler resumes the coroutine once the socket is writable, and then
    returns a ``(StreamReader, StreamWriter)`` pair. With ``ssl`` set, the
    socket is temporarily made blocking while ``ussl.wrap_socket`` runs and
    the wrapped object is used for the reader/writer.
    """
    if DEBUG and __debug__:
        log.debug("open_connection(%s, %s)", host, port)
    info = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)[0]
    sock = _socket.socket(info[0], info[1], info[2])
    sock.setblocking(False)
    try:
        sock.connect(info[-1])
    except OSError as exc:
        # EINPROGRESS is the expected outcome of a non-blocking connect.
        if exc.args[0] != uerrno.EINPROGRESS:
            raise
    if DEBUG and __debug__:
        log.debug("open_connection: After connect")
    yield IOWrite(sock)
    if DEBUG and __debug__:
        log.debug("open_connection: After iowait: %s", sock)
    if not ssl:
        return StreamReader(sock), StreamWriter(sock, {})

    print("Warning: uasyncio SSL support is alpha")
    import ussl
    # The wrap call is made on a blocking socket, then non-blocking mode is
    # restored on the underlying socket.
    sock.setblocking(True)
    wrapped = ussl.wrap_socket(sock)
    sock.setblocking(False)
    return StreamReader(sock, wrapped), StreamWriter(wrapped, {})
开发者ID:SpotlightKid,项目名称:micropython-lib,代码行数:27,代码来源:__init__.py


示例8: request

def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response`` for the open socket.

    Args:
        method: HTTP method text placed at the start of the request line.
        url: ``http://`` or ``https://`` URL, optionally with ``host:port``.
        json: If not None, serialised with ``ujson.dumps`` and sent as the body.
        timeout: If not None, applied to the socket before connecting.
        headers: Optional dict of extra header name/value pairs.

    Returns:
        ``Response(status, sock)``, where ``status`` is the integer status
        code and ``sock`` is positioned just past the response headers.

    Raises:
        OSError: If the URL scheme is neither ``http:`` nor ``https:``.
        ValueError: If the status line has no status code.
        AssertionError: If a timeout or HTTPS is requested but unsupported.
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    # An explicit "host:port" overrides the scheme's default port.
    if ':' in host:
        host, port = host.split(':', 1)
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()

    # The socket is open from here on: close it on any failure, since the
    # caller only receives it (inside Response) on success.
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        # Only the status code is needed; tolerate a status line that has
        # no reason phrase ("HTTP/1.0 200").
        status_line = sock.readline()
        status_parts = status_line.split(None, 2)
        if len(status_parts) < 2:
            raise ValueError('Malformed HTTP status line: %r' % status_line)
        status = int(status_parts[1])

        # Skip headers. An empty read means the peer closed the connection;
        # stop there rather than looping forever.
        while True:
            header_line = sock.readline()
            if not header_line or header_line == b'\r\n':
                break

        return Response(status, sock)
    except Exception:
        sock.close()
        raise
开发者ID:SB-Technology-Holdings-International,项目名称:Water,代码行数:60,代码来源:http_client.py


示例9: urlopen

def urlopen(url, data=None):
    """Issue an HTTP/1.0 GET for ``url`` and return the connected socket.

    The status line and response headers are consumed, so the returned
    socket is positioned at the start of the response body. The caller is
    responsible for closing it.

    Args:
        url: An ``http://`` URL; the connection always uses port 80.
        data: Must be falsy; request bodies are not supported.

    Raises:
        NotImplementedError: If ``data`` is given or the server redirects.
        ValueError: If the scheme is not ``http:`` or the response uses
            chunked transfer encoding.
    """
    if data:
        raise NotImplementedError("POST is not yet supported")
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto != "http:":
        raise ValueError("Unsupported protocol: " + proto)

    ai = usocket.getaddrinfo(host, 80)
    addr = ai[0][4]
    s = usocket.socket()
    # Close the socket on any failure: the caller only gets it on success.
    try:
        s.connect(addr)
        s.send(b"GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host))

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                # The original tested an undefined name (`line`) here and
                # concatenated str with bytes in the error message.
                if b"chunked" in l:
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        s.close()
        raise

    return s
开发者ID:balloob,项目名称:micropython-lib,代码行数:33,代码来源:urequest.py


示例10: download

    def download(url, local_name):
        """Download ``url`` over HTTP/1.0 and write the body to ``local_name``.

        The socket is wrapped with ``ussl`` when the URL scheme is
        ``https:`` and is always closed before returning or raising.

        Raises:
            OSError: If the status is not 200 or the connection ends before
                the end of the response headers.
        """
        proto, _, host, urlpath = url.split('/', 3)
        # NOTE(review): port 443 is used regardless of the scheme — confirm
        # that plain http:// URLs are never passed here.
        ai = usocket.getaddrinfo(host, 443)
        addr = ai[0][4]

        s = usocket.socket()
        # Previously the socket was never closed, on success or on failure.
        try:
            s.connect(addr)

            if proto == "https:":
                s = ussl.wrap_socket(s)

            # MicroPython rawsocket module supports file interface directly
            s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
            l = s.readline()
            protover, status, msg = l.split(None, 2)
            if status != b"200":
                raise OSError()
            # Skip the response headers up to the blank separator line.
            while 1:
                l = s.readline()
                if not l:
                    raise OSError()
                if l == b'\r\n':
                    break
            # Stream the body to disk in 1 KiB chunks.
            with open(local_name, "wb") as f:
                while 1:
                    l = s.read(1024)
                    if not l:
                        break
                    f.write(l)
        finally:
            s.close()
开发者ID:edyza,项目名称:micropython-lib,代码行数:31,代码来源:upip.py


示例11: main

def main(use_stream=True):
    """Fetch ``/`` from google.com:443 over SSL and print the first 4096 bytes.

    ``use_stream`` selects the stream interface (``write``/``read``) over
    the socket interface (``send``/``recv``) on the wrapped socket.
    """
    plain_sock = _socket.socket()

    addr_infos = _socket.getaddrinfo("google.com", 443)
    print("Address infos:", addr_infos)
    target = addr_infos[0][-1]

    print("Connect address:", target)
    plain_sock.connect(target)

    secure_sock = ssl.wrap_socket(plain_sock)
    print(secure_sock)

    http_request = b"GET / HTTP/1.0\n\n"
    if not use_stream:
        # MicroPython SSLSocket objects implement only stream interface, not
        # socket interface
        secure_sock.send(http_request)
        print(secure_sock.recv(4096))
    else:
        # Both CPython and MicroPython SSLSocket objects support read() and
        # write() methods.
        secure_sock.write(http_request)
        print(secure_sock.read(4096))

    secure_sock.close()
开发者ID:ESPWarren,项目名称:micropython,代码行数:25,代码来源:http_client_ssl.py


示例12: url_open

def url_open(url):
    """Open ``url`` over HTTP/1.0 and return the socket at the response body.

    The socket is wrapped with ``ussl`` for ``https:`` URLs; the first time
    that happens a warning is printed that the certificate is not validated
    (tracked through the module-level ``warn_ussl`` flag). The caller must
    close the returned socket.

    Raises:
        ValueError: With the status bytes if the status is not 200, or with
            "Unexpected EOF" if the headers are cut short.
    """
    global warn_ussl
    proto, _, host, urlpath = url.split('/', 3)
    # NOTE(review): port 443 is used regardless of the scheme — confirm.
    ai = usocket.getaddrinfo(host, 443)
    addr = ai[0][4]

    s = usocket.socket(ai[0][0])
    # Close the socket on any failure: the caller only gets it on success.
    try:
        s.connect(addr)

        if proto == "https:":
            s = ussl.wrap_socket(s)
            if warn_ussl:
                print("Warning: %s SSL certificate is not validated" % host)
                warn_ussl = False

        # MicroPython rawsocket module supports file interface directly
        s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
        l = s.readline()
        protover, status, msg = l.split(None, 2)
        if status != b"200":
            if status == b"404":
                print("Package not found")
            raise ValueError(status)
        # Skip the response headers up to the blank separator line.
        while 1:
            l = s.readline()
            if not l:
                raise ValueError("Unexpected EOF")
            if l == b'\r\n':
                break
    except Exception:
        s.close()
        raise

    return s
开发者ID:bynds,项目名称:micropython-lib,代码行数:33,代码来源:upip.py


示例13: main_client

def main_client(addr):
    """Configure the WIZNET5K interface and post sensor readings forever.

    After a best-effort connect to ``addr``, the loop takes a measurement,
    prints it, posts it to ``/test_pymagic`` via ``send`` and prints up to
    4096 bytes of the reply. Network errors are reported and the loop keeps
    running. This function never returns.
    """
    nic = network.WIZNET5K(pyb.SPI(2), pyb.Pin.board.Y5, pyb.Pin.board.Y4)
    nic.ifconfig((SENSOR_IP, SENSOR_MASK, SENSOR_GATEWAY, SENSOR_DNS))
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    count = 0
    try:
        s.connect(addr)
    except Exception as e:
        # Still best-effort, but report the failure instead of hiding it,
        # and no longer swallow KeyboardInterrupt/SystemExit.
        print('connect failed:', e)
    data = {
        'serial_no':'1234567890',
        'ip':SENSOR_IP,
        'op':gOperator['OP_REGISTER'],
        'type':gSensorType['TEMPERATURE']
    }
    DHT22.init()
    while True:
        pyb.delay(100)
        hum, tem = measure()
        print('%d hum:%f, tem:%f' % (count,hum, tem))
        data['value'] = {'hum':hum, 'tem':tem}
        count += 1

        try:
            send(s, 'post', '/test_pymagic', data)
            print(s.recv(4096))
        except Exception as e:
            # Keep sampling after a failed exchange, but make it visible.
            print('send failed:', e)
        pyb.delay(1000)
开发者ID:kamijawa,项目名称:pymagic,代码行数:30,代码来源:main.py


示例14: __init__

 def __init__(self, host, port=HTTP_PORT):
     """Resolve ``host``/``port`` and create an unconnected socket.

     ``port`` may be anything ``int()`` accepts; it is normalised once and
     the integer value is used both for ``self.port`` and for resolution.
     """
     self.host = host
     self.port = int(port)
     # Resolve with the normalised integer rather than the raw argument, so
     # a port given as a string behaves the same as an integer one.
     addr_info = socket.getaddrinfo(host, self.port)
     self.addr = addr_info[0][-1]
     self.connected = False
     self.socket = socket.socket()
开发者ID:fadushin,项目名称:esp8266,代码行数:7,代码来源:client.py


示例15: urlopen

def urlopen(url, data=None, method="GET"):
    """Send an HTTP/1.0 request and return the socket at the response body.

    Args:
        url: ``http://`` or ``https://`` URL, optionally with ``host:port``.
        data: Optional request body. When given with the default method,
            the request is sent as POST.
        method: HTTP method; defaults to "GET".

    Returns:
        The connected (possibly SSL-wrapped) socket, positioned just past
        the response headers. The caller must close it.

    Raises:
        ValueError: For an unsupported scheme or a chunked response.
        NotImplementedError: If the server responds with a redirect.
    """
    if data is not None and method == "GET":
        method = "POST"
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    # An explicit "host:port" overrides the scheme's default port.
    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    # Close the socket on any failure: the caller only gets it on success.
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)

        s.write(method)
        s.write(b" /")
        s.write(path)
        s.write(b" HTTP/1.0\r\nHost: ")
        s.write(host)
        s.write(b"\r\n")

        if data:
            s.write(b"Content-Length: ")
            s.write(str(len(data)))
            s.write(b"\r\n")
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # `l` is bytes; decode it before joining with a str.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        s.close()
        raise

    return s
开发者ID:puuu,项目名称:micropython-lib,代码行数:58,代码来源:urequest.py


示例16: request

def request(method, url, data=None, json=None, headers={}, stream=None):
    """Send an HTTP/1.0 request and return a ``Response`` wrapping the socket.

    Args:
        method: HTTP method for the request line.
        url: ``http://`` or ``https://`` URL, optionally with ``host:port``.
        data: Optional request body.
        json: Optional object serialised with ``ujson.dumps`` as the body;
            mutually exclusive with ``data``.
        headers: Extra request headers. A ``Host`` header is added
            automatically unless one is supplied. The shared default dict is
            only read, never mutated.
        stream: Accepted for interface compatibility; not used here.

    Returns:
        A ``Response`` with ``status_code`` and ``reason`` set and the
        socket positioned just past the response headers.

    Raises:
        ValueError: For an unsupported scheme or a chunked response.
        NotImplementedError: If the server responds with a redirect.
    """
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    # An explicit "host:port" overrides the scheme's default port.
    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    # Close the socket on any failure: the caller only gets it on success.
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if "Host" not in headers:
            s.write(b"Host: %s\r\n" % host)
        # Caller-supplied headers were previously accepted but never sent.
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
        if data:
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                # The original tested an undefined name (`line`) here and
                # concatenated str with bytes in the error message.
                if b"chunked" in l:
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        s.close()
        raise

    resp = Response(s)
    resp.status_code = status
    resp.reason = msg.rstrip()
    return resp
开发者ID:dwighthubbard,项目名称:micropython-lib,代码行数:56,代码来源:urequests.py


示例17: __init__

    def __init__(self, ip=config.UDP_IP, port=config.UDP_PORT, num_leds=240):
        """Create the LED strip driver and bind the UDP listener socket.

        The strip is created for ``num_leds`` LEDs, then ``clear()`` and
        ``send()`` are called on it. The datagram socket is bound to
        ``(ip, port)``.
        """
        # Init and clear LED Strip
        strip = self.ledstrip = driver.LedStrip(num_leds)
        strip.clear()
        strip.send()

        # Init UDP-Socket listener
        listen_on = (ip, port)
        listener = self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.bind(listen_on)
开发者ID:titusz,项目名称:followlight,代码行数:10,代码来源:server.py


示例18: connectSocket

def connectSocket():
    """Return a non-blocking UDP socket bound to ``(UDP_IP, UDP_PORT)``.

    The original comment describes this as the Art-Net port.
    """
    listen_addr = (UDP_IP, UDP_PORT)
    udp = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
    udp.bind(listen_addr)
    # Non-blocking mode: reads return immediately instead of waiting.
    udp.setblocking(0)
    return udp
开发者ID:bennigraf,项目名称:wipy-artnet-to-ws2812,代码行数:10,代码来源:main.py


示例19: main

def main():
    """Command-line entry point: copy one file to or from a WebREPL host.

    Expects exactly two arguments, exactly one of which contains ":" and so
    names the remote side. A remote first argument means "get", otherwise
    "put". The function connects, performs the websocket handshake, logs in
    with a password read via ``getpass``, prints the remote version,
    transfers the file and closes the socket.
    """
    argv = sys.argv

    if len(argv) != 3:
        help(1)

    first_is_remote = ":" in argv[1]
    second_is_remote = ":" in argv[2]

    if first_is_remote and second_is_remote:
        error("Operations on 2 remote files are not supported")
    if not (first_is_remote or second_is_remote):
        error("One remote file is required")

    if first_is_remote:
        op = "get"
        host, port, src_file = parse_remote(argv[1])
        dst_file = argv[2]
        # A directory destination keeps the source file's base name.
        if os.path.isdir(dst_file):
            dst_file = dst_file + "/" + src_file.rsplit("/", 1)[-1]
    else:
        op = "put"
        host, port, dst_file = parse_remote(argv[2])
        src_file = argv[1]
        # A destination ending in "/" keeps the source file's base name.
        if dst_file[-1] == "/":
            dst_file = dst_file + src_file.rsplit("/", 1)[-1]

    print(op, host, port)
    print(src_file, "->", dst_file)

    sock = socket.socket()

    addr_infos = socket.getaddrinfo(host, port)
    sock.connect(addr_infos[0][4])
    websocket_helper.client_handshake(sock)

    ws = websocket(sock)

    import getpass
    login(ws, getpass.getpass())
    print("Remote WebREPL version:", get_ver(ws))

    # Set websocket to send data marked as "binary"
    ws.ioctl(9, 2)

    # `op` is always either "get" or "put" at this point.
    if op == "put":
        put_file(ws, src_file, dst_file)
    elif op == "get":
        get_file(ws, dst_file, src_file)

    sock.close()
开发者ID:rguillon,项目名称:webrepl,代码行数:54,代码来源:webrepl_cli.py


示例20: time

def time():
    """Query the NTP server ``host`` and return its time minus ``NTP_DELTA``.

    Sends a 48-byte request whose first byte is 0x1b to UDP port 123, waits
    up to one second for a 48-byte reply, and decodes bytes 40-43 of the
    reply as a big-endian unsigned 32-bit integer.

    Raises:
        OSError: If resolution, sending or receiving fails or times out.
    """
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Close the socket even when the send or the 1 s receive timeout raises;
    # previously it leaked on every failed query.
    try:
        s.settimeout(1)
        s.sendto(NTP_QUERY, addr)
        msg = s.recv(48)
    finally:
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:19emtuck,项目名称:micropython,代码行数:11,代码来源:ntptime.py



注:本文中的usocket.socket函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python argumentParser.ArgumentParser类代码示例发布时间:2022-05-27
下一篇:
Python usocket.getaddrinfo函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap