本文整理汇总了Python中ustruct.unpack函数的典型用法代码示例。如果您正苦于以下问题：Python unpack函数的具体用法？Python unpack怎么用？Python unpack使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了unpack函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: parse_bundle
def parse_bundle(bundle, strict=False):
    """Parse a binary OSC bundle.

    Returns a generator which walks over all contained messages and bundles
    recursively, depth-first. Each item yielded is a (timetag, message) tuple.

    :param bundle: bytes object holding the bundle, beginning with the
        ``#bundle`` marker followed by a NUL byte.
    :param strict: passed on to ``parse_message`` for every contained
        message, including those inside nested bundles.
    :raises TypeError: if ``bundle`` does not begin with the bundle marker.
    """
    if not bundle.startswith(b'#bundle\0'):
        raise TypeError("Bundle must start with '#bundle\\0'.")

    # Bytes 8..15 are two big-endian 32-bit words which to_time() converts
    # into the timetag shared by all messages directly inside this bundle.
    ofs = 16
    timetag = to_time(*unpack('>II', bundle[8:ofs]))

    while ofs < len(bundle):
        # Every element is prefixed by its big-endian 32-bit byte length.
        size = unpack('>I', bundle[ofs:ofs + 4])[0]
        element = bundle[ofs + 4:ofs + 4 + size]
        ofs += size + 4

        # The element is a bytes slice, so the prefix must be bytes as well;
        # a str prefix raises TypeError instead of detecting nested bundles.
        if element.startswith(b'#bundle'):
            # Propagate ``strict`` so nested messages are validated the same
            # way as top-level ones.
            for el in parse_bundle(element, strict):
                yield el
        else:
            yield timetag, parse_message(element, strict)
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:26,代码来源:server.py
示例2: _handle_suback
def _handle_suback(self):
    """Decode the received SUBACK packet and notify the subscribe callback.

    The packet body is split into a 2-byte network-order message id and the
    remaining bytes, each of which is unpacked as one unsigned byte (the
    granted QoS values). ``self.on_subscribe`` is called when it is truthy.

    :return: ``MQTT_ERR_SUCCESS``
    """
    print("_handle_suback") #needed after _packet_handle
    body_len = len(self._in_packet['packet']) - 2
    mid, qos_bytes = struct.unpack("!H" + str(body_len) + 's',
                                   self._in_packet['packet'])
    # One 'B' per remaining byte yields a tuple of ints.
    granted_qos = struct.unpack("!" + "B" * len(qos_bytes), qos_bytes)
    if self.on_subscribe:
        self.on_subscribe(self, self._userdata, mid, granted_qos)
    return MQTT_ERR_SUCCESS
