Cadabra
Computer algebra system for field theory problems
Server.hh
Go to the documentation of this file.
1 
2 #pragma once
3 
4 #include <websocketpp/server.hpp>
5 #include <websocketpp/config/asio_no_tls.hpp>
6 #include <websocketpp/common/functional.hpp>
7 #include <string>
8 #include <signal.h>
9 #include <boost/uuid/uuid.hpp>
10 #include <future>
11 #include <pybind11/pybind11.h>
12 #include <pybind11/embed.h>
13 #include "nlohmann/json.hpp"
14 
15 #include "Stopwatch.hh"
16 
32 
33 class Server {
34  public:
35  Server();
36  Server(const Server&)=delete;
37  Server(const std::string& socket);
38  virtual ~Server();
39 
46 
47  void run(int port=0, bool exit_on_disconnect=true);
48 
49 
54 
55  class CatchOutput {
56  public:
57  CatchOutput();
58  CatchOutput(const CatchOutput&);
59 
60  void write(const std::string& txt);
61  void clear();
62  std::string str() const;
63  private:
64  std::string collect;
65  };
66 
68 
71 
89 
90  virtual uint64_t send(const std::string& output, const std::string& msg_type,
91  uint64_t parent_id=0, uint64_t cell_id=0, bool last_in_sequence=false);
92 // virtual void send_completion(uint64_t id, int pos, int alternative, std::string original, std::string completed);
93 
94  void send_progress_update(const std::string& msg, int n, int total);
95  void send_json(const std::string&);
96 
97  bool handles(const std::string& otype) const;
98  std::string architecture() const;
99 
103  void wait_for_job();
104 
107  void wait_for_websocket();
108 
109  protected:
110  void init();
111 
112  // WebSocket++ dependent parts below.
113  typedef websocketpp::server<websocketpp::config::asio> WebsocketServer;
114  void on_socket_init(websocketpp::connection_hdl hdl, boost::asio::ip::tcp::socket & s);
115  void on_message(websocketpp::connection_hdl hdl, WebsocketServer::message_ptr msg);
116  void on_open(websocketpp::connection_hdl hdl);
117  void on_close(websocketpp::connection_hdl hdl);
119  std::string socket_name;
120 
121  // Connection tracking. There can be multiple connections to
122  // the server, but they all have access to the same Python
123  // scope. With multiple connections, one can inspect the Python
124  // stack from a different client (e.g. for debugging purposes).
125  // All connections share the same authentication token.
126 
127  class Connection {
128  public:
129  Connection();
130 
131  websocketpp::connection_hdl hdl;
132  boost::uuids::uuid uuid;
133  };
134  typedef std::map<websocketpp::connection_hdl, Connection,
135  std::owner_less<websocketpp::connection_hdl>> ConnectionMap;
137 
138  // Authentication token, needs to be sent along with any message.
139  // Gets set when the server announces its port.
140  std::string authentication_token;
141 
142  // Mutex to be able to use the websocket layer from both the
143  // main loop and the python-running thread.
144  std::mutex ws_mutex;
145 
146 
147  // Basics for the working thread that processes blocks.
148  std::thread runner;
150  std::condition_variable block_available;
153  unsigned long main_thread_id;
154 
155  // Data and connection info for a single block of code.
156  class Block {
157  public:
158  Block(websocketpp::connection_hdl, const std::string&, uint64_t id, const std::string& msg_type);
159  websocketpp::connection_hdl hdl; // FIXME: decouple from websocket?
160  std::string msg_type;
161  std::string input;
162  std::string output;
163  std::string error;
164  uint64_t cell_id;
165 
166  // Response message, partially filled in when the
167  // request comes in.
168  nlohmann::json response;
169  };
170  std::queue<Block> block_queue;
171  websocketpp::connection_hdl current_hdl;
172  uint64_t current_id; // id of the block given to us by the client.
173 
174  // Run a piece of Python code. This is called from a separate
175  // thread constructed by on_message().
176  std::string run_string(const std::string&, bool handle_output=true);
177 
184 
185  virtual void on_block_finished(Block);
186  virtual void on_block_error(Block);
187  virtual void on_kernel_fault(Block);
188 
189 // uint64_t return_cell_id; // serial number of cells generated by us.
190 
193  void stop_block();
194  bool started;
195  std::future<std::string> job;
196 
200 
201  void dispatch_message(websocketpp::connection_hdl, const std::string& json_string);
202 
203  // Python global info.
204  pybind11::scoped_interpreter guard;
205  pybind11::module main_module;
206  pybind11::object main_namespace;
207 
208  // int cells_ran;
209  };
websocketpp::config::asio_client::message_type::ptr message_ptr
Definition: ComputeThread.hh:14
Definition: Server.hh:156
Block(websocketpp::connection_hdl, const std::string &, uint64_t id, const std::string &msg_type)
Definition: Server.cc:447
std::string input
Definition: Server.hh:161
uint64_t cell_id
Definition: Server.hh:164
websocketpp::connection_hdl hdl
Definition: Server.hh:159
std::string output
Definition: Server.hh:162
std::string msg_type
Definition: Server.hh:160
std::string error
Definition: Server.hh:163
nlohmann::json response
Definition: Server.hh:168
Python output catching.
Definition: Server.hh:55
void clear()
Definition: Server.cc:71
std::string str() const
Definition: Server.cc:77
CatchOutput()
Definition: Server.cc:57
std::string collect
Definition: Server.hh:64
void write(const std::string &txt)
Definition: Server.cc:65
Definition: Server.hh:127
boost::uuids::uuid uuid
Definition: Server.hh:132
websocketpp::connection_hdl hdl
Definition: Server.hh:131
Connection()
Definition: Server.cc:290
Object representing a Cadabra server, capable of receiving messages on a websocket,...
Definition: Server.hh:33
websocketpp::server< websocketpp::config::asio > WebsocketServer
Definition: Server.hh:113
void send_progress_update(const std::string &msg, int n, int total)
Definition: Server.cc:627
void init()
Definition: Server.cc:140
websocketpp::connection_hdl current_hdl
Definition: Server.hh:171
CatchOutput catchErr
Definition: Server.hh:67
virtual void on_block_error(Block)
Definition: Server.cc:657
virtual void on_block_finished(Block)
Called by the run_block() thread upon completion of the task.
Definition: Server.cc:550
pybind11::object main_namespace
Definition: Server.hh:206
std::thread runner
Definition: Server.hh:148
std::string authentication_token
Definition: Server.hh:140
pybind11::module main_module
Definition: Server.hh:205
virtual void on_kernel_fault(Block)
Definition: Server.cc:683
ConnectionMap connections
Definition: Server.hh:136
std::future< std::string > job
Definition: Server.hh:195
std::string socket_name
Definition: Server.hh:119
virtual ~Server()
Definition: Server.cc:52
Stopwatch sympy_stopwatch
Definition: Server.hh:70
std::mutex ws_mutex
Definition: Server.hh:144
WebsocketServer wserver
Definition: Server.hh:118
CatchOutput catchOut
Definition: Server.hh:67
virtual uint64_t send(const std::string &output, const std::string &msg_type, uint64_t parent_id=0, uint64_t cell_id=0, bool last_in_sequence=false)
Raw code to send a string (which must be JSON formatted) as a message to the client.
Definition: Server.cc:586
unsigned long main_thread_id
Definition: Server.hh:153
void on_close(websocketpp::connection_hdl hdl)
Definition: Server.cc:304
void wait_for_job()
Thread entry point for the code that waits for blocks to appear on the block queue,...
Definition: Server.cc:351
void on_message(websocketpp::connection_hdl hdl, WebsocketServer::message_ptr msg)
Definition: Server.cc:456
bool handles(const std::string &otype) const
Definition: Server.cc:580
std::map< websocketpp::connection_hdl, Connection, std::owner_less< websocketpp::connection_hdl > > ConnectionMap
Definition: Server.hh:135
int run_on_port
Definition: Server.hh:152
void run(int port=0, bool exit_on_disconnect=true)
The only user-visible part: just instantiate a server object and start it with run().
Definition: Server.cc:709
Server(const Server &)=delete
std::string architecture() const
Definition: Server.cc:82
void on_open(websocketpp::connection_hdl hdl)
Definition: Server.cc:295
std::mutex block_available_mutex
Definition: Server.hh:149
pybind11::scoped_interpreter guard
Definition: Server.hh:204
std::string run_string(const std::string &, bool handle_output=true)
Definition: Server.cc:217
std::queue< Block > block_queue
Definition: Server.hh:170
Server()
Definition: Server.cc:35
bool exit_on_disconnect
Definition: Server.hh:151
std::condition_variable block_available
Definition: Server.hh:150
void stop_block()
Halt the currently running block and prevent execution of any further blocks that may still be on the...
Definition: Server.cc:430
void send_json(const std::string &)
Definition: Server.cc:650
void dispatch_message(websocketpp::connection_hdl, const std::string &json_string)
Takes a JSON encoded message and performs the required action to process it.
Definition: Server.cc:473
Stopwatch server_stopwatch
Definition: Server.hh:69
void on_socket_init(websocketpp::connection_hdl hdl, boost::asio::ip::tcp::socket &s)
Definition: Server.cc:283
void wait_for_websocket()
Thread entry point for code that sets up and runs the websocket listener.
Definition: Server.cc:323
uint64_t current_id
Definition: Server.hh:172
bool started
Definition: Server.hh:194
The Stopwach class provides a simple interace to allow timing function calls etc.....
Definition: Stopwatch.hh:107