How to monitor MPEG-TS ingestion to process additional data streams (SCTE-35, KLV, etc.)

MPEG-TS live streams can carry multiple streams of data including audio, video, closed captions, ad markers (SCTE-35, SCTE-104), key length value (KLV), and more. Each stream is identified by a packet indentifier (PID). Wowza Streaming Engine includes an API for connecting a custom listener to the MPEG-TS ingestion process to insert PID monitors into the stream, which are called whenever data is available on the given PID.

The code examples in this article show how to add an MPEG-TS ingestion listener API and PID monitor API to your Wowza Streaming Engine™ media server to extract additional stream data and implement a built-in monitor for SCTE-35 markers and KLV data. Listening to other types of MPEG-TS data is also possible using this API.

Note: Wowza Streaming Engine 4.5.0 or later is required.

An MPEG-TS listener is a class that implements the IRTPDePacketizerMPEGTSNotify interface. The easiest way to implement a listener class is to create a class that extends the RTPDePacketizerMPEGTSNotifyBase class and override the methods you'll be using.

The following is a simple listener class example:

package com.wowza.wms.mycompany;

import com.wowza.wms.logging.*;
import com.wowza.wms.rtp.depacketizer.*;
import com.wowza.wms.rtp.model.*;
import com.wowza.wms.transport.mpeg2.*;

public class RTPDePacketizerMPEGTSSimpleExample extends RTPDePacketizerMPEGTSNotifyBase
{
	private static final Class<RTPDePacketizerMPEGTSSimpleExample> CLASS = RTPDePacketizerMPEGTSSimpleExample.class;
	private static final String CLASSNAME = "RTPDePacketizerMPEGTSSimpleExample";

	@Override
	public void onInit(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPContext rtpContext, RTPDePacketizerItem rtpDePacketizerItem)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onInit: "+rtDePacketizerMPEGTS.getContextStr());
	}

	@Override
	public void onStartup(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPTrack rtpTrack)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onStartup: "+rtDePacketizerMPEGTS.getContextStr());
	}

	@Override
	public void onShutdown(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPTrack rtpTrack)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onShutdown: "+rtDePacketizerMPEGTS.getContextStr());
	}

	@Override
	public void onPAT(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, ProgramAssociationTable newPAT)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPAT: "+rtDePacketizerMPEGTS.getContextStr());
	}

	@Override
	public void onPMT(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, ProgramMapTable newPMT)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPMT: "+rtDePacketizerMPEGTS.getContextStr());
	}
}

To insert a listener into an MPEG-TS stream, open your [install-dir]/conf/[application-name]/Application.xml file in a text editor and add the rtpDePacketizerMPEGTSListenerClass property to the <RTP>/<Properties> section of the file. The property value is a list of class paths, delimited by the pipe (|) character, to the listener class you want to add. For example, add the following code to Application.xml to refer to the class above the rtpDePacketizerMPEGTSListenerClass property:

<Property>
	<Name>rtpDePacketizerMPEGTSListenerClass</Name>
	<Value>com.wowza.wms.mycompany.RTPDePacketizerMPEGTSSimpleExample</Value>
</Property>

The onPMT callback method is invoked whenever a new program map table (PMT) revision is encountered. This is the best place to find the PID you want and insert a PID monitor. The RTPDePacketizerMPEGTS class includes the following API to add and remove PID monitors:

public void putPIDMonitorMap(int PID, IMPEG2UserMonitorPESNotify monitor);
public void putPIDMonitorMap(int PID, IMPEG2UserMonitorSectionNotify monitor);
public void putPIDMonitorMap(int PID, IMPEG2UserMonitorRawNotify monitor);
public PIDMonitorHolder removePIDMonitorMap(int PID);
public void clearPIDMonitorMap();
public Map<Integer, PIDMonitorHolder> getPIDMonitorMap();
public boolean pidMonitorEmpty();
public PIDMonitorHolder getPIDMonitor(int PID);
public boolean containsPIDMonitorMap(int PID);

A PID can only be monitored once. If multiple PID monitors are inserted for a given PID, only the first monitor is used. A PID that's ingested internally for streaming, such as the video or audio stream, can't be monitored.

