RasberryPi

RaspberryPi 3B+ MQTT & sqlite

찬영_00 2025. 3. 28. 17:36

이번엔 간단한 테스트를 위해 해당 환경을 구축해 보았다.

 

라즈베리파이 3B+ 안에서 가상으로 센서에 연결되어 있다고 가정하고, 가상의 센서 값 (for문)을 pub하고 sub로 받아서 받은 값을 sqlite db에 넣는 것을 해보았다.

 

빠르게 포스팅해보겠다.

 

뭐 이전에 MQTT 개발 환경을 다 구축해 두었으니 이전 포스팅을 참고하고, db는 다음과 같이 구성해 보았다.

이렇게 구성해두고 코드를 짜주면 된다.

 

일단 센서 값을 받았다고 가정하고 pub하는 코드이다.

import paho.mqtt.client as mqtt
from test.db_L import dbL
import time

def main():
    broker_address = "127.0.0.1" 
    port = 1883
    topic = "sensor/data"

    client = mqtt.Client()
    client.connect(broker_address, port, 60)

    while True:
        for i in range(1,110):
            message = str(i)
            client.publish(topic, message)
            print(f"Sent: {message}")
            time.sleep(1)



if __name__ == "__main__":
    main()

 

for문을 이용해 id 값을 계속 증가 시켰다.

 

다음은 sub쪽이다.

import paho.mqtt.client as mqtt
from test.db_L import dbL

# MQTT 브로커 정보 (라즈베리파이 IP 주소 사용)
broker = "127.0.0.1"  # 라즈베리파이의 IP 주소를 넣어주세요
port = 1883
topic = "sensor/data"

Tdb = dbL()

# 메시지가 수신되었을 때 실행될 콜백 함수
def on_message(client, userdata, message):
    Tdb.connect_test_db()
    Tdb.write_test_db(message.payload.decode(), 'test1', 'test2', 'test3')
    print(f"Received message: {message.payload.decode()} on topic {message.topic}")

# MQTT 클라이언트 생성
client = mqtt.Client()

# 메시지를 수신하는 콜백 함수 설정
client.on_message = on_message

# 브로커에 연결
client.connect(broker, port)

# 구독 시작
client.subscribe(topic)

# 메시지 수신 대기 (여기서 client.loop_forever()는 MQTT 클라이언트가 연결된 상태로 계속 메시지를 기다리게 만듭니다)
client.loop_forever()

 

데이터 받을 것을 write히여 sqlite db에 써주는 것을 확인 할 수 있다.

 

여기서는 내가 만든 라이브러리가 들어간다.

test폴더 안에 db_L.py가 있음으로 test.db_L 를 import 해두었다.

(패키지를 인식시키려면 test폴더안에 __init__.py 를 빈파일도 상관없으니 넣어두어야 한다 )

import sqlite3
import os

class dbL:
    _test_file = None

    # test db
    __test_AN = None
    __test_RM = None
    __test_DQ = None
    __test_WQ = None

    def __init__(self):
        test_dir = os.getcwd()
        os_name = os.name
        print(test_dir)

        if os_name.upper() == "NT":
            self._test_file = test_dir+"\\test\\test.db"
        elif os_name.upper() == "POSIX":
            self._test_file = test_dir+"/test/test.db"
        else :
            print("This is an os that is not supported!!!")
        print(self._test_file)
        
        if os.path.isfile(self._test_file) == False:
            raise Exception("There is no test.db\n")
        
    def connect_test_db(self):
        try:
            # 데이터베이스에 연결
            connection = sqlite3.connect(self._test_file)
            cursor = connection.cursor()
            return connection, cursor  # 커서를 반환
        except Exception as ex:
            print(str(ex))
        return None, None  # 연결 실패 시 None, None 반환


    def read_test_db(self):
        connection = self.connect_test_db()
        cursor = connection.cursor()
        # 안에 저장된 데이터를 변수에 저장하여 리턴
        cursor.execute("SELECT * FROM test")
        ans = cursor.fetchall()  # 조회한 데이터를 ans 변수에 저장하기

        print(ans)

        for i in ans:
            print(i)

        # 연결 종료
        connection.close()

    def write_test_db(self, AN, RM, DQ, WQ):
        # setter 메서드를 사용하여 값을 설정
        self.test_AN = AN
        self.test_RM = RM
        self.test_DQ = DQ
        self.test_WQ = WQ

        # 데이터베이스 연결 및 커서
        connection, cursor = self.connect_test_db()
        if connection is None or cursor is None:
            return  # 연결 실패 시 반환

        # INSERT 명령어 사용
        cursor.execute("INSERT INTO test VALUES(?, ?, ?, ?)", (self.test_AN, self.test_RM, self.test_DQ, self.test_WQ))

        # 변경 사항 커밋
        connection.commit()

        # 연결 종료
        connection.close()


    # getter
    @property
    def test_AN(self):
        return self.__test_AN

    # getter
    @property
    def test_RM(self):
        return self.__test_RM

    # getter
    @property
    def test_DQ(self):
        return self.__test_DQ

    # getter
    @property
    def test_WQ(self):
        return self.__test_WQ

    # setter
    @test_AN.setter
    def test_AN(self, value):
        self.__test_AN = value
        
    # setter
    @test_RM.setter
    def test_RM(self, value):
        self.__test_RM = value
    
    # setter
    @test_DQ.setter
    def test_DQ(self, value):
        self.__test_DQ = value

    # setter
    @test_WQ.setter
    def test_WQ(self, value):
        self.__test_WQ = value

 

조금 길지만 별거 아니다. 연결과 읽기, 쓰기만 있는 코드이다.

getter, setter은 나중에 쓸수도 있으니 미리 넣어두었다.

 

이렇게 다음과 같이 실행시키면

 

 

이렇게 진행되고 id 110이 되면 중복되는 id에 sub는 오류가 발생할것이다. ( 예외처리 안해둠 )

db에는 다음과 같이 성공적으로 데이터가 쌓였음을 알 수 있다.

 

 

mqtt explorer에서도 데이터가 오는 것을 잘 확인 할 수 있다. ( 내컴퓨터에서 모니터링 한 장면 )

'RasberryPi' 카테고리의 다른 글

Raspberry Pi 2 W OTG setting -1 (해결x)  (0) 2025.03.30
Raspberry Pi - SSH with VS code  (0) 2025.03.29
RaspberryPi 3B+ sqlite3  (0) 2025.03.27
Raspberry Pi 3B+ MQTT  (0) 2025.03.26
라즈베리파이 3B+ 이미지 굽기  (0) 2025.03.25