Rusts Polling mit benutzerdefinierten Futures verstehen
Takashi Yamamoto
Infrastructure Engineer · Leapcell

Einleitung
Asynchrone Programmierung hat sich zu einem unverzichtbaren Paradigma für die Erstellung performanter und reaktionsschneller Anwendungen entwickelt, insbesondere in Bereichen wie Netzwerke, I/O-gebundene Aufgaben und Systeme mit hoher Parallelität. Rust bietet mit seinem starken Typsystem und seinem Ownership-Modell einen leistungsstarken und sicheren Ansatz für die asynchrone Programmierung, der auf seinem Future
-Trait basiert. Obwohl Sie Futures oft über die async/await
-Syntax nutzen, ist ein tiefes Verständnis dafür, wie diese Abstraktionen im Hintergrund funktionieren, entscheidend für das Debugging, die Optimierung und sogar das Design benutzerdefinierter asynchroner Komponenten. Dieser Deep Dive in das Schreiben eines benutzerdefinierten Future
wird den Polling-Mechanismus entmystifizieren, den grundlegenden Tanz zwischen Ihren asynchronen Aufgaben und dem Executor enthüllen und Sie letztendlich befähigen, die asynchronen Fähigkeiten von Rust mit größerem Vertrauen und Präzision zu nutzen.
Das Herz der asynchronen Ausführung: Polling
Bevor wir unseren benutzerdefinierten Future konstruieren, wollen wir ein klares Verständnis der beteiligten Kernkonzepte aufbauen:
- Future Trait: In Rust ist ein
Future
ein Trait, der eine asynchrone Berechnung darstellt, die möglicherweise noch nicht abgeschlossen ist. Er verfügt über eine einzige Methode,poll
, die ein Executor wiederholt aufruft, um den Fortschritt des Future zu überprüfen. - Executor: Ein Executor ist dafür verantwortlich,
Future
s entgegenzunehmen und sie durch wiederholtes Aufrufen ihrerpoll
-Methode zum Abschluss zu treiben. Er verwaltet den Lebenszyklus von Futures, plant Aufgaben und kümmert sich darum, Aufgaben zu wecken, wenn sie bereit sind, Fortschritte zu machen. Beliebte Exekutoren sindtokio
undasync-std
. - Polling: Dies ist die Handlung des Executors, der die
poll
-Methode für einen nicht abgeschlossenenFuture
aufruft. Wennpoll
aufgerufen wird, versucht der Future, Fortschritte zu machen. Poll
-Enum: Diepoll
-Methode gibt einePoll
-Enum zurück, die zwei Varianten hat:Poll::Ready(T)
: Zeigt an, dass der Future erfolgreich abgeschlossen wurde undT
das Ergebnis der Berechnung ist.Poll::Pending
: Zeigt an, dass der Future noch nicht abgeschlossen ist. WennPending
zurückgegeben wird, muss der Future sicherstellen, dass er so arrangiert wird, dass er geweckt wird (überWaker
), wenn er bereit ist, weitere Fortschritte zu machen.
Waker
: EinWaker
ist ein Low-Level-Mechanismus, der vom Executor bereitgestellt wird, um es einemFuture
zu ermöglichen, zu signalisieren, dass es bereit ist, erneut gepollt zu werden. Wenn ein FuturePoll::Pending
zurückgibt, erfasst und klont es denWaker
aus demContext
. Später, wenn ein Ereignis ausgelöst wird, das den Future möglicherweise entblockiert (z. B. Daten kommen auf einem Socket an, ein Timer läuft ab), ruft der Futurewaker.wake_by_ref()
auf, um den Executor zu benachrichtigen, ihn erneut zu pollen.Context
: DerContext
, der an diepoll
-Methode übergeben wird, enthält einenWaker
und andere Informationen, die für den Future nützlich sind, um mit dem Executor zu interagieren.
Erstellen eines benutzerdefinierten Future: Ein einfacher Verzug
Erstellen wir einen benutzerdefinierten Future
, der einen nicht blockierenden Verzug einführt. Dies ermöglicht es uns, den Polling-Mechanismus direkt zu beobachten.
Wir definieren eine Delay
-Struktur, die eine deadline
(wann er abgeschlossen sein sollte) und einen optionalen Waker
speichert, um die Aufgabe zu wecken.
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; // Repräsentiert den Zustand unseres Delay-Futures struct Delay { deadline: Instant, // Wir müssen den Waker speichern, um den Future zu wecken, wenn die Frist abläuft. // Arc<Mutex<Option<Waker>>> ermöglicht es uns, den Waker sicher zwischen Threads zu teilen und zu ändern. waker_storage: Arc<Mutex<Option<Waker>>>, // Ein Flag, um sicherzustellen, dass wir den Timer-Thread nur einmal starten. timer_thread_spawned: bool, } impl Delay { fn new(duration: Duration) -> Self { Delay { deadline: Instant::now() + duration, waker_storage: Arc::new(Mutex::new(None)), timer_thread_spawned: false, } } } // Implementieren Sie das Future-Trait für unsere Delay-Struktur impl Future for Delay { // Der Ausgabetyp unseres Futures ist Einheit, da er nur einen Verzug darstellt. type Output = (); // Das Herz des Futures: die poll-Methode fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Wenn die Frist bereits abgelaufen ist, ist der Future bereit. if Instant::now() >= self.deadline { println!("Delay future: Deadline reached. Returning Ready."); return Poll::Ready(()) } // --- Speichern des Wakers und Einrichten des Timers (nur einmal) --- // Wenn der Timer-Thread noch nicht gestartet wurde, richten Sie ihn ein. if !self.timer_thread_spawned { println!("Delay future: First poll. Storing waker and spawning timer thread."); // Speichern Sie den aktuellen Waker aus dem Kontext. // Dieser Waker wird vom Timer-Thread verwendet, um diese Aufgabe zu wecken. let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone()); drop(waker_guard); // Sperre frühzeitig freigeben // Klonen Sie den Arc, um ihn an den neuen Thread zu übergeben. let waker_storage_clone = self.waker_storage.clone(); let duration_until_deadline = self.deadline.duration_since(Instant::now()); // Starten Sie einen neuen Thread, der bis zur Frist schläft // und dann die ursprüngliche Aufgabe weckt. thread::spawn(move || { thread::sleep(duration_until_deadline); println!("Delay timer thread: Deadline passed. Waking up the task."); // Rufen Sie den Waker ab und wecken Sie die Aufgabe if let Some(waker) = waker_storage_clone.lock().unwrap().take() { waker.wake(); } }); // Markieren Sie, dass der Timer-Thread gestartet wurde, um ein erneutes Starten zu vermeiden self.timer_thread_spawned = true; } else { // Dieser Teil behandelt nachfolgende Abfragen, wenn der Timer-Thread bereits ausgeführt wird. // Es ist wichtig, den Waker zu aktualisieren, wenn der Executor beschließt, die Aufgabe zu verschieben // oder neu zu planen. Wenn der Waker nicht aktualisiert wird, kann der vorherige Waker // veraltet werden, was dazu führt, dass die Aufgabe niemals geweckt wird. let mut waker_guard = self.waker_storage.lock().unwrap(); if waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) { println!("Delay future: Waker changed. Updating."); *waker_guard = Some(cx.waker().clone()); } } // Wenn die Frist noch nicht abgelaufen ist, ist der Future ausstehend. // Er wird erneut abgefragt, wenn `waker.wake()` vom Timer-Thread aufgerufen wird. println!("Delay future: Deadline not yet reached. Returning Pending."); Poll::Pending } } #[tokio::main] async fn main() { println!("Main: Starting program."); let delay_future = Delay::new(Duration::from_secs(2)); // Erstellen Sie einen 2-Sekunden-Verzug println!("Main: Awaiting delay future..."); delay_future.await; // Warten Sie auf unseren benutzerdefinierten Future println!("Main: Delay completed. Program finished."); }
Erläuterung des Delay
-Future:
-
struct Delay
:deadline
: EinInstant
, das angibt, wann der Verzug enden soll.waker_storage
: EinArc<Mutex<Option<Waker>>>
ist unerlässlich. DerWaker
muss zwischen demFuture
(dasself.waker_storage
besitzt) und dem separatenthread::spawn
(daswake
aufruft) geteilt werden.Arc
ermöglicht gemeinsamen Besitz, undMutex
bietet eine sichere interne Mutabilität, um denWaker
zu speichern und abzurufen.Option
wird verwendet, da derWaker
möglicherweise nicht beim allererstenpoll
verfügbar ist, bevor er gespeichert wird.timer_thread_spawned
: Ein einfaches boolesches Flag, um sicherzustellen, dass wir unseren "Timer"-Thread nur einmal einrichten.
-
impl Future for Delay
:type Output = ();
: Unser Verzögerungs-Future wird einfach abgeschlossen und liefert keinen aussagekräftigen Wert.poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
: Dies ist der Kern.if Instant::now() >= self.deadline
: Bei jedem Poll prüfen wir, ob die Frist abgelaufen ist. Wenn ja, sind wirReady
und gebenPoll::Ready(())
zurück.if !self.timer_thread_spawned
: Dieser bedingte Block stellt sicher, dass wir den eigentlichen Timer (denthread::spawn
-Teil) nur einmal einrichten.let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone());
: Wir erhalten eine Sperre für unserenwaker_storage
, klonen denWaker
aus dem aktuellenContext
und speichern ihn. DieserWaker
verweist zurück auf diese spezifische Aufgabe, die gerade abgefragt wird.thread::spawn(...)
: Wir starten einen normalen Rust-Thread. Dieser Thread wird für die verbleibende Dauersleep()
en. Dies ist ein blockierendessleep
aus der Perspektive dieses Hilfs-Threads, blockiert aber nicht den Executor-Thread, da er sich in einem separaten Betriebssystem-Thread befindet.- Innerhalb des gestarteten Threads, nach dem Schlafen, ruft er den gespeicherten
Waker
ab und ruftwaker.wake()
auf. Dieserwake()
-Aufruf teilt der async-Laufzeit (Tokio in unseremmain
) mit, dass die dieserWaker
zugeordnete Aufgabe nun bereit ist, erneut gepollt zu werden. self.timer_thread_spawned = true;
: Wir setzen das Flag auf true, um zu verhindern, dass mehrere Timer-Threads für dasselbeDelay
-Future gestartet werden.
else { ... }
: Wenn der Timer-Thread bereits gestartet wurde (d. h. dies ist ein nachfolgender Poll eines bereits ausstehenden Futures), müssen wir immer noch prüfen, ob sich derWaker
imContext
geändert hat (!w.will_wake(cx.waker())
). Wenn ja, aktualisieren wir unseren gespeichertenWaker
. Dies ist wichtig, da Exekutoren manchmal Aufgaben verschieben oder neu planen können, was einen neuenWaker
erfordert, um die Aufgabe korrekt zu benachrichtigen.Poll::Pending
: Wenn die Frist noch nicht abgelaufen ist und der Timer eingerichtet ist, wartet der Future immer noch. Wir gebenPoll::Pending
zurück. Der Executor wird das Polling dieses Futures stoppen, biswaker.wake()
aufgerufen wird.
Wie es mit tokio::main
und await
funktioniert:
Delay::new(Duration::from_secs(2))
: EineDelay
-Instanz wird erstellt.delay_future.await
: Hier geschieht die Magie.- Tokios Executor erhält
delay_future
. - Erster Poll: Der Executor ruft
delay_future.poll(cx)
auf.- Die Frist wird nicht eingehalten.
timer_thread_spawned
istfalse
.- Der
Waker
auscx
wird geklont und indelay_future.waker_storage
gespeichert. - Ein neuer
thread::spawn
wird erstellt. Dieser Thread beginnt, für 2 Sekunden zu schlafen. timer_thread_spawned
wird auftrue
gesetzt.poll
gibtPoll::Pending
zurück.
- Aktion des Executors nach
Poll::Pending
: Der Executor weiß nun, dassdelay_future
nicht bereit ist. Er legt diese Aufgabe beiseite und beginnt, andere bereite Aufgaben (falls vorhanden) abzufragen oder wartet aufwaker.wake()
-Aufrufe. Wichtig ist, dass der Tokio-Laufzeit-Thread durch denthread::sleep
unseresthread::spawn
NICHT blockiert wird. - Nach 2 Sekunden:
thread::spawn
schließt seinenthread::sleep
ab.- Er ruft den gespeicherten
Waker
ab und ruftwaker.wake()
auf.
- Er ruft den gespeicherten
- Aktion des Executors nach
waker.wake()
: Der Executor empfängt das Wecksignal für die mitdelay_future
verbundene Aufgabe. Er plantdelay_future
erneut abzufragen. - Zweiter (oder späterer) Poll: Der Executor ruft
delay_future.poll(cx)
erneut auf.- Nun ist
Instant::now() >= self.deadline
wahr. poll
gibtPoll::Ready(())
zurück.
- Nun ist
- Abschluss: Der Ausdruck
delay_future.await
wird schließlich abgeschlossen, und die Funktionmain
wird fortgesetzt.
- Tokios Executor erhält
Schlussfolgerung
Durch die Implementierung eines benutzerdefinierten Delay
-Future haben wir ein praktisches Verständnis des asynchronen Polling-Mechanismus von Rust gewonnen. Wir haben gesehen, wie Future::poll
wiederholt von einem Executor aufgerufen wird, wie Poll::Pending
einen unvollständigen Zustand signalisiert und vor allem, wie der Waker
als Brücke fungiert, die es einer wartenden Aufgabe ermöglicht, dem Executor zu signalisieren, die Abfrage fortzusetzen, wenn Fortschritte erzielt werden können. Dieser explizite Tanz zwischen dem Future
und dem Executor
über den Waker
ist das Fundament der effizienten, nicht blockierenden asynchronen Programmierung von Rust und ermöglicht leistungsstarke und skalierbare Anwendungen ohne den Overhead blockierender Threads. Die Beherrschung benutzerdefinierter Future
-Implementierungen ist eine fortgeschrittene Fähigkeit, die tiefere Einblicke in das leistungsstarke asynchrone Ökosystem von Rust ermöglicht.