VerityPy 1.1
Python library for Verity data profiling, quality control, remediation
remediateparsing.py
Go to the documentation of this file.
1#!/usr/bin/env python
2"""
3Remediate Parsing
4
5Remediates (i.e. fixes) records that parse incorrectly leading to an incorrect number of field values relative to the number of fields in the schema.
6This has three types:
7 big: too many field values typically due to embedded delimiters in some field values
8 small1: too few fields by 1 which can either be an error or acceptable since some databases intentionally drop last field if it is empty setting it as null
9 small2: too few fields by 2 or more typically caused by line feeds in fields or problems exporting records across data system types
10This class detects and corrects these errors making new records that are correct
11"""
12
13__all__ = ['shift_array_entries','fix_record_field_split']
14__version__ = '1.0'
15__author__ = 'Geoffrey Malafsky'
16__email__ = 'gmalafsky@technikinterlytics.com'
17__date__ = '20240807'
18
19import math
20from VerityPy.processing import recfuncs, field
21
22DQ="\""
23
def shift_array_entries(origvals:list, recflds:list, hashrecflds:dict, pinflds:list)->list:
    """
    Shifts an array of parsed field values to correct a record having more values than defined fields.

    Every candidate join ("pivot") position is tried: the surplus values are merged into the pivot
    field and the remaining values are shifted down to close the gap. Each candidate record is
    scored by how many fields then match their declared datatype and format, and the best-scoring
    pivot is used. Pinned fields (whose positions may not change) add weight to the score; when
    more than one field is pinned, three allotment strategies are compared: all surplus merged
    into a single pin, surplus divided equally among pins, and staggered allotment starting from
    the lowest pin index.

    * origvals: array of parsed field values (more entries than defined fields)
    * recflds: list of Field objects containing datatype and format specifications
    * hashrecflds: dictionary of field title lowercase to its array index
    * pinflds: optional list of field titles that are pinned meaning their position cannot be changed

    returns: new array of parsed field values with one entry per defined field. The original
        array is returned unchanged when it does not have surplus values. If error, 0th entry
        starts with notok:
    """
    ndelta:int=-1
    nmaxdt:int=0
    nmaxf:int=0
    nindexdt:int=-1
    nindexf:int=-1
    ndt:int=0
    nf:int=0
    n1:int=-1
    n2:int=-1
    n4:int=-1
    ntot:int=0
    nflds:int=0
    nmaxfthresh:float=0
    ndist:int=-1
    dval:float=0
    didmove:bool=False
    newvals:list=[]
    outvals:list=[]
    pinrecs:list=[]
    pinindex:list=[]
    hashpinindex:dict={}
    joinstr:str=""
    elemval:str=""

    def _merge_at(vals:list, pivot:int, nshift:int)->None:
        """Merge the nshift values following vals[pivot] into it, then shift later entries down."""
        merged= vals[pivot]
        for i in range(nshift):
            merged += joinstr + vals[i+1+pivot]
        vals[pivot]= merged
        # close the gap: entries above the pivot move down by nshift positions
        for i in range(pivot+1, nflds):
            nsrc= i + nshift
            vals[i]= "" if nsrc>= len(vals) else vals[nsrc]

    def _score(vals:list)->tuple:
        """Return (datatype matches, format matches) over the first nflds entries of vals."""
        sdt:int=0
        sf:int=0
        for i in range(nflds):
            ev= vals[i]
            if len(recflds[i].datatype)>0:
                if recfuncs.is_field_its_datatype(recflds[i].datatype, ev, recflds[i].fmt_date):
                    sdt += 1
                if recflds[i].datatype=="real":
                    if recflds[i].fmt_decimal>0:
                        if recfuncs.is_field_its_format(ev, recflds[i], False).startswith("true"):
                            sf += 1
                elif recflds[i].datatype=="string":
                    if recflds[i].fmt_strcase in ("upper","lower") or recflds[i].fmt_strlen>0:
                        if recfuncs.is_field_its_format(ev, recflds[i], False).startswith("true"):
                            sf += 1
        return sdt, sf

    try:
        nflds= len(recflds)
        ndelta= len(origvals)- nflds
        if ndelta<=0:
            # nothing to fix: record has no surplus values
            return origvals
        for s in pinflds:
            if s in hashrecflds:
                pinindex.append(hashrecflds[s])
        if len(pinindex)>0:
            pinindex.sort()
            for i in range(len(pinindex)):
                hashpinindex[pinindex[i]]=i
            # one stored candidate record per multi-pin strategy (single-pin, equal, staggered)
            for i in range(3):
                pinrecs.append([""]*nflds)

        # threshold of % fields that must match datatype/format; higher when pins are declared
        dval= 0.6
        if len(pinindex)==1:
            dval= 0.8
        elif len(pinindex)>1:
            dval= 0.72
        n1= round(dval * nflds, 0)
        nmaxfthresh= n1 if n1>=1 else 1

        # pass 1: try each field position as the pivot and score by datatype/format matches
        for h in range(nflds):
            newvals= list(origvals)
            _merge_at(newvals, h, ndelta)
            ndt=0
            nf=0
            for i in range(nflds):
                elemval= newvals[i]
                if len(elemval)>0 and len(recflds[i].datatype)>0:
                    if recflds[i].datatype.startswith("date"):
                        # date fields only count here when a date format is declared and matches
                        if len(recflds[i].fmt_date)>0 and recfuncs.is_field_its_datatype(recflds[i].datatype, elemval, recflds[i].fmt_date):
                            ndt += 1
                    elif recfuncs.is_field_its_datatype(recflds[i].datatype, elemval):
                        ndt += 1
                    if recflds[i].datatype=="real":
                        if recflds[i].fmt_decimal>0:
                            if recfuncs.is_field_its_format(elemval, recflds[i], False).startswith("true"):
                                nf += 1
                    elif recflds[i].datatype=="string":
                        if recflds[i].fmt_strcase in ("upper","lower") or recflds[i].fmt_strlen>0:
                            if recfuncs.is_field_its_format(elemval, recflds[i], False).startswith("true"):
                                nf += 1
            if ndt>nmaxdt:
                nmaxdt=ndt
                nindexdt=h
            # weighted score: datatype dominates; bonus when the pivot is a pinned field
            n2= 10*ndt + round(10*nf/nflds)
            if h in hashpinindex:
                n2 += 10
            if n2>nmaxf:
                nmaxf=n2
                nindexf=h

        newvals= list(origvals)
        if len(pinindex)==0 or nmaxf>=nmaxfthresh:
            # no pins, or the pass-1 score clears the threshold: use the single best pivot
            n1= nindexf if nindexf>=0 else nindexdt
            _merge_at(newvals, n1, ndelta)
            didmove=True
        elif len(pinindex)==1:
            # single pinned field absorbs all surplus values
            didmove=True
            _merge_at(newvals, pinindex[0], ndelta)
        elif len(pinindex)>1:
            # multiple pins: score one candidate per pin plus 2 distribution strategies
            nmaxdt= -1
            nmaxf= -1
            nindexdt= -1
            nindexf= -1
            for h in range(len(pinindex)+2):
                newvals= list(origvals)
                ntot=0
                if h<len(pinindex):
                    # strategy A: pin h absorbs all surplus values
                    _merge_at(newvals, pinindex[h], ndelta)
                    ndt, nf= _score(newvals)
                    if ndt>nmaxdt:
                        nmaxdt=ndt
                        nindexdt=h
                    n2= (10*ndt) + nf
                    if n2>nmaxf:
                        nmaxf=n2
                        nindexf=h
                        # keep the candidate record of the best single pin (the original code
                        # overwrote pinrecs[0] for every pin, so the final move could adopt
                        # the last pin's record instead of the best-scoring one)
                        for i in range(nflds):
                            pinrecs[0][i]=newvals[i]
                elif h==len(pinindex):
                    # strategy B: equal allotment when there are at least as many pins as surplus values
                    if len(pinindex)>= ndelta:
                        ndist= math.floor(ndelta/len(pinindex))
                        for p in range(len(pinindex)):
                            n1= pinindex[p]
                            n4= ndist if p<len(pinindex)-1 else ndelta-ntot
                            ntot += n4
                            _merge_at(newvals, n1, n4)
                    ndt, nf= _score(newvals)
                    if ndt>nmaxdt:
                        nmaxdt=ndt
                        nindexdt=h
                    n2= (10*ndt) + nf
                    if n2>nmaxf:
                        nmaxf=n2
                        nindexf=h
                    for i in range(nflds):
                        pinrecs[1][i]=newvals[i]
                else:
                    # strategy C: staggered allotment from lowest pin, shifting no more than to next pin index
                    for p in range(len(pinindex)):
                        n1= pinindex[p]
                        if p<len(pinindex)-1:
                            # cap at remaining surplus (original could leave a negative count
                            # for the last pin, producing a nonsense upward shift)
                            n4= min(pinindex[p+1]-pinindex[p]-1, ndelta-ntot)
                        else:
                            n4= ndelta-ntot
                        if n4<0:
                            n4=0
                        ntot += n4
                        _merge_at(newvals, n1, n4)
                    ndt, nf= _score(newvals)
                    if ndt>nmaxdt:
                        nmaxdt=ndt
                        nindexdt=h
                    n2= (10*ndt) + nf
                    if n2>nmaxf:
                        nmaxf=n2
                        nindexf=h
                    for i in range(nflds):
                        pinrecs[2][i]=newvals[i]

            # final move: adopt the stored record of the winning strategy
            if nindexf>=0:
                if nindexf<len(pinindex):
                    didmove=True
                    newvals= list(pinrecs[0])
                elif nindexf==len(pinindex):
                    didmove=True
                    newvals= list(pinrecs[1])
                else:
                    didmove=True
                    newvals= list(pinrecs[2])

        # assign result array trimmed to the defined number of fields
        if didmove:
            for i in range(nflds):
                outvals.append(newvals[i])
        else:
            for i in range(nflds):
                outvals.append(origvals[i])
    except (RuntimeError, OSError, ValueError) as err:
        outvals.insert(0, "notok:" + str(err))
    return outvals