There are three different types of monitors represented by the following callback interfaces:

  • IMPEG2UserMonitorPESNotify: For MPEG-TS transport stream data that's encapsulated in a Packetized Elementary Stream (PES). This includes data such as key length value (KLV) data or ID3 tag data.
     
  • IMPEG2UserMonitorSectionNotify: For MPEG-TS section table data such as SCTE-35 or SCTE-104 ad markers.
     
  • IMPEG2UserMonitorRawNotify: For MPEG-TS raw PID data. When using this monitor type, you'll receive a callback for each raw 188-byte MPEG-TS packet for the selected PID.

The following code example contains a complete MPEG-TS listener and PID monitor for processing KLV data and inserting the onKLV AMF data event into the stream with the raw KLV data encoded as a Base64 string:

import java.util.*;

import com.wowza.util.*;
import com.wowza.wms.amf.*;
import com.wowza.wms.logging.*;
import com.wowza.wms.rtp.model.*;
import com.wowza.wms.stream.*;
import com.wowza.wms.transport.mpeg2.*;
import com.wowza.wms.transport.mpeg2.MPEG2PESPacket.*;
import com.wowza.wms.transport.mpeg2.ProgramMapTable.*;

public class RTPDePacketizerMPEGTSMonitorKLV extends RTPDePacketizerMPEGTSNotifyBase
{
	private static final Class<RTPDePacketizerMPEGTSMonitorKLV> CLASS = RTPDePacketizerMPEGTSMonitorKLV.class;
	private static final String CLASSNAME = "RTPDePacketizerMPEGTSMonitorKLV";

	private IMediaStream stream = null;
	private RTPDePacketizerMPEGTS rtDePacketizerMPEGTS = null;
	private boolean isTimecodeReady = false;
	private RTPTrack rtpTrack = null;
	private boolean debugLog = false;

	class MPEGTSMonitorKLV implements IMPEG2UserMonitorPESNotify
	{
		@Override
		public void onMonitorStart()
		{
		}

		@Override
		public void onMonitorStop()
		{
		}

		// gets called when there is new section table data
		@Override
		public void onDataPES(int pid, PESHeader header, byte[] buffer, int offset, int len)
		{
			// loop through each KLV access unit
			int pos = 0;
			int index = 0;
			while(true)
			{
				if ((pos+5) > len)
					break;

				int metadataServiceId = (buffer[offset+pos] & 0x0FF); pos += 1;
				int sequenceNumber = (buffer[offset+pos] & 0x0FF); pos += 1;
				int metadataFlags = (buffer[offset+pos] & 0x0FF); pos += 1;
				int cellDataLen = (BufferUtils.byteArrayToInt(buffer, pos, 2) & 0x0FFFF); pos += 2;

				if (cellDataLen > (len-pos))
					cellDataLen = (len-pos);

				if (debugLog)
					WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onDataPES: KLV["+index+"]: metadataServiceId:"+metadataServiceId+" sequenceNumber:"+sequenceNumber+" metadataFlags:0x"+Integer.toHexString(metadataFlags)+" cellDataLen:"+cellDataLen+" timescale:"+rtpTrack.getTimescale());

				if (stream != null && isTimecodeReady)
				{
					try
					{
						// if stream id is 0xFC then PTS is in the header if not use the last video timecode
						RolloverLong ptsTC = null;
						if ((header.streamId & 0x0FF) == 0xFC && header.PTS >= 0)
						{
							ptsTC = rtDePacketizerMPEGTS.getDataTC();
							ptsTC.set(header.PTS);
						}
						else
							ptsTC = rtDePacketizerMPEGTS.getVideoTC();

						String klvDataStr = "";

						if (cellDataLen > 0)
							klvDataStr = com.wowza.util.Base64.encodeBytes(buffer, pos, cellDataLen, com.wowza.util.Base64.DONT_BREAK_LINES);

						AMFDataObj amfData = new AMFDataObj();

						amfData.put("streamId", new AMFDataItem(header.streamId));
						amfData.put("metadataServiceId", new AMFDataItem(metadataServiceId));
						amfData.put("sequenceNumber", new AMFDataItem(sequenceNumber));
						amfData.put("metadataFlags", new AMFDataItem(metadataFlags));
						amfData.put("timecode", new AMFDataItem(((ptsTC.get()*1000)/rtpTrack.getTimescale())));
						amfData.put("data", new AMFDataItem(klvDataStr));
						amfData.put("dataLen", new AMFDataItem(cellDataLen));

						stream.sendDirect("onKLV", amfData);
					}
					catch(Exception e)
					{
						WMSLoggerFactory.getLogger(CLASS).error(CLASSNAME+"onDataPES: ", e);
					}
				}

				pos += cellDataLen;
				index++;
			}
		}
	}

	@Override
	public void onInit(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPContext rtpContext, RTPDePacketizerItem rtpDePacketizerItem)
	{
		this.debugLog = rtDePacketizerMPEGTS.getProperties().getPropertyBoolean("rtpDePacketizerMPEGTSMonitorKLVDebugLog", this.debugLog);
	}

	@Override
	public void onStartup(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPTrack rtpTrack)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onStartup");

		this.rtDePacketizerMPEGTS = rtDePacketizerMPEGTS;
		this.rtpTrack = rtpTrack;

		RTPStream rtpStream = rtpTrack.getRTPStream();
		if (rtpStream != null)
			this.stream = rtpStream.getStream();
	}

	@Override
	public void onTimecodeReady(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onTimecodeReady");

		this.isTimecodeReady = true;
	}

	@Override
	public void onPMT(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, ProgramMapTable newPMT)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPMT");

		// loop through the streams defined in the program map table (PMT) and look for PIDs with the KLV descriptor
		for (StreamInfo s : newPMT.streams.values())
		{
			boolean addMonitor = false;

			// look for metadata descriptor
			ArrayList<Descriptor> descriptors = null;
			descriptors = s.descriptors.get(Descriptor.DESCRIPTOR_TAG_METADATA);
			if (descriptors != null)
			{
				for(Descriptor descriptor : descriptors)
				{
					if (((MetadataDescriptor)descriptor).metadataFormatIdentifier == RegistrationDescriptor.REG_IDENTIFICATION_KLV)
						addMonitor = true;
				}
			}

			// see if registration descriptor is KLV
			descriptors = s.descriptors.get(Descriptor.DESCRIPTOR_TAG_REGISTRATION);
			if (descriptors != null)
			{
				for(Descriptor descriptor : descriptors)
				{
					if (((RegistrationDescriptor)descriptor).formatIdentifier == RegistrationDescriptor.REG_IDENTIFICATION_KLV)
						addMonitor = true;
				}
			}

			// add a PID section monitor
			if (addMonitor)
			{
				WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPMT: Hit KLV PID: 0x"+Integer.toHexString(s.PID));

				if (!rtDePacketizerMPEGTS.containsPIDMonitorMap(s.PID))
					rtDePacketizerMPEGTS.putPIDMonitorMap(s.PID, new MPEGTSMonitorKLV());
			}
		}
	}
}

The above complete MPEG-TS listener is included with the Wowza media server. To use the built-in class, add it to your application by adding the following properties to the <RTP>/<Properties> container in Application.xml:

<Property>
	<Name>rtpDePacketizerMPEGTSListenerClass</Name>
	<Value>com.wowza.wms.rtp.depacketizer.RTPDePacketizerMPEGTSMonitorKLV</Value>
</Property>
<Property>
	<Name>rtpDePacketizerMPEGTSMonitorKLVDebugLog</Name>
	<Value>true</Value>
	<Type>Boolean</Type>
</Property>

The following code example also contains a complete MPEG-TS listener and PID monitor for processing ad markers and inserting the onCUE AMF data event into the stream with the ad marker data:

import java.util.*;

import com.wowza.wms.amf.*;
import com.wowza.wms.logging.*;
import com.wowza.wms.rtp.model.*;
import com.wowza.wms.stream.*;
import com.wowza.wms.transport.mpeg2.*;
import com.wowza.wms.transport.mpeg2.ProgramMapTable.*;
import com.wowza.wms.transport.mpeg2.section.cue.*;

public class RTPDePacketizerMPEGTSMonitorCUE extends RTPDePacketizerMPEGTSNotifyBase
{
	private static final Class<RTPDePacketizerMPEGTSMonitorCUE> CLASS = RTPDePacketizerMPEGTSMonitorCUE.class;
	private static final String CLASSNAME = "RTPDePacketizerMPEGTSMonitorCUE";

	private IMediaStream stream = null;
	private RTPDePacketizerMPEGTS rtDePacketizerMPEGTS = null;
	private boolean isTimecodeReady = false;
	private RTPTrack rtpTrack = null;
	private boolean debugLog = false;

	class MPEGTSMonitorCUE implements IMPEG2UserMonitorSectionNotify
	{
		@Override
		public void onMonitorStart()
		{
		}

		@Override
		public void onMonitorStop()
		{
		}

