Wowza Community

PushPublishRTMP switch src stream

Hi,

I want to implement the following scenario:

I’ve got a Wowza engine with two live applications: one is the Entry Point and the other is the Transcoder. The Entry Point application will receive several incoming RTMP streams, but it should push only one of them to the Transcoder. The Transcoder application will transcode that stream to HLS in several qualities. A web application will then switch between the incoming streams.

My first approach was to create an instance of PushPublishRTMP when an instance of the Entry Point application is created, without setting the source stream. When the first encoder starts to publish, its stream is attached to the PushPublishRTMP instance using the setSrcStream method. Subsequent encoder connections are accepted but not attached. When the web application sends the order to switch the stream, I call setSrcStream again to change the source stream.

This is how my code looks:

package com.wowza.ws;
//TODO: Add imports
/**
 * Entry-point module: accepts several incoming encoder streams and re-publishes
 * exactly one of them (the "current" stream) to another application through a
 * single {@link PushPublishRTMP} session.
 *
 * <p>Connected clients are classified by the {@code role} connection parameter:
 * clients with the role {@code "encoder"} (or no role at all) are treated as
 * encoders, every other client is treated as a control room and receives the
 * broadcast events emitted by this module.
 *
 * <p>NOTE(review): the callbacks of this module presumably run on several server
 * threads; {@code mCurrentStream} and the client lists are not synchronized —
 * confirm whether that is required.
 */
public class EntryPointModule extends ModuleBase {
	public static final String TAG = EntryPointModule.class.getSimpleName() + " ";

	static final String CLIENT_PROP_ROLE = "role";
	static final String CLIENT_PROP_EMAIL = "email";
	static final String ENCODER_PROP_PUBLISHING = "isPublishing";
	static final String ENCODER_PROP_STREAM = "stream";

	/** Role value identifying an encoder; any other non-null role is a control room. */
	private static final String ROLE_ENCODER = "encoder";

	/** Shared serializer; avoids allocating a new Gson instance for every event. */
	private static final Gson GSON = new Gson();

	private PushPublishRTMP mPublisher;
	/** Stream currently attached to {@link #mPublisher}, or {@code null} when none is. */
	private IMediaStream mCurrentStream;
	private final List<IClient> clients = new ArrayList<IClient>();
	private final List<IClient> encoders = new ArrayList<IClient>();

	/**
	 * Logs every status message reported for the re-published stream.
	 *
	 * <p>NOTE(review): only this callback is shown in the listing; any other
	 * callbacks required by {@code IPushPublishRTMPNotify} still have to be
	 * provided for the class to compile.
	 */
	class PublisherNotify implements IPushPublishRTMPNotify {
		@Override
		public void onStreamOnStatus(PushPublishRTMPNetConnectionSession pushPublisherSession, RequestFunction function,
				AMFDataList params) {
			AMFDataObj status = params.getObject(PARAM1);
			getLogger().info(TAG + "Republisher status: " + status);
		}
	}

	/**
	 * Tracks publish/unpublish of incoming streams so the republisher always
	 * knows whether it still has a live source.
	 *
	 * <p>NOTE(review): the remaining {@code IMediaStreamActionNotify2} callbacks
	 * are not part of this listing; they must exist (as no-ops) for the class
	 * to compile.
	 */
	class StreamNotify implements IMediaStreamActionNotify2 {
		/**
		 * Records the stream name on the publishing client, attaches the stream
		 * to the republisher when nothing is attached yet, and notifies the
		 * control rooms with an {@code onEncoderPublish} event.
		 */
		@Override
		public void onPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend) {
			getLogger().info(TAG + "New stream published " + stream.getName());

			IClient client = stream.getClient();
			if (client == null) {
				// All bookkeeping below is keyed on the publishing client; without one there is nothing to record.
				getLogger().warn(TAG + "Stream " + stream.getName() + " has no client attached. Ignoring it.");
				return;
			}

			client.getProperties().setProperty(ENCODER_PROP_STREAM, stream.getName());

			if (mCurrentStream == null) {
				getLogger().info(TAG + "There was no stream publishing. Switching to new stream");

				boolean switched = false;
				try {
					switched = switchStream(stream);
				} catch (Exception e) {
					getLogger().error(TAG + "It was impossible to switch to stream " + stream.getName(), e);
				}
				// Only report success when the source really changed.
				if (switched) {
					getLogger().info(TAG + "Stream switched");
				} else {
					getLogger().warn(TAG + "Stream " + stream.getName() + " was not attached to the republisher");
				}
			}

			broadcastEvent(client.getAppInstance(), "onEncoderPublish", GSON.toJson(describeEncoder(client)));
		}