开发者ID:slzatz,项目名称:miscellaneous,代码行数:11,代码来源:umqtt3.py
示例3: parse_message
def parse_message(msg, strict=False):
    """Parse a binary OSC message into ``(address, typetags, arguments)``.

    :param msg: the raw message bytes.
    :param strict: when true, a missing or invalid type tag string raises
        ``ValueError``; otherwise a warning is logged and the message is
        treated as having no arguments.
    :return: tuple ``(addr, tags, args)`` where ``tags`` is the type tag
        string without its leading comma and ``args`` is a tuple of the
        decoded argument values.
    :raises ValueError: if the address does not start with a slash, if the
        type tags are missing in strict mode, or on an unsupported type tag.
    """
    addr, ofs = split_oscstr(msg, 0)
    if not addr.startswith('/'):
        raise ValueError("OSC address pattern must start with a slash.")

    # type tag string must start with comma (ASCII 44)
    if ofs < len(msg) and msg[ofs] == 44:
        tags, ofs = split_oscstr(msg, ofs)
        tags = tags[1:]
    else:
        complaint = "Missing/invalid OSC type tag string."
        if strict:
            raise ValueError(complaint)
        log.warning(complaint + ' Ignoring arguments.')
        tags = ''

    args = []
    for typetag in tags:
        # ``size`` is the number of bytes to skip after this argument; the
        # string and blob helpers return the new offset themselves, so it
        # stays 0 for those (and for the payload-less T/F/N/I tags).
        size = 0
        if typetag in 'ifd':
            size = 4 if typetag != 'd' else 8
            value = unpack('>' + typetag, msg[ofs:ofs + size])[0]
        elif typetag in 'sS':
            value, ofs = split_oscstr(msg, ofs)
        elif typetag == 'b':
            value, ofs = split_oscblob(msg, ofs)
        elif typetag in 'rm':
            size = 4
            # Kept as a 4-tuple of unsigned bytes.
            value = unpack('BBBB', msg[ofs:ofs + size])
        elif typetag == 'c':
            size = 4
            value = chr(unpack('>I', msg[ofs:ofs + size])[0])
        elif typetag == 'h':
            size = 8
            value = unpack('>q', msg[ofs:ofs + size])[0]
        elif typetag == 't':
            size = 8
            value = parse_timetag(msg, ofs)
        elif typetag in 'TFNI':
            # 'N' is not in the mapping and therefore decodes to None.
            value = {'T': True, 'F': False, 'I': Impulse}.get(typetag)
        else:
            raise ValueError("Type tag '%s' not supported." % typetag)

        args.append(value)
        ofs += size

    return (addr, tags, tuple(args))
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:51,代码来源:server.py
示例4: main
def main():
    """Receive Art-Net packets forever and show their data on the LED chain.

    Each loop iteration first refreshes the chain with the most recent
    colours (``initrgbdata``), then tries to receive one packet from the
    module-level ``sock``. Packets whose first seven bytes differ from
    ``ARTNET_header``, packets too short to carry the fixed 18-byte header,
    and packets with an opcode other than 0x5000 are ignored. The payload
    of an accepted packet is grouped into (r, g, b) tuples, at most
    ``CHAIN_LEN`` of them, and stored in the global ``initrgbdata``.

    This function never returns.
    """
    global initrgbdata

    while True:
        chain.show(initrgbdata)

        try:
            data = sock.recv(530)
        except OSError:
            # No packet available (timeout / would block): just refresh the
            # chain again. Only socket errors are swallowed so that
            # KeyboardInterrupt and programming errors still surface.
            continue

        if data[0:7] != ARTNET_header:
            continue
        if len(data) < 18:
            # Too short to contain the opcode and length fields read below.
            continue

        # Byte order is given explicitly (low byte first) instead of relying
        # on the native order of the board.
        opcode = ustruct.unpack('<H', data[8:10])[0]
        if opcode != 0x5000:
            continue

        declared_length = ustruct.unpack('>H', data[16:18])[0]
        # Never trust the declared length beyond what was actually received;
        # otherwise a truncated packet raises IndexError below.
        data_length = min(declared_length, len(data) - 18)

        rgbdata = []
        for ndx in range(0, min(data_length, CHAIN_LEN * 3), 3):
            # A trailing partial triple keeps 0 for its missing channels.
            rgb = [0, 0, 0]
            for i in range(min(3, data_length - ndx)):
                rgb[i] = int(data[18 + ndx + i])
            rgbdata.append(tuple(rgb))

        initrgbdata = rgbdata[0:CHAIN_LEN]
