Coverage for mlprodict/tools/code_helper.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1""" 

2@file 

3@brief A couple of tools unrelated to what the package does. 

4""" 

5import pickle 

6import re 

7import types 

8import numpy 

9 

10 

def numpy_min_max(x, fct, minmax=False):
    """
    Returns the minimum or the maximum of an array.
    Deals with text as well.

    @param      x       array, objects exposing ``todense`` are
                        converted with that method first
    @param      fct     reduction applied to non-text arrays,
                        e.g. ``lambda x: x.min()``
    @param      minmax  only used for text arrays, False selects the
                        smallest string, True the largest one
    @return             ``fct(x)`` for non-text arrays, the *repr* of the
                        selected string (truncated to 10 characters
                        followed by ``'...'``) for text arrays,
                        ``numpy.nan`` if no string was found,
                        ``'?'`` if the value could not be computed
    """
    try:
        if hasattr(x, 'todense'):
            x = x.todense()
        if (x.dtype.kind[0] not in 'Uc' or
                x.dtype in {numpy.uint8}):
            return fct(x)
        try:  # pragma: no cover
            x = x.ravel()
        except AttributeError:  # pragma: no cover
            pass
        keep = [s for s in x if isinstance(s, str)]
        if not keep:  # pragma: no cover
            return numpy.nan
        # Elements of a numpy text array are numpy.str_ scalars; their repr
        # depends on the numpy version ('a' vs np.str_('a')). Converting to
        # a plain str keeps the displayed value stable.
        val = str(max(keep) if minmax else min(keep))
        if len(val) > 10:  # pragma: no cover
            val = val[:10] + '...'
        return repr(val)
    except (ValueError, TypeError, AttributeError):
        # Best effort: this helper only feeds debug messages.
        return '?'

36 

37 

def numpy_min(x):
    """
    Returns the minimum of an array.
    Deals with text as well.

    @param      x       array
    @return             result of @see fn numpy_min_max called with
                        the ``min`` reduction and ``minmax=False``
    """
    def _smallest(values):
        return values.min()

    return numpy_min_max(x, _smallest, minmax=False)

44 

45 

def numpy_max(x):
    """
    Returns the maximum of an array.
    Deals with text as well.

    @param      x       array
    @return             result of @see fn numpy_min_max called with
                        the ``max`` reduction and ``minmax=True``
    """
    def _largest(values):
        return values.max()

    return numpy_min_max(x, _largest, minmax=True)

52 

53 

def debug_dump(clname, obj, folder=None, ops=None):
    """
    Looks for NaN values in an object for debug purpose and
    pickles it when NaN values appear in the outputs
    but not in the inputs.

    @param      clname      class name, displayed in the messages and
                            inserted in the name of the dumped file
    @param      obj         object, a list of arrays (None items are
                            skipped) or a dictionary; a dictionary with
                            keys ``'in'`` and ``'out'`` is the only case
                            which may trigger a dump
    @param      folder      folder where the file is written,
                            the name is left relative if None
    @param      ops         operator to dump, the attributes listed in
                            ``ops.atts`` are displayed
    @return                 filename or None if nothing was dumped
    """
    def _contains_nan(array):
        # True as soon as one element of the flattened array is NaN.
        return any(map(numpy.isnan, array.ravel()))

    def _inspect(value, prefix=''):
        if isinstance(value, dict):
            if 'in' not in value or 'out' not in value:
                # Generic dictionary: every value is checked on its own.
                for key, item in value.items():  # pragma: no cover
                    _inspect([item], key)
                return None  # pragma: no cover
            # Both flags are computed before being tested.
            nan_in = any(_contains_nan(a) for a in value['in'])
            nan_out = any(_contains_nan(a) for a in value['out'])
            if nan_in or not nan_out:
                return False  # pragma: no cover
            print("NAN-notin-out ", clname, prefix,
                  {k: getattr(ops, k, '?') for k in getattr(ops, 'atts', {})})
            return True
        if isinstance(value, list):
            for index, item in enumerate(value):
                if item is not None and _contains_nan(item):
                    print("NAN", prefix, index, clname, item.shape)
            return None
        raise NotImplementedError(  # pragma: no cover
            "Unable to debug object of type {}.".format(type(value)))

    if not _inspect(obj):
        return None
    filename = 'cpu-{}-{}-{}.pkl'.format(clname, id(obj), id(ops))
    if folder is not None:
        filename = "/".join([folder, filename])
    with open(filename, 'wb') as stream:
        pickle.dump(obj, stream)
    return filename

