Datasets:
Tasks:
Text Generation
Modalities:
Text
Formats:
parquet
Languages:
English
Size:
10K - 100K
License:
| import inspect | |
| from io import StringIO | |
| import json | |
| import re | |
| import signal | |
| import sys | |
| import types | |
| from autoimport.services import fix_code | |
| from transformers.utils import chat_template_utils as ctu | |
| from datasets import load_dataset | |
def _convert_type_hints_to_json_schema(func: callable, include_return_type: bool = False) -> dict:
    """Build a JSON-schema ``object`` description from a function's type hints.

    Args:
        func: The function whose annotations are converted.
        include_return_type: When false (the default) the ``"return"`` hint is
            discarded; when true it is kept and shows up as a ``"return"``
            entry inside ``"properties"``.

    Returns:
        A dict with ``"type": "object"`` and a ``"properties"`` mapping from
        each hinted name to the result of ``ctu._parse_type_hint``. A
        ``"required"`` list naming the parameters without a default value is
        added only when that list is non-empty.

    Raises:
        ctu.TypeHintParsingException: If a parameter has no annotation.
    """
    hints = ctu.get_type_hints(func)
    if not include_return_type:
        hints.pop("return", None)

    # Sentinel used by ``inspect`` for "no annotation" / "no default".
    unset = inspect.Parameter.empty
    mandatory = []
    for name, parameter in inspect.signature(func).parameters.items():
        if parameter.annotation == unset:
            raise ctu.TypeHintParsingException(
                f"Argument {parameter.name} is missing a type hint in function {func.__name__}"
            )
        if parameter.default == unset:
            mandatory.append(name)

    result = {
        "type": "object",
        "properties": {name: ctu._parse_type_hint(hint) for name, hint in hints.items()},
    }
    if mandatory:
        result["required"] = mandatory
    return result
def get_json_schema(func: callable, include_return_type: bool = False) -> dict:
    """Describe ``func`` as a ``"function"``-type tool schema.

    The function's docstring is parsed with
    ``ctu.parse_google_format_docstring`` and combined with the parameter
    schema derived from its type hints.

    Args:
        func: The function to describe. It needs a docstring that contains a
            description for every hinted argument.
        include_return_type: Forwarded to
            ``_convert_type_hints_to_json_schema``; when true, the return
            hint is reported under the ``"return"`` key of the result.

    Returns:
        ``{"type": "function", "function": {...}}`` where the inner dict
        holds ``"name"``, ``"description"``, ``"parameters"`` and, if a
        return schema is available, ``"return"``.

    Raises:
        ctu.DocstringParsingException: If ``func`` has no docstring, or the
            docstring lacks a description for one of the arguments.
    """
    docstring = inspect.getdoc(func)
    if not docstring:
        raise ctu.DocstringParsingException(
            f"Cannot generate JSON schema for {func.__name__} because it has no docstring!"
        )
    summary, arg_docs, returns_doc = ctu.parse_google_format_docstring(docstring.strip())

    parameters = _convert_type_hints_to_json_schema(func, include_return_type=include_return_type)
    properties = parameters["properties"]

    # The return schema is reported separately from the call parameters.
    return_schema = properties.pop("return", None)
    # A missing return description is tolerated since most templates ignore it.
    if return_schema is not None and returns_doc is not None:
        return_schema["description"] = returns_doc

    for arg_name, arg_schema in properties.items():
        if arg_name not in arg_docs:
            raise ctu.DocstringParsingException(
                f"Cannot generate JSON schema for {func.__name__} because the docstring has no description for the argument '{arg_name}'"
            )
        description = arg_docs[arg_name]
        # A trailing "(choices: [...])" suffix is parsed as JSON, stored as
        # the argument's enum, and removed from the description text.
        choices = re.search(r"\(choices:\s*(.*?)\)\s*$", description, flags=re.IGNORECASE)
        if choices:
            arg_schema["enum"] = [choice.strip() for choice in json.loads(choices.group(1))]
            description = description[: choices.start()].strip()
        arg_schema["description"] = description

    tool = {"name": func.__name__, "description": summary, "parameters": parameters}
    if return_schema is not None:
        tool["return"] = return_schema
    return {"type": "function", "function": tool}
