Skip to main content

tower/retry/budget/
tps_budget.rs

//! Transactions Per Second (TPS) budget implementation.
2
3use std::{
4    fmt,
5    sync::{
6        atomic::{AtomicIsize, Ordering},
7        Mutex,
8    },
9    time::Duration,
10};
11use tokio::time::Instant;
12
13use super::Budget;
14
15/// A Transactions Per Minute config for managing retry tokens.
16///
17/// [`TpsBudget`] uses a token bucket to decide if the request should be retried.
18///
19/// [`TpsBudget`] works by checking how much retries have been made in a certain period of time.
20/// Minimum allowed number of retries are effectively reset on an interval. Allowed number of
21/// retries depends on failed request count in recent time frame.
22///
23/// For more info about [`Budget`], please see the [module-level documentation].
24///
25/// [module-level documentation]: super
26pub struct TpsBudget {
27    generation: Mutex<Generation>,
28    /// Initial budget allowed for every second.
29    reserve: isize,
30    /// Slots of a the TTL divided evenly.
31    slots: Box<[AtomicIsize]>,
32    /// The amount of time represented by each slot.
33    window: Duration,
34    /// The changers for the current slot to be committed
35    /// after the slot expires.
36    writer: AtomicIsize,
37    /// Amount of tokens to deposit for each put().
38    deposit_amount: isize,
39    /// Amount of tokens to withdraw for each try_get().
40    withdraw_amount: isize,
41}
42
43#[derive(Debug)]
44struct Generation {
45    /// Slot index of the last generation.
46    index: usize,
47    /// The timestamp since the last generation expired.
48    time: Instant,
49}
50
51// ===== impl TpsBudget =====
52
53impl TpsBudget {
54    /// Create a [`TpsBudget`] that allows for a certain percent of the total
55    /// requests to be retried.
56    ///
57    /// - The `ttl` is the duration of how long a single `deposit` should be
58    ///   considered. Must be between 1 and 60 seconds.
59    /// - The `min_per_sec` is the minimum rate of retries allowed to accommodate
60    ///   clients that have just started issuing requests, or clients that do
61    ///   not issue many requests per window.
62    /// - The `retry_percent` is the percentage of calls to `deposit` that can
63    ///   be retried. This is in addition to any retries allowed for via
64    ///   `min_per_sec`. Must be between 0 and 1000.
65    ///
66    ///   As an example, if `0.1` is used, then for every 10 calls to `deposit`,
67    ///   1 retry will be allowed. If `2.0` is used, then every `deposit`
68    ///   allows for 2 retries.
69    pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
70        // assertions taken from finagle
71        assert!(ttl >= Duration::from_secs(1));
72        assert!(ttl <= Duration::from_secs(60));
73        assert!(retry_percent >= 0.0);
74        assert!(retry_percent <= 1000.0);
75        assert!(min_per_sec < i32::MAX as u32);
76
77        let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
78            // If there is no percent, then you gain nothing from deposits.
79            // Withdrawals can only be made against the reserve, over time.
80            (0, 1)
81        } else {
82            // Scale deposits by 1000 so fractional percentages (where
83            // 1/retry_percent truncates badly) and values > 1 both stay
84            // precise.  For example 0.6 -> (1000, 1666) gives exactly
85            // 6 retries per 10 deposits; 2.0 -> (1000, 500) gives 20.
86            (1000, (1000.0 / retry_percent) as isize)
87        };
88        let reserve = (min_per_sec as isize)
89            .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
90            .saturating_mul(withdraw_amount);
91
92        // AtomicIsize isn't clone, so the slots need to be built in a loop...
93        let windows = 10u32;
94        let mut slots = Vec::with_capacity(windows as usize);
95        for _ in 0..windows {
96            slots.push(AtomicIsize::new(0));
97        }
98
99        TpsBudget {
100            generation: Mutex::new(Generation {
101                index: 0,
102                time: Instant::now(),
103            }),
104            reserve,
105            slots: slots.into_boxed_slice(),
106            window: ttl / windows,
107            writer: AtomicIsize::new(0),
108            deposit_amount,
109            withdraw_amount,
110        }
111    }
112
113    fn expire(&self) {
114        let mut gen = self.generation.lock().expect("generation lock");
115
116        let now = Instant::now();
117        let diff = now.saturating_duration_since(gen.time);
118        if diff < self.window {
119            // not expired yet
120            return;
121        }
122
123        let to_commit = self.writer.swap(0, Ordering::SeqCst);
124        self.slots[gen.index].store(to_commit, Ordering::SeqCst);
125
126        let mut diff = diff;
127        let mut idx = (gen.index + 1) % self.slots.len();
128        while diff > self.window {
129            self.slots[idx].store(0, Ordering::SeqCst);
130            diff -= self.window;
131            idx = (idx + 1) % self.slots.len();
132        }
133
134        gen.index = idx;
135        gen.time = now;
136    }
137
138    fn sum(&self) -> isize {
139        let current = self.writer.load(Ordering::SeqCst);
140        let windowed_sum: isize = self
141            .slots
142            .iter()
143            .map(|slot| slot.load(Ordering::SeqCst))
144            // fold() is used instead of sum() to determine overflow behavior
145            .fold(0, isize::saturating_add);
146
147        current
148            .saturating_add(windowed_sum)
149            .saturating_add(self.reserve)
150    }
151
152    fn put(&self, amt: isize) {
153        self.expire();
154        self.writer.fetch_add(amt, Ordering::SeqCst);
155    }
156
157    fn try_get(&self, amt: isize) -> bool {
158        debug_assert!(amt >= 0);
159
160        self.expire();
161
162        let sum = self.sum();
163        if sum >= amt {
164            self.writer.fetch_add(-amt, Ordering::SeqCst);
165            true
166        } else {
167            false
168        }
169    }
170}
171
172impl Budget for TpsBudget {
173    fn deposit(&self) {
174        self.put(self.deposit_amount)
175    }
176
177    fn withdraw(&self) -> bool {
178        self.try_get(self.withdraw_amount)
179    }
180}
181
182impl Default for TpsBudget {
183    fn default() -> Self {
184        TpsBudget::new(Duration::from_secs(10), 10, 0.2)
185    }
186}
187
188impl fmt::Debug for TpsBudget {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        f.debug_struct("Budget")
191            .field("deposit", &self.deposit_amount)
192            .field("withdraw", &self.withdraw_amount)
193            .field("balance", &self.sum())
194            .finish()
195    }
196}
197
#[cfg(test)]
mod tests {
    use crate::retry::budget::Budget;

