""" Demonstration of clock cycles in Tinker training. This script shows the difference between pipelined and non-pipelined training loops. In Tinker, training happens on discrete clock cycles (~10 seconds each). If you don't have a request queued when a cycle starts, you'll miss that cycle entirely. The pipelined version submits the next batch while waiting for the current batch to finish, maximizing utilization of clock cycles. """ import time import asyncio import tinker from tinker import types import numpy as np def create_dummy_datum(): """Create a minimal training datum for demonstration purposes.""" n = 16000 return types.Datum( model_input=types.ModelInput.from_ints(np.arange(n).tolist()), loss_fn_inputs=dict( weights=np.ones(16000, dtype=np.float32), target_tokens=np.arange(16000, dtype=np.int64), ), ) def main_non_pipelined(): """ Non-pipelined training loop: Submit a batch, wait for it to complete, then submit the next. This approach may waste clock cycles because there's a gap between when one batch finishes and when the next batch is submitted. """ print("\n" + "=" * 70) print("RUNNING NON-PIPELINED TRAINING") print("=" * 70) print( "In this version, we wait for each batch to complete before submitting the next." ) print("This can lead to missing clock cycles during the gap between batches.\n") service_client = tinker.ServiceClient() training_client = service_client.create_lora_training_client( base_model="meta-llama/Llama-3.2-1B" ) num_steps = 5 last_completion_time = None last_clock_cycle = None first_clock_cycle = None start_time = time.time() for step in range(num_steps): step_start = time.time() print(f"\n[Step {step}]") # Submit forward-backward pass print(f"[Step {step}] Submitting forward_backward...") fwdbwd_future = training_client.forward_backward( [create_dummy_datum()], loss_fn="cross_entropy", ) # Submit optimizer step print(f"[Step {step}] Submitting optim_step...") optim_future = training_client.optim_step( adam_params=types.AdamParams(learning_rate=1e-4) ) # Wait for results print(f"[Step {step}] Waiting for forward_backward to complete...") fwdbwd_result = fwdbwd_future.result() print(f"[Step {step}] Waiting for optim_step to complete...") optim_result = optim_future.result() # Extract metrics (clock cycle is in fwdbwd_result.metrics) fwdbwd_metrics = fwdbwd_result.metrics current_clock_cycle = fwdbwd_metrics["clock_cycle:unique"] loss = fwdbwd_metrics["loss:sum"] if first_clock_cycle is None: first_clock_cycle = current_clock_cycle completion_time = time.time() step_duration = completion_time - step_start print(f"[Step {step}] ✓ Step completed in {step_duration:.2f}s") print(f"[Step {step}] Loss: {loss:.4f}") print(f"[Step {step}] Clock cycle: {current_clock_cycle}") # Calculate and display clock cycles elapsed and time since last completion if last_clock_cycle is not None: cycles_elapsed = current_clock_cycle - last_clock_cycle time_since_last = completion_time - last_completion_time print( f"[Step {step}] Clock cycles elapsed since last step: {cycles_elapsed}" ) print(f"[Step {step}] Time since last completion: {time_since_last:.2f}s") if cycles_elapsed > 1: print( f"[Step {step}] ⚠️ MISSED {cycles_elapsed - 1} CLOCK CYCLE(S) (expected - non-pipelined wastes cycles)" ) last_completion_time = completion_time last_clock_cycle = current_clock_cycle total_time = time.time() - start_time total_clock_cycles = current_clock_cycle - first_clock_cycle print("\n" + "=" * 70) print(f"Non-pipelined training completed: {num_steps} steps") print(f" Total time: {total_time:.2f}s") print(f" Total clock cycles used: {total_clock_cycles}") print("=" * 70 + "\n") return {"total_time": total_time, "total_clock_cycles": total_clock_cycles} async def main_pipelined(): """ Pipelined training loop: Submit the next batch before waiting for the current batch. This approach maximizes clock cycle utilization by ensuring there's always a request queued when a new cycle starts. """ print("\n" + "=" * 70) print("RUNNING PIPELINED TRAINING") print("=" * 70) print( "In this version, we submit the next batch before waiting for the current one." ) print("This ensures we don't miss any clock cycles.\n") service_client = tinker.ServiceClient() training_client = await service_client.create_lora_training_client_async( base_model="meta-llama/Llama-3.2-1B" ) num_steps = 5 last_completion_time = None last_clock_cycle = None first_clock_cycle = None pending_futures = None start_time = time.time() for step in range(num_steps): submit_start = time.time() print(f"\n[Step {step}]") # Submit the next batch immediately (before waiting for previous results) print(f"[Step {step}] Submitting forward_backward...") fwdbwd_future = await training_client.forward_backward_async( [create_dummy_datum()], loss_fn="cross_entropy", ) print(f"[Step {step}] Submitting optim_step...") optim_future = await training_client.optim_step_async( adam_params=types.AdamParams(learning_rate=1e-4) ) # Now wait for the PREVIOUS batch to complete (if there is one) if pending_futures is not None: prev_step = step - 1 print(f"[Step {step}] Waiting for step {prev_step} to complete...") prev_fwdbwd, prev_optim = pending_futures fwdbwd_result = await prev_fwdbwd optim_result = await prev_optim # Extract metrics (clock cycle is in fwdbwd_result.metrics) fwdbwd_metrics = fwdbwd_result.metrics current_clock_cycle = fwdbwd_metrics["clock_cycle:unique"] loss = fwdbwd_metrics["loss:sum"] if first_clock_cycle is None: first_clock_cycle = current_clock_cycle completion_time = time.time() prev_duration = completion_time - prev_submit_start print(f"[Step {step}] ✓ Step {prev_step} completed in {prev_duration:.2f}s") print(f"[Step {step}] Loss: {loss:.4f}") print(f"[Step {step}] Clock cycle: {current_clock_cycle}") # Calculate and display clock cycles elapsed and time since last completion if last_clock_cycle is not None: cycles_elapsed = current_clock_cycle - last_clock_cycle time_since_last = completion_time - last_completion_time print( f"[Step {step}] Clock cycles elapsed since last step: {cycles_elapsed}" ) print( f"[Step {step}] Time since last completion: {time_since_last:.2f}s" ) if cycles_elapsed == 1: print(f"[Step {step}] ✓ Perfect! Only 1 clock cycle used.") else: print( f"[Step {step}] ⚠️ Used {cycles_elapsed} clock cycles (should be 1)!" ) last_completion_time = completion_time last_clock_cycle = current_clock_cycle # Store current futures for next iteration pending_futures = (fwdbwd_future, optim_future) prev_submit_start = submit_start # Wait for the last batch to complete if pending_futures is not None: print(f"\n[Final] Waiting for step {num_steps - 1} to complete...") fwdbwd_result = await pending_futures[0] optim_result = await pending_futures[1] # Extract metrics (clock cycle is in fwdbwd_result.metrics) fwdbwd_metrics = fwdbwd_result.metrics current_clock_cycle = fwdbwd_metrics["clock_cycle:unique"] loss = fwdbwd_metrics["loss:sum"] if first_clock_cycle is None: first_clock_cycle = current_clock_cycle completion_time = time.time() final_duration = completion_time - prev_submit_start print(f"[Final] ✓ Step {num_steps - 1} completed in {final_duration:.2f}s") print(f"[Final] Loss: {loss:.4f}") print(f"[Final] Clock cycle: {current_clock_cycle}") # Calculate and display clock cycles elapsed and time since last completion if last_clock_cycle is not None: cycles_elapsed = current_clock_cycle - last_clock_cycle time_since_last = completion_time - last_completion_time print(f"[Final] Clock cycles elapsed since last step: {cycles_elapsed}") print(f"[Final] Time since last completion: {time_since_last:.2f}s") if cycles_elapsed == 1: print("[Final] ✓ Perfect! Only 1 clock cycle used.") else: print(f"[Final] ⚠️ Used {cycles_elapsed} clock cycles (should be 1)!") total_time = time.time() - start_time total_clock_cycles = current_clock_cycle - first_clock_cycle print("\n" + "=" * 70) print(f"Pipelined training completed: {num_steps} steps") print(f" Total time: {total_time:.2f}s") print(f" Total clock cycles used: {total_clock_cycles}") print("=" * 70 + "\n") return {"total_time": total_time, "total_clock_cycles": total_clock_cycles} if __name__ == "__main__": print("\n" + "=" * 70) print("CLOCK CYCLES DEMONSTRATION") print("=" * 70) print("\nThis script demonstrates the impact of pipelining on training efficiency.") print("Watch the 'Clock cycles elapsed' to see the difference!") print( "- Pipelined: Should typically use 1 clock cycle per step when server is lightly loaded" ) print("- Non-pipelined: May use 2+ clock cycles per step (wasting time)\n") # Run non-pipelined version non_pipelined_stats = main_non_pipelined() # Run pipelined version pipelined_stats = asyncio.run(main_pipelined()) # Show comparison print("\n" + "=" * 70) print("PERFORMANCE COMPARISON") print("=" * 70) print( "\n┌─────────────────────────────┬──────────────┬──────────────┬─────────────┐" ) print("│ Metric │ Non-Pipelined│ Pipelined │ Improvement │") print("├─────────────────────────────┼──────────────┼──────────────┼─────────────┤") np_time = non_pipelined_stats["total_time"] p_time = pipelined_stats["total_time"] time_savings = ((np_time - p_time) / np_time) * 100 time_np = f"{np_time:.2f}s" time_p = f"{p_time:.2f}s" improvement_time = f"{time_savings:.1f}%" print(f"│ {'Total Time':<28}│ {time_np:^13}│ {time_p:^13}│ {improvement_time:^12}│") np_cycles = non_pipelined_stats["total_clock_cycles"] p_cycles = pipelined_stats["total_clock_cycles"] cycle_savings = ((np_cycles - p_cycles) / np_cycles) * 100 cycles_np = f"{int(np_cycles)}" cycles_p = f"{int(p_cycles)}" improvement_cycles = f"{cycle_savings:.1f}%" print(f"│ {'Clock Cycles Used':<28}│ {cycles_np:^13}│ {cycles_p:^13}│ {improvement_cycles:^12}│") print("└─────────────────────────────┴──────────────┴──────────────┴─────────────┘") print("\n" + "=" * 70) print("SUMMARY") print("=" * 70) print( f"\n🎉 Pipelining saved {cycle_savings:.0f}% of clock cycles and {time_savings:.0f}% of time!" ) print("=" * 70 + "\n")