I am currently trying to build a module to improve the accounting of streams served by Wowza.
Software versions used to build this module:
Wowza version: 4
JDK: 1.8
Rationale:
I want to build a module that:
-
Counts how much traffic each user has generated, reported at a fixed interval, e.g. every 10 seconds (traffic up to a disconnect/stop should be counted as well)
-
Works for RTMP, RTP (RTSP) and HTTP (HLS/HDS/DASH/Smooth) streaming
Problems:
I would like to know the proper way to count traffic for all of these protocols in the same module.
What I have so far:
package com.mycompany.wms.module;

import java.util.Timer;
import java.util.TimerTask;

import com.wowza.util.IOPerformanceCounter;
import com.wowza.wms.application.*;
import com.wowza.wms.amf.*;
import com.wowza.wms.client.*;
import com.wowza.wms.httpstreamer.model.IHTTPStreamerSession;
import com.wowza.wms.media.model.MediaCodecInfoAudio;
import com.wowza.wms.media.model.MediaCodecInfoVideo;
import com.wowza.wms.module.*;
import com.wowza.wms.request.*;
import com.wowza.wms.stream.IMediaStream;
import com.wowza.wms.stream.IMediaStreamActionNotify3;

/**
 * Wowza module that periodically logs how many bytes have been sent to each
 * viewer.
 *
 * <p>Two kinds of traffic sources are tracked:
 * <ul>
 *   <li>media streams, via a per-stream {@link StreamListener} registered in
 *       {@link #onStreamCreate(IMediaStream)}; the watchdog is started in
 *       {@code onPlay} and stopped in {@code onStop} / {@code onStreamDestroy};</li>
 *   <li>HTTP streaming sessions, via {@link #onHTTPSessionCreate} and
 *       {@link #onHTTPSessionDestroy}.</li>
 * </ul>
 *
 * <p>Every watchdog is stored on the object it measures (stream properties) or
 * under a per-session key (application properties), so several viewers of the
 * same stream name never overwrite each other's watchdog.
 *
 * <p>NOTE(review): if Wowza also fires the stream listener's {@code onPlay}
 * for HTTP sessions, HTTP viewers will be reported twice (once per stream,
 * once per session) - confirm against a live server.
 */
public class AccountingModule extends ModuleBase {

    /** Stream-property key under which the per-stream listener is stored. */
    private static final String PROP_STREAM_LISTENER = "streamActionNotifier";

    /** Property key (or key prefix for HTTP sessions) of the traffic watchdog. */
    private static final String PROP_STREAM_STATS = "streamStatsWatchdog";

    /** Delay before the first report and period between reports, in ms. */
    private static final long REPORT_INTERVAL_MS = 10000L;

    /** Application instance this module was started for; set in onAppStart. */
    IApplicationInstance appInstance;

    /** Guards {@link #reportTimer}. */
    private final Object timerLock = new Object();

    /** Single timer shared by all watchdogs; created lazily, see scheduleReport. */
    private Timer reportTimer;

    /**
     * Remembers the application instance and logs its full name.
     *
     * @param appInstance the application instance being started
     */
    public void onAppStart(IApplicationInstance appInstance) {
        this.appInstance = appInstance;
        String fullname = appInstance.getApplication().getName() + "/" + appInstance.getName();
        getLogger().info("onAppStart: " + fullname);
    }

    /**
     * Logs the application's full name and shuts down the shared report timer
     * so that no report thread outlives the application.
     *
     * @param appInstance the application instance being stopped
     */
    public void onAppStop(IApplicationInstance appInstance) {
        String fullname = appInstance.getApplication().getName() + "/" + appInstance.getName();
        getLogger().info("onAppStop: " + fullname);
        synchronized (timerLock) {
            if (reportTimer != null) {
                reportTimer.cancel();
                reportTimer = null;
            }
        }
    }

    /** Logs the id of the connecting client. */
    public void onConnect(IClient client, RequestFunction function, AMFDataList params) {
        getLogger().info("onConnect: " + client.getClientId());
    }

    /** Logs the id of the accepted client. */
    public void onConnectAccept(IClient client) {
        getLogger().info("onConnectAccept: " + client.getClientId());
    }

    /** Logs the id of the rejected client. */
    public void onConnectReject(IClient client) {
        getLogger().info("onConnectReject: " + client.getClientId());
    }

    /** Logs the id of the disconnecting client. */
    public void onDisconnect(IClient client) {
        getLogger().info("onDisconnect: " + client.getClientId());
    }

    /**
     * Per-stream listener that starts a traffic watchdog on play and stops it
     * on stop. All other callbacks are intentionally no-ops.
     */
    class StreamListener implements IMediaStreamActionNotify3 {

