Login Page - Create Account

Support Board


Date/Time: Fri, 10 May 2024 17:07:10 +0000



Post From: TCP/IP Server in custom study

[2021-06-21 21:48:21]
norvik_ - Posts: 106
Hi.
I have coded this class for fast data exchange between SC and Python. It makes it easy to get data from SC (OHLCV, any study subgraphs)
into an external app written in any language that has a 0MQ binding. Only offline data processing is supported for now. Hope this helps.

sc_server.hpp:
#pragma once

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <future>
#include <limits>
#include <list>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include <zmqxx/zmq.hpp>
#include <zmqxx/zmq_addon.hpp>

// Brings the duration literal suffixes (e.g. `1000ms`) into scope for the
// sleep calls in this file.
using namespace std::chrono_literals;

// On 64-bit Windows builds, instruct the linker to link against the libzmq
// library file named below.
// NOTE(review): the "-gd-" tag in the file name looks like a debug-build
// marker — confirm which variant should be linked for release builds.
#if (_WIN32 || _WIN64)
#if _WIN64
#pragma comment(lib,"zmq/lib/libzmq-v141-mt64-gd-4_3_5.lib")
#endif
#endif

// Disables MSVC warning C4996 (deprecated / "unsafe" function usage) for the
// rest of the translation unit.
#pragma warning(disable:4996)

/// Request kinds a client can send. The numeric value is what arrives as the
/// first ';'-separated field of a request, so the values are spelled out.
enum class client_request : std::uint8_t
{
  studies_details        = 0,  ///< names of the studies found on the chart
  base_data              = 1,  ///< selected base-data arrays of the chart
  study_subgraps_data    = 2,  ///< subgraph arrays of one named study
  date_time_data         = 3,  ///< bar date-times of the chart
  external_data_received = 4   ///< client pushes values to the server
};

/// Status codes written into error/acknowledge replies.
/// The underlying type is std::size_t because the value is copied into the
/// reply as sizeof(std::size_t) raw bytes.
enum class error_code : std::size_t
{
  no_error = 0,                // request handled successfully
  unknown_request_type = 1,    // request id did not match any client_request
  has_no_requested_data = 2,   // the requested data is not available
};

/// Dimensions of a 2-D float array as sent in a reply header.
/// `columns` is the number of elements per row, `rows` the number of rows.
struct array_shape
{
  std::size_t columns = 0, rows = 0;

  array_shape(std::size_t x, std::size_t y)
    : columns(x)
    , rows(y)
  {
  }

  /// Number of elements in one row.
  std::size_t length() const { return columns; }

  /// Number of rows.
  std::size_t heigth() const { return rows; }

  /// Number of bytes the two dimensions occupy when serialized
  /// (not the number of array elements).
  std::size_t size() const { return sizeof(columns) + sizeof(rows); }
};

// One array of float values (a single subgraph / data series).
using sc_array = std::vector<float>;
// All arrays belonging to one study, one sc_array per subgraph.
using sc_graph_data = std::vector<sc_array>;
// Study data keyed by study name.
using study_data = std::map<std::string, sc_graph_data>;

class server
{
private:
  const std::size_t default_message_size = 0x0000001F;
  const std::size_t error_marker = 0x00000000;

  sc_array external_data;
  study_data chart_studies_data;
  std::condition_variable cv;
  std::mutex mx;
  bool ready = false;
  zmq::context_t context;
  zmq::socket_t socket;
  std::string zmq_endpoint;
  s_sc* p_interface = nullptr;
  std::future<void> job_done;
  std::atomic<bool> running = false;

  server() = default;
  
public:
  
  virtual ~server() = default;
  server(server& other) = delete;
  server(server&& other) = delete;
  server operator=(server& other) = delete;
  server operator=(server&& other) = delete;

  static auto& instance() {
    static server s;
    return s;
  }
  
  void run(SCStudyInterfaceRef sc) {
    if (running.load() == true)
      return;
    context = zmq::context_t();
    socket = zmq::socket_t(context, zmq::socket_type::rep);
    p_interface = ≻
    running.store(true);
    set_endpoint();
    socket.bind(zmq_endpoint);
    std::this_thread::sleep_for(1000ms);

    job_done = std::async(std::launch::async, [this]() {async_proc(); });
  }

