本文整理汇总了Python中ustruct.pack函数的典型用法代码示例。如果您正苦于以下问题:Python pack函数的具体用法?Python pack怎么用?Python pack使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了pack函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _send_connect
def _send_connect(self, keepalive):
    """Build an MQTT 3.1 CONNECT packet and pass it to ``self._packet_queue``.

    The packet carries the ``MQIsdp`` protocol name, protocol version 3,
    the connect flags (clean session always set), the keepalive interval
    and then the client id, optionally followed by username and password.

    Args:
        keepalive: keepalive interval packed as an unsigned 16-bit value;
            also stored on ``self._keepalive``.

    Returns:
        Whatever ``self._packet_queue`` returns.
    """
    print("_send_connect")

    def wire_len(field):
        # _pack_str16 sends str values as UTF-8, so the remaining length
        # must count encoded bytes rather than characters.
        if isinstance(field, str):
            return len(field.encode('utf-8'))
        return len(field)

    # Variable header: 2-byte name length + 6-byte "MQIsdp" + version byte
    # + flags byte + 2-byte keepalive. Payload starts with the 2-byte
    # length-prefixed client id.
    remaining_length = 2 + 6 + 1 + 1 + 2 + 2 + wire_len(self._client_id)

    connect_flags = 0x02  # clean session
    # NOTE(review): the listing this came from lost its indentation. The
    # password branch is nested under the username branch because MQTT
    # does not allow a password without a user name -- confirm against
    # the original file.
    if self._username:
        remaining_length += 2 + wire_len(self._username)
        connect_flags |= 0x80
        if self._password:
            connect_flags |= 0x40
            remaining_length += 2 + wire_len(self._password)

    command = CONNECT
    packet = bytearray()
    packet.extend(struct.pack("!B", command))
    self._pack_remaining_length(packet, remaining_length)
    # Protocol name length, protocol name, protocol version, flags, keepalive.
    packet.extend(struct.pack("!H6sBBH", 6, b"MQIsdp", 3, connect_flags, keepalive))

    self._pack_str16(packet, self._client_id)
    if self._username:
        self._pack_str16(packet, self._username)
        if self._password:
            self._pack_str16(packet, self._password)

    self._keepalive = keepalive
    return self._packet_queue(command, packet, 0, 0)
开发者ID:slzatz,项目名称:miscellaneous,代码行数:33,代码来源:umqtt3.py
示例2: _send_subscribe
def _send_subscribe(self, dup, topics):
    """Build an MQTT SUBSCRIBE packet for ``topics`` and queue it.

    Args:
        dup: duplicate flag, shifted into bit 3 of the fixed header byte.
        topics: sequence of ``(topic, qos)`` pairs; per the original notes
            the topic is already UTF-8 encoded, e.g. ``[(b'test', 0)]``.

    Returns:
        A tuple ``(result_of_packet_queue, message_id)``.
    """
    print("_send_subscribe")

    # 2 bytes of message id, then for every topic a 2-byte length prefix,
    # the topic itself and one QoS byte.
    remaining_length = 2 + sum(2 + len(entry[0]) + 1 for entry in topics)

    # The fixed header needs bit 1 set (0b10000010 when dup is 0).
    command = SUBSCRIBE | (dup << 3) | 0x02

    packet = bytearray()
    packet.extend(struct.pack("!B", command))
    self._pack_remaining_length(packet, remaining_length)  # extends packet in place

    local_mid = self._mid_generate()
    packet.extend(struct.pack("!H", local_mid))
    for entry in topics:
        self._pack_str16(packet, entry[0])
        packet.extend(struct.pack("B", entry[1]))

    queued = self._packet_queue(command, packet, local_mid, 1)
    return (queued, local_mid)
