// Copyright (C) 2006 Chris Double.
// 
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// 
// 1. Redistributions of source code must retain the above copyright notice,
//    this list of conditions and the following disclaimer.
// 
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
// 
// THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
// FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
// DEVELOPERS AND CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
function callcc(func) {
	var k = new Continuation();
	return func(k);
}

function socketServer(port, func) {
  var ss = new java.net.ServerSocket(port);
  while(!ss.isClosed()) {
    func(ss, ss.accept());
  }
}

function serializeToBytes(obj) {
  serialize(obj, "out.ser");
  var len = new java.io.File("out.ser").length();
  var fis = new java.io.FileInputStream("out.ser");
  var bytes = java.lang.reflect.Array.newInstance(java.lang.Byte.TYPE, len);
  fis.read(bytes);
  fis.close();
  return bytes;
}

function deserializeFromBytes(bytes) {
  var fos = new java.io.FileOutputStream("in.ser");
  fos.write(bytes, 0, bytes.length)
  fos.flush();
  fos.close();
  return deserialize("in.ser");
}

function serializeToStream(a, stream) {
  var bytes = serializeToBytes(a);
  var fos = new java.io.ObjectOutputStream(stream);
  fos.writeInt(bytes.length);
  fos.write(bytes, 0, bytes.length);
  fos.flush();
  fos.close();
}

function deserializeFromStream(stream) {
  var fis = new java.io.ObjectInputStream(stream);
  var len = fis.readInt();
  var bytes = java.lang.reflect.Array.newInstance(java.lang.Byte.TYPE, len);
  fis.readFully(bytes, 0, len);
  fis.close();
  return deserializeFromBytes(bytes);
}

var exitThread = new Continuation();

function handleSocketAccept(ss, s) {
  var input = s.getInputStream();
  var o = deserializeFromStream(input);
  s.close();
  spawn(o);
}

function messageListener(port) {
  spawn(function() { socketServer(port, handleSocketAccept); });
}

function send(host, port, func) {
  var s = new java.net.Socket(host, port);
  var os = s.getOutputStream();
  serializeToStream(func, os);
  os.flush();
  os.close();
}

function spawnThreadOn(host, port, func) {
  send(host, port, func);
}

function migrateThread(host, port) {
  callcc(function(k) {
    send(host, port, k);
    exitThread();
  });
}

function pingPong(host1, port1, host2, port2) {
  while(1) {
    print("ping\n");
    java.lang.Thread.sleep(1000);
    migrateThread(host2, port2);
    print("pong\n");
    java.lang.Thread.sleep(1000);
    migrateThread(host1, port1);
  }
}
