안드로이드와 스위치 간 MQTT를 이용한 통신을 서버에서 중계를 하고자 한다. 그러면 어떻게 해야 중계를 할 수 있을까?
내용이 너무 길어 나눠서 포스팅 하도록 하겠습니다.
살펴볼 코드는 총 5개
- Main.py, MQTT_Main.py, MQTT_JSON.py, Network.py, Reserve_Main.py
이렇게 입니다.
이번에는 Main.py와 MQTT_Main.py까지 보도록 하겠습니다.
1. 중계 프로그램의 목적 정의
우선 중계 프로그램을 통해 어떤걸 하고자 하는지 정의를 먼저 정할 필요가 있습니다.
제가 원하는 목적들을 정의를 하자면 아래와 같습니다.
1. MQTT 구독
1-1. 스위치로 부터 발행된 메세지 구독
1-2. 안드로이드로 부터 발행된 메세지 구독
2. MQTT 발행
3. 스위치 상태 체크
4. 예약 실행 기능
4-1. 전등 예약 관련 메세지 구독
4-2. DB 연동
5. JSON 인*디코딩
이제 해당 목적들을 위해 필요한 기능들은 뭐가 있을지 확인해 봅시다.
2. 목적 별 필요 기능
1번 & 2번. MQTT 기능
MQTT를 사용하기 위해서는 당연히 파이썬에서 MQTT 라이브러리가 필요합니다.
설치 및 사용 예제는 아래 링크를 통해 확인 부탁드립니다.
링크 : https://sonjuhy.tistory.com/37
3번. 스위치 상태 체크
스위치가 현재 서버와 연결되어 있는 상태인지 확인 하는 기능입니다.
주기적으로 체크 하고 그 결과를 DB에 갱신하여 외부 컨트롤러 도 확인이 가능하도록 하는 기능입니다.
필요한 기능은 아래와 같습니다.
1) MQTT 구독
2) MQTT 발행
3) DB 수정
- 윈도우 : pymysql, 리눅스 : MySQLdb 라이브러리가 필요합니다.
4) 무한 루프를 돌려도 다른 기능들이 지장 없도록 멀티 쓰레드 혹은 멀티 프로세스
- 멀티 프로세스 기준 multiprocessing 라이브러리가 필요합니다.
4번. 예약 기능 실행
스위치 컨트롤을 단순 on/off가 아닌 예약 기능도 추가 하기 위해 미리 만들고자 합니다.
이를 위해 필요한 기능은 아래와 같습니다.
1) MQTT 구독
- 예악을 추가하는 메세지를 처리할 구독입니다.
2) MQTT 발행
- 예약한 시간에 예약된 행동 정보를 담은 MQTT 메세지를 발행하는 기능입니다.
3) DB 값 불러오기
- 예약된 기능들의 정보를 DB로부터 불러옵니다.
- 윈도우 : pymysql, 리눅스 : MySQLdb 라이브러리가 필요합니다.
4) 특정 시간에 이벤트 발생하는 스케쥴러
- 예약한 시간(특정시간)에 이벤트를 발생하여 일을 처리하도록 합니다.
- schedule 라이브러리가 필요합니다.
5) 무한 루프를 돌려도 다른 기능들이 지장 없도록 멀티 쓰레드 혹은 멀티 프로세스 입니다.
- 멀티 프로세스 기준 multiprocessing 라이브러리가 필요합니다.
5번. JSON 인*디코딩
안드로이드와 스위치로부터 받는 JSON 데이터를 파싱하고, 안드로이드와 스위치로 보낼 데이터를 인코딩 해야합니다.
- json 라이브러리가 필요합니다.
이렇게 정리한 기능들을 가지고 코드를 한번 보겠습니다.
3. 코드 살펴보기
1번. Main.py
import multiprocessing as mult
import os
import sys
import time
import paho.mqtt.client as mqtt
import MQTT.MQTT_JSON as mqtt_json
import MQTT.MQTT_Main as mqtt_main
from multiprocessing import Queue
import Reserve.Reserve_Main as reserve
Room = [('balcony', 'balcony main'), ('balcony', 'balcony sub'), ('bath Room', 'bathRoom1'), ('bath Room', 'bathRoom2'),
('Big Room', 'big Room1'), ('Big Room', 'big Room2'), ('kitchen', 'kitchen sink')
, ('kitchen', 'kitchen table'), ('living Room', 'living Room sub'), ('living Room', 'living Room1'),
('living Room', 'living Room2'), ('living Room', 'living Room3'), ('middle Room', 'middle Room1')
, ('middle Room', 'middle Room2'), ('small Room', 'small Room')]
queueToMain = Queue()
sys.path.append(os.getcwd())
class ClientClass:
def __init__(self):
# 여러 멀티 프로세스 실행
self.client_main = mult.Process(target=ClientClass.MQTT_Main, args=(ClientClass, '1', None),
name='switch listen service')
self.client_and = mult.Process(target=ClientClass.MQTT_Main, args=(ClientClass, '2', queueToMain),
name='android lisetn service')
self.client_loop = mult.Process(target=ClientClass.MQTT_State, args=(ClientClass, 1), name='state loop service')
self.client_reserve = mult.Process(target=ClientClass.Reserve_Entrance, args=(ClientClass, queueToMain),
name='light reserve service')
self.client_and.daemon = True
self.client_main.daemon = True
self.client_loop.daemon = True
self.client_reserve.daemon = True
self.client_and.start()
self.client_loop.start()
self.client_main.start()
self.client_reserve.start()
self.mqttClass = None
self.client_and.join()
self.client_loop.join()
self.client_main.join()
self.client_reserve.join()
# 하나의 프로세스라도 죽을시 전체 다운
while True:
if not self.client_main.is_alive():
exit(-1)
if not self.client_and.is_alive():
exit(-1)
if not self.client_loop.is_alive():
exit(-1)
time.sleep(1)
# 어떤 목적을 지닌 프로세스를 실행시킬지 분기를 하는 함수
def MQTT_Main(self, mode, etc):
self.client = mqtt.Client()
self.mqttClass = mqtt_main.MQTTClass()
if mode == '1': # switch part
self.mqttClass.on_topicSet('MyHome/Light/Sub/Server')
self.client.on_message = self.mqttClass.on_message_fromSwitch
elif mode == '2': # android part
self.client.user_data_set(etc)
self.mqttClass.on_topicSet('MyHome/Light/Pub/Server')
self.client.on_message = self.mqttClass.on_message_fromAndroid
self.client.on_connect = self.mqttClass.on_connect
self.client.connect_async("192.168.0.254", 1883)
self.client.loop_start()
while True:
print(str(mode) +' : '+ str(self.client.is_connected()))
time.sleep(5)
# 전등 상태 체크 하는 함수
def MQTT_State(self, mode):
self.client_state = mqtt.Client()
self.client_state.connect_async("192.168.0.254", 1883)
while True:
self.client_state.loop()
# print("MQTT State is run")
for (cate, room) in Room:
dic = [('name', 'Server'), ('message', 'STATE'), ('destination', room)]
object = mqtt_json.JSON_ENCODE_android(dic)
# print(object)
self.client_state.publish("MyHome/Light/Pub/" + cate, object)
time.sleep(4)
# time.sleep(30)
self.client_state.loop_stop()
if mode == 0:
break
# 예약 기능 담당하는 함수(외부 코드로 작성되어있음)
def Reserve_Entrance(self, queue):
reserve.ReserveMain(queue)
if __name__ == "__main__":
mainClass = ClientClass()
살펴보기 앞서 여기에 있는 mqttClass, reserve는 외부에 따로 작성한 코드들입니다. 밑에 작성을 따로 하도록 하겠습니다.
그럼 이제 하나씩 살펴보겠습니다.
- 멀티 프로세스 실행
- 위에서부터 차례대로 스위치 MQTT 구독, 안드로이드 MQTT 구독, 스위치 상태 체크, 전등 예약 프로세스 입니다.
- 혹시나 에러가 발생하더라도 해당 기능을 제외 나머지는 일단 작동하도록 구분해놨습니다.
- 프로그램 종료
- 멀티 프로세스들 중에서 하나라도 작동을 멈추면 제 기능을 못하므로 강제로 종료 시킵니다.
- 리눅스 스크립트에 의해 해당 프로그램이 종료되면 바로 재시작을 하는것을 이용하기 위해 강제 종료 합니다.
- 메인 함수
- 스위치 파트인지 안드로이드 파트인지에 따라 다른 토픽과 메세지 함수를 정하고 실행시킵니다.
- 스위치 상태 체크 함수
- 4초마다 각자 다른 스위치에게 상태값을 물어보는 메세지를 발행합니다.
- 해당 메세지를 받은 스위치들은 현재 상태를 서버로 발행합니다.
- 예약 기능 함수
- 예약 기능 관련된 함수를 실행합니다.
Main.py에서는 어떠한 기능을 직접 수행 한다 라기 보다 수행할 함수들이 작성된 외부 코드를 참조하여 호출하는데 초점이 맞춰져 있습니다. 그럼 이제 외부 코드들을 보겠습니다.
2번. MQTT_Main.py
import paho.mqtt.client as mqtt
import Network as network
import MQTT.MQTT_JSON as json
class MQTTClass:
# 전등 스위치 값 체크하기 위한 기본 정보
Room = {'balcony main': 'Off', 'balcony sub': 'Off', 'bathRoom1': 'Off', 'bathRoom2': 'Off',
'big Room1': 'Off', 'big Room2': 'Off', 'kitchen sink': 'Off'
, 'kitchen table': 'Off', 'living Room sub': 'Off', 'living Room1': 'Off',
'living Room2': 'Off', 'living Room3': 'Off', 'middle Room1': 'Off'
, 'middle Room2': 'Off', 'small Room': 'Off'}
# mqtt 연결 관련 데이터들을 정의합니다.
def __init__(self):
self.topic = None
self.payload = None
self.dict = []
self.diction = []
self.queue = None
self.mqttClient = mqtt.Client()
self.mqttClient.connect_async("192.168.0.254", 1883)
self.mqttclient_pub = mqtt.Client()
self.mqttclient_pub.connect_async("192.168.0.254", 1883)
# 토픽 정하는 함수
def on_topicSet(self, topic):
self.topic = topic
print(self.topic)
# mqtt 연결하는 함수
def on_connect(self, client, user_data, flags, rc):
client.subscribe(topic=str(self.topic))
print("Connected with result code(Sub) : " + str(rc))
# 안드로이드로부터 오는 메세지 처리하는 함수
def on_message_fromAndroid(self, client, user_data, msg):
self.payload = msg.payload.decode("utf-8")
print("from Android message")
print(self.payload)
if self.payload is not None:
if self.payload == 'reserve':
print("from android if")
if user_data is not None:
self.queue = user_data
self.queue.put("restart")
else:
print("from android else")
self.dict = json.JSON_Parser_android(self.payload)
self.on_publish('MyHome/Light/Pub/'+self.dict['room'], self.payload)
# 스위치로부터 오는 메세지 처리하는 함수
def on_message_fromSwitch(self, client, user_data, msg):
self.payload = msg.payload.decode("utf-8")
#print("from switch : " + self.payload)
if self.payload is not None and self.payload[0] == "{" and self.payload[-1] == "}":
self.dict = json.JSON_Parser(self.payload)
if self.dict['sender'] == 'Server': # Light state part
if self.dict['room'] in self.Room:
self.Room[self.dict['room']] = "On"
network.SQL_Def("Light", self.dict)
if self.dict['room'] == 'small Room':
for (room, state) in self.Room.items():
self.diction = [('message', state), ('room', room)]
network.SQL_Def("Connect", self.diction)
self.Room[room] = "Off"
else: # return control data part
network.SQL_Def("LightRecord", self.dict)
network.SQL_Def("Light", self.dict)
self.diction = json.JSON_ENCODE(self.dict)
self.on_publish('MyHome/Light/Result', self.diction)
else:
print("can't work in on_message")
print(self.payload)
# 메세지 발행하는 함수
def on_publish(self, topic, payload):
print("on_publish")
print(topic)
print(payload)
if not self.mqttclient_pub.is_connected():
self.mqttclient_pub.connect("192.168.0.254", 1883)
print("mqtt reconnected")
self.mqttclient_pub.loop()
result = self.mqttclient_pub.publish(topic, payload)
self.mqttclient_pub.loop_stop()
print(result)
self.mqttclient_pub.disconnect()
여기서도 json, network는 외부 코드입니다. 이는 추후 따로 포스팅을 하도록 하겠습니다.
- mqtt연결에 필요한 데이터를 정의합니다.
- 토픽을 정의합니다.
- 연결을 하는 함수입니다. 받은 mqtt client 변수와 토픽으로 연결합니다.
- 안드로이드에서 오는 메세지를 처리합니다.
- 메세지를 UTF-8로 처리합니다. (한글도 읽을 수 있도록)
- 메세지가 'reserve'인지 확인합니다.
- 만약 맞다면 예약 기능 함수에게 DB로부터 데이터를 갱신하라고 queue 메세지를 보냅니다.
- 아니라면 JSON 데이터를 파싱해서 해당 전등 스위치에게 다시 인코딩해서 전달합니다.
- 스위치에서 오는 메세지를 처리합니다.
- JSON 타입의 메시지가 완벽하게 왔는지 체크 합니다. ( {} 중괄호 확인)
- 'sender' 가 'server' 인지 확인합니다.
- 맞다면 상태 체크에 대한 값 리턴입니다. 스위치 상태를 'on'으로 변경합니다.
- 만약 해당 방이 'small Room' 이라면 모든 스위치를 체크한 것 이므로, DB에 체크한 값을 갱신합니다.
- 'sender'가 'server'가 아니라면 일반적인 컨트롤 상황이므로 이를 DB에 기록합니다.
- 'sender'에게 실행값을 발행해줍니다.
- 원하는 메세지를 발행합니다.
- MQTT 서버와 연결되어있는지 확인합니다. 만약 연결이 끊겨있다면 다시 연결합니다.
- 원하는 메세지와 토픽을 설정한다음 발행합니다.
4. 기타
아래는 위 내용과 관련 궁금할 내용들입니다.
● 멀티 쓰레드와 멀티 프로세스 중에서 왜 멀티 프로세스를 선택하셨나요?
· 각각 기능들이 전부 독립적으로 실행시키고 싶어서 더 독립적인 멀티 프로세스를 선택했습니다.
● 독립적인걸 원하시는데 왜 하나의 프로세스라도 종료되었을때 프로세스 다 종료시키나요?
· 하나의 프로세스가 죽으면 그 프로세스만 다시 재실행을 하려했으나 실패해서 이렇게 했습니다. 실력이 부족해서 공부중입니다 ㅎ...
● 파이썬 프로그램이 종료되었는데 어떻게 바로 다시 재실행이 되는건가요?
· 리눅스(우분투) 스크립트를 통해 설정했습니다. 추후 이 방법도 올리도록 하겠습니다.
● MQTT가 아닌 HTTP쪽으로 해도 될 기능들이 보이는데 하나만 쓰는 이유는 뭔가요?
· 저 코드를 작성할때 당시 REST API에 대해 잘 알지 못해서 통합적으로 사용했습니다. 현재는 Django를 기반으로 코드를 리팩토리 중입니다.
추가적으로 궁금하신점은 댓글 작성해주시면 답변 드리도록 하겠습니다.
'홈 IoT > 서버' 카테고리의 다른 글
MQTT 중계 프로그램 제작 - 2부 (0) | 2023.05.08 |
---|---|
파이썬으로 MQTT 통신 하기(예제) (0) | 2023.04.24 |
Ubuntu 18.04.5 LTS Server에 MQTT(Mosquitto) 설치 및 활용 (0) | 2021.04.06 |