/*
 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package com.sun.corba.se.impl.encoding;

import java.nio.ByteBuffer;
import com.sun.corba.se.pept.transport.ByteBufferPool;
import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;
import com.sun.corba.se.impl.protocol.RequestCanceledException;
import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
import java.util.*;

public class BufferManagerReadStream
    implements BufferManagerRead, MarkAndResetHandler
{
    private boolean receivedCancel = false;
    private int cancelReqId = 0;

    // We should convert endOfStream to a final static dummy end node
    private boolean endOfStream = true;
    private BufferQueue fragmentQueue = new BufferQueue();
    private long FRAGMENT_TIMEOUT = 60000;

    // REVISIT - This should go in BufferManagerRead. But, since
    //           BufferManagerRead is an interface. BufferManagerRead
    //           might ought to be an abstract class instead of an
    //           interface.
    private ORB orb ;
    private ORBUtilSystemException wrapper ;
    private boolean debug = false;

    BufferManagerReadStream( ORB orb )
    {
        this.orb = orb ;
        this.wrapper = ORBUtilSystemException.get( orb,
            CORBALogDomains.RPC_ENCODING ) ;
        debug = orb.transportDebugFlag;
    }

    public void cancelProcessing(int requestId) {
        synchronized(fragmentQueue) {
            receivedCancel = true;
            cancelReqId = requestId;
            fragmentQueue.notify();
        }
    }

    public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
    {
        ByteBufferWithInfo bbwi =
            new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());

        synchronized (fragmentQueue) {
            if (debug)
            {
                // print address of ByteBuffer being queued
                int bbAddress = System.identityHashCode(byteBuffer);
                StringBuffer sb = new StringBuffer(80);
                sb.append("processFragment() - queueing ByteBuffer id (");
                sb.append(bbAddress).append(") to fragment queue.");
                String strMsg = sb.toString();
                dprint(strMsg);
            }
            fragmentQueue.enqueue(bbwi);
            endOfStream = !msg.moreFragmentsToFollow();
            fragmentQueue.notify();
        }
    }

    public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
    {

      ByteBufferWithInfo result = null;

      try {
          //System.out.println("ENTER underflow");

        synchronized (fragmentQueue) {

            if (receivedCancel) {
                throw new RequestCanceledException(cancelReqId);
            }

            while (fragmentQueue.size() == 0) {

                if (endOfStream) {
                    throw wrapper.endOfStream() ;
                }

                boolean interrupted = false;
                try {
                    fragmentQueue.wait(FRAGMENT_TIMEOUT);
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                if (!interrupted && fragmentQueue.size() == 0) {
                    throw wrapper.bufferReadManagerTimeout();
                }

                if (receivedCancel) {
                    throw new RequestCanceledException(cancelReqId);
                }
            }

            result = fragmentQueue.dequeue();
            result.fragmented = true;

            if (debug)
            {
                // print address of ByteBuffer being dequeued
                int bbAddr = System.identityHashCode(result.byteBuffer);
                StringBuffer sb1 = new StringBuffer(80);
                sb1.append("underflow() - dequeued ByteBuffer id (");
                sb1.append(bbAddr).append(") from fragment queue.");
                String msg1 = sb1.toString();
                dprint(msg1);
            }

            // VERY IMPORTANT
            // Release bbwi.byteBuffer to the ByteBufferPool only if
            // this BufferManagerStream is not marked for potential restore.
            if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
            {
                ByteBufferPool byteBufferPool = getByteBufferPool();

                if (debug)
                {
                    // print address of ByteBuffer being released
                    int bbAddress = System.identityHashCode(bbwi.byteBuffer);
                    StringBuffer sb = new StringBuffer(80);
                    sb.append("underflow() - releasing ByteBuffer id (");
                    sb.append(bbAddress).append(") to ByteBufferPool.");
                    String msg = sb.toString();
                    dprint(msg);
                }

                byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
                bbwi.byteBuffer = null;
                bbwi = null;
            }
        }
        return result;
      } finally {
          //System.out.println("EXIT underflow");
      }
    }

    public void init(Message msg) {
        if (msg != null)
            endOfStream = !msg.moreFragmentsToFollow();
    }

    // Release any queued ByteBufferWithInfo's byteBuffers to the
    // ByteBufferPoool
    public void close(ByteBufferWithInfo bbwi)
    {
        int inputBbAddress = 0;

        // release ByteBuffers on fragmentQueue
        if (fragmentQueue != null)
        {
            synchronized (fragmentQueue)
            {
                // IMPORTANT: The fragment queue may have one ByteBuffer
                //            on it that's also on the CDRInputStream if
                //            this method is called when the stream is 'marked'.
                //            Thus, we'll compare the ByteBuffer passed
                //            in (from a CDRInputStream) with all ByteBuffers
                //            on the stack. If one is found to equal, it will
                //            not be released to the ByteBufferPool.
                if (bbwi != null)
                {
                    inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
                }

                ByteBufferWithInfo abbwi = null;
                ByteBufferPool byteBufferPool = getByteBufferPool();
                while (fragmentQueue.size() != 0)
                {
                    abbwi = fragmentQueue.dequeue();
                    if (abbwi != null && abbwi.byteBuffer != null)
                    {
                        int bbAddress = System.identityHashCode(abbwi.byteBuffer);
                        if (inputBbAddress != bbAddress)
                        {
                            if (debug)
                            {
                                 // print address of ByteBuffer released
                                 StringBuffer sb = new StringBuffer(80);
                                 sb.append("close() - fragmentQueue is ")
                                   .append("releasing ByteBuffer id (")
                                   .append(bbAddress).append(") to ")
                                   .append("ByteBufferPool.");
                                 String msg = sb.toString();
                                 dprint(msg);
                            }
                        }
                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
                    }
                }
            }
            fragmentQueue = null;
        }

        // release ByteBuffers on fragmentStack
        if (fragmentStack != null && fragmentStack.size() != 0)
        {
            // IMPORTANT: The fragment stack may have one ByteBuffer
            //            on it that's also on the CDRInputStream if
            //            this method is called when the stream is 'marked'.
            //            Thus, we'll compare the ByteBuffer passed
            //            in (from a CDRInputStream) with all ByteBuffers
            //            on the stack. If one is found to equal, it will
            //            not be released to the ByteBufferPool.
            if (bbwi != null)
            {
                inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
            }

            ByteBufferWithInfo abbwi = null;
            ByteBufferPool byteBufferPool = getByteBufferPool();
            ListIterator itr = fragmentStack.listIterator();
            while (itr.hasNext())
            {
                abbwi = (ByteBufferWithInfo)itr.next();

                if (abbwi != null && abbwi.byteBuffer != null)
                {
                   int bbAddress = System.identityHashCode(abbwi.byteBuffer);
                   if (inputBbAddress != bbAddress)
                   {
                       if (debug)
                       {
                            // print address of ByteBuffer being released
                            StringBuffer sb = new StringBuffer(80);
                            sb.append("close() - fragmentStack - releasing ")
                              .append("ByteBuffer id (" + bbAddress + ") to ")
                              .append("ByteBufferPool.");
                            String msg = sb.toString();
                            dprint(msg);
                       }
                       byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
                   }
                }
            }
            fragmentStack = null;
        }

    }

    protected ByteBufferPool getByteBufferPool()
    {
        return orb.getByteBufferPool();
    }

    private void dprint(String msg)
    {
        ORBUtility.dprint("BufferManagerReadStream", msg);
    }

    // Mark and reset handler ----------------------------------------

    private boolean markEngaged = false;

    // List of fragment ByteBufferWithInfos received since
    // the mark was engaged.
    private LinkedList fragmentStack = null;
    private RestorableInputStream inputStream = null;

    // Original state of the stream
    private Object streamMemento = null;

    public void mark(RestorableInputStream inputStream)
    {
        this.inputStream = inputStream;
        markEngaged = true;

        // Get the magic Object that the stream will use to
        // reconstruct it's state when reset is called
        streamMemento = inputStream.createStreamMemento();

        if (fragmentStack != null) {
            fragmentStack.clear();
        }
    }

    // Collects fragments received since the mark was engaged.
    public void fragmentationOccured(ByteBufferWithInfo newFragment)
    {
        if (!markEngaged)
            return;

        if (fragmentStack == null)
            fragmentStack = new LinkedList();

        fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
    }

    public void reset()
    {
        if (!markEngaged) {
            // REVISIT - call to reset without call to mark
            return;
        }

        markEngaged = false;

        // If we actually did peek across fragments, we need
        // to push those fragments onto the front of the
        // buffer queue.
        if (fragmentStack != null && fragmentStack.size() != 0) {
            ListIterator iter = fragmentStack.listIterator();

            synchronized(fragmentQueue) {
                while (iter.hasNext()) {
                    fragmentQueue.push((ByteBufferWithInfo)iter.next());
                }
            }

            fragmentStack.clear();
        }

        // Give the stream the magic Object to restore
        // it's state.
        inputStream.restoreInternalState(streamMemento);
    }

    public MarkAndResetHandler getMarkAndResetHandler() {
        return this;
    }
}