dlgforge.pipeline.state
Run-state initialization and checkpoint helpers.
init_run_state(paths, base_inputs, n_turns)
Initialize run state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| paths | OutputPaths | Output paths for the filesystem artifacts used by this operation. | required |
| base_inputs | Dict[str, Any] | Mapping of base inputs for the run. | required |
| n_turns | int | Number of turns for the run. | required |
Returns:
| Type | Description |
|---|---|
| Tuple[str, Dict[str, Any]] | Value produced by this API. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import init_run_state
>>> init_run_state(...)
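The signature and return type above suggest a function that hands back a run identifier together with an initial state mapping. The following is a minimal sketch of that shape only. `sketch_init_run_state`, the `SketchPaths` stand-in for `OutputPaths`, and every key in the state dict are hypothetical and are not taken from dlgforge.

```python
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Tuple


@dataclass
class SketchPaths:
    """Hypothetical stand-in for OutputPaths; its real fields are not documented here."""

    state_file: Path


def sketch_init_run_state(
    paths: SketchPaths, base_inputs: Dict[str, Any], n_turns: int
) -> Tuple[str, Dict[str, Any]]:
    """Return a fresh (run_id, state) pair. All state keys are assumptions."""
    run_id = uuid.uuid4().hex  # random 32-character hex identifier
    state = {
        "run_id": run_id,
        "status": "in_progress",
        "inputs": dict(base_inputs),  # copy so later caller mutation does not leak in
        "n_turns": n_turns,
        "turns": [],
    }
    return run_id, state


run_id, state = sketch_init_run_state(
    SketchPaths(Path("run_state.json")), {"topic": "demo"}, n_turns=3
)
```

The sketch does no I/O; the documented function may read or write state files, which this illustration deliberately leaves out.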
checkpoint_run_state(paths, run_id, status, base_inputs, n_turns, turns, raw_results)
Checkpoint run state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| paths | OutputPaths | Output paths for the filesystem artifacts used by this operation. | required |
| run_id | str | Identifier for run state tracking. | required |
| status | str | Run status value. | required |
| base_inputs | Dict[str, Any] | Mapping of base inputs for the run. | required |
| n_turns | int | Number of turns for the run. | required |
| turns | List[Dict[str, Any]] | Conversation turn data used during processing. | required |
| raw_results | List[Dict[str, Any]] | Raw result records used during processing. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| None | None | No value is returned. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import checkpoint_run_state
>>> checkpoint_run_state(...)
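A checkpoint helper with these parameters plausibly serializes the run's current state to disk. The sketch below shows one common way to do that safely: write JSON to a temporary file and swap it into place with `os.replace`, so a crash mid-write does not leave a truncated checkpoint. The JSON format, the payload keys, and the name `sketch_checkpoint` are assumptions for illustration; the source does not state how dlgforge stores its checkpoints.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def sketch_checkpoint(
    state_file: Path,
    run_id: str,
    status: str,
    base_inputs: Dict[str, Any],
    n_turns: int,
    turns: List[Dict[str, Any]],
    raw_results: List[Dict[str, Any]],
) -> None:
    """Write the run state to state_file via a temp file plus atomic rename."""
    payload = {  # hypothetical layout, not dlgforge's actual schema
        "run_id": run_id,
        "status": status,
        "inputs": base_inputs,
        "n_turns": n_turns,
        "turns": turns,
        "raw_results": raw_results,
    }
    state_file.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the target directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=str(state_file.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
        os.replace(tmp_name, state_file)  # replaces any previous checkpoint in one step
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # do not leave a stray temp file behind
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "run_state.json"
    sketch_checkpoint(
        target, "run-1", "in_progress", {"topic": "demo"}, 2, [{"turn": 1}], []
    )
    saved = json.loads(target.read_text(encoding="utf-8"))
```

Like the documented function, the sketch returns `None`; its only observable effect is the file it writes.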
init_batched_run_state(paths, base_inputs, n_turns, batch_size)
Initialize batched run state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| paths | OutputPaths | Output paths for the filesystem artifacts used by this operation. | required |
| base_inputs | Dict[str, Any] | Mapping of base inputs for the run. | required |
| n_turns | int | Number of turns for the run. | required |
| batch_size | int | Batch size for the batched run. | required |
Returns:
| Type | Description |
|---|---|
| Tuple[str, Dict[str, Any]] | Value produced by this API. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import init_batched_run_state
>>> init_batched_run_state(...)
build_initial_batched_conversations(run_id, target_turns)
Build initial batched conversations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Identifier for run state tracking. | required |
| target_turns | List[int] | List of target turn counts. | required |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | Constructed value derived from the provided inputs. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import build_initial_batched_conversations
>>> build_initial_batched_conversations(...)
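Given a `run_id` and a `List[int]` of target turns, one natural reading is that the function creates one empty conversation record per entry. The sketch below illustrates that reading under stated assumptions: the one-record-per-target mapping, the record keys, and the name `sketch_build_initial` are all hypothetical and may differ from what dlgforge actually does.

```python
from typing import Any, Dict, List


def sketch_build_initial(run_id: str, target_turns: List[int]) -> List[Dict[str, Any]]:
    """Create one empty conversation record per target turn count (assumed layout)."""
    return [
        {
            "conversation_id": f"{run_id}-{index}",  # hypothetical id scheme
            "target_turns": target,
            "turns": [],  # nothing generated yet
            "status": "pending",
        }
        for index, target in enumerate(target_turns)
    ]


conversations = sketch_build_initial("run-1", [2, 4, 3])
```

Each record owns its own `turns` list, so appending to one conversation does not affect the others.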
load_batched_conversations_from_state(run_id, state, target_turns)
Load batched conversations from state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Identifier for run state tracking. | required |
| state | Dict[str, Any] | State mapping to load conversations from. | required |
| target_turns | List[int] | List of target turn counts. | required |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | Loaded value parsed from upstream sources. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import load_batched_conversations_from_state
>>> load_batched_conversations_from_state(...)
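Loading conversations from a saved state usually means reusing whatever was checkpointed and falling back to fresh records when nothing usable is found. The sketch below shows that resume pattern. The `"conversations"` key, the record layout, the fallback rule, and the name `sketch_load_conversations` are assumptions made for illustration, not documented dlgforge behavior.

```python
from typing import Any, Dict, List


def sketch_load_conversations(
    run_id: str, state: Dict[str, Any], target_turns: List[int]
) -> List[Dict[str, Any]]:
    """Rebuild one record per target from saved state, or start fresh."""
    saved = state.get("conversations")  # hypothetical key
    if not isinstance(saved, list) or len(saved) != len(target_turns):
        saved = [{} for _ in target_turns]  # nothing usable: start from empty records
    restored = []
    for index, (item, target) in enumerate(zip(saved, target_turns)):
        restored.append(
            {
                "conversation_id": item.get("conversation_id", f"{run_id}-{index}"),
                "target_turns": target,
                "turns": list(item.get("turns", [])),  # copy to avoid aliasing the state
            }
        )
    return restored


saved_state = {
    "conversations": [
        {"conversation_id": "run-1-0", "turns": [{"turn": 1}]},
        {"conversation_id": "run-1-1", "turns": []},
    ]
}
resumed = sketch_load_conversations("run-1", saved_state, [2, 3])
fresh = sketch_load_conversations("run-1", {}, [2, 3])
```

In this sketch a length mismatch between the saved list and `target_turns` is treated as "no usable state"; a real implementation might instead raise or reconcile the two.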
checkpoint_batched_run_state(paths, run_id, status, base_inputs, n_turns, batch_size, conversations)
Checkpoint batched run state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| paths | OutputPaths | Output paths for the filesystem artifacts used by this operation. | required |
| run_id | str | Identifier for run state tracking. | required |
| status | str | Run status value. | required |
| base_inputs | Dict[str, Any] | Mapping of base inputs for the run. | required |
| n_turns | int | Number of turns for the run. | required |
| batch_size | int | Batch size for the batched run. | required |
| conversations | List[Dict[str, Any]] | List of conversation records. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| None | None | No value is returned. |

Raises:
| Type | Description |
|---|---|
| Exception | Propagates unexpected runtime errors from downstream calls. |

Side Effects / I/O:
- May read from or write to local filesystem artifacts.

Preconditions / Invariants:
- Callers should provide arguments matching annotated types and expected data contracts.
Examples:
>>> from dlgforge.pipeline.state import checkpoint_batched_run_state
>>> checkpoint_batched_run_state(...)