def clean_code_block(text: str) -> str:
    """Return the code inside a ```python Markdown fence wrapping ``text``.

    Whitespace surrounding the fence is ignored when looking for it, so
    input with leading blank lines or trailing spaces/newlines after the
    closing fence is still unwrapped.

    Args:
        text: Text that may consist of a single fenced Python code block.

    Returns:
        The content between the opening ```python line and the closing
        ``` line when the (whitespace-stripped) text is exactly such a
        block; otherwise ``text`` unchanged.
    """
    pattern = r'^```python\s*\n(.*?)\n```$'
    # Match against the stripped text: the pattern is anchored at both ends,
    # so stray whitespace around the fence would otherwise defeat it.
    # re.DOTALL lets "." span newlines so multi-line code is captured.
    match = re.search(pattern, text.strip(), re.DOTALL)
    if match:
        return match.group(1)
    # Not a fenced block: hand the input back untouched.
    return text
| def validate(func_string: str) -> dict[str, any] | str: | |
| local_dict = {} | |
| # Execute the function string, sending the content to a local dictionary | |
| original_stdout = sys.stdout | |
| sys.stdout = StringIO() | |
| # Set the timeout | |
| timeout = 3 | |
| def timeout_handler(signum, frame): | |
| raise TimeoutError("Code execution timed out") | |
| # Set up the alarm | |
| signal.signal(signal.SIGALRM, timeout_handler) | |
| signal.alarm(timeout) | |
| try: | |
| exec(func_string, globals(), local_dict) | |
| except Exception as e: | |
| return str(e) | |
| except SystemExit as e: | |
| return str(e) | |
| except TimeoutError as e: | |
| return str(e) | |
| finally: | |
| signal.alarm(0) | |
| # Capture the output | |
| output = sys.stdout.getvalue() | |
| # Restore the original stdout | |
| sys.stdout = original_stdout | |
| # Grab the first function from the dictionary that is of type FunctionType | |
| functions = [v for v in local_dict.values() if isinstance(v, types.FunctionType)] | |
| if functions: | |
| function = functions[0] | |
| else: | |
| return "No function" | |
| try: | |
| return get_json_schema(function) | |
| except Exception as e: | |
| return str(e) | |
def clean(row):
    """Clean a row's generated code and attach the validation outcome.

    Args:
        row: Mapping with a ``"generation"`` entry holding the generated
            source text. It is updated in place.

    Returns:
        The same ``row``, with ``"generation"`` replaced by the cleaned code
        and two added entries: ``"parsing_error"`` (the error message, or
        ``""`` on success) and ``"tool"`` (the JSON-encoded schema, or ``""``
        on failure).
    """
    code = clean_code_block(row["generation"])
    try:
        code = fix_code(code)
    except Exception:
        # Import fixing is best-effort: on failure keep the unfixed code.
        # Only Exception is caught so Ctrl-C / SystemExit still propagate.
        pass
    row["generation"] = code

    validation = validate(code)
    # validate() returns a str to signal an error and a dict on success.
    if isinstance(validation, str):
        extra = {"parsing_error": validation, "tool": ""}
    else:
        extra = {"parsing_error": "", "tool": json.dumps(validation)}
    row.update(extra)
    return row
# Load the raw generations, clean and validate every row using 8 worker
# processes, then push the annotated dataset to the Hub.
ds = load_dataset(
    "argilla-warehouse/python-lib-tools-v0.1",
    split="train",
)
ds_cleaned = ds.map(clean, num_proc=8)
ds_cleaned.push_to_hub("argilla-warehouse/python-lib-tools-validated-v0.1")