本文整理汇总了Python中usocket.getaddrinfo函数的典型用法代码示例。如果您正苦于以下问题：Python getaddrinfo函数的具体用法？Python getaddrinfo怎么用？Python getaddrinfo使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getaddrinfo函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, server, port=61440):
    """Create a UDP socket bound locally to ``port`` and connected to ``server``.

    The remote address and the wildcard local address are both resolved
    through ``socket.getaddrinfo``; the results are kept on ``self.addr``
    and ``self.addr2``.  ``server`` and ``port`` are recorded last.
    """
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.s = udp
    remote_info = socket.getaddrinfo(server, port)
    self.addr = remote_info[0][4]
    local_info = socket.getaddrinfo('0.0.0.0', port)
    self.addr2 = local_info[0][4]
    udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    udp.bind(self.addr2)
    # NOTE: no timeout is configured on the socket (a 3 s timeout was
    # previously sketched here but left disabled).
    udp.connect(self.addr)
    port_message = "open local port:" + str(port)
    log(port_message)
    debug_message = "DEBUG MODE:" + str(DEBUG)
    log(debug_message)
    self.server = server
    self.port = port
开发者ID:drcoms,项目名称:client-micropy,代码行数:12,代码来源:latest-pppoe.py
示例2: main
def main(use_stream=True):
    """Fetch ``/`` from google.com:443 over TLS and print up to 4096 reply bytes.

    Args:
        use_stream: when true, talk to the wrapped socket through
            ``write()``/``read()``; otherwise through ``send()``/``recv()``.

    The socket is always closed, including when resolution, connect, the
    TLS wrap or the exchange raises.
    """
    s = _socket.socket()
    try:
        ai = _socket.getaddrinfo("google.com", 443)
        print("Address infos:", ai)
        addr = ai[0][-1]
        print("Connect address:", addr)
        s.connect(addr)
        # Rebinding ``s`` means the ``finally`` below closes the wrapped
        # socket once the wrap succeeded, and the raw one otherwise.
        s = ssl.wrap_socket(s)
        print(s)
        if use_stream:
            # Both CPython and MicroPython SSLSocket objects support read() and
            # write() methods.
            s.write(b"GET / HTTP/1.0\n\n")
            print(s.read(4096))
        else:
            # MicroPython SSLSocket objects implement only stream interface, not
            # socket interface
            s.send(b"GET / HTTP/1.0\n\n")
            print(s.recv(4096))
    finally:
        s.close()
开发者ID:ESPWarren,项目名称:micropython,代码行数:25,代码来源:http_client_ssl.py
示例3: download
def download(url, local_name):
    """Download ``url`` with an HTTP/1.0 GET and store the body in ``local_name``.

    Args:
        url: ``proto://host/path`` string; it must contain at least three
            ``/`` characters.
        local_name: path of the file to (over)write with the response body.

    Raises:
        ValueError: ``url`` does not split into protocol, host and path.
        OSError: the status is not ``200`` or the connection ends before the
            header block is complete.

    The connection goes to port 80 for ``http:`` URLs and to port 443
    otherwise (previously 443 was used unconditionally).  The socket is
    closed on every exit path.
    """
    proto, _, host, urlpath = url.split('/', 3)
    port = 80 if proto == "http:" else 443
    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s)
        # MicroPython rawsocket module supports file interface directly
        s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
        l = s.readline()
        protover, status, msg = l.split(None, 2)
        if status != b"200":
            raise OSError()
        # Skip the response headers up to the blank separator line.
        while 1:
            l = s.readline()
            if not l:
                raise OSError()
            if l == b'\r\n':
                break
        with open(local_name, "wb") as f:
            while 1:
                chunk = s.read(1024)
                if not chunk:
                    break
                f.write(chunk)
    finally:
        s.close()
