/*
* This file is part of Flowplayer, http://flowplayer.org
*
* By: Daniel Rossi, <electroteque@gmail.com>, Anssi Piirainen Flowplayer Oy
* Copyright (c) 2008-2011 Flowplayer Oy *
* Released under the MIT License:
* http://www.opensource.org/licenses/mit-license.php
*/packageorg.flowplayer.cluster{importflash.events.IOErrorEvent;importflash.events.NetStatusEvent;importflash.net.NetConnection;importflash.net.NetStream;importflash.net.Responder;importflash.utils.*;importorg.flowplayer.controller.ClipURLResolver;importorg.flowplayer.controller.ConnectionProvider;importorg.flowplayer.controller.NetConnectionClient;importorg.flowplayer.controller.NetStreamClient;importorg.flowplayer.controller.NetStreamControllingStreamProvider;importorg.flowplayer.controller.StreamProvider;importorg.flowplayer.model.Clip;importorg.flowplayer.model.ClipError;importorg.flowplayer.model.Plugin;importorg.flowplayer.model.PluginEventType;importorg.flowplayer.model.PluginModel;importorg.flowplayer.util.Log;importorg.flowplayer.util.PropertyBinder;importorg.flowplayer.util.URLUtil;importorg.flowplayer.view.Flowplayer;/**
* A RTMP stream provider with fallback and clustering support. Supports following:
* <ul>
* <li>Starting in the middle of the clip's timeline using the clip.start property.</li>
* <li>Stopping before the clip file ends using the clip.duration property.</li>
* <li>Ability to combine a group of clips into one gapless stream.</li>
* <li>Ability to fallback to a list of servers in a cluster server farm.</li>
* <li>Ability to recognise, store and leave out any failed servers for a given time.</li>
* <li>Ability to randomly connect to a server in the servers list mimicking a round robin connection.</li>
* <li>Works with a traditional load balancing appliance by feeding its host at the top of the list, and direct connections to the servers happen on fallback.</li>
* </ul>
* <p>
* Stream group is configured in a clip like this:
* <code>
* { streams: [ { url: 'metacafe', duration: 20 }, { url: 'honda_accord', start: 10, duration: 20 } ] }
* </code>
* The group is played back seamlessly as one gapless stream. The individual streams in a group can
* be cut out from a larger file using the 'start' and 'duration' properties as shown in the example above.
*
* <p>
* To enable server fallback a hosts config property is required in the plugins config like this:
*
* hosts: [
* 'rtmp://server1.host.com/myapp',
* 'rtmp://server2.host.com/myapp',
* 'rtmp://server3.host.com/myapp',
* ]
*
* <p>
* To enable the fallback feature to store (client side) failed servers to prevent reattempting those connections the failureExpiry config property is required like so:
* failureExpiry: 3000,
*
* <p> This tells the feature to wait for 3000 milliseconds before allowing connection attempts again.
*
* <p>
* To enable round robin connections the loadBalanceServers config property requires to be enabled like so:
*
* loadBalanceServers: true
*
* <p>
* Advanced configurations for the fallback feature can be enabled like so:
*
* connectTimeout: 5000,
* connectCount: 3
* encoding: 0
*
* <p> connectTimeout is the time in milliseconds before each reconnection attempt.
* connectCount is the ammount of times connection reattmps will occur before giving up.
* encoding is the AMF encoding version either 0 or 3 for AMF3.
*
* <p> Two custom events a fired during connection attempts and fallback, these are:
*
* <ul>
* <li>RTMPEventType.RECONNECTED - onReconnect</li>
* <li>RTMPEventType.FAILED - onFailed</li>
* </ul>
*
* @author danielr
*/publicclassClusterConnectionProviderimplementsConnectionProvider,ClipURLResolver,Plugin{privatevar_config:ClusterConfig;privatevarlog:Log=newLog(this);protectedvar_rtmpCluster:RTMPCluster;privatevar_connection:NetConnection;privatevar_netStream:NetStream;privatevar_provider:StreamProvider;privatevar_successListener:Function;privatevar_failureListener:Function;privatevar_connectionClient:Object;privatevar_objectEncoding:uint;privatevar_clip:Clip;privatevar_connectionArgs:Array;privatevar_isComplete:Boolean=false;privatevar_model:PluginModel;privatevar_resolving:Boolean;privatevar_player:Flowplayer;privatevar_onConnectionStatusCallback:Function;publicfunctiononConfig(model:PluginModel):void{log.debug("onConfig");_model=model;_config=newPropertyBinder(newClusterConfig(),null).copyProperties(model.config)asClusterConfig;_rtmpCluster=newRTMPCluster(_config);_rtmpCluster.onFailed(onFailed);}publicfunctiongetDefaultConfig():Object{returnnull;}publicfunctionconnect(provider:StreamProvider,clip:Clip,successListener:Function,objectEncoding:uint,connectionArgs:Array):void{log.debug("connect()");_resolving=false;_objectEncoding=objectEncoding;_clip=clip;_connectionArgs=connectionArgs;_successListener=successListener;_provider=providerasNetStreamControllingStreamProvider;_connection=newNetConnection();_connection.proxyType="best";_connection.objectEncoding=objectEncoding;_connection.client=_connectionClient||newNetConnectionClient();log.debug("connect() using connection client "+_connection.client);_connection.addEventListener(NetStatusEvent.NET_STATUS,connectionProviderConnectionStatus);_connection.addEventListener(IOErrorEvent.IO_ERROR,_netIOError);varhost:String=getNextNetConnectionUrl(clip);log.debug("connecting to "+host);if(connectionArgs.length>0){_connection.connect(host,connectionArgs);}elseif(_config.connectionArgs){_connection.connect(host,_config.connectionArgs);}else{_connection.connect(host);}_model.dispatch(PluginEventType.PLUGIN_EVENT,"onConnect",_rtmpCluster.currentHost,_rtmpCluster.currentHostIndex);if(isRtmpUrl(host)){_rtmpCluster.onReconnected(onRTMPReconnect);_rtmpCluster.start();}}privatefunction_netIOError(event:IOErrorEvent):void{log.error(event.text);}protectedfunctiongetNextNetConnectionUrl(clip:Clip):String{varhost:String=_rtmpCluster.nextHost;if(isRtmpUrl(host))returnhost;returnnull;}privatefunctionconnectionProviderConnectionStatus(event:NetStatusEvent):void{if(_onConnectionStatusCallback!=null){_onConnectionStatusCallback(event,_connection);}if(event.info.code=="NetConnection.Connect.Success"&&_successListener!=null){log.debug("_onConnectionStatus, NetConnection.Connect.Success, calling clusterComplete()");clusterComplete();}elseif(["NetConnection.Connect.Failed","NetConnection.Connect.Rejected","NetConnection.Connect.AppShutdown","NetConnection.Connect.InvalidApp"].indexOf(event.info.code)>=0){log.info("Couldnt connect to "+_connection.uri);}}publicfunctionsetconnectionClient(client:Object):void{if(_connection){_connection.client=client;}_connectionClient=client;}publicfunctionsetonFailure(listener:Function):void{_failureListener=listener;}protectedfunctiongetconnection():NetConnection{return_connection;}publicfunctionhandeNetStatusEvent(event:NetStatusEvent):Boolean{returntrue;}protectedfunctiongetprovider():StreamProvider{return_provider;}protectedfunctiongetfailureListener():Function{return_failureListener;}protectedfunctiongetsuccessListener():Function{return_successListener;}/**
* Fallback feature method called by the reconnection attempt timer
*/protectedfunctiononRTMPReconnect():void{_model.dispatch(PluginEventType.PLUGIN_EVENT,"onConnectFailed",_rtmpCluster.currentHost,_rtmpCluster.currentHostIndex);_rtmpCluster.setFailedServer(_rtmpCluster.currentHost);_connection.close();_rtmpCluster.stop();//#427 run host checks here to check for null hosts on rtmp.
if (!_rtmpCluster.hasMoreHosts()) {
_rtmpCluster.stop();
onFailed();
return;
}
connect(_provider, _clip, _successListener, _objectEncoding, _connectionArgs);
log.info("RTMP Connection Failed Attempting Reconnection");
}
protected function onHTTPReconnect():void
{
_model.dispatch(PluginEventType.PLUGIN_EVENT, "onConnectFailed", _rtmpCluster.currentHost, _rtmpCluster.currentHostIndex);
_rtmpCluster.setFailedServer(_clip.getResolvedUrl(this));
_rtmpCluster.stop();
log.info("HTTP Connection Attempting Reconnection");
resolveURL(true);
}
protected function onFailed():void
{
log.info("Connections failed");
//#601 dispatch resolver failure correctly.
_failureListener(_clip.completeUrl);
_model.dispatch(PluginEventType.PLUGIN_EVENT, "onFailed");
}
protected function resolveURL(useNextHost:Boolean):void
{
//fix for #377, run host checks and increment indexes here to trigger reconnections of the next host or else recurssion occurs or last host is null.
if (!_rtmpCluster.hasMoreHosts()) {
_rtmpCluster.stop();
onFailed();
return;
}
log.debug("resolveURL, useNextHost " + useNextHost);
// store the resolvedUrl already now, to make sure the resolved URL is there when onStart()
// is dispatched by the provider
var currentUrl:String = _clip.getPreviousResolvedUrl(this);
var nextHost:String = useNextHost ? _rtmpCluster.nextHost : _rtmpCluster.currentHost;
var url:String = URLUtil.completeURL(nextHost, URLUtil.baseUrlAndRest(currentUrl)[1]);
_clip.setResolvedUrl(this, url);
_model.dispatch(PluginEventType.PLUGIN_EVENT, "onConnect", _rtmpCluster.currentHost, _rtmpCluster.currentHostIndex);
//#15 add cache busting to the file being checked for availability to play back correctly once resolving has completed.
_netStream.play(_clip.getResolvedUrl(this) + "?" + Math.random());
_rtmpCluster.start();
}
private function _onNetStatus(event:NetStatusEvent):void {
if (event.info.code == "NetStream.Play.Start") {
log.debug("_onNetStatus: NetStream.Play.Start, calling clusterComplete()");
clusterComplete();
} else if (event.info.code == "NetStream.Play.StreamNotFound" ||
event.info.code == "NetConnection.Connect.Rejected" ||
event.info.code == "NetConnection.Connect.Failed") {
onHTTPReconnect();
}
}
protected function clusterComplete():void
{
_isComplete = true;
if (_netStream) {
_netStream.close();
}
_rtmpCluster.stop();
if (_resolving) {
log.debug("clusterComplete(), resolving? " + _resolving);
var currentUrl:String = _clip.getPreviousResolvedUrl(this);
_clip.setResolvedUrl(this, URLUtil.completeURL(_rtmpCluster.currentHost, URLUtil.baseUrlAndRest(currentUrl)[1]));
_successListener(_clip);
} else {
getServerDuration();
log.debug("calling success listener");
_successListener(_connection);
}
}
private function lookupRtmpPlugin(providers:Dictionary):PluginModel {
for each (var obj:Object in providers) {
var model:PluginModel = obj as PluginModel;
log.debug(model.name);
if (model.name == "rtmp") {
return model;
}
if (["http", "httpInstream"].indexOf(model.name) < 0 && model.pluginObject is StreamProvider) {
return model;
}
}
return null;
}
private function getServerDuration():void {
log.debug("getServerDuration()");
var model:PluginModel = lookupRtmpPlugin(_player.pluginRegistry.providers);
if (! model) return;
log.debug("found RTMP plugin " + model + ", looking up durationFunc");
if (model.config && model.config.durationFunc) {
log.debug("getServerDuration(), calling durationFunc '" + model.config.durationFunc + "'");
_connection.call(model.config.durationFunc, new Responder(onDurationResult), _clip.url);
}
}
private function onDurationResult(info:Object):void {
log.debug("onDurationResult()");
_clip.duration = info as Number;
}
public function resolve(provider:StreamProvider, clip:Clip, successListener:Function):void {
log.debug("resolve()");
_resolving = true;
_clip = clip;
_successListener = successListener;
_provider = provider;
if (_provider.netStream) {
_provider.netStream.close();
}
_connection = new NetConnection();
_connection.addEventListener(NetStatusEvent.NET_STATUS, _onConnectionStatus);
_connection.connect(getNextNetConnectionUrl(_clip));
}
private function _onConnectionStatus(event:NetStatusEvent):void {
log.debug("onConnectionStatus: " + event.info.code);
if (event.info.code == "NetConnection.Connect.Success") {
doResolve(false);
} else if (["NetConnection.Connect.Failed", "NetConnection.Connect.Rejected", "NetConnection.Connect.AppShutdown", "NetConnection.Connect.InvalidApp"].indexOf(event.info.code) >= 0) {
_failureListener("Failed to connect " + event.info.code);
}
}
private function doResolve(useNextHost:Boolean):void {
_netStream = new NetStream(_connection);
_netStream.client = new NetStreamClient(_clip, _player.config, _provider.streamCallbacks);
_netStream.addEventListener(NetStatusEvent.NET_STATUS, _onNetStatus);
_rtmpCluster.onReconnected(onHTTPReconnect);
_rtmpCluster.start();
resolveURL(useNextHost);
}
public static function isRtmpUrl(url:String):Boolean {
if (! url) return false;
return url.toLowerCase().indexOf("rtmp") == 0;
}
[External]
public function set loadBalancing(enabled:Boolean):void {
log.debug("setting loadBalanceServers to " + enabled);
_config.loadBalance = enabled;
}
public function onLoad(player:Flowplayer):void {
_player = player;
_model.dispatchOnLoad();
}
public function set onConnectionStatus(value:Function):void {
_onConnectionStatusCallback = value;
}
}
}