Spring Boot에서 MQTT를 이용하여 각종 IoT가 접목된 기기들을 컨트롤하고자 한다. 그러면 어떻게 Spring Boot에서 이를 사용할 수 있을까?
환경 버전
java : 11
spring boot : 2.7.6
1. MQTT 라이브러리 설치하기
우선 MQTT 라이브러리를 설치해야 이 기능을 사용할 수 있으므로 설치를 합니다.
implementation 'org.springframework.integration:spring-integration-mqtt:5.5.14'
위 코드를 "build gradle" 파일 내에 있는 "dependencies" 에 추가합니다.
저는 5.5 보다 더 윗단계 라이브러리 사용시 호환성 문제로 작동하지 않는 부분이 있어 해당 버전을 사용했습니다.
2. 코드 작성
여러 단계에 거쳐 코드를 작성하겠습니다.
각 파일들의 목적과 파일명입니다.
- Bean scan : Application.java
- Service : MQTTService.java, MQTTServiceImpl.java
- Dto : MQTTDto.java
- Controller : MQTTController.java
- Config : MQTTConfig.java
- EventHandler : MQTTEventHandler.java
2-1. Application.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.annotation.IntegrationComponentScan;
@SpringBootApplication
@IntegrationComponentScan # 추가할 부분
public class ServerApplication {
public static void main(String[] args) {
SpringApplication.run(ServerApplication.class, args);
}
}
2-2. Service.java
2-2-1. MQTTService.java
public interface MQTTService {
void publish(MQTTDto dto);
boolean connected();
void reconnect();
}
2-2-2. MQTTServiceImpl.java
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.myhome.server.api.dto.MQTTDto;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQTTServiceImpl implements MQTTService{
private final String pubTopic = "test";
@Autowired
private IMqttClient mqttClient;
@Override
public void publish(MQTTDto dto) {
Gson gson = new Gson();
String dtoPayload = gson.toJson(dto);
JsonObject object = new JsonObject();
object.addProperty("Payload", dtoPayload);
String payload = gson.toJson(object);
int qos = 0;
boolean retained = false;
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
try {
mqttClient.publish(pubTopic, mqttMessage);
mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public boolean connected() {
return mqttClient.isConnected();
}
@Override
public void reconnect() {
try {
System.out.println("try to reconnect mqtt server");
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
publish : Dto타입으로 온 데이터를 JSON 타입으로 변환하여 발행.
connected : 연결 상태 리턴
reconnect : MQTT 서버랑 재접속 시도
2-3. MQTTDto.java
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
@NoArgsConstructor
public class MQTTDto {
private String sender;
private String message;
}
2-4. MQTTController.java
import com.myhome.server.api.dto.MQTTDto;
import com.myhome.server.api.service.MQTTService;
import com.myhome.server.api.service.MQTTServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController()
@RequestMapping("/mqtt")
public class MQTTController {
@Autowired
MQTTService mqttService = new MQTTServiceImpl();
@PostMapping("/pub")
public ResponseEntity<Void> publish(@RequestBody MQTTDto dto){
mqttService.publish(dto);
return new ResponseEntity<>(null, HttpStatus.OK);
}
}
localhost:8080/mqtt/pub으로 온 dto 데이터를 MQTTServiceImpl로 보내 처리합니다.
2-5. MQTTConfig.java
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync
@Configuration
public class MQTTConfig {
private final String subTopic = "test";
@Bean
@ConfigurationProperties(prefix = "mqtt")
public MqttConnectOptions mqttConnectOptions() {
return new MqttConnectOptions();
}
@Bean
public IMqttClient mqttClient(@Value("${mqtt.clientId}") String clientId,
@Value("${mqtt.hostname}") String hostname, @Value("${mqtt.port}") int port) throws MqttException {
IMqttClient mqttClient = new MqttClient("tcp://" + hostname + ":" + port, clientId);
mqttClient.connect(mqttConnectOptions());
return mqttClient;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.0.254:1883", "SpringBoot-server",
subTopic); // 192.168.0.254에서 본인 서버 ip 혹은 도메인 주소 넣으시면됩니다.
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("MQTT MessageHandler : " + message.getPayload());
}
};
}
}
Spring Boot가 실행할때 MQTT 서버와 연결하기 위한 포트, 아이디, 호스트네임 등을 설정하고 연결을 시도합니다.
추가로 Qos, Timeout 시간 등 도 설정이 가능합니다.
handler를 통해 구독한 토픽으로부터 오는 메세지가 오는것을 받아 처리 할 수 있습니다.
2-6. MQTTEventHandler.java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.stereotype.Component;
@Component
public class MQTTEventHandler {
@EventListener
public void connectLost(MqttConnectionFailedEvent failedEvent){
MqttPahoComponent source = failedEvent.getSourceAsType();
MqttConnectOptions options = source.getConnectionInfo();
System.out.println("MQTT Connection is broken!!");
}
}
연결이 끊겼는지 체크 하다가 이벤트 발생시 확인하는 리스너 입니다.
2-7. application.properties
### MQTT
mqtt.automaticReconnect=true
mqtt.cleanSession=true
mqtt.connectionTimeout=10
mqtt.clientId=SpringBoot-MQTT
mqtt.hostname=192.168.0.254
mqtt.port=1883
MQTTConfig 파일에서 있던 value들을 저장해둔 내용입니다.
코드가 좀 많습니다. 저도 배우면서 해보는거라 코드가 미숙하거나 아쉬운 점도 있을 수 있습니다.
댓글로 남겨주시면 감사하겠습니다.
'개발잡담 > Back-End' 카테고리의 다른 글
Spring Boot에서 영상 스트리밍으로 받기 (feat. React) (0) | 2024.02.03 |
---|---|
Spring Boot에서 파일 다운로드 (feat. React) (0) | 2024.02.02 |
Spring batch 5.1.0 간단 사용 (0) | 2024.01.31 |
로그가 필요해 - 서론 (0) | 2023.11.02 |
Spring은 어떻게 여러 개의 요청을 동시에 처리할까? (0) | 2023.09.26 |