グラム

import boto3
import cv2
import numpy as np
import matplotlib.pyplot as plt
from io import BytesIO
from botocore.exceptions import NoCredentialsError
import tempfile
import os

# AWSの認証情報を設定
aws_access_key_id = 'YOUR_AWS_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_AWS_SECRET_ACCESS_KEY'
region_name = 'YOUR_AWS_REGION'

# S3クライアントを作成
s3 = boto3.client(
   's3',
   aws_access_key_id=aws_access_key_id,
   aws_secret_access_key=aws_secret_access_key,
   region_name=region_name
)

# S3バケット名と教師データフォルダを指定
teacher_bucket_name = 'YOUR_TEACHER_BUCKET_NAME'
teacher_folder = 'path/to/teacher_folder/'  # 教師データのフォルダパス
video_bucket_name = 'YOUR_VIDEO_BUCKET_NAME'
video_key = 'path/to/your/video/file.mp4'

# アップロード先のS3バケットとフォルダを指定
output_bucket_name = 'YOUR_OUTPUT_BUCKET_NAME'
output_folder = 'path/to/output/folder/'

# 動画をS3からダウンロードして一時ファイルに保存
try:
   s3_object = s3.get_object(Bucket=video_bucket_name, Key=video_key)
   print(f"Retrieved {video_key} from S3")
except NoCredentialsError:
   print("Credentials not available")
except Exception as e:
   print(f"Error occurred: {e}")

with tempfile.NamedTemporaryFile(delete=False) as temp_file:
   temp_file.write(s3_object['Body'].read())
   temp_video_path = temp_file.name

# 特徴量の抽出とマッチング関数
def detect_and_compute_features(image):
   orb = cv2.ORB_create()
   keypoints, descriptors = orb.detectAndCompute(image, None)
   return keypoints, descriptors

# 教師データフォルダ内の全ファイルを取得
teacher_keys = []
try:
   response = s3.list_objects_v2(Bucket=teacher_bucket_name, Prefix=teacher_folder)
   if 'Contents' in response:
       for obj in response['Contents']:
           if obj['Key'].lower().endswith(('.jpg', '.jpeg', '.png')):
               teacher_keys.append(obj['Key'])
   print(f"Found {len(teacher_keys)} teacher images in {teacher_folder}")
except NoCredentialsError:
   print("Credentials not available")
except Exception as e:
   print(f"Error occurred: {e}")

# 各教師データごとに処理
for teacher_index, teacher_key in enumerate(teacher_keys):
   # 教師データをS3からダウンロードして読み込む
   try:
       s3_object = s3.get_object(Bucket=teacher_bucket_name, Key=teacher_key)
       print(f"Retrieved {teacher_key} from S3")
       image_stream = BytesIO(s3_object['Body'].read())
       teacher_image = cv2.imdecode(np.frombuffer(image_stream.read(), np.uint8), cv2.IMREAD_COLOR)
   except NoCredentialsError:
       print("Credentials not available")
       continue
   except Exception as e:
       print(f"Error occurred while retrieving {teacher_key}: {e}")
       continue

   # 教師データの特徴量を抽出
   _, teacher_descriptors = detect_and_compute_features(teacher_image)

   # 動画をビデオキャプチャとして読み込む
   video_stream = cv2.VideoCapture(temp_video_path)

   if not video_stream.isOpened():
       raise RuntimeError("Error: Could not open video stream")

   # フレームごとの特徴量を抽出して一致度を計算
   frame_count = 0
   similarity_scores = []
   best_match = -1
   best_frame = None
   best_frame_number = -1

   while video_stream.isOpened():
       ret, frame = video_stream.read()
       if ret:
           _, frame_descriptors = detect_and_compute_features(frame)
           if frame_descriptors is not None:
               bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
               matches = bf.match(frame_descriptors, teacher_descriptors)
               total_matches = len(matches)
               similarity_scores.append(total_matches)
               if total_matches > best_match:
                   best_match = total_matches
                   best_frame = frame.copy()
                   best_frame_number = frame_count
           else:
               similarity_scores.append(0)
           frame_count += 1
       else:
           break

   # 動画ストリームをリリース
   video_stream.release()

   # グラフ化
   plt.figure()
   plt.plot(similarity_scores)
   plt.title(f'Similarity Scores for Teacher Image {teacher_index + 1}')
   plt.xlabel('Frame Number')
   plt.ylabel('Number of Matches')
   plt.show()

   if best_frame is not None:
       # 一時ファイルに保存
       with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_frame_file:
           cv2.imwrite(temp_frame_file.name, best_frame)
           temp_frame_path = temp_frame_file.name

       # S3にアップロード
       output_key = f"{output_folder}best_match_teacher_image_{teacher_index + 1}_frame_{best_frame_number}.jpg"
       s3.upload_file(
           Filename=temp_frame_path,
           Bucket=output_bucket_name,
           Key=output_key
       )
       os.remove(temp_frame_path)

       print(f"Best match for Teacher Image {teacher_index + 1} is at frame {best_frame_number} and saved to {output_key}")

# 一時ファイルを削除
os.remove(temp_video_path)

print("Processing complete.")

この記事が気に入ったらサポートをしてみませんか?