Source code for biohit_pipettor_plus.pipettor_plus.pipettor_plus

class AbortException(Exception):
    """Raised when user aborts an operation"""
    pass

from biohit_pipettor import Pipettor
from biohit_pipettor.errors import CommandFailed
#from biohit_pipettor import PipettorSimulator
from biohit_pipettor_plus.pipettor_plus.mock_pipettor import PipettorSimulator


from biohit_pipettor_plus.deck_structure import *
from biohit_pipettor_plus.pipettor_plus.geometry import (calculate_liquid_height, calculate_dynamic_remove_height)
from biohit_pipettor_plus.pipettor_plus.config import load_config, save_config

import time
import os
import subprocess
from typing import Literal, List
from math import ceil

[docs] class PipettorPlus: def __init__(self, tip_volume: Literal[200, 1000], *, multichannel: bool, initialize: bool = True, deck: Deck, tip_length: float = None, mock_pipettor=False): """ Interface to the Biohit Robo pipettor with deck/slot/labware structure Parameters ---------- tip_volume : Literal[200, 1000] The tip volume (must be 1000 if multichannel is True) multichannel : bool If True, it is assumed the device uses a multichannel pipet initialize : bool If True, the device will be initialized deck : Deck The deck containing slots and labware tip_length : float The tip length in mm mock_pipettor : bool If True, mock_pipettor will be used instead of Biohit Robo pipettor """ if mock_pipettor: pipettor_class = PipettorSimulator else: pipettor_class = Pipettor self._pipettor_context = pipettor_class( tip_volume=tip_volume, multichannel=multichannel, initialize=True if mock_pipettor else initialize, ) # initialize has to be true when using mock_pipettor as per documentation # Enter context immediately self._pipettor = self._pipettor_context.__enter__() self._simulation_mode = False self.multichannel = multichannel self._deck = deck self._slots: dict[str, Slot] = deck.slots self.abort_requested = False self.pause_requested = False cfg = load_config() #creates a dict of all tips, each tips having its own dict where content and volume in tip can be stored. tip_lengths = cfg["TIP_LENGTHS"] self.tip_length = tip_length if tip_length is not None else tip_lengths[int(tip_volume)] self.tip_count = int(cfg["Pipettors_in_Multi"]) if self.multichannel else 1 self.tip_dict = {i: {} for i in range(0, self.tip_count)} self.has_tips = False self.change_tips = False self.tip_volume = tip_volume self.batch_size = int(cfg["MAX_BATCH_SIZE"]) self.foc_bat_script_path = None def __getattr__(self, name): """Forward any attribute access to the underlying pipettor""" if '_pipettor' not in self.__dict__: raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") return getattr(self._pipettor, name)
[docs] def close(self): """Manually close the pipettor connection and exit context""" pipettor_context = getattr(self, '_pipettor_context', None) if pipettor_context: try: pipettor_context.__exit__(None, None, None) except Exception as e: print(f"Error closing pipettor: {e}") finally: self._pipettor_context = None self._pipettor = None
def __del__(self): """Cleanup when object is destroyed""" self.close()
[docs] def push_state(self): """Save complete state snapshot for later restoration.""" snapshot = {} for slot_id, slot in self._slots.items(): if not slot.labware_stack: continue # Get topmost labware (highest max_z) top_lw_id, (top_lw, (min_z, max_z)) = max( slot.labware_stack.items(), key=lambda item: item[1][1][1] # item[1][1][1] = max_z ) snapshot[top_lw.labware_id] = top_lw.get_state_snapshot() return { 'has_tips': self.has_tips, 'tip_dict': {k: v.copy() for k, v in self.tip_dict.items()}, 'deck_state': snapshot }
[docs] def pop_state(self, snapshot): """Restore pipettor and deck state from snapshot.""" self.has_tips = snapshot['has_tips'] self.tip_dict = {k: v.copy() for k, v in snapshot['tip_dict'].items()} deck_state_data = snapshot['deck_state'] for slot_id, slot in self._slots.items(): if not slot.labware_stack: continue # Get topmost labware top_lw_id, (top_lw, (min_z, max_z)) = max( slot.labware_stack.items(), key=lambda item: item[1][1][1] ) if top_lw.labware_id in deck_state_data: top_lw.restore_state_snapshot(deck_state_data[top_lw.labware_id])
[docs] def set_simulation_mode(self, enabled: bool) -> None: """Enable or disable simulation mode.""" self._simulation_mode = enabled if enabled: print(" → Simulation mode ENABLED ") else: print(" → Simulation mode DISABLED")
def _check_abort_and_pause(self): """Check for abort or pause requests at safe checkpoints""" #no need to check if in simulation mode if self._simulation_mode: return # Check abort first (highest priority) if self.abort_requested: self.abort_requested = False # Reset self.home() raise AbortException("Operation aborted by user") # Check pause while self.pause_requested: time.sleep(0.1) # Wait 100ms, check again # Allow abort even while paused if self.abort_requested: self.pause_requested = False # Unpause self.abort_requested = False self.home() raise AbortException("Operation aborted by user")
[docs] def pick_tips(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None,) -> List[tuple[int, int]]: """ Pick tips from a PipetteHolder. Parameters ---------- pipette_holder : PipetteHolder PipetteHolder labware containing tips list_col_row : List[tuple[int, int]], optional List of (column, row) grid indices to try. If None, automatically finds occupied grid locations. Returns ------- List[tuple[int, int]] Actual (column, row) positions where tips were picked Raises ------ ValueError If pipettor already has tips or pipette holder not found """ #check if tips already exist. if self.has_tips: raise ValueError("pipettor already has tips") if self.multichannel: return self.pick_multi_tips(pipette_holder, list_col_row) else: return self.pick_single_tip(pipette_holder, list_col_row)
[docs] def pick_multi_tips(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None) -> List[tuple[int, int]]: """ Pick tips from a PipetteHolder using multichannel pipettor. For multichannel, 8 consecutive tips are picked vertically. Parameters ---------- pipette_holder : PipetteHolder PipetteHolder labware containing tips list_col_row : List[tuple[int, int]], optional List of (column, start_row) grid indices to try. start_row indicates where the first pipettor in multichannel would be positioned. If None, automatically finds all grid locations with 8 consecutive occupied tips. Returns ------- List[tuple[int, int]] Actual (column, start_row) position where tips were picked Raises ------ ValueError If not a multichannel pipettor or no tips available RuntimeError If failed to pick tips from any specified grid location """ if not self.multichannel: raise ValueError("pick_multi_tips requires multichannel pipettor") if list_col_row is None: occupied_col_row = pipette_holder.get_occupied_holder_multi() if not occupied_col_row: raise ValueError( f"No occupied multichannel grid locations found in pipette holder {pipette_holder.labware_id}") list_col_row = occupied_col_row print(f"Auto-detected {len(list_col_row)} multichannel grid locations: {list_col_row}") if not list_col_row: raise ValueError(f"No col_row specified. list_col_row given : {list_col_row}") for col, start_row in list_col_row: #checks the status of Individual pipette holders at col_row status = pipette_holder.check_col_start_row_multi(col, start_row) if status == "INVALID": print(f"Grid location ({col}, {start_row}) is invalid, skipping") continue if status != "FULLY_OCCUPIED": print(f"Grid location ({col}, {start_row}) is not fully occupied (status: {status}), skipping") continue # All positions valid and occupied - get holders holders_to_use = [pipette_holder.get_holder_at(col, start_row + i) for i in range(self.tip_count)] # All 8 holders are valid, attempt to pick tips try: self._check_abort_and_pause() first_holder = holders_to_use[0] if first_holder.position is None: raise ValueError(f"Holder at grid location ({col}, {start_row}) has no position set") # FIXED: Calculate center position x, y = self._get_robot_xy_position(holders_to_use) # Pick tips at the specified height first_holder = holders_to_use[0] relative_z = self._get_relative_z(pipette_holder, remove=True) pipettor_z = self._get_pipettor_z_coord(pipette_holder, relative_z, child_item=first_holder) self.move_xy(x, y) self.pick_tip(50) # Mark all 8 tips in this column as removed pipette_holder.remove_consecutive_pipettes_multi([col], start_row) self.has_tips = True print(f"✓ Successfully picked 8 tips from column {col}, rows {start_row} to {start_row + 7}") return [(col, start_row)] except CommandFailed as e: print(f"✗ Failed to pick tips from column {col}, row {start_row}: {e}") continue finally: self.move_z(0) # If we got here, all attempts failed raise RuntimeError( f"Failed to pick tips from any of the specified locations {list_col_row}. " )
[docs] def pick_single_tip(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None) -> List[tuple[int, int]]: """Pick a single tip from a PipetteHolder using single-channel pipettor.""" if self.multichannel: raise ValueError("pick_single_tip requires single-channel pipettor") if list_col_row is None: # Use existing method! occupied_holders = pipette_holder.get_occupied_holders() if not occupied_holders: raise ValueError( f"No occupied tips found in pipette holder {pipette_holder.labware_id}") list_col_row = [(h.column, h.row) for h in occupied_holders] print(f"Auto-detected {len(list_col_row)} occupied tip positions") if not list_col_row: raise ValueError(f"No col_row specified. list_col_row given: {list_col_row}") for col, row in list_col_row: holder = pipette_holder.get_holder_at(col, row) if holder is None: print(f"Grid location ({col}, {row}) has no holder, skipping") continue if not holder.is_occupied: print(f"Grid location ({col}, {row}) is empty, skipping") continue try: self._check_abort_and_pause() if holder.position is None: print(f"Holder at grid location ({col}, {row}) has no position set, skipping") continue x, y = holder.position relative_z = self._get_relative_z(pipette_holder, remove=True) pipettor_z = self._get_pipettor_z_coord(pipette_holder, relative_z, child_item=holder) self.move_xy(x, y) self.pick_tip(50) holder.is_occupied = False self.has_tips = True print(f"✓ Successfully picked tip from column {col}, row {row}") return [(col, row)] except CommandFailed as e: print(f"✗ Failed to pick tip from column {col}, row {row}: {e}") holder.is_occupied = False continue finally: self.move_z(0) raise RuntimeError( f"Failed to pick tip from any of the specified locations {list_col_row}." )
[docs] def return_tips(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None) -> List[tuple[int, int]]: """ return tips to PipetteHolder. """ if not self.has_tips: raise ValueError("pipettor have no tips to return") if self.multichannel: return self.return_multi_tips(pipette_holder, list_col_row) else: return self.return_single_tip(pipette_holder, list_col_row)
[docs] def return_multi_tips(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None) -> List[tuple[int, int]]: """ Return tips to a PipetteHolder using multichannel pipettor. For multichannel, 8 consecutive tips are returned vertically. Parameters ---------- pipette_holder : PipetteHolder PipetteHolder labware to return tips to list_col_row : List[tuple[int, int]], optional List of (column, start_row) grid indices to try. start_row indicates where the first pipettor in multichannel would be positioned. If None, automatically finds all grid locations with 8 consecutive empty positions. Raises ------ ValueError If not a multichannel pipettor or no tips to return RuntimeError If failed to return tips to any specified grid location """ if not self.multichannel: raise ValueError("return_multi_tips requires multichannel pipettor") if not self.has_tips: raise ValueError("No tips to return - pipettor is empty") if list_col_row is None: available_col_row = pipette_holder.get_available_holder_multi() if not available_col_row: raise ValueError( f"No available multichannel grid locations found in pipette holder {pipette_holder.labware_id}") list_col_row = sorted(available_col_row, key=lambda x: (x[0], x[1]), reverse=True) print(f"Auto-detected {len(list_col_row)} available multichannel grid locations: {list_col_row}") if not list_col_row: raise ValueError(f"No col_row specified. list_col_row given: {list_col_row}") for col, start_row in list_col_row: # Check the status of positions status = pipette_holder.check_col_start_row_multi(col, start_row) if status == "INVALID": print(f"Grid location ({col}, {start_row}) is invalid, skipping") continue if status != "FULLY_AVAILABLE": print(f"Grid location ({col}, {start_row}) is not fully available (status: {status}), skipping") continue # All positions valid and available - get holders holders_to_use = [pipette_holder.get_holder_at(col, start_row + i) for i in range(self.tip_count)] # Attempt to return tips try: self._check_abort_and_pause() first_holder = holders_to_use[0] if first_holder.position is None: print(f"Holder at grid location ({col}, {start_row}) has no position set, skipping") continue x, y = self._get_robot_xy_position(holders_to_use) # Return tips at the specified height first_holder = holders_to_use[0] relative_z = self._get_relative_z(pipette_holder, remove=False) pipettor_z = self._get_pipettor_z_coord(pipette_holder, relative_z, child_item=first_holder) self.move_xy(x, y) self.move_z(pipettor_z) self.eject_tip() # Mark all 8 positions in this column as occupied pipette_holder.place_consecutive_pipettes_multi([col], start_row) self.initialize_tips() print(f"✓ Successfully returned 8 tips to column {col}, rows {start_row} to {start_row + 7}") return [(col, start_row)] except CommandFailed as e: print(f"✗ Failed to return tips to column {col}, row {start_row}: {e}") continue finally: self.move_z(0) # If we got here, all attempts failed raise RuntimeError( f"Failed to return tips to any of the specified locations {list_col_row}. " )
[docs] def return_single_tip(self, pipette_holder: PipetteHolder, list_col_row: List[tuple[int, int]] = None) -> List[tuple[int, int]]: """Return a single tip to a PipetteHolder using single-channel pipettor.""" if self.multichannel: raise ValueError("return_single_tip requires single-channel pipettor") if not self.has_tips: raise ValueError("No tip to return - pipettor is empty") if list_col_row is None: # Use existing method! available_holders = pipette_holder.get_available_holders() if not available_holders: raise ValueError( f"No available positions found in pipette holder {pipette_holder.labware_id}") list_col_row = sorted([(h.column, h.row) for h in available_holders], key=lambda x: (x[0], x[1]), reverse=True) print(f"Auto-detected {len(list_col_row)} available tip positions") if not list_col_row: raise ValueError(f"No col_row specified. list_col_row given: {list_col_row}") for col, row in list_col_row: holder = pipette_holder.get_holder_at(col, row) if holder is None: print(f"Grid location ({col}, {row}) has no holder, skipping") continue if holder.is_occupied: print(f"Grid location ({col}, {row}) is already occupied, skipping") continue try: self._check_abort_and_pause() if holder.position is None: print(f"Holder at grid location ({col}, {row}) has no position set, skipping") continue x, y = holder.position relative_z = self._get_relative_z(pipette_holder, remove=False) pipettor_z = self._get_pipettor_z_coord(pipette_holder, relative_z, child_item=holder) self.move_xy(x, y) self.move_z(pipettor_z) # return_height self.eject_tip() holder.is_occupied = True self.initialize_tips() print(f"✓ Successfully returned tip to column {col}, row {row}") return [(col, row)] except CommandFailed as e: print(f"✗ Failed to return tip to column {col}, row {row}: {e}") continue finally: self.move_z(0) raise RuntimeError( f"Failed to return tip to any of the specified locations {list_col_row}. " )
[docs] def replace_tips(self, pipette_holder: PipetteHolder, pick_pipette_holder:PipetteHolder = None, return_list_col_row: List[tuple[int, int]] = None, pick_list_col_row: List[tuple[int, int]] = None ) -> dict: if not pick_pipette_holder: pick_pipette_holder = pipette_holder self._check_abort_and_pause() if self.multichannel: # get list of available holders if not return_list_col_row: available_col_row = pipette_holder.get_available_holder_multi() return_list_col_row = sorted(available_col_row, key=lambda x: (x[0], x[1]), reverse=True) # get list of occupied holders if not pick_list_col_row: pick_list_col_row = pick_pipette_holder.get_occupied_holder_multi() else: if not return_list_col_row: available_holders = pipette_holder.get_available_holders() # Sort high to low for returning return_list_col_row = sorted([(h.column, h.row) for h in available_holders], key=lambda x: (x[0], x[1]), reverse=True) if not pick_list_col_row: occupied_holders = pick_pipette_holder.get_occupied_holders() pick_list_col_row = [(h.column, h.row) for h in occupied_holders] # Convert to list of occupied_holders! actual_return_pos = self.return_tips(pipette_holder, list_col_row=return_list_col_row) actual_pick_pos = self.pick_tips(pick_pipette_holder, list_col_row=pick_list_col_row) return { 'return': actual_return_pos, 'pick': actual_pick_pos }
[docs] def discard_tips(self, tip_dropzone: Labware) -> None: """ Discard tips to a TipDropzone. Parameters ---------- tip_dropzone : labware TipDropzone labware """ self._check_abort_and_pause() if not self.has_tips: raise RuntimeError("No tips to discard") if not isinstance(tip_dropzone, TipDropzone): raise ValueError("discard_tips only works with TipDropzone") x, y = tip_dropzone.position relative_z = self._get_relative_z(tip_dropzone) pipettor_z = self._get_pipettor_z_coord(tip_dropzone, relative_z, child_item=None) self.move_xy(x, y) self.move_z(pipettor_z) self.eject_tip() self.move_z(0) self.initialize_tips()
[docs] def add_medium(self, source: ReservoirHolder, source_col_row: tuple[int, int], volume_per_well: float, destination: Plate, dest_col_row: List[tuple[int, int]], mix_volume: float = 0) -> None: """ Transfer medium from a reservoir to destination plate column(s). Works for both single-channel and multichannel pipettors. Parameters ---------- source : ReservoirHolder ReservoirHolder containing the medium to transfer source_col_row : tuple[int, int] (Column, Row) position of reservoir. volume_per_well : float Volume (µL) to dispense into each destination well destination : Plate Target plate to receive the medium dest_col_row : List[tuple[int, int]] List of (Column, Row) positions in destination plate. Row is start row for multichannel pipettor. mix_volume : float, optional Volume (µL) to use for mixing after dispensing. 0 = no mixing (default) """ # Validate (source needs 1 row, destinations need tip_count rows) self._validate_transfer(source, [source_col_row], destination, dest_col_row, 1, self.tip_count) # Calculate volumes volume_per_destination, max_vol = self._calculate_volumes(volume_per_well) original_batch_size = self.batch_size if self.change_tips or mix_volume > 0: self.batch_size = 1 # Choose strategy try: if volume_per_destination > max_vol: self._multi_trip_transfer(dest_col_row, volume_per_destination, max_vol, source, source_col_row, destination, None, is_one_to_many=True, mix_volume=mix_volume) else: self._batch_transfer(dest_col_row, volume_per_destination, max_vol, source, source_col_row, destination, None, is_one_to_many=True, mix_volume=mix_volume) finally: self.batch_size = original_batch_size
[docs] def remove_medium(self, source: Plate, source_col_row: List[tuple[int, int]], volume_per_well: float, destination: ReservoirHolder, destination_col_row: tuple[int, int]) -> None: """ Remove medium from plate wells to a destination reservoir. Works for both single-channel and multichannel pipettors. Parameters ---------- source : Plate Plate to remove medium from source_col_row : List[tuple[int, int]] List of (Column, Row) positions to remove from. Row is start row for multichannel pipettor. volume_per_well : float Volume (µL) to remove from each well destination : ReservoirHolder ReservoirHolder to receive the liquid destination_col_row : tuple[int, int] (Column, Row) position of destination reservoir. """ # Validate (sources need tip_count rows, destination needs 1 row) self._validate_transfer(source, source_col_row, destination, [destination_col_row], self.tip_count, 1) # Calculate volumes volume_per_source, max_vol = self._calculate_volumes(volume_per_well) # Choose strategy original_batch_size = self.batch_size if self.change_tips: self.batch_size = 1 try: if volume_per_source > max_vol: self._multi_trip_transfer(source_col_row, volume_per_source, max_vol, source, None, destination, destination_col_row, is_one_to_many=False) else: self._batch_transfer(source_col_row, volume_per_source, max_vol, source, None, destination, destination_col_row, is_one_to_many=False) finally: self.batch_size = original_batch_size
[docs] def transfer_plate_to_plate(self, source: Plate, source_col_row: List[tuple[int, int]], destination: Plate, dest_col_row: List[tuple[int, int]], volume_per_well: float, mix_volume: float = 0) -> None: """ Transfer liquid from source plate to destination plate (one-to-one mapping). Works for both single-channel and multichannel pipettors. Each source position is transferred to the corresponding destination position. source_col_row[i] → dest_col_row[i] Parameters ---------- source : Plate Source plate source_col_row : List[tuple[int, int]] List of source (Column, Row) positions. Row is start row for multichannel pipettor. destination : Plate Destination plate dest_col_row : List[tuple[int, int]] List of destination (Column, Row) positions. Must be same length as source_col_row. volume_per_well : float Volume (µL) to transfer from each well Raises ------ ValueError If source and destination lists have different lengths """ # Validate list lengths match if len(source_col_row) != len(dest_col_row): raise ValueError( f"Source and destination lists must be same length. " f"Got {len(source_col_row)} sources and {len(dest_col_row)} destinations." ) # Validate all positions (both need tip_count rows) self._validate_transfer(source, source_col_row, destination, dest_col_row, self.tip_count, self.tip_count) # Calculate volumes volume_per_transfer, max_vol = self._calculate_volumes(volume_per_well) print(f"\n{'=' * 60}") print(f"Plate-to-plate transfer (one-to-one)") print(f"{'=' * 60}") print(f"Number of transfers: {len(source_col_row)}") print(f"Volume per well: {volume_per_well}µL") print(f"Volume per transfer: {volume_per_transfer}µL") print(f"Max volume per aspirate: {max_vol}µL\n") # Process each source → destination pair for idx, (src_pos, dst_pos) in enumerate(zip(source_col_row, dest_col_row), 1): src_col, src_row = src_pos dst_col, dst_row = dst_pos print(f"Transfer {idx}/{len(source_col_row)}: ({src_col},{src_row}) → ({dst_col},{dst_row})") self.check_tips() # Check if multi-trip needed if volume_per_transfer > max_vol: # Multi-trip for this pair num_trips = ceil(volume_per_transfer / max_vol) base_volume = volume_per_transfer // num_trips remainder = volume_per_transfer % num_trips print(f" {num_trips} trips needed") for trip_num in range(num_trips): trip_volume = base_volume + (1 if trip_num < remainder else 0) self.suck(source, src_pos, trip_volume) self.spit(destination, dst_pos, trip_volume) print(f" Trip {trip_num + 1}/{num_trips}: {trip_volume}µL") else: # Single transfer self.suck(source, src_pos, volume_per_transfer) self.spit(destination, dst_pos, volume_per_transfer) print(f" ✓ {volume_per_transfer}µL transferred") if mix_volume > 0: actual_mix_vol = min(mix_volume, self.tip_volume - 10) print(f" → Mixing with {actual_mix_vol}µL") self.suck(destination, dst_pos, actual_mix_vol) self.spit(destination, dst_pos, actual_mix_vol) print() print(f"{'=' * 60}") print("✓ Plate-to-plate transfer complete\n")
[docs] def suck(self, source: Labware, source_col_row: tuple[int, int], volume: float) -> None: """ Aspirate from a source labware. Works with any labware type. If labware supports content tracking, content is tracked. If not, only physical movement is performed. Parameters ---------- source : Labware Source labware (Plate, ReservoirHolder, or any other labware) source_col_row : tuple[int, int] (Column, Row) position in source labware volume : float Total volume to aspirate across all tips (µL) """ source_col, source_row = source_col_row if volume <= 0: raise ValueError("Volume must be positive") # Validate based on whether it's a reservoir or not if not source.each_tip_needs_separate_item(): # like for Reservoir: all tips access same reservoir source.validate_col_row_or_raise([source_col], source_row, 1) else: # Plate or other grid labware: each tip accesses different position source.validate_col_row_or_raise([source_col], source_row, self.tip_count) if self._supports_content_management(source, source_col, source_row): # Track content if supported self._aspirate_with_content_tracking(source, source_col, source_row, volume) print(f" → Aspirated {volume}µL from {source.labware_id} at {source_col_row}") print(f" → Tip content now: {self._get_tip_content_summary()}") else: # Just do physical movement without content tracking self._aspirate_physical_only(source, source_col, source_row, volume) print(f" → Aspirated {volume}µL from {source.labware_id} at {source_col_row}")
[docs] def spit(self, destination: Labware, dest_col_row: tuple[int, int], volume: float) -> None: """ Dispense from tips to a destination labware. Works with any labware type. If labware supports content tracking, content is tracked. If not, only physical movement is performed. Parameters ---------- destination : Labware Destination labware (Plate, ReservoirHolder, or any other labware) dest_col_row : tuple[int, int] (Column, Row) position in destination labware volume : float Total volume to dispense across all tips (µL) """ dest_col, dest_row = dest_col_row if volume <= 0: raise ValueError("Volume must be positive") # Check we have enough volume in tips total_available = self.get_total_tip_volume() if total_available < volume: difference = volume - total_available if difference <= 0.01: volume = total_available else: raise ValueError( f"Insufficient volume in tips. Available: {total_available}µL, Requested: {volume}µL" ) # Validate based on whether destination needs separate items per tip if not destination.each_tip_needs_separate_item(): # Reservoir: all tips dispense to same item destination.validate_col_row_or_raise([dest_col], dest_row, 1) else: # Plate: each tip dispenses to different item destination.validate_col_row_or_raise([dest_col], dest_row, self.tip_count) if self._supports_content_management(destination, dest_col, dest_row): # Track content if supported self._dispense_with_content_tracking(destination, dest_col, dest_row, volume) print(f" → Dispensed {volume}µL to {destination.labware_id} at {dest_col_row}") print(f" → Tip content now: {self._get_tip_content_summary()}") else: # Just do physical movement without content tracking self._dispense_physical_only(destination, dest_col, dest_row, volume) print(f" → Dispensed {volume}µL to {destination.labware_id} at {dest_col_row}")
[docs] def add_content(self, content_type: str, volume: float, tip_index: int = None) -> None: """ Add content to specific tip(s). Parameters ---------- content_type : str Content to add (e.g., "PBS", "water") volume : float Volume to add (µL) tip_index : int, optional Which tip to add to (0-7 for multichannel, 0 for single). If None, adds to all tips equally. """ if volume <= 0: raise ValueError("Volume to add must be positive") if not content_type: raise ValueError("Content type cannot be empty") # Determine which tips to update if tip_index is not None: if tip_index not in self.tip_dict: raise ValueError(f"Invalid tip index {tip_index}. Valid range: 0-{self.tip_count - 1}") tip_indices = [tip_index] else: # Add to all tips tip_indices = list(self.tip_dict.keys()) # Add content to each specified tip for idx in tip_indices: if content_type in self.tip_dict[idx]: self.tip_dict[idx][content_type] += volume else: self.tip_dict[idx][content_type] = volume
[docs] def remove_content(self, volume: float, tip_index: int = None) -> None: """ Remove content from tip(s) proportionally. Parameters ---------- volume : float Volume to remove (µL) tip_index : int, optional Which tip to remove from. If None, removes from all tips. """ if volume <= 0: raise ValueError("Volume to remove must be positive") # Determine which tips to update if tip_index is not None: if tip_index not in self.tip_dict: raise ValueError(f"Invalid tip index {tip_index}") tip_indices = [tip_index] else: tip_indices = list(self.tip_dict.keys()) # Remove from each tip for idx in tip_indices: tip_content = self.tip_dict[idx] current_volume = sum(tip_content.values()) if tip_content else 0.0 if current_volume <= 0: raise ValueError(f"Cannot remove from empty tip {idx}") if volume > current_volume: difference = volume - current_volume if difference <= 0.01: volume = current_volume else: raise ValueError( f"Underflow in tip {idx}! Cannot remove {volume}µL, only {current_volume}µL available" ) # Remove proportionally removal_ratio = volume / current_volume content_types = list(tip_content.keys()) for content_type in content_types: remove_amount = tip_content[content_type] * removal_ratio tip_content[content_type] -= remove_amount if tip_content[content_type] <= 1e-6: del tip_content[content_type]
[docs] def home(self): self.move_xy(0, 0)
[docs] def move_xy(self, x: float, y: float): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.move_z(0) self._pipettor.move_xy(x, y)
[docs] def move_z(self, z: float): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.move_z(z)
[docs] def pick_tip(self, z): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.pick_tip(z)
[docs] def eject_tip(self): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.eject_tip()
[docs] def aspirate(self, volume): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.aspirate(volume)
[docs] def dispense(self, volume): """Override parent to add simulation mode check""" if self._simulation_mode: return self._check_abort_and_pause() self._pipettor.dispense(volume)
[docs] def measure_foc(self, seconds: int, platename: str = None, bat_script_path: str = None): """ Wait for specified seconds, then run FOC measurement script. """ # Use provided path or stored path if bat_script_path is not None: self.foc_bat_script_path = bat_script_path # Use provided plate name or stored plate name if platename is not None: plate_to_use = platename elif hasattr(self, 'foc_plate_name') and self.foc_plate_name: plate_to_use = self.foc_plate_name else: raise ValueError("Plate name not provided and no plate name configured") # Check if we have a script path if not hasattr(self, 'foc_bat_script_path') or self.foc_bat_script_path is None: raise ValueError("FOC bat script path not set. Please configure FOC first.") # Verify file exists if not os.path.exists(self.foc_bat_script_path): raise FileNotFoundError(f"FOC bat script not found at: {self.foc_bat_script_path}") if self._simulation_mode: print(f"[SIMULATION] Would wait {seconds} seconds, then run FOC for plate '{plate_to_use}'") return # Exit early - don't run the actual script # Wait for specified time print(f"Waiting {seconds} seconds before FOC measurement...") self.home() time.sleep(seconds) # Run the bat script try: self._check_abort_and_pause() subprocess.call([self.foc_bat_script_path, plate_to_use]) print(f"FOC measurement completed for {plate_to_use}") except Exception as e: print(f"[ERROR] Failed to run FOC measurement: {str(e)}")
# Helper Functions. Not necessarily available for gui def _validate_transfer(self, source, source_positions, destination, destination_positions, source_consecutive_rows: int, dest_consecutive_rows: int) -> None: """Helper: Validate source and destination positions.""" if not self.has_tips and not self.change_tips: raise ValueError("No tips loaded. Pick tips first.") # Validate source positions if not isinstance(source_positions, list): source_positions = [source_positions] for source_col, source_row in source_positions: source.validate_col_row_or_raise([source_col], source_row, source_consecutive_rows) # Validate destination positions if not isinstance(destination_positions, list): destination_positions = [destination_positions] for dest_col, dest_row in destination_positions: destination.validate_col_row_or_raise([dest_col], dest_row, dest_consecutive_rows) def _multi_trip_transfer(self, positions: List[tuple[int, int]], volume_per_position: int, max_vol_per_aspirate: int, source, source_pos, destination, dest_positions, is_one_to_many: bool, mix_volume:float = 0) -> None: """ Helper: Handle multi-trip transfers when volume exceeds capacity. Parameters ---------- is_one_to_many : bool True for add_medium (1 source → many dests), False for remove_medium (many sources → 1 dest) """ print(f"\n→ Strategy: MULTI-TRIP (volume exceeds tip capacity)") print(f"{'=' * 60}\n") for pos in positions: col, row = pos self.check_tips() num_trips = ceil(volume_per_position / max_vol_per_aspirate) print(f"Position ({col}, {row}): {num_trips} trips needed") # Distribute volume evenly across trips base_volume = volume_per_position // num_trips remainder = volume_per_position % num_trips for trip_num in range(num_trips): trip_volume = base_volume + (1 if trip_num < remainder else 0) if is_one_to_many: # add_medium: same source, different destinations self.suck(source, source_pos, trip_volume) self.spit(destination, pos, trip_volume) # Mix after LAST trip only if mix_volume > 0 and trip_num == num_trips - 1: actual_mix_vol = min(mix_volume, self.tip_volume - 10) print(f" → Mixing with {actual_mix_vol}µL") self.suck(destination, pos, actual_mix_vol) self.spit(destination, pos, actual_mix_vol) else: # remove_medium: different sources, same destination self.suck(source, pos, trip_volume) self.spit(destination, dest_positions, trip_volume) print("✓ Multi-trip transfer complete\n") def _batch_transfer(self, positions: List[tuple[int, int]], volume_per_position: int, max_vol_per_aspirate: int, source, source_pos, destination, dest_pos, is_one_to_many: bool, mix_volume: float = 0) -> None: """ Helper: Handle batch transfers for multiple positions. Parameters ---------- is_one_to_many : bool True for add_medium (1 source → many dests), False for remove_medium (many sources → 1 dest) """ print(f"\n→ Strategy: BATCH MODE") print(f"{'=' * 60}\n") # Calculate optimal batch size max_positions_by_volume = max_vol_per_aspirate // volume_per_position batch_size = min(self.batch_size, max_positions_by_volume) if batch_size < 1: batch_size = 1 num_batches = ceil(len(positions) / batch_size) print(f"Batch size: {batch_size} position(s) per aspirate") print(f"Total batches: {num_batches}\n") # Process positions in batches idx = 0 batch_num = 1 while idx < len(positions): batch_end = min(idx + batch_size, len(positions)) batch = positions[idx:batch_end] total_batch_volume = len(batch) * volume_per_position self.check_tips() #change tip between batches if change_tip is True print(f"Batch {batch_num}/{num_batches}:") print(f" Positions: {batch}") print(f" Total volume: {total_batch_volume}µL") if is_one_to_many: # add_medium: aspirate once, dispense multiple times self.suck(source, source_pos, total_batch_volume) for pos in batch: self.spit(destination, pos, volume_per_position) print(f" ✓ Dispensed to {pos}: {volume_per_position}µL") if mix_volume > 0: actual_mix_vol = min(mix_volume, self.tip_volume - 10) print(f" → Mixing with {actual_mix_vol}µL") self.suck(destination, pos, actual_mix_vol) self.spit(destination, pos, actual_mix_vol) else: # remove_medium: aspirate multiple times, dispense once for pos in batch: self.suck(source, pos, volume_per_position) print(f" ✓ Aspirated from {pos}: {volume_per_position}µL") self.spit(destination, dest_pos, total_batch_volume) print(f" ✓ Dispensed to destination: {total_batch_volume}µL") print() idx = batch_end batch_num += 1 print(f"{'=' * 60}") print("✓ Batch transfer complete\n") def _aspirate_with_content_tracking(self, source: Labware, col: int, row: int, volume: float) -> None: """Aspirate with content tracking.""" self._check_abort_and_pause() # Check if each tip needs separate item if self.multichannel and source.each_tip_needs_separate_item(): items = [self._get_content_item(source, col, row + i) for i in range(self.tip_count)] volume_per_item = volume / self.tip_count else: items = [self._get_content_item(source, col, row)] volume_per_item = volume # Validate items exist if any(item is None for item in items): raise ValueError(f"Content item not found at position ({col}, {row})") # Check total available volume total_available = sum(item.get_total_volume() for item in items if item) if total_available < volume: difference = volume - total_available if difference <= 0.01: volume = total_available else: raise ValueError( f"Insufficient volume. Available: {total_available}µL, Requested: {volume}µL" ) # Remove content from source and add to tips for tip_idx, item in enumerate(items): if item and volume_per_item > 0: removed_content = item.remove_content(volume_per_item, return_dict=True) if len(items) == 1 and self.multichannel: # Distribute to all tips for tip_i in range(self.tip_count): for content_type, vol in removed_content.items(): if vol > 0: self.add_content(content_type, vol / self.tip_count, tip_index=tip_i) else: # Each tip gets from its own item for content_type, vol in removed_content.items(): if vol > 0: self.add_content(content_type, vol, tip_index=tip_idx) self._move_to_and_aspirate(items, volume) def _aspirate_physical_only(self, source: Labware, col: int, row: int, volume: float) -> None: """Aspirate without content tracking.""" # Get items (same logic as content tracking) if self.multichannel and source.each_tip_needs_separate_item(): items = [self._get_content_item(source, col, row + i) for i in range(self.tip_count)] else: item = self._get_content_item(source, col, row) items = [item if item else source] self._move_to_and_aspirate(items, volume) print(f" → Note: Content tracking not available for {type(source).__name__}") def _dispense_with_content_tracking(self, destination: Labware, col: int, row: int, volume: float) -> None: """Dispense with content tracking.""" self._check_abort_and_pause() # Check if each tip needs separate destination item. get item/items if self.multichannel and destination.each_tip_needs_separate_item(): items = [self._get_content_item(destination, col, row + i) for i in range(self.tip_count)] volume_per_item = volume / self.tip_count else: items = [self._get_content_item(destination, col, row)] volume_per_item = volume # Validate items exist if any(item is None for item in items): raise ValueError(f"Content item not found at position ({col}, {row})") # Transfer content from tips to destination if len(items) == 1 and self.multichannel: # All tips dispense to one item (like Reservoir) item = items[0] for tip_idx in range(self.tip_count): tip_content = self.tip_dict[tip_idx] tip_total = sum(tip_content.values()) if tip_content else 0.0 if tip_total > 0: tip_volume_to_dispense = volume / self.tip_count removal_ratio = tip_volume_to_dispense / tip_total for content_type in list(tip_content.keys()): remove_amount = tip_content[content_type] * removal_ratio item.add_content(content_type, remove_amount) tip_content[content_type] -= remove_amount if tip_content[content_type] <= 1e-6: del tip_content[content_type] else: # Each tip dispenses to its own item (like wells in plate) for tip_idx, item in enumerate(items): if item and volume_per_item > 0: tip_content = self.tip_dict[tip_idx] tip_total = sum(tip_content.values()) if tip_content else 0.0 if tip_total <= 0: continue removal_ratio = volume_per_item / tip_total for content_type in list(tip_content.keys()): remove_amount = tip_content[content_type] * removal_ratio item.add_content(content_type, remove_amount) tip_content[content_type] -= remove_amount if tip_content[content_type] <= 1e-6: del tip_content[content_type] self._move_to_and_dispense(items, volume) def _dispense_physical_only(self, destination: Labware, col: int, row: int, volume: float) -> None: """Dispense without content tracking.""" # Get items if self.multichannel and destination.each_tip_needs_separate_item(): items = [self._get_content_item(destination, col, row + i) for i in range(self.tip_count)] else: item = self._get_content_item(destination, col, row) items = [item if item else destination] self._move_to_and_dispense(items, volume) print(f" → Note: Content tracking not available for {type(destination).__name__}") def _move_to_and_aspirate(self, items: List[Labware], volume: float) -> None: """ Move to position and perform aspiration. For multichannel, moves to the CENTER of all tip positions. Parameters ---------- items : List[Labware] List of items to aspirate from (1 for single channel, 8 for multichannel) volume : float Total volume to aspirate """ if not items: raise ValueError("No items provided") # Calculate center position for robot movement x, y = self._get_robot_xy_position(items) # Use first item for height calculation item = items[0] relative_z = self._get_relative_z(item, remove=True, volume=volume) parent_labware = self._find_parent_labware(item) pipettor_z = self._get_pipettor_z_coord(parent_labware, relative_z, child_item=item) print(f"x, y, z = {x}, {y}, {pipettor_z}") self.move_xy(x, y) self.move_z(pipettor_z) self.aspirate(volume / self.tip_count) self.move_z(0) def _move_to_and_dispense(self, items: List[Labware], volume: float) -> None: """ Move to position and perform dispense. For multichannel, moves to the CENTER of all tip positions. Parameters ---------- items : List[Labware] List of items to dispense to (1 for single channel, 8 for multichannel) volume : float Total volume to dispense """ if not items: raise ValueError("No items provided") # Calculate center position for robot movement x, y = self._get_robot_xy_position(items) # Use first item for height calculation item = items[0] parent_labware = self._find_parent_labware(item) relative_z = self._get_relative_z(item, remove=False, volume=volume) pipettor_z = self._get_pipettor_z_coord(parent_labware, relative_z, child_item=item) print(f"x, y, z = {x}, {y}, {pipettor_z}") self.move_xy(x, y) self.move_z(pipettor_z) self.dispense(volume / self.tip_count) self.move_z(0) def _supports_content_management(self, labware: Labware, col: int, row: int) -> bool: """Check if labware supports content tracking at this position.""" item = self._get_content_item(labware, col, row) return (item is not None and hasattr(item, 'content') and hasattr(item, 'remove_content') and hasattr(item, 'add_content') and hasattr(item, 'get_total_volume')) def _get_content_item(self, labware: Labware, col: int, row: int): """ Get the actual item that holds content at a position. Uses duck typing to work with any labware structure: - If it has get_well_at(), it's a Plate-like structure - If it has reservoir mapping methods, it's a ReservoirHolder-like structure - Otherwise, the labware itself might hold content """ # Try Plate-like: get_well_at(col, row) if hasattr(labware, 'get_well_at'): return labware.get_well_at(col, row) # Try ReservoirHolder-like: position_to_hook_id + get_hook_to_reservoir_map if hasattr(labware, 'position_to_hook_id') and hasattr(labware, 'get_hook_to_reservoir_map'): hook_id = labware.position_to_hook_id(col, row) return labware.get_hook_to_reservoir_map().get(hook_id) # Direct content holder: the labware itself has content return labware
[docs] def get_total_tip_volume(self, tip_index: int = None) -> float: """ Get total volume in tip(s). Parameters ---------- tip_index : int, optional Specific tip. If None, returns total across all tips. Returns ------- float Total volume in µL """ if tip_index is not None: return sum(self.tip_dict[tip_index].values()) if self.tip_dict[tip_index] else 0.0 else: # Sum across all tips total = 0.0 for tip_content in self.tip_dict.values(): total += sum(tip_content.values()) if tip_content else 0.0 return total
[docs] def initialize_tips(self) -> None: """Clear tip content when tips are discarded.""" self.has_tips = False self.tip_dict = {i: {} for i in range(self.tip_count)} print(f" → Tips discarded, content cleared")
def _calculate_volumes(self, volume_per_well: float) -> tuple[int, int]: """Helper: Calculate volume per position and max volume per aspirate.""" volume_per_position = int(volume_per_well * self.tip_count) max_vol_per_aspirate = int((self.tip_volume - 10) * self.tip_count) # to prevent piston overdrive if volume_per_position <= 0: raise ValueError("volume_per_well must be > 0") return volume_per_position, max_vol_per_aspirate def _get_tip_content_summary(self) -> str: """Get a readable summary of tip content across all tips.""" total_volume = self.get_total_tip_volume() if not self.tip_dict or total_volume <= 0: return "empty" # Aggregate content across all tips aggregated_content = {} for tip_content in self.tip_dict.values(): for content_type, volume in tip_content.items(): if content_type in aggregated_content: aggregated_content[content_type] += volume else: aggregated_content[content_type] = volume parts = [] for content_type, volume in aggregated_content.items(): parts.append(f"{content_type}: {volume:.1f}µL") return ", ".join(parts)
[docs] def get_tip_status(self) -> dict: """Get current tip status.""" total_volume = self.get_total_tip_volume() return { "content_dict": self.tip_dict.copy(), "volume_remaining": total_volume, "is_empty": total_volume <= 0, "content_summary": self._get_tip_content_summary() }
[docs] def check_tips(self) -> None: """ Check if tip change is required. if yes, do it. """ lw = self.find_labware_by_type("PipetteHolder")[0] # Gets first PipetteHolder found try: discard_lw = self.find_labware_by_type("TipDropzone")[0] except ValueError: discard_lw = None if self.change_tips and not self.has_tips: self.pick_tips(lw) elif self.change_tips and self.has_tips: if discard_lw is not None: self.discard_tips(discard_lw) self.pick_tips(lw) else: self.replace_tips(lw) elif not self.has_tips: raise ValueError("No tips loaded. Pick tips first.")
[docs] def find_labware_by_type(self, labware_type: str) -> list[Labware]: """ Find a labware instance by its type name (case-sensitive). Parameters ---------- labware_type : str The class name of the labware to find (e.g., "Plate", "ReservoirHolder", "PipetteHolder") Case-sensitive. Returns list of Labwares Any Labware instance of the specified type, placed in deck and slot. Raises ValueError If no labware of the specified type is found in the deck and placed in a slot. """ # Iterate through all labware in the deck lw = [] for labware_id, labware in self._deck.labware.items(): # Check if the labware's class name matches the requested type (case-sensitive) if labware.__class__.__name__ == labware_type: # Check if this labware is placed in a slot ( in all slots placed in deck) slot_id = self._deck.get_slot_for_labware(labware_id) if slot_id is not None: lw.append(labware) if not lw: raise ValueError(f"No labware found for type '{labware_type}'.") else: return lw
def _find_parent_labware(self, item: Labware) -> Labware: """ Find the parent labware that contains this item. For a Well, the parent is the Plate. For a Reservoir, the parent is the ReservoirHolder. For standalone labware, returns the item itself. """ # Check if this item IS the top-level labware if item.labware_id in self._deck.labware: return item # Otherwise, search through all labware to find which one contains this item for labware_id, labware in self._deck.labware.items(): # Check if it's a Plate containing this well if hasattr(labware, 'get_wells'): wells = labware.get_wells() for well in wells.values(): if well and well.labware_id == item.labware_id: return labware # Check if it's a ReservoirHolder containing this reservoir if hasattr(labware, 'get_reservoirs'): reservoirs = labware.get_reservoirs() for reservoir in reservoirs: if reservoir.labware_id == item.labware_id: return labware # Check if it's a PipetteHolder containing this individual holder if hasattr(labware, 'get_individual_holders'): holders = labware.get_individual_holders() for holder in holders.values(): if holder and holder.labware_id == item.labware_id: return labware raise ValueError(f"Could not find parent labware for item {item.labware_id}") def _get_relative_z(self, item: Labware, remove: bool = True, volume: float= None) -> float: """ Get the relative z of the item. Parameters ---------- item : Labware The parent labware (Plate, ReservoirHolder, PipetteHolder, etc.) remove: bool Whether to aspirate medium/pick tip or to add medium/eject tip. volume : float, optional Volume for dynamic height calculation (if applicable) """ item_type = item.__class__.__name__ if item_type == "TipDropzone": return getattr(item, 'drop_height_relative', item.size_z) if item_type == "IndividualPipetteHolder": parent_labware = self._find_parent_labware(item) if remove: return getattr(parent_labware, 'remove_height', item.size_z) else: return getattr(parent_labware, 'add_height', item.size_z) if item_type == "PipetteHolder": if remove: return getattr(item, 'remove_height', item.size_z) else: return getattr(item, 'add_height', item.size_z) if item_type in ["Reservoir", "Well"]: parent_labware = self._find_parent_labware(item) if remove: # Aspiration logic if volume and hasattr(item, 'shape') and item.shape: # Dynamic aspiration based on liquid level if parent_labware.each_tip_needs_separate_item(): volume_per_tip = volume / self.tip_count relative_z = calculate_dynamic_remove_height(item, volume_per_tip) else: relative_z = calculate_dynamic_remove_height(item, volume) liquid_height = calculate_liquid_height(item) print(f" → Dynamic aspiration: {relative_z:.1f}mm from bottom " f"(liquid: {liquid_height:.1f}mm, shape: {item.shape})") return relative_z elif hasattr(parent_labware, 'remove_height'): relative_z = parent_labware.remove_height print(f" → Fixed aspiration: {relative_z:.1f}mm from bottom") return relative_z else: relative_z = item.size_z * 0.75 print(f" → Fallback aspiration: 5 mm above bottom") return relative_z else: # Determine RELATIVE height from labware BOTTOM if volume and hasattr(item, 'shape') and item.shape: volume_per_tip = volume / self.tip_count if parent_labware.each_tip_needs_separate_item(): relative_z = calculate_dynamic_remove_height(item, -volume_per_tip) + 5 else: relative_z = calculate_dynamic_remove_height(item, -volume) + 5 liquid_height = calculate_liquid_height(item) print(f" → Dynamic dispensing: {relative_z:.1f}mm from bottom " f"(liquid: {liquid_height:.1f}mm, shape: {item.shape})") return relative_z elif hasattr(parent_labware, 'add_height'): relative_z = parent_labware.add_height print(f" → Fixed dispensing: {relative_z:.1f}mm from bottom") return relative_z else: relative_z = item.size_z print(f" → Fallback dispensing: item.size_z") return relative_z print(f" → Warning: Unknown item type '{item_type}', using size_z") return item.size_z def _get_pipettor_z_coord(self, labware:Labware, relative_z: float, child_item: Labware = None) -> float: """ Convert relative height to absolute height, passable to pipettor with and without tips. If child_item is provided, the calculation is based on the child's bottom position. Otherwise, it's based on the labware's bottom. Parameters ---------- labware : Labware The parent labware (Plate, ReservoirHolder, PipetteHolder, etc.) relative_z : float Height relative to the BOTTOM of the child item (or labware if no child): - 0 = bottom of well/reservoir/holder - Positive values = higher up from bottom child_item : Labware, optional The specific child item being accessed (Well, Reservoir, IndividualPipetteHolder). If None, uses labware's bottom as reference. Returns ------- float Pipettor Z coordinate (how far down pipettor moves from home position) """ # Find the slot that contains this labware. essential to get min_z and max_z slot_id = self._deck.get_slot_for_labware(labware.labware_id) if slot_id is None: raise ValueError(f"Labware {labware.labware_id} is not placed in any slot") slot = self._slots[slot_id] if labware.labware_id not in slot.labware_stack: raise ValueError(f"Labware {labware.labware_id} not found in slot {slot_id}") _, (min_z, max_z) = slot.labware_stack[labware.labware_id] # Determine child depth child_depth = 0.0 if child_item is not None: child_depth = child_item.size_z reference_bottom = max_z - child_depth absolute_height = reference_bottom + relative_z #print(f"reference_bottom({reference_bottom}) = max_z({max_z}) - child_depth ({child_depth})") #print(f"absolute_height ({absolute_height}) = reference_bottom ({reference_bottom})+ liquid_height({relative_z}) ") if absolute_height < reference_bottom: #print(f"absolute_height({absolute_height}) cannot be less than reference_bottom({reference_bottom}). New Absolute height = {reference_bottom} + 1") absolute_height = reference_bottom + 1 deck_range_z = self._deck.range_z if self.has_tips: pipettor_z = deck_range_z - absolute_height - self.tip_length print(f" has_tips: pipettor_z({pipettor_z}) = deck_range_z({deck_range_z}) - absolute_height ({absolute_height}) - self.tip_length({self.tip_length})") # Validation with tips if pipettor_z < 0: raise ValueError( f"Cannot reach pipettor_z={pipettor_z:.1f}mm with tips attached " f"Maximum reachable height: {deck_range_z - self.tip_length:.1f}mm " ) cfg = load_config() zmax = float(cfg["Z_MAX"]) if pipettor_z > zmax: raise ValueError( f"pipettor_z cannot be higher than Z_MAX ({zmax:.1f} mm). Got {pipettor_z:.1f} mm." ) else: # No tips - full range available pipettor_z = deck_range_z - absolute_height print(f"No Tips: pipettor_z({pipettor_z}) = deck_range_z({deck_range_z}) - absolute_height ({absolute_height})") # Validation without tips cfg = load_config() zmax = float(cfg["Z_MAX"]) if pipettor_z < 0: raise ValueError( f"Invalid height: absolute_z={absolute_height:.1f}mm exceeds deck range={deck_range_z:.1f}mm" ) elif pipettor_z > zmax: raise ValueError(f"pipettor_z cannot be higher than ({zmax:.1f} mm). Got {pipettor_z:.1f}mm") return round(pipettor_z, 2) def _get_robot_xy_position(self, items: List[Labware]) -> tuple[float, float]: """ Calculate where the robot arm should move to access the given items. The robot arm position depends on the pipettor configuration: Single channel (1 tip): Robot moves directly to the item's position. Robot arm = Tip position Multichannel (8 tips): Robot moves to the CENTER of all items. Robot arm is at geometric center, tips span from first to last item. Parameters ---------- items : List[Labware] Items to access. Must have 1 item for single channel, or tip_count items for multichannel. Returns ------- tuple[float, float] (x, y) coordinates for robot arm movement Examples -------- Single channel accessing well (0,0) at position (10, 20): items = [well_0_0] Returns: (10, 20) Multichannel accessing wells (0,0) to (0,7): items = [well_0_0, well_0_1, ..., well_0_7] Well (0,0) at (10, 10), Well (0,7) at (10, 73) Returns: (10, 41.5) - the midpoint """ if not items: raise ValueError("No items provided for positioning") # Single channel: use item position directly if not self.multichannel: if items[0].position is None: raise ValueError(f"Item {items[0].labware_id} has no position set") return items[0].position # Multichannel: calculate center of span if len(items) != self.tip_count and items[0].each_tip_needs_separate_item() == False: raise ValueError( f"Multichannel requires {self.tip_count} items, got {len(items)}" ) # Get first and last positions first_pos = items[0].position last_pos = items[-1].position if first_pos is None or last_pos is None: raise ValueError("Items must have positions set") x_first, y_first = first_pos x_last, y_last = last_pos # Calculate center x_center = (x_first + x_last) / 2 y_center = (y_first + y_last) / 2 if self.multichannel: print(f" → Multichannel positioning: center ({x_center:.1f}, {y_center:.1f}) " f"spanning ({x_first:.1f},{y_first:.1f}) to ({x_last:.1f},{y_last:.1f})") return x_center, y_center