  void terminate() {
    if (running) {
      running.store(false);
      job_done.get();
      socket.unbind(zmq_endpoint);
      socket.close();
      context.shutdown();
      context.close();
    }
  };

  void update_study_data(const std::string& name, const sc_graph_data& data) {
    const auto study = chart_studies_data.find(name);
    if(study == chart_studies_data.end())
      chart_studies_data[name] = data;
  }

  //Need this for chart replay only.
  void apply_external_data(SCSubgraphRef subgraph ) {
    if (!external_data.empty()) {
      int index = p_interface->UpdateStartIndex;
      while (index < external_data.size()) {
        if (index >= subgraph.Data.GetArraySize()) break;
        subgraph.Data[index] = external_data[index];
        index++;
      }
    }
  }

  void notify() {
    if (!running) return;
    std::unique_lock<std::mutex> lock(mx);
    ready = true;
    lock.unlock();
    cv.notify_one();
  }

  bool active() noexcept {
    return (running.load() == true);
  }

protected:

  void set_endpoint() noexcept{
    zmq_endpoint = p_interface->Input[0].GetString();
  }

  void wait() {
    std::unique_lock<std::mutex> lock(mx);
    while(!ready)
      cv.wait(lock, [this]() { return ready; });
    ready = false;
  }

  void async_proc()  {
    while (running)  {
      zmq::message_t msg;
      zmq::recv_result_t result = socket.recv(msg, zmq::recv_flags::dontwait);
      std::this_thread::sleep_for(1ms);

      if (result.has_value()) {
        socket.send(prosess_message(msg), zmq::send_flags::none);
      }
    }
  }
  
  zmq::message_t prosess_message(const zmq::message_t& request)  {
    auto request_content = split_message(request);
    auto request_id = static_cast<client_request>(std::atoi(request_content.front().c_str()));
    request_content.pop_front();
    zmq::message_t message;
                
    switch (request_id) {
      case client_request::base_data:  fill_base_data(message, request_content); break;
      case client_request::study_subgraps_data: {
        compute_chart_studies();
        // Prevent futher actions untill all chart studies values will be in place;
        wait();
                        
        if (!chart_studies_data.empty()) {
          auto const study = chart_studies_data.find(request_content.front());

          if (study == chart_studies_data.end())
            fill_error(message, error_code::has_no_requested_data);
          else {
            fill_study_data(message, study->second);
            chart_studies_data.clear();
          }
        }
        else fill_error(message, error_code::has_no_requested_data);
      }  break;
      case client_request::date_time_data: fill_date_time_data(message); break;
      case client_request::studies_details: {
        compute_chart_studies();
        wait();
        fill_studies_details(message);
      } break;
      case client_request::external_data_received: {
        prosess_message_data(request_content);
        fill_error(message, error_code::no_error);
      } break;
      default: fill_error(message, error_code::unknown_request_type); break;
    }
    return message;
  }

  std::list<std::string> split_message(const zmq::message_t& msg) {
    std::string str = msg.to_string();
    std::list<std::string> words;
    std::string word;
    int index = 0;

    while (index < str.size()) {
      if (str.at(index) == ';')  {
        words.push_back(word);
        word.clear();
      }
      else  word += str.at(index);
      index++;
    }

    if (!word.empty())
      words.push_back(word);
    return words;
  }
  
  void fill_error(zmq::message_t& msg, const error_code& code)  {
    msg.rebuild(default_message_size);
    auto buffer = static_cast<std::byte*>(msg.data());
    memcpy(buffer, &error_marker, sizeof(std::size_t));
    memcpy(buffer + sizeof(std::size_t), &code, sizeof(std::size_t));
  }

  void fill_array_shape(std::byte* buffer,  const array_shape& shape) {
    memcpy(buffer , (void*)(&shape.columns), sizeof shape.length());
    memcpy(buffer + sizeof shape.length(), (void*)(&shape.rows), sizeof shape.heigth());
  }

