// Per-thread Python sub-interpreter contexts for effect modules.
use std::{
    cell::RefCell,
    panic,
    sync::{Arc, Once, Weak},
};

use drop_bomb::DropBomb;
use futures::Future;
use pyo3::prelude::*;

use super::{hyperion_init, RuntimeMethods};

/// Guards the one-time Python setup done by `Context::with`: registering the
/// `hyperion` module in the inittab and preparing the interpreter.
static INITIALIZED_PYTHON: Once = Once::new();

thread_local! {
    /// Effect context currently installed on this thread, if any.
    ///
    /// `Context::with` stores a fresh context here before running its callback
    /// and takes it out again afterwards; `Context::with_current` reads it.
    static CONTEXT: RefCell<Option<Context>> = RefCell::new(None);
}

/// Python effect module context
///
/// Wraps the thread state of a Python sub-interpreter created by
/// `Context::new`, together with a weak handle to the runtime methods exposed
/// to it. `Context::release` must be called before the value is dropped.
pub struct Context {
    /// Thread state returned by `Py_NewInterpreter` for this context.
    tstate: *mut pyo3::ffi::PyThreadState,
    /// Weak handle to the runtime methods; upgraded by `Context::with_current`.
    methods: Weak<dyn RuntimeMethods>,
    /// Drop guard that is defused by `Context::release`.
    bomb: DropBomb,
}

impl Context {
    /// Creates a new Python sub-interpreter and wraps its thread state.
    ///
    /// Returns `Err(())` when `Py_NewInterpreter` returns a null thread state.
    /// On return, the thread state that was current on entry is current again.
    ///
    /// # Safety
    ///
    /// The Python runtime must be initialized and the calling thread must hold
    /// the GIL (which the `_py` token stands for). The returned context must be
    /// passed to [`Context::release`] before it is dropped.
    unsafe fn new(_py: Python, methods: Weak<dyn RuntimeMethods>) -> Result<Self, ()> {
        // `PyEval_SaveThread` hands back the calling thread's state but also
        // releases the GIL, so the state is restored right away to take the
        // GIL back before creating the sub-interpreter.
        let main_state = pyo3::ffi::PyEval_SaveThread();
        pyo3::ffi::PyEval_RestoreThread(main_state);

        // Create the sub-interpreter; its first thread state becomes current
        let tstate = pyo3::ffi::Py_NewInterpreter();

        // Make the caller's thread state current again
        pyo3::ffi::PyThreadState_Swap(main_state);

        if tstate.is_null() {
            Err(())
        } else {
            Ok(Self {
                tstate,
                methods,
                bomb: DropBomb::new("Context::release must be called before dropping it"),
            })
        }
    }

    /// Ends this context's sub-interpreter and defuses the drop guard.
    ///
    /// The thread state that was current on entry is current again on return.
    ///
    /// # Safety
    ///
    /// The calling thread must hold the GIL, `self.tstate` must still be the
    /// live thread state produced by [`Context::new`], and the context must not
    /// be used with [`Context::run`] afterwards.
    unsafe fn release(&mut self, _py: Python) {
        // TODO: Stop sub threads?

        // `Py_EndInterpreter` requires the interpreter's thread state to be
        // the current one, so switch to it first.
        let main_thread = pyo3::ffi::PyThreadState_Swap(self.tstate);

        // Terminate the sub-interpreter
        pyo3::ffi::Py_EndInterpreter(self.tstate);

        // Restore the thread state that was current on entry
        pyo3::ffi::PyThreadState_Swap(main_thread);

        // We're clear for dropping
        self.bomb.defuse();
    }

    /// Runs `f` with this context's sub-interpreter thread state made current.
    ///
    /// The previously current thread state is restored before returning, even
    /// when `f` panics.
    ///
    /// # Panics
    ///
    /// Re-raises any panic raised by `f`, with its original payload.
    pub fn run<U>(&self, _py: Python, f: impl FnOnce() -> U) -> U {
        // SAFETY: `_py` witnesses that the GIL is held, and `self.tstate` is
        // the thread state created by `Context::new` for this context.
        let main_state = unsafe { pyo3::ffi::PyThreadState_Swap(self.tstate) };

        // Catch panics so the thread state is always swapped back
        let outcome = panic::catch_unwind(panic::AssertUnwindSafe(f));

        // SAFETY: `main_state` is the thread state that was current on entry.
        unsafe {
            pyo3::ffi::PyThreadState_Swap(main_state);
        }

        match outcome {
            Ok(value) => value,
            // `resume_unwind` keeps the original payload (and so its message)
            // and does not run the panic hook again; `panic_any` would wrap
            // the boxed payload in a brand-new panic.
            Err(payload) => panic::resume_unwind(payload),
        }
    }

    /// Runs `f` with a freshly created context installed as this thread's
    /// current context, then releases the context.
    ///
    /// On first use this registers the `hyperion` module in the inittab and
    /// prepares the Python interpreter. The context is released even when `f`
    /// panics.
    ///
    /// # Panics
    ///
    /// Panics with "failed initializing python subinterp" when the
    /// sub-interpreter cannot be created, and re-raises any panic raised by
    /// `f` with its original payload.
    pub fn with<U>(methods: Arc<dyn RuntimeMethods>, f: impl FnOnce(&Self) -> U) -> U {
        // Initialize the Python interpreter global state
        INITIALIZED_PYTHON.call_once(|| {
            // SAFETY: the name is a NUL-terminated static byte string and the
            // registration happens before the interpreter is prepared below.
            unsafe {
                pyo3::ffi::PyImport_AppendInittab(
                    b"hyperion\0".as_ptr() as *const _,
                    Some(hyperion_init),
                );
            }

            pyo3::prepare_freethreaded_python();
        });

        let outcome = CONTEXT.with(|slot| {
            // Initialize the thread-local state, i.e. interpreter
            let context = Python::with_gil(|py| {
                // SAFETY: the interpreter was prepared above and `py` proves
                // the GIL is held; the context is released further down.
                let created = unsafe { Self::new(py, Arc::downgrade(&methods)) };
                created.expect("failed initializing python subinterp")
            });
            *slot.borrow_mut() = Some(context);

            // Run user callback, catching panics so the interpreter below is
            // always released
            let outcome = {
                let current = slot.borrow();
                let context = current.as_ref().unwrap();
                panic::catch_unwind(panic::AssertUnwindSafe(|| f(context)))
            };

            // Free the interpreter
            if let Some(mut context) = slot.borrow_mut().take() {
                Python::with_gil(|py| {
                    // SAFETY: `py` proves the GIL is held, and the context was
                    // just taken out of the slot, so nothing uses it afterwards.
                    unsafe { context.release(py) }
                })
            }

            outcome
        });

        match outcome {
            Ok(value) => value,
            // Propagate the callback's panic unchanged (see `run`).
            Err(payload) => panic::resume_unwind(payload),
        }
    }

    /// Passes the current context's runtime methods to `f` and blocks the
    /// calling thread on the future it returns.
    ///
    /// # Panics
    ///
    /// Panics with "no current context" when no context is installed on this
    /// thread, and with "no current methods" when the runtime methods have
    /// already been dropped.
    pub fn with_current<F, U>(f: impl FnOnce(Arc<dyn RuntimeMethods>) -> F) -> U
    where
        F: Future<Output = U>,
    {
        CONTEXT.with(|slot| {
            // The shared borrow stays alive until the future has completed.
            let current = slot.borrow();
            let methods = current
                .as_ref()
                .expect("no current context")
                .methods
                .upgrade()
                .expect("no current methods");

            futures::executor::block_on(f(methods))
        })
    }
}