Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

//TODO: Fix warnings
public class PcObserver implements PeerConnection.Observer {
Expand All @@ -31,17 +32,8 @@ public class PcObserver implements PeerConnection.Observer {
private BiConsumer<List<MediaStream>, RtpReceiver> onAddTrack;
private Consumer<String> onVideoTrack;

private Consumer<PeerConnection.SignalingState> onSignalingState;
private Consumer<PeerConnection.PeerConnectionState> onPeerConnectionState;
private Consumer<PeerConnection.IceGatheringState> onIceGatheringState;
private Consumer<PeerConnection.IceConnectionState> onIceConnectionState;
private Consumer<IceCandidate> onIceCandidate;
private Consumer<MediaStream> onAddStream;
private Consumer<MediaStream> onRemoveStream;
private Consumer<DataChannel> onDataChannel;
private Runnable onRenegotiationNeeded;
private Consumer<MediaStreamTrack> onTrack;
private Consumer<RtpReceiver> onRemoveTrack;
private final Map<String,String> streamIdsByTracks = new HashMap<>();

public PcObserver(
PeerConnectionFactory peerConnectionFactory,
Expand All @@ -55,21 +47,6 @@ public PcObserver(
this.onIceCandidate = onIceCandidate;
}

public PcObserver(
PeerConnectionFactory peerConnectionFactory,
PmxKeyStore store,
TrackObserver observer
) {
this(peerConnectionFactory, store, observer,null);
}

public PcObserver(
PeerConnectionFactory peerConnectionFactory,
PmxKeyStore store
) {
this(peerConnectionFactory, store, null,null);
}

public void setOnAddTrack(BiConsumer<List<MediaStream>, RtpReceiver> onAddTrack) {
this.onAddTrack = onAddTrack;
}
Expand Down Expand Up @@ -109,9 +86,7 @@ public void onIceCandidatesRemoved(IceCandidate[] iceCandidates) {
}

@Override
public void onAddStream(MediaStream mediaStream) {

}
public void onAddStream(MediaStream mediaStream) {}

@Override
public void onRemoveStream(MediaStream mediaStream) {
Expand All @@ -130,31 +105,39 @@ public void onRenegotiationNeeded() {

@Override
public void onAddTrack(RtpReceiver receiver, MediaStream[] mediaStreams) {
if (peerConnectionFactory != null && receiver.track() != null && receiver.track().id().equals(null)) {
MediaStreamTrack track = receiver.track();
if(track != null && mediaStreams.length > 0) {
streamIdsByTracks.put(track.id(), mediaStreams[0].getId());
}
}

@Override
public void onTrack(RtpTransceiver transceiver) {
RtpReceiver rtpReceiver = transceiver.getReceiver();
MediaStreamTrack track = rtpReceiver.track();
if (peerConnectionFactory != null && track != null && track.id() != null) {
frameCryptorMap.put(
receiver.track().id(),
track.id(),
PmxFrameCryptorFactory.createPmxFrameCryptorForRtpReceiver(
peerConnectionFactory,
receiver,
rtpReceiver,
keyStore
)
);
}
if (onAddTrack != null) {
onAddTrack.accept(Arrays.asList(mediaStreams), receiver);

if (Objects.equals(receiver.track().kind(), MediaStreamTrack.VIDEO_TRACK_KIND)) {
onVideoTrack.accept(receiver.track().id());
if(trackObserver != null){
String streamId = streamIdsByTracks.get(track.id());
if(streamId != null) {
trackObserver.OnRemoteTrack(streamId, track);
}
}
}
}

@Override
public void onTrack(RtpTransceiver transceiver) {
RtpReceiver rtpReceiver = transceiver.getReceiver();
MediaStreamTrack track = rtpReceiver.track();
if (trackObserver != null) trackObserver.onTrack(track);
PmxFrameCryptorFactory.createPmxFrameCryptorForRtpReceiver(peerConnectionFactory, rtpReceiver, keyStore);
public void onRemoveTrack(RtpReceiver receiver) {
//TODO: cleanup track cryptors (?)
// onRemoveTrack.accept(receiver.track());
receiver.dispose();
}

public void setFrameCryptorOptions(PmxFrameCryptor.PmxFrameCryptorOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

public class PeerConnectionManager {
class PeerConnectionManager {
private final Map<String, RoomJanusSession> sessions = new HashMap<>();
private final Map<Long, String> sessionHandles = new HashMap<>();
private final PeerConnectionFactory pcFactory;
protected final PeerConnectionFactory pcFactory;
private final BiConsumer<Long,String> onTrickle;

public PeerConnectionManager(
PeerConnectionManager(
PeerConnectionFactory pcFactory,
BiConsumer<Long,String> onTrickle
) {
Expand All @@ -31,9 +31,9 @@ public PeerConnectionManager(
}

@NonNull
public RoomJanusSession createSession(@NonNull String streamRoomId, TrackObserver defaultTrackObserver) {
public RoomJanusSession createSession(@NonNull String streamRoomId) {
return Optional.ofNullable(
sessions.putIfAbsent(streamRoomId, new RoomJanusSession(streamRoomId, pcFactory, defaultTrackObserver, onTrickle))
sessions.putIfAbsent(streamRoomId, new RoomJanusSession(streamRoomId, pcFactory, onTrickle))
).orElse(Objects.requireNonNull(sessions.get(streamRoomId)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import org.webrtc.PmxKeyStore;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
Expand All @@ -33,14 +36,14 @@ public class RoomJanusSession {
private final PmxKeyStore keyStore;
public final WebRTCImpl webrtc = new WebRTCImpl();
private final BiConsumer<Long,String> onTrickle;
private final TrackObserver defaultTrackObserver;
private final Map<String, TrackObserver> trackObserversByStreamId = new HashMap<>();
private final TrackObserver trackObserver = new TrackObserverImpl();

//TODO: Add error listener for catch errors from webrtcInterface
public RoomJanusSession(@NonNull String roomId, @NonNull PeerConnectionFactory pcFactory, TrackObserver defaultTrackObserver, BiConsumer<Long,String> onTrickle) {
public RoomJanusSession(@NonNull String roomId, @NonNull PeerConnectionFactory pcFactory, BiConsumer<Long,String> onTrickle) {
this.pcFactory = pcFactory;
this.roomID = roomId;
this.keyStore = PmxFrameCryptorFactory.createPmxKeyStore();
this.defaultTrackObserver = defaultTrackObserver;
this.onTrickle = onTrickle;
}

Expand All @@ -55,7 +58,7 @@ public synchronized JanusPublisher getPublisher() {
}

public synchronized void createSubscriber() {
createSubscriber(defaultTrackObserver);
createSubscriber(trackObserver);
}

public synchronized void createSubscriber(TrackObserver observer) {
Expand All @@ -70,7 +73,7 @@ public synchronized void createSubscriber(TrackObserver observer) {
}

public synchronized void createPublisher() {
createPublisher(defaultTrackObserver);
createPublisher(null);
}

public synchronized void createPublisher(TrackObserver observer) {
Expand All @@ -84,6 +87,30 @@ public synchronized void createPublisher(TrackObserver observer) {
}
}

public void setTrackObserver(
TrackObserver trackObserver
){
setTrackObserver(null,trackObserver);
}

public void setTrackObserver(
String streamId,
TrackObserver trackObserver
){
synchronized (trackObserversByStreamId) {
trackObserversByStreamId.put(streamId, trackObserver);
}
}

public void setFrameCryptorOptions(PmxFrameCryptor.PmxFrameCryptorOptions options) {
if(subscriber != null){
subscriber.setFrameCryptorOptions(options);
}
if(publisher != null){
publisher.setFrameCryptorOptions(options);
}
}

public class WebRTCImpl implements WebRTCInterface {
private final ExecutorService executor = Executors.newSingleThreadExecutor();

Expand Down Expand Up @@ -171,14 +198,18 @@ public void updateSessionId(String streamRoomId, Long sessionId, String connecti
}
}
}


public void setFrameCryptorOptions(PmxFrameCryptor.PmxFrameCryptorOptions options) {
if(subscriber != null){
subscriber.setFrameCryptorOptions(options);
}
if(publisher != null){
publisher.setFrameCryptorOptions(options);
private class TrackObserverImpl implements TrackObserver{
@Override
public void OnRemoteTrack(String streamId, MediaStreamTrack track) {
synchronized (trackObserversByStreamId){
Optional.ofNullable(trackObserversByStreamId.get(streamId)).ifPresent(observer->{
observer.OnRemoteTrack(streamId,track);
});

Optional.ofNullable(trackObserversByStreamId.get(null)).ifPresent(observer->{
observer.OnRemoteTrack(streamId,track);
});
}
}
}
}
Loading