Skip to content

Re-entrant subject emission error #278

@aponyrko

Description

@aponyrko

I'm seeing a re-entrant subject emission error in a situation where emissions seem to be behind an async boundary.

This is the minimal example I could create that reproduced the problem I'm seeing in my project. Help appreciated!

    #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
    async fn test_combine_latest() {
        let subject1 = Shared::behavior_subject(0_i32);
        let subject1_clone = subject1.clone();
        let subject2 = Shared::behavior_subject(0_i32);
        let mut subject2_clone = subject2.clone();

        subject1
            .combine_latest(subject2, |v1, v2| v1 + v2)
            .flat_map(move |value| {
                let subject1_clone = subject1_clone.clone();

                return Shared::from_future(async move {
                    let mut subject1_clone = subject1_clone.clone();

                    let _ = tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_millis(1)).await;
                        println!("subject1 emit: {value}");
                        subject1_clone.next(value);
                    })
                    .await;

                    return value;
                });
            })
            .subscribe(|value| println!("receive: {value}"));

        Shared::from_iter(0..10000)
            .concat_map(|s| Shared::of(s).delay(Duration::from_millis(1)))
            .subscribe(move |value| {
                println!("subject2 emit: {value}");
                subject2_clone.next(value);
            });

        tokio::time::sleep(Duration::from_millis(10000)).await;
    }

example output:

subject2 emit: 4
subject1 emit: 5
receive: 5
subject1 emit: 6
subject1 emit: 5

thread 'tokio-rt-worker' (282944) panicked at .cargo/registry/src/index.crates.io-1949cf8c6b5b557f/rxrust-1.0.0-rc.5/src/subject/subject_core.rs:503:1:
re-entrant Subject emissions are not supported (next/error/complete). Use an explicit async boundary (e.g. delay(0)) if you need feedback loops.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions