Async and Futures

Bios provides both synchronous and asynchronous APIs for all operations. Understanding when and how to use async patterns is crucial for achieving optimal training performance, especially when working with distributed GPU clusters.

Why Async Matters in Distributed Training

Bios training runs on discrete clock cycles (~10 seconds each). If you don't have a request queued when a cycle starts, you'll miss that cycle entirely. Async APIs allow you to queue multiple operations efficiently, maximizing GPU utilization and minimizing idle time.

Sync and Async APIs

Every method in the Bios Python library has both synchronous and asynchronous variants. The async methods end with _async:

Client          Sync Method                      Async Method
ServiceClient   create_lora_training_client()    create_lora_training_client_async()
TrainingClient  forward_backward()               forward_backward_async()
SamplingClient  sample()                         sample_async()
RestClient      list_training_run_ids()          list_training_run_ids_async()
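
For example, the two variants of a sampling call differ only in the _async suffix and the await. A minimal sketch (sampling_client, prompt, and params follow the patterns shown later in this section):

Naming Convention
# Sync variant: callable from ordinary Python code
future = sampling_client.sample(prompt=prompt, sampling_params=params)

# Async variant: same method with the _async suffix, used inside a coroutine
future = await sampling_client.sample_async(prompt=prompt, sampling_params=params)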

Async Requirement

Bios's async functionality requires an asyncio event loop, typically run with asyncio.run(main()):

import asyncio
import bios

async def main():
    service_client = bios.ServiceClient()
    training_client = await service_client.create_lora_training_client_async(
        base_model="ultrasafe/usf-finance"
    )
    # ... async training code ...

# Run the async function
asyncio.run(main())

When to Use Each

Use Async When:

  • High-performance workflows requiring concurrency
  • Multiple operations can run in parallel
  • Waiting on multiple network calls simultaneously
  • Production training pipelines
  • Maximizing GPU cluster utilization

Use Sync When:

  • Learning examples and tutorials
  • Simple scripts and prototypes
  • Sequential operations without parallelism
  • Easier to reason about execution flow
  • No need for an asyncio event loop

Bios Cookbook Pattern: The Bios Cookbook uses async for performance-critical implementations and sync for pedagogical examples. For production training, async is strongly recommended.

Understanding Futures

Most Bios API methods are non-blocking and return immediately with a Future object. The returned future acknowledges that your request has been submitted to the server; the computation itself may still be running.

Sync API with Futures

In synchronous Python, call result() on the future to block until completion:

Synchronous Future Pattern
# Submit request (returns immediately)
future = client.forward_backward(data, loss_fn)

# Block until complete and get result
result = future.result()

print(f"Loss: {result.loss}")

Async API with Futures (Double Await)

In async Python, you use await twice—once to submit the request, and once to retrieve the result:

Asynchronous Future Pattern
# First await: submit request and get future
future = await client.forward_backward_async(data, loss_fn)

# Second await: wait for completion and get result
result = await future

print(f"Loss: {result.loss}")

Double Await Semantics

After the first await, you're guaranteed that:

  • The request has been successfully submitted to Bios
  • The operation will be ordered correctly relative to other requests
  • You can safely submit the next request (see the sketch below)

The second await ensures:

  • The computation has finished on the GPU cluster
  • Results are available for use
  • For forward_backward, gradients are accumulated in optimizer state
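
In practice, the first guarantee is what makes it safe to overlap requests: you can queue a forward_backward and an optim_step back-to-back, then await both results afterward. A minimal sketch, using the same client methods as the examples below:

Double Await in Practice
# Queue two ordered operations without waiting for either result
fwd_future = await client.forward_backward_async(batch, "cross_entropy")
opt_future = await client.optim_step_async()  # ordered after forward_backward

# Retrieve results once both are queued
fwd_result = await fwd_future  # gradients have been accumulated
opt_result = await opt_future  # optimizer step has been applied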

Sync vs Async: Side-by-Side Comparison

Synchronous API
import bios

# No event loop needed
service_client = bios.ServiceClient()

# Create training client (blocks)
training_client = service_client.create_lora_training_client(
    base_model="ultrasafe/usf-finance"
)

# Training loop (each call blocks)
for batch in dataloader:
    # Submit and wait
    future = training_client.forward_backward(
        batch, "cross_entropy"
    )
    result = future.result()

    # Submit and wait
    training_client.optim_step().result()

print("Training complete")
Asynchronous API
import asyncio
import bios

async def main():
    # Requires event loop
    service_client = bios.ServiceClient()

    # Create training client (async)
    training_client = await service_client.create_lora_training_client_async(
        base_model="ultrasafe/usf-finance"
    )

    # Training loop (concurrent ops)
    for batch in dataloader:
        # Submit (double await pattern)
        future = await training_client.forward_backward_async(
            batch, "cross_entropy"
        )
        result = await future

        # Submit optimizer step
        await training_client.optim_step_async()

    print("Training complete")

asyncio.run(main())

Performance Tip: Overlap Requests

For optimal performance, submit your next request while the current one is running. This is critical for Bios because training runs on ~10-second cycles—missing a cycle means wasted GPU time.

❌ Suboptimal Pattern (Sequential)

Waiting for each operation to complete before submitting the next:

Slow: Sequential Execution
async def slow_training():
    # BAD: Wait for each operation sequentially
    future1 = await client.forward_backward_async(batch1, loss_fn)
    result1 = await future1  # Blocks here

    # GPU may be idle while we prepare next request
    future2 = await client.forward_backward_async(batch2, loss_fn)
    result2 = await future2  # Blocks here

    # More wasted time...
    future3 = await client.forward_backward_async(batch3, loss_fn)
    result3 = await future3

✅ Optimal Pattern (Overlapping)

Submit multiple requests before waiting for results:

Fast: Overlapping Execution
async def fast_training():
    # GOOD: Submit all requests first
    future1 = await client.forward_backward_async(batch1, loss_fn)
    future2 = await client.forward_backward_async(batch2, loss_fn)
    future3 = await client.forward_backward_async(batch3, loss_fn)

    # Now retrieve results (operations run in parallel on GPUs)
    result1 = await future1
    result2 = await future2
    result3 = await future3

    # GPU cluster stays busy throughout!

Performance Gain

The overlapping pattern can improve throughput by 2-3x by keeping GPUs continuously busy. This is especially important for operations like forward_backward that take significant time to compute.

Advanced Pattern: Pipelined Training

For maximum performance, pipeline your training loop to continuously keep the GPU cluster busy:

Pipelined Training Loop
import asyncio
import bios
from collections import deque

async def pipelined_training(training_client, dataloader, pipeline_depth=4):
    """
    High-performance training with request pipelining.
    Keeps the GPU cluster continuously busy.
    """
    batches = list(dataloader)  # materialize so we can slice below
    futures = deque()

    # Prime the pipeline
    for batch in batches[:pipeline_depth]:
        # Submit forward_backward
        fwd_future = await training_client.forward_backward_async(
            batch, "cross_entropy"
        )

        # Submit optim_step
        opt_future = await training_client.optim_step_async()

        futures.append((fwd_future, opt_future))

    # Process remaining batches
    for batch in batches[pipeline_depth:]:
        # Retrieve oldest results
        fwd_future, opt_future = futures.popleft()
        fwd_result = await fwd_future
        opt_result = await opt_future

        # Log metrics
        print(f"Loss: {fwd_result.loss:.4f}")

        # Submit new requests
        new_fwd = await training_client.forward_backward_async(
            batch, "cross_entropy"
        )
        new_opt = await training_client.optim_step_async()
        futures.append((new_fwd, new_opt))

    # Drain pipeline
    while futures:
        fwd_future, opt_future = futures.popleft()
        await fwd_future
        await opt_future

    print("Pipeline training complete!")

# Run with asyncio
asyncio.run(pipelined_training(client, batches, pipeline_depth=4))

Pipeline Benefits

  • ⚡ Lower latency: reduces idle GPU time between batches
  • 📈 Higher throughput: process more batches per cycle
  • 💰 Better cost: more efficient GPU utilization

Concurrent Operations with asyncio.gather

Use asyncio.gather() to submit and wait for multiple independent operations concurrently:

Parallel Operations with gather()
import asyncio
import bios
from bios import types

async def parallel_evaluation(training_client, test_prompts):
    """
    Evaluate model on multiple test cases concurrently
    """
    # Save weights for sampling
    sampling_client = training_client.save_weights_and_get_sampling_client(
        name="eval_checkpoint"
    )

    # Create sampling tasks for all prompts
    sampling_tasks = []
    for prompt in test_prompts:
        task = sampling_client.sample_async(
            prompt=prompt,
            sampling_params=types.SamplingParams(max_tokens=256)
        )
        sampling_tasks.append(task)

    # Wait for all futures to be submitted
    futures = await asyncio.gather(*sampling_tasks)

    # Wait for all results
    results = await asyncio.gather(*futures)

    return results

# Run concurrent evaluation (assumes `tokenizer` was obtained earlier)
test_prompts = [
    types.ModelInput.from_ints(tokenizer.encode("Test 1")),
    types.ModelInput.from_ints(tokenizer.encode("Test 2")),
    types.ModelInput.from_ints(tokenizer.encode("Test 3")),
]

results = asyncio.run(parallel_evaluation(client, test_prompts))
for i, result in enumerate(results):
    print(f"Result {i}: {tokenizer.decode(result.sequences[0].tokens)}")
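
One caveat with asyncio.gather(): if any task raises, the first exception propagates and you lose access to the remaining results. Passing return_exceptions=True returns exceptions alongside successful results, so each prompt can be handled individually:

Handling Per-Task Failures in gather()
# Collect per-prompt errors instead of failing the whole batch
results = await asyncio.gather(*futures, return_exceptions=True)
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Prompt {i} failed: {result}")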

Async Best Practices

✓ Do

  • Use async for production training pipelines
  • Submit multiple requests before awaiting results
  • Maintain a pipeline depth of 4-8 requests
  • Use asyncio.gather() for independent operations
  • Profile your pipeline to find optimal depth
  • Handle exceptions properly with try/except

✗ Don't

  • Don't await results immediately after submission
  • Don't use the sync API for high-throughput training
  • Don't submit too many requests at once (causes memory issues; see the sketch below)
  • Don't forget error handling in async code
  • Don't mix sync and async in the same workflow
  • Don't block the event loop with CPU-heavy operations
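
A simple way to follow both the pipeline-depth guidance and the cap on outstanding requests is to bound in-flight work explicitly. This is a minimal sketch, not part of the Bios API; it reuses the deque pattern from the pipelined loop above:

Bounding In-Flight Requests
import asyncio
from collections import deque

async def bounded_submit(training_client, batches, max_in_flight=8):
    """Submit forward_backward requests while capping in-flight work (illustrative)."""
    in_flight = deque()
    results = []
    for batch in batches:
        # If the pipeline is full, retire the oldest request before submitting more
        if len(in_flight) >= max_in_flight:
            results.append(await in_flight.popleft())
        future = await training_client.forward_backward_async(batch, "cross_entropy")
        in_flight.append(future)
    # Drain the remaining requests
    while in_flight:
        results.append(await in_flight.popleft())
    return results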

Error Handling with Async

Properly handle errors in async workflows to ensure robustness:

Robust Error Handling
import asyncio
import bios
from bios.exceptions import BiosAPIError

async def robust_training(training_client, dataloader):
    """
    Training loop with proper error handling
    """
    for step, batch in enumerate(dataloader):
        try:
            # Submit operations
            fwd_future = await training_client.forward_backward_async(
                batch, "cross_entropy"
            )
            opt_future = await training_client.optim_step_async()

            # Wait for results
            fwd_result = await fwd_future
            opt_result = await opt_future

            print(f"Step {step}: Loss = {fwd_result.loss:.4f}")

        except BiosAPIError as e:
            print(f"Step {step}: API error - {e}")
            # Optionally retry or skip batch
            continue

        except asyncio.TimeoutError:
            print(f"Step {step}: Timeout - retrying...")
            # Implement retry logic

        except Exception as e:
            print(f"Step {step}: Unexpected error - {e}")
            # Save checkpoint before failing
            training_client.save_state(name=f"error_step_{step}")
            raise

asyncio.run(robust_training(client, batches))
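
The timeout branch above leaves the retry logic as a comment. One minimal way to fill it in is a small backoff helper; retry_async below is illustrative, not part of the Bios API:

Retry with Exponential Backoff
import asyncio

async def retry_async(submit_fn, max_attempts=3, base_delay=1.0):
    """Retry an async submission with exponential backoff (illustrative helper)."""
    for attempt in range(max_attempts):
        try:
            future = await submit_fn()   # first await: submit
            return await future          # second await: result
        except asyncio.TimeoutError:
            if attempt == max_attempts - 1:
                raise
            # Back off before retrying: 1s, 2s, 4s, ...
            await asyncio.sleep(base_delay * 2 ** attempt)

# Usage:
# result = await retry_async(
#     lambda: training_client.forward_backward_async(batch, "cross_entropy")
# )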