154 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			154 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
lib = "libduktape.207.20700.so"
 | 
						|
 | 
						|
from ctypes import CDLL, byref, string_at, c_int, c_void_p, c_char_p, c_size_t
 | 
						|
from json import loads
 | 
						|
from io import StringIO
 | 
						|
from pandas import read_csv
 | 
						|
 | 
						|
duk = CDLL(lib)
 | 
						|
 | 
						|
def str_to_c(s):
 | 
						|
  b = s.encode("utf8")
 | 
						|
  return [c_char_p(b), len(b)]
 | 
						|
 | 
						|
def duk_create_heap_default():
 | 
						|
  duk.duk_create_heap.restype = c_void_p
 | 
						|
  return duk.duk_create_heap(None, None, None, None, None)
 | 
						|
 | 
						|
def duk_eval_string_noresult(ctx, cmd):
 | 
						|
  [s, l] = str_to_c(cmd)
 | 
						|
  return duk.duk_eval_raw(ctx, s, l, 1 | (1<<3) | (1<<9) | (1<<10) | (1<<8) | (1<<11) )
 | 
						|
 | 
						|
def duk_eval_string(ctx, cmd):
 | 
						|
  [s, l] = str_to_c(cmd)
 | 
						|
  return duk.duk_eval_raw(ctx, s, l, 0 | (1<<3) | (1<<9) | (1<<10) | (1<<11) )
 | 
						|
 | 
						|
def duk_peval(ctx):
 | 
						|
  return duk.duk_eval_raw(ctx, None, 0, 1 | (1<<3) | (1<<7) | (1<<11) )
 | 
						|
 | 
						|
def duk_get_string(ctx, idx):
 | 
						|
  duk.duk_get_string.restype = c_char_p
 | 
						|
  retval = duk.duk_get_string(ctx, idx)
 | 
						|
  return retval.decode("utf8")
 | 
						|
 | 
						|
def eval_file(ctx, path):
 | 
						|
  with open(path, "r") as f:
 | 
						|
    code = f.read()
 | 
						|
    [s, l] = str_to_c(code)
 | 
						|
 | 
						|
    duk.duk_push_lstring(ctx, s, l)
 | 
						|
    retval = duk_peval(ctx)
 | 
						|
    duk.duk_pop(ctx)
 | 
						|
    return retval
 | 
						|
 | 
						|
def load_file(ctx, path, var):
 | 
						|
  with open(path, "rb") as f:
 | 
						|
    data = f.read()
 | 
						|
  ptr = c_char_p(data)
 | 
						|
  duk.duk_push_buffer_raw(ctx, 0, 1 | 2)
 | 
						|
  duk.duk_config_buffer(ctx, -1, ptr, len(data))
 | 
						|
  duk.duk_put_global_string(ctx, str_to_c(var)[0])
 | 
						|
  return data
 | 
						|
 | 
						|
def save_file(ctx, path, var):
 | 
						|
  duk.duk_get_global_string(ctx, str_to_c(var)[0])
 | 
						|
  sz = c_size_t()
 | 
						|
  duk.duk_get_buffer_data.restype = c_void_p
 | 
						|
  buf = duk.duk_get_buffer_data(ctx, -1, byref(sz))
 | 
						|
  s = string_at(buf, sz.value)
 | 
						|
  with open(path, "wb") as f:
 | 
						|
    f.write(s)
 | 
						|
 | 
						|
def initialize():
 | 
						|
  # initialize
 | 
						|
  context = duk_create_heap_default()
 | 
						|
  ctx = c_void_p(context)
 | 
						|
 | 
						|
  # duktape does not expose a standard "global" by default
 | 
						|
  duk_eval_string_noresult(ctx, "var global = (function(){ return this; }).call(null);")
 | 
						|
 | 
						|
  # load library
 | 
						|
  eval_file(ctx, "shim.min.js")
 | 
						|
  eval_file(ctx, "xlsx.full.min.js")
 | 
						|
 | 
						|
  # get version string
 | 
						|
  duk_eval_string(ctx, "XLSX.version")
 | 
						|
  print(f"SheetJS Library Version {duk_get_string(ctx, -1)}")
 | 
						|
  duk.duk_pop(ctx)
 | 
						|
  return [context, ctx]
 | 
						|
 | 
						|
