最近因需要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:
协议为:
需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。
消息(MQTT Payload) 格式:Web服务器-------->BASE:
反馈:BASE---------> Web服务器:
如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。
程序流程图
根据上面的协议,可以得到如下的流程图:
代码如下:
# encoding:utf-8 from flask import Flask, jsonify from flask_restful import Api, Resource, reqparse from PIL import Image from io import BytesIO import requests import os, logging, time import paho.mqtt.client as mqtt import struct from flask_cors import * # 日志配置信息 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s', ) class Mqtt(object): def __init__(self, img_data, size): self.MQTTHOST = '*******' self.MQTTPORT = "******" # 订阅和发送的主题 self.topic_from_base = 'mqttTestSub' self.topic_to_base = 'mqttTestPub' self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) self.client = mqtt.Client(self.client_id) # 完成链接后的回掉函数 self.client.on_connect = self.on_connect # 图片大小 self.size = size # 用于跳出死循环,结束任务 self.finished = None # 包的编号 self.index = 0 # 将收到的图片数据按大小分成列表 self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)] # 记录发布后的数据,用于监控时延 self.pub_time = 0 self.header_to_base = 0xffffeeee self.header_from_base = 0xeeeeffff # 功能标识 self.function_begin = 0x01 self.function_doing = 0x02 self.function_finished = 0x03 # 包的完整和非完整状态 self.whole_package = 0x01 self.bad_package = 0x00 # 头信息的格式,小端模式 self.format_to_base = "<Lbhh" self.format_from_base = "<Lbhb" # 如果重发包时,用于检查是否重发第一个包 self.first = True # 如果重发包时,用于检查是否重发最后一个包 self.last = False self.begin_data = 'image.jpg;' + str(self.size) # 链接mqtt服务器函数 def on_mqtt_connect(self): self.client.connect(self.MQTTHOST, self.MQTTPORT, 60) self.client.loop_start() # 链接完成后的回调函数 def on_connect(self, client, userdata, flags, rc): logging.info("+++ Connected with result code {} +++".format(str(rc))) self.client.subscribe(self.topic_from_base) # 订阅函数 def subscribe(self): self.client.subscribe(self.topic_from_base, 1) # 消息到来处理函数 self.client.on_message = self.on_message # 接收到信息后的回调函数 def on_message(self, client, userdata, msg): # 如果接受第一个包则不需要重发第一个 self.first = False # 将接受到的包进行解压,得到一个元组 base_tuple = struct.unpack(self.format_from_base, msg.payload) logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple)) logging.info("+++ package_number is {}, package_status_from_base is {} +++" .format(base_tuple[2], base_tuple[3])) # 检查接受到信息的头部是否正确 if base_tuple[0] == self.header_from_base: logging.info("+++ function_from_base is {} +++".format(base_tuple[1])) # 是否完成传输,如果完成则退出 if base_tuple[1] == self.function_finished: logging.info("+++ finish work +++") self.finished = 1 self.client.disconnect() else: # 是否是最后一个包 if self.index == len(self.image_data_list) - 1: self.publish('finished', self.function_finished) self.last = True logging.info("+++ finished_data_to_base is finished+++") else: # 如果接收到的包不是 0x03则进行传送数据 if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing: logging.info("+++ package_number is {}, package_status_from_base is {} +++" .format(base_tuple[2],base_tuple[3])) # 如果数据的反馈中,包的状态是1则继续发下一个包 if base_tuple[3] == self.whole_package: self.publish(self.index, self.function_doing) logging.info("+++ data_to_base is finished+++") self.index += 1 # 如果数据的反馈中,包的状态是0则重发数据包 elif base_tuple[3] == self.bad_package: re_package_number = base_tuple[2] self.publish(re_package_number-1, self.function_doing) logging.info("+++ re_data_to_base is finished+++") else: logging.info("+++ package_status_from_base is not 0 or 1 +++") self.client.disconnect() else: logging.info("+++ function_identifier is illegal +++") self.client.disconnect() else: logging.info("+++ header_from_base is illegal +++") self.client.disconnect() # 数据发送函数 def publish(self, index, fuc): # 看是否是最后一个包 if index == 'finished': length = 0 package_number = 0 data = b'' else: length = len(self.image_data_list[index]) package_number = index data = self.image_data_list[index] # 打包数据头信息 buffer = struct.pack( self.format_to_base, self.header_to_base, fuc, package_number, length ) to_base_data = buffer + data # mqtt发送 self.client.publish( self.topic_to_base, to_base_data ) self.pub_time = time.time() # 发送第一个包函数 def publish_begin(self): buffer = struct.pack( self.format_to_base, self.header_to_base, self.function_begin, 0, len(self.begin_data.encode('utf-8')), ) begin_data = buffer + self.begin_data.encode('utf-8') self.client.publish(self.topic_to_base, begin_data) # 控制函数 def control(self): self.on_mqtt_connect() self.publish_begin() begin_time = time.time() self.pub_time = time.time() self.subscribe() while True: time.sleep(1) # 超过5秒重传 date = time.time() - self.pub_time if date > 5: # 是否重传第一个包 if self.first == True: self.publish_begin() logging.info('+++ this is timeout first_data +++') # 是否重传最后一个包 elif self.last == True: self.publish('finished', self.function_finished) logging.info('+++ this is timeout last_data +++') else: self.publish(self.index-1, self.function_doing) logging.info('+++ this is timeout middle_data +++') if self.finished == 1: logging.info('+++ all works is finished+++') break print(str(time.time()-begin_time) + 'begin_time - end_time') app = Flask(__name__) api = Api(app) CORS(app, supports_credentials=True) # 接受参数 parser = reqparse.RequestParser() parser.add_argument('url', help='mqttImage url', location='args', type=str) class GetImage(Resource): # 得到参数并从图床下载到本地 def get(self): args = parser.parse_args() url = args.get('url') response = requests.get(url) # 获取图片 image = Image.open(BytesIO(response.content)) # 存取图片 add = os.path.join(os.path.abspath(''), 'image.jpg') image.save(add) # 得到图片大小 size = os.path.getsize(add) f = open(add, 'rb') imageData = f.read() f.close() # 进行mqtt传输 mqtt = Mqtt(imageData, size) mqtt.control() # 删除文件 os.remove(add) logging.info('*** the result of control is {} ***'.format(1)) return jsonify({ "imageData": 1 }) api.add_resource(GetImage, '/image') if __name__ == '__main__': app.run(debug=True, host='0.0.0.0')
总结
以上所述是小编给大家介绍的python使用MQTT给硬件传输图片的实现方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 【雨果唱片】中国管弦乐《鹿回头》WAV
- APM亚流新世代《一起冒险》[FLAC/分轨][106.77MB]
- 崔健《飞狗》律冻文化[WAV+CUE][1.1G]
- 罗志祥《舞状元 (Explicit)》[320K/MP3][66.77MB]
- 尤雅.1997-幽雅精粹2CD【南方】【WAV+CUE】
- 张惠妹.2007-STAR(引进版)【EMI百代】【WAV+CUE】
- 群星.2008-LOVE情歌集VOL.8【正东】【WAV+CUE】
- 罗志祥《舞状元 (Explicit)》[FLAC/分轨][360.76MB]
- Tank《我不伟大,至少我能改变我。》[320K/MP3][160.41MB]
- Tank《我不伟大,至少我能改变我。》[FLAC/分轨][236.89MB]
- CD圣经推荐-夏韶声《谙2》SACD-ISO
- 钟镇涛-《百分百钟镇涛》首批限量版SACD-ISO
- 群星《继续微笑致敬许冠杰》[低速原抓WAV+CUE]
- 潘秀琼.2003-国语难忘金曲珍藏集【皇星全音】【WAV+CUE】
- 林东松.1997-2039玫瑰事件【宝丽金】【WAV+CUE】