		@Override
		public void onDataSection(int pid, AdaptationField field, MPEG2Section section)
		{
			if (section.getTableID() == SpliceInformationTable.SIT_TABLE_ID)
			{
				try
				{
					SpliceInformationTable spliceInformationTable = new SpliceInformationTable(section);
					if (spliceInformationTable != null)
					{
						if (debugLog)
							WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME + ".onDataSection: " + spliceInformationTable.toString());

						SpliceInformationTableSerializeAMFContext serializeContext = new SpliceInformationTableSerializeAMFContext();

						serializeContext.timeReference = rtDePacketizerMPEGTS.getVideoTC();
						serializeContext.rtpTrack = rtpTrack;

						AMFDataObj amfData = spliceInformationTable.serializeAMF(serializeContext);
						if (amfData != null)
							stream.sendDirect("onCUE", amfData);
					}
				}
				catch(Exception e)
				{
					WMSLoggerFactory.getLogger(CLASS).error(CLASSNAME+".onDataSection: ", e);
				}
			}
		}
	}

	@Override
	public void onInit(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPContext rtpContext, RTPDePacketizerItem rtpDePacketizerItem)
	{
		this.debugLog = rtDePacketizerMPEGTS.getProperties().getPropertyBoolean("rtpDePacketizerMPEGTSMonitorKLVDebugLog", this.debugLog);
	}

	@Override
	public void onStartup(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, RTPTrack rtpTrack)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onStartup");

		this.rtDePacketizerMPEGTS = rtDePacketizerMPEGTS;
		this.rtpTrack = rtpTrack;

		RTPStream rtpStream = rtpTrack.getRTPStream();
		if (rtpStream != null)
			this.stream = rtpStream.getStream();
	}

	@Override
	public void onTimecodeReady(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onTimecodeReady");

		this.isTimecodeReady = true;
	}

	@Override
	public void onPMT(RTPDePacketizerMPEGTS rtDePacketizerMPEGTS, ProgramMapTable newPMT)
	{
		WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPMT");

		boolean SCTE35RegDescFound = false;

		ArrayList<Descriptor> regDescriptors = newPMT.programDescriptors.get(Descriptor.DESCRIPTOR_TAG_REGISTRATION);

		if (regDescriptors != null && regDescriptors.size() > 0)
		{
			for (Descriptor desc : regDescriptors)
			{
				SCTE35RegDescFound |= ((RegistrationDescriptor)desc).formatIdentifier == RegistrationDescriptor.REG_IDENTIFICATION_SCTE_SPLICE_FORMAT;
			}
		}

		for (StreamInfo s : newPMT.streams.values())
		{
			if (SCTE35RegDescFound)
			{
				ArrayList<Descriptor> descriptors = null;

				if (descriptors == null)
					descriptors = s.descriptors.get(Descriptor.DESCRIPTOR_TAG_CUE_IDENTIFIER);

				if (descriptors == null)
					descriptors = s.descriptors.get(Descriptor.DESCRIPTOR_TAG_STREAM_IDENTIFIER);

				if (descriptors != null)
				{
					WMSLoggerFactory.getLogger(CLASS).info(CLASSNAME+".onPMT: Hit cue point PID: 0x"+Integer.toHexString(s.PID));

					if (!rtDePacketizerMPEGTS.containsPIDMonitorMap(s.PID))
					{
						rtDePacketizerMPEGTS.putPIDMonitorMap(s.PID, new MPEGTSMonitorCUE());
					}
				}
			}
		}
	}
}

The above MPEG-TS listener class is included as part of the Wowza media server. To use the MPEG-TS listener, add it to your application by adding the following properties to the <RTP>/<Properties> container in Application.xml:

<Property>
	<Name>rtpDePacketizerMPEGTSListenerClass</Name>
	<Value>com.wowza.wms.rtp.depacketizer.RTPDePacketizerMPEGTSMonitorCUE</Value>
</Property>
<Property>
	<Name>rtpDePacketizerMPEGTSMonitorCUEDebugLog</Name>
	<Value>true</Value>
	<Type>Boolean</Type>
</Property>

To learn how to use the Wowza Streaming Engine generic Stream Target API to prepare Apple HLS streams for ad-insertion based on SCTE-35 events that are present in live MPEG-TS source streams, see How to use generic Stream Target API to prepare Apple HLS streams for ad insertion (SCTE-35).


Originally Published: For Wowza Streaming Engine 4.5.0 on 06-23-2016.

If you're having problems or want to discuss this article, post in our forum.