def parse_file(ctx, path, name):
 | 
						|
  # read file
 | 
						|
  # NOTE: data is captured here to avoid GC
 | 
						|
  data = load_file(ctx, path, "buf")
 | 
						|
 | 
						|
  # parse workbook
 | 
						|
  duk_eval_string_noresult(ctx, f"{name} = XLSX.read(buf.slice(0, buf.length));")
 | 
						|
 | 
						|
def get_sheet_names(ctx, wb):
 | 
						|
  duk_eval_string(ctx, f"JSON.stringify({wb}.SheetNames)")
 | 
						|
  wsnames = duk_get_string(ctx, -1)
 | 
						|
  names = loads(wsnames)
 | 
						|
  duk.duk_pop(ctx)
 | 
						|
  return names
 | 
						|
 | 
						|
def get_csv_from_wb(ctx, wb, sheet_name=None):
 | 
						|
  if not sheet_name: sheet_name = f"{wb}.SheetNames[0]"
 | 
						|
  else: sheet_name = f"'{sheet_name}'"
 | 
						|
  duk_eval_string(ctx, f"XLSX.utils.sheet_to_csv({wb}.Sheets[{sheet_name}])")
 | 
						|
  csv = duk_get_string(ctx, -1)
 | 
						|
  duk.duk_pop(ctx)
 | 
						|
  return csv
 | 
						|
 | 
						|
def export_df_to_wb(ctx, df, path, sheet_name="Sheet1", book_type=None):
 | 
						|
  json = df.to_json(orient="records")
 | 
						|
  [s, l] = str_to_c(json)
 | 
						|
  duk.duk_push_lstring(ctx, s, l)
 | 
						|
  duk.duk_put_global_string(ctx, str_to_c("json")[0])
 | 
						|
  if not book_type: book_type = path.split(".")[-1]
 | 
						|
  duk_eval_string_noresult(ctx, f"""
 | 
						|
    aoo = JSON.parse(json);
 | 
						|
    newws = XLSX.utils.json_to_sheet(aoo);
 | 
						|
    newwb = XLSX.utils.book_new(newws, '{sheet_name}');
 | 
						|
    newbuf = XLSX.write(newwb, {{type:'buffer', bookType:'{book_type}'}});
 | 
						|
  """)
 | 
						|
  save_file(ctx, path, "newbuf")
 | 
						|
 | 
						|
def get_df_from_wb(ctx, wb, sheet_name=None):
 | 
						|
  csv = get_csv_from_wb(ctx, wb, sheet_name)
 | 
						|
  return read_csv(StringIO(csv))
 | 
						|
 | 
						|
class SheetJSWorkbook(object):
 | 
						|
  def __init__(self, sheetjs, wb):
 | 
						|
    self.ctx = sheetjs.ctx
 | 
						|
    self.wb = wb
 | 
						|
 | 
						|
  def get_sheet_names(self):
 | 
						|
    return get_sheet_names(self.ctx, self.wb)
 | 
						|
 | 
						|
  def get_df(self, sheet_name=None):
 | 
						|
    if sheet_name is None: sheet_name = self.get_sheet_names()[0]
 | 
						|
    return get_df_from_wb(self.ctx, self.wb, sheet_name)
 | 
						|
 | 
						|
class SheetJS(object):
 | 
						|
  def __init__(self, ctx):
 | 
						|
    self.ctx = ctx
 | 
						|
    self.wb_names = []
 | 
						|
 | 
						|
  def read_file(self, path):
 | 
						|
    self.wb_names.append(f"wb{len(self.wb_names)}")
 | 
						|
    parse_file(self.ctx, path, self.wb_names[-1])
 | 
						|
    return SheetJSWorkbook(self, self.wb_names[-1])
 | 
						|
 | 
						|
  def write_df(self, df, path, sheet_name = None):
 | 
						|
    export_df_to_wb(self.ctx, df, path, sheet_name)
 | 
						|
 | 
						|
class SheetJSWrapper(object):
 | 
						|
  def __enter__(self):
 | 
						|
    [context, ctx] = initialize()
 | 
						|
    self.context = context
 | 
						|
    self.ctx = ctx
 | 
						|
    return SheetJS(ctx)
 | 
						|
 | 
						|
  def __exit__(self, exc_type, exc_value, traceback):
 | 
						|
    duk.duk_destroy_heap(self.ctx) |