开发者ID:edyza,项目名称:micropython-lib,代码行数:31,代码来源:upip.py
示例4: urlopen
def urlopen(url, data=None):
    """Send an HTTP/1.0 GET for ``url`` and return the socket positioned at the body.

    Args:
        url: ``http://host[/path]`` string.
        data: must be falsy; request bodies are not supported.

    Returns:
        The connected socket, with the status line and headers already
        consumed.  The caller is responsible for closing it.

    Raises:
        NotImplementedError: ``data`` is given, or the response carries a
            ``Location:`` header.
        ValueError: the protocol is not ``http:``, the status code is not an
            integer, or the response uses chunked transfer encoding.

    The socket is closed before any exception raised after its creation
    propagates.
    """
    if data:
        raise NotImplementedError("POST is not yet supported")
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto != "http:":
        raise ValueError("Unsupported protocol: " + proto)
    ai = usocket.getaddrinfo(host, 80)
    addr = ai[0][4]
    s = usocket.socket()
    try:
        s.connect(addr)
        s.send(b"GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host))
        l = s.readline()
        protover, status, msg = l.split(None, 2)
        # The value is not used further; the conversion rejects a malformed
        # status line with ValueError.
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # ``l`` is bytes, so decode it before building the message.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        s.close()
        raise
    return s
开发者ID:balloob,项目名称:micropython-lib,代码行数:33,代码来源:urequest.py
示例5: main
def main(use_stream=False):
    """Run a minimal HTTP server on port 8080 that answers every request.

    Each accepted client has up to 4096 request bytes printed and is then
    sent ``CONTENT`` formatted with a running request counter.  The loop
    never terminates on its own.

    Args:
        use_stream: when true use ``read()``/``write()`` on the client
            socket, otherwise ``recv()``/``send()``.
    """
    listener = socket.socket()

    # Binding to all interfaces - server will be accessible to other hosts!
    bind_info = socket.getaddrinfo("0.0.0.0", 8080)
    print("Bind address info:", bind_info)
    bind_addr = bind_info[0][-1]

    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(bind_addr)
    listener.listen(5)
    print("Listening, connect your browser to http://<this_host>:8080/")

    served = 0
    while True:
        accepted = listener.accept()
        conn, peer = accepted[0], accepted[1]
        print("Client address:", peer)
        print("Client socket:", conn)
        print("Request:")
        if not use_stream:
            print(conn.recv(4096))
            conn.send(CONTENT % served)
        else:
            # MicroPython socket objects support stream (aka file) interface
            # directly.
            print(conn.read(4096))
            conn.write(CONTENT % served)
        conn.close()
        served += 1
        print()
开发者ID:DiegoAltamirano,项目名称:micropython,代码行数:32,代码来源:http_server.py
示例6: open_connection
def open_connection(host, port, ssl=False):
    """Generator-based coroutine opening a stream connection to ``host:port``.

    A non-blocking connect is started, an ``IOWrite`` request for the socket
    is yielded, and finally a ``(StreamReader, StreamWriter)`` pair is
    returned.  When ``ssl`` is true the socket is first wrapped with
    ``ussl.wrap_socket`` (done in blocking mode).

    Raises:
        OSError: the connect fails with an errno other than ``EINPROGRESS``.
    """
    if DEBUG and __debug__:
        log.debug("open_connection(%s, %s)", host, port)
    candidates = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
    info = candidates[0]
    s = _socket.socket(info[0], info[1], info[2])
    s.setblocking(False)
    try:
        s.connect(info[-1])
    except OSError as exc:
        # EINPROGRESS is tolerated; any other errno is re-raised.
        if exc.args[0] != uerrno.EINPROGRESS:
            raise
    if DEBUG and __debug__:
        log.debug("open_connection: After connect")
    yield IOWrite(s)
    if DEBUG and __debug__:
        log.debug("open_connection: After iowait: %s", s)
    if not ssl:
        return StreamReader(s), StreamWriter(s, {})
    print("Warning: uasyncio SSL support is alpha")
    import ussl
    # The wrap is performed with the socket temporarily in blocking mode.
    s.setblocking(True)
    wrapped = ussl.wrap_socket(s)
    s.setblocking(False)
    return StreamReader(s, wrapped), StreamWriter(wrapped, {})
开发者ID:SpotlightKid,项目名称:micropython-lib,代码行数:27,代码来源:__init__.py
示例7: request
def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response`` for the open socket.

    Args:
        method: request method text, written verbatim into the request line.
        url: ``http://`` or ``https://`` URL; ``host:port`` is honoured.
        json: optional object serialised with ``ujson.dumps`` as the body.
        timeout: optional socket timeout passed to ``settimeout``.
        headers: optional dict of extra header name/value pairs.

    Returns:
        ``Response(int(status), sock)`` with the headers already consumed.

    Raises:
        OSError: the protocol is neither ``http:`` nor ``https:``.
        AssertionError: a timeout or HTTPS is requested but unsupported.

    The socket is closed if anything fails after it was created, and header
    skipping stops at end-of-stream instead of looping forever.
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':', 1)
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        l = sock.readline()
        protover, status, msg = l.split(None, 2)

        # Skip headers.  An empty read means the peer closed the connection;
        # without this check the loop would never terminate.
        while True:
            header_line = sock.readline()
            if not header_line or header_line == b'\r\n':
                break

        return Response(int(status), sock)
    except Exception:
        sock.close()
        raise
