Source code for bqpjson.core

from __future__ import print_function

import os, sys, json, copy, math

from io import IOBase
import fractions
import itertools
import functools

import jsonschema

# prints a line to standard error
print_err = functools.partial(print, file=sys.stderr)

with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'bqpjson-schema.json')) as file:
    _qbpjson_schema = json.load(file)

_bqpjson_versions = ['1.0.0']
_bqpjson_version_latest = _bqpjson_versions[-1]


[docs]def validate(data): jsonschema.validate(data, _qbpjson_schema) assert(data['version'] == _bqpjson_version_latest) assert(data['scale'] >= 0.0) var_ids = {i for i in data['variable_ids']} lt_vars = set([]) for lt in data['linear_terms']: assert(lt['id'] in var_ids) assert(lt['id'] not in lt_vars) lt_vars.add(lt['id']) qt_var_pairs = set() for qt in data['quadratic_terms']: assert(qt['id_tail'] in var_ids) assert(qt['id_head'] in var_ids) assert(qt['id_tail'] != qt['id_head']) if qt['id_tail'] > qt['id_head']: # TODO warn pass pair = (qt['id_tail'], qt['id_head']) assert(pair not in qt_var_pairs) qt_var_pairs.add(pair) if 'solutions' in data: spin_var_domain = data['variable_domain'] == 'spin' boolean_var_domain = data['variable_domain'] == 'boolean' solution_ids = set() for solution in data['solutions']: assert(solution['id'] not in solution_ids) solution_ids.add(solution['id']) sol_var_ids = set() for assign in solution['assignment']: var_id = assign['id'] assert(var_id in var_ids) assert(var_id not in sol_var_ids) sol_var_ids.add(var_id) if spin_var_domain: assert(assign['value'] == -1 or assign['value'] == 1) if boolean_var_domain: assert(assign['value'] == 0 or assign['value'] == 1) assert(len(sol_var_ids) == len(var_ids))
[docs]def evaluate(data): validate(data) values = [] if 'solutions' in data: for solution in data['solutions']: assignment = {item['id']:item['value'] for item in solution['assignment']} values.append(_evaluate(data, assignment)) return values
def _evaluate(data, assignment): value = data['offset'] \ + sum(lt['coeff']*assignment[lt['id']] for lt in data['linear_terms']) \ + sum(qt['coeff']*assignment[qt['id_tail']]*assignment[qt['id_head']] for qt in data['quadratic_terms']) return data['scale']*value
[docs]def swap_variable_domain(data): validate(data) if data['variable_domain'] == 'spin': output_data = spin_to_bool(data) else: assert(data['variable_domain'] == 'boolean') output_data = bool_to_spin(data) return output_data
[docs]def spin_to_bool(ising_data): validate(ising_data) assert(ising_data['variable_domain'] == 'spin') offset = ising_data['offset'] coefficients = {} for v_id in ising_data['variable_ids']: coefficients[(v_id, v_id)] = 0.0 for linear_term in ising_data['linear_terms']: v_id = linear_term['id'] coeff = linear_term['coeff'] #assert(coeff != 0.0) coefficients[(v_id, v_id)] = 2.0*coeff offset += -coeff for quadratic_term in ising_data['quadratic_terms']: v_id1 = quadratic_term['id_tail'] v_id2 = quadratic_term['id_head'] coeff = quadratic_term['coeff'] #assert(coeff != 0.0) if not (v_id1, v_id2) in coefficients: coefficients[(v_id1, v_id2)] = 0.0 coefficients[(v_id1, v_id2)] = coefficients[(v_id1, v_id2)] + 4.0*coeff coefficients[(v_id1, v_id1)] = coefficients[(v_id1, v_id1)] - 2.0*coeff coefficients[(v_id2, v_id2)] = coefficients[(v_id2, v_id2)] - 2.0*coeff offset += coeff linear_terms = [] quadratic_terms = [] for (i,j) in sorted(coefficients.keys()): v = coefficients[(i,j)] if v != 0.0: if i == j: linear_terms.append({'id':i, 'coeff':v}) else: quadratic_terms.append({'id_tail':i, 'id_head':j, 'coeff':v}) bool_data = copy.deepcopy(ising_data) bool_data['variable_domain'] = 'boolean' bool_data['offset'] = offset bool_data['linear_terms'] = linear_terms bool_data['quadratic_terms'] = quadratic_terms if 'solutions' in bool_data: for solution in bool_data['solutions']: for assign in solution['assignment']: if assign['value'] == -1: assign['value'] = 0 return bool_data
[docs]def bool_to_spin(bool_data): validate(bool_data) assert(bool_data['variable_domain'] == 'boolean') offset = bool_data['offset'] coefficients = {} for v_id in bool_data['variable_ids']: coefficients[(v_id, v_id)] = 0.0 for linear_term in bool_data['linear_terms']: v_id = linear_term['id'] coeff = linear_term['coeff'] #assert(coeff != 0.0) coefficients[(v_id, v_id)] = coeff/2.0 offset += linear_term['coeff']/2.0 for quadratic_term in bool_data['quadratic_terms']: v_id1 = quadratic_term['id_tail'] v_id2 = quadratic_term['id_head'] coeff = quadratic_term['coeff'] #assert(coeff != 0.0) if not (v_id1, v_id2) in coefficients: coefficients[(v_id1, v_id2)] = 0.0 coefficients[(v_id1, v_id2)] = coefficients[(v_id1, v_id2)] + coeff/4.0 coefficients[(v_id1, v_id1)] = coefficients[(v_id1, v_id1)] + coeff/4.0 coefficients[(v_id2, v_id2)] = coefficients[(v_id2, v_id2)] + coeff/4.0 offset += coeff/4.0 linear_terms = [] quadratic_terms = [] for (i,j) in sorted(coefficients.keys()): v = coefficients[(i,j)] if v != 0.0: if i == j: linear_terms.append({'id':i, 'coeff':v}) else: quadratic_terms.append({'id_tail':i, 'id_head':j, 'coeff':v}) ising_data = copy.deepcopy(bool_data) ising_data['variable_domain'] = 'spin' ising_data['offset'] = offset ising_data['linear_terms'] = linear_terms ising_data['quadratic_terms'] = quadratic_terms if 'solutions' in ising_data: for solution in ising_data['solutions']: for assign in solution['assignment']: if assign['value'] == 0: assign['value'] = -1 return ising_data
[docs]def bqpjson_to_qubist(data, out_stream): validate(data) print2out = functools.partial(print, file=out_stream) if data['variable_domain'] == 'boolean': print_err('Error: unable to generate qubist hamiltonian, only spin domains are supported by qubist') quit() quadratic_terms = {} for qt in data['quadratic_terms']: i,j = qt['id_tail'],qt['id_head'] if i > j: i,j = qt['id_head'],qt['id_tail'] pair = (i,j) if pair not in quadratic_terms: quadratic_terms[pair] = qt['coeff'] else: print_err('Warning: merging multiple values quadratic terms between {},{}'.format(i,j)) quadratic_terms[pair] = quadratic_terms[pair] + qt['coeff'] sites = max(data['variable_ids'])+1 if len(data['variable_ids']) > 0 else 0 lines = len(data['linear_terms']) + len(data['quadratic_terms']) print2out('{} {}'.format(sites, lines)) for lt in data['linear_terms']: print2out('{} {} {}'.format(lt['id'], lt['id'], lt['coeff'])) for (i,j) in sorted(quadratic_terms.keys()): v = quadratic_terms[(i,j)] print2out('{} {} {}'.format(i, j, v))
[docs]def bqpjson_to_qubo(data, out_stream): validate(data) print2out = functools.partial(print, file=out_stream) if data['variable_domain'] == 'spin': print_err('Error: unable to generate qubo data file, only boolean domains are supported by qubo') quit() print2out('c id : {}'.format(data['id'])) if 'description' in data: print2out('c description : {}'.format(data['description'])) print2out('c ') print2out('c scale : {}'.format(data['scale'])) print2out('c offset : {}'.format(data['offset'])) print2out('c ') for k in sorted(data['metadata']): print2out('c {} : {}'.format(k, json.dumps(data['metadata'][k], sort_keys=True))) if len(data['metadata']): print2out('c ') max_index = max(data['variable_ids'])+1 if len(data['variable_ids']) > 0 else 0 num_diagonals = len(data['linear_terms']) num_elements = len(data['quadratic_terms']) print2out('p qubo 0 {} {} {}'.format(max_index, num_diagonals, num_elements)) print2out('c linear terms') for term in data['linear_terms']: print2out('{} {} {}'.format(term['id'], term['id'], term['coeff'])) print2out('c quadratic terms') for term in data['quadratic_terms']: print2out('{} {} {}'.format(term['id_tail'], term['id_head'], term['coeff']))
[docs]def bqpjson_to_minizinc(data, out_stream): validate(data) print2out = functools.partial(print, file=out_stream) print2out('% id : {}'.format(data['id'])) if 'description' in data: print2out('% description : {}'.format(data['description'])) print2out('% ') for k in sorted(data['metadata']): print2out('% {} : {}'.format(k, json.dumps(data['metadata'][k], sort_keys=True))) print2out('') if data['variable_domain'] == 'boolean': print2out('set of int: Domain = {0,1};') elif data['variable_domain'] == 'spin': print2out('set of int: Domain = {-1,1};') else: print_err('Error: unknown variable domain') quit() print2out('float: offset = {};'.format(data['offset'])) print2out('float: scale = {};'.format(data['scale'])) # this does not work becuose minizinc requires "array index set must be contiguous range" #var_ids_str = [str(var_id) for var_id in data['variable_id']] #print2out('set of int: Vars = {{{}}};'.format(','.join(var_ids_str))) print2out('') mzn_var = {} for var_id in data['variable_ids']: mzn_var[var_id] = 'x{}'.format(var_id) print2out('var Domain: {};'.format(mzn_var[var_id])) #print2out('array[Vars] of var Domain: x;') objective_terms = [] for lt in data['linear_terms']: objective_terms.append('{}*{}'.format(lt['coeff'], mzn_var[lt['id']])) for qt in data['quadratic_terms']: objective_terms.append('{}*{}*{}'.format(qt['coeff'], mzn_var[qt['id_tail']], mzn_var[qt['id_head']])) # objective_terms = [] # for lt in data['linear_terms']: # objective_terms.append('{}*x[{}]'.format(lt['coeff'],lt['id'])) # for qt in data['quadratic_terms']: # objective_terms.append('{}*x[{}]*x[{}]'.format(qt['coeff'], qt['id_tail'], qt['id_head'])) print2out('') objective_expr = ' + '.join(objective_terms) if len(objective_terms) > 0 else '0' print2out('var float: objective = {};'.format(objective_expr)) print2out('') print2out('solve minimize objective;'.format(objective_expr)) print2out('') var_list = [] for var_id in data['variable_ids']: var_list.append(mzn_var[var_id]) print2out('output [show(scale*(objective + offset)), " - ", show(objective), " - ", show([{}])];'.format(', '.join(var_list)))
# print2out('') # print2out('output [show(objective), " - ", show(x)]')
[docs]def bqpjson_to_hfs(data, out_stream, chimera_cell_size=None, chimera_degree=None, precision=5): ''' Format description from (https://github.com/alex1770/QUBO-Chimera) The format of the instance-description file starts with a line giving the size of the Chimera graph. (Two numbers are given to specify an m x n rectangle, but currently only a square, m=n, is accepted.) The subsequent lines are of the form <Chimera vertex> <Chimera vertex> weight where <Chimera vertex> is specified by four numbers using the format, Chimera graph, C_N: Vertices are (x,y,o,i) 0<=x,y<N, 0<=o<2, 0<=i<4 Edge from (x,y,o,i) to (x',y',o',i') if (x,y)=(x',y'), o!=o', OR |x-x'|=1, y=y', o=o'=0, i=i', OR |y-y'|=1, x=x', o=o'=1, i=i' x,y are the horizontal,vertical co-ords of the K4,4 o=0..1 is the "orientation" (0=horizontally connected, 1=vertically connected) i=0..3 is the index within the "semi-K4,4"="bigvertex" There is an involution given by {x<->y o<->1-o} ''' validate(data) print2out = functools.partial(print, file=out_stream) if data['variable_domain'] == 'spin': print_err('Error: unable to generate hfs data file, only boolean domains are supported by this translator') quit() if len(data['variable_ids']) <= 0: print_err('WARNING: hfs data file with no data') print2out('0 0') return 0.0, 0.0 #quit() if 'chimera_cell_size' in data['metadata'] and chimera_cell_size == None: chimera_cell_size = data['metadata']['chimera_cell_size'] if 'chimera_degree' in data['metadata'] and chimera_degree == None: chimera_degree = data['metadata']['chimera_degree'] if chimera_cell_size == None: chimera_cell_size = 8 print_err('WARNING: chimera_cell_size parameter not found, assuming {}'.format(chimera_cell_size)) min_chimera_degree = int(math.ceil(math.sqrt(max(data['variable_ids'])/float(chimera_cell_size)))) if chimera_degree == None: chimera_degree = min_chimera_degree print_err('WARNING: chimera_degree parameter not found, assuming {}'.format(chimera_degree)) if chimera_degree < min_chimera_degree: print_err('Error: chimera_degree of {} was specified. However, the minimum chimera_degree of {} is required for a problem with a variable index of {}'.format(chimera_degree, min_chimera_degree, max(data['variable_ids']))) quit() chimera_cell_row_size = chimera_cell_size // 2 # These values are used to transform a variable index into a chimera # coordinate (x,y,o,i) # x - chimera_row # y - chimera_column # o - chimera_cell_column - indicates the first or the second row of a chimera cell # i - chimera_cell_column_id - indicates ids within a chimera cell # Note that knowing the size of source chimera graph is essential to doing this mapping correctly # chimera_cell_column = {index: (index % chimera_cell_size) // chimera_cell_row_size for index in data['variable_ids']} chimera_cell_column_id = {index:index % chimera_cell_row_size for index in data['variable_ids']} chimera_cell = {index:index // chimera_cell_size for index in data['variable_ids']} chimera_row = {index:chimera_cell_id // chimera_degree for index, chimera_cell_id in chimera_cell.items()} chimera_column = {index:chimera_cell_id % chimera_degree for index, chimera_cell_id in chimera_cell.items()} chimera_coordinate = {index:(chimera_row[index], chimera_column[index], chimera_cell_column[index], chimera_cell_column_id[index]) for index in data['variable_ids']} chimera_degree_effective = max(max(chimera_row.values()), max(chimera_column.values())) + 1 print_err('hfs data parameters:') print_err(' chimera_cell_size: {}'.format(chimera_cell_size)) print_err(' chimera_cell_row_size: {}'.format(chimera_cell_row_size)) print_err(' chimera_degree: {}'.format(chimera_degree)) print_err(' chimera_degree_effective: {}'.format(chimera_degree_effective)) assert(chimera_degree_effective <= chimera_degree) max_abs_coeff = max(abs(t['coeff']) for t in itertools.chain(data['linear_terms'], data['quadratic_terms'])) scale = 10 ** precision / max_abs_coeff for term in itertools.chain(data['linear_terms'], data['quadratic_terms']): term['int_coeff'] = round(term['coeff'] * scale) print_err('INFO: scaling factor {} offset {}'.format(data['scale']/scale, data['offset']*scale)) # Output the hfs data file # it is a header followed by linear terms and then quadratic terms print2out('%d %d' % (chimera_degree_effective, chimera_degree_effective)) for lt in data['linear_terms']: idx = lt['id'] weight = lt['int_coeff'] args = chimera_coordinate[idx] + chimera_coordinate[idx] + tuple([weight]) print2out('%2d %2d %2d %2d %2d %2d %2d %2d %8d' % args) for qt in data['quadratic_terms']: idx_t = qt['id_tail'] idx_h = qt['id_head'] weight = qt['int_coeff'] args = chimera_coordinate[idx_t] + chimera_coordinate[idx_h] + tuple([weight]) print2out('%2d %2d %2d %2d %2d %2d %2d %2d %8d' % args) return data['scale']/scale, data['offset']*scale
# Greatest common divisor of more than 2 numbers def _gcd(*numbers): return functools.reduce(fractions.gcd, numbers) # Least common multiple of more than 2 numbers: def _lcm(*numbers): def lcm(a, b): return (a * b) // _gcd(a, b) return functools.reduce(lcm, numbers, 1)