        @Override
        public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket) {
            // not needed for accounting
        }

        @Override
        public void onPauseRaw(IMediaStream stream, boolean isPause, double location) {
            // not needed for accounting
        }

        @Override
        public void onPause(IMediaStream stream, boolean isPause, double location) {
            // not needed for accounting
        }

        /**
         * Starts a watchdog for this stream. If a watchdog is already attached
         * (a second play on the same stream object) it is stopped first so its
         * timer task does not keep running unattended.
         */
        @Override
        public void onPlay(IMediaStream stream, String streamName, double playStart,
                double playLen, int playReset) {
            getLogger().info("onPlay " + stream.getContextStr() + " playStart: " + playStart
                    + " playLen: " + playLen + " playReset: " + playReset);
            try {
                StreamStats watchdog = new StreamStats(streamName, null, stream);
                StreamStats previous = replaceStreamStats(stream, watchdog);
                if (previous != null) {
                    previous.stop();
                }
                watchdog.start();
            } catch (RuntimeException ex) {
                // Accounting must never break playback, but the failure has to be visible.
                getLogger().error("AccountingModule.onPlay: cannot start StreamStats for "
                        + streamName, ex);
            }
        }

        @Override
        public void onPublish(IMediaStream stream, String streamName, boolean isRecord,
                boolean isAppend) {
            // not needed for accounting
        }

        @Override
        public void onSeek(IMediaStream stream, double location) {
            // not needed for accounting
        }

        /**
         * Stops the watchdog attached to this stream, if any. The watchdog is
         * looked up on the stream itself, so this does not depend on
         * {@code stream.getClient()} being available.
         */
        @Override
        public void onStop(IMediaStream stream) {
            getLogger().info("onStop By: " + stream.getClientId());
            StreamStats watchdog = replaceStreamStats(stream, null);
            if (watchdog != null) {
                watchdog.stop();
            }
        }

        @Override
        public void onUnPublish(IMediaStream stream, String streamName, boolean isRecord,
                boolean isAppend) {
            // not needed for accounting
        }

        @Override
        public void onCodecInfoAudio(IMediaStream stream, MediaCodecInfoAudio codecInfoAudio) {
            // not needed for accounting
        }

