#!/usr/bin/python3
# -*- coding: utf-8 -*-

import base64
import json
import os
import threading
import time

import cv2
import numpy as np
from flask import Flask, flash, session, request, Response, send_from_directory, render_template, jsonify
from flask_cors import cross_origin

from frame import frame_process, frame_grid

###################################################################
# Config and Data Object
###################################################################
storageFolder = "storage"
settingsFilename = storageFolder + "/settings.json"


def _default_cam_config(cam_id, size, name, block_size, threshold_c):
    """Build the built-in configuration entry for one camera.

    Every call creates its own lock, arrays and nested dicts so the cameras
    never share mutable state.
    """
    return {
        # Settings from JSON
        'id': cam_id,
        'size': size,
        'undistort': {
            'type': 0,
            'calibration': {
                'matrix': np.array([[448.27817297404454, 0, 291.8520566174806],
                                    [0, 584.2504759789658, 239.55416051020535],
                                    [0, 0, 1]]),
                'distortion': np.array([-0.04882221532751064, -0.67818019974141,
                                        -0.0037673942250171966, 0.027266585061974342,
                                        1.2104220826993612]),
            },
            'distortion_f': 0.0,
        },
        'image_transform': {
            'rotation': 0,
            'scale_x': 1.0,
            'scale_y': 1.0
        },
        'cvSettings': {'name': name, 'contrast': 1, 'brightness': 0, 'blur': (1, 1),
                       'adaptiveThreshold_blockSize': block_size,
                       'adaptiveThreshold_C': threshold_c},
        # Run variables
        'vid': None,
        'outputFrame': None,
        'lock': threading.Lock(),
        'distortion_maps': {'map_x': None, 'map_y': None},
        # Changes via Flask app
        'viewParams': {'raw': 0, 'cv': 0, 'ppmm': 10, 'grid': 0, 'marker': 0},
    }


confData = [
    _default_cam_config(0, (800, 600), 'Top', 11, 2),
    _default_cam_config(1, (640, 427), 'Bottom', 799, 27),
]

# Placeholder picture shown while a camera is unavailable.  cv2.imread returns
# None when the file cannot be read, so fall back to a plain black frame
# instead of crashing later inside cv2.resize.
emptyFrame = cv2.imread('noimage.jpg')
if emptyFrame is None:
    emptyFrame = np.zeros((480, 640, 3), np.uint8)

###################################################################
# Functions
###################################################################


# Create image correction maps by distortion factor
def create_distortion_maps(width, height, distortion_type='barrel', strength=0.5):
    """Return ``(map_x, map_y)`` remap tables for a radial distortion.

    Pixel offsets from the image centre are normalised by the half-size,
    scaled by a factor that depends on the squared normalised radius, and
    converted back to pixel coordinates.

    Args:
        width: Map width in pixels.
        height: Map height in pixels.
        distortion_type: ``'barrel'`` uses ``1 + strength * r**2``,
            ``'pincushion'`` uses ``1 / (1 + strength * r**2)``; any other
            value yields the identity mapping.
        strength: Distortion coefficient.

    Returns:
        Two ``float32`` arrays of shape ``(height, width)``; they are passed
        to ``cv2.remap`` by ``frame_undistort``.
    """
    center_x, center_y = width / 2, height / 2

    # Vectorised over the whole grid; the previous per-pixel Python loop needed
    # width * height interpreted iterations on every settings reload.
    rel_x = (np.arange(width, dtype=np.float64) - center_x) / center_x
    rel_y = (np.arange(height, dtype=np.float64) - center_y) / center_y
    rel_x, rel_y = np.meshgrid(rel_x, rel_y)  # both (height, width)
    radius_sq = rel_x ** 2 + rel_y ** 2

    if distortion_type == 'barrel':
        factor = 1 + strength * radius_sq
    elif distortion_type == 'pincushion':
        factor = 1 / (1 + strength * radius_sq)
    else:
        factor = np.ones_like(radius_sq)

    map_x = (center_x + factor * rel_x * center_x).astype(np.float32)
    map_y = (center_y + factor * rel_y * center_y).astype(np.float32)
    return map_x, map_y


