Files
2026-05-20 15:05:35 +08:00

463 lines
24 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*
import os,time,sys,threading, colorsys, argparse
import asyncio, cv2, multiprocessing
from PIL import Image
import numpy as np
def getFileList(dir,Filelist=[], ext=None, Max_layer=1, layer=0, Donot_Search=['1_颜色预处理', '2_灰度化', '3_边缘化']):
"""
获取文件夹及其子文件夹中文件列表
输入 dir文件夹根目录
输入 ext: 扩展名
返回: 文件路径列表
"""
newDir = dir
if os.path.isfile(dir):
if ext is None:
Filelist.append(dir)
else:
if ext in dir[-3:]:
Filelist.append(dir)
elif os.path.isdir(dir):
file_name = os.path.basename(dir)
# 判断是否在禁搜名单中
if file_name in Donot_Search:
return Filelist
for s in os.listdir(dir):
newDir=os.path.join(dir,s)
if layer <= Max_layer:
getFileList(newDir, Filelist, ext, Max_layer, layer+1)
return Filelist
class Deal_image():
def __init__(self, Annotate_CLASSES = ('肝脏','胆囊'), Annotate_PALETTE = [[255,91,0],[255,234,0]], src_label_fold = "./Label", save_pro_label_fold = "./LABEL_PNG_new", save_GT_label_fold = "./Label_Generate", GT_channel = 1, pro_append_name="_label", GT_append_name="_gtFine_labelTrainIds", ori_img_folder="./ORI_PNG", res_label_folder="./Result_label", save_merge_pic_folder="./Result_merge", back_gnd_color=0, first_class_color=1, pic_type="png", Max_width = 10000, Label_Max_Search_layer=1000, save_process_pics=False):
self.src_CLASSES = ('肝脏','胆囊','分离钳','止血海绵','肝总管','胆总管','吸引器','剪刀','止血纱布','生物夹','无损伤钳','喷洒','胆囊管','胆囊动脉','电凝','标本袋','引流管','纱布','金属钛夹','背景')
self.src_PALETTE = np.array([[255,91,0],[255,234,0],[85, 107, 179],[181, 227, 14],[72, 0, 255],[0, 155, 33],[255,0,255],[29, 32, 136],[160, 15, 95],[0,160,233],[52,184,178],[90,120,41],[255,0,0],[177,0,0],[167,24,233],[112,113,150],[0,255,0],[255,255,255],[0,255,255],[0,0,0]])
self.src_CLASSES_NUM = np.shape(self.src_CLASSES)[0]
self.Annotate_CLASSES = Annotate_CLASSES # 待分类的类
self.Annotate_PALETTE = np.array(Annotate_PALETTE) # 每一类的像素直
self.Annotate_CLASSES_NUM = np.shape(Annotate_CLASSES)[0] # 类数量
self.save_process_pics = save_process_pics # 保存中间过程图片
self.src_label_fold = src_label_fold # 原始标签图片 保存位置
self.save_pro_label_fold = save_pro_label_fold # 优化后标签图片 保存位置
self.save_GT_label_fold = save_GT_label_fold # GT标签图片 保存位置
self.ori_img_folder = ori_img_folder # 最原始手术图片 保存位置
self.res_label_folder = res_label_folder # 训练出来的label 保存位置
self.save_merge_pic_folder = save_merge_pic_folder # 融合图像保存位置
self.pro_append_name = pro_append_name # 优化后标签图片后缀
self.GT_append_name = GT_append_name # GT标签图片后缀
self.GT_channel = GT_channel # GT标签图片通道数
self.Max_width = Max_width # 最大图片宽度(匹配时候用)
self.pic_type = pic_type # 图片类型
self.back_gnd_color = back_gnd_color # 背景颜色
self.first_class_color = first_class_color # 第一类上的颜色
self.Label_Max_Search_layer=Label_Max_Search_layer # 文件夹最大搜索深度
try:
self.labellist_src = getFileList(src_label_fold, [], pic_type, self.Label_Max_Search_layer)
print('本次执行检索到ori_label图片 '+str(len(self.labellist_src))+' 张图像')
except:
self.labellist_src = None
print("没有ori_label相关文件")
try:
# print(save_pro_label_fold)
self.labellist_pro = getFileList(save_pro_label_fold, [], pic_type, self.Label_Max_Search_layer)
print('本次执行检索到pro_label图片 '+str(len(self.labellist_pro))+' 张图像')
except:
self.labellist_pro = None
print("没有pro_label相关文件")
try:
self.imglist_src = getFileList(ori_img_folder, [], pic_type, self.Label_Max_Search_layer)
self.reslist_src = getFileList(res_label_folder, [], pic_type, self.Label_Max_Search_layer)
print('本次执行检索到ori原始图片 '+str(len(self.imglist_src))+' 张图像')
print('本次执行检索到训练train_result图片 '+str(len(self.reslist_src))+' 张图像')
except:
self.imglist_src = None
self.reslist_src = None
print("没有train_result和原始图片相关文件")
# 获取单张图片各个通路信息
def get_single_pic_rgb(self, imgpath):
print(imgpath)
image = Image.open(imgpath).convert('RGB') # 转为RGB图片
# 将 RGB 色值分离
image.load()
r, g, b = image.split()
r = np.array(r)
g = np.array(g)
b = np.array(b)
return image, r, g, b
# 将单个pro图片变成GT图片
def Conver_pro_label_pic_2_GT_pic(self, imgpath, imgname):
time_start=time.time() # 记录开始时间
# 获取单张图片各个通路信息
image, r,g,b = self.get_single_pic_rgb(imgpath)
result_gt = np.ones(np.shape(image))*self.back_gnd_color # 初始化填充内容为back_gnd_color
gt_number = self.first_class_color # 第一类上色颜色确定
# 遍历所有待识别颜色
for [Annotate_PALETTE_r, Annotate_PALETTE_g, Annotate_PALETTE_b] in self.Annotate_PALETTE:
# 查找三原色匹配位置
locate_r = np.where( r == Annotate_PALETTE_r )
locate_g = np.where( g == Annotate_PALETTE_g )
locate_b = np.where( b == Annotate_PALETTE_b )
# 查找都匹配位置(交集)
# 将矩阵换一种表示形式
locate_r = np.array(locate_r[0]) * Max_width + np.array(locate_r[1])
locate_g = np.array(locate_g[0]) * Max_width + np.array(locate_g[1])
locate_b = np.array(locate_b[0]) * Max_width + np.array(locate_b[1])
# 用自带函数寻找匹配项
matched = np.intersect1d(np.intersect1d(locate_r, locate_g), locate_b)
matched = np.concatenate(([matched // self.Max_width], [np.mod(matched, self.Max_width)]), 0)
result_gt[matched[0],matched[1], :] = gt_number
gt_number = gt_number + 1
# 输出GT图片
if(int(self.GT_channel) == 1):
result_gt = result_gt[:,:,0]
elif(int(self.GT_channel) == 3):
result_gt = cv2.cvtColor(np.float32(result_gt), cv2.COLOR_RGB2BGR) # rgb颜色互换
else:
print("GT_channel 必须为1或3")
quit
try: # 新建文件夹
os.mkdir(self.save_GT_label_fold)
except:
print("已有"+self.save_GT_label_fold)
save_dir = os.path.join(self.save_GT_label_fold, os.path.splitext(imgname)[0]+self.GT_append_name+'.'+self.pic_type)
cv2.imwrite(save_dir, result_gt)
print("GT图片已保存", save_dir)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 将处理好的图片转化为GT图片
def Conver_pro_label_pic_2_GT_pic_all(self):
print("\033[33m**** 进行转换将Pro_label_pic转换为GT_label_pic ****\033[0m")
print("\033[33mPro_label_pic存储位置为\033[0m", self.save_pro_label_fold)
print("\033[33mGT_label_pic生成位置为\033[0m", self.save_GT_label_fold)
try:
# print(save_pro_label_fold)
self.labellist_pro = getFileList(save_pro_label_fold, [], pic_type, self.Label_Max_Search_layer)
print('本次执行检索到pro_label图片 '+str(len(self.labellist_pro))+' 张图像')
except:
self.labellist_pro = None
print("没有pro_label相关文件")
try:
os.mkdir(self.save_GT_label_fold) # 新建存储文件夹
except:
print("已有"+self.save_GT_label_fold)
# 指定最大进程数为 3
max_processes = 20
# 创建Pool对象
pool = multiprocessing.Pool(processes=max_processes)
# 创建并启动进程
args_list1 = []
args_list2 = []
# 遍历整个文件夹
for imgpath in self.labellist_pro:
imgname= os.path.splitext(os.path.basename(imgpath))[0].replace(self.pro_append_name,"")
args_list1.append(imgpath)
args_list2.append(imgname)
args_list = zip(args_list1, args_list2)
# 使用进程池并行执行任务
pool.starmap(self.Conver_pro_label_pic_2_GT_pic, args_list)
# 关闭进程池
pool.close()
pool.join()
def Conver_ori_label_pic_2_pro_pic(self, imgpath, imgname):
time_start=time.time() # 记录开始时间
# 获取单张图片各个通路信息
image, r, g, b = self.get_single_pic_rgb(imgpath)
result_pro = np.zeros(np.shape(image)) # pro图片初始化默认图片背景为0.0.0
# 生成距离初始矩阵 width*height*num_of_classes
Dis_mat = np.zeros((np.shape(image)[0], np.shape(image)[1], self.src_CLASSES_NUM))
# 遍历所有类
i = 0 # 顺序指示物
# 遍历寻找最短距离是第几个内容
for palette in self.src_PALETTE:
Dis_mat[:,:,i] = np.abs((r - palette[0])) + np.abs((g - palette[1])) + np.abs((b - palette[2]))
i = i + 1
Min_Dis_mat = np.argmin(Dis_mat, axis = 2)
# 给图片上色
for number_class in range(self.src_CLASSES_NUM):
# 查找图片种类
loc_of_class_pic = np.where(Min_Dis_mat == number_class)
# 否则给图片上对应颜色
result_pro[loc_of_class_pic[0], loc_of_class_pic[1], :] = self.src_PALETTE[number_class]
result_pro = cv2.cvtColor(np.float32(result_pro), cv2.COLOR_RGB2BGR) # rgb颜色互换
# 如果需要存储中间态图片
if(self.save_process_pics == True):
save_dir = os.path.join(self.save_pro_label_fold, '1_颜色预处理', os.path.splitext(imgname)[0]+self.pro_append_name+'_preproc'+'.'+self.pic_type)
cv2.imwrite(save_dir, result_pro)
print("中间态-颜色图片已保存", save_dir)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 边界区域大小
half_region_len=(5-1)//2
pic_height = result_pro.shape[0]
pic_width = result_pro.shape[1]
# 2_灰度化
gray = cv2.cvtColor(result_pro, cv2.COLOR_BGR2GRAY)
gray = (gray*255).astype(np.uint8) # cv2对图片格式要求较为严格
# 如果需要存储中间态图片
if(self.save_process_pics == True):
save_dir = os.path.join(self.save_pro_label_fold, '2_灰度化', os.path.splitext(imgname)[0]+self.pro_append_name+'_gray'+'.'+self.pic_type)
cv2.imwrite(save_dir, gray)
print("中间态-灰度图片已保存", save_dir)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 边缘检测
edges = cv2.Canny(gray, 50, 150)
# 定义膨胀核
kernel = np.ones((3, 3), np.uint8)
# 对Canny输出图像进行膨胀
edges = cv2.dilate(edges, kernel, iterations=1)
# 如果需要存储中间态图片
if(self.save_process_pics == True):
save_dir = os.path.join(self.save_pro_label_fold, '3_边缘化', os.path.splitext(imgname)[0]+self.pro_append_name+'_edge'+'.'+self.pic_type)
cv2.imwrite(save_dir, edges)
print("中间态-边缘图片已保存", save_dir)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 查找边缘
nonzero_idx = np.array(np.nonzero(edges)).T
# nonzero_idx, counts = np.unique(nonzero_idx, axis=0, return_counts=True)
print("边缘个数:",np.shape(nonzero_idx)[0])
for k in range(len(nonzero_idx)):
i = nonzero_idx[k][0]
j = nonzero_idx[k][1]
# 遍历周围3*3的区域
# 找到与当前像素值附近5*5*3(channel)的像素并将矩阵变为25(5*5)*3(channel)
if i <= half_region_len:
i = half_region_len
elif i >= pic_height-half_region_len:
i = pic_height-half_region_len
if j <= half_region_len:
j = half_region_len
elif j >= pic_width-half_region_len:
j = pic_width-half_region_len
region_x = i
region_y = j
region = np.array(result_pro[region_x-half_region_len:region_x+half_region_len, region_y-half_region_len:region_y+half_region_len,:]).reshape(-1, 3)
# 计算唯一列及其出现次数
# print(region_x-half_region_len, region_x+half_region_len, region_y-half_region_len, region_y+half_region_len)
unique_columns, counts = np.unique(region, axis=0, return_counts=True)
# 找出出现次数最多的列的索引
max_count_index = np.argmax(counts)
# 将其赋值为出现次数最多的列
result_pro[i, j, :] = unique_columns[max_count_index]
# result_pro[i, j, :] = [0,0,0]
save_dir = os.path.join(self.save_pro_label_fold, os.path.splitext(imgname)[0]+self.pro_append_name+'.'+self.pic_type)
print("Pro图片已保存", save_dir)
cv2.imwrite(save_dir, result_pro)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 将原始src图片转化为处理好的pro图片
def Conver_ori_label_pic_2_pro_pic_all(self):
print("\033[33m**** 进行转换将Ori_label_pic转换为Pro_label_pic ****\033[0m")
print("\033[33mOri_label_pic存储位置为\033[0m", self.src_label_fold)
print("\033[33mPro_label_pic生成位置为\033[0m", self.save_pro_label_fold)
# 输出颜色预处理图片
try:
os.mkdir(self.save_pro_label_fold) # 新建存储文件夹
except:
print("已有"+self.save_pro_label_fold)
if(self.save_process_pics == True):
try:
os.mkdir(os.path.join(self.save_pro_label_fold, '1_颜色预处理')) # 新建存储1_颜色预处理文件夹
except:
print("已有"+os.path.join(self.save_pro_label_fold, '1_颜色预处理'))
try:
os.mkdir(os.path.join(self.save_pro_label_fold, '2_灰度化')) # 新建存储2_灰度化文件夹
except:
print("已有"+os.path.join(self.save_pro_label_fold, '2_灰度化'))
try:
os.mkdir(os.path.join(self.save_pro_label_fold, '3_边缘化')) # 新建存储1_颜色预处理文件夹
except:
print("已有"+os.path.join(self.save_pro_label_fold, '3_边缘化'))
# 指定最大进程数为 20多参数函数并行
max_processes = 20
# 创建Pool对象
pool = multiprocessing.Pool(processes=max_processes)
# 创建并启动进程
args_list1 = []
args_list2 = []
# 遍历整个文件夹
for imgpath in self.labellist_src:
try:
imgname= os.path.splitext(os.path.basename(imgpath))[0].replace(self.pro_append_name,"")
print("Processing: ", imgname, "...")
# self.Conver_ori_label_pic_2_pro_pic(imgpath, imgname)s
# args_list.append({'imgpath': imgpath, 'imgname': imgname})
args_list1.append(imgpath)
args_list2.append(imgname)
except:
os.system("echo "+imgname+" >> error_1.txt")
args_list = zip(args_list1, args_list2)
# 使用进程池并行执行任务
pool.starmap(self.Conver_ori_label_pic_2_pro_pic, args_list) # 使用starmap进行多参数并行
# 关闭进程池
pool.close()
pool.join()
# 图片堆叠
def Merge_ori_pic_and_label_pic(self, res_img_path, res_imgname):
time_start=time.time() # 记录开始时间
# 获取单张图片各个通路信息
ori_img_path = os.path.join(self.ori_img_folder, res_imgname+'.'+self.pic_type)
if not os.path.exists(ori_img_path):
print("****照片不存在:****", ori_img_path)
return -1
ori_image, ori_r, ori_g, ori_b = self.get_single_pic_rgb(ori_img_path)
res_image, res_r, res_g, res_b = self.get_single_pic_rgb(res_img_path)
merge_img = np.array(ori_image) # merge图片初始化默认图片背景为0.0.0
# 遍历所有待识别颜色
for [Annotate_PALETTE_r, Annotate_PALETTE_g, Annotate_PALETTE_b] in self.Annotate_PALETTE:
# 查找三原色匹配位置
locate_r = np.where( res_r == Annotate_PALETTE_r )
locate_g = np.where( res_g == Annotate_PALETTE_g )
locate_b = np.where( res_b == Annotate_PALETTE_b )
# 查找都匹配位置(交集)
# 将矩阵换一种表示形式
locate_r = np.array(locate_r[0]) * self.Max_width + np.array(locate_r[1])
locate_g = np.array(locate_g[0]) * self.Max_width + np.array(locate_g[1])
locate_b = np.array(locate_b[0]) * self.Max_width + np.array(locate_b[1])
# 用自带函数寻找匹配项
matched = np.intersect1d(np.intersect1d(locate_r, locate_g), locate_b)
matched = np.concatenate(([matched // self.Max_width], [np.mod(matched, self.Max_width)]), 0)
merge_img[matched[0],matched[1], 0] = Annotate_PALETTE_r
merge_img[matched[0],matched[1], 1] = Annotate_PALETTE_g
merge_img[matched[0],matched[1], 2] = Annotate_PALETTE_b
# 转成cv2形式
merge_img = cv2.cvtColor(np.float32(merge_img), cv2.COLOR_RGB2BGR)
try: # 新建文件夹
os.mkdir(self.save_merge_pic_folder)
except:
print("已有"+self.save_merge_pic_folder)
save_dir = os.path.join(self.save_merge_pic_folder, os.path.splitext(res_imgname)[0]+'.'+self.pic_type)
cv2.imwrite(save_dir, merge_img)
print("Merge图片已保存", save_dir)
time_end=time.time() # 输出结束时间
print('time cost',time_end-time_start,'s')
# 将label图片与原图片重合
def Merge_ori_pic_and_label_pic_all(self):
# 遍历整个文件夹
for res_img_path in self.reslist_src:
res_imgname = os.path.splitext(os.path.basename(res_img_path))[0].replace(self.pro_append_name,"")
print("Processing: ", res_imgname, "...")
self.Merge_ori_pic_and_label_pic(res_img_path, res_imgname)
if __name__ == "__main__":
Annotate_CLASSES = ('肝脏','胆囊','分离钳','止血海绵','肝总管','胆总管','吸引器','剪刀','止血纱布','生物夹','无损伤钳','喷洒','胆囊管','胆囊动脉','电凝','标本袋','引流管','纱布','金属钛夹') # 待分类的类
Annotate_PALETTE = [[255,91,0],[255,234,0],[85, 107, 179],[181, 227, 14],[72, 0, 255],[0, 155, 33],[255,0,255],[29, 32, 136],[160, 15, 95],[0,160,233],[52,184,178],[90,120,41],[255,0,0],[177,0,0],[167,24,233],[112,113,150],[0,255,0],[255,255,255],[0,255,255]]# [[255,91,0],[255,234,0]] # 每一类的像素直
# 创建参数解析器
parser = argparse.ArgumentParser(description='Process some files.')
# 添加参数选项
parser.add_argument('-src_fold', dest='src_label_fold', default='', help='source label folder')
parser.add_argument('-save_pro_fold', dest='save_pro_label_fold', default='./save_pro_label_fold', help='processed label folder')
parser.add_argument('-save_GT_fold', dest='save_GT_label_fold', default='./save_GT_label_fold', help='ground truth folder')
parser.add_argument('-fold_search_depth', dest='Label_Max_Search_layer', default='1000', type=int, help='Folder Search Depth')
parser.add_argument('-pro_suffix_name', dest='pro_append_name', default='_label', help='Pro file suffix')
parser.add_argument('-GT_suffix_name', dest='GT_append_name', default='_gtFine_labelTrainIds', help='GT file suffix')
parser.add_argument('-GT_channel', dest='GT_channel', default='1', type=int, help='GT file channel(1 or 3)')
parser.add_argument('-back_gnd_color', dest='back_gnd_color', default='0', type=int, help='Color of "Back ground"(0 or 255)')
parser.add_argument('-first_class_color', dest='first_class_color', default='1', type=int, help='Color of "First Class"')
parser.add_argument('-pic_type', dest='pic_type', default='png', help='type of picture(Do not add ".")')
parser.add_argument('-Max_width', dest='Max_width', default='10000', type=int, help='Max width of picture')
parser.add_argument('-Rebuild_from', dest='Rebuild_from', default='label', help='Source to Rebuild Labels(label/pro)')
parser.add_argument('-Rebuild_to', dest='Rebuild_to', default='GT', help='Destination of Rebuild Labels(pro/GT)')
parser.add_argument('-save_process_pics', dest='save_process_pics', default='False', help='Save the processed pics(e.g.Gray_pics,Color_pics) in generating pro_pics')
# 解析命令行参数
args = parser.parse_args()
src_label_fold = args.src_label_fold
save_pro_label_fold = args.save_pro_label_fold
save_GT_label_fold = args.save_GT_label_fold
Label_Max_Search_layer = args.Label_Max_Search_layer
pro_append_name = args.pro_append_name
GT_append_name = args.GT_append_name
GT_channel = args.GT_channel
back_gnd_color = args.back_gnd_color
first_class_color = args.first_class_color
pic_type = args.pic_type
Max_width = args.Max_width
Rebuild_from = args.Rebuild_from
Rebuild_to = args.Rebuild_to
save_process_pics = args.save_process_pics
try: # 遍历文件深度最小为1
Label_Max_Search_layer=int(Label_Max_Search_layer)
except:
Label_Max_Search_layer=1000
try: # GT标签图片通道数
GT_channel=int(GT_channel)
except:
GT_channel=1
try: # 背景颜色背景选择0或255)
back_gnd_color=int(back_gnd_color)
except:
back_gnd_color=0
try: # 第一类上的颜色(如果背景为0,选择1)
first_class_color=int(first_class_color)
except:
first_class_color=1
try: # 最大图片宽度(匹配时候用)
Max_width=int(Max_width)
except:
Max_width=10000
if(save_process_pics.lower() == 'false'):
save_process_pics = False
elif(save_process_pics.lower() == 'true'):
save_process_pics = True
else:
save_process_pics = False
D = Deal_image(Annotate_CLASSES=Annotate_CLASSES, Annotate_PALETTE=Annotate_PALETTE, src_label_fold=src_label_fold, save_pro_label_fold=save_pro_label_fold, save_GT_label_fold=save_GT_label_fold, GT_channel=GT_channel, pro_append_name=pro_append_name, GT_append_name=GT_append_name, back_gnd_color=back_gnd_color, first_class_color=first_class_color, pic_type=pic_type, Max_width=Max_width, Label_Max_Search_layer=Label_Max_Search_layer, save_process_pics=save_process_pics)
# print(D.src_CLASSES_NUM)
if Rebuild_from == 'label':
# 1.先将所有原始图片转为pro图片
D.Conver_ori_label_pic_2_pro_pic_all()
pass
if Rebuild_to == 'GT':
# 2.再将pro图片转为GT图片
D.Conver_pro_label_pic_2_GT_pic_all()
pass