Commit 47464e02 authored by Haejoong Lee

move AikumaCloudStorage and AikumaCloudServer to separate branches

parent ab9fb62e
apply plugin: 'eclipse'
apply plugin: 'scala'
apply plugin: 'application'
apply plugin: 'maven-publish'
group = 'org.lp20'
mainClassName = 'org.lp20.aikuma.copyserver.CopyArchivedFiles'
repositories {
mavenCentral()
mavenLocal()
}
dependencies {
compile 'org.scala-lang:scala-library:2.11.5'
compile 'org.scala-lang:scala-compiler:2.11.5'
compile 'jline:jline:2.12'
compile('com.googlecode.json-simple:json-simple:1.1.1') {
exclude module: 'junit'
}
compile 'org.lp20:aikuma-cloud-storage:0.2.1'
compile files(rootDir.path + '/libs/jdbm-2.4.jar')
compile 'org.mapdb:mapdb:1.0.6'
}
publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
}
FROM java:8
ADD %TAR% /root
COPY docker/run.sh /root/run.sh
VOLUME ["/aikuma_var"]
ENTRYPOINT ["./run.sh"]
WORKDIR /root
## Synopsis
Describes how to create a docker image for the CopyArchivedFiles utility.
## Files
The following files are needed to build a docker image. Only Dockerfile and
run.sh are provided by default; the rest must be supplied by the image builder.
- Dockerfile: Prescribes how to build the docker image.
- run.sh: A wrapper script that runs the java application(s) every 24 hours.
- aikuma-copy-archived-files.jar: Runnable jar with CopyArchivedFiles and its
  dependencies. This can be exported from Eclipse; make sure to export a
  runnable jar with all dependencies included.
- credentials.properties: A java properties file containing Google API
  credentials. The following 4 fields are required: access-token,
  refresh-token, client-id, and client-secret (see the example below).
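For example, a minimal credentials.properties looks like the following; the values shown are placeholders, not real credentials:
access-token=EXAMPLE_ACCESS_TOKEN
refresh-token=EXAMPLE_REFRESH_TOKEN
client-id=EXAMPLE_CLIENT_ID
client-secret=EXAMPLE_CLIENT_SECRET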
## Build image
Once all materials are gathered, just run the following command.
docker build --rm -t hleeldc/aikuma:`date +%Y%m%d` .
This process can be repeated when the jar file needs to be updated.
## Create containers
We will create two containers: one for the persistent data volume, and one
that runs the application image built above. The data volume container should
be created only once and should never be deleted.
docker run -it -v /aikuma_var --name aikuma_var busybox:latest /bin/sh
Once the container starts and a shell prompt is displayed, enter Ctrl-D to
exit. This creates a container from the busybox image and creates a directory
called "/aikuma_var" that will be shared by other containers. Again, this
should be done only once.
Now, create the application container. Note that the entry point to this
container is the run.sh script.
docker run -d --volumes-from=aikuma_var hleeldc/aikuma:20141016
This is supposed to run forever unless something unexpected happens. No data
is stored in this container, so it's perfectly fine to remove it when
necessary.
To check logs, just create a temporary container mounting the aikuma_var
volume.
docker run -it --rm --volumes-from=aikuma_var busybox /bin/sh
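Inside that shell, the daily log files that run.sh writes are available under /aikuma_var, named log-cpaf-YYYYMMDD.txt; for example:
cat /aikuma_var/log-cpaf-20141016.txt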
DOCKER_USER=hleeldc
DOCKER_REPO=aikuma-copy-server
#! /bin/bash
set -e
case $0 in
/*) thisdir=`dirname $0` ;;
*) thisdir=`pwd`/`dirname $0` ;;
esac
cd $thisdir/..
tarpath() {
# compute the path of the latest distribution tar file
find build/distributions -name "aikuma-copy-server-*.tar" | sort | tail -n 1
}
dockerfile() {
# generate dockerfile by filling in placeholder
cat $thisdir/Dockerfile.template | sed -r "s@%TAR%@$TARPATH@"
}
TARPATH=`tarpath`
v=`echo $TARPATH | sed -r 's/.*-(.*).tar/\1/'` # version
. $thisdir/build-config.sh
TAG=$DOCKER_USER/$DOCKER_REPO:$v
TAG_LATEST=$DOCKER_USER/$DOCKER_REPO:latest
gradle distTar # build distribution tar file
dockerfile >Dockerfile # generate dockerfile
docker build -t $TAG . # build docker image
docker tag $TAG $TAG_LATEST
rm -f Dockerfile
#! /bin/bash
VAR=${1:-/aikuma_var}
CREDENTIALS=$VAR/credentials.properties
TRACKING_DB=$VAR/tracking
T=`date +%s` # beginning of this script
INTERVAL=86400 # 24 hours
LOG_PREFIX=$VAR/log-cpaf-
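# log: append a timestamped message to the current log file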
log() {
echo `date +"[%Y-%m-%d %H:%M:%S]"` "$1" >>$LOG_FILE 2>&1
}
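# log_file_name: compute the name of today's log file (one file per day)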
log_file_name() {
echo ${LOG_PREFIX}$(date +%Y%m%d).txt
}
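# get_next_time: epoch time of the next run, at the next 24-hour boundary counted from the script's start time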
get_next_time() {
echo $(( $T + ((`date +%s` - $T) / $INTERVAL + 1) * $INTERVAL ))
}
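# run_app: run the copy server application (CopyArchivedFiles) with the credentials file and the tracking database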
run_app() {
aikuma-copy-server-*/bin/aikuma-copy-server $CREDENTIALS $TRACKING_DB
}
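# rm_old_logs: delete log files older than 7 days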
rm_old_logs() {
find $VAR -name "$(basename $LOG_PREFIX)*" -ctime +7 -exec rm \{} \;
}
while true; do
LOG_FILE=`log_file_name`
log "STARTED"
run_app >>$LOG_FILE 2>&1
log "FINISHED with exit code $?"
t=`get_next_time`
log "Scheduled to run at `date -d @$t`"
rm_old_logs >>$LOG_FILE 2>&1
sleep $(( $t - `date +%s` ))
done
package org.lp20.aikuma.copyserver;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.lp20.aikuma.storage.*;
/**
* For each file in fusion table that meets the following conditions,
* copy it over to the central google drive and update date_processed field.
*
* - File is uploaded to personal google drive.
* - File is approved for archive.
* - File hasn't been copied to central google drive.
*
* Names of copied files are kept in a local database to avoid duplicated
* copies.
*
* @author haejoong
*/
public class CopyArchivedFiles {
static final Logger LOG = Logger.getLogger(CopyArchivedFiles.class.getName());
/**
* @param args
*/
public static void main(String[] args) {
if (args.length != 2) {
System.out.println("Usage: CopyArchivedFiles <credential file> <process log>");
System.exit(1);
}
String credentialFile = args[0];
String processLogFile = args[1];
GoogleCredentialManager cm = new GoogleCredentialManager(credentialFile);
ProcessLogManager pm = new ProcessLogManager(processLogFile);
String accessToken = cm.getAccessToken();
DataStore ds = new GoogleDriveStorage(accessToken);
FusionIndex fi = new FusionIndex(accessToken);
HashMap<String,String> constraints = new HashMap<String,String>();
List<String> items;
try {
items = fi.search(constraints);
} catch (InvalidAccessTokenException e) {
LOG.warning("access token expired");
System.exit(1);
throw e;
}
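// For each item returned by the index, copy it to the central google drive if needed, then record the processed date.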
for (String item_id: items) {
Map<String,String> meta = fi.getItemMetadata(item_id);
String url = meta.get("data_store_uri");
String date_approved = meta.get("date_approved");
String date_backedup = meta.get("date_backedup");
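// Skip items that have no data store uri, are not yet approved, are not yet backed up, or are already fully processed.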
if (url == null || url.isEmpty() || url.equals("NA"))
continue;
if (date_approved == null || date_approved.isEmpty())
continue;
if (date_backedup == null || date_backedup.isEmpty())
continue;
if (pm.isCopied(item_id) && pm.isDated(item_id))
continue;
if (!pm.isCopied(item_id)) {
Data data;
try {
data = downloadUrl(item_id, url);
} catch (Exception e1) {
LOG.warning(item_id + ": failed to download: " + e1.getMessage());
data = null;
}
if (data == null) continue;
String uri = ds.store(item_id, data);
if (uri == null) {
LOG.warning(item_id + ": failed to copy to central location");
continue;
}
if (!ds.share(item_id)) {
LOG.warning(item_id + ": failed to share");
continue;
}
if (!pm.setUri(item_id, uri)) {
LOG.warning(item_id + ": failed to update local db");
continue;
}
LOG.info(item_id + ": copied");
}
if (!pm.isDated(item_id)) {
Map<String,String> m = new HashMap<String,String>();
m.put("central_data_store_uri", pm.getUri(item_id));
String date = LocalDateTime.now(ZoneId.of("UTC")).format(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"));
m.put("date_processed", date);
if (!fi.update(item_id, m)) {
LOG.warning(item_id + ": failed to update date_processed");
continue;
}
if (!pm.setDated(item_id)) {
LOG.warning(item_id + ": failed to update local db(2)");
}
LOG.info(item_id + ": processed date updated");
}
}
}
static String getMimeType(HttpURLConnection con) {
String f;
int i = 0;
while ((f = con.getHeaderField(i)) != null) {
String k = con.getHeaderFieldKey(i++);
if (k != null && k.equalsIgnoreCase("Content-Type")) { // header names are case-insensitive
return f.split(";")[0];
}
}
return null;
}
static Data downloadUrl(String item_id, String url) throws MalformedURLException, IOException {
LOG.info(item_id + ": download requested: " + url);
HttpURLConnection con = (HttpURLConnection) (new URL(url)).openConnection();
con.setInstanceFollowRedirects(true);
con.setDoOutput(false);
con.setRequestMethod("GET");
if (con.getResponseCode() != HttpURLConnection.HTTP_OK) {
LOG.warning(item_id + ": download failed: " + url);
return null;
}
String mimeType = getMimeType(con);
if (mimeType == null || !mimeType.split("/")[0].equals("audio")) {
LOG.warning(item_id + ": wrong mime type: " + mimeType);
return null;
}
return new Data(con.getInputStream(), mimeType);
}
}
package org.lp20.aikuma.copyserver;
/**
* A process log recording whether a file has been copied, and
* whether the date of copy has been recorded.
*
* @author haejoong
*/
public class ProcessLog implements java.io.Serializable {
private static final long serialVersionUID = 1L;
String uri;
boolean dated;
/**
* Create a new log.
*/
public ProcessLog() {
uri = null;
dated = false;
}
/**
* Tells whether the file has been copied.
* @return
*/
public boolean isCopied() {
return uri != null;
}
/**
* Tells whether the date of copy has been recorded.
* @return
*/
public boolean isDated() {
return dated;
}
/**
* Get data store uri for the file (download url).
* @return uri
*/
public String getUri() {
return uri;
}
/**
* Records that the file has been copied.
* @param v data store uri for the file (download url)
*/
public void setUri(String v) {
uri = v;
}
/**
* Records that the date of copy has been recorded.
*/
public void setDated() {
dated = true;
}
}
package org.lp20.aikuma.copyserver;
import java.io.File;
import java.util.Map;
import java.util.logging.Logger;
import org.mapdb.*;
/**
* Class for keeping track of processing states.
*
* It uses a persistent database to manage 2 pieces of information
* for each file.
*
* - Whether the item has been copied to the central location.
* - Whether the item's process date has been updated.
*
* @author haejoong
*/
public class ProcessLogManager {
DB db_;
Map<String,ProcessLog> map_;
static final String mapName_ = "log";
Logger log = Logger.getLogger(ProcessLogManager.class.getName());
public ProcessLogManager(String filename) {
db_ = DBMaker.newFileDB(new File(filename)).closeOnJvmShutdown().make();
map_ = db_.getHashMap(mapName_);
}
/**
* Records that the file has been copied to the central location.
*
* @param itemId item ID
* @param uri data store uri of the copy
* @return true on success
*/
public boolean setUri(String itemId, String uri) {
ProcessLog pl = map_.get(itemId);
if (pl == null)
pl = new ProcessLog();
pl.setUri(uri);
map_.put(itemId, pl);
db_.commit();
return true;
}
/**
* Records that the date_processed field has been updated.
*
* @param itemId item ID
* @return true on success
*/
public boolean setDated(String itemId) {
ProcessLog pl = map_.get(itemId);
if (pl == null)
pl = new ProcessLog();
pl.setDated();
map_.put(itemId, pl);
db_.commit();
return true;
}
/**
* Tells whether the file has been copied to the central location.
* @param itemId
* @return true if the item has been copied to the central location, false otherwise
*/
public boolean isCopied(String itemId) {
ProcessLog pl = map_.get(itemId);
if (pl == null)
return false;
else
return pl.isCopied();
}
/**
* Tells whether the file's copy date has been updated.
* @param itemId
* @return
*/
public boolean isDated(String itemId) {
ProcessLog pl = map_.get(itemId);
if (pl == null)
return false;
else
return pl.isDated();
}
/**
* Get the data store uri for the file (download url).
* @param itemId
* @return uri
*/
public String getUri(String itemId) {
ProcessLog pl = map_.get(itemId);
if (pl == null)
return null;
else
return pl.getUri();
}
}
/**
*
*/
/**
* @author haejoong
*
*/
package org.lp20.aikuma.copyserver;
import jdbm.RecordManager
import jdbm.RecordManagerFactory
import org.mapdb._
import org.lp20.aikuma.servers.ProcessLog
import org.lp20.aikuma.copyserver.{ProcessLog => NewPrLog}
import scala.io.Source
import java.io.File
/**
* Convert JDBM2-based process log to MapDB.
*/
object Jdbm2MapDb {
def main(args: Array[String]) {
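// args: (0) JDBM2 database file, (1) CSV file whose first column lists the JDBM record names, (2) output MapDB file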
val jdbmFileName = args(0)
val jdbmKeysFileName = args(1)
val mapdbFileName = args(2)
val jdbm = RecordManagerFactory.createRecordManager(jdbmFileName)
val db =
DBMaker.newFileDB(new File(mapdbFileName)).
closeOnJvmShutdown().
make()
val fetch = getProcessLog(jdbm)_
val copy = copyToMapDb(db.getHashMap("log"))_
for {
id <- readKeys(jdbmKeysFileName)
obj = fetch(id)
if obj != null
} copy(id, obj)
db.commit
}
def readKeys(filename: String): Iterator[String] = {
val it = Source.fromFile(filename).getLines
it.next() // discard header
for (line <- it) yield line.split(",")(0)
}
def getProcessLog(rm:RecordManager)(name:String): ProcessLog = {
rm.getNamedObject(name) match {
case n if n != 0 => rm.fetch(n).asInstanceOf[ProcessLog]
case _ => null
}
}
def copyToMapDb(map:java.util.Map[String,NewPrLog])
(key:String, log:ProcessLog) {
val pl = new NewPrLog
pl.setUri(log.getUri)
if (log.isDated) pl.setDated
println("adding " + key)
map.put(key, pl)
}
}
import org.mapdb._
import scala.collection.JavaConversions._
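/** Prints the keys of the "log" map in a MapDB file, as a quick sanity check of a converted database. */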
object MapDbTest {
def main(args: Array[String]) {
val mapdbFileName = args(0)
val db = DBMaker.newFileDB(new java.io.File(mapdbFileName)).make()
val map = db.getHashMap("log")
for (entry <- map.entrySet) {
println(entry.getKey)
}
println("ok")
}
}
package org.lp20.aikuma.servers;
/**
* A process log recording whether a file has been copied, and
* whether the date of copy has been recorded.
*
* @author haejoong
*/
public class ProcessLog implements java.io.Serializable {
private static final long serialVersionUID = 1L;
String uri;
boolean dated;
/**
* Create a new log.
*/
public ProcessLog() {
uri = null;
dated = false;
}
/**
* Tells whether the file has been copied.
* @return
*/
public boolean isCopied() {
return uri != null;
}
/**
* Tells whether the date of copy has been recorded.
* @return
*/
public boolean isDated() {