###################################################################
# Capture Video and set capture options
def _capture_is_open(num):
    """Return True when camera *num* has a capture object that is opened.

    ``confData[num]['vid']`` is ``None`` until ``videoCapture`` has run, so it
    must be checked before calling ``isOpened()`` on it.
    """
    vid = confData[num]['vid']
    return vid is not None and vid.isOpened()


def videoCapture(num):
    """(Re)open the capture device of camera *num* with its configured size.

    An already opened capture is released first.  The requested frame size
    comes from ``confData[num]['size']`` and the frame rate is set to 30.
    """
    cam = confData[num]
    if _capture_is_open(num):
        cam['vid'].release()
    cam['vid'] = cv2.VideoCapture(cam['id'])
    cam['vid'].set(cv2.CAP_PROP_FRAME_WIDTH, cam['size'][0])
    cam['vid'].set(cv2.CAP_PROP_FRAME_HEIGHT, cam['size'][1])
    cam['vid'].set(cv2.CAP_PROP_FPS, 30)


###################################################################
# Load cams settings from JSON file
def load_cam_settings():
    """Load ``settingsFilename`` and apply it to every entry of ``confData``.

    The file must provide ``cams[num]`` (with ``id``, ``undistort``,
    ``image_transform`` and ``size.width`` / ``size.height``) and ``cv[num]``
    for each camera.  Distortion maps and the optimal new camera matrix are
    recomputed, and the capture device is (re)opened when it was never opened
    or when the camera id or frame size changed.

    Raises:
        OSError, ValueError, KeyError, IndexError: if the file is missing,
            is not valid JSON, or lacks an expected entry.
    """
    with open(settingsFilename) as infile:
        result = json.load(infile)

    for num, cam in enumerate(confData):
        cam_json = result['cams'][num]
        prev_id = cam['id']
        prev_size = cam['size']

        size = (cam_json['size']['width'], cam_json['size']['height'])

        # Build the complete undistort entry before storing it, so the stored
        # dict always contains 'newcameramtx' and 'roi'.
        undistort = cam_json['undistort']
        calibration = undistort['calibration']
        calibration['matrix'] = np.array(calibration['matrix'])
        calibration['distortion'] = np.array(calibration['distortion'])
        newcameramtx, roi = cv2.getOptimalNewCameraMatrix(
            calibration['matrix'], calibration['distortion'], size, 1, size)
        calibration['newcameramtx'] = newcameramtx
        calibration['roi'] = roi

        map_x, map_y = create_distortion_maps(
            size[0], size[1], 'barrel', undistort['distortion_f'])

        cam['id'] = cam_json['id']
        cam['size'] = size
        cam['undistort'] = undistort
        cam['image_transform'] = cam_json['image_transform']
        cam['distortion_maps'] = {'map_x': map_x, 'map_y': map_y}
        cam['cvSettings'] = result['cv'][num]

        # 'vid' starts as None; without opening it here every later
        # confData[num]['vid'].isOpened() call would raise AttributeError when
        # the file happens to match the built-in id and size.
        if cam['vid'] is None or cam['id'] != prev_id or cam['size'] != prev_size:
            videoCapture(num)


###################################################################
load_cam_settings()


def get_empty_frame(num):
    """Return the placeholder image resized for camera *num* with its grid overlay."""
    frame = cv2.resize(emptyFrame, confData[num]['size'])
    return frame_grid(frame, confData[num]['viewParams'])


def frame_undistort(frame, num):
    """Resize *frame* to the configured size and apply lens correction.

    With ``undistort['type'] == 0`` the precomputed distortion maps are applied
    (only when ``distortion_f`` is non-zero); for any other type the stored
    calibration is used with ``cv2.undistort`` and the result is cropped to the
    stored region of interest.
    """
    cam = confData[num]
    undistort = cam['undistort']

    # Resize image
    frame = cv2.resize(frame, cam['size'])

    if undistort['type'] == 0:
        if undistort['distortion_f'] != 0:
            maps = cam['distortion_maps']
            frame = cv2.remap(frame, maps['map_x'], maps['map_y'], cv2.INTER_LINEAR)
    else:
        calibration = undistort['calibration']
        frame = cv2.undistort(frame, calibration['matrix'], calibration['distortion'],
                              None, calibration['newcameramtx'])
        x, y, w, h = calibration['roi']
        # An empty ROI would crop the frame to nothing and break the JPEG
        # encoder further down; keep the uncropped frame in that case.
        if w > 0 and h > 0:
            frame = frame[y: y + h, x: x + w]
    return frame