开发者ID:SB-Technology-Holdings-International,项目名称:Water,代码行数:60,代码来源:http_client.py
示例8: __init__
def __init__(self, host, port=HTTP_PORT):
    """Remember the target, resolve its address and create an unconnected socket.

    ``self.port`` holds ``port`` converted to ``int``; the address lookup is
    done with ``port`` as given.  No connection is made here, so
    ``self.connected`` starts out false.
    """
    self.host = host
    self.port = int(port)
    # Last element of the first getaddrinfo entry is the socket address.
    self.addr = socket.getaddrinfo(host, port)[0][-1]
    self.connected = False
    self.socket = socket.socket()
开发者ID:fadushin,项目名称:esp8266,代码行数:7,代码来源:client.py
示例9: url_open
def url_open(url):
    """Issue an HTTP/1.0 GET for ``url`` and return the socket at the body start.

    Args:
        url: ``proto://host/path`` string with at least three ``/``.

    Returns:
        The connected (and, for ``https:``, TLS-wrapped) socket with the
        status line and headers consumed.  The caller must close it.

    Raises:
        ValueError: ``url`` cannot be split, the status is not ``200`` (the
            status bytes are the exception argument), or the stream ends
            inside the headers.

    The connection goes to port 80 for ``http:`` URLs and to port 443
    otherwise (previously 443 was used unconditionally).  The socket is
    closed before any error raised after its creation propagates.
    """
    global warn_ussl
    proto, _, host, urlpath = url.split('/', 3)
    port = 80 if proto == "http:" else 443
    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket(ai[0][0])
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s)
            if warn_ussl:
                # Shown once only; the flag is cleared afterwards.
                print("Warning: %s SSL certificate is not validated" % host)
                warn_ussl = False
        # MicroPython rawsocket module supports file interface directly
        s.write("GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n" % (urlpath, host))
        l = s.readline()
        protover, status, msg = l.split(None, 2)
        if status != b"200":
            if status == b"404":
                print("Package not found")
            raise ValueError(status)
        # Skip the response headers up to the blank separator line.
        while 1:
            l = s.readline()
            if not l:
                raise ValueError("Unexpected EOF")
            if l == b'\r\n':
                break
    except Exception:
        s.close()
        raise
    return s
开发者ID:bynds,项目名称:micropython-lib,代码行数:33,代码来源:upip.py
示例10: urlopen
def urlopen(url, data=None, method="GET"):
    """Send an HTTP/1.0 request and return the socket positioned at the body.

    Args:
        url: ``http://`` or ``https://`` URL; ``host:port`` is honoured.
        data: optional request body.  When it is not ``None`` and ``method``
            is ``"GET"``, the method becomes ``"POST"``.
        method: request method text.

    Returns:
        The connected socket with status line and headers consumed.  The
        caller is responsible for closing it.

    Raises:
        ValueError: unsupported protocol, non-integer status code, or a
            chunked response.
        NotImplementedError: the response carries a ``Location:`` header.

    The socket is closed before any exception raised after its creation
    propagates.
    """
    if data is not None and method == "GET":
        method = "POST"
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)

        s.write(method)
        s.write(b" /")
        s.write(path)
        s.write(b" HTTP/1.0\r\nHost: ")
        s.write(host)
        s.write(b"\r\n")

        if data:
            s.write(b"Content-Length: ")
            s.write(str(len(data)))
            s.write(b"\r\n")
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        # The value is not used further; the conversion rejects a malformed
        # status line with ValueError.
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # ``l`` is bytes, so decode it before building the message.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        s.close()
        raise
    return s
