本文整理汇总了Python中usocket.socket函数的典型用法代码示例。如果您正苦于以下问题:Python socket函数的具体用法?Python socket怎么用?Python socket使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了socket函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: open_http_socket
def open_http_socket(method, url, json=None, timeout=None, headers=None, urlencoded = None):
urlparts = url.split('/', 3)
proto = urlparts[0]
host = urlparts[2]
urlpath = '' if len(urlparts) < 4 else urlparts[3]
if proto == 'http:':
port = 80
elif proto == 'https:':
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
if ':' in host:
host, port = host.split(':')
port = int(port)
if json is not None:
content = ujson.dumps(json)
content_type = CONTENT_TYPE_JSON
elif urlencoded is not None:
content = urlencoded
content_type = "application/x-www-form-urlencoded"
else:
content = None
# ToDo: Handle IPv6 addresses
if is_ipv4_address(host):
addr = (host, port)
else:
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
sock = None
if proto == 'https:':
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
else:
sock = usocket.socket()
sock.connect(addr)
if proto == 'https:':
sock.settimeout(0) # Actually make timeouts working properly with ssl
sock.send('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
if headers is not None:
for header in headers.items():
sock.send('%s: %s\r\n' % header)
if content is not None:
sock.send('content-length: %s\r\n' % len(content))
sock.send('content-type: %s\r\n' % content_type)
sock.send('\r\n')
sock.send(content)
else:
sock.send('\r\n')
return sock
开发者ID:farragar,项目名称:Mk3-Firmware,代码行数:58,代码来源:http_client.py
示例2: start
def start(self):
"""
Starts the LoRaWAN nano gateway.
"""
self._log('Starting LoRaWAN nano gateway with id: {}', self.id)
# setup WiFi as a station and connect
self.wlan = WLAN(mode=WLAN.STA)
self._connect_to_wifi()
# get a time sync
self._log('Syncing time with {} ...', self.ntp_server)
self.rtc.ntp_sync(self.ntp_server, update_period=self.ntp_period)
while not self.rtc.synced():
utime.sleep_ms(50)
self._log("RTC NTP sync complete")
# get the server IP and create an UDP socket
self.server_ip = usocket.getaddrinfo(self.server, self.port)[0][-1]
self._log('Opening UDP socket to {} ({}) port {}...', self.server, self.server_ip[0], self.server_ip[1])
self.sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM, usocket.IPPROTO_UDP)
self.sock.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
self.sock.setblocking(False)
# push the first time immediatelly
self._push_data(self._make_stat_packet())
# create the alarms
self.stat_alarm = Timer.Alarm(handler=lambda t: self._push_data(self._make_stat_packet()), s=60, periodic=True)
self.pull_alarm = Timer.Alarm(handler=lambda u: self._pull_data(), s=25, periodic=True)
# start the UDP receive thread
self.udp_stop = False
_thread.start_new_thread(self._udp_thread, ())
# initialize the LoRa radio in LORA mode
self._log('Setting up the LoRa radio at {:.1f} Mhz using {}', self._freq_to_float(self.frequency), self.datarate)
self.lora = LoRa(
mode=LoRa.LORA,
frequency=self.frequency,
bandwidth=self.bw,
sf=self.sf,
preamble=8,
coding_rate=LoRa.CODING_4_5,
tx_iq=True
)
# create a raw LoRa socket
self.lora_sock = usocket.socket(usocket.AF_LORA, usocket.SOCK_RAW)
self.lora_sock.setblocking(False)
self.lora_tx_done = False
self.lora.callback(trigger=(LoRa.RX_PACKET_EVENT | LoRa.TX_PACKET_EVENT), handler=self._lora_cb)
self._log('LoRaWAN nano gateway online')
开发者ID:H-LK,项目名称:pycom-libraries,代码行数:55,代码来源:nanogateway.py
示例3: updateStats
def updateStats():
# Download the JSON file
s = usocket.socket()
try:
s.connect(connectto[0][4])
s.send("GET /api/\r\n")
output = s.recv(4096)
s.close()
except Exception as e:
sys.print_exception(e)
curIn.text("NET")
curOut.text("GONE?")
time.sleep(5)
return
# Decode the JSON
print(output)
try:
data = ujson.loads(output)
except Exception as e:
sys.print_exception(e)
curIn.text("JSON")
curOut.text("ERROR")
return
# Update the display
curIn.text("%.0f Mbps" % (data['uplink_in'] / 1000000))
curOut.text("%.0f Mbps" % (data['uplink_out'] / 1000000))
开发者ID:emfcamp,项目名称:emfnoc,代码行数:30,代码来源:main.py
示例4: main
def main(use_stream=False):
s = socket.socket()
# Binding to all interfaces - server will be accessible to other hosts!
ai = socket.getaddrinfo("0.0.0.0", 8080)
print("Bind address info:", ai)
addr = ai[0][-1]
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
print("Listening, connect your browser to http://<this_host>:8080/")
counter = 0
while True:
res = s.accept()
client_s = res[0]
client_addr = res[1]
print("Client address:", client_addr)
print("Client socket:", client_s)
print("Request:")
if use_stream:
# MicroPython socket objects support stream (aka file) interface
# directly.
print(client_s.read(4096))
client_s.write(CONTENT % counter)
else:
print(client_s.recv(4096))
client_s.send(CONTENT % counter)
client_s.close()
counter += 1
print()
开发者ID:DiegoAltamirano,项目名称:micropython,代码行数:32,代码来源:http_server.py
示例5: connect
def connect(self, clean_session=True):
self.sock = socket.socket()
self.sock.connect(self.addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
msg[1] = 10 + 2 + len(self.client_id)
msg[9] = clean_session << 1
if self.user is not None:
msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
msg[9] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[10] |= self.keepalive >> 8
msg[11] |= self.keepalive & 0x00FF
if self.lw_topic:
msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[9] |= self.lw_retain << 5
self.sock.write(msg)
#print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
开发者ID:stinos,项目名称:micropython-lib,代码行数:34,代码来源:mqtt.py
示例6: test
def test(peer_addr):
s = socket.socket()
s.connect(peer_addr)
s = ssl.wrap_socket(s)
cert = s.getpeercert(True)
print(type(cert), len(cert) > 100)
s.close()
开发者ID:AriZuu,项目名称:micropython,代码行数:7,代码来源:ssl_getpeercert.py
示例7: open_connection
def open_connection(host, port, ssl=False):
if DEBUG and __debug__:
log.debug("open_connection(%s, %s)", host, port)
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
ai = ai[0]
s = _socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
try:
s.connect(ai[-1])
except OSError as e:
if e.args[0] != uerrno.EINPROGRESS:
raise
if DEBUG and __debug__:
log.debug("open_connection: After connect")
yield IOWrite(s)
# if __debug__:
# assert s2.fileno() == s.fileno()
if DEBUG and __debug__:
log.debug("open_connection: After iowait: %s", s)
if ssl:
print("Warning: uasyncio SSL support is alpha")
import ussl
s.setblocking(True)
s2 = ussl.wrap_socket(s)
s.setblocking(False)
return StreamReader(s, s2), StreamWriter(s2, {})
return StreamReader(s), StreamWriter(s, {})
开发者ID:SpotlightKid,项目名称:micropython-lib,代码行数:27,代码来源:__init__.py
示例8: request
def request(method, url, json=None, timeout=None, headers=None):
urlparts = url.split('/', 3)
proto = urlparts[0]
host = urlparts[2]
urlpath = '' if len(urlparts) < 4 else urlparts[3]
if proto == 'http:':
port = 80
elif proto == 'https:':
port = 443
else:
raise OSError('Unsupported protocol: %s' % proto[:-1])
if ':' in host:
host, port = host.split(':')
port = int(port)
if json is not None:
content = ujson.dumps(json)
content_type = CONTENT_TYPE_JSON
else:
content = None
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
sock = usocket.socket()
if timeout is not None:
assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
sock.settimeout(timeout)
sock.connect(addr)
if proto == 'https:':
assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
sock = ussl.wrap_socket(sock)
sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))
if headers is not None:
for header in headers.items():
sock.write('%s: %s\r\n' % header)
if content is not None:
sock.write('content-length: %s\r\n' % len(content))
sock.write('content-type: %s\r\n' % content_type)
sock.write('\r\n')
sock.write(content)
else:
sock.write('\r\n')
l = sock.readline()
protover, status, msg = l.split(None, 2)
# Skip headers
while sock.readline() != b'\r\n':
pass
return Response(int(status), sock)
开发者ID:SB-Technology-Holdings-International,项目名称:Water,代码行数:60,代码来源:http_client.py
示例9: urlopen
def urlopen(url, data=None):
if data:
raise NotImplementedError("POST is not yet supported")
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto != "http:":
raise ValueError("Unsupported protocol: " + proto)
ai = usocket.getaddrinfo(host, 80)
addr = ai[0][4]
s = usocket.socket()
s.connect(addr)
s.send(b"GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host))
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(line)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in line:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:"):
raise NotImplementedError("Redirects not yet supported")
return s
开发者ID:balloob,项目名称:micropython-lib,代码行数:33,代码来源:urequest.py
示例10: download
def download(url, local_name):
proto, _, host, urlpath = url.split('/', 3)
ai = usocket.getaddrinfo(host, 443)
#print("Address infos:", ai)
addr = ai[0][4]
s = usocket.socket()
#print("Connect address:", addr)
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s)
# MicroPython rawsocket module supports file interface directly
s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
l = s.readline()
protover, status, msg = l.split(None, 2)
if status != b"200":
raise OSError()
while 1:
l = s.readline()
if not l:
raise OSError()
if l == b'\r\n':
break
with open(local_name, "wb") as f:
while 1:
l = s.read(1024)
if not l:
break
f.write(l)
开发者ID:edyza,项目名称:micropython-lib,代码行数:31,代码来源:upip.py
示例11: main
def main(use_stream=True):
s = _socket.socket()
ai = _socket.getaddrinfo("google.com", 443)
print("Address infos:", ai)
addr = ai[0][-1]
print("Connect address:", addr)
s.connect(addr)
s = ssl.wrap_socket(s)
print(s)
if use_stream:
# Both CPython and MicroPython SSLSocket objects support read() and
# write() methods.
s.write(b"GET / HTTP/1.0\n\n")
print(s.read(4096))
else:
# MicroPython SSLSocket objects implement only stream interface, not
# socket interface
s.send(b"GET / HTTP/1.0\n\n")
print(s.recv(4096))
s.close()
开发者ID:ESPWarren,项目名称:micropython,代码行数:25,代码来源:http_client_ssl.py
示例12: url_open
def url_open(url):
global warn_ussl
proto, _, host, urlpath = url.split('/', 3)
ai = usocket.getaddrinfo(host, 443)
#print("Address infos:", ai)
addr = ai[0][4]
s = usocket.socket(ai[0][0])
#print("Connect address:", addr)
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s)
if warn_ussl:
print("Warning: %s SSL certificate is not validated" % host)
warn_ussl = False
# MicroPython rawsocket module supports file interface directly
s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
l = s.readline()
protover, status, msg = l.split(None, 2)
if status != b"200":
if status == b"404":
print("Package not found")
raise ValueError(status)
while 1:
l = s.readline()
if not l:
raise ValueError("Unexpected EOF")
if l == b'\r\n':
break
return s
开发者ID:bynds,项目名称:micropython-lib,代码行数:33,代码来源:upip.py
示例13: main_client
def main_client(addr):
nic = network.WIZNET5K(pyb.SPI(2), pyb.Pin.board.Y5, pyb.Pin.board.Y4)
nic.ifconfig((SENSOR_IP, SENSOR_MASK, SENSOR_GATEWAY, SENSOR_DNS))
# print(nic.ifconfig())
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
count = 0
try:
s.connect(addr)
except:
pass
data = {
'serial_no':'1234567890',
'ip':SENSOR_IP,
'op':gOperator['OP_REGISTER'],
'type':gSensorType['TEMPERATURE']
}
DHT22.init()
while True:
pyb.delay(100)
hum, tem = measure()
print('%d hum:%f, tem:%f' % (count,hum, tem))
data['value'] = {'hum':hum, 'tem':tem}
count += 1
try:
send(s, 'post', '/test_pymagic', data)
print(s.recv(4096))
except:
pass
pyb.delay(1000)
开发者ID:kamijawa,项目名称:pymagic,代码行数:30,代码来源:main.py
示例14: __init__
def __init__(self, host, port=HTTP_PORT):
self.host = host
self.port = int(port)
addr_info = socket.getaddrinfo(host, port)
self.addr = addr_info[0][-1]
self.connected = False
self.socket = socket.socket()
开发者ID:fadushin,项目名称:esp8266,代码行数:7,代码来源:client.py
示例15: urlopen
def urlopen(url, data=None, method="GET"):
if data is not None and method == "GET":
method = "POST"
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
s = usocket.socket()
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
s.write(method)
s.write(b" /")
s.write(path)
s.write(b" HTTP/1.0\r\nHost: ")
s.write(host)
s.write(b"\r\n")
if data:
s.write(b"Content-Length: ")
s.write(str(len(data)))
s.write(b"\r\n")
s.write(b"\r\n")
if data:
s.write(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(l)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in l:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:"):
raise NotImplementedError("Redirects not yet supported")
return s
开发者ID:puuu,项目名称:micropython-lib,代码行数:58,代码来源:urequest.py
示例16: request
def request(method, url, data=None, json=None, headers={}, stream=None):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
ai = usocket.getaddrinfo(host, port)
addr = ai[0][4]
s = usocket.socket()
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if not "Host" in headers:
s.write(b"Host: %s\r\n" % host)
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
if data:
s.write(b"Content-Length: %d\r\n" % len(data))
s.write(b"\r\n")
if data:
s.write(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(line)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in line:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:"):
raise NotImplementedError("Redirects not yet supported")
resp = Response(s)
resp.status_code = status
resp.reason = msg.rstrip()
return resp
开发者ID:dwighthubbard,项目名称:micropython-lib,代码行数:56,代码来源:urequests.py
示例17: __init__
def __init__(self, ip=config.UDP_IP, port=config.UDP_PORT, num_leds=240):
# Init and clear LED Strip
self.ledstrip = driver.LedStrip(num_leds)
self.ledstrip.clear()
self.ledstrip.send()
# Init UDP-Socket listener
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((ip, port))
开发者ID:titusz,项目名称:followlight,代码行数:10,代码来源:server.py
示例18: connectSocket
def connectSocket():
# connect to artnet port
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# addr = socket.getaddrinfo(UDP_IP, UDP_PORT, socket.AF_INET, socket.SOCK_DGRAM)[0][4]
# print(addr)
sock.bind((UDP_IP, UDP_PORT))
# sock.settimeout(1)
sock.setblocking(0)
return sock
开发者ID:bennigraf,项目名称:wipy-artnet-to-ws2812,代码行数:10,代码来源:main.py
示例19: main
def main():
if len(sys.argv) != 3:
help(1)
if ":" in sys.argv[1] and ":" in sys.argv[2]:
error("Operations on 2 remote files are not supported")
if ":" not in sys.argv[1] and ":" not in sys.argv[2]:
error("One remote file is required")
if ":" in sys.argv[1]:
op = "get"
host, port, src_file = parse_remote(sys.argv[1])
dst_file = sys.argv[2]
if os.path.isdir(dst_file):
basename = src_file.rsplit("/", 1)[-1]
dst_file += "/" + basename
else:
op = "put"
host, port, dst_file = parse_remote(sys.argv[2])
src_file = sys.argv[1]
if dst_file[-1] == "/":
basename = src_file.rsplit("/", 1)[-1]
dst_file += basename
if 1:
print(op, host, port)
print(src_file, "->", dst_file)
s = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][4]
s.connect(addr)
#s = s.makefile("rwb")
websocket_helper.client_handshake(s)
ws = websocket(s)
import getpass
passwd = getpass.getpass()
login(ws, passwd)
print("Remote WebREPL version:", get_ver(ws))
# Set websocket to send data marked as "binary"
ws.ioctl(9, 2)
if op == "get":
get_file(ws, dst_file, src_file)
elif op == "put":
put_file(ws, src_file, dst_file)
s.close()
开发者ID:rguillon,项目名称:webrepl,代码行数:54,代码来源:webrepl_cli.py
示例20: time
def time():
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
addr = socket.getaddrinfo(host, 123)[0][-1]
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
s.close()
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
开发者ID:19emtuck,项目名称:micropython,代码行数:11,代码来源:ntptime.py
注:本文中的usocket.socket函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论