		/**
		 * Forgets the current stream when it stops publishing, so that the next
		 * published stream is attached automatically and no reference to the
		 * finished stream is kept.
		 */
		@Override
		public void onUnPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend) {
			getLogger().info(TAG + "Stream unpublished " + stream.getName());
			releaseStream(stream);
		}
	}

	/**
	 * Remote call used by a control room to select the stream that is pushed.
	 *
	 * @param client   caller; receives {@code true} when the source was switched
	 *                 and {@code false} otherwise
	 * @param function call descriptor (unused)
	 * @param params   {@code PARAM1}: client id of the encoder, {@code PARAM2}: stream name
	 */
	public void switchStream(IClient client, RequestFunction function, AMFDataList params) {
		int encoderId = params.getInt(PARAM1);
		String streamName = params.getString(PARAM2);

		getLogger().info(TAG + "Switch stream requested to change to " + encoderId + " " + streamName);

		boolean switched = false;
		IClient encoder = client.getAppInstance().getClientById(encoderId);
		if (encoder == null) {
			getLogger().warn(TAG + "Switch stream failed: no client with id " + encoderId);
		} else {
			IMediaStream stream = encoder.getAppInstance().getStreams().getStream(streamName);
			if (stream == null) {
				getLogger().warn(TAG + "Switch stream failed: no stream named " + streamName);
			} else {
				switched = switchStream(stream);
			}
		}
		// Report the real outcome instead of an unconditional success.
		sendResult(client, params, switched);
	}

	/**
	 * Configures the republisher and opens its connection. No source stream is
	 * attached here; that happens on the first publish or on an explicit switch.
	 *
	 * @param appInstance application instance being started
	 * @throws LicensingException declared for the republisher set-up
	 */
	public void onAppStart(IApplicationInstance appInstance) throws LicensingException {

		String fullname = appInstance.getApplication().getName() + "/" + appInstance.getName();
		getLogger().info(TAG + "onAppStart: " + fullname);

		mPublisher = new PushPublishRTMP();
		getLogger().info(TAG + "Setting republisher up");
		mPublisher.setAppInstance(appInstance);
		mPublisher.setStreamName("livestream");
		mPublisher.setHost("localhost");
		mPublisher.setDstApplicationName("live");
		mPublisher.setDstAppInstanceName(appInstance.getName());
		mPublisher.setDstStreamName("livestream");
		mPublisher.setPort(1935);
		mPublisher.setConnectionFlashVerion(PushPublishRTMP.CURRENTFMLEVERSION);
		mPublisher.setSendFCPublish(true);
		mPublisher.setSendReleaseStream(true);
		mPublisher.setSendOnMetadata(true);
		mPublisher.setDebugLog(true);
		mPublisher.setDebugPackets(false);

		mPublisher.setHowToPublish("live");

		mPublisher.addListener(new PublisherNotify());

		getLogger().info(TAG + "Connecting to republishing application");
		mPublisher.connect();
		getLogger().info(TAG + "Republisher set up");

		getLogger().info(TAG + "App started and configured " + fullname);

	}

	/**
	 * Closes the republisher connection and drops all tracked state so nothing
	 * outlives the application instance.
	 *
	 * @param appInstance application instance being stopped
	 */
	public void onAppStop(IApplicationInstance appInstance) {
		getLogger().info(TAG + "onAppStop: " + appInstance.getName());
		if (mPublisher != null) {
			mPublisher.disconnect();
			mPublisher = null;
		}
		mCurrentStream = null;
		clients.clear();
		encoders.clear();
	}

	/**
	 * Classifies a connecting client. A missing role means "encoder"; any other
	 * role must come with an e-mail ({@code PARAM2}) or the connection is rejected.
	 */
	public void onConnect(IClient client, RequestFunction function, AMFDataList params) {
		getLogger().info(TAG + "onConnect: " + client.getClientId());
		try {
			String role = params.getString(PARAM1);
			getLogger().info(TAG + "role: " + role);
			if (role == null) {
				getLogger().info(TAG + "Undetermined client role. Assuming it is an encoder.");
				role = ROLE_ENCODER;
			}
			getLogger().info(TAG + "Client with role: "+role);
			client.getProperties().setProperty(CLIENT_PROP_ROLE, role);

			if (ROLE_ENCODER.equals(role)) {
				// Explicit and implicit encoders are registered the same way.
				client.getProperties().setProperty(ENCODER_PROP_PUBLISHING, false);
				this.encoders.add(client);
			} else {
				String email = params.getString(PARAM2);
				if (email == null) {
					client.rejectConnection("Invalid client credentials");
					return;
				}
				client.getProperties().setProperty(CLIENT_PROP_EMAIL, email);
				this.clients.add(client);
			}
			getLogger().info(TAG + "Client indentifyed. Accepting connection.");
		} catch (Exception e) {
			// Catch Exception (not Error): malformed parameters must not escape the handler.
			getLogger().error(TAG + "Error getting client's role", e);
		}
	}

	/**
	 * Announces a newly accepted encoder to the control rooms, or sends a newly
	 * accepted control room the list of the other encoders and control rooms.
	 */
	public void onConnectAccept(IClient client) {
		String clientRole = roleOf(client);
		getLogger().info(TAG + "onConnectAccept: " + client.getClientId() + " " + clientRole);

		if (clientRole == null) {
			// onConnect did not manage to classify this client; there is nothing meaningful to send.
			getLogger().warn(TAG + "Client " + client.getClientId() + " has no role. Skipping notifications.");
			return;
		}

		if (ROLE_ENCODER.equals(clientRole)) {
			broadcastEvent(client.getAppInstance(), "onEncoderConnect", GSON.toJson(describeEncoder(client)));
			return;
		}

		List<Encoder> encoderInfos = new ArrayList<Encoder>();
		List<ControlRoom> controlrooms = new ArrayList<ControlRoom>();

		IApplicationInstance app = client.getAppInstance();

		for (IClient c : app.getClients()) {
			if (c.getClientId() == client.getClientId()) {
				continue;
			}
			String role = roleOf(c);
			if (role == null) {
				// Not classified (yet); it is neither an encoder nor a control room.
				continue;
			}

			if (ROLE_ENCODER.equals(role)) {
				encoderInfos.add(describeEncoder(c));
				continue;
			}

			ControlRoom controlRoom = new ControlRoom();
			controlRoom.id = c.getClientId();
			controlRoom.ip = c.getIp();
			controlRoom.email = c.getProperties().getPropertyStr(CLIENT_PROP_EMAIL);
			controlrooms.add(controlRoom);
		}
		getLogger().info(TAG + "Notifying current status to the new control room");
		client.call("onConnectAccept", null, GSON.toJson(encoderInfos.toArray()), GSON.toJson(controlrooms.toArray()));

	}

	/**
	 * Stops tracking a client that has gone away so the lists do not grow
	 * without bound and do not keep references to closed connections.
	 */
	public void onDisconnect(IClient client) {
		getLogger().info(TAG + "onDisconnect: " + client.getClientId());
		clients.remove(client);
		encoders.remove(client);
	}

	/** Registers a {@link StreamNotify} listener on every newly created stream. */
	public void onStreamCreate(IMediaStream stream)
	{
		getLogger().info(TAG + "onStreamCreate: " + stream.getName());
		stream.addClientListener(new StreamNotify());
	}

	/**
	 * Makes {@code stream} the source of the republisher and updates the
	 * {@code isPublishing} flag of the previous and the new owner.
	 *
	 * @return {@code true} if the source was changed, {@code false} if the
	 *         republisher is missing or not in the connected state
	 */
	private boolean switchStream(IMediaStream stream) {
		getLogger().info(TAG + "About to swtich current to stream to " + stream.getName());
		if (this.mPublisher == null
				|| this.mPublisher.getConnectedState() != PushPublishRTMP.CONNECTED_STATE_CONNECTED) {
			getLogger().warn(TAG + "Republisher is not connected. Cannot switch to stream " + stream.getName());
			return false;
		}

		getLogger().info(TAG + "Switching to stream " + stream.getName());
		if (mCurrentStream != null) {
			markPublishing(mCurrentStream, false);
		}
		markPublishing(stream, true);
		this.mCurrentStream = stream;
		mPublisher.setSrcStream(stream);
		return true;
	}

	/** Clears the current stream if {@code stream} is the one being pushed. */
	private void releaseStream(IMediaStream stream) {
		if (stream != mCurrentStream) {
			return;
		}
		markPublishing(stream, false);
		mCurrentStream = null;
	}

	/** Sets the {@code isPublishing} flag on the stream's client, if it has one. */
	private void markPublishing(IMediaStream stream, boolean publishing) {
		IClient owner = stream.getClient();
		if (owner != null) {
			owner.getProperties().setProperty(ENCODER_PROP_PUBLISHING, publishing);
		}
	}

	/** Returns the role stored on the client, or {@code null} when none was set. */
	private static String roleOf(IClient client) {
		return client.getProperties().getPropertyStr(CLIENT_PROP_ROLE);
	}

	/** Builds the encoder description sent to control rooms from the client's properties. */
	private static Encoder describeEncoder(IClient client) {
		Encoder encoder = new Encoder();
		encoder.id = client.getClientId();
		encoder.ip = client.getIp();
		encoder.isPublishing = client.getProperties().getPropertyBoolean(ENCODER_PROP_PUBLISHING, false);
		encoder.stream = client.getProperties().getPropertyStr(ENCODER_PROP_STREAM);
		return encoder;
	}

	/**
	 * Calls {@code method} on every control room (every classified client that
	 * is not an encoder) of the application instance.
	 */
	private void broadcastEvent(IApplicationInstance app, String method, Object... params) {
		for (IClient client : app.getClients()) {
			String role = roleOf(client);
			// Clients without a role are skipped instead of triggering a NullPointerException.
			if (role != null && !ROLE_ENCODER.equals(role)) {
				client.call(method, null, params);
			}
		}
	}
}

So far, everything seems to work. However, when I play the resulting stream from the Transcoder application, the HLS stream stops every time I switch the source stream, and sometimes it won’t play anymore even after I refresh the player page.

I wonder if IPushPublish has a proper protocol to switch the source stream.

Thanks.