开发者ID:bennigraf,项目名称:wipy-artnet-to-ws2812,代码行数:48,代码来源:main.py
示例5: time
def time(ms_accuracy=False):
    """Query the NTP server ``host`` and return the time it reports.

    A 48-byte request whose first byte is 0x1b is sent over UDP to port 123
    with a one second socket timeout. The seconds value is the big-endian
    32-bit word at bytes 40..43 of the reply minus ``NTP_DELTA``; the
    fraction is the following 32-bit word scaled by ``MILLIS_PER_SECOND``
    and shifted right by 32 bits.

    :param ms_accuracy: when true, return ``(seconds, fraction)`` instead of
        only the seconds.
    :return: ``seconds`` or ``(seconds, fraction)``.
    :raises OSError: propagated from the socket calls (for example on
        timeout); the socket is closed in every case.
    """
    ntp_query = bytearray(48)
    ntp_query[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(ntp_query, addr)
        msg = s.recv(48)
    finally:
        # Release the socket even when sendto/recv raises.
        s.close()

    sec = struct.unpack("!I", msg[40:44])[0] - NTP_DELTA
    frac = struct.unpack("!I", msg[44:48])[0] * MILLIS_PER_SECOND >> 32
    if ms_accuracy:
        return sec, frac
    return sec
开发者ID:puhitaku,项目名称:AutoChime,代码行数:16,代码来源:ntptime_kai.py
示例6: rd16
def rd16( self, addr ):
    """ read a word (16 bits) @ addr

    The transaction is opened for ``addr``, two bytes are read from the SPI
    bus, the transaction is closed, and the two bytes are decoded as one
    big-endian unsigned 16-bit value.
    """
    # Status: Certified!
    self.__start( addr )
    raw = self.spi.read( 2 ) # read 2 bytes
    self.__end()
    ( word, ) = ustruct.unpack( '>H', raw ) # was initialy '<H' ?
    return word
开发者ID:mchobby,项目名称:pyboard-driver,代码行数:7,代码来源:gd.py
示例7: _handle_connack
def _handle_connack(self):
    """Process the received CONNACK packet.

    The packet must be exactly two bytes (flags, result). A result of 0
    switches the client state to ``mqtt_cs_connected``. ``self.on_connect``
    is called, when set, with a flags dict holding the 'session present' bit.

    :return: 0 when the result is 0, ``MQTT_ERR_CONN_REFUSED`` for results
        1..5, otherwise ``MQTT_ERR_PROTOCOL`` (also for a malformed packet).
    """
    print("_handle_connack") #needed
    packet = self._in_packet['packet']
    if len(packet) != 2:
        return MQTT_ERR_PROTOCOL

    flags, result = struct.unpack("!BB", packet)
    accepted = result == 0

    # Does not support downgrade to MQTT v3.1
    if accepted:
        self._state = mqtt_cs_connected
        # NOTE(review): the source text lost its indentation; this print is
        # assumed to belong to the success branch - confirm.
        print("_handle_connack: self._state =", self._state)

    if self.on_connect:
        flags_dict = dict()
        flags_dict['session present'] = flags & 0x01
        self.on_connect(self, self._userdata, flags_dict, result)

    if accepted:
        return 0
    if 0 < result < 6:
        return MQTT_ERR_CONN_REFUSED
    return MQTT_ERR_PROTOCOL
开发者ID:slzatz,项目名称:miscellaneous,代码行数:25,代码来源:umqtt3.py
示例8: _register_short
def _register_short(self, register, value=None, buf=bytearray(2)):
    """Read or write a signed little-endian 16-bit register.

    :param register: register address on the device at ``self.address``.
    :param value: when ``None`` the register is read; otherwise this value
        is written.
    :param buf: 2-byte scratch buffer used for the transfer. The default is
        a single bytearray shared by all calls that do not pass their own.
    :return: the register value on read, or whatever ``writeto_mem``
        returns on write.
    """
    if value is not None:
        ustruct.pack_into("<h", buf, 0, value)
        return self.i2c.writeto_mem(self.address, register, buf)

    self.i2c.readfrom_mem_into(self.address, register, buf)
    (word,) = ustruct.unpack("<h", buf)
    return word
开发者ID:radiumray,项目名称:mdxly,代码行数:7,代码来源:mpu9250.py
示例9: altitude
def altitude(self):
    """Read the altitude as calculated based on the sensor pressure and
    previously configured pressure at sea-level. This will return a
    value in meters. Set the sea-level pressure by updating the
    sealevel_pressure property first to get a more accurate altitude value.
    """
    # First poll for a measurement to be finished.
    self._poll_reg1(_MPL3115A2_CTRL_REG1_OST)
    # Set control bits for altitude reading.
    self._ctrl_reg1 |= 0b10000000  # Turn on bit 7, ALT.
    self._write_u8(_MPL3115A2_CTRL_REG1, self._ctrl_reg1)
    self._ctrl_reg1 |= 0b00000010  # Set OST to 1 to start measurement.
    self._write_u8(_MPL3115A2_CTRL_REG1, self._ctrl_reg1)
    # Poll status for PDR to be set.
    while self._read_u8(_MPL3115A2_REGISTER_STATUS) & _MPL3115A2_REGISTER_STATUS_PDR == 0:
        time.sleep(0.01)
    # Read 3 bytes of altitude data into buffer.
    # Yes even though this is the address of the pressure register it
    # returns altitude when the ALT bit is set above.
    self._read_into(_MPL3115A2_REGISTER_PRESSURE_MSB, self._BUFFER, count=3)
    # Reconstruct signed 32-bit altitude value (actually 24 bits shifted up
    # and then scaled down).
    self._BUFFER[3] = 0  # Top 3 bytes of buffer were read from the chip.
    altitude = struct.unpack('>i', self._BUFFER[0:4])[0]
    # Scale down to meters. The whole-meter part sits in the upper 16 bits
    # of the 32-bit value, so the divisor is 2**16 = 65536 (not 65535).
    return altitude / 65536.0
开发者ID:eiselekd,项目名称:hw,代码行数:26,代码来源:adafruit_mpl3115a2.py
示例10: minmax
def minmax():
    """Scan 64 sensor cells and derive the colour scaling from their range.

    Reads 64 consecutive little-endian signed 16-bit values over I2C,
    starting at ``register`` and stepping by two bytes, converts each with
    ``getval`` and tracks the minimum and maximum. The globals ``rate`` and
    ``offset`` are then updated and the four figures are printed on the LCD.
    """
    global offset
    global rate

    min_temp = 1000
    max_temp = -1000
    base = register
    for cell in range(64):
        raw = i2c.readfrom_mem(i2c_address, base + 2 * cell, 2)
        tmp = getval(ustruct.unpack('<h', raw)[0])
        if tmp < min_temp:
            min_temp = tmp
        if max_temp < tmp:
            max_temp = tmp

    # add some margin
    span = (max_temp - min_temp) * 1.4
    rate = len(colors) / span
    offset = min_temp * 0.8

    lcd.clear()
    readout = (
        ('min: ', min_temp),
        ('max: ', max_temp),
        ('rate: ', rate),
        ('offset: ', offset),
    )
    # One line per value, 10 pixels apart.
    for row, (label, value) in enumerate(readout):
        lcd.print(label + '{:.2f}'.format(value), 0, row * 10)
开发者ID:SWITCHSCIENCE,项目名称:samplecodes,代码行数:26,代码来源:thermography.py
示例11: getpwnam
def getpwnam(user):
    """Return the password database entry for ``user``.

    Calls the low-level ``getpwnam_`` helper, copies the returned record
    into a bytes object and unpacks it with the layout ``"SSIISSS"``.

    :return: a ``struct_passwd`` built from the unpacked fields.
    :raises KeyError: if ``getpwnam_`` returns a falsy value.
    """
    entry_ptr = getpwnam_(user)
    if not entry_ptr:
        raise KeyError("getpwnam(): name not found: {}".format(user))

    layout = "SSIISSS"
    record = uctypes.bytes_at(entry_ptr, ustruct.calcsize(layout))
    fields = ustruct.unpack(layout, record)
    return struct_passwd(*fields)
开发者ID:SpotlightKid,项目名称:micropython-lib,代码行数:8,代码来源:pwd.py
示例12: read
def read(self):
    """Return the current mode measurement result in lx.

    The sensor needs to be on and its mode set first.
    """
    (word,) = unpack('>H', self.bus.readfrom(self.addr, 2))
    # NOTE(review): the two bytes are swapped again after the big-endian
    # unpack, so the first byte received ends up as the low byte - confirm
    # this matches the sensor's byte order.
    count = ((word & 0xff) << 8) | (word >> 8)
    if (self.mode & 0x03) == 0x01:
        mode2coeff = 2
    else:
        mode2coeff = 1
    ratio = 1/(1.2 * (self._sensitivity/69.0) * mode2coeff)
    return ratio*count
开发者ID:PinkInk,项目名称:upylib,代码行数:8,代码来源:__init__.py
示例13: draw_char
def draw_char(self, ch, x, y, *args, **kwargs):
    """Draw one character from the font file with its top-left at (x, y).

    Nothing is drawn when the position lies outside the display area. The
    glyph is stored column by column, one byte per column, after a 2-byte
    file offset; bit N of a column byte is the pixel in row N. Extra
    arguments are forwarded unchanged to ``self._pixel``.
    """
    off_screen = (
        x < -self._font_width or x >= self._width
        or y < -self._font_height or y >= self._height
    )
    if off_screen:
        return

    for col in range(self._font_width):
        self._font.seek(2 + (ord(ch) * self._font_width) + col)
        (column_bits,) = ustruct.unpack('B', self._font.read(1))
        for row in range(self._font_height):
            if (column_bits >> row) & 0x1:
                self._pixel(x + col, y + row, *args, **kwargs)
开发者ID:radiumray,项目名称:mdxly,代码行数:9,代码来源:matrix.py
示例14: master
def master():
    """Run the NRF24L01 master side of the ping test.

    Sends packets containing the current tick count and an LED state, then
    waits up to 250 ms for each response and prints the round-trip delay.
    Stops once either 16 responses were received or 16 timeouts occurred.
    """
    csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
    ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
    if cfg['spi'] == -1:
        # -1 selects an SPI bus on the explicitly configured pins.
        spi = SPI(-1, sck=Pin(cfg['sck']), mosi=Pin(cfg['mosi']), miso=Pin(cfg['miso']))
    else:
        spi = SPI(cfg['spi'])
    nrf = NRF24L01(spi, csn, ce, payload_size=8)

    nrf.open_tx_pipe(pipes[0])
    nrf.open_rx_pipe(1, pipes[1])
    nrf.start_listening()

    num_needed = 16
    num_successes = 0
    num_failures = 0
    led_state = 0

    print('NRF24L01 master mode, sending %d packets...' % num_needed)

    while num_successes < num_needed and num_failures < num_needed:
        # stop listening and send packet
        nrf.stop_listening()
        millis = utime.ticks_ms()
        # Walk a single bit through the low nibble, restarting at 1.
        led_state = max(1, (led_state << 1) & 0x0f)
        print('sending:', millis, led_state)
        try:
            nrf.send(struct.pack('ii', millis, led_state))
        except OSError:
            # A failed send is not fatal; it shows up as a timeout below.
            pass

        # start listening again
        nrf.start_listening()

        # wait for response, with 250ms timeout
        wait_began = utime.ticks_ms()
        timed_out = False
        while not nrf.any() and not timed_out:
            if utime.ticks_diff(utime.ticks_ms(), wait_began) > 250:
                timed_out = True

        if not timed_out:
            # recv packet
            got_millis, = struct.unpack('i', nrf.recv())

            # print response and round-trip delay
            print('got response:', got_millis, '(delay', utime.ticks_diff(utime.ticks_ms(), got_millis), 'ms)')
            num_successes += 1
        else:
            print('failed, response timed out')
            num_failures += 1

        # delay then loop
        utime.sleep_ms(250)

    print('master finished sending; successes=%d, failures=%d' % (num_successes, num_failures))
开发者ID:DanielO,项目名称:micropython,代码行数:57,代码来源:nrf24l01test.py
示例15: _handle_publish
def _handle_publish(self):
    """Decode the received PUBLISH packet and hand it to ``on_message``.

    The packet starts with a 2-byte network-order topic length, followed by
    the topic bytes and then the payload. The callback receives a dict with
    the keys 'topic' (decoded as UTF-8), 'payload' (raw bytes) and
    'timestamp' (``time.time()`` at decode time).

    :return: always 0.
    """
    print("_handle_publish") #needed after packet_handle
    header = self._in_packet['command']  # read but not used below

    # '!' selects network byte order and 'H' an unsigned 2-byte integer.
    # For 's' the count is the byte length of a single string, not a repeat
    # count: '10s' is one 10-byte string.
    raw = self._in_packet['packet']
    slen, rest = struct.unpack("!H" + str(len(raw) - 2) + 's', raw)

    # Split the remainder into the topic (slen bytes) and the payload.
    split_format = '!' + str(slen) + 's' + str(len(rest) - slen) + 's'
    topic, payload = struct.unpack(split_format, rest)

    msg = {
        'topic': topic.decode('utf-8'),
        'payload': payload,
        'timestamp': time.time(),
    }
    self.on_message(self, self._userdata, msg)
    return 0
开发者ID:slzatz,项目名称:miscellaneous,代码行数:19,代码来源:umqtt3.py
示例16: __init__
def __init__(self,
             mode=BME280_OSAMPLE_1,
             address=BME280_I2CADDR,
             i2c=None,
             **kwargs):
    """Set up the sensor and load its calibration data.

    :param mode: one of the ``BME280_OSAMPLE_*`` constants.
    :param address: I2C address of the sensor.
    :param i2c: I2C bus object; required.
    :param kwargs: accepted and ignored.
    :raises ValueError: if ``mode`` is not a ``BME280_OSAMPLE_*`` constant
        or if no I2C object is given.
    """
    # Check that mode is valid.
    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
        # The message names the constants this check actually accepts.
        raise ValueError(
            'Unexpected mode value {0}. Set mode to one of '
            'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
            'BME280_OSAMPLE_8 or BME280_OSAMPLE_16'.format(mode))
    self._mode = mode
    self.address = address
    if i2c is None:
        raise ValueError('An I2C object is required.')
    self.i2c = i2c

    # load calibration data: 26 bytes from 0x88 and 7 bytes from 0xE1
    dig_88_a1 = self.i2c.readfrom_mem(self.address, 0x88, 26)
    dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0xE1, 7)
    self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P1, \
        self.dig_P2, self.dig_P3, self.dig_P4, self.dig_P5, \
        self.dig_P6, self.dig_P7, self.dig_P8, self.dig_P9, \
        _, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", dig_88_a1)

    # unpack() needs a buffer of exactly the format size; unpack_from reads
    # the two leading fields from the 7-byte block without raising.
    self.dig_H2, self.dig_H3 = unpack_from("<hB", dig_e1_e7, 0)

    # H4 and H5 share byte 4: H4 takes its low nibble as its low 4 bits,
    # H5 takes its high nibble. The signed bytes at offsets 3 and 5 supply
    # the upper bits (and the sign) of H4 and H5 respectively.
    e4_sign = unpack_from("<b", dig_e1_e7, 3)[0]
    self.dig_H4 = (e4_sign << 4) | (dig_e1_e7[4] & 0xF)

    e6_sign = unpack_from("<b", dig_e1_e7, 5)[0]
    self.dig_H5 = (e6_sign << 4) | (dig_e1_e7[4] >> 4)

    self.dig_H6 = unpack_from("<b", dig_e1_e7, 6)[0]

    self.i2c.writeto_mem(self.address, BME280_REGISTER_CONTROL,
                         bytearray([0x3F]))
    self.t_fine = 0

    # temporary data holders which stay allocated
    self._l1_barray = bytearray(1)
    self._l8_barray = bytearray(8)
    self._l3_resultarray = array("i", [0, 0, 0])
