Wowza Community

PushPublishRTMP addSession, removeSession being called frequently

Logs: Attached in the file

Infrastructure:

service-1 -> Wowza -> service-2

service-1 sends an RTMP stream to the Wowza servers, which then use the PushPublishRTMP module (code added at the end of this description) to forward the stream to service-2.

Issue: We have been seeing a large number of addSession and removeSession calls for our streams, which causes the RTMP server in service-2 to behave abnormally. Note that the incoming stream from service-1 is steady for this entire duration, so it is unclear to us why Wowza is restarting sessions. Is there a way to increase a timeout so that Wowza keeps using the current session instead of creating a new one?

package com.mindtickle.dynamic.stream.target.adder;

import java.util.HashMap;
import java.util.Map;

import com.wowza.wms.amf.AMFPacket;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.module.ModuleBase;
import com.wowza.wms.pushpublish.protocol.rtmp.PushPublishRTMP;
import com.wowza.wms.stream.IMediaStream;
import com.wowza.wms.stream.IMediaStreamActionNotify2;

public class ModuleTargetAdder extends ModuleBase
{
    String appName;
    String destinationIP;
    int destinationPort;
    Map<IMediaStream, PushPublishRTMP> publishers = new HashMap<IMediaStream, PushPublishRTMP>();

    private boolean checkPropertiesPresent() {
        boolean propertiesPresent = true;
        if (destinationIP.isEmpty() || destinationPort == 0) {
            propertiesPresent = false;
        }
        return propertiesPresent;
    }

    public void onAppStart(IApplicationInstance appInstance) {
        appName = appInstance.getApplication().getName();

        getLogger().info("ModuleTargetAdder: Starting for application:" + appName);
        WMSProperties props = appInstance.getProperties();

        destinationIP = props.getPropertyStr("destinationIP", destinationIP);
        destinationPort = props.getPropertyInt("destinationPort", destinationPort);

        if (!checkPropertiesPresent()) {
            getLogger().error("ModuleTargetAdder: All properties required are not present. Module won't function for application:" + appName);
        }
    }

    public void onStreamCreate(IMediaStream stream)
    {
        getLogger().info("ModuleTargetAdder: onStreamCreate hit for stream:" + stream);
        WMSProperties props = stream.getProperties();
        synchronized(props)
        {
            StreamNotify streamNotify = new StreamNotify();
            props.put("streamNotify", streamNotify);
            stream.addClientListener(streamNotify);
        }
    }

    public void onStreamDestroy(IMediaStream stream)
    {
        getLogger().info("ModuleTargetAdder: onStreamDestroy hit for stream:" + stream);
        WMSProperties props = stream.getProperties();
        synchronized(props)
        {
            StreamNotify streamNotify = (StreamNotify)props.get("streamNotify");
            stream.removeClientListener(streamNotify);
        }
    }

    class StreamNotify implements IMediaStreamActionNotify2
    {
        public void onPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend)
        {
            if (!checkPropertiesPresent()) {
                getLogger().error("ModuleTargetAdder: All properties required are not present. Skipping publishing to target");
            } else {
                try
                {
                    IApplicationInstance appInstance = stream.getStreams().getAppInstance();

                    synchronized(publishers)
                    {
                        PushPublishRTMP publisher = new PushPublishRTMP();

                        // Source stream
                        publisher.setAppInstance(appInstance);
                        publisher.setSrcStreamName(streamName);

                        // Destination stream
                        String dstApplication = appInstance.getApplication().getName();
                        String dstStreamName = streamName;
                        String flashVersion = PushPublishRTMP.CURRENTFMLEVERSION;

                        // Destination stream
                        publisher.setHost(destinationIP);
                        publisher.setPort(destinationPort);
                        publisher.setDstApplicationName(dstApplication);
                        publisher.setDstStreamName(dstStreamName);
                        publisher.setConnectionFlashVersion(flashVersion);
                        publisher.setDebugLog(true);
                        publisher.setConnectionTimeout(20000);

                        publisher.setSendFCPublish(true);
                        publisher.setSendReleaseStream(true);
                        publisher.setSendOnMetadata(true);

                        publisher.connect();
                        publishers.put(stream, publisher);
                    }

                    getLogger().info("ModuleTargetAdder: Started publishing stream [" + streamName + "]");
                }
                catch(Exception e)
                {
                    getLogger().error("ModuleTargetAdder: Error while publishing stream [" + streamName + "] with exception:" + e.toString());
                }
            }
        }

        public void onUnPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend)
        {
            try
            {
                synchronized(publishers)
                {
                    PushPublishRTMP publisher = publishers.remove(stream);
                    if (publisher != null){
                        publisher.disconnect();
                        getLogger().info("ModuleTargetAdder: Stopped publishing stream [" + streamName + "]");
                    }
                }
            }
            catch(Exception e)
            {
                getLogger().error("ModuleTargetAdder: Error occurred in stopPublisher with exception:" + e.toString());
            }
        }

        public void onPlay(IMediaStream stream, String streamName, double playStart, double playLen, int playReset)
        {
        }

        public void onPause(IMediaStream stream, boolean isPause, double location)
        {
        }

        public void onSeek(IMediaStream stream, double location)
        {
        }

        public void onStop(IMediaStream stream)
        {
        }

        public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket)
        {
        }

        public void onPauseRaw(IMediaStream stream, boolean isPause, double location)
        {
        }
    }
}