# Rotation angle (degrees) -> cv2.rotate code.  Positive angles are clockwise,
# so 270 / -270 / -180 are equivalent to the three originally supported values.
_ROTATION_CODES = {
    90: cv2.ROTATE_90_CLOCKWISE,
    -270: cv2.ROTATE_90_CLOCKWISE,
    180: cv2.ROTATE_180,
    -180: cv2.ROTATE_180,
    -90: cv2.ROTATE_90_COUNTERCLOCKWISE,
    270: cv2.ROTATE_90_COUNTERCLOCKWISE,
}


def frame_transform(frame, num):
    """Rotate and scale *frame* according to ``confData[num]['image_transform']``.

    Unknown non-zero rotation values keep the historical fallback of a
    90-degree clockwise rotation.  Scaling is applied only when both factors
    are positive and at least one differs from 1.
    """
    transform = confData[num]['image_transform']
    rotation = transform['rotation']
    scale_x = transform['scale_x']
    scale_y = transform['scale_y']

    if rotation != 0:
        frame = cv2.rotate(frame, _ROTATION_CODES.get(rotation, cv2.ROTATE_90_CLOCKWISE))

    # The old test was `scale_x != 0 or scale_y != 0`, which still called
    # cv2.resize with a zero factor when only one of them was zero.
    if scale_x > 0 and scale_y > 0 and (scale_x != 1 or scale_y != 1):
        frame = cv2.resize(frame, None, fx=scale_x, fy=scale_y)
    return frame


def image_processing(num, cvParams):
    """Grab one frame from camera *num*, run the CV pipeline and encode it.

    Args:
        num: Index into ``confData``.
        cvParams: Request parameters handed through to ``frame_process``.

    Returns:
        ``{'objects': ..., 'img': ...}`` where ``objects`` is whatever
        ``frame_process`` returned (``{}`` when no frame was available) and
        ``img`` is a ``data:image/jpeg;base64,`` URL of the processed frame.
    """
    cam = confData[num]
    objects = {}

    # try to start VideoCapture if it is not opened yet
    if not _capture_is_open(num):
        videoCapture(num)

    # if opened then try to read the next frame
    if not cam['vid'].isOpened():
        frame = get_empty_frame(num)
    else:
        ret, frame = cam['vid'].read()
        if ret:
            # Image Correction
            frame = frame_undistort(frame, num)
            # Frame Transform
            frame = frame_transform(frame, num)
            # Frame Process
            frame, objects = frame_process(num, frame, cam['viewParams'], cvParams, cam['cvSettings'])
            # Draw grid
            frame = frame_grid(frame, {'grid': 1, 'marker': 0})
        else:
            cam['vid'].release()
            frame = get_empty_frame(num)

    _, encodedImage = cv2.imencode(".jpg", frame)
    jpg_as_text = 'data:image/jpeg;base64,' + base64.b64encode(encodedImage).decode('utf-8')
    return {'objects': objects, 'img': jpg_as_text}