100 

101 

def debug_print(k, obj, printed):
    """
    Displays information on an object the first time
    its name is met.

    @param      k           name
    @param      obj         object
    @param      printed     memorizes already printed objects,
                            dictionary ``{name: object}`` updated in place,
                            nothing is displayed if *k* is already a key
    """
    if k in printed:
        return
    printed[k] = obj

    if hasattr(obj, 'shape'):
        shape = obj.shape
        dtype = obj.dtype
        lowest = numpy_min(obj)
        highest = numpy_max(obj)
        suffix = ' (sparse)' if 'coo_matrix' in str(type(obj)) else ''
        print("-='{}' shape={} dtype={} min={} max={}{}".format(
            k, shape, dtype, lowest, highest, suffix))
        return

    is_plain_list = (isinstance(obj, list) and len(obj) > 0 and
                     not isinstance(obj[0], dict))
    if is_plain_list:  # pragma: no cover
        print("-='{}' list len={} min={} max={}".format(
            k, len(obj), min(obj), max(obj)))
    else:  # pragma: no cover
        print("-='{}' type={}".format(k, type(obj)))

123 

124 

def make_callable(fct, obj, code, gl, debug):
    """
    Builds a callable function from a code object and restores
    the default values found in the signature as the combination
    of functions *compile* and *exec* does not seem
    able to take them into account.

    @param      fct     function name
    @param      obj     code object of the function
                        (extracted from the output of function *compile*)
    @param      code    code including the signature, the first line
                        starting with ``def <fct>(`` is parsed
    @param      gl      context (local and global)
    @param      debug   add debug function (``debug_print`` and ``print``)
                        to a copy of the context
    @return             callable functions

    Only the default values ``None``, ``True``, ``False`` and numbers
    are recognized, a float equal to an integer is stored as an integer.
    The default ``value=array([0.], dtype=float32)`` is handled as
    a specific case.
    """
    header = "def " + fct + "("
    sig = next(
        (row for row in code.split('\n') if row.startswith(header)), None)
    if sig is None:  # pragma: no cover
        raise ValueError(
            "Unable to find function '{}' in\n{}".format(fct, code))

    pattern = re.compile(
        "([a-z][A-Za-z_0-9]*)=((None)|(False)|(True)|([0-9.e+-]+))")
    constants = {'None': None, 'True': True, 'False': False}
    defs = []
    for groups in pattern.findall(sig):
        name, text = groups[0], groups[1]
        if text in constants:
            defs.append((name, constants[text]))
            continue
        number = float(text)
        if int(number) == number:
            number = int(number)
        defs.append((name, number))

    # debug
    if debug:
        gl = gl.copy()
        gl['debug_print'] = debug_print
        gl['print'] = print

    # specific
    if "value=array([0.], dtype=float32)" in sig:
        defs.append(('value', numpy.array([0.], dtype=numpy.float32)))

    defaults = tuple(pair[1] for pair in defs)
    res = types.FunctionType(obj, gl, fct, defaults)
    if res.__defaults__ != defaults:  # pylint: disable=E1101
        # See https://docs.python.org/3/library/inspect.html
        # See https://stackoverflow.com/questions/11291242/python-dynamically-create-function-at-runtime
        lines = [str(sig)]  # pragma: no cover
        for attr in ['co_argcount', 'co_cellvars', 'co_code', 'co_consts', 'co_filename',
                     'co_firstlineno', 'co_flags', 'co_freevars', 'co_kwonlyargcount',
                     'co_lnotab', 'co_name', 'co_names', 'co_nlocals', 'co_stacksize',
                     'co_varnames']:  # pragma: no cover
            v = getattr(res.__code__, attr, None)  # pylint: disable=E1101
            if v is not None:
                lines.append('%s=%r' % (attr, v))
        raise RuntimeError(  # pragma: no cover
            "Defaults values of function '{}' (defaults={}) are missing.\nDefault: "
            "{}\n{}\n----\n{}".format(
                fct, res.__defaults__, defs, "\n".join(lines), code))  # pylint: disable=E1101
    return res

194 

195 

def print_code(code, begin=1):
    """
    Returns the code with line number.

    @param      code    code to display, split on ``'\\n'``
    @param      begin   number given to the first line
    @return             text, every line is prefixed with its number
                        written with at least three digits and a space
    """
    numbered = []
    for offset, text in enumerate(code.split("\n")):
        numbered.append("%03d %s" % (offset + begin, text))
    return "\n".join(numbered)