diff --git a/experiments/cursor/__init__.py b/experiments/cursor/__init__.py
new file mode 100644
index 000000000..ec808a6b9
--- /dev/null
+++ b/experiments/cursor/__init__.py
@@ -0,0 +1 @@
+"""Package containing cursor movement experiments."""
\ No newline at end of file
diff --git a/experiments/cursor/grid.py b/experiments/cursor/grid.py
new file mode 100644
index 000000000..70f87aad9
--- /dev/null
+++ b/experiments/cursor/grid.py
@@ -0,0 +1,229 @@
+"""Grid-based cursor movement experiment.
+
+This approach divides the screen into a grid and uses AI feedback to identify
+which cell contains the target, then refines the position within that cell.
+"""
+
+import cv2
+import numpy as np
+
+from openadapt import models
+from openadapt.custom_logger import logger
+from openadapt.strategies.cursor import CursorReplayStrategy
+
+
+class GridCursorStrategy(CursorReplayStrategy):
+    """Grid-based cursor movement strategy."""
+
+    def __init__(
+        self,
+        recording: models.Recording,
+        grid_size: tuple[int, int] = (4, 4),  # 4x4 grid by default
+        refinement_steps: int = 2,  # Number of times to subdivide target cell
+    ) -> None:
+        """Initialize the GridCursorStrategy.
+
+        Args:
+            recording (models.Recording): The recording object.
+            grid_size (tuple[int, int]): Number of rows and columns in the grid.
+            refinement_steps (int): Number of times to subdivide the target cell.
+        """
+        super().__init__(recording, approach="grid")
+        self.grid_size = grid_size
+        self.refinement_steps = refinement_steps
+        self.current_grid = None
+        self.current_cell = None
+        self.refinement_step = 0
+
+    def _init_grid_approach(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Initialize the grid-based cursor movement approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The initial action for the grid approach.
+        """
+        # Create initial grid
+        height, width = screenshot.image.shape[:2]
+        rows, cols = self.grid_size
+
+        # Calculate cell dimensions
+        cell_height = height // rows
+        cell_width = width // cols
+
+        # Create grid representation
+        self.current_grid = {
+            'height': height,
+            'width': width,
+            'rows': rows,
+            'cols': cols,
+            'cell_height': cell_height,
+            'cell_width': cell_width,
+            'target_row': None,
+            'target_col': None,
+            # The top-level grid starts at the image origin
+            'offset_x': 0,
+            'offset_y': 0,
+        }
+
+        # Draw grid on screenshot for visualization
+        img_with_grid = self._draw_grid(screenshot.image.copy())
+
+        # Ask model to identify target cell
+        target_cell = self._identify_target_cell(img_with_grid, window_event)
+        self.current_cell = target_cell
+
+        # Get initial position (center of target cell)
+        x, y = self._get_cell_center(target_cell)
+
+        # Create mouse move action to initial position
+        return models.ActionEvent(
+            name="mouse_move",
+            mouse_x=x,
+            mouse_y=y,
+            window_event=window_event,
+        )
+
+    def _next_grid_action(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Get the next action for the grid-based approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The next action for the grid approach.
+        """
+        if self.refinement_step >= self.refinement_steps:
+            # We've finished refining, perform the click
+            return models.ActionEvent(
+                name="mouse_click",
+                mouse_x=self.action_history[-1].mouse_x,
+                mouse_y=self.action_history[-1].mouse_y,
+                window_event=window_event,
+            )
+
+        # Subdivide current cell into smaller grid
+        self._refine_grid()
+        self.refinement_step += 1
+
+        # Draw refined grid
+        img_with_grid = self._draw_grid(screenshot.image.copy())
+
+        # Ask model to identify target subcell
+        target_subcell = self._identify_target_cell(img_with_grid, window_event)
+        self.current_cell = target_subcell
+
+        # Get refined position
+        x, y = self._get_cell_center(target_subcell)
+
+        # Create mouse move action to refined position
+        return models.ActionEvent(
+            name="mouse_move",
+            mouse_x=x,
+            mouse_y=y,
+            window_event=window_event,
+        )
+
+    def _draw_grid(self, img: np.ndarray) -> np.ndarray:
+        """Draw the current grid on the image.
+
+        The lines cover the region described by ``self.current_grid`` so that
+        after a refinement step the overlay matches the cell being subdivided.
+        If no grid has been created yet, the whole image is used.
+
+        Args:
+            img (np.ndarray): The image to draw on (modified in place).
+
+        Returns:
+            np.ndarray: The image with grid lines drawn.
+        """
+        grid = self.current_grid or {}
+        img_height, img_width = img.shape[:2]
+        height = grid.get('height', img_height)
+        width = grid.get('width', img_width)
+        left = grid.get('offset_x', 0)
+        top = grid.get('offset_y', 0)
+        rows, cols = self.grid_size
+
+        # Draw vertical lines
+        for i in range(cols + 1):
+            x = left + (width * i) // cols
+            cv2.line(img, (x, top), (x, top + height), (0, 255, 0), 1)
+
+        # Draw horizontal lines
+        for i in range(rows + 1):
+            y = top + (height * i) // rows
+            cv2.line(img, (left, y), (left + width, y), (0, 255, 0), 1)
+
+        return img
+
+    def _identify_target_cell(
+        self, img_with_grid: np.ndarray, window_event: models.WindowEvent
+    ) -> tuple[int, int]:
+        """Ask the model to identify which grid cell contains the target.
+
+        Args:
+            img_with_grid (np.ndarray): Screenshot with grid overlay.
+            window_event (models.WindowEvent): Current window event.
+
+        Returns:
+            tuple[int, int]: (row, col) of the identified target cell.
+        """
+        # TODO: Implement model prompting to identify target cell
+        # For now, return center cell
+        return (self.grid_size[0] // 2, self.grid_size[1] // 2)
+
+    def _get_cell_center(self, cell: tuple[int, int]) -> tuple[int, int]:
+        """Get the center coordinates of a grid cell.
+
+        Args:
+            cell (tuple[int, int]): (row, col) of the cell.
+
+        Returns:
+            tuple[int, int]: Absolute (x, y) image coordinates of cell center.
+        """
+        row, col = cell
+        grid = self.current_grid
+        # (2 * i + 1) * size // 2 is the midpoint of cell i within the grid; the
+        # offsets translate it from a refined sub-grid back to image coordinates
+        x = grid.get('offset_x', 0) + (2 * col + 1) * grid['cell_width'] // 2
+        y = grid.get('offset_y', 0) + (2 * row + 1) * grid['cell_height'] // 2
+        return x, y
+
+    def _refine_grid(self) -> None:
+        """Subdivide the current cell into a finer grid.
+
+        The new grid keeps the absolute position of the chosen cell in
+        ``offset_x``/``offset_y`` so repeated refinements stay anchored to the
+        image instead of restarting at the origin.
+        """
+        row, col = self.current_cell
+        grid = self.current_grid
+
+        # Calculate absolute boundaries of current cell
+        x1 = grid.get('offset_x', 0) + col * grid['cell_width']
+        y1 = grid.get('offset_y', 0) + row * grid['cell_height']
+        x2 = x1 + grid['cell_width']
+        y2 = y1 + grid['cell_height']
+
+        # Update grid to focus on current cell
+        rows, cols = self.grid_size
+        self.current_grid = {
+            'height': y2 - y1,
+            'width': x2 - x1,
+            'rows': rows,
+            'cols': cols,
+            'cell_height': (y2 - y1) // rows,
+            'cell_width': (x2 - x1) // cols,
+            'target_row': None,
+            'target_col': None,
+            'offset_x': x1,
+            'offset_y': y1,
+        }
\ No newline at end of file
diff --git a/experiments/cursor/test_grid.py b/experiments/cursor/test_grid.py
new file mode 100644
index 000000000..6f567742b
--- /dev/null
+++ b/experiments/cursor/test_grid.py
@@ -0,0 +1,197 @@
+"""Test script for evaluating the grid-based cursor movement approach."""
+
+import cv2
+import numpy as np
+from pathlib import Path
+import time
+from typing import Optional
+
+from openadapt import models, replay
+from openadapt.custom_logger import logger
+from experiments.cursor.grid import GridCursorStrategy
+
+
+def create_test_recording(
+    target_x: int,
+    target_y: int,
+    window_width: int = 800,
+    window_height: int = 600,
+) -> models.Recording:
+    """Create a test recording with a target at the specified location.
+
+    Args:
+        target_x (int): Target X coordinate.
+        target_y (int): Target Y coordinate.
+        window_width (int): Width of the test window.
+        window_height (int): Height of the test window.
+
+    Returns:
+        models.Recording: A recording object for testing.
+    """
+    # Create a blank image
+    img = np.zeros((window_height, window_width, 3), dtype=np.uint8)
+
+    # Draw target (red circle)
+    cv2.circle(img, (target_x, target_y), 5, (0, 0, 255), -1)
+
+    # Save image
+    test_dir = Path("experiments/cursor/test_data")
+    test_dir.mkdir(parents=True, exist_ok=True)
+    img_path = test_dir / "test_target.png"
+    cv2.imwrite(str(img_path), img)
+
+    # Create screenshot
+    screenshot = models.Screenshot(
+        image=img,
+        timestamp=time.time(),
+    )
+
+    # Create window event
+    window_event = models.WindowEvent(
+        left=0,
+        top=0,
+        width=window_width,
+        height=window_height,
+        timestamp=time.time(),
+    )
+
+    # Create recording
+    recording = models.Recording(
+        screenshots=[screenshot],
+        window_events=[window_event],
+        action_events=[],  # No actions needed for testing
+        timestamp=time.time(),
+    )
+
+    return recording
+
+
+def evaluate_grid_strategy(
+    target_positions: list[tuple[int, int]],
+    grid_sizes: Optional[list[tuple[int, int]]] = None,
+    refinement_steps: Optional[list[int]] = None,
+) -> dict:
+    """Evaluate the grid-based cursor movement strategy.
+
+    Args:
+        target_positions: List of (x, y) target positions to test.
+        grid_sizes: List of (rows, cols) grid sizes to test. Defaults to
+            [(2, 2), (4, 4), (8, 8)].
+        refinement_steps: List of refinement step counts to test. Defaults
+            to [1, 2, 3].
+
+    Returns:
+        dict: Evaluation results including accuracy and timing metrics.
+    """
+    # Defaults are built per call so no list is shared between invocations
+    if grid_sizes is None:
+        grid_sizes = [(2, 2), (4, 4), (8, 8)]
+    if refinement_steps is None:
+        refinement_steps = [1, 2, 3]
+
+    results = {
+        'grid_size': [],
+        'refinement_steps': [],
+        'target_x': [],
+        'target_y': [],
+        'final_x': [],
+        'final_y': [],
+        'distance_error': [],
+        'num_actions': [],
+        'time_taken': [],
+    }
+
+    for grid_size in grid_sizes:
+        for steps in refinement_steps:
+            for target_x, target_y in target_positions:
+                # Create test recording
+                recording = create_test_recording(target_x, target_y)
+
+                # Initialize strategy
+                strategy = GridCursorStrategy(
+                    recording=recording,
+                    grid_size=grid_size,
+                    refinement_steps=steps,
+                )
+
+                # Time the execution
+                start_time = time.time()
+
+                try:
+                    # Run strategy
+                    strategy.run()
+
+                    # Get final position
+                    final_action = strategy.action_history[-1]
+                    final_x = final_action.mouse_x
+                    final_y = final_action.mouse_y
+
+                    # Calculate error
+                    distance_error = np.sqrt(
+                        (final_x - target_x) ** 2 +
+                        (final_y - target_y) ** 2
+                    )
+
+                    # Record results
+                    results['grid_size'].append(f"{grid_size[0]}x{grid_size[1]}")
+                    results['refinement_steps'].append(steps)
+                    results['target_x'].append(target_x)
+                    results['target_y'].append(target_y)
+                    results['final_x'].append(final_x)
+                    results['final_y'].append(final_y)
+                    results['distance_error'].append(distance_error)
+                    results['num_actions'].append(len(strategy.action_history))
+                    results['time_taken'].append(time.time() - start_time)
+
+                except Exception as e:
+                    logger.exception(f"Error evaluating grid {grid_size} with {steps} "
+                                   f"refinement steps at target ({target_x}, {target_y}): {e}")
+
+    return results
+
+
+def main():
+    """Run the grid strategy evaluation."""
+    # Define test cases (positions assume the default 800x600 test window)
+    target_positions = [
+        (100, 100),  # Top-left region
+        (700, 100),  # Top-right region
+        (400, 300),  # Center region
+        (100, 500),  # Bottom-left region
+        (700, 500),  # Bottom-right region
+    ]
+
+    # Run evaluation
+    results = evaluate_grid_strategy(target_positions)
+
+    # Print summary
+    total_cases = len(results['grid_size'])
+    print("\nGrid Strategy Evaluation Results:")
+    print("---------------------------------")
+    print(f"Total test cases: {total_cases}")
+    if not total_cases:
+        # np.mean of an empty list warns and yields nan, so stop early
+        print("No test case completed; nothing to summarize.")
+        return
+
+    print(f"Average distance error: {np.mean(results['distance_error']):.2f} pixels")
+    print(f"Average actions per target: {np.mean(results['num_actions']):.2f}")
+    print(f"Average time per target: {np.mean(results['time_taken']):.2f} seconds")
+
+    # Group by grid size
+    grid_sizes = sorted(set(results['grid_size']))
+    print("\nResults by grid size:")
+    for grid_size in grid_sizes:
+        indices = [i for i, g in enumerate(results['grid_size']) if g == grid_size]
+        errors = [results['distance_error'][i] for i in indices]
+        actions = [results['num_actions'][i] for i in indices]
+        times = [results['time_taken'][i] for i in indices]
+
+        print(f"\nGrid size: {grid_size}")
+        print(f" Average error: {np.mean(errors):.2f} pixels")
+        print(f" Average actions: {np.mean(actions):.2f}")
+        print(f" Average time: {np.mean(times):.2f} seconds")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/openadapt/strategies/cursor.py b/openadapt/strategies/cursor.py
new file mode 100644
index 000000000..08105b271
--- /dev/null
+++ b/openadapt/strategies/cursor.py
@@ -0,0 +1,315 @@
+"""Implements a cursor-based replay strategy with visual feedback and self-correction.
+
+This strategy allows:
+1. Painting a red dot on suggested target locations
+2. Analyzing screenshots with dots for accuracy
+3. Self-correcting based on visual feedback
+4. Evaluating different cursor movement approaches
+"""
+
+from pprint import pformat
+import re
+
+import cv2
+import numpy as np
+
+from openadapt import models, strategies, utils
+from openadapt.custom_logger import logger
+from openadapt.strategies.mixins.openai import OpenAIReplayStrategyMixin
+
+# An "x,y" pair of integers, optionally signed and wrapped in parentheses
+_COORDS_RE = re.compile(r"\(?\s*(-?\d+)\s*,\s*(-?\d+)\s*\)?")
+# The suffix analyze_dot_accuracy appends to feedback when it has a correction
+_SUGGESTED_COORDS_RE = re.compile(r"Suggested coordinates: \((-?\d+), (-?\d+)\)")
+
+
+class CursorReplayStrategy(OpenAIReplayStrategyMixin, strategies.base.BaseReplayStrategy):
+    """Cursor replay strategy that uses visual feedback and self-correction."""
+
+    def __init__(
+        self,
+        recording: models.Recording,
+        approach: str = "grid",  # One of: grid, joystick, quadrant, search, etc.
+        dot_radius: int = 5,
+        dot_color: tuple = (0, 0, 255),  # BGR format - Red
+        max_corrections: int = 3,
+        accuracy_threshold: int = 10,  # Pixel distance threshold for accuracy
+    ) -> None:
+        """Initialize the CursorReplayStrategy.
+
+        Args:
+            recording (models.Recording): The recording object.
+            approach (str): The cursor movement approach to use.
+            dot_radius (int): Radius of the painted dot in pixels.
+            dot_color (tuple): BGR color tuple for the dot.
+            max_corrections (int): Maximum number of self-corrections to attempt.
+            accuracy_threshold (int): Maximum pixel distance considered accurate.
+        """
+        super().__init__(recording)
+        self.approach = approach
+        self.dot_radius = dot_radius
+        self.dot_color = dot_color
+        self.max_corrections = max_corrections
+        self.accuracy_threshold = accuracy_threshold
+        self.correction_count = 0
+        self.action_history = []
+
+    def paint_dot(self, screenshot: models.Screenshot, x: int, y: int) -> np.ndarray:
+        """Paint a dot on the screenshot at the specified coordinates.
+
+        Args:
+            screenshot (models.Screenshot): The screenshot to paint on.
+            x (int): X coordinate for the dot.
+            y (int): Y coordinate for the dot.
+
+        Returns:
+            np.ndarray: The modified screenshot image with the dot.
+        """
+        img = screenshot.image.copy()
+        cv2.circle(img, (x, y), self.dot_radius, self.dot_color, -1)
+        return img
+
+    def analyze_dot_accuracy(
+        self, screenshot: models.Screenshot, target_x: int, target_y: int
+    ) -> tuple[bool, str]:
+        """Analyze if the painted dot accurately represents the target location.
+
+        Args:
+            screenshot (models.Screenshot): The screenshot with the painted dot.
+            target_x (int): Target X coordinate.
+            target_y (int): Target Y coordinate.
+
+        Returns:
+            tuple[bool, str]: (is_accurate, feedback) where feedback explains any issues.
+        """
+        # Create prompt for the model
+        system_prompt = """You are a computer vision expert analyzing cursor placement accuracy.
+Given a screenshot with a red dot indicating a suggested cursor position, determine:
+1. If the dot is accurately placed on the intended target
+2. If not accurate, explain why and suggest how to correct it
+3. Provide coordinates for any suggested corrections"""
+
+        # Prepare the image and prompt
+        img_with_dot = screenshot.image
+        height, width = img_with_dot.shape[:2]
+
+        prompt = f"""Analyze this screenshot where we've placed a red dot at coordinates ({target_x}, {target_y}).
+The image dimensions are {width}x{height} pixels.
+
+Please determine:
+1. Is the red dot accurately placed on the target?
+2. If not, describe what's wrong and suggest corrections.
+3. If corrections are needed, provide specific x,y coordinates.
+
+Format your response as:
+ACCURATE: true/false
+FEEDBACK: your analysis
+CORRECTION: x,y coordinates (only if needed)"""
+
+        # Get model's analysis
+        # NOTE(review): only the text prompt is passed here; confirm that the
+        # mixin's get_completion actually attaches the screenshot image.
+        completion = self.get_completion(prompt, system_prompt)
+
+        # Parse the response
+        is_accurate = False
+        feedback = "Could not parse model response"
+        correction_coords = None
+
+        for line in completion.strip().split('\n'):
+            # Split on the first colon only so values may contain colons
+            key, sep, value = line.strip().partition(':')
+            if not sep:
+                continue
+            value = value.strip()
+            if key == 'ACCURATE':
+                is_accurate = value.lower() == 'true'
+            elif key == 'FEEDBACK':
+                feedback = value
+            elif key == 'CORRECTION':
+                # Accept "x,y" as well as "(x, y)"; ignore anything else
+                coords_match = _COORDS_RE.fullmatch(value)
+                if coords_match:
+                    correction_coords = tuple(map(int, coords_match.groups()))
+
+        # If we got correction coordinates, include them in the feedback
+        if correction_coords and not is_accurate:
+            feedback += f" Suggested coordinates: {correction_coords}"
+
+            # Calculate distance to suggested correction
+            distance = np.sqrt(
+                (target_x - correction_coords[0]) ** 2 +
+                (target_y - correction_coords[1]) ** 2
+            )
+
+            # Update accuracy based on distance threshold
+            if distance <= self.accuracy_threshold:
+                is_accurate = True
+                feedback += f" (within {self.accuracy_threshold}px threshold)"
+
+        return is_accurate, feedback
+
+    def get_next_action_event(
+        self,
+        screenshot: models.Screenshot,
+        window_event: models.WindowEvent,
+    ) -> models.ActionEvent:
+        """Get the next ActionEvent for replay.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The next ActionEvent for replay.
+        """
+        if not self.action_history:
+            # First action - initialize based on approach
+            action = self._initialize_approach(screenshot, window_event)
+        else:
+            # Get next action based on current state and approach
+            action = self._get_next_approach_action(screenshot, window_event)
+
+        # Paint dot at suggested location
+        if action.name in ["mouse_move", "mouse_click"]:
+            img_with_dot = self.paint_dot(screenshot, action.mouse_x, action.mouse_y)
+
+            # Update screenshot with dot for analysis
+            screenshot.image = img_with_dot
+
+            # Analyze accuracy and potentially self-correct
+            is_accurate, feedback = self.analyze_dot_accuracy(
+                screenshot, action.mouse_x, action.mouse_y
+            )
+
+            if not is_accurate and self.correction_count < self.max_corrections:
+                self.correction_count += 1
+                logger.info(f"Self-correction attempt {self.correction_count}: {feedback}")
+
+                # Try to extract correction coordinates from feedback; the
+                # suffix is appended last, so use the final match in case the
+                # model's own text contains a similar phrase
+                coords_matches = _SUGGESTED_COORDS_RE.findall(feedback)
+                if coords_matches:
+                    new_x, new_y = map(int, coords_matches[-1])
+                    # Create a new corrected action
+                    action = models.ActionEvent(
+                        name=action.name,
+                        mouse_x=new_x,
+                        mouse_y=new_y,
+                        window_event=window_event,
+                    )
+            else:
+                self.correction_count = 0
+
+        self.action_history.append(action)
+        return action
+
+    def _initialize_approach(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Initialize the selected cursor movement approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The initial action for the selected approach.
+        """
+        # TODO: Implement initialization for each approach
+        if self.approach == "grid":
+            return self._init_grid_approach(screenshot, window_event)
+        elif self.approach == "joystick":
+            return self._init_joystick_approach(screenshot, window_event)
+        # Add other approaches as needed
+        else:
+            raise ValueError(f"Unknown approach: {self.approach}")
+
+    def _get_next_approach_action(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Get the next action based on the current approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The next action for the selected approach.
+        """
+        # TODO: Implement next action logic for each approach
+        if self.approach == "grid":
+            return self._next_grid_action(screenshot, window_event)
+        elif self.approach == "joystick":
+            return self._next_joystick_action(screenshot, window_event)
+        # Add other approaches as needed
+        else:
+            raise ValueError(f"Unknown approach: {self.approach}")
+
+    def _init_grid_approach(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Initialize the grid-based cursor movement approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The initial action for the grid approach.
+        """
+        # TODO: Implement grid initialization
+        raise NotImplementedError("Grid approach not implemented yet")
+
+    def _init_joystick_approach(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Initialize the joystick-based cursor movement approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The initial action for the joystick approach.
+        """
+        # TODO: Implement joystick initialization
+        raise NotImplementedError("Joystick approach not implemented yet")
+
+    def _next_grid_action(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Get the next action for the grid-based approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The next action for the grid approach.
+        """
+        # TODO: Implement grid movement logic
+        raise NotImplementedError("Grid approach not implemented yet")
+
+    def _next_joystick_action(
+        self, screenshot: models.Screenshot, window_event: models.WindowEvent
+    ) -> models.ActionEvent:
+        """Get the next action for the joystick-based approach.
+
+        Args:
+            screenshot (models.Screenshot): The current screenshot.
+            window_event (models.WindowEvent): The current window event.
+
+        Returns:
+            models.ActionEvent: The next action for the joystick approach.
+        """
+        # TODO: Implement joystick movement logic
+        raise NotImplementedError("Joystick approach not implemented yet")
+
+    def __del__(self) -> None:
+        """Log the action history."""
+        # __init__ may raise before action_history exists, yet __del__ still runs
+        history = getattr(self, "action_history", [])
+        action_history_dicts = [action.to_prompt_dict() for action in history]
+        logger.info(f"action_history=\n{pformat(action_history_dicts)}")
\ No newline at end of file