def video_processing(num):
    """Endless worker loop that keeps ``confData[num]['outputFrame']`` current.

    Started as a daemon thread per camera.  Each iteration reads a frame,
    applies correction, transform, optional CV processing and the grid
    overlay, then publishes a copy of the result under the camera lock.  While
    the camera cannot be opened the placeholder frame is published once per
    second.
    """
    cam = confData[num]

    # loop over frames from the video stream
    while True:
        # try to start VideoCapture if it is not opened yet
        if not _capture_is_open(num):
            videoCapture(num)

        # if opened then try to read the next frame
        if not cam['vid'].isOpened():
            frame = get_empty_frame(num)
            time.sleep(1.0)
        else:
            # read the next frame from the video stream
            ret, frame = cam['vid'].read()
            if ret:
                view = cam['viewParams']
                # Image Correction
                if view['raw'] != 1:
                    frame = frame_undistort(frame, num)
                # Frame Transform
                # NOTE(review): the flattened source does not show whether this
                # step was nested under the raw check above - confirm.
                frame = frame_transform(frame, num)
                # Frame Process
                if view['cv'] != 0:
                    # NOTE(review): image_processing passes five arguments
                    # (including cvParams) to frame_process, this call passes
                    # four - verify against frame.frame_process.
                    frame, _ = frame_process(num, frame, view, cam['cvSettings'])
                else:
                    time.sleep(0.03)
                # Draw grid
                frame = frame_grid(frame, view)
            else:
                cam['vid'].release()
                frame = get_empty_frame(num)

        # acquire the lock, set the output frame, and release the lock
        with cam['lock']:
            cam['outputFrame'] = frame.copy()


def generate_video(num):
    """Yield an endless ``multipart/x-mixed-replace`` JPEG stream for camera *num*."""
    cam = confData[num]

    # loop over frames from the output stream
    while True:
        # Only the reference is taken under the lock: video_processing always
        # rebinds 'outputFrame' to a fresh copy and never edits it in place, so
        # encoding can happen without blocking the producer.
        with cam['lock']:
            frame = cam['outputFrame']

        # no frame published yet - wait a little instead of spinning
        if frame is None:
            time.sleep(0.01)
            continue

        # encode the frame in JPEG format
        flag, encodedImage = cv2.imencode(".jpg", frame)

        # ensure the frame was successfully encoded
        if not flag:
            continue

        # yield the output frame in the byte format
        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
               encodedImage.tobytes() + b'\r\n')


def cam_calibration(num):
    """Calibrate camera *num* from a single frame showing a 9x6 chessboard.

    Returns:
        ``{'result': 'OK', 'matrix': ..., 'distortion': ...}`` on success,
        otherwise ``{'result': 'error', 'msg': ...}`` describing whether the
        camera could not be opened, the frame could not be read, or no
        chessboard was detected.
    """
    cam = confData[num]

    # try to start VideoCapture if it is not opened yet
    if not _capture_is_open(num):
        videoCapture(num)

    if not cam['vid'].isOpened():
        return {'result': 'error', 'msg': 'cannot find camera'}

    # read the next frame from the video stream
    ret, frame = cam['vid'].read()
    if not ret:
        cam['vid'].release()
        return {'result': 'error', 'msg': 'cannot read image from camera'}

    square_size = 1.0
    pattern_size = (9, 6)

    # Corner coordinates on the board plane (z = 0), in units of one square.
    pattern_points = np.zeros((np.prod(pattern_size), 3), np.float32)
    pattern_points[:, :2] = np.indices(pattern_size).T.reshape(-1, 2)
    pattern_points *= square_size

    h, w = frame.shape[:2]
    found, corners = cv2.findChessboardCorners(frame, pattern_size)
    if not found:
        return {'result': 'error', 'msg': 'chessboard not found'}

    img_points = [corners.reshape(-1, 2)]
    obj_points = [pattern_points]
    _rms, camera_matrix, dist_coefs, _rvecs, _tvecs = cv2.calibrateCamera(
        obj_points, img_points, (w, h), None, None)
    return {'result': 'OK', 'matrix': camera_matrix.tolist(), 'distortion': dist_coefs.tolist()}


##################################
##################################
# Flask App
##################################
app = Flask(__name__, template_folder='templates')
# NOTE(review): the session secret is hard-coded in source; consider loading it
# from the environment or an untracked config file.
app.secret_key = 'ASA:VTMJI~ZvPOS8W:0L2cAb?WB}R0V_'
origins_domains = '*'


##############################
# Static files
##############################
# The URL rules need a <path:...> variable: the view functions take `path`,
# and the `path` converter also accepts nested sub-directories.
@app.route('/assets/<path:path>')
def send_frontend_files(path):
    """Serve a file from ``frontend/assets/``."""
    return send_from_directory('frontend/assets/', path)


@app.route('/images/<path:path>')
def send_images_files(path):
    """Serve a file from ``frontend/images/``."""
    return send_from_directory('frontend/images/', path)