开发者ID:puuu,项目名称:micropython-lib,代码行数:58,代码来源:urequest.py
示例11: open_http_socket
def open_http_socket(method, url, json=None, timeout=None, headers=None, urlencoded = None):
    """Open a socket, send a complete HTTP/1.0 request and return the socket.

    Args:
        method: request method text, written verbatim into the request line.
        url: ``http://`` or ``https://`` URL; ``host:port`` is honoured.
        json: optional object serialised with ``ujson.dumps`` as the body.
        timeout: accepted but not used by this function.
        headers: optional dict of extra header name/value pairs.
        urlencoded: optional pre-encoded form body, used when ``json`` is
            ``None``.

    Returns:
        The connected socket; the response has not been read yet.

    Raises:
        OSError: the protocol is neither ``http:`` nor ``https:``.

    The socket is closed if connecting or sending fails.
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':', 1)
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    elif urlencoded is not None:
        content = urlencoded
        content_type = "application/x-www-form-urlencoded"
    else:
        content = None

    # ToDo: Handle IPv6 addresses
    if is_ipv4_address(host):
        # Literal IPv4 address: skip the resolver.
        addr = (host, port)
    else:
        ai = usocket.getaddrinfo(host, port)
        addr = ai[0][4]

    if proto == 'https:':
        sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM, usocket.SEC_SOCKET)
    else:
        sock = usocket.socket()

    try:
        sock.connect(addr)
        if proto == 'https:':
            sock.settimeout(0) # Actually make timeouts working properly with ssl

        sock.send('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.send('%s: %s\r\n' % header)

        if content is not None:
            sock.send('content-length: %s\r\n' % len(content))
            sock.send('content-type: %s\r\n' % content_type)
            sock.send('\r\n')
            sock.send(content)
        else:
            sock.send('\r\n')
    except Exception:
        sock.close()
        raise

    return sock
开发者ID:farragar,项目名称:Mk3-Firmware,代码行数:58,代码来源:http_client.py
示例12: __init__
def __init__(self, client_id, server, port=0, ssl=False):
    """Store the client settings and resolve the server address.

    A ``port`` equal to 0 selects 8883 when ``ssl`` is truthy and 1883
    otherwise.  No socket is opened here: ``self.sock`` starts as ``None``.
    """
    fallback_port = 8883 if ssl else 1883
    effective_port = fallback_port if port == 0 else port
    self.client_id = client_id
    self.sock = None
    resolved = socket.getaddrinfo(server, effective_port)
    self.addr = resolved[0][-1]
    self.ssl = ssl
    self.pid = 0
    self.cb = None
开发者ID:ngraziano,项目名称:micropython-lib,代码行数:9,代码来源:simple.py
示例13: request
def request(method, url, data=None, json=None, headers={}, stream=None):
    """Perform an HTTP/1.0 request and return a ``Response`` wrapping the socket.

    Args:
        method: request method text.
        url: ``http://`` or ``https://`` URL; ``host:port`` is honoured.
        data: optional request body.
        json: optional object serialised with ``ujson.dumps``; mutually
            exclusive with ``data``.
        headers: mapping of extra request headers.  It is only read, never
            modified, so the shared default dict is safe.
        stream: accepted but not used by this function.

    Returns:
        ``Response(s)`` with ``status_code`` and ``reason`` filled in.

    Raises:
        ValueError: unsupported protocol, non-integer status code, or a
            chunked response.
        NotImplementedError: the response carries a ``Location:`` header.

    Caller-supplied ``headers`` are now written to the request; before, they
    were ignored apart from suppressing the default ``Host`` line.  The
    socket is closed before any exception raised after its creation
    propagates.
    """
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if "Host" not in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys rather than items() to avoid allocating tuples.
        for name in headers:
            s.write(name)
            s.write(b": ")
            s.write(headers[name])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
        if data:
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        l = s.readline()
        protover, status, msg = l.split(None, 2)
        status = int(status)
        while True:
            l = s.readline()
            if not l or l == b"\r\n":
                break
            if l.startswith(b"Transfer-Encoding:"):
                if b"chunked" in l:
                    # ``l`` is bytes, so decode it before building the message.
                    raise ValueError("Unsupported " + str(l, "utf-8"))
            elif l.startswith(b"Location:"):
                raise NotImplementedError("Redirects not yet supported")

        resp = Response(s)
        resp.status_code = status
        resp.reason = msg.rstrip()
    except Exception:
        s.close()
        raise
    return resp
开发者ID:dwighthubbard,项目名称:micropython-lib,代码行数:56,代码来源:urequests.py
示例14: start
def start(self):
    """
    Bring the LoRaWAN nano gateway online.

    In order: join WiFi as a station, wait for an NTP time sync, open the
    UDP socket towards the server, push a first stat packet, arm the
    periodic stat/pull alarms, start the UDP thread and finally configure
    the LoRa radio and its raw socket.
    """
    self._log('Starting LoRaWAN nano gateway with id: {}', self.id)

    # setup WiFi as a station and connect
    self.wlan = WLAN(mode=WLAN.STA)
    self._connect_to_wifi()

    # get a time sync
    self._log('Syncing time with {} ...', self.ntp_server)
    self.rtc.ntp_sync(self.ntp_server, update_period=self.ntp_period)
    while not self.rtc.synced():
        # Poll every 50 ms until the RTC reports it is synced.
        utime.sleep_ms(50)
    self._log("RTC NTP sync complete")

    # get the server IP and create an UDP socket
    self.server_ip = usocket.getaddrinfo(self.server, self.port)[0][-1]
    self._log(
        'Opening UDP socket to {} ({}) port {}...',
        self.server,
        self.server_ip[0],
        self.server_ip[1],
    )
    self.sock = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM, usocket.IPPROTO_UDP)
    self.sock.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
    self.sock.setblocking(False)

    # push the first time immediately
    self._push_data(self._make_stat_packet())

    # create the alarms
    def _on_stat_alarm(t):
        # Return value mirrors the original lambda handler.
        return self._push_data(self._make_stat_packet())

    def _on_pull_alarm(u):
        # Return value mirrors the original lambda handler.
        return self._pull_data()

    self.stat_alarm = Timer.Alarm(handler=_on_stat_alarm, s=60, periodic=True)
    self.pull_alarm = Timer.Alarm(handler=_on_pull_alarm, s=25, periodic=True)

    # start the UDP receive thread
    self.udp_stop = False
    _thread.start_new_thread(self._udp_thread, ())

    # initialize the LoRa radio in LORA mode
    self._log(
        'Setting up the LoRa radio at {:.1f} Mhz using {}',
        self._freq_to_float(self.frequency),
        self.datarate,
    )
    self.lora = LoRa(
        mode=LoRa.LORA,
        frequency=self.frequency,
        bandwidth=self.bw,
        sf=self.sf,
        preamble=8,
        coding_rate=LoRa.CODING_4_5,
        tx_iq=True
    )

    # create a raw LoRa socket
    self.lora_sock = usocket.socket(usocket.AF_LORA, usocket.SOCK_RAW)
    self.lora_sock.setblocking(False)
    self.lora_tx_done = False

    radio_events = (LoRa.RX_PACKET_EVENT | LoRa.TX_PACKET_EVENT)
    self.lora.callback(trigger=radio_events, handler=self._lora_cb)
    self._log('LoRaWAN nano gateway online')
开发者ID:H-LK,项目名称:pycom-libraries,代码行数:55,代码来源:nanogateway.py
示例15: ntp
def ntp():  # noqa
    """Query ``pool.ntp.org`` once and return the reply time minus ``NTP_DELTA``.

    A 48-byte request whose first byte is ``0x1b`` is sent over UDP with a
    one-second socket timeout.  The unsigned 32-bit big-endian integer at
    bytes 40..43 of the reply is returned after subtracting ``NTP_DELTA``
    (presumably the offset between the NTP epoch and the local epoch —
    confirm against its definition).

    The socket is closed even when sending or receiving raises, e.g. on a
    timeout.
    """
    ntp_query = bytearray(48)
    ntp_query[0] = 0x1b
    addr = socket.getaddrinfo('pool.ntp.org', 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(ntp_query, addr)
        msg = s.recv(48)
    finally:
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:chassing,项目名称:pyq2,代码行数:11,代码来源:untplib.py
示例16: time
def time():
    """Query the NTP server named by the module-level ``host`` and return its time.

    A 48-byte request whose first byte is ``0x1b`` is sent over UDP with a
    one-second socket timeout.  The unsigned 32-bit big-endian integer at
    bytes 40..43 of the reply is returned after subtracting ``NTP_DELTA``
    (presumably the offset between the NTP epoch and the local epoch —
    confirm against its definition).

    The socket is closed even when sending or receiving raises, e.g. on a
    timeout.
    """
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(NTP_QUERY, addr)
        msg = s.recv(48)
    finally:
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:19emtuck,项目名称:micropython,代码行数:11,代码来源:ntptime.py
示例17: create_connection
def create_connection(addr, timeout=None, source_address=None):
    """Connect to ``addr`` and return the connected socket, or ``None``.

    Args:
        addr: ``(host, port)`` pair, resolved with ``getaddrinfo``.
        timeout: accepted for signature compatibility; not used.
        source_address: accepted for signature compatibility; not used.

    Returns:
        The connected socket on the first address that accepts the
        connection, or ``None`` when every address fails.

    Only ``OSError`` from ``connect`` is treated as "try the next address";
    other exceptions propagate.  When no address works the socket is closed
    instead of being leaked.
    """
    # Resolve first so a failed lookup cannot leave an open socket behind.
    ais = getaddrinfo(addr[0], addr[1])
    s = socket()
    for ai in ais:
        try:
            s.connect(ai[4])
        except OSError:
            continue
        return s
    s.close()
    return None
开发者ID:GrandHsu,项目名称:micropython-lib,代码行数:11,代码来源:socket.py
示例18: main
def main():
    """Command-line entry point: copy one file to or from a remote WebREPL host.

    Exactly one of the two positional arguments must contain ``:`` to mark
    it as the remote side; a remote source means "get", a remote
    destination means "put".  The password is prompted for interactively.
    """
    if len(sys.argv) != 3:
        help(1)

    first, second = sys.argv[1], sys.argv[2]
    first_remote = ":" in first
    second_remote = ":" in second

    if first_remote and second_remote:
        error("Operations on 2 remote files are not supported")
    if not (first_remote or second_remote):
        error("One remote file is required")

    if first_remote:
        op = "get"
        host, port, src_file = parse_remote(first)
        dst_file = second
        if os.path.isdir(dst_file):
            # Destination is a directory: keep the remote file's base name.
            dst_file += "/" + src_file.rsplit("/", 1)[-1]
    else:
        op = "put"
        host, port, dst_file = parse_remote(second)
        src_file = first
        if dst_file[-1] == "/":
            # Remote destination is a directory: append the local base name.
            dst_file += src_file.rsplit("/", 1)[-1]

    print(op, host, port)
    print(src_file, "->", dst_file)

    s = socket.socket()
    addr = socket.getaddrinfo(host, port)[0][4]
    s.connect(addr)
    websocket_helper.client_handshake(s)

    ws = websocket(s)

    import getpass
    passwd = getpass.getpass()
    login(ws, passwd)
    print("Remote WebREPL version:", get_ver(ws))

    # Set websocket to send data marked as "binary"
    ws.ioctl(9, 2)

    if op == "get":
        get_file(ws, dst_file, src_file)
    elif op == "put":
        put_file(ws, src_file, dst_file)

    s.close()
开发者ID:rguillon,项目名称:webrepl,代码行数:54,代码来源:webrepl_cli.py
示例19: scan
def scan():
    """Send ``QUERY`` to ``MCAST_IP:MCAST_PORT`` over UDP and return the decoded reply.

    Up to 1024 bytes of the first datagram received are decoded as UTF-8
    and parsed with ``ujson.loads``.  ``TIMEOUT`` is applied only when the
    socket object offers ``settimeout``.  The socket is always closed.
    """
    udp = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
    try:
        if hasattr(udp, 'settimeout'):
            udp.settimeout(TIMEOUT)
        targets = usocket.getaddrinfo(MCAST_IP, MCAST_PORT)
        destination = targets[0][4]
        udp.sendto(QUERY, destination)
        payload, _sender = udp.recvfrom(1024)
        text = payload.decode('utf-8')
        return ujson.loads(text)
    finally:
        udp.close()
开发者ID:home-assistant,项目名称:micropython-home-assistant,代码行数:12,代码来源:discovery.py
示例20: http_post
def http_post(url):
    """Send a body-less HTTP/1.0 POST to ``url`` and print the whole response.

    Args:
        url: ``proto://host/path`` string with at least three ``/``; the
            request always goes to port 80.

    The response is read in 100-byte chunks, each decoded as UTF-8 and
    printed without a trailing newline, until the peer closes the
    connection.  The socket is closed on every exit path (it was previously
    never closed).
    """
    _, _, host, path = url.split('/', 3)
    addr = usocket.getaddrinfo(host, 80)[0][-1]
    s = usocket.socket()
    try:
        s.connect(addr)
        s.send(bytes('POST /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host), 'utf8'))
        while True:
            data = s.recv(100)
            if data:
                print(str(data, 'utf8'), end='')
            else:
                break
    finally:
        s.close()
开发者ID:thekrunchyfrog,项目名称:IOTButton,代码行数:12,代码来源:main.py
注：本文中的usocket.getaddrinfo函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论