开发者ID:slzatz,项目名称:miscellaneous,代码行数:25,代码来源:umqtt3.py
示例3: _pack_str16
def _pack_str16(self, packet, data):
print("_pack_str16")
if isinstance(data, bytearray) or isinstance(data, bytes):
packet.extend(struct.pack("!H", len(data)))
packet.extend(data)
elif isinstance(data, str):
udata = data.encode('utf-8')
pack_format = "!H" + str(len(udata)) + "s"
packet.extend(struct.pack(pack_format, len(udata), udata))
else:
raise TypeError
开发者ID:slzatz,项目名称:miscellaneous,代码行数:11,代码来源:umqtt3.py
示例4: __init__
def __init__(self, conn, w, h, name):
    """Run the server side of an RFB 3.3 handshake on a new connection.

    Sends the protocol version, the security word and the ServerInit
    message (framebuffer size, pixel format, desktop name), then lets
    ``self.service_msg_queue()`` process whatever the client sends next.

    Args:
        conn: indexable connection record; element 0 is kept as ``self.conn``.
        w: framebuffer width, sent as an unsigned 16-bit value.
        h: framebuffer height, sent as an unsigned 16-bit value.
        name: desktop name as bytes (it is concatenated to the packed
            ServerInit header).

    Raises:
        Exception: if the client's version reply is not ``RFB 003.003`` or
            the first byte of its next message is neither 0 nor 1.
    """
    self.conn = conn[0]
    self.w = w
    self.h = h
    # Pixel format advertised to the client.
    self.bpp = 32
    self.depth = 24
    self.big = True
    self.true = True
    self.masks = (255, 255, 255)
    self.shifts = (16, 8, 0)

    # Handshake
    self.send(b"RFB 003.003\n")
    if self.recv(True) != b"RFB 003.003\n":
        raise Exception("RFB rejected version proposal")

    # Security
    self.send(b"\x00\x00\x00\x01")
    # Ignore the instruction to disconnect other clients: both 0 and 1
    # are accepted as the first byte of the reply.
    if self.recv(True)[0] not in (0, 1):
        print("2b")
        raise Exception("RFB rejected security none")

    # ServerInit: width, height, pixel format, 3 padding bytes, then the
    # length-prefixed name. Each colour channel sends its own maximum
    # (the original sent masks[0] three times, which only worked because
    # all three masks are currently equal).
    self.send(
        pack(
            ">2H4B3H3B",
            w,
            h,
            self.bpp,
            self.depth,
            self.big,
            self.true,
            self.masks[0],
            self.masks[1],
            self.masks[2],
            self.shifts[0],
            self.shifts[1],
            self.shifts[2],
        )
        + bytes(3)
        + pack(">L", len(name))
        + name
    )

    # We *may* be sent encodings (ignored) and a pixel format.
    self.service_msg_queue()
开发者ID:PinkInk,项目名称:upylib,代码行数:48,代码来源:session.py
示例5: SetWriteWindow
def SetWriteWindow(self, PageStart, PageStop, ColStart, ColStop):
    """Send the column/page address window to the display.

    Pages must satisfy ``0 <= PageStart <= PageStop <= 7`` and columns
    ``0 <= ColStart <= ColStop <= 127``.

    Returns:
        0 after the command was sent, -1 if a range is invalid (nothing
        is sent in that case).
    """
    if PageStart < 0 or PageStop > 7 or PageStart > PageStop:
        return -1
    if ColStart < 0 or ColStop > 127 or ColStart > ColStop:
        return -1
    # 0x21 is followed by the column range, 0x22 by the page range.
    window_cmd = pack("6B", 0x21, ColStart, ColStop, 0x22, PageStart, PageStop)
    self.SendCommand(window_cmd)
    return 0
开发者ID:Giako68,项目名称:NodeMCU-OLED,代码行数:7,代码来源:SSD1306.py
示例6: __start
def __start(self, addr):
    """Start an SPI transaction at ``addr``.

    Pulses the select line high then low and writes ``addr`` as a
    big-endian unsigned 16-bit value.
    """
    select = self.ssel
    # Start a new transaction: release, wait briefly, then assert select.
    select.value(1)
    sleep_us(10)  # a 1 ms pause was tried previously; 10 us is used now
    select.value(0)
    # Address goes out most significant byte first.
    address_bytes = ustruct.pack('>H', addr)
    self.spi.write(address_bytes)