##############################
# Main Page
##############################
@app.route('/', methods=['GET'])
def home_page():
    """Serve the frontend entry page."""
    return send_from_directory('frontend/', 'index.html')


def _cam_exists(num):
    """Return True when *num* is a valid index into ``confData``."""
    return 0 <= num < len(confData)


def _is_safe_name(name):
    """Return True when *name* is a single, harmless path component.

    Folder and file names arrive from the URL and are joined into filesystem
    paths, so empty names, ``.``, ``..`` and anything containing a path
    separator or NUL byte are rejected to keep access inside the storage
    folder.
    """
    return (
        name not in ('', '.', '..')
        and '/' not in name
        and '\\' not in name
        and '\x00' not in name
    )


def _json_path(folder, file):
    """Return ``<storageFolder>/[<folder>/]<file>.json`` or None if a name is unsafe.

    An empty *folder* addresses the storage root.
    """
    parts = [file] if folder == '' else [folder, file]
    if not all(_is_safe_name(part) for part in parts):
        return None
    return storageFolder + '/' + '/'.join(parts) + '.json'


# The URL variables below are required: every view takes these parameters and
# `num` is used as a list index, so it must be converted to int.
@app.route("/video_feed/<int:num>", defaults={'raw': 0, 'cv': 0, 'ppmm': 10, 'grid': 0, 'marker': 0})
@app.route("/video_feed/<int:num>/<int:raw>/<int:cv>/<int:ppmm>/<int:grid>/<int:marker>")  # integer ppmm
@app.route("/video_feed/<int:num>/<int:raw>/<int:cv>/<float:ppmm>/<int:grid>/<int:marker>")  # float ppmm
def video_feed0(num, raw, cv, ppmm, grid, marker):
    """Store the requested view parameters and stream camera *num* as MJPEG."""
    if not _cam_exists(num):
        return jsonify({'error': 'Unknown camera'}), 404
    view = confData[num]['viewParams']
    view['raw'] = raw
    view['cv'] = cv
    view['ppmm'] = ppmm
    view['grid'] = grid
    view['marker'] = marker
    return Response(generate_video(num), mimetype="multipart/x-mixed-replace; boundary=frame")


@app.route("/cv/<int:num>", methods=['POST'])
@cross_origin(origins=origins_domains)
def cv_params(num):
    """Process one frame of camera *num* with the posted JSON parameters."""
    if not _cam_exists(num):
        return jsonify({'error': 'Unknown camera'}), 404
    cvParams = request.json
    return jsonify(image_processing(num, cvParams)), 200


@app.route("/calibration/<int:num>")
@cross_origin(origins=origins_domains)
def calibration(num):
    """Run the chessboard calibration for camera *num*."""
    if not _cam_exists(num):
        return jsonify({'error': 'Unknown camera'}), 404
    return jsonify(cam_calibration(num)), 200


@app.route("/settings", methods=['GET'])
@cross_origin(origins=origins_domains)
def settingsGet():
    """Return the content of the settings file as JSON."""
    with open(settingsFilename) as infile:
        result = json.load(infile)
    return jsonify(result), 200


@app.route("/settings", methods=['POST'])
@cross_origin(origins=origins_domains)
def settingsSave():
    """Write the posted JSON to the settings file and reload the cameras.

    Failures are reported as ``{'error': ...}`` with HTTP status 200.
    """
    try:
        payload = request.json
        # Refuse to overwrite the settings with something that is not a JSON
        # object; load_cam_settings indexes it by key.
        if not isinstance(payload, dict):
            return jsonify({'error': 'Settings must be a JSON object'}), 200
        json_object = json.dumps(payload, indent=2)
        with open(settingsFilename, "w") as outfile:
            outfile.write(json_object)
        load_cam_settings()
    except Exception as e:
        return jsonify({'error': repr(e)}), 200
    else:
        return jsonify({'result': 'OK'}), 200


