00001 #ifndef SCHEDULER_H_
00002 #define SCHEDULER_H_
00003
00004 #ifdef NF_MPI
00005 #include "mpi.h"
00006 #endif
00007
00008 #include "NFstream.h"
00009 #include "../NFsim.hh"
00010
00011 #include <vector>
00012 #include <string>
00013 #include <map>
00014 #include <iostream>
00015 #include <fstream>
00016 #include <sstream>
00017
00018 using namespace std;
00019
// Scheduler-wide constants. They stay preprocessor macros because other
// translation units may test them with #if/#ifdef.

// NOTE(review): presumably the MPI rank of the coordinating process -- confirm.
#define MASTER        0

// NOTE(review): presumably the message tags used for control messages and for
// data payloads respectively -- confirm against the send/recv definitions.
#define TAG_MSG       99
#define TAG_DATA      98

// Capacity, in chars, of msgtype::data (1<<20 == 1048576).
#define MSG_DATA_SIZE (1<<20)

// NOTE(review): use not visible in this header; looks like an upper bound on
// an MPI-related size -- confirm.
#define MAX_MPI_SIZE  4096
00025
// Message codes, numbered consecutively from zero.
// NOTE(review): the cmd_* / rpt_* prefixes suggest commands versus reports
// exchanged between master and workers -- confirm direction in the definitions.
enum {
    cmd_free         = 0,
    cmd_job          = 1,
    cmd_pre_data_ack = 2,
    cmd_data_ack     = 3,
    rpt_ready        = 4,
    rpt_pre_data     = 5,
    rpt_data         = 6,
    rpt_done         = 7
};
00027
// Description of a single job handled by the scheduler.
//
// Types are spelled with std:: so the struct does not depend on a
// using-directive being active where the header is included.
struct job {
    // Start with a defined processor count; without this, `new job` or
    // `job j;` would leave `processors` holding an indeterminate value.
    job() : processors(0) {}

    std::string filename;                 // file name associated with the job
    int processors;                       // processor count for the job (0 until set)
    // NOTE(review): argument/argval look like parallel lists (name, value) -- confirm.
    std::vector<std::string> argument;
    std::vector<std::string> argval;
    // NOTE(review): parameters/values look like parallel lists (name, number) -- confirm.
    std::vector<std::string> parameters;
    std::vector<double> values;
};
00036
// Description of a parameter scan, stored as four separate lists.
// NOTE(review): the lists are presumably index-aligned (entry i of each
// describes one scanned parameter) -- confirm against the code that fills them.
struct scan {
    std::vector<std::string> parameter;   // parameter names
    std::vector<double>      min;         // lower bounds
    std::vector<double>      max;         // upper bounds
    std::vector<int>         steps;       // step counts
};
00043
// Description of a model entry handled by the scheduler.
//
// Types are spelled with std:: so the struct does not depend on a
// using-directive being active where the header is included.
struct model {
    // Give both counters a defined starting value; without this, `new model`
    // or `model m;` would leave them indeterminate.
    model() : processors(0), replicates(0) {}

    std::string filename;                 // file name associated with the model
    int processors;                       // processor count (0 until set)
    int replicates;                       // replicate count (0 until set)
    // NOTE(review): argument/argval look like parallel lists (name, value) -- confirm.
    std::vector<std::string> argument;
    std::vector<std::string> argval;
};
00051
00052 typedef struct msgtype {
00053 int src;
00054 int tag;
00055 int len;
00056 char data[MSG_DATA_SIZE];
00057 } msgtype;
00058
00059 vector<job*> parseJobsFile (string filename);
00060
00061 int schedulerInterpreter(int* argc, char*** argv);
00062
00063 void convertModelScanToJobs(model*& currentModel, scan* currentScan, vector<job*>& joblist);
00064
// String and file-line helpers. Only the prototypes are visible here; the
// notes below are assumptions to check against the definitions.

// Number-to-text conversions returning a C string.
// NOTE(review): the lifetime and ownership of the returned buffer are not
// visible from this header -- confirm before storing the pointer.
const char* itoa(int inNum);

const char* dtoa(double inNum);

// Takes an open input file stream and returns a string.
// NOTE(review): presumably reads and returns the next line -- confirm.
std::string getFileLine(std::ifstream& input);

// Takes a string and a delimiter and returns a pointer to a list of strings.
// The flag defaults to true.
// NOTE(review): presumably splits fullString on delim, and the caller is
// presumably responsible for deleting the returned vector -- confirm.
std::vector<std::string>* stringToStrings(std::string fullString, const char* delim, bool treatConsecutiveDelimAsOne = true);

// Stream-based counterpart with the same return type; note that the flag has
// no default in this declaration.
// NOTE(review): presumably reads one line from input and splits it -- confirm.
std::vector<std::string>* getStringsFileline(std::ifstream& input, const char* delim, bool treatConsecutiveDelimAsOne);

// source is passed by non-const reference, so it can be modified in place.
// NOTE(review): presumably replaces occurrences of `find` with `replace` -- confirm.
void findandreplace(std::string& source, std::string find, std::string replace);

// Takes no arguments and returns nothing.
// NOTE(review): what is printed, and from which state, is not visible here.
void printFileLineOutput();
00078
00079 void push_stream(int rank, NFstream& strm);
00080
00081 void send_to_slave(int slave, int tag, int datalen, char *data);
00082
00083 void send_to_master(int myid, int tag, int datalen, char *data);
00084
00085 void recv_from_slave();
00086
00087 void recv_from_master();
00088
00089 void perr(const char*);
00090
00091 void printParallelJobOutput(vector<job*> jobQueue);
00092
00093 void FinalizeMPI();
00094
00095 void InitializeMPI(int* argc, char*** argv,int& Size,int& Rank);
00096
00097
00098 string load_to_buffer(string filename);
00099
00100 void DynamicParallel(map<string, string> argMap,int rank,int size);
00101
00102 void EmbarrassingParallel(map<string, string> argMap,int rank,int size);
00103
00104 string BroadcastString(int Rank,int From,string InBuffer);
00105
00106 string ConvergeAllData(int Rank,int Size,string Buffer);
00107
00108 void ConvertStringToBufferMap(map<string, map<int, string> >& FileMap,string ReportBuffer);
00109
00110 string ConvertBufferMapToString(map<string, map<int,string> >& FileBuffers);
00111
00112 char* ConvertStringToCString(string Buffer);
00113
00114 void PrintFileBuffer(map<string, map<int, string> > FileMap,vector<job*> JobQueue);
00115
00116 string getPath(string Filename);
00117
00118 #endif