开发者ID:mchobby,项目名称:pyboard-driver,代码行数:8,代码来源:gd.py
示例7: gmtime
def gmtime(t=None):
    """Convert seconds since the epoch to a broken-down UTC time tuple.

    Args:
        t: seconds since the epoch; truncated to ``int``. When ``None``
            the current ``time()`` is used.

    Returns:
        The tuple produced by ``_c_tm_to_tuple`` from the first 36 bytes
        of the structure that ``gmtime_`` points to.
    """
    if t is None:
        t = time()
    t = int(t)
    # NOTE(review): assumes gmtime_ is libc gmtime(const time_t *). It
    # dereferences a time_t, which is a C long on common Unix ABIs, so
    # pack a native long. Packing 'i' provided only 4 bytes and let
    # gmtime read past the buffer on 64-bit platforms.
    a = ustruct.pack('l', t)
    tm_p = gmtime_(a)
    return _c_tm_to_tuple(uctypes.bytearray_at(tm_p, 36))
开发者ID:bynds,项目名称:micropython-lib,代码行数:8,代码来源:time.py
示例8: _addSensorData
def _addSensorData(self, packet):
#BinaryPayloadContainer ???
if self.getContainerId() == 17:
packet.extend(self.getValue())
return
if self.getValueType == "boolean":
packet.append(0x01) if self.sensorValue["value"] == True else packet.append(0x00)
return
else:
packet.append(0x00)
if self.data['booleanMeterValue']:
packet.append(0x01)
else:
packet.append(0x00)
if not self.data['booleanMeterValue'] is None:
return
#ATT LIBRARY USING 'SHORT' where mentioning 'INT' !!!!!
if self.data['integerMeterValue']:
packet.append(0x01)
packet.extend(pack('>h',self.data['integerMeterValue']))
return
else:
packet.append(0x00)
if self.data['doubleMeterValue']:
packet.append(0x01)
packet.extend(pack('>f',self.data['doubleMeterValue']))
return
elif self.data['accelerometerMeterValue']:
packet.append(0x03)
packet.extend(pack('>fff',self.data['accelerometerMeterValue']['x'], self.data['accelerometerMeterValue']['y'], self.data['accelerometerMeterValue']['z']))
return
elif self.data['gpsMeterValue']:
packet.append(0x04)
packet.extend(pack('>ffff',self.data['gpsMeterValue']['latitude'], self.data['gpsMeterValue']['longitude'], self.data['gpsMeterValue']['altitude'], self.data['gpsMeterValue']['timestamp']))
return
else:
#Nothing left for now ... so shouldn't really be getting here.
packet.append(0x00)
开发者ID:Enabling,项目名称:WiPy-LoPy,代码行数:45,代码来源:m2m_sensor_new.py
示例9: master
def master():
    """Run the NRF24L01 master side of the ping test.

    Sends packets holding the current millisecond tick and an LED state,
    waits up to 250 ms for each reply and stops once either the success
    count or the failure count reaches 16.
    """
    csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
    ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
    if cfg['spi'] == -1:
        # Bus id -1: build the SPI object from the configured pins.
        bus = SPI(-1, sck=Pin(cfg['sck']), mosi=Pin(cfg['mosi']), miso=Pin(cfg['miso']))
    else:
        bus = SPI(cfg['spi'])
    nrf = NRF24L01(bus, csn, ce, payload_size=8)

    nrf.open_tx_pipe(pipes[0])
    nrf.open_rx_pipe(1, pipes[1])
    nrf.start_listening()

    num_needed = 16
    num_successes = 0
    num_failures = 0
    led_state = 0

    print('NRF24L01 master mode, sending %d packets...' % num_needed)

    while num_successes < num_needed and num_failures < num_needed:
        # Stop listening and send a packet.
        nrf.stop_listening()
        millis = utime.ticks_ms()
        # Walk a single set bit through the low four bits, restarting at 1.
        led_state = max(1, (led_state << 1) & 0x0f)
        print('sending:', millis, led_state)
        try:
            nrf.send(struct.pack('ii', millis, led_state))
        except OSError:
            # A failed send is tolerated; it shows up as a response timeout.
            pass

        # Start listening again.
        nrf.start_listening()

        # Wait for a response, with a 250 ms timeout.
        wait_started = utime.ticks_ms()
        timed_out = False
        while not (nrf.any() or timed_out):
            if utime.ticks_diff(utime.ticks_ms(), wait_started) > 250:
                timed_out = True

        if timed_out:
            print('failed, response timed out')
            num_failures += 1
        else:
            # Receive the packet; it carries the tick value that was sent.
            (got_millis,) = struct.unpack('i', nrf.recv())
            # Print the response and the round-trip delay.
            print('got response:', got_millis, '(delay', utime.ticks_diff(utime.ticks_ms(), got_millis), 'ms)')
            num_successes += 1

        # Delay, then loop.
        utime.sleep_ms(250)

    print('master finished sending; successes=%d, failures=%d' % (num_successes, num_failures))
开发者ID:DanielO,项目名称:micropython,代码行数:57,代码来源:nrf24l01test.py
示例10: colour_to_pixel
def colour_to_pixel(colour, bpp, depth, big, true, masks, shifts):
    """Pack a colour into the bytes of one pixel.

    Args:
        colour: channel values, combined pairwise with ``masks``/``shifts``.
        bpp: bits per pixel; 32 packs 4 bytes, 16 packs 2, anything else 1.
        depth: colour depth; big-endian pixels are shifted left by
            ``bpp - depth`` bits.
        big: truthy for big-endian byte order, falsy for little-endian.
        true: truthy for true-colour; otherwise ``None`` is returned.
        masks: per-channel bit masks.
        shifts: per-channel left shifts.

    Returns:
        The packed pixel as bytes, or ``None`` when ``true`` is falsy.
    """
    # NOTE(review): indentation was lost in the listing; the return is taken
    # to belong to the true-colour branch -- confirm against the original.
    if not true:
        return None

    value = sum(
        (channel & mask) << shift
        for channel, mask, shift in zip(colour, masks, shifts)
    )

    if bpp == 32:
        size_code = 'L'
    elif bpp == 16:
        size_code = 'H'
    else:
        size_code = 'B'

    if big:
        return pack('>' + size_code, value << (bpp - depth))
    return pack('<' + size_code, value)
开发者ID:PinkInk,项目名称:upylib,代码行数:10,代码来源:encodings.py
示例11: pack_blob
def pack_blob(b, encoding='utf-8'):
    """Pack bytes, a bytearray, a str or a tuple/list of ints into an OSC blob.

    The result is a big-endian 32-bit length, the data, and zero padding up
    to the next multiple of four bytes. ``encoding`` is only used when
    ``b`` is a str.
    """
    if isinstance(b, str):
        payload = bytes(b, encoding)
    elif isinstance(b, (tuple, list)):
        payload = bytearray(b)
    else:
        payload = b
    size = len(payload)
    # Number of zero bytes needed to reach a 4-byte boundary.
    padding = -size % 4
    return pack('>I', size) + payload + b'\0' * padding
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:10,代码来源:client.py
示例12: _check_keepalive
def _check_keepalive(self):
    """Queue a PINGREQ when the connection has been idle for the keepalive time.

    Nothing happens when there is no socket or the last message is more
    recent than ``self._keepalive`` seconds. Otherwise, if ``self._ping_t``
    is 0, a PINGREQ packet is queued and ``self._last_msg`` is reset to now.
    """
    print("_check_keepalive")
    now = time.time()
    print("_check_keepalive: self._last_msg = ", self._last_msg)
    idle_since = self._last_msg

    if self._sock is None or not (now - idle_since >= self._keepalive):
        return

    print("_check_keepalive: self._state =", self._state)
    # NOTE(review): indentation was lost in the listing; the timestamp
    # reset is taken to belong to the ping branch -- confirm.
    if self._ping_t == 0:
        ping = struct.pack('!BB', PINGREQ, 0)
        self._packet_queue(PINGREQ, ping, 0, 0)
        self._last_msg = now