  void fill_base_data(zmq::message_t& msg, const std::list<std::string>&indecies) {
    array_shape shape(p_interface->BaseData.GetAt(0).GetArraySize(), indecies.size());
    auto[size, buffer] = cook_buffer(shape, msg);
    std::size_t offset = shape.size();
    std::int32_t sc_index = 0;

    for (const auto& str_index : indecies) {
      std::int32_t index = std::atoi(str_index.c_str());
      memcpy_s(buffer + offset, size, reinterpret_cast<const std::byte*>(p_interface->BaseData[index].GetPointer()),
        shape.length()*sizeof(float));
      offset += shape.length()*sizeof(float);
      size -= shape.length() * sizeof(float);
    }
  }

  void fill_date_time_data(zmq::message_t& msg) {
    std::size_t array_size = p_interface->BaseDateTimeIn.GetArraySize();
    std::size_t buffer_size = array_size * sizeof(std::time_t);
    msg.rebuild(buffer_size);
    auto buffer = static_cast<std::byte*>(msg.data());
    std::int32_t sc_index = 0;

    while (sc_index < array_size) {
      std::time_t timestamp = p_interface->BaseDateTimeIn[sc_index].ToUNIXTime();
      memcpy(buffer + sizeof(std::time_t)*sc_index,
        reinterpret_cast<const std::byte*>(×tamp),
        sizeof(std::time_t));
      sc_index++;
    }
  }

  void fill_study_data(zmq::message_t& msg, const sc_graph_data& graph_data) {
    array_shape shape(graph_data[0].size(), graph_data.size());
    auto[size, buffer] = cook_buffer(shape, msg);
    std::size_t offset = shape.size();

    for (const auto& subgraph: graph_data) {
      memcpy_s(buffer + offset, size, reinterpret_cast<const std::byte*>(subgraph.data()),
        subgraph.size() * sizeof(float));
      offset += subgraph.size() * sizeof(float);
      size -= subgraph.size() * sizeof(float);
    }
  }

  void fill_studies_details(zmq::message_t& msg) {
    std::string details;
    for (const auto& name : chart_studies_data)
      details.append(name.first + ";");
    details.pop_back();
    msg.rebuild(details.length());
    auto buffer = static_cast<std::byte*>(msg.data());
    memcpy_s(buffer, details.length(), reinterpret_cast<const std::byte*>(details.data()),
      details.length());
  }

  void prosess_message_data(const std::list<std::string>& data) {
    external_data.resize(0);
    for (const auto& element : data) {
      external_data.push_back(std::stof(element.c_str()));
    }
    compute_chart_studies();
  }

  //Force all chart studies full recalculation;
  void compute_chart_studies() {
    ready = false;
    HWND handle = (HWND)p_interface->ChartWindowHandle;
    ::PostMessageA(handle, WM_KEYDOWN, VK_CONTROL, 1);
    std::this_thread::sleep_for(50ms);
    ::PostMessageA(handle, WM_KEYDOWN, VK_INSERT, 1);
    std::this_thread::sleep_for(50ms);
    ::PostMessageA(handle, WM_KEYUP, VK_INSERT, 1);
    std::this_thread::sleep_for(50ms);
    ::PostMessageA(handle, WM_KEYUP, VK_CONTROL, 1);
  }

  std::tuple<std::size_t,std::byte*> cook_buffer(const array_shape &shape, zmq::message_t& message)  {
    auto buffer_size = shape.size() + sizeof(float) * shape.length() * shape.heigth();
    message.rebuild(buffer_size);
    auto buffer = static_cast<std::byte*>(message.data());
    fill_array_shape(buffer, shape);
    buffer_size -= shape.size();
    return std::make_tuple(buffer_size, buffer);
  }
};

Usage:
#include "C:/GreenChart/ACS_Source/sierrachart.h"
#include "sc_server.hpp"


// Declares the name of this custom study DLL (presumably the name Sierra
// Chart shows for it — confirm against the ACSIL documentation).
SCDLLName("SCSERVER")
// Upper bound on the study indices the study function scans on the chart.
#define MAX_POSSIBLE_STUDIES_ON_CHART 20 //user defined threshold, any reasonable value ..

