Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/sdbus/sd_bus_internals.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ PyObject* null_str = NULL;
PyObject* extend_str = NULL;
PyObject* append_str = NULL;
PyObject* call_soon_str = NULL;
PyObject* call_later_str = NULL;
PyObject* create_task_str = NULL;
PyObject* cancel_str = NULL;
// Exceptions
PyObject* exception_base = NULL;
PyObject* unmapped_error_exception = NULL;
Expand Down Expand Up @@ -170,7 +172,9 @@ PyMODINIT_FUNC PyInit_sd_bus_internals(void) {
set_result_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("set_result"));
set_exception_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("set_exception"));
call_soon_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("call_soon"));
call_later_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("call_later"));
create_task_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("create_task"));
cancel_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("cancel"));
remove_reader_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("remove_reader"));
add_reader_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("add_reader"));
add_writer_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("add_writer"));
Expand Down
3 changes: 3 additions & 0 deletions src/sdbus/sd_bus_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ extern PyObject* null_str;
extern PyObject* extend_str;
extern PyObject* append_str;
extern PyObject* call_soon_str;
extern PyObject* call_later_str;
extern PyObject* create_task_str;
extern PyObject* cancel_str;
// Exceptions
extern PyObject* exception_base;
extern PyObject* unmapped_error_exception;
Expand Down Expand Up @@ -334,6 +336,7 @@ typedef struct {
sd_bus* sd_bus_ref;
PyObject* bus_fd;
int asyncio_watchers_last_state;
PyObject* asyncio_timeout_handle;
} SdBusObject;

extern PyType_Spec SdBusType;
Expand Down
49 changes: 48 additions & 1 deletion src/sdbus/sd_bus_internals_bus.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
static void SdBus_dealloc(SdBusObject* self) {
sd_bus_unref(self->sd_bus_ref);
Py_XDECREF(self->bus_fd);
Py_XDECREF(self->asyncio_timeout_handle);

SD_BUS_DEALLOC_TAIL;
}
Expand Down Expand Up @@ -237,8 +238,9 @@ static PyObject* SdBus_get_fd(SdBusObject* self, PyObject* Py_UNUSED(args)) {
}

static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self);
static PyObject* SdBus_asyncio_update_timeout(SdBusObject* self);

#define CHECK_ASYNCIO_WATCHERS ({ CALL_PYTHON_EXPECT_NONE(SdBus_asyncio_update_fd_watchers(self)); })
#define CHECK_ASYNCIO_WATCHERS ({ CALL_PYTHON_EXPECT_NONE(SdBus_asyncio_update_fd_watchers(self)); CALL_PYTHON_EXPECT_NONE(SdBus_asyncio_update_timeout(self)); })

static PyObject* SdBus_process(SdBusObject* self, PyObject* Py_UNUSED(args)) {
int return_value = 1;
Expand Down Expand Up @@ -654,6 +656,51 @@ static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self) {
Py_RETURN_NONE;
}

static inline uint64_t kernel_time_monotonic_us() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (ts.tv_sec * 1000000 + ts.tv_nsec / 1000);
}

static PyObject* SdBus_asyncio_update_timeout(SdBusObject* self) {
if (NULL != self->asyncio_timeout_handle) {
// Cancel current timeout
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(self->asyncio_timeout_handle, cancel_str, NULL)));
self->asyncio_timeout_handle = NULL;
}

uint64_t timeout_absolute_us = UINT64_MAX;
int result = sd_bus_get_timeout(self->sd_bus_ref, &timeout_absolute_us);

if (result < 0 || timeout_absolute_us == UINT64_MAX) {
// Error case or no timeout requested => no timeout installed
Py_RETURN_NONE;
}

PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));
PyObject* drive_method CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString((PyObject*)self, "process"));

// Convert to relative time
uint64_t now = kernel_time_monotonic_us();
uint64_t timeout_relative_us;
if (now < timeout_absolute_us) {
timeout_relative_us = timeout_absolute_us - now;
} else {
timeout_relative_us = 0;
}

if (timeout_relative_us == 0) {
// Call immediately
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, call_soon_str, drive_method, NULL)));
} else {
// Call with delay
PyObject* delay_s CLEANUP_PY_OBJECT = PyFloat_FromDouble(timeout_relative_us / 1000000.0);
self->asyncio_timeout_handle = CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, call_later_str, delay_s, drive_method, NULL));
}

Py_RETURN_NONE;
}

static PyMethodDef SdBus_methods[] = {
{"call", (SD_BUS_PY_FUNC_TYPE)SdBus_call, SD_BUS_PY_METH, PyDoc_STR("Send message and block until the reply.")},
{"call_async", (SD_BUS_PY_FUNC_TYPE)SdBus_call_async, SD_BUS_PY_METH, PyDoc_STR("Async send message, returns awaitable future.")},
Expand Down