开发者ID:slzatz,项目名称:miscellaneous,代码行数:12,代码来源:umqtt3.py
示例13: pack_bundle
def pack_bundle(bundle):
    """Return the bundle serialised as a binary string.

    Nested ``Bundle`` items are packed recursively, tuples are turned into
    messages with ``create_message(*item)`` and anything else is used as
    is. Every element is prefixed with its big-endian 32-bit length.
    """
    elements = []
    for item in bundle:
        if isinstance(item, Bundle):
            payload = pack_bundle(item)
        elif isinstance(item, tuple):
            payload = create_message(*item)
        else:
            payload = item
        elements.append(pack('>I', len(payload)) + payload)

    header = b'#bundle\0' + pack_timetag(bundle.timetag)
    return header + b''.join(elements)
开发者ID:SpotlightKid,项目名称:micropython-osc,代码行数:12,代码来源:client.py
示例14: ServerFrameBufferUpdate
def ServerFrameBufferUpdate(rectangles):
    """Serialise pending rectangles into one RFB FramebufferUpdate message.

    Each rectangle's ``to_bytes()`` is called exactly once. The result
    decides what happens to it:

    * ``None``  -- the rectangle is finished and is removed from
      ``rectangles`` (the list is modified in place);
    * ``False`` -- no update required this time, nothing is sent;
    * anything else -- the encoded bytes are appended to the message.

    Args:
        rectangles: list of objects with a ``to_bytes()`` method.

    Returns:
        ``None`` when ``rectangles`` is empty; otherwise the message bytes:
        two zero bytes, the 16-bit count of rectangles actually included,
        and their encodings.
    """
    if not rectangles:  # an empty list means there is nothing to send
        return None

    encoded_rects = []
    still_active = []
    # Collect survivors in a separate list: deleting from ``rectangles``
    # while iterating over it skipped the element after each deletion.
    for rect in rectangles:
        encoded = rect.to_bytes()
        if encoded is None:  # done with this rectangle
            continue
        still_active.append(rect)
        if encoded is not False:  # False means no update required
            encoded_rects.append(encoded)
    rectangles[:] = still_active

    # The header must count only the rectangles present in the payload,
    # otherwise the client waits for data that never arrives.
    return b"\x00\x00" + pack(">H", len(encoded_rects)) + b"".join(encoded_rects)
开发者ID:PinkInk,项目名称:upylib,代码行数:12,代码来源:session.py
示例15: to_bytes
def to_bytes(self):
    """Serialise this rectangle together with its sub-rectangles.

    Layout: the parent class's bytes, the big-endian 32-bit number of
    sub-rectangles, the background colour as one pixel, then every
    sub-rectangle's bytes in order.
    """
    # Sub-rectangles are serialised first, as before.
    sub_bytes = b''.join([rect.to_bytes() for rect in self.subrectangles])
    header = super().to_bytes()
    count = pack('>L', len(self.subrectangles))
    background = colour_to_pixel(
        self.bgcolour,
        self.bpp, self.depth,
        self.big, self.true,
        self.masks, self.shifts
    )
    return header + count + background + sub_bytes
开发者ID:PinkInk,项目名称:upylib,代码行数:12,代码来源:encodings.py
示例16: register
def register(self, fd, eventmask=EPOLLIN|EPOLLPRI|EPOLLOUT, retval=None):
    """Register ``fd`` with the epoll instance, or modify it if already present.

    ``retval`` is an extension to the stdlib: the value to use in results
    from ``.poll()``. It defaults to ``fd`` itself.
    """
    retval = fd if retval is None else retval
    event = struct.pack(epoll_event, eventmask, retval)
    status = epoll_ctl(self.epfd, EPOLL_CTL_ADD, fd, event)
    already_registered = status == -1 and os.errno_.get() == errno.EEXIST
    if already_registered:
        # The descriptor is known already: update its event mask instead.
        status = epoll_ctl(self.epfd, EPOLL_CTL_MOD, fd, event)
    os.check_error(status)
    # A reference to retval must be kept, or it may be garbage collected.
    # The fd -> retval mapping is also what allows that reference to be
    # dropped again later.
    self.registry[fd] = retval
