I'm hitting the "re-entrant Subject emissions are not supported" panic even though every feedback emission appears to sit behind an async boundary: each `next` call runs inside a `tokio::spawn`ed task, after a `sleep`.
Below is the smallest example I could build that reproduces the problem from my project. Any help is appreciated!
/// Minimal reproducer for a feedback loop between two behavior subjects.
///
/// `subject1` and `subject2` are combined with `combine_latest` (summing the
/// two latest values). Every combined value is fed back into `subject1` from a
/// freshly spawned tokio task after a 1 ms sleep, while a second pipeline
/// pushes `0..10000` into `subject2`, each item delayed by 1 ms. The test then
/// sleeps for ten seconds so both pipelines have time to run.
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_combine_latest() {
    // Two subjects, each created with `0_i32`, plus one extra handle per
    // subject that is used purely for emitting into it.
    let subject1 = Shared::behavior_subject(0_i32);
    let feedback_sink = subject1.clone();
    let subject2 = Shared::behavior_subject(0_i32);
    let mut driver_sink = subject2.clone();

    // Pipeline 1: sum the latest values, then echo each sum back into
    // `subject1` from a spawned task.
    subject1
        .combine_latest(subject2, |v1, v2| v1 + v2)
        .flat_map(move |value| {
            // The closure runs once per combined value, so each inner future
            // gets its own handle to `subject1`.
            let per_value_sink = feedback_sink.clone();
            Shared::from_future(async move {
                let mut task_sink = per_value_sink.clone();
                let echo_task = tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(1)).await;
                    println!("subject1 emit: {value}");
                    task_sink.next(value);
                });
                // Wait for the spawned task; its join result is ignored.
                let _ = echo_task.await;
                value
            })
        })
        .subscribe(|value| println!("receive: {value}"));

    // Pipeline 2: drive `subject2` with 0..10000, each item delayed by 1 ms
    // and processed through `concat_map`.
    Shared::from_iter(0..10000)
        .concat_map(|n| Shared::of(n).delay(Duration::from_millis(1)))
        .subscribe(move |value| {
            println!("subject2 emit: {value}");
            driver_sink.next(value);
        });

    // Keep the test alive for 10 s (same duration as 10000 ms).
    tokio::time::sleep(Duration::from_secs(10)).await;
}
example output:
subject2 emit: 4
subject1 emit: 5
receive: 5
subject1 emit: 6
subject1 emit: 5
thread 'tokio-rt-worker' (282944) panicked at .cargo/registry/src/index.crates.io-1949cf8c6b5b557f/rxrust-1.0.0-rc.5/src/subject/subject_core.rs:503:1:
re-entrant Subject emissions are not supported (next/error/complete). Use an explicit async boundary (e.g. delay(0)) if you need feedback loops.
I'm seeing a re-entrant subject emission error in a situation where emissions seem to be behind an async boundary.
This is the minimal example I could create that reproduced the problem I'm seeing in my project. Help appreciated!
example output: