final class io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver<T> extends java.util.concurrent.atomic.AtomicInteger implements io.reactivex.Observer<T>, io.reactivex.disposables.Disposable
minor version: 0
major version: 59
flags: (0x0030) ACC_FINAL, ACC_SUPER
this_class: io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver
super_class: java.util.concurrent.atomic.AtomicInteger
{
private static final long serialVersionUID;
descriptor: J
flags: (0x001a) ACC_PRIVATE, ACC_STATIC, ACC_FINAL
ConstantValue: -4592979584110982903
final io.reactivex.Observer<? super T> downstream;
descriptor: Lio/reactivex/Observer;
flags: (0x0010) ACC_FINAL
Signature: Lio/reactivex/Observer<-TT;>;
final java.util.concurrent.atomic.AtomicReference<io.reactivex.disposables.Disposable> mainDisposable;
descriptor: Ljava/util/concurrent/atomic/AtomicReference;
flags: (0x0010) ACC_FINAL
Signature: Ljava/util/concurrent/atomic/AtomicReference<Lio/reactivex/disposables/Disposable;>;
final io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver$OtherObserver<T> otherObserver;
descriptor: Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver$OtherObserver;
flags: (0x0010) ACC_FINAL
Signature: Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver$OtherObserver<TT;>;
final io.reactivex.internal.util.AtomicThrowable error;
descriptor: Lio/reactivex/internal/util/AtomicThrowable;
flags: (0x0010) ACC_FINAL
volatile io.reactivex.internal.fuseable.SimplePlainQueue<T> queue;
descriptor: Lio/reactivex/internal/fuseable/SimplePlainQueue;
flags: (0x0040) ACC_VOLATILE
Signature: Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;
T singleItem;
descriptor: Ljava/lang/Object;
flags: (0x0000)
Signature: TT;
volatile boolean disposed;
descriptor: Z
flags: (0x0040) ACC_VOLATILE
volatile boolean mainDone;
descriptor: Z
flags: (0x0040) ACC_VOLATILE
volatile int otherState;
descriptor: I
flags: (0x0040) ACC_VOLATILE
static final int OTHER_STATE_HAS_VALUE;
descriptor: I
flags: (0x0018) ACC_STATIC, ACC_FINAL
ConstantValue: 1
static final int OTHER_STATE_CONSUMED_OR_EMPTY;
descriptor: I
flags: (0x0018) ACC_STATIC, ACC_FINAL
ConstantValue: 2
void <init>(io.reactivex.Observer<? super T>);
descriptor: (Lio/reactivex/Observer;)V
flags: (0x0000)
Code:
stack=4, locals=2, args_size=2
start local 0 start local 1 0: aload 0
invokespecial java.util.concurrent.atomic.AtomicInteger.<init>:()V
1: aload 0
aload 1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.downstream:Lio/reactivex/Observer;
2: aload 0
new java.util.concurrent.atomic.AtomicReference
dup
invokespecial java.util.concurrent.atomic.AtomicReference.<init>:()V
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDisposable:Ljava/util/concurrent/atomic/AtomicReference;
3: aload 0
new io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver$OtherObserver
dup
aload 0
invokespecial io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver$OtherObserver.<init>:(Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver;)V
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherObserver:Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver$OtherObserver;
4: aload 0
new io.reactivex.internal.util.AtomicThrowable
dup
invokespecial io.reactivex.internal.util.AtomicThrowable.<init>:()V
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.error:Lio/reactivex/internal/util/AtomicThrowable;
5: return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 6 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 6 1 downstream Lio/reactivex/Observer<-TT;>;
Signature: (Lio/reactivex/Observer<-TT;>;)V
MethodParameters:
Name Flags
downstream
public void onSubscribe(io.reactivex.disposables.Disposable);
descriptor: (Lio/reactivex/disposables/Disposable;)V
flags: (0x0001) ACC_PUBLIC
Code:
stack=2, locals=2, args_size=2
start local 0 start local 1 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDisposable:Ljava/util/concurrent/atomic/AtomicReference;
aload 1
invokestatic io.reactivex.internal.disposables.DisposableHelper.setOnce:(Ljava/util/concurrent/atomic/AtomicReference;Lio/reactivex/disposables/Disposable;)Z
pop
1: return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 2 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 2 1 d Lio/reactivex/disposables/Disposable;
MethodParameters:
Name Flags
d
public void onNext(T);
descriptor: (Ljava/lang/Object;)V
flags: (0x0001) ACC_PUBLIC
Code:
stack=3, locals=3, args_size=2
start local 0 start local 1 0: aload 0
iconst_0
iconst_1
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.compareAndSet:(II)Z
ifeq 4
1: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.downstream:Lio/reactivex/Observer;
aload 1
invokeinterface io.reactivex.Observer.onNext:(Ljava/lang/Object;)V
2: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.decrementAndGet:()I
ifne 8
3: return
4: StackMap locals:
StackMap stack:
aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.getOrCreateQueue:()Lio/reactivex/internal/fuseable/SimplePlainQueue;
astore 2
start local 2 5: aload 2
aload 1
invokeinterface io.reactivex.internal.fuseable.SimplePlainQueue.offer:(Ljava/lang/Object;)Z
pop
6: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.getAndIncrement:()I
ifeq 8
7: return
end local 2 8: StackMap locals:
StackMap stack:
aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drainLoop:()V
9: return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 10 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 10 1 t TT;
5 8 2 q Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;
Signature: (TT;)V
MethodParameters:
Name Flags
t
public void onError(java.lang.Throwable);
descriptor: (Ljava/lang/Throwable;)V
flags: (0x0001) ACC_PUBLIC
Code:
stack=2, locals=2, args_size=2
start local 0 start local 1 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.error:Lio/reactivex/internal/util/AtomicThrowable;
aload 1
invokevirtual io.reactivex.internal.util.AtomicThrowable.addThrowable:(Ljava/lang/Throwable;)Z
ifeq 4
1: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherObserver:Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver$OtherObserver;
invokestatic io.reactivex.internal.disposables.DisposableHelper.dispose:(Ljava/util/concurrent/atomic/AtomicReference;)Z
pop
2: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drain:()V
3: goto 5
4: StackMap locals:
StackMap stack:
aload 1
invokestatic io.reactivex.plugins.RxJavaPlugins.onError:(Ljava/lang/Throwable;)V
5: StackMap locals:
StackMap stack:
return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 6 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 6 1 ex Ljava/lang/Throwable;
MethodParameters:
Name Flags
ex
public void onComplete();
descriptor: ()V
flags: (0x0001) ACC_PUBLIC
Code:
stack=2, locals=1, args_size=1
start local 0 0: aload 0
iconst_1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDone:Z
1: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drain:()V
2: return
end local 0 LocalVariableTable:
Start End Slot Name Signature
0 3 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
public boolean isDisposed();
descriptor: ()Z
flags: (0x0001) ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
start local 0 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDisposable:Ljava/util/concurrent/atomic/AtomicReference;
invokevirtual java.util.concurrent.atomic.AtomicReference.get:()Ljava/lang/Object;
checkcast io.reactivex.disposables.Disposable
invokestatic io.reactivex.internal.disposables.DisposableHelper.isDisposed:(Lio/reactivex/disposables/Disposable;)Z
ireturn
end local 0 LocalVariableTable:
Start End Slot Name Signature
0 1 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
public void dispose();
descriptor: ()V
flags: (0x0001) ACC_PUBLIC
Code:
stack=2, locals=1, args_size=1
start local 0 0: aload 0
iconst_1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.disposed:Z
1: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDisposable:Ljava/util/concurrent/atomic/AtomicReference;
invokestatic io.reactivex.internal.disposables.DisposableHelper.dispose:(Ljava/util/concurrent/atomic/AtomicReference;)Z
pop
2: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherObserver:Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver$OtherObserver;
invokestatic io.reactivex.internal.disposables.DisposableHelper.dispose:(Ljava/util/concurrent/atomic/AtomicReference;)Z
pop
3: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.getAndIncrement:()I
ifne 6
4: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
5: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
6: StackMap locals:
StackMap stack:
return
end local 0 LocalVariableTable:
Start End Slot Name Signature
0 7 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
void otherSuccess(T);
descriptor: (Ljava/lang/Object;)V
flags: (0x0000)
Code:
stack=3, locals=2, args_size=2
start local 0 start local 1 0: aload 0
iconst_0
iconst_1
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.compareAndSet:(II)Z
ifeq 4
1: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.downstream:Lio/reactivex/Observer;
aload 1
invokeinterface io.reactivex.Observer.onNext:(Ljava/lang/Object;)V
2: aload 0
iconst_2
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherState:I
3: goto 8
4: StackMap locals:
StackMap stack:
aload 0
aload 1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
5: aload 0
iconst_1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherState:I
6: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.getAndIncrement:()I
ifeq 8
7: return
8: StackMap locals:
StackMap stack:
aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drainLoop:()V
9: return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 10 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 10 1 value TT;
Signature: (TT;)V
MethodParameters:
Name Flags
value
void otherError(java.lang.Throwable);
descriptor: (Ljava/lang/Throwable;)V
flags: (0x0000)
Code:
stack=2, locals=2, args_size=2
start local 0 start local 1 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.error:Lio/reactivex/internal/util/AtomicThrowable;
aload 1
invokevirtual io.reactivex.internal.util.AtomicThrowable.addThrowable:(Ljava/lang/Throwable;)Z
ifeq 4
1: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDisposable:Ljava/util/concurrent/atomic/AtomicReference;
invokestatic io.reactivex.internal.disposables.DisposableHelper.dispose:(Ljava/util/concurrent/atomic/AtomicReference;)Z
pop
2: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drain:()V
3: goto 5
4: StackMap locals:
StackMap stack:
aload 1
invokestatic io.reactivex.plugins.RxJavaPlugins.onError:(Ljava/lang/Throwable;)V
5: StackMap locals:
StackMap stack:
return
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 6 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
0 6 1 ex Ljava/lang/Throwable;
MethodParameters:
Name Flags
ex
io.reactivex.internal.fuseable.SimplePlainQueue<T> getOrCreateQueue();
descriptor: ()Lio/reactivex/internal/fuseable/SimplePlainQueue;
flags: (0x0000)
Code:
stack=3, locals=2, args_size=1
start local 0 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
astore 1
start local 1 1: aload 1
ifnonnull 4
2: new io.reactivex.internal.queue.SpscLinkedArrayQueue
dup
invokestatic io.reactivex.internal.operators.observable.ObservableMergeWithSingle.bufferSize:()I
invokespecial io.reactivex.internal.queue.SpscLinkedArrayQueue.<init>:(I)V
astore 1
3: aload 0
aload 1
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
4: StackMap locals: io.reactivex.internal.fuseable.SimplePlainQueue
StackMap stack:
aload 1
areturn
end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 5 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
1 5 1 q Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;
Signature: ()Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;
void drain();
descriptor: ()V
flags: (0x0000)
Code:
stack=1, locals=1, args_size=1
start local 0 0: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.getAndIncrement:()I
ifne 2
1: aload 0
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.drainLoop:()V
2: StackMap locals:
StackMap stack:
return
end local 0 LocalVariableTable:
Start End Slot Name Signature
0 3 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
void drainLoop();
descriptor: ()V
flags: (0x0000)
Code:
stack=2, locals=8, args_size=1
start local 0 0: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.downstream:Lio/reactivex/Observer;
astore 1
start local 1 1: iconst_1
istore 2
start local 2 2: StackMap locals: io.reactivex.Observer int
StackMap stack:
aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.disposed:Z
ifeq 6
3: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
4: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
5: return
6: StackMap locals:
StackMap stack:
aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.error:Lio/reactivex/internal/util/AtomicThrowable;
invokevirtual io.reactivex.internal.util.AtomicThrowable.get:()Ljava/lang/Object;
ifnull 11
7: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
8: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
9: aload 1
aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.error:Lio/reactivex/internal/util/AtomicThrowable;
invokevirtual io.reactivex.internal.util.AtomicThrowable.terminate:()Ljava/lang/Throwable;
invokeinterface io.reactivex.Observer.onError:(Ljava/lang/Throwable;)V
10: return
11: StackMap locals:
StackMap stack:
aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherState:I
istore 3
start local 3 12: iload 3
iconst_1
if_icmpne 18
13: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
astore 4
start local 4 14: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.singleItem:Ljava/lang/Object;
15: aload 0
iconst_2
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.otherState:I
16: iconst_2
istore 3
17: aload 1
aload 4
invokeinterface io.reactivex.Observer.onNext:(Ljava/lang/Object;)V
end local 4 18: StackMap locals: int
StackMap stack:
aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.mainDone:Z
istore 4
start local 4 19: aload 0
getfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
astore 5
start local 5 20: aload 5
ifnull 21
aload 5
invokeinterface io.reactivex.internal.fuseable.SimplePlainQueue.poll:()Ljava/lang/Object;
goto 22
StackMap locals: int io.reactivex.internal.fuseable.SimplePlainQueue
StackMap stack:
21: aconst_null
StackMap locals:
StackMap stack: java.lang.Object
22: astore 6
start local 6 23: aload 6
ifnonnull 24
iconst_1
goto 25
StackMap locals: java.lang.Object
StackMap stack:
24: iconst_0
StackMap locals:
StackMap stack: int
25: istore 7
start local 7 26: iload 4
ifeq 30
iload 7
ifeq 30
iload 3
iconst_2
if_icmpne 30
27: aload 0
aconst_null
putfield io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
28: aload 1
invokeinterface io.reactivex.Observer.onComplete:()V
29: return
30: StackMap locals: int
StackMap stack:
iload 7
ifeq 32
31: goto 34
32: StackMap locals:
StackMap stack:
aload 1
aload 6
invokeinterface io.reactivex.Observer.onNext:(Ljava/lang/Object;)V
end local 7 end local 6 end local 5 end local 4 end local 3 33: goto 2
34: StackMap locals: io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver io.reactivex.Observer int
StackMap stack:
aload 0
iload 2
ineg
invokevirtual io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver.addAndGet:(I)I
istore 2
35: iload 2
ifne 2
36: return
end local 2 end local 1 end local 0 LocalVariableTable:
Start End Slot Name Signature
0 37 0 this Lio/reactivex/internal/operators/observable/ObservableMergeWithSingle$MergeWithObserver<TT;>;
1 37 1 actual Lio/reactivex/Observer<-TT;>;
2 37 2 missed I
12 33 3 os I
14 18 4 v TT;
19 33 4 d Z
20 33 5 q Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;
23 33 6 v TT;
26 33 7 empty Z
}
Signature: <T:Ljava/lang/Object;>Ljava/util/concurrent/atomic/AtomicInteger;Lio/reactivex/Observer<TT;>;Lio/reactivex/disposables/Disposable;
SourceFile: "ObservableMergeWithSingle.java"
NestHost: io.reactivex.internal.operators.observable.ObservableMergeWithSingle
InnerClasses:
final MergeWithObserver = io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver of io.reactivex.internal.operators.observable.ObservableMergeWithSingle
final OtherObserver = io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver$OtherObserver of io.reactivex.internal.operators.observable.ObservableMergeWithSingle$MergeWithObserver