    use super::*;
    use tokio::time;

    #[test]
    fn tps_empty() {
        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
        assert!(!bgt.withdraw());
    }

    #[tokio::test]
    async fn tps_leaky() {
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
        bgt.deposit();

        time::advance(Duration::from_secs(3)).await;

        assert!(!bgt.withdraw());
    }

    #[tokio::test]
    async fn tps_slots() {
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 0.5);
        bgt.deposit();
        bgt.deposit();
        time::advance(Duration::from_millis(901)).await;
        // ~900ms later, the deposits should still be valid
        assert!(bgt.withdraw());

        // blank slate
        time::advance(Duration::from_millis(2001)).await;

        bgt.deposit();
        time::advance(Duration::from_millis(301)).await;
        bgt.deposit();
        time::advance(Duration::from_millis(801)).await;
        bgt.deposit();

        // the first deposit is expired, but the 2nd should still be valid,
        // combining with the 3rd
        assert!(bgt.withdraw());
    }

    #[tokio::test]
    async fn tps_reserve() {
        // Freeze time so earlier withdrawals cannot age out mid-test.
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 5, 1.0);
        for _ in 0..5 {
            assert!(bgt.withdraw());
        }

        assert!(!bgt.withdraw());
    }

    #[tokio::test]
    async fn tps_fractional_retry_percent_below_one() {
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 0.6);
        for _ in 0..10 {
            bgt.deposit();
        }
        let allowed = (0..10).filter(|_| bgt.withdraw()).count();
        assert_eq!(
            allowed, 6,
            "10 deposits at retry_percent=0.6 should allow exactly 6 retries, got {allowed}"
        );
    }

    #[tokio::test]
    async fn tps_retry_percent_above_one() {
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 2.0);
        for _ in 0..10 {
            bgt.deposit();
        }
        let allowed = (0..25).filter(|_| bgt.withdraw()).count();
        assert_eq!(
            allowed, 20,
            "10 deposits at retry_percent=2.0 should allow exactly 20 retries, got {allowed}"
        );
    }

    #[tokio::test]
    async fn tps_zero_retry_percent_only_uses_reserve() {
        time::pause();

        // Deposits are worth nothing; only the reserve (3/s * 1s) is usable.
        let bgt = TpsBudget::new(Duration::from_secs(1), 3, 0.0);
        for _ in 0..10 {
            bgt.deposit();
        }
        let allowed = (0..10).filter(|_| bgt.withdraw()).count();
        assert_eq!(allowed, 3, "only the reserve should be withdrawable, got {allowed}");
    }

    #[tokio::test]
    async fn tps_long_idle_expires_everything() {
        time::pause();

        let bgt = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
        bgt.deposit();

        // Far longer than the TTL: every slot must have expired.
        time::advance(Duration::from_secs(24 * 60 * 60)).await;
        assert!(!bgt.withdraw());

        // The budget keeps working normally afterwards.
        bgt.deposit();
        assert!(bgt.withdraw());
        assert!(!bgt.withdraw());
    }
}