std::int32_t NumberOfSubgraphsUsed(const SCGraphData& GraphData)
{
  std::int32_t SubgraphsUsed = 0, SubgraphIndex = 0, SubgraphSize = 0;
  while (SubgraphIndex < SC_SUBGRAPHS_AVAILABLE)
  {
    if (((SubgraphSize = GraphData.GetAt(SubgraphIndex).GetArraySize()) > 0) &&
      (GraphData.GetAt(SubgraphIndex).GetPointer() != NULL))
      SubgraphsUsed++;
    SubgraphIndex++;
  }
  return SubgraphsUsed;
}


SCSFExport scsf_SC_zmq_test_server(SCStudyInterfaceRef sc)
{
  SCInputRef Endpoint = sc.Input[0];
  SCSubgraphRef ExternalData = sc.Subgraph[0];

  if (sc.SetDefaults)
  {
    Endpoint.Name = "Server details";
    Endpoint.SetString("tcp://127.0.0.1:5555");

    ExternalData.Name = "ExternalData";
    ExternalData.DrawStyle = DRAWSTYLE_LINE;
    ExternalData.LineWidth = 3;
    ExternalData.PrimaryColor = COLOR_AZURE;
    
    sc.GraphRegion = 0;
    sc.AutoLoop = 0;
    sc.GraphName = "SC Server";
    sc.CalculationPrecedence = VERY_LOW_PREC_LEVEL;

    return;
  }

  int& MenuID = sc.GetPersistentInt(1);
  int& SepMenuID = sc.GetPersistentInt(2);
  SCString& LastProsessedStudyName = sc.GetPersistentSCString(1);

  int ChartStudyID = 0;
  int ChartStudyNumber = 0;

  if (sc.Index == 0)
  {
    if (MenuID <= 0)
    {
      MenuID = sc.AddACSChartShortcutMenuItem(sc.ChartNumber, "Start Test Server");
      SepMenuID = sc.AddACSChartShortcutMenuSeparator(sc.ChartNumber);
    }

    sc.PlaceACSChartShortcutMenuItemsAtTopOfMenu = true;
  }

  if (sc.MenuEventID != 0)
  {
    if (sc.MenuEventID == MenuID)
      server::instance().run(sc);
  }

  if (sc.LastCallToFunction)
  {
    sc.RemoveACSChartShortcutMenuItem(sc.ChartNumber, MenuID);
    sc.RemoveACSChartShortcutMenuItem(sc.ChartNumber, SepMenuID);
    server::instance().terminate();
  }

  if (sc.ChartIsDownloadingHistoricalData(sc.ChartNumber))
    return;
  
  server::instance().apply_external_data(ExternalData);

  if (!server::instance().active())
    return;

  if (!sc.IsFullRecalculation)
    return;

  if (sc.ReplayStatus == REPLAY_RUNNING)
    return;
    
  while (ChartStudyNumber < MAX_POSSIBLE_STUDIES_ON_CHART)
  {
    if ((ChartStudyID = sc.GetStudyIDByIndex(sc.ChartNumber, ChartStudyNumber)) > 0)
    {
      if (ChartStudyID == sc.GetStudyIDByName(sc.ChartNumber, sc.GraphName, 0))
      {
        ChartStudyNumber++;
        continue;
      }

      SCString StudyName = sc.GetStudyNameUsingID(ChartStudyID);

      if (StudyName == LastProsessedStudyName)
      {
        ChartStudyNumber++;
        continue;
      }

      SCGraphData CurrentStudyData;
      sc.GetStudyArraysFromChartUsingID(sc.ChartNumber, ChartStudyID, CurrentStudyData);
      int SubgraphsUsed = NumberOfSubgraphsUsed(CurrentStudyData);
      

      if (SubgraphsUsed > 0)
      {
        sc_graph_data graph_data;
        graph_data.resize(SubgraphsUsed);
        int SubgraphIndex = 0;
        for (auto& subgraph_data : graph_data)
        {
          SCFloatArrayRef Data = CurrentStudyData[SubgraphIndex];
        
          for (auto Index = 0; Index < Data.GetArraySize(); Index++)
            subgraph_data.push_back(Data[Index]);

          SubgraphIndex++;
        }
        server::instance().update_study_data(StudyName.GetChars(), graph_data);
        LastProsessedStudyName = StudyName;
      }
    }

    ChartStudyNumber++;
  }
  
  server::instance().notify();
  LastProsessedStudyName.Clear();
}