프로젝트/[백엔드] API_to_PSQL
#3 [PSQL,쿼리문] API데이터 다른pc의 PSQL에 저장하기(+전처리)
SeokjunMan
2023. 11. 5. 23:00
오늘은 API로 데이터를 받아 PSQL(DB)에 저장하는 API_to_PSQL 미니프로젝트를 마무리 해보겠다.
< 지난 포스팅 >
https://jayindustry.tistory.com/62
#2 [API,PSQL] API데이터받기, 서버pc와의 PSQL연결
이번 포스팅에서는 본격적으로 API로 공공수질데이터를 받고 다른 컴퓨터인 서버pc의 PSQL과 연결하여 데이터를 저장해보도록 하겠다. < 지난 포스팅 > 이 프로젝트 #1 에서는 프로젝트 소개와 파
jayindustry.tistory.com
저번시간까지는 API를 연결하고 공공수질데이터를 받고 다른 컴퓨터인 서버pc의 PSQL과 연결까지의 과정이었다.
사실 vscode와 리눅스 터미널에서 ssh로 서버pc에 연결하여 host를 localhost로 지정하여 바로 쌓는 방법도 있다.
다만 경험상, ssh로 연결하여 사용하다보면 다른 사내 pc에서도 많이 연결되어있기 때문에 간혹 연결이 원활하지 않을 때가 있어, 이렇게 외부포트로 접근하여 쌓는 방식을 택하였다.
이번 프로젝트의 목표를 다시 상기시키고 시작해보도록 하겠다.
[ 프로젝트 목표 ]
주 목표 )
공공사이트에서 API로 옥정호의 수질데이터를 실시간으로 계속 받아와 다른pc(회사의 서버pc)의 PSQL(DB)에 쌓는다.
이유 )
회사에서 운용하는 옥정호 수질로봇과 실제 공공기관에서 측정하는 수질데이터를 그라파나 대쉬보드에 띄워 실시간으로 비교해보려고한다.
그라파나(Grafana)사이트는 psql과 연동되어 데이터를 편하게 대시보드상에 시각화해주는 툴이다.
개발순서)
1. 로봇이 수집중인 data종류를 확인 후, 그에 맞는 공공수질데이터를 제공하는 사이트를 찾는다.
2. 스크립트를 작성하여 해당 사이트에서 API 공공수질데이터를 지속적으로 호출하여 받아온다.
3. 받아온 데이터를 서버pc의 psql(DB)과 연결하여 전처리 및 에러처리 후, 실시간으로 저장될 수 있게 한다.
[ 개요 ]
1. PSQL에 적합한 테이블 생성
2. 데이터삽입, 에러방지 함수
3. API데이터 전처리 및 psql에 데이터 저장하기
4. 정리 및 소감
1. PSQL에 적합한 테이블생성
PSQL은 열과 행이 있는 형식이 있는 데이터를 넣는 시퀄이다.
단, 넣고자하는 데이터의 컬럼명과 데이터형식이 일치해야 에러없이 잘 저장될 수 있다.
그리하여 API로 받아본 데이터가 어떤형식인지 체크하고
쿼리문을 작성하여 각 데이터가 잘 들어갈 수 있게 컬럼들을 만들어 테이블을 생성한다.
맨 아래 primary key를 통해 날짜와 수심을 지정하여 각 행의 고유함을 식별할 수 있다.
이는 바로 아래 데이터삽입파트에서 자세히 설명하겠다.
cur = conn.cursor()
데이터베이스 커서(Database Cursor)의 줄임말로, 데이터베이스에 대한 쿼리를 실행하고,
결과를 관리하는 데 사용되는 객체다. Python에서 데이터베이스 작업을 할 때 사용되는 메서드이다.
cur.execute로 작성한 쿼리를 실행하고, commit()으로 깃허브 커밋하듯, 변경사항을 저장한다.
2. PSQL에 데이터삽입 함수
fields는 삽입할 컬럼목록이다.
위 함수에서 data = 는 아래와 같다.
data = tuple(float(get_text_or_default(item, field, default_value='0')) if field == 'wmdep' else get_text_or_default(item, field) for field in fields)
XML에서 가져온 각각의 아이템들을 전처리한다.
get_text_or_default 함수는(지난포스팅에서 다룸) XML 요소에서 특정 태그의 텍스트 값을 추출하는 데 사용된다.
이 함수내의 element.find(tag_name)을 통해 주어진 XML 내에서 특정 태그 이름(tag_name)을 가진 태그를 찾는다.
즉, XML 태그 내의 값을 가져오고, 앞뒤의 불필요한 공백이나 개행 문자를 제거한다.
태그가 존재하지 않거나 텍스트 값이 없거나, 공백 문자만 있을 경우 default_value 반환.
여기서는 wemdp(수심)에 대해서 값이없다면 0을 반환하고
나머지 컬럼값들은 None을 반환한다.
쿼리문에 대한 설명
({', '.join(fields)}) : fields 리스트에 있는 필드 이름들을 쉼표로 구분하여 결합
예를 들어, fields 리스트가 ['field1', 'field2', 'field3']라면, 이 부분은 (field1, field2, field3)로 변환되며 삽입될 데이터의 열(columns)로서 지정한다.
VALUES ({', '.join(['%s'] * len(fields))}) : 각 필드에 삽입될 값을 지정
%s는 각 필드에 대한 값의 placeholder이고 len(fields)는 필드의 개수를 나타내며, 필드의 수만큼 %s를 반복하여 생성한다.
이렇게 생성된 자리 표시자들은 나중에 실제 데이터 값으로 대체된다.
ON CONFLICT (wmcymd, wmdep) : 충돌이 발생하면 해당 행의 데이터를 업데이트 (새로운 데이터로 대체)
위에서 지정한 Primary키를 통해 만약 같은 날짜와 수심, 즉, 같은 행이 있다면 새롭게 업데이트하여 저장하도록 한다.
서버가 잠시 셧다운될 경우, 특정 기간/시간만큼의 데이터가 빌 수 있다.
이 때, 이미 저장되어있는 기간의 데이터들까지 포함하여 저장할 때 중복삽입과 에러를 방지하기 위한 쿼리이다.
{', '.join([f'{field} = EXCLUDED.{field}' for field in fields])}: 각 필드에 대해 업데이트할 값을 지정
EXCLUDED.{field}는 삽입하려고 했던 새로운 데이터의 값이다.
이 부분은 각 필드에 대해 field = EXCLUDED.field 형태로 생성되어, 기존 데이터를 새 데이터로 대체한다.