Skip to content

dlgforge.pipeline.state

Run-state initialization and checkpoint helpers.

init_run_state(paths, base_inputs, n_turns)

Initialize run state.

Parameters:

Name Type Description Default
paths OutputPaths

Filesystem path used by this operation.

required
base_inputs Dict[str, Any]

Mapping payload for this operation.

required
n_turns int

Numeric control value for processing behavior.

required

Returns:

Type Description
Tuple[str, Dict[str, Any]]

Tuple[str, Dict[str, Any]]: Value produced by this API.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import init_run_state
>>> init_run_state(...)

checkpoint_run_state(paths, run_id, status, base_inputs, n_turns, turns, raw_results)

Checkpoint run state.

Parameters:

Name Type Description Default
paths OutputPaths

Filesystem path used by this operation.

required
run_id str

Identifier for run state tracking.

required
status str

str value used by this operation.

required
base_inputs Dict[str, Any]

Mapping payload for this operation.

required
n_turns int

Numeric control value for processing behavior.

required
turns List[Dict[str, Any]]

Conversation or message data used during processing.

required
raw_results List[Dict[str, Any]]

Conversation or message data used during processing.

required

Returns:

Name Type Description
None None

No value is returned.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import checkpoint_run_state
>>> checkpoint_run_state(...)

init_batched_run_state(paths, base_inputs, n_turns, batch_size)

Initialize batched run state.

Parameters:

Name Type Description Default
paths OutputPaths

Filesystem path used by this operation.

required
base_inputs Dict[str, Any]

Mapping payload for this operation.

required
n_turns int

Numeric control value for processing behavior.

required
batch_size int

Numeric control value for processing behavior.

required

Returns:

Type Description
Tuple[str, Dict[str, Any]]

Tuple[str, Dict[str, Any]]: Value produced by this API.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import init_batched_run_state
>>> init_batched_run_state(...)

build_initial_batched_conversations(run_id, target_turns)

Build initial batched conversations.

Parameters:

Name Type Description Default
run_id str

Identifier for run state tracking.

required
target_turns List[int]

List[int] value used by this operation.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: Constructed value derived from the provided inputs.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import build_initial_batched_conversations
>>> build_initial_batched_conversations(...)

load_batched_conversations_from_state(run_id, state, target_turns)

Load batched conversations from state.

Parameters:

Name Type Description Default
run_id str

Identifier for run state tracking.

required
state Dict[str, Any]

Dict[str, Any] value used by this operation.

required
target_turns List[int]

List[int] value used by this operation.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: Loaded value parsed from upstream sources.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import load_batched_conversations_from_state
>>> load_batched_conversations_from_state(...)

checkpoint_batched_run_state(paths, run_id, status, base_inputs, n_turns, batch_size, conversations)

Checkpoint batched run state.

Parameters:

Name Type Description Default
paths OutputPaths

Filesystem path used by this operation.

required
run_id str

Identifier for run state tracking.

required
status str

str value used by this operation.

required
base_inputs Dict[str, Any]

Mapping payload for this operation.

required
n_turns int

Numeric control value for processing behavior.

required
batch_size int

Numeric control value for processing behavior.

required
conversations List[Dict[str, Any]]

List[Dict[str, Any]] value used by this operation.

required

Returns:

Name Type Description
None None

No value is returned.

Raises:

Type Description
Exception

Propagates unexpected runtime errors from downstream calls.

Side Effects / I/O: - May read from or write to local filesystem artifacts.

Preconditions / Invariants: - Callers should provide arguments matching annotated types and expected data contracts.

Examples:

>>> from dlgforge.pipeline.state import checkpoint_batched_run_state
>>> checkpoint_batched_run_state(...)