开发者ID:eztmondom,项目名称:test1,代码行数:43,代码来源:bme280.py
示例17: __init__
def __init__(self,
             mode=BME280_OSAMPLE_8,
             address=BME280_I2CADDR,
             i2c=None,
             **kwargs):
    """Set up the sensor and load its calibration data.

    :param mode: one of the ``BME280_OSAMPLE_*`` constants.
    :param address: I2C address of the sensor.
    :param i2c: I2C bus object; required.
    :param kwargs: accepted and ignored.
    :raises ValueError: if ``mode`` is not a ``BME280_OSAMPLE_*`` constant
        or if no I2C object is given.
    """
    # Check that mode is valid.
    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
        # The message names the constants this check actually accepts.
        raise ValueError(
            'Unexpected mode value {0}. Set mode to one of '
            'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
            'BME280_OSAMPLE_8 or BME280_OSAMPLE_16'.format(mode))
    self._mode = mode
    self.address = address
    if i2c is None:
        raise ValueError('An I2C object is required.')
    self.i2c = i2c
    self.__sealevel = 101325

    # load calibration data: 26 bytes from 0x88 and 7 bytes from 0xE1
    dig_88_a1 = self.i2c.readfrom_mem(self.address, 0x88, 26)
    dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0xE1, 7)
    self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P1, \
        self.dig_P2, self.dig_P3, self.dig_P4, self.dig_P5, \
        self.dig_P6, self.dig_P7, self.dig_P8, self.dig_P9, \
        _, self.dig_H1 = unpack("<HhhHhhhhhhhhBB", dig_88_a1)

    self.dig_H2, self.dig_H3, self.dig_H4,\
        self.dig_H5, self.dig_H6 = unpack("<hBbhb", dig_e1_e7)
    # unfold H4, H5, keeping care of a potential sign: the low nibble of
    # the raw 16-bit H5 word belongs to H4, the remaining bits form H5.
    self.dig_H4 = (self.dig_H4 * 16) + (self.dig_H5 & 0xF)
    self.dig_H5 //= 16

    self.i2c.writeto_mem(self.address, BME280_REGISTER_CONTROL,
                         bytearray([0x3F]))
    self.t_fine = 0

    # temporary data holders which stay allocated
    self._l1_barray = bytearray(1)
    self._l8_barray = bytearray(8)
    self._l3_resultarray = array("i", [0, 0, 0])