350
351def fix_record_field_split(settings:dict, srcfields:list, srcrecs:list)->list:
352 """Fix Record Field Splitting
353
354 Repairs records with parsing problems having too many or too few field values relative to defined
355 number of fields. This occurs due to embedded delimiters in some field values causing too many parsed values,
356 and records broken across multiple lines causing too few values. These situations are categorized into 3 types:
357 1) big (too many parsed values), 2) small1 (1 too few values), 3) small2 (2 or more too few values). small1 is a
358 special case since some data systems purposefully eliminate the fnal field value in a record if it is empty by
359 making it null thereby saving storage and memory space. In this case, the record is actually fine but is missing its final
360 field value. This can be accepted by having the setting 'allowLastEmpty' = TRUE leading to a default value assigned
361 to this last record field based on datatype: int and real assigned 0, bool assigned FALSE, others assigned empty string.
362
363 * settings: Dictionary of setting parameters. Both key and value are strings. Settings:
364 - allow_last_empty: bool whether to allow last field to be empty (i.e. small1 parsing) and assign it default value. Default is TRUE
365 - is_quoted: bool whether fields values may be enclosed by double quotes as is common when data exported from SpreadSheets and some databases. Default is FALSE.
366 - has_header: bool whether first non-empty record is a header row of delimited field titles. Default is FALSE.
367 - ignore_empty: bool whether to ignore empty records. Default is TRUE.
368 - pin_fields: field titles delimited by pipe (if more than 1) that are pinned meaning if record has too many fields (i.e. big) then these fields will not shifted as
369 the algorithm finds the best way to merge values to make corrected record
370 - ignore_start_str: string parts delimited by pipe (if more than 1) that will cause records starting with any one of them to be ignored. Always case insensitive.
371 A common use for this is to filter out comment lines such as those starting with # or // in which case set to #|//
372 - delim: name of delimiter to use to parse records: comma, pipe, tab, colon, caret. This is required.
373 - join_char: token (max 10 chars) or alias to insert when joining lines to remediate parsing. Default is to use nothing.
374 Aliases include: -comma-, -tab-, -pipe-, -space-, -bslash-, -fslash-, -lparen-, -rparen-,
375 -lcurly-, -rcurly-, -lsquare-, -rsquare-, -dblquote-, -mathpi-, -mathe-
376 * srcfields: list of Field objects comprising records
377 * srcrecs: list of string source records
378
379 returns: list of new records. If error, 0th entry starts with notok: otherwise 0th entry is string of stats,
380 entry[1] is header of field titles. For stats, examples may have prefix of (nline) with nline being the line number read
381 (excluding empty and comments lines) and is therefore 1 larger than the line's index
382 in the Python list (i.e. nline is 1 based while lists are 0-based).
383 """
384
385 txt:str=""
386 txt1:str=""
387 elem:str=""
388 recin:str=""
389 recin_lc:str=""
390 recout:str=""
391 delim:str=""
392 delimchar:str=""
393 join_char:str=""
394 nflds_act:int=0
395 nflds_new:int=0
396 ndelta:int=-1
397 nflds_prior:int=0
398 lrecsin:int=0
399 lrecsout:int=0
400 lbig:int=0
401 lsm1:int=0
402 lsm2:int=0
403 ljoin:int=0
404 lempty:int=0
405 lignore:int=0
406 lfix:int=0
407 lnotfix:int=0
408 doing_join:bool=False
409 allow_last_empty:bool=False
410 isquoted:bool=False
411 hasheader:bool=False
412 ignore_empty:bool=False
413 userec:bool=False
414 didsave:bool=False
415 fldvals:list=[]
416 fldvalsprior:list=[]
417 pinflds:list=[]
418 temp:list=[]
419 ignore_start_str:list=[]
420 outrecs:list=[]
421 fld_is_datatype:list=[]
422 hashflds:dict={}
423 hashpinflds:dict={}
424
425 try:
426 nflds_act=len(srcfields)
427 if nflds_act==0:
428 raise ValueError("no fields supplied")
429 if len(srcrecs)==0:
430 raise ValueError("no records supplied")
431 for i in range(nflds_act):
432 if not isinstance(srcfields[i], field.Field):
433 raise ValueError("source field is not Field object at index " + str(i))
434 elem=srcfields[i].title.lower().strip()
435 if len(elem)==0:
436 raise ValueError("source field title is empty at index " + str(i))
437 if elem in hashflds:
438 raise ValueError("duplicate source field title " + elem)
439 hashflds[elem]=i
440 srcfields[i].title=srcfields[i].title.strip()
441 srcfields[i].datatype=srcfields[i].datatype.lower().strip()
442
443 for k,v in settings.items():
444 if isinstance(k,str):
445 txt= k.lower()
446 txt1= str(v).lower()
447 if txt in ("allow_last_empty","allowlastempty"):
448 allow_last_empty= txt1=="true"
449 elif txt in ("isquoted","is_quoted"):
450 isquoted= txt1=="true"
451 elif txt in ("hasheader","has_header"):
452 hasheader= txt1=="true"
453 elif txt in ("ignore_empty","ignoreempty"):
454 ignore_empty= txt1=="true"
455 elif txt in ("ignore_start_str","ignorestartstr"):
456 temp=txt1.split("|")
457 for s in temp:
458 txt=s.strip()
459 if len(txt)>0:
460 ignore_start_str.append(txt)
461 elif txt in ("pin_fields","pinfields"):
462 temp=txt1.split("|")
463 for s in temp:
464 txt=s.strip()
465 if len(txt)>0 and txt not in hashpinflds:
466 pinflds.append(txt)
467 hashpinflds[txt]= len(pinflds)-1
468 elif txt =="delim":
469 if len(txt1)==0:
470 raise ValueError("empty delim specified")
471
472 delimchar= recfuncs.delim_get_char(txt1)
473 if delimchar.startswith("notok:"):
474 raise ValueError(delimchar[6:])
475 elif len(delimchar)==0:
476 raise ValueError("unknown delim specified:" + txt1)
477 elif txt in ("join_char","joinchar"):
478 join_char= txt1
479 if join_char.startswith("-") and join_char.endswith("-"):
480 join_char= recfuncs.convert_char_aliases(join_char)
481 else:
482 if len(join_char)>10:
483 join_char=join_char[:10]
484
485 if len(delimchar)==0:
486 raise ValueError("no delim specified:" + txt1)
487
488 recout=""
489 for i in range(nflds_act):
490 if i>0:
491 recout += delimchar
492 recout += srcfields[i].title
493 outrecs.append(recout)
494
495 for nidx in range(len(srcrecs)):
496 recin= srcrecs[nidx]
497 userec=True
498 if len(recin.strip())==0:
499 lempty += 1
500 userec= not ignore_empty
501 elif len(ignore_start_str)>0:
502 recin_lc= recin.lower()
503 for s in ignore_start_str:
504 if s in recin_lc:
505 userec=False
506 lignore += 1
507 break
508
509 if userec:
510 lrecsin += 1
511 didsave=False
512 if lrecsin>1 or not hasheader:
513 if isquoted and DQ in recin:
514 fldvals= recfuncs.split_quoted_line(recin, delimchar)
515 if len(fldvals)>0 and fldvals[0].startswith("notok:"):
516 raise ValueError("error splitting line index " + str(nidx) + ":" + fldvals[6:])
517 else:
518 fldvals= recin.split(delimchar)
519 nflds_new= len(fldvals)
520 ndelta= nflds_new-nflds_act
521
522 if doing_join:
523 if ndelta>=0 or nflds_prior+nflds_new-1 > nflds_act:
524 # problem since either current record is complete and therefore cannot be used to add to prior record
525 # or joining will create too many fields so keep prior as is despite it missing some values
526 # add empty values to complete all fields
527 for i in range(nflds_act-nflds_prior):
528 fldvalsprior.append("")
529 recout= delimchar.join(fldvalsprior)
530 outrecs.append(recout)
531 lrecsout += 1
532 doing_join=False
533 didsave=True
534 lnotfix += 1
535 else:
536 ljoin += 1
537 if ndelta==-1:
538 lsm1 +=1
539 else:
540 lsm2 +=1
541
542 for i in range(nflds_new):
543 if i==0:
544 fldvalsprior[nflds_prior-1]+= join_char + fldvals[i]
545 else:
546 fldvalsprior.append(fldvals[i])
547
548 # calc new ndelta
549 ndelta= len(fldvalsprior)- nflds_act
550 if ndelta==0:
551 recout= delimchar.join(fldvalsprior)
552 outrecs.append(recout)
553 lrecsout += 1
554 doing_join=False
555 didsave=True
556 lfix += 1
557
558 if not doing_join and not didsave:
559 if ndelta==0:
560 outrecs.append(recin)
561 lrecsout += 1
562 didsave=True
563 elif ndelta>0:
564 lbig += 1
565 temp= shift_array_entries(fldvals, srcfields, hashflds, pinflds)
566 if len(temp)== nflds_act:
567 lfix += 1
568 else:
569 lnotfix += 1
570 recout= delimchar.join(temp)
571 outrecs.append(recout)
572 lrecsout += 1
573 didsave=True
574 elif ndelta== -1 and allow_last_empty:
575 recout = recin + delimchar
576 outrecs.append(recout)
577 lrecsout += 1
578 didsave=True
579 elif ndelta<0:
580 if ndelta== -1:
581 lsm1 +=1
582 else:
583 lsm2 +=1
584 if nidx==len(srcrecs)-1:
585 # cannot add more records so this one fails
586 for i in range(nflds_act-nflds_new):
587 fldvalsprior.append("")
588 recout= delimchar.join(fldvalsprior)
589 outrecs.append(recout)
590 lrecsout += 1
591 lnotfix += 1
592 didsave=True
593 else:
594 nflds_prior=nflds_new
595 doing_join=True
596 fldvalsprior=[]
597 fldvalsprior=fldvals
598
599
600 txt = "ok:recin=" + str(lrecsin) + ",recout=" + str(lrecsout) + ",big=" + str(lbig) + ",small1=" + str(lsm1) + ",small2=" + str(lsm2) + ",join=" + str(ljoin) + ",fix=" + str(lfix) + ",notfix=" + str(lnotfix)
601 outrecs.insert(0,txt)
602 except (RuntimeError, OSError, ValueError) as err:
603 outrecs.insert(0,"notok:" + str(err))
604 return outrecs
605
list fix_record_field_split(dict settings, list srcfields, list srcrecs)
list shift_array_entries(list origvals, list recflds, dict hashrecflds, list pinflds)