开发者ID:JamesHyunKim,项目名称:micropython-lib,代码行数:13,代码来源:select.py
示例17: _server_alive
def _server_alive(self):
    """Do once-per-second housekeeping and report whether the link is alive.

    When the whole-second clock has advanced, the transmit counter is
    reset, the watchdog (if any) is fed, and a ping is sent once
    ``HB_PERIOD`` seconds have passed since the last heartbeat while
    authenticated.

    Returns:
        ``False`` when a heartbeat is outstanding and ``MAX_SOCK_TO``
        seconds have passed since it was sent; ``True`` otherwise.
    """
    now = int(time.time())
    # NOTE(review): indentation was lost in the listing; everything below is
    # taken to run only when the second has changed -- confirm.
    if self._m_time == now:
        return True

    self._m_time = now
    self._tx_count = 0
    if self._wdt:
        self._wdt.feed()

    since_heartbeat = now - self._hb_time
    if self._last_hb_id != 0 and since_heartbeat >= MAX_SOCK_TO:
        return False
    if since_heartbeat >= HB_PERIOD and self.state == AUTHENTICATED:
        self._hb_time = now
        self._last_hb_id = self._new_msg_id()
        ping = struct.pack(HDR_FMT, MSG_PING, self._last_hb_id, 0)
        self._send(ping, True)
    return True
开发者ID:jantje,项目名称:badge,代码行数:14,代码来源:blynk.py
示例18: _pack_remaining_length
def _pack_remaining_length(self, packet, remaining_length):
print("_pack_remaining_length")
#from _send_subscribe and _send_connect
#remaining_bytes = [] ? not used for anything
while True:
byte = remaining_length % 128
remaining_length = remaining_length // 128
# If there are more digits to encode, set the top bit of this digit
if remaining_length > 0:
byte = byte | 0x80
#remaining_bytes.append(byte)
packet.extend(struct.pack("!B", byte))
if remaining_length == 0:
# FIXME - this doesn't deal with incorrectly large payloads
return packet
开发者ID:slzatz,项目名称:miscellaneous,代码行数:16,代码来源:umqtt3.py
示例19: slave
def slave():
    """Run the NRF24L01 slave side of the ping test (loops forever).

    Every received packet holds a millisecond tick and an LED state. The
    LEDs are set from the low bits of the state and the tick of the last
    packet received is sent back as the response.
    """
    csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
    ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
    if cfg['spi'] == -1:
        # Bus id -1: build the SPI object from the configured pins.
        bus = SPI(-1, sck=Pin(cfg['sck']), mosi=Pin(cfg['mosi']), miso=Pin(cfg['miso']))
    else:
        bus = SPI(cfg['spi'])
    nrf = NRF24L01(bus, csn, ce, payload_size=8)

    nrf.open_tx_pipe(pipes[1])
    nrf.open_rx_pipe(1, pipes[0])
    nrf.start_listening()

    print('NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)')

    while True:
        if not nrf.any():
            continue

        # Drain everything that is waiting; the last packet wins.
        while nrf.any():
            payload = nrf.recv()
            millis, led_state = struct.unpack('ii', payload)
            print('received:', millis, led_state)
            for led in leds:
                # Bit 0 drives this LED; the remaining bits shift down.
                if led_state & 1:
                    led.on()
                else:
                    led.off()
                led_state >>= 1
            utime.sleep_ms(_RX_POLL_DELAY)

        # Give the master time to get into receive mode.
        utime.sleep_ms(_SLAVE_SEND_DELAY)
        nrf.stop_listening()
        try:
            nrf.send(struct.pack('i', millis))
        except OSError:
            # A failed send is tolerated; the master will time out.
            pass
        print('sent response')
        nrf.start_listening()
开发者ID:DanielO,项目名称:micropython,代码行数:38,代码来源:nrf24l01test.py
示例20: _send_str
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
开发者ID:stinos,项目名称:micropython-lib,代码行数:3,代码来源:mqtt.py
注:本文中的ustruct.pack函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论