@app.route("/json/<file>", defaults={'folder': ''}, methods=['GET'])
@app.route("/json/<folder>/<file>", methods=['GET'])
@cross_origin(origins=origins_domains)
def jsonGet(folder, file):
    """Return the parsed content of ``<storage>/[<folder>/]<file>.json``."""
    filename = _json_path(folder, file)
    if filename is None:
        return jsonify({'error': 'Invalid path'}), 400
    try:
        with open(filename) as infile:
            result = json.load(infile)
    except FileNotFoundError:
        return jsonify({'error': 'File not exists'}), 404
    return jsonify({"result": result}), 200


@app.route("/json-list/<folder>", methods=['GET'])
@cross_origin(origins=origins_domains)
def jsonList(folder):
    """List the entries of ``<storage>/<folder>``."""
    if not _is_safe_name(folder):
        return jsonify({'error': 'Invalid path'}), 400
    try:
        files = os.listdir(storageFolder + '/' + folder)
    except FileNotFoundError:
        return jsonify({'error': 'Folder not exists'}), 404
    return jsonify({"result": files}), 200


@app.route("/json/<file>", defaults={'folder': ''}, methods=['POST'])
@app.route("/json/<folder>/<file>", methods=['POST'])
@cross_origin(origins=origins_domains)
def jsonSave(folder, file):
    """Store the posted JSON as ``<storage>/[<folder>/]<file>.json``.

    Failures are reported as ``{'error': ...}`` with HTTP status 200.
    """
    filename = _json_path(folder, file)
    if filename is None:
        return jsonify({'error': 'Invalid path'}), 400
    try:
        json_object = json.dumps(request.json, indent=2)
        with open(filename, "w") as outfile:
            outfile.write(json_object)
    except Exception as e:
        return jsonify({'error': repr(e)}), 200
    else:
        return jsonify({'result': 'OK'}), 200


# NOTE(review): rename and delete change state but are exposed as GET requests.
@app.route("/json/rename/<folder>/<file_old>/<file_new>", methods=['GET'])
@cross_origin(origins=origins_domains)
def jsonRename(folder, file_old, file_new):
    """Rename ``<file_old>.json`` to ``<file_new>.json`` inside ``<storage>/<folder>``."""
    filename_old = _json_path(folder, file_old)
    filename_new = _json_path(folder, file_new)
    if filename_old is None or filename_new is None:
        return jsonify({'error': 'Invalid path'}), 400
    if not os.path.exists(filename_old):
        return jsonify({'error': 'File not exists'}), 200
    try:
        os.rename(filename_old, filename_new)
    except Exception as e:
        return jsonify({'error': repr(e)}), 200
    return jsonify({'result': 'OK'}), 200


@app.route("/json/delete/<folder>/<file>", methods=['GET'])
@cross_origin(origins=origins_domains)
def jsonDelete(folder, file):
    """Delete ``<storage>/<folder>/<file>.json``."""
    filename = _json_path(folder, file)
    if filename is None:
        return jsonify({'error': 'Invalid path'}), 400
    if not os.path.exists(filename):
        return jsonify({'error': 'File not exists'}), 200
    try:
        os.remove(filename)
    except Exception as e:
        return jsonify({'error': repr(e)}), 200
    return jsonify({'result': 'OK'}), 200


##############################
# Error Pages
##############################
@app.errorhandler(404)
def err_404(error):
    """Render the error template for unknown URLs."""
    return render_template('error.html', message='Page not found'), 404


@app.errorhandler(500)
def err_500(error):
    """Render the error template for unhandled server errors."""
    return render_template('error.html', message='Internal server error'), 500


##############################
# Logging configure
##############################
if not app.debug:
    import logging
    from logging.handlers import RotatingFileHandler

    # RotatingFileHandler does not create missing directories.
    os.makedirs('log', exist_ok=True)
    file_handler = RotatingFileHandler('log/my_app.log', 'a', 1 * 1024 * 1024, 10)
    file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
    app.logger.setLevel(logging.INFO)
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)
    app.logger.info('startup')

##############################
# Run app
##############################
if __name__ == '__main__':
    # One daemon worker per configured camera.  Thread.start() returns None,
    # so its result is not kept.
    for cam_num in range(len(confData)):
        threading.Thread(target=video_processing, args=(cam_num,), daemon=True).start()
    app.run(host='0.0.0.0')