        @Override
        public void onCodecInfoVideo(IMediaStream stream, MediaCodecInfoVideo codecInfoVideo) {
            // not needed for accounting
        }
    }

    /**
     * Registers a fresh {@link StreamListener} on the new stream and stores it
     * in the stream's properties so it can be removed in onStreamDestroy.
     *
     * @param stream the stream that was created
     */
    @SuppressWarnings("unchecked")
    public void onStreamCreate(IMediaStream stream) {
        getLogger().info("onStreamCreate: " + stream);
        IMediaStreamActionNotify3 actionNotify = new StreamListener();
        WMSProperties props = stream.getProperties();
        synchronized (props) {
            props.put(PROP_STREAM_LISTENER, actionNotify);
        }
        stream.addClientListener(actionNotify);
    }

    /**
     * Removes the listener registered in onStreamCreate and stops any watchdog
     * still attached to the stream (covers streams destroyed without a prior
     * onStop, e.g. on disconnect).
     *
     * @param stream the stream being destroyed
     */
    public void onStreamDestroy(IMediaStream stream) {
        getLogger().info("onStreamDestroy: " + stream);
        IMediaStreamActionNotify3 actionNotify = null;
        WMSProperties props = stream.getProperties();
        synchronized (props) {
            actionNotify = (IMediaStreamActionNotify3) props.get(PROP_STREAM_LISTENER);
        }
        if (actionNotify != null) {
            stream.removeClientListener(actionNotify);
            getLogger().info("remoteClientListener: " + stream.getSrc());
        }
        StreamStats watchdog = replaceStreamStats(stream, null);
        if (watchdog != null) {
            watchdog.stop();
        }
    }

    /**
     * Starts a watchdog for a new HTTP streaming session and stores it in the
     * application properties under a per-session key.
     *
     * @param httpSession the HTTP session that was created
     */
    public void onHTTPSessionCreate(IHTTPStreamerSession httpSession) {
        String streamName = httpSession.getStreamName();
        getLogger().info("Stream Name: " + streamName);
        IApplicationInstance instance = this.appInstance;
        if (instance == null) {
            getLogger().warn("AccountingModule.onHTTPSessionCreate: no application instance,"
                    + " session is not tracked: " + httpSession.getSessionId());
            return;
        }
        try {
            StreamStats watchdog = new StreamStats(streamName, httpSession, null);
            WMSProperties props = instance.getProperties();
            synchronized (props) {
                props.setProperty(httpStatsKey(httpSession), watchdog);
            }
            watchdog.start();
        } catch (RuntimeException ex) {
            getLogger().error("AccountingModule.onHTTPSessionCreate: cannot start StreamStats for "
                    + streamName, ex);
        }
    }

    /**
     * Stops and forgets the watchdog created in onHTTPSessionCreate. Without
     * this the session's timer task would keep running after the session ends.
     *
     * @param httpSession the HTTP session being destroyed
     */
    public void onHTTPSessionDestroy(IHTTPStreamerSession httpSession) {
        getLogger().info("onHTTPSessionDestroy: " + httpSession.getSessionId());
        IApplicationInstance instance = this.appInstance;
        if (instance == null) {
            return;
        }
        Object stored;
        WMSProperties props = instance.getProperties();
        synchronized (props) {
            stored = props.remove(httpStatsKey(httpSession));
        }
        if (stored instanceof StreamStats) {
            ((StreamStats) stored).stop();
        }
    }

    /**
     * Builds the application-property key for an HTTP session's watchdog. Only
     * the session id is used so the key is identical at create and destroy
     * time regardless of what the session reports as its stream name.
     */
    private static String httpStatsKey(IHTTPStreamerSession httpSession) {
        return PROP_STREAM_STATS + ":" + httpSession.getSessionId();
    }

    /**
     * Swaps the watchdog stored in the stream's properties.
     *
     * @param stream the stream whose properties hold the watchdog
     * @param next   the watchdog to store, or {@code null} to remove the entry
     * @return the watchdog that was stored before, or {@code null} if none
     */
    @SuppressWarnings("unchecked")
    private StreamStats replaceStreamStats(IMediaStream stream, StreamStats next) {
        WMSProperties props = stream.getProperties();
        Object previous;
        synchronized (props) {
            previous = (next == null)
                    ? props.remove(PROP_STREAM_STATS)
                    : props.put(PROP_STREAM_STATS, next);
        }
        return (previous instanceof StreamStats) ? (StreamStats) previous : null;
    }

    /**
     * Schedules a report task on the shared timer, creating the timer on first
     * use. The timer thread is a daemon so it cannot keep the JVM alive.
     */
    private void scheduleReport(TimerTask task) {
        synchronized (timerLock) {
            if (reportTimer == null) {
                reportTimer = new Timer("AccountingModule-StreamStats", true);
            }
            reportTimer.schedule(task, REPORT_INTERVAL_MS, REPORT_INTERVAL_MS);
        }
    }

    /**
     * Periodic traffic reporter for one viewer. Reads the byte counter either
     * from an HTTP session or from a media stream, whichever was supplied.
     */
    private class StreamStats {

        /** Name used in log lines; may be {@code null} if the source had none. */
        private final String label;

        /** HTTP session to read the counter from, or {@code null}. */
        private final IHTTPStreamerSession session;

        /** Media stream to read the counter from, or {@code null}. */
        private final IMediaStream stream;

        /** Currently scheduled task; {@code null} while stopped. */
        private TimerTask task;

        StreamStats(String label, IHTTPStreamerSession session, IMediaStream stream) {
            this.label = label;
            this.session = session;
            this.stream = stream;
        }

        /** Starts periodic reporting; does nothing if already started. */
        public synchronized void start() {
            if (task != null) {
                return;
            }
            // A TimerTask can be scheduled only once, so build a new one per start().
            task = new TimerTask() {
                @Override
                public void run() {
                    logTraffic();
                }
            };
            scheduleReport(task);
            getLogger().info("Start StreamStats");
        }

        /**
         * Stops periodic reporting and logs the counter one last time so the
         * traffic since the last tick is not lost. Does nothing if not started.
         */
        public synchronized void stop() {
            if (task == null) {
                return;
            }
            task.cancel();
            task = null;
            logTraffic();
            getLogger().info("Stop StreamStats");
        }

        /** Returns the counter of whichever source is set, or {@code null}. */
        private IOPerformanceCounter currentCounter() {
            if (session != null) {
                return session.getIOPerformanceCounter();
            }
            if (stream != null) {
                return stream.getMediaIOPerformance();
            }
            return null;
        }

        /**
         * Logs the bytes sent so far. Never throws: an exception escaping a
         * TimerTask would terminate the shared timer thread and silently stop
         * reporting for every other viewer.
         */
        private void logTraffic() {
            try {
                getLogger().info("Run StreamStats: " + label);
                IOPerformanceCounter perf = currentCounter();
                if (perf != null) {
                    getLogger().info("Bytes loaded till now: " + perf.getMessagesOutBytes());
                }
            } catch (RuntimeException ex) {
                getLogger().error("AccountingModule.StreamStats: cannot read traffic for "
                        + label, ex);
            }
        }
    }
}