开发者ID:Hiverize,项目名称:Sensorbeuten,代码行数:42,代码来源:bme280.py
示例18: time
def time():
    """Query the NTP server ``host`` and return the seconds it reports.

    A 48-byte request whose first byte is 0x1b is sent over UDP to port 123
    with a one second socket timeout. The result is the big-endian 32-bit
    word at bytes 40..43 of the reply minus ``NTP_DELTA``.

    :raises OSError: propagated from the socket calls (for example on
        timeout); the socket is closed in every case.
    """
    ntp_query = bytearray(48)
    ntp_query[0] = 0x1b
    addr = socket.getaddrinfo(host, 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(ntp_query, addr)
        msg = s.recv(48)
    finally:
        # Release the socket even when sendto/recv raises.
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:19emtuck,项目名称:micropython,代码行数:11,代码来源:ntptime.py
示例19: ntp
def ntp():  # noqa
    """Query 'pool.ntp.org' and return the seconds it reports.

    A 48-byte request whose first byte is 0x1b is sent over UDP to port 123
    with a one second socket timeout. The result is the big-endian 32-bit
    word at bytes 40..43 of the reply minus ``NTP_DELTA``.

    :raises OSError: propagated from the socket calls (for example on
        timeout); the socket is closed in every case.
    """
    ntp_query = bytearray(48)
    ntp_query[0] = 0x1b
    addr = socket.getaddrinfo('pool.ntp.org', 123)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(1)
        s.sendto(ntp_query, addr)
        msg = s.recv(48)
    finally:
        # Release the socket even when sendto/recv raises.
        s.close()
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
开发者ID:chassing,项目名称:pyq2,代码行数:11,代码来源:untplib.py
示例20: decode_payload
def decode_payload(payload):
    """
    Decode message payload

    The first 32 bytes are read as eight consecutive big-endian 32-bit floats.

    :param payload: byte stream representing the message payload
    :return: an array of 8 float [stab_Kp,stab_Kd,stab_Ki,Max Increment,gyro_Kp,gyro_Kd,gyro_Ki,gyro_Max Increment]
    """
    return [
        ustruct.unpack('>f', payload[start:start + 4])[0]
        for start in range(0, 32, 4)
    ]
开发者ID:Sokrates80,项目名称:air-py,代码行数:12,代码来源:ap_save_pid_settings.py
注：本文中的ustruct.unpack函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论