from biohit_pipettor_plus.deck_structure.labware_classes.labware import Labware
from biohit_pipettor_plus.deck_structure.serializable import register_class, Serializable
from biohit_pipettor_plus.deck_structure.labware_classes.reservoir import Reservoir
from typing import Optional
import copy
@register_class
class ReservoirHolder(Labware):
    """
    Labware that carries reservoirs on a rectangular grid of hooks.

    The grid has ``hooks_across_x`` columns and ``hooks_across_y`` rows.
    Hooks are identified by a 1-indexed ``hook_id`` assigned row by row
    (see :meth:`hook_id_to_position`). A reservoir may span several hooks
    as long as they form a filled rectangle; every hook it covers maps to
    the same Reservoir object.
    """

    def __init__(
        self,
        size_x: float,
        size_y: float,
        size_z: float,
        hooks_across_x: int,
        hooks_across_y: int,
        reservoir_template: Optional[Reservoir] = None,
        remove_height: float = -45,
        add_height: float = 0,
        offset: tuple[float, float] = (0, 0),
        labware_id: Optional[str] = None,
        position: Optional[tuple[float, float]] = None,
        can_be_stacked_upon: bool = False,
        x_spacing: Optional[float] = None,
        y_spacing: Optional[float] = None,
        each_tip_needs_separate_item: bool = False,
    ):
        """
        Initialize a ReservoirHolder instance that can hold multiple reservoirs.

        Parameters
        ----------
        size_x : float
            Width of the ReservoirHolder in millimeters.
        size_y : float
            Depth of the ReservoirHolder in millimeters.
        size_z : float
            Height of the ReservoirHolder in millimeters.
        hooks_across_x : int
            Number of hooks along X-axis (columns). Must be > 0.
        hooks_across_y : int
            Number of hooks along Y-axis (rows of hooks). Must be > 0.
        reservoir_template : Reservoir, optional
            Example reservoir; copies of it are placed via
            :meth:`place_reservoirs` when provided.
        remove_height : float
            Relative height at which liquid is aspirated.
        add_height : float
            Relative height at which liquid is dispensed.
        offset : tuple[float, float]
            Forwarded unchanged to the Labware base class.
        labware_id : str, optional
            Unique ID for the holder.
        position : tuple[float, float], optional
            (x, y) position coordinates of the ReservoirHolder in millimeters.
            If None, position is not set.
        can_be_stacked_upon : bool, optional
            Forwarded unchanged to the Labware base class.
        x_spacing : float, optional
            Distance along x-axis between hooks in millimeters.
        y_spacing : float, optional
            Distance along y-axis between hooks in millimeters.
        each_tip_needs_separate_item : bool, optional
            If True, each pipette tip needs its own reservoir.
            If False, all tips can access the same reservoir (default: False).

        Raises
        ------
        ValueError
            If ``hooks_across_x`` or ``hooks_across_y`` is zero or negative.
        """
        super().__init__(
            size_x, size_y, size_z, offset, labware_id, position,
            can_be_stacked_upon=can_be_stacked_upon,
        )

        if hooks_across_x <= 0 or hooks_across_y <= 0:
            raise ValueError("hooks_across_x and hooks_across_y cannot be negative or 0")

        self.add_height = add_height
        self.remove_height = remove_height
        self._columns = hooks_across_x
        self._rows = hooks_across_y
        self.total_hooks = hooks_across_x * hooks_across_y
        self._each_tip_needs_separate_item = each_tip_needs_separate_item
        self.x_spacing = x_spacing
        self.y_spacing = y_spacing

        # Maps hook_id (1..total_hooks) to the reservoir covering that hook,
        # or None while the hook is empty.
        self.__hook_to_reservoir: dict[int, Optional[Reservoir]] = {
            i: None for i in range(1, self.total_hooks + 1)
        }

        # Place reservoirs on the holder if a template was provided
        if reservoir_template is not None:
            self.place_reservoirs(reservoir_template)

    def each_tip_needs_separate_item(self) -> bool:
        """Return the ``each_tip_needs_separate_item`` flag given at construction."""
        return self._each_tip_needs_separate_item

    def hook_id_to_position(self, hook_id: int) -> tuple[int, int]:
        """
        Convert hook_id to (col, row) position.

        Parameters
        ----------
        hook_id : int
            Hook ID (1-indexed, 1 to total_hooks)

        Returns
        -------
        tuple[int, int]
            (col, row) where col is 0 to hooks_across_x-1,
            row is 0 to hooks_across_y-1

        Raises
        ------
        ValueError
            If ``hook_id`` is outside 1..total_hooks.

        Example
        -------
        For hooks_across_x=3, hooks_across_y=2::

            hook_id:  1  2  3  4  5  6
            layout:  [1  2  3]   <- row 0
                     [4  5  6]   <- row 1
        """
        if hook_id < 1 or hook_id > self.total_hooks:
            raise ValueError(f"hook_id {hook_id} out of range (1 to {self.total_hooks})")

        # Hooks are numbered row-major, so the 0-indexed id splits into (row, col).
        row, col = divmod(hook_id - 1, self._columns)
        return col, row

    def position_to_hook_id(self, col: int, row: int) -> int:
        """
        Convert (col, row) position to hook_id.

        Parameters
        ----------
        col : int
            Column (0 to hooks_across_x-1)
        row : int
            Row (0 to hooks_across_y-1)

        Returns
        -------
        int
            hook_id (1-indexed)

        Raises
        ------
        ValueError
            If ``col`` or ``row`` is outside the grid.
        """
        if col < 0 or col >= self._columns:
            raise ValueError(f"col {col} out of range (0 to {self._columns - 1})")
        if row < 0 or row >= self._rows:
            raise ValueError(f"row {row} out of range (0 to {self._rows - 1})")
        return row * self._columns + col + 1

    def get_reservoirs(self) -> list[Reservoir]:
        """Return list of all unique reservoirs (de-duplicated by labware_id)."""
        seen_ids = set()
        reservoirs = []
        # A multi-hook reservoir appears once per covered hook; report it once.
        for res in self.__hook_to_reservoir.values():
            if res is not None and res.labware_id not in seen_ids:
                seen_ids.add(res.labware_id)
                reservoirs.append(res)
        return reservoirs

    def get_hook_to_reservoir_map(self) -> dict[int, Optional[Reservoir]]:
        """
        Return the complete hook to reservoir mapping.

        This is the holder's internal dictionary, not a copy: changes made
        to the returned dict change the holder itself.
        """
        return self.__hook_to_reservoir

    def get_available_hooks(self) -> list[int]:
        """Return list of available (empty) hook IDs."""
        return [hook_id for hook_id, res in self.__hook_to_reservoir.items() if res is None]

    def get_all_children(self) -> list[Reservoir]:
        """Return all unique reservoirs; same result as :meth:`get_reservoirs`."""
        return self.get_reservoirs()

    def get_child_at(self, col: int, row: int) -> Optional[Reservoir]:
        """
        Return the reservoir covering grid cell (col, row), or None if empty.

        Raises
        ------
        ValueError
            If ``col`` or ``row`` is outside the grid.
        """
        # Convert grid (col, row) to hook_id (1-indexed)
        hook_id = self.position_to_hook_id(col, row)
        return self.__hook_to_reservoir.get(hook_id)

    def get_occupied_hooks(self) -> list[int]:
        """Return list of occupied hook IDs."""
        return [hook_id for hook_id, res in self.__hook_to_reservoir.items() if res is not None]

    def _validate_hooks_form_rectangle(self, hook_ids: list[int]) -> tuple[bool, int, int]:
        """
        Check if hook IDs form a rectangular grid.

        Returns
        -------
        tuple[bool, int, int]
            (is_valid, width, height) where width and height are in hook
            units. An empty ``hook_ids`` yields ``(False, 0, 0)``.
        """
        if not hook_ids:
            return False, 0, 0

        # Convert all hook_ids to (col, row) locations
        locations = [self.hook_id_to_position(hid) for hid in hook_ids]
        cols = [col for col, _ in locations]
        rows = [row for _, row in locations]

        min_col, max_col = min(cols), max(cols)
        min_row, max_row = min(rows), max(rows)
        width = max_col - min_col + 1
        height = max_row - min_row + 1

        # The hooks form a rectangle only if every cell of the bounding box is present.
        expected_locations = {
            (c, r)
            for c in range(min_col, max_col + 1)
            for r in range(min_row, max_row + 1)
        }
        is_valid = expected_locations == set(locations)
        return is_valid, width, height

    def place_reservoir(self, hook_ids: list[int], reservoir: Reservoir) -> None:
        """
        Place a single reservoir on specific hooks.

        On success the reservoir's ``hook_ids``, ``column``, ``row``,
        ``width_hooks``, ``height_hooks`` and ``labware_id`` attributes are
        overwritten to describe its place in this holder.

        Parameters
        ----------
        hook_ids : list[int] or int
            Hook IDs on which to place the reservoir (must form a rectangle).
            Duplicate IDs are ignored.
        reservoir : Reservoir
            Reservoir instance to place.

        Raises
        ------
        ValueError
            If hook_ids are invalid, don't form a rectangle, already occupied,
            or reservoir dimensions incompatible.
        """
        # Allow single int for backwards compatibility
        if isinstance(hook_ids, int):
            hook_ids = [hook_ids]

        if not hook_ids:
            raise ValueError("Must specify at least one hook_id")

        # Work on a private, de-duplicated copy (order preserved) so the list
        # stored on the reservoir is never shared with the caller or with a
        # template the reservoir was copied from.
        hook_ids = list(dict.fromkeys(hook_ids))

        # Check if all hook_ids are valid and available
        for hook_id in hook_ids:
            if hook_id not in self.__hook_to_reservoir:
                raise ValueError(
                    f"Hook ID {hook_id} is invalid. Must be between 1 and {self.total_hooks}"
                )
            if self.__hook_to_reservoir[hook_id] is not None:
                raise ValueError(f"Hook {hook_id} is already occupied")

        # Check if hooks form a valid rectangle
        is_valid, width_hooks, height_hooks = self._validate_hooks_form_rectangle(hook_ids)
        if not is_valid:
            raise ValueError(
                f"Hook IDs {hook_ids} must form a rectangular grid"
            )

        # Each hook is allotted an equal share of the holder's footprint.
        max_width_per_hook = self.size_x / self._columns
        max_height_per_hook = self.size_y / self._rows

        # Space available to this reservoir across the hooks it spans
        max_width_for_reservoir = max_width_per_hook * width_hooks
        max_height_for_reservoir = max_height_per_hook * height_hooks

        # Check dimensional compatibility
        if reservoir.size_x > max_width_for_reservoir:
            raise ValueError(
                f"Reservoir width ({reservoir.size_x} mm) exceeds "
                f"available width ({max_width_for_reservoir:.2f} mm = "
                f"{max_width_per_hook:.2f} mm/hook × {width_hooks} hooks)"
            )
        if reservoir.size_y > max_height_for_reservoir:
            raise ValueError(
                f"Reservoir depth ({reservoir.size_y} mm) exceeds "
                f"available depth ({max_height_for_reservoir:.2f} mm = "
                f"{max_height_per_hook:.2f} mm/hook × {height_hooks} hooks)"
            )
        if reservoir.size_z > self.size_z:
            raise ValueError(
                f"Reservoir height ({reservoir.size_z} mm) exceeds "
                f"holder height ({self.size_z} mm)"
            )

        is_valid, error_msg = self.validate_multichannel_compatible(reservoir.size_y)
        if not is_valid:
            raise ValueError(error_msg)

        # All checks passed: record the placement on the reservoir itself.
        positions = [self.hook_id_to_position(hid) for hid in hook_ids]
        reservoir.hook_ids = hook_ids
        reservoir.column = min(col for col, _ in positions)  # Leftmost column
        reservoir.row = min(row for _, row in positions)  # Topmost row
        reservoir.width_hooks = width_hooks
        reservoir.height_hooks = height_hooks
        reservoir.labware_id = f"{self.labware_id}_{reservoir.column}:{reservoir.row}"

        for hook_id in hook_ids:
            self.__hook_to_reservoir[hook_id] = reservoir

    @staticmethod
    def _hooks_needed(extent: float, extent_per_hook: float) -> int:
        """Return how many hooks are needed to span ``extent`` (at least 1)."""
        count = int(extent / extent_per_hook)
        if extent % extent_per_hook > 0:
            count += 1
        # A reservoir always occupies at least one hook, even with zero extent.
        return max(1, count)

    def _find_free_block(self, hooks_x: int, hooks_y: int) -> Optional[list[int]]:
        """
        Return hook IDs of the first free ``hooks_x`` x ``hooks_y`` block, or None.

        The grid is scanned row by row, then column by column; the returned
        IDs are listed row by row within the block.
        """
        available = set(self.get_available_hooks())
        for start_row in range(self._rows - hooks_y + 1):
            for start_col in range(self._columns - hooks_x + 1):
                candidate = [
                    self.position_to_hook_id(c, r)
                    for r in range(start_row, start_row + hooks_y)
                    for c in range(start_col, start_col + hooks_x)
                ]
                if all(hook_id in available for hook_id in candidate):
                    return candidate
        return None

    def place_reservoirs(self, reservoir_template: Reservoir) -> None:
        """
        Place deep copies of a template reservoir on the holder.

        If the template has ``hook_ids`` set, a single copy is placed on
        exactly those hooks. Otherwise the number of hooks required per axis
        is computed from the template's dimensions, and copies are placed on
        every free block of that size until no such block remains. If no
        free block is found, nothing is placed and no error is raised.

        Raises
        ------
        ValueError
            If the required block is larger than the holder's grid, or if
            :meth:`place_reservoir` rejects a placement (e.g. a specified
            hook is occupied or the reservoir dimensions are incompatible).
        """
        template = reservoir_template

        if template.hook_ids:
            self.place_reservoir(template.hook_ids, copy.deepcopy(template))
            return

        # Minimum hooks needed per axis, based on each hook's share of the holder
        hooks_x = self._hooks_needed(template.size_x, self.size_x / self._columns)
        hooks_y = self._hooks_needed(template.size_y, self.size_y / self._rows)

        # Raise if not even one placement is possible.
        if hooks_x > self._columns or hooks_y > self._rows:
            raise ValueError(
                f"Placement Error: Required reservoir size is {hooks_x}x{hooks_y} hooks, "
                f"but the ReservoirHolder is only {self._columns}x{self._rows}."
            )

        # Each placement occupies at least one hook, so this loop terminates.
        while True:
            free_block = self._find_free_block(hooks_x, hooks_y)
            if free_block is None:
                break
            self.place_reservoir(free_block, copy.deepcopy(template))

    def remove_reservoir(self, hook_id: int) -> Reservoir:
        """
        Remove a reservoir from the holder.

        Parameters
        ----------
        hook_id : int
            Any hook ID occupied by the reservoir to remove

        Returns
        -------
        Reservoir
            The removed reservoir

        Raises
        ------
        ValueError
            If ``hook_id`` is invalid or no reservoir is at the specified hook
        """
        if hook_id not in self.__hook_to_reservoir:
            raise ValueError(f"Invalid hook_id {hook_id}")

        reservoir = self.__hook_to_reservoir[hook_id]
        if reservoir is None:
            raise ValueError(f"No reservoir at hook {hook_id}")

        # Clear every hook that holds this reservoir object. The mapping is
        # the source of truth; reservoir.hook_ids is externally mutable and
        # could otherwise leave hooks occupied or add invalid keys.
        for hid, occupant in self.__hook_to_reservoir.items():
            if occupant is reservoir:
                self.__hook_to_reservoir[hid] = None

        return reservoir

    def add_content(self, hook_id: int, content: str, volume: float) -> None:
        """
        Add content to a reservoir at a specific hook.

        Parameters
        ----------
        hook_id : int
            Hook ID where the reservoir is located
        content : str
            Type of content to add (e.g., "PBS", "water")
        volume : float
            Volume to add (µL)

        Raises
        ------
        ValueError
            If no reservoir is at the hook. Errors raised by the reservoir's
            own ``add_content`` propagate unchanged.
        """
        reservoir = self.__hook_to_reservoir.get(hook_id)
        if reservoir is None:
            raise ValueError(f"No reservoir at hook {hook_id}")
        reservoir.add_content(content, volume)

    def remove_content(self, hook_id: int, volume: float, return_dict: bool = False) -> Optional[dict[str, float]]:
        """
        Remove content from a reservoir at a specific hook.

        Parameters
        ----------
        hook_id : int
            Hook ID where the reservoir is located
        volume : float
            Volume to remove (µL)
        return_dict : bool, optional
            Forwarded to the reservoir's ``remove_content``; if True, a
            dictionary of removed content types and volumes is returned
            (default: False)

        Returns
        -------
        Optional[dict[str, float]]
            Whatever the reservoir's ``remove_content`` returns.

        Raises
        ------
        ValueError
            If no reservoir is at the hook. Errors raised by the reservoir's
            own ``remove_content`` propagate unchanged.
        """
        reservoir = self.__hook_to_reservoir.get(hook_id)
        if reservoir is None:
            raise ValueError(f"No reservoir at hook {hook_id}")
        return reservoir.remove_content(volume, return_dict=return_dict)

    def get_waste_reservoirs(self) -> list[Reservoir]:
        """
        Get all unique reservoirs that contain 'waste' in any content type.

        The match is a case-insensitive substring test on each content key.

        Returns
        -------
        list[Reservoir]
            List of waste reservoirs
        """
        return [
            res for res in self.get_reservoirs()
            if any("waste" in content_type.lower() for content_type in res.content.keys())
        ]

    def get_reservoirs_by_content_type(self, content_type: str) -> list[Reservoir]:
        """
        Get all unique reservoirs that contain a specific content type.

        Parameters
        ----------
        content_type : str
            Content type to search for; matching is delegated to
            ``Reservoir.has_content_type``.

        Returns
        -------
        list[Reservoir]
            List of reservoirs containing this content type
        """
        return [
            res for res in self.get_reservoirs()
            if res.has_content_type(content_type)
        ]

    def get_state_snapshot(self) -> dict:
        """
        Return a snapshot of every unique reservoir's state.

        Returns
        -------
        dict
            ``{'reservoirs': {labware_id: reservoir_snapshot}}``
        """
        snapshots = {
            reservoir.labware_id: reservoir.get_state_snapshot()
            for reservoir in self.get_reservoirs()
        }
        return {'reservoirs': snapshots}

    def restore_state_snapshot(self, snapshot: dict) -> None:
        """
        Restore reservoirs' state from a snapshot made by :meth:`get_state_snapshot`.

        Reservoirs whose labware_id is absent from the snapshot are left untouched.
        """
        reservoir_snapshots = snapshot['reservoirs']

        # Restore each unique reservoir once
        for reservoir in self.get_reservoirs():
            if reservoir.labware_id in reservoir_snapshots:
                reservoir.restore_state_snapshot(reservoir_snapshots[reservoir.labware_id])

    def to_dict(self) -> dict:
        """Serialize the ReservoirHolder instance to a dictionary."""
        base = super().to_dict()

        # Store each unique reservoir once, keyed by its "column:row" origin.
        unique_reservoirs = {
            f"{res.column}:{res.row}": res.to_dict()
            for res in self.get_reservoirs()
        }

        base.update({
            "add_height": self.add_height,
            "remove_height": self.remove_height,
            "hooks_across_x": self.hooks_across_x,
            "hooks_across_y": self.hooks_across_y,
            "each_tip_needs_separate_item": self._each_tip_needs_separate_item,
            "reservoirs": unique_reservoirs,
            "x_spacing": self.x_spacing,
            "y_spacing": self.y_spacing,
        })
        return base

    @classmethod
    def _from_dict(cls, data: dict) -> "ReservoirHolder":
        """Deserialize a ReservoirHolder instance from a dictionary."""
        # Sequences may arrive as lists (e.g. after a JSON round trip);
        # normalise them to tuples to match the constructor's annotations.
        position = tuple(data["position"]) if data.get("position") else None
        offset = data["offset"]
        if offset is not None:
            offset = tuple(offset)

        reservoir_holder = cls(
            size_x=data["size_x"],
            size_y=data["size_y"],
            size_z=data["size_z"],
            can_be_stacked_upon=data.get("can_be_stacked_upon", False),
            add_height=data["add_height"],
            remove_height=data["remove_height"],
            offset=offset,
            hooks_across_x=data["hooks_across_x"],
            hooks_across_y=data.get("hooks_across_y", 1),  # Default to 1 for backwards compatibility
            each_tip_needs_separate_item=data.get("each_tip_needs_separate_item", False),
            labware_id=data["labware_id"],
            reservoir_template=None,
            position=position,
            x_spacing=data.get("x_spacing", None),
            y_spacing=data.get("y_spacing", None),
        )

        # Restore reservoirs on the hooks recorded in their hook_ids;
        # reservoirs without hook_ids are skipped.
        reservoirs_data = data.get("reservoirs", {})
        for res_data in reservoirs_data.values():
            reservoir = Serializable.from_dict(res_data)
            if reservoir.hook_ids:
                reservoir_holder.place_reservoir(reservoir.hook_ids, reservoir)

        return reservoir_holder

    @property
    def hooks_across_x(self) -> int:
        """Number of hook columns."""
        return self._columns

    @property
    def hooks_across_y(self) -> int:
        """Number of hook rows."""
        return self._rows

    @property
    def grid_x(self) -> int:
        """Number of grid columns; same value as ``hooks_across_x``."""
        return self._columns

    @property
    def grid_y(self) -> int:
        """Number of grid rows; same value as ``hooks_across_y``."""
        return self._rows