Flow

Flow exporter.

FlowExporter

FlowExporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any
)

Bases: Exporter[FlowExtras]

Flow exporter.

Parameters:

    waldiez : Waldiez
        The Waldiez instance containing the flow data. (required)
    output_dir : Path | None
        The directory where the exported flow will be saved. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)
    context : Optional[ExporterContext]
        Exporter context with dependencies, by default None.
    **kwargs : Any
        Additional keyword arguments for the exporter. (default: {})
Source code in waldiez/exporting/flow/exporter.py
def __init__(
    self,
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any,
) -> None:
    """Initialize the chats exporter.

    Parameters
    ----------
    waldiez : Waldiez
        The Waldiez instance containing the flow data.
    output_dir : Path
        The directory where the exported flow will be saved.
    for_notebook : bool
        Whether the export is intended for a notebook environment.
    context : Optional[ExporterContext], optional
        Exporter context with dependencies, by default None
    **kwargs : Any
        Additional keyword arguments for the exporter.
    """
    super().__init__(context, **kwargs)

    self.waldiez = waldiez
    self.output_dir = Path(output_dir) if output_dir is not None else None
    self.flow_config = self.context.get_config(
        name=waldiez.name,
        description=waldiez.description,
        requirements=waldiez.requirements or [],
        tags=waldiez.tags or [],
        output_extension="ipynb" if for_notebook else "py",
        is_async=waldiez.is_async,
        output_directory=str(self.output_dir) if self.output_dir else None,
        cache_seed=waldiez.cache_seed,
    )
    self._extras = self._generate_extras()

extras property

extras: FlowExtras

Get the flow exporter extras.

Returns:

    FlowExtras
        The flow extras containing additional information for the flow exporter.

generate_main_content

generate_main_content() -> str

Generate the main content of the export.

Returns:

    str
        The final executable script or notebook content.

Source code in waldiez/exporting/flow/exporter.py
def generate_main_content(self) -> str:
    """Generate the main content of the export.

    Returns
    -------
    str
        The final executable script or notebook content.
    """
    orchestrator = ExportOrchestrator(
        waldiez=self.waldiez,
        context=self.context,
    )
    merged_result = orchestrator.orchestrate()
    after_run = orchestrator.get_after_run_content()
    generator = FileGenerator(
        context=self.context,
    )
    return generator.generate(
        merged_result=merged_result,
        is_async=self.waldiez.is_async,
        after_run=after_run,
    )
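
A minimal usage sketch, assuming the class is importable from the source location shown above and that Waldiez.load() parses a saved flow file (the file name is hypothetical):

```python
# Minimal sketch: export a flow to a standalone Python script.
from pathlib import Path

from waldiez import Waldiez
from waldiez.exporting.flow.exporter import FlowExporter

waldiez = Waldiez.load("my_flow.waldiez")  # hypothetical flow file
exporter = FlowExporter(
    waldiez=waldiez,
    output_dir=Path("generated"),
    for_notebook=False,  # produce a .py script; True targets .ipynb
)
script = exporter.generate_main_content()
Path("generated").mkdir(parents=True, exist_ok=True)
(Path("generated") / "flow.py").write_text(script, encoding="utf-8")
```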

create_flow_exporter

create_flow_exporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any
) -> FlowExporter

Create a flow exporter.

Parameters:

    waldiez : Waldiez
        The Waldiez instance containing the flow data. (required)
    output_dir : Path | None
        The directory where the exported flow will be saved. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)
    context : Optional[ExporterContext]
        Exporter context with dependencies, by default None.
    **kwargs : Any
        Additional keyword arguments for the exporter. (default: {})

Returns:

    FlowExporter
        The created flow exporter.

Source code in waldiez/exporting/flow/factory.py
def create_flow_exporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any,
) -> FlowExporter:
    """Create a flow exporter.

    Parameters
    ----------
    waldiez : Waldiez
        The Waldiez instance containing the flow data.
    output_dir : Path
        The directory where the exported flow will be saved.
    for_notebook : bool
        Whether the export is intended for a notebook environment.
    context : Optional[ExporterContext], optional
        Exporter context with dependencies, by default None
    **kwargs : Any
        Additional keyword arguments for the exporter.

    Returns
    -------
    FlowExporter
        The created flow exporter.
    """
    if context is None:
        config = ExportConfig(
            name=waldiez.name,
            description=waldiez.description or "",
            tags=waldiez.tags or [],
            requirements=waldiez.requirements or [],
            output_extension="ipynb" if for_notebook else "py",
            is_async=waldiez.is_async,
            output_directory=output_dir,
            cache_seed=waldiez.cache_seed,
        )
        context = ExporterContext(
            config=config,
            serializer=DefaultSerializer(),
            path_resolver=DefaultPathResolver(),
            logger=WaldiezLogger(),
        )
    else:
        if not context.config:  # pragma: no cover
            context.config = ExportConfig(
                name=waldiez.name,
                description=waldiez.description or "",
                tags=waldiez.tags or [],
                requirements=waldiez.requirements or [],
                output_extension="ipynb" if for_notebook else "py",
                is_async=waldiez.is_async,
                output_directory=output_dir,
                cache_seed=waldiez.cache_seed,
            )
        else:
            context.config.update(
                name=waldiez.name,
                description=waldiez.description or "",
                tags=waldiez.tags or [],
                requirements=waldiez.requirements or [],
                output_extension="ipynb" if for_notebook else "py",
                is_async=waldiez.is_async,
                output_directory=output_dir,
                cache_seed=waldiez.cache_seed,
            )

    return FlowExporter(
        waldiez=waldiez,
        output_dir=output_dir,
        for_notebook=for_notebook,
        context=context,
        **kwargs,
    )
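
A hedged sketch of using the factory instead of the constructor; when no context is passed, it builds a default ExporterContext with a serializer, path resolver, and logger (the flow file name is hypothetical):

```python
# Minimal sketch: the factory builds a default ExporterContext when none is given.
from pathlib import Path

from waldiez import Waldiez
from waldiez.exporting.flow.factory import create_flow_exporter

waldiez = Waldiez.load("my_flow.waldiez")  # hypothetical flow file
exporter = create_flow_exporter(
    waldiez=waldiez,
    output_dir=Path("generated"),
    for_notebook=True,  # config.output_extension becomes "ipynb"
)
notebook_source = exporter.generate_main_content()
```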

merger

Content merger for combining multiple export results.

MergeStatistics dataclass

MergeStatistics(
    total_results: int = 0,
    total_imports: int = 0,
    deduplicated_imports: int = 0,
    total_content_items: int = 0,
    total_env_vars: int = 0,
    deduplicated_env_vars: int = 0,
    conflicts_found: list[str] = list[str](),
)

Statistics about the merge operation.

ContentMerger

ContentMerger(context: Optional[ExporterContext] = None)

Intelligently merges multiple ExportResult objects.

Parameters:

    context : Optional[ExporterContext]
        The exporter context, by default None.
Source code in waldiez/exporting/flow/merger.py
def __init__(self, context: Optional[ExporterContext] = None):
    """Initialize the content merger.

    Parameters
    ----------
    context : Optional[ExporterContext], optional
        The exporter context, by default None
    """
    self.context = context or ExporterContext()
    self.statistics = MergeStatistics()

merge_results

merge_results(results: list[ExportResult]) -> ExportResult

Merge multiple export results into one.

Parameters:

    results : list[ExportResult]
        The export results to merge. (required)

Returns:

    ExportResult
        The merged export result.

Source code in waldiez/exporting/flow/merger.py
def merge_results(self, results: list[ExportResult]) -> ExportResult:
    """Merge multiple export results into one.

    Parameters
    ----------
    results : list[ExportResult]
        The export results to merge

    Returns
    -------
    ExportResult
        The merged export result
    """
    if not results:
        return ExportResult()

    if len(results) == 1:
        return results[0]

    # Initialize statistics
    self.statistics = MergeStatistics(total_results=len(results))

    # Create merged result
    merged = ExportResult()

    # 1. Merge imports with intelligent deduplication
    merged.imports = self._merge_imports(results)

    # 2. Merge positioned content with proper ordering
    merged.positioned_content = self._merge_positioned_content(results)

    # 3. Merge environment variables with conflict detection
    merged.environment_variables = self._merge_environment_variables(
        results
    )

    # 4. Merge validation results
    merged.validation_result = self._merge_validation_results(results)

    # 5. Handle main content (typically empty for merged results)
    merged.main_content = self._merge_main_content(results)

    return merged

get_merge_statistics

get_merge_statistics() -> MergeStatistics

Get statistics about the last merge operation.

Returns:

    MergeStatistics
        Statistics about the merge operation.

Source code in waldiez/exporting/flow/merger.py
def get_merge_statistics(self) -> MergeStatistics:
    """Get statistics about the last merge operation.

    Returns
    -------
    MergeStatistics
        Statistics about the merge operation
    """
    return self.statistics
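
A short sketch of driving the merger directly (ExportOrchestrator.orchestrate, documented further below, does this for you); the ExportResult import path is an assumption, and real inputs would come from the tools, models, chats, and agents exporters:

```python
# Minimal sketch: merge two (empty) ExportResult objects and read the statistics.
from waldiez.exporting.core import ExportResult  # assumed location
from waldiez.exporting.flow.merger import ContentMerger

merger = ContentMerger()  # builds a default ExporterContext internally
merged = merger.merge_results([ExportResult(), ExportResult()])

stats = merger.get_merge_statistics()
print(stats.total_results, stats.deduplicated_imports, stats.conflicts_found)
```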

factory

Factory function for creating a FlowExporter instance.

create_flow_exporter

create_flow_exporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any
) -> FlowExporter

Create a flow exporter.

Parameters:

    waldiez : Waldiez
        The Waldiez instance containing the flow data. (required)
    output_dir : Path | None
        The directory where the exported flow will be saved. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)
    context : Optional[ExporterContext]
        Exporter context with dependencies, by default None.
    **kwargs : Any
        Additional keyword arguments for the exporter. (default: {})

Returns:

    FlowExporter
        The created flow exporter.

Source code in waldiez/exporting/flow/factory.py
def create_flow_exporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any,
) -> FlowExporter:
    """Create a flow exporter.

    Parameters
    ----------
    waldiez : Waldiez
        The Waldiez instance containing the flow data.
    output_dir : Path
        The directory where the exported flow will be saved.
    for_notebook : bool
        Whether the export is intended for a notebook environment.
    context : Optional[ExporterContext], optional
        Exporter context with dependencies, by default None
    **kwargs : Any
        Additional keyword arguments for the exporter.

    Returns
    -------
    FlowExporter
        The created flow exporter.
    """
    if context is None:
        config = ExportConfig(
            name=waldiez.name,
            description=waldiez.description or "",
            tags=waldiez.tags or [],
            requirements=waldiez.requirements or [],
            output_extension="ipynb" if for_notebook else "py",
            is_async=waldiez.is_async,
            output_directory=output_dir,
            cache_seed=waldiez.cache_seed,
        )
        context = ExporterContext(
            config=config,
            serializer=DefaultSerializer(),
            path_resolver=DefaultPathResolver(),
            logger=WaldiezLogger(),
        )
    else:
        if not context.config:  # pragma: no cover
            context.config = ExportConfig(
                name=waldiez.name,
                description=waldiez.description or "",
                tags=waldiez.tags or [],
                requirements=waldiez.requirements or [],
                output_extension="ipynb" if for_notebook else "py",
                is_async=waldiez.is_async,
                output_directory=output_dir,
                cache_seed=waldiez.cache_seed,
            )
        else:
            context.config.update(
                name=waldiez.name,
                description=waldiez.description or "",
                tags=waldiez.tags or [],
                requirements=waldiez.requirements or [],
                output_extension="ipynb" if for_notebook else "py",
                is_async=waldiez.is_async,
                output_directory=output_dir,
                cache_seed=waldiez.cache_seed,
            )

    return FlowExporter(
        waldiez=waldiez,
        output_dir=output_dir,
        for_notebook=for_notebook,
        context=context,
        **kwargs,
    )

execution_generator

Generates the main() and call_main() functions.

ExecutionGenerator

Generate the main function and its calling block for flow exporter.

generate staticmethod

generate(
    content: str,
    is_async: bool,
    for_notebook: bool,
    cache_seed: int | None,
    after_run: str,
) -> str

Generate the complete flow script content.

Parameters:

    content : str
        The content of the chats to be included in the main function. (required)
    is_async : bool
        Whether to generate async content. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)
    cache_seed : int | None
        The cache seed to use for the flow chat, if any. (required)
    after_run : str
        Additional content to add after the main chat execution. (required)

Returns:

    str
        The complete flow script content.

Source code in waldiez/exporting/flow/execution_generator.py
@staticmethod
def generate(
    content: str,
    is_async: bool,
    for_notebook: bool,
    cache_seed: int | None,
    after_run: str,
) -> str:
    """Generate the complete flow script content.

    Parameters
    ----------
    content : str
        The content of the chats to be included in the main function.
    is_async : bool
        Whether to generate async content.
    for_notebook : bool
        Whether the export is intended for a notebook environment.
    cache_seed : str | int | None
        The cache seed to use for flow chat if any.
    after_run : str, optional
        Additional content to add after the main chat execution,
        by default ""

    Returns
    -------
    str
        The complete flow script content.
    """
    main_function = ExecutionGenerator.generate_main_function(
        content=content,
        is_async=is_async,
        cache_seed=cache_seed,
        after_run=after_run,
        for_notebook=for_notebook,
    )
    call_main_function = ExecutionGenerator.generate_call_main_function(
        is_async=is_async,
        for_notebook=for_notebook,
    )
    if not for_notebook:
        execution_block = ExecutionGenerator.generate_execution_block(
            is_async=is_async,
        )
    else:
        execution_block = ""
    return (
        "\n".join([main_function, call_main_function, execution_block])
        + "\n"
    )
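
A short sketch of calling the static helper directly, assuming the module is importable from the source path shown above; the content string is a hypothetical stand-in for the chat code the chats exporter would normally produce:

```python
# Minimal sketch: build the main()/call_main() tail of a synchronous script.
# The content string below is a hypothetical stand-in for exported chat code.
from waldiez.exporting.flow.execution_generator import ExecutionGenerator

tail = ExecutionGenerator.generate(
    content="        results = user.initiate_chat(assistant, message='Hello')",
    is_async=False,
    for_notebook=False,
    cache_seed=42,  # wraps the chat in a Cache.disk(cache_seed=42) block
    after_run="",
)
print(tail)
```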

generate_main_function staticmethod

generate_main_function(
    content: str,
    is_async: bool,
    cache_seed: int | None,
    after_run: str,
    for_notebook: bool,
) -> str

Generate the main function for the flow script.

Parameters:

    content : str
        The content of the chats to be included in the main function. (required)
    is_async : bool
        Whether to generate async content. (required)
    cache_seed : int | None
        The cache seed to use for the flow chat, if any. (required)
    after_run : str
        Additional content to add after the main chat execution. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)

Returns:

    str
        The complete main function content.

Source code in waldiez/exporting/flow/execution_generator.py
@staticmethod
def generate_main_function(
    content: str,
    is_async: bool,
    cache_seed: int | None,
    after_run: str,
    for_notebook: bool,
) -> str:
    """Generate the main function for the flow script.

    Parameters
    ----------
    content : str
        The content of the chats to be included in the main function.
    is_async : bool
        Whether to generate async content
    cache_seed : str | int | None
        The cache seed to use for flow chat if any
    after_run : str
        Additional content to add after the main chat execution.
    for_notebook : bool
        Whether the export is intended for a notebook environment.

    Returns
    -------
    str
        The complete main function content.
    """
    if content.startswith("\n"):
        content = content[1:]
    flow_content = "\n\n"
    comment = get_comment(
        "Start chatting",
        for_notebook=for_notebook,
    )
    flow_content += f"{comment}\n"
    if is_async:
        flow_content += "async "

    flow_content += f"def main() -> {RETURN_TYPE_HINT}:\n"
    flow_content += f"    {main_doc_string()}\n"
    space = "    "
    if cache_seed is not None:
        flow_content += (
            f"    with Cache.disk(cache_seed={cache_seed}"
            ") as cache:  # pyright: ignore\n"
        )
        space = f"{space}    "
    flow_content += f"{content}" + "\n"
    if is_async:
        flow_content += f"{space}await stop_logging()"
    else:
        flow_content += f"{space}stop_logging()"
    flow_content += "\n"
    if after_run:
        flow_content += after_run + "\n"
    if cache_seed is not None:
        space = space[4:]
    flow_content += f"{space}return results\n"
    return flow_content

generate_call_main_function staticmethod

generate_call_main_function(
    is_async: bool, for_notebook: bool
) -> str

Generate the call_main function for the flow script.

Parameters:

    is_async : bool
        Whether to generate async content. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)

Returns:

    str
        The complete call_main function content.

Source code in waldiez/exporting/flow/execution_generator.py
@staticmethod
def generate_call_main_function(is_async: bool, for_notebook: bool) -> str:
    """Generate the call_main function for the flow script.

    Parameters
    ----------
    is_async : bool
        Whether to generate async content
    for_notebook : bool
        Whether the export is intended for a notebook environment.

    Returns
    -------
    str
        The complete call_main function content.
    """
    content = "\n"
    if for_notebook:
        if is_async:
            return "# %%\nawait main()\n"
        return "# %%\nmain()\n"
    if is_async:
        content += "async def call_main() -> None:\n"
    else:
        content += "def call_main() -> None:\n"
    content += '    """Run the main function and print the results."""\n'
    content += f"    results: {RETURN_TYPE_HINT} = "
    if is_async:
        content += "await "
    content += "main()\n"
    content += "    if isinstance(results, dict):\n"
    content += "        # order by key\n"
    content += "        ordered_results = dict(sorted(results.items()))\n"
    content += "        for _, result in ordered_results.items():\n"
    content += "            pprint(asdict(result))\n"
    content += "    else:\n"
    content += "        if not isinstance(results, list):\n"
    content += "            results = [results]\n"
    content += "        for result in results:\n"
    content += "            pprint(asdict(result))\n"
    content += "\n"
    return content

generate_execution_block staticmethod

generate_execution_block(is_async: bool) -> str

Generate the execution block for the main function.

Parameters:

    is_async : bool
        Whether to generate async content. (required)

Returns:

    str
        The complete if __name__ == "__main__": block content.

Source code in waldiez/exporting/flow/execution_generator.py
@staticmethod
def generate_execution_block(is_async: bool) -> str:
    """Generate the execution block for the main function.

    Parameters
    ----------
    is_async : bool
        Whether to generate async content

    Returns
    -------
    str
        The complete if __name__ == "__main__": block content
    """
    comment = get_comment(
        "Let's go!",
        for_notebook=False,
    )
    content = 'if __name__ == "__main__":\n'
    content += f"    {comment}"
    if is_async:
        content += "    anyio.run(call_main)\n"
    else:
        content += "    call_main()\n"
    return content

file_generator

Generate the whole flow content.

FileGenerator

FileGenerator(context: ExporterContext)

Bases: ContentGenerator

Generate the complete flow notebook content.

Parameters:

    context : ExporterContext
        The exporter context containing dependencies. (required)
Source code in waldiez/exporting/flow/file_generator.py
def __init__(
    self,
    context: ExporterContext,
) -> None:
    """Initialize the notebook generator.

    Parameters
    ----------
    context : ExporterContext
        The exporter context containing dependencies.
    """
    self.context = context
    self.config = context.get_config()

generate

generate(
    merged_result: ExportResult,
    is_async: bool,
    after_run: str,
    **kwargs: Any
) -> str

Generate the complete flow notebook content.

Parameters:

    merged_result : ExportResult
        The merged export result containing all content. (required)
    is_async : bool
        Whether to generate async content. (required)
    after_run : str
        Additional content to add after the main flow execution. (required)
    **kwargs : Any
        Additional keyword arguments for the generator. (default: {})

Returns:

    str
        The complete flow notebook content.

Raises:

    ExporterContentError
        If there is no content to export.

Source code in waldiez/exporting/flow/file_generator.py
def generate(
    self,
    merged_result: ExportResult,
    is_async: bool,
    after_run: str,
    **kwargs: Any,
) -> str:
    """Generate the complete flow notebook content.

    Parameters
    ----------
    merged_result : ExportResult
        The merged export result containing all content.
    is_async : bool
        Whether to generate async content
    after_run : str
        Additional content to add after the main flow execution.
    **kwargs : Any
        Additional keyword arguments for the generator.

    Returns
    -------
    str
        The complete flow notebook content.

    Raises
    ------
    ExporterContentError
        If there is no content to export.
    """
    # 1. Generate header
    header = self.get_header(merged_result)

    # 2. Generate imports
    imports_section = merged_result.get_content_by_position(
        ExportPosition.IMPORTS
    )

    # 3. Generate content sections
    tools_section = merged_result.get_content_by_position(
        ExportPosition.TOOLS
    )
    models_section = merged_result.get_content_by_position(
        ExportPosition.MODELS
    )
    agents_section = merged_result.get_content_by_position(
        ExportPosition.AGENTS,
        # Skip agent arguments (should already be there)
        skip_agent_arguments=True,
    )
    chats_content = merged_result.get_content_by_position(
        ExportPosition.CHATS
    )
    if not chats_content:
        raise ExporterContentError(
            "No content to export. Please ensure that the flow has chats."
        )
    after_chats = merged_result.get_content_by_position(
        ExportPosition.BOTTOM
    )

    main, call_main, execution_block = self._get_execution_content(
        chats_content=chats_content,
        is_async=is_async,
        after_run=after_run,
        for_notebook=self.config.for_notebook,
    )

    # 5. Combine everything
    everything: list[str] = [header]
    if imports_section:
        comment = get_comment(
            "Imports",
            for_notebook=self.config.for_notebook,
        )
        everything.append(comment)
        everything.append(
            "\n".join([entry.content for entry in imports_section])
        )
    if tools_section:
        comment = get_comment(
            "Tools",
            for_notebook=self.config.for_notebook,
        )
        everything.append(comment)
        everything.append(
            "\n".join([entry.content for entry in tools_section]) + "\n"
        )
    if models_section:
        comment = get_comment(
            "Models",
            for_notebook=self.config.for_notebook,
        )
        everything.append(comment)
        everything.append(
            "\n\n".join([entry.content for entry in models_section]) + "\n"
        )
    if agents_section:
        comment = get_comment(
            "Agents",
            for_notebook=self.config.for_notebook,
        )
        if self.config.for_notebook:
            comment += "# pyright: reportUnnecessaryIsInstance=false\n"
        everything.append(comment)
        everything.append(
            "\n\n".join([entry.content for entry in agents_section]) + "\n"
        )
    everything.append(main)
    if after_chats:
        everything.append(
            "\n".join([entry.content for entry in after_chats])
        )
    everything.append(call_main)
    if execution_block:
        everything.append(execution_block)

    return "\n".join(everything)

get_header

get_header(merged_result: ExportResult) -> str

Get or generate the header for the script.

Parameters:

    merged_result : ExportResult
        The merged export result containing all content. (required)

Returns:

    str
        The header content.

Source code in waldiez/exporting/flow/file_generator.py
def get_header(self, merged_result: ExportResult) -> str:
    """Get or generate the header for the script.

    Parameters
    ----------
    merged_result : ExportResult
        The merged export result containing all content.

    Returns
    -------
    str
        The header content.
    """
    from_result = merged_result.get_content_by_position(ExportPosition.TOP)
    if not from_result:
        return generate_header(
            name=self.config.name,
            description=self.config.description,
            requirements=self.config.requirements,
            tags=self.config.tags,
            for_notebook=self.config.for_notebook,
        )
    header_string = "\n".join(content.content for content in from_result)
    while not header_string.endswith("\n\n"):
        header_string += "\n"
    return header_string

orchestrator

Flow export orchestrator.

ExportOrchestrator

ExportOrchestrator(
    waldiez: Waldiez, context: ExporterContext
)

Coordinates the export process.

Parameters:

    waldiez : Waldiez
        The Waldiez instance containing the flow to export. (required)
    context : ExporterContext
        The exporter context containing dependencies and configuration. (required)
Source code in waldiez/exporting/flow/orchestrator.py
def __init__(
    self,
    waldiez: Waldiez,
    context: ExporterContext,
) -> None:
    """Initialize the export orchestrator.

    Parameters
    ----------
    waldiez : Waldiez
        The Waldiez instance containing the flow to export.
    context : ExporterContext
        The exporter context containing dependencies and configuration.
    """
    self.waldiez = waldiez
    self.context = context
    self.config = context.get_config()
    self._tools_exporter: ToolsExporter | None = None
    self._models_exporter: ModelsExporter | None = None
    self._chats_exporter: ChatsExporter | None = None
    self.logger = context.get_logger()
    self._initialize()

orchestrate

orchestrate() -> ExportResult

Orchestrate the export process.

Returns:

    ExportResult
        The result of the export process, containing the generated script and any additional metadata.

Source code in waldiez/exporting/flow/orchestrator.py
def orchestrate(self) -> ExportResult:
    """Orchestrate the export process.

    Returns
    -------
    ExportResult
        The result of the export process,
        containing the generated script and any additional metadata.
    """
    results: list[ExportResult] = []
    agent_arguments: dict[str, list[str]] = {}

    # 1. Tools first (needed by agents)
    if self.waldiez.tools:
        self.logger.info("Exporting tools ...")
        tools_result = self._get_tools_exporter().export()
        # Extract tool arguments for agents
        tool_arguments = self._extract_agent_arguments_from_result(
            tools_result
        )
        # Merge tool arguments into agent arguments
        self._merge_agent_arguments(
            source=tool_arguments,
            target=agent_arguments,
        )
        results.append(tools_result)
        self.logger.debug("Exported %s", tools_result)

    # 2. Models second (needed by agents)
    if self.waldiez.models:
        self.logger.info("Exporting models ...")
        models_result = self._get_models_exporter().export()
        # Extract model arguments for agents
        model_arguments = self._extract_agent_arguments_from_result(
            models_result
        )
        # Merge model arguments into agent arguments
        self._merge_agent_arguments(
            source=model_arguments,
            target=agent_arguments,
        )
        results.append(models_result)
        self.logger.debug("Exported %s", models_result)
    # 3. Chats third (agents might need agent chat registrations)
    # we always have at least one chat (already validated in Waldiez init)
    self.logger.info("Exporting chats ...")
    chats_result = self._get_chats_exporter().export()
    self.logger.debug("Exported %s", chats_result)
    # Extract chat arguments for agents
    chat_arguments = self._extract_agent_arguments_from_result(chats_result)
    # Merge chat arguments into agent arguments
    self._merge_agent_arguments(
        source=chat_arguments,
        target=agent_arguments,
    )
    results.append(chats_result)

    # 4. Agents last
    # we always have at least one agent (already validated in Waldiez init)
    agent_results = self._export_all_agents(agent_arguments)
    results.extend(agent_results)

    # 5. Merge everything
    merger = ContentMerger(self.context)
    merged_result = merger.merge_results(results)
    # Check for issues
    stats = merger.get_merge_statistics()
    if stats.conflicts_found:
        self.logger.info(
            "Resolved %d merge conflicts", len(stats.conflicts_found)
        )
    self.logger.debug("Merged result: %s", merged_result)
    return self._finalize_export(merged_result)

get_after_run_content

get_after_run_content() -> str

Get the content to be executed after the main flow run.

Returns:

    str
        The content to be executed after the main flow run.

Source code in waldiez/exporting/flow/orchestrator.py
def get_after_run_content(self) -> str:
    """Get the content to be executed after the main flow run.

    Returns
    -------
    str
        The content to be executed after the main flow run.
    """
    return get_after_run_content(
        waldiez=self.waldiez,
        agent_names=self.agent_names,
        tabs=1,
    )
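
A hedged sketch of driving the pipeline directly; this mirrors FlowExporter.generate_main_content (see the exporter module below), and the Waldiez.load and ExporterContext import paths are assumptions:

```python
# Minimal sketch: orchestrate, then hand the merged result to FileGenerator.
from waldiez import Waldiez
from waldiez.exporting.core import ExporterContext  # assumed location
from waldiez.exporting.flow.file_generator import FileGenerator
from waldiez.exporting.flow.orchestrator import ExportOrchestrator

waldiez = Waldiez.load("my_flow.waldiez")  # hypothetical flow file
context = ExporterContext()
orchestrator = ExportOrchestrator(waldiez=waldiez, context=context)
merged_result = orchestrator.orchestrate()        # tools -> models -> chats -> agents
after_run = orchestrator.get_after_run_content()  # e.g. reasoning-tree dumps
script = FileGenerator(context=context).generate(
    merged_result=merged_result,
    is_async=waldiez.is_async,
    after_run=after_run,
)
```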

exporter

Flow exporter.

FlowExporter

FlowExporter(
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any
)

Bases: Exporter[FlowExtras]

Flow exporter.

Parameters:

    waldiez : Waldiez
        The Waldiez instance containing the flow data. (required)
    output_dir : Path | None
        The directory where the exported flow will be saved. (required)
    for_notebook : bool
        Whether the export is intended for a notebook environment. (required)
    context : Optional[ExporterContext]
        Exporter context with dependencies, by default None.
    **kwargs : Any
        Additional keyword arguments for the exporter. (default: {})
Source code in waldiez/exporting/flow/exporter.py
def __init__(
    self,
    waldiez: Waldiez,
    output_dir: Path | None,
    for_notebook: bool,
    context: Optional[ExporterContext] = None,
    **kwargs: Any,
) -> None:
    """Initialize the chats exporter.

    Parameters
    ----------
    waldiez : Waldiez
        The Waldiez instance containing the flow data.
    output_dir : Path
        The directory where the exported flow will be saved.
    for_notebook : bool
        Whether the export is intended for a notebook environment.
    context : Optional[ExporterContext], optional
        Exporter context with dependencies, by default None
    **kwargs : Any
        Additional keyword arguments for the exporter.
    """
    super().__init__(context, **kwargs)

    self.waldiez = waldiez
    self.output_dir = Path(output_dir) if output_dir is not None else None
    self.flow_config = self.context.get_config(
        name=waldiez.name,
        description=waldiez.description,
        requirements=waldiez.requirements or [],
        tags=waldiez.tags or [],
        output_extension="ipynb" if for_notebook else "py",
        is_async=waldiez.is_async,
        output_directory=str(self.output_dir) if self.output_dir else None,
        cache_seed=waldiez.cache_seed,
    )
    self._extras = self._generate_extras()

extras property

extras: FlowExtras

Get the flow exporter extras.

Returns:

    FlowExtras
        The flow extras containing additional information for the flow exporter.

generate_main_content

generate_main_content() -> str

Generate the main content of the export.

Returns:

    str
        The final executable script or notebook content.

Source code in waldiez/exporting/flow/exporter.py
def generate_main_content(self) -> str:
    """Generate the main content of the export.

    Returns
    -------
    str
        The final executable script or notebook content.
    """
    orchestrator = ExportOrchestrator(
        waldiez=self.waldiez,
        context=self.context,
    )
    merged_result = orchestrator.orchestrate()
    after_run = orchestrator.get_after_run_content()
    generator = FileGenerator(
        context=self.context,
    )
    return generator.generate(
        merged_result=merged_result,
        is_async=self.waldiez.is_async,
        after_run=after_run,
    )

utils

Flow exporter utils.

generate_header

generate_header(
    name: str,
    description: str,
    requirements: list[str],
    tags: list[str],
    for_notebook: bool,
) -> str

Generate the header for the script or notebook.

Parameters:

    name : str
        The name of the flow. (required)
    description : str
        A brief description of the flow. (required)
    requirements : list[str]
        A list of requirements for the flow. (required)
    tags : list[str]
        A list of tags associated with the flow. (required)
    for_notebook : bool
        Whether the header is for a notebook or a script. (required)

Returns:

    str
        The header content.

Source code in waldiez/exporting/flow/utils/common.py
def generate_header(
    name: str,
    description: str,
    requirements: list[str],
    tags: list[str],
    for_notebook: bool,
) -> str:
    """Generate the header for the script or notebook.

    Parameters
    ----------
    name : str
        The name of the flow.
    description : str
        A brief description of the flow.
    requirements : list[str]
        A list of requirements for the flow.
    tags : list[str]
        A list of tags associated with the flow.
    for_notebook : bool
        Whether the header is for a notebook or a script.

    Returns
    -------
    str
        The header content.
    """
    if not for_notebook:
        return _get_py_header(
            name=name,
            description=description,
            requirements=requirements,
            tags=tags,
        )
    return _get_ipynb_heeader(
        name=name,
        description=description,
        requirements=requirements,
        tags=tags,
    )

get_after_run_content

get_after_run_content(
    waldiez: Waldiez, agent_names: dict[str, str], tabs: int
) -> str

Get content to add after the flow is run.

Parameters:

    waldiez : Waldiez
        The Waldiez object. (required)
    agent_names : dict[str, str]
        The dictionary of agent names and their corresponding ids. (required)
    tabs : int
        The number of tabs to add before the content. (required)

Returns:

    str
        The content to add after the flow is run.

Source code in waldiez/exporting/flow/utils/common.py
def get_after_run_content(
    waldiez: Waldiez,
    agent_names: dict[str, str],
    tabs: int,
) -> str:
    """Get content to add after the flow is run.

    Parameters
    ----------
    waldiez : Waldiez
        The waldiez object.
    agent_names : dict[str, str]
        The dictionary of agent names and their corresponding ids
    tabs : int
        The number of tabs to add before the content.

    Returns
    -------
    str
        The content to add after the flow is run.
    """
    # if the flow has reasoning agents, we add
    # agent.visualize_tree() for each agent
    content = ""
    tab = "    "
    space = tab * tabs
    for agent in waldiez.agents:
        if agent.is_reasoning:
            agent_name = agent_names[agent.id]
            content += f"""
{space}# pylint: disable=broad-except,too-many-try-statements
{space}try:
{space}{tab}{agent_name}.visualize_tree()
{space}{tab}if os.path.exists("tree_of_thoughts.png"):
{space}{tab}{tab}new_name = "{agent_name}_tree_of_thoughts.png"
{space}{tab}{tab}os.rename("tree_of_thoughts.png", new_name)
{space}except BaseException:
{space}{tab}pass
{space}# save the tree to json
{space}try:
{space}{tab}data = {agent_name}._root.to_dict()  # pylint: disable=protected-access  # pyright: ignore
{space}{tab}with open("{agent_name}_reasoning_tree.json", "w", encoding="utf-8") as f:
{space}{tab}{tab}json.dump(data, f)
{space}except BaseException:
{space}{tab}pass
"""
    return content

get_np_no_nep50_handle

get_np_no_nep50_handle() -> str

Handle the "module numpy has no attribute _no_pep50_warning" error.

Returns:

    str
        The content to handle the error.

Source code in waldiez/exporting/flow/utils/common.py
def get_np_no_nep50_handle() -> str:
    """Handle the "module numpy has no attribute _no_pep50_warning" error.

    Returns
    -------
    str
        The content to handle the error.
    """
    content = '''
#
# let's try to avoid:
# module 'numpy' has no attribute '_no_nep50_warning'"
# ref: https://github.com/numpy/numpy/blob/v2.2.2/doc/source/release/2.2.0-notes.rst#nep-50-promotion-state-option-removed
os.environ["NEP50_DEPRECATION_WARNING"] = "0"
os.environ["NEP50_DISABLE_WARNING"] = "1"
os.environ["NPY_PROMOTION_STATE"] = "weak"
if not hasattr(np, "_no_nep50_warning"):

    import contextlib
    from typing import Generator

    @contextlib.contextmanager
    def _np_no_nep50_warning() -> Generator[None, None, None]:
        """Dummy function to avoid the warning.

        Yields
        ------
        None
            Nothing.
        """
        yield
    setattr(np, "_no_nep50_warning", _np_no_nep50_warning)  # noqa
'''
    return content

get_sorted_imports

get_sorted_imports(
    collected_imports: list[str],
) -> list[str]

Get the sorted imports.

Parameters:

    collected_imports : list[str]
        The collected imports. (required)

Returns:

    list[str]
        The sorted imports.

Source code in waldiez/exporting/flow/utils/importing.py
def get_sorted_imports(collected_imports: list[str]) -> list[str]:
    """Get the sorted imports.

    Parameters
    ----------
    collected_imports : list[str]
        The collected imports.

    Returns
    -------
    list[str]
        The sorted imports.
    """
    # Remove duplicates while preserving order by converting to dict
    unique_imports = list(dict.fromkeys(collected_imports))

    sorted_imports = sorted(
        [imp for imp in unique_imports if imp.startswith("import ")]
    ) + sorted([imp for imp in unique_imports if imp.startswith("from ")])
    return sorted_imports
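
A quick illustration of the ordering rules, assuming the module is importable from the source path shown above:

```python
# Minimal sketch: duplicates are dropped, "import ..." lines come first,
# then "from ... import ..." lines, each group sorted alphabetically.
from waldiez.exporting.flow.utils.importing import get_sorted_imports

collected = [
    "from typing import Any",
    "import os",
    "import json",
    "import os",  # duplicate, removed
    "from pathlib import Path",
]
print(get_sorted_imports(collected))
# ['import json', 'import os', 'from pathlib import Path', 'from typing import Any']
```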

get_the_imports_string

get_the_imports_string(
    all_imports: list[tuple[str, ImportPosition]],
    is_async: bool,
) -> str

Get the final imports string.

Parameters:

    all_imports : list[tuple[str, ImportPosition]]
        All the imports. (required)
    is_async : bool
        If the flow is async. (required)

Returns:

    str
        The final imports string.

Source code in waldiez/exporting/flow/utils/importing.py
def get_the_imports_string(
    all_imports: list[tuple[str, ImportPosition]],
    is_async: bool,
) -> str:
    """Get the final imports string.

    Parameters
    ----------
    all_imports : list[tuple[str, ImportPosition]]
        All the imports.
    is_async : bool
        If the flow is async.

    Returns
    -------
    str
        The final imports string.
    """
    (
        builtin_imports,
        autogen_imports,
        third_party_imports,
        local_imports,
        got_import_autogen,
    ) = sort_imports(all_imports)

    # Get the final imports string.
    # Making sure that there are two lines
    # after each import section
    # (builtin, third party, local)
    final_string = "\n".join(builtin_imports) + "\n"
    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if is_async:
        final_string += (
            "\nimport aiofiles"
            "\nimport aiosqlite"
            "\nimport anyio"
            "\nimport nest_asyncio"
            "\nfrom aiocsv import AsyncDictWriter\n"
        )

    if got_import_autogen:
        final_string += "\nimport autogen  # type: ignore\n"

    if autogen_imports:  # pragma: no branch
        final_string += "\n".join(autogen_imports) + "\n"

    if third_party_imports:  # pragma: no branch
        final_string += "\n".join(third_party_imports) + "\n"

    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if local_imports:
        final_string += "\n".join(local_imports) + "\n"

    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if is_async:
        final_string += (
            "# pylint: disable=broad-exception-caught\n"
            "try:\n"
            "    nest_asyncio.apply()  # pyright: ignore\n"
            "except BaseException:\n"
            "    pass  # maybe on uvloop?\n"
        )

    return final_string.replace("\n\n\n", "\n\n")  # avoid too many newlines
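
A hedged sketch of assembling the import block for a synchronous flow; the ImportPosition import path is an assumption (it is the enum referenced in the signature above), and the local import is hypothetical:

```python
# Minimal sketch: positions decide the section each import lands in.
from waldiez.exporting.core import ImportPosition  # assumed location
from waldiez.exporting.flow.utils.importing import get_the_imports_string

all_imports = [
    ("import json", ImportPosition.BUILTINS),
    ("import pandas as pd", ImportPosition.THIRD_PARTY),
    ("from my_tools import web_search", ImportPosition.LOCAL),  # hypothetical
]
print(get_the_imports_string(all_imports, is_async=False))
```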

get_sqlite_out

get_sqlite_out(is_async: bool) -> str

Get the sqlite to csv and json conversion code string.

Parameters:

    is_async : bool
        Whether to use async mode. (required)

Returns:

    str
        The sqlite to csv and json conversion code string.

Source code in waldiez/exporting/flow/utils/logging.py
def get_sqlite_out(is_async: bool) -> str:
    """Get the sqlite to csv and json conversion code string.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode.

    Returns
    -------
    str
        The sqlite to csv and json conversion code string.
    """
    if is_async:
        return get_async_sqlite_out()
    return get_sync_sqlite_out()

get_start_logging

get_start_logging(
    is_async: bool, for_notebook: bool
) -> str

Get the logging start call string.

Parameters:

    is_async : bool
        Whether to use async mode. (required)
    for_notebook : bool
        Whether the logging is for a notebook or a script. (required)

Returns:

    str
        The logging start string.

Example

```python
>>> get_start_logging()
def start_logging() -> None:
    """Start logging."""
    runtime_logging.start(
        logger_type="sqlite",
        config={"dbname": "flow.db"},
    )
```

Source code in waldiez/exporting/flow/utils/logging.py
def get_start_logging(is_async: bool, for_notebook: bool) -> str:
    r"""Get the logging start call string.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode.
    for_notebook : bool
        Whether the logging is for a notebook or a script.

    Returns
    -------
    str
        The logging start string.

    Example
    -------
    ```python
    >>> get_start_logging()
    def start_logging() -> None:
        \"\"\"Start logging.\"\"\"
        runtime_logging.start(
            logger_type="sqlite",
            config={"dbname": "flow.db"},
        )
    """
    tab = ""
    comment = get_comment(
        "Start logging.",
        for_notebook=for_notebook,
    )
    if is_async is False:
        return f'''
{tab}{comment}
{tab}def start_logging() -> None:
{tab}    """Start logging."""
{tab}    runtime_logging.start(
{tab}        logger_type="sqlite",
{tab}        config={{"dbname": "flow.db"}},
{tab}    )
{tab}
{tab}
{tab}start_logging()
'''
    return f'''
{tab}{comment}
{tab}def start_logging() -> None:
{tab}    """Start logging."""
{tab}    # pylint: disable=import-outside-toplevel
{tab}    from anyio.from_thread import start_blocking_portal

{tab}    with start_blocking_portal(backend="asyncio") as portal:
{tab}        portal.call(
{tab}            runtime_logging.start,
{tab}            None,
{tab}            "sqlite",
{tab}            {{"dbname": "flow.db"}},
{tab}        )
{tab}
{tab}
{tab}start_logging()
'''

get_stop_logging

get_stop_logging(is_async: bool, tabs: int = 0) -> str

Get the function to stop logging and gather logs.

Parameters:

    is_async : bool
        Whether to use async mode. (required)
    tabs : int
        The number of tabs to use for indentation, by default 0.

Returns:

    str
        The logging stop string.

Example

```python
>>> get_stop_logging()
def stop_logging() -> None:
    """Stop logging."""
    runtime_logging.stop()
    for table in [
        "chat_completions",
        "agents",
        "oai_wrappers",
        "oai_clients",
        "version",
        "events",
        "function_calls",
    ]:
        dest = os.path.join("logs", f"{table}.csv")
        get_sqlite_out("flow.db", table, dest)
```

Source code in waldiez/exporting/flow/utils/logging.py
def get_stop_logging(is_async: bool, tabs: int = 0) -> str:
    r"""Get the function to stop logging and gather logs.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode
    tabs : int, optional
        The number of tabs to use for indentation, by default 0

    Returns
    -------
    str
        The logging stop string.

    Example
    -------
    ```python
    >>> get_stop_logging()
    def stop_logging() -> None:
        \"\"\"Stop logging.\"\"\"
        runtime_logging.stop()
        for table in [
            "chat_completions",
            "agents",
            "oai_wrappers",
            "oai_clients",
            "version",
            "events",
            "function_calls",
        ]:
            dest = os.path.join("logs", f"{table}.csv")
            get_sqlite_out("flow.db", table, dest)
    """
    tab = "    " * tabs
    content = "\n" + tab
    if is_async:
        content += "async "
    content += "def stop_logging() -> None:\n"
    content += '    """Stop logging."""\n'
    if is_async:
        content += f"{tab}    # pylint: disable=import-outside-toplevel\n"
        content += f"{tab}    from asyncer import asyncify\n\n"
        content += f"{tab}    await asyncify(runtime_logging.stop)()\n"
    else:
        content += f"{tab}    runtime_logging.stop()\n"
    content += get_sqlite_out_call(tabs + 1, is_async)
    return content

common

Common utils for the final generation.

generate_header

generate_header(
    name: str,
    description: str,
    requirements: list[str],
    tags: list[str],
    for_notebook: bool,
) -> str

Generate the header for the script or notebook.

Parameters:

    name : str
        The name of the flow. (required)
    description : str
        A brief description of the flow. (required)
    requirements : list[str]
        A list of requirements for the flow. (required)
    tags : list[str]
        A list of tags associated with the flow. (required)
    for_notebook : bool
        Whether the header is for a notebook or a script. (required)

Returns:

    str
        The header content.

Source code in waldiez/exporting/flow/utils/common.py
def generate_header(
    name: str,
    description: str,
    requirements: list[str],
    tags: list[str],
    for_notebook: bool,
) -> str:
    """Generate the header for the script or notebook.

    Parameters
    ----------
    name : str
        The name of the flow.
    description : str
        A brief description of the flow.
    requirements : list[str]
        A list of requirements for the flow.
    tags : list[str]
        A list of tags associated with the flow.
    for_notebook : bool
        Whether the header is for a notebook or a script.

    Returns
    -------
    str
        The header content.
    """
    if not for_notebook:
        return _get_py_header(
            name=name,
            description=description,
            requirements=requirements,
            tags=tags,
        )
    return _get_ipynb_heeader(
        name=name,
        description=description,
        requirements=requirements,
        tags=tags,
    )

main_doc_string

main_doc_string() -> str

Generate the docstring for the main function.

Returns:

    str
        The docstring for the main function.

Source code in waldiez/exporting/flow/utils/common.py
def main_doc_string() -> str:
    """Generate the docstring for the main function.

    Returns
    -------
    str
        The docstring for the main function.
    """
    return f'''"""Start chatting.

    Returns
    -------
    {RETURN_TYPE_HINT}
        The result of the chat session, which can be a single ChatResult,
        a list of ChatResults, or a dictionary mapping integers to ChatResults.
    """'''

get_after_run_content

get_after_run_content(
    waldiez: Waldiez, agent_names: dict[str, str], tabs: int
) -> str

Get content to add after the flow is run.

Parameters:

    waldiez : Waldiez
        The Waldiez object. (required)
    agent_names : dict[str, str]
        The dictionary of agent names and their corresponding ids. (required)
    tabs : int
        The number of tabs to add before the content. (required)

Returns:

    str
        The content to add after the flow is run.

Source code in waldiez/exporting/flow/utils/common.py
def get_after_run_content(
    waldiez: Waldiez,
    agent_names: dict[str, str],
    tabs: int,
) -> str:
    """Get content to add after the flow is run.

    Parameters
    ----------
    waldiez : Waldiez
        The waldiez object.
    agent_names : dict[str, str]
        The dictionary of agent names and their corresponding ids
    tabs : int
        The number of tabs to add before the content.

    Returns
    -------
    str
        The content to add after the flow is run.
    """
    # if the flow has reasoning agents, we add
    # agent.visualize_tree() for each agent
    content = ""
    tab = "    "
    space = tab * tabs
    for agent in waldiez.agents:
        if agent.is_reasoning:
            agent_name = agent_names[agent.id]
            content += f"""
{space}# pylint: disable=broad-except,too-many-try-statements
{space}try:
{space}{tab}{agent_name}.visualize_tree()
{space}{tab}if os.path.exists("tree_of_thoughts.png"):
{space}{tab}{tab}new_name = "{agent_name}_tree_of_thoughts.png"
{space}{tab}{tab}os.rename("tree_of_thoughts.png", new_name)
{space}except BaseException:
{space}{tab}pass
{space}# save the tree to json
{space}try:
{space}{tab}data = {agent_name}._root.to_dict()  # pylint: disable=protected-access  # pyright: ignore
{space}{tab}with open("{agent_name}_reasoning_tree.json", "w", encoding="utf-8") as f:
{space}{tab}{tab}json.dump(data, f)
{space}except BaseException:
{space}{tab}pass
"""
    return content

get_np_no_nep50_handle

get_np_no_nep50_handle() -> str

Handle the "module numpy has no attribute _no_pep50_warning" error.

Returns:

    str
        The content to handle the error.

Source code in waldiez/exporting/flow/utils/common.py
def get_np_no_nep50_handle() -> str:
    """Handle the "module numpy has no attribute _no_pep50_warning" error.

    Returns
    -------
    str
        The content to handle the error.
    """
    content = '''
#
# let's try to avoid:
# module 'numpy' has no attribute '_no_nep50_warning'"
# ref: https://github.com/numpy/numpy/blob/v2.2.2/doc/source/release/2.2.0-notes.rst#nep-50-promotion-state-option-removed
os.environ["NEP50_DEPRECATION_WARNING"] = "0"
os.environ["NEP50_DISABLE_WARNING"] = "1"
os.environ["NPY_PROMOTION_STATE"] = "weak"
if not hasattr(np, "_no_nep50_warning"):

    import contextlib
    from typing import Generator

    @contextlib.contextmanager
    def _np_no_nep50_warning() -> Generator[None, None, None]:
        """Dummy function to avoid the warning.

        Yields
        ------
        None
            Nothing.
        """
        yield
    setattr(np, "_no_nep50_warning", _np_no_nep50_warning)  # noqa
'''
    return content

importing

Get the standard imports for the flow exporter.

get_sorted_imports

get_sorted_imports(
    collected_imports: list[str],
) -> list[str]

Get the sorted imports.

Parameters:

    collected_imports : list[str]
        The collected imports. (required)

Returns:

    list[str]
        The sorted imports.

Source code in waldiez/exporting/flow/utils/importing.py
def get_sorted_imports(collected_imports: list[str]) -> list[str]:
    """Get the sorted imports.

    Parameters
    ----------
    collected_imports : list[str]
        The collected imports.

    Returns
    -------
    list[str]
        The sorted imports.
    """
    # Remove duplicates while preserving order by converting to dict
    unique_imports = list(dict.fromkeys(collected_imports))

    sorted_imports = sorted(
        [imp for imp in unique_imports if imp.startswith("import ")]
    ) + sorted([imp for imp in unique_imports if imp.startswith("from ")])
    return sorted_imports

sort_imports

sort_imports(
    all_imports: list[tuple[str, ImportPosition]],
) -> tuple[
    list[str], list[str], list[str], list[str], bool
]

Sort the imports.

Parameters:

    all_imports : list[tuple[str, ImportPosition]]
        All the imports. (required)

Returns:

    tuple[list[str], list[str], list[str], list[str], bool]
        The sorted imports and a flag if we got `import autogen`.

Source code in waldiez/exporting/flow/utils/importing.py
def sort_imports(
    all_imports: list[tuple[str, ImportPosition]],
) -> tuple[list[str], list[str], list[str], list[str], bool]:
    """Sort the imports.

    Parameters
    ----------
    all_imports : list[tuple[str, ImportPosition]]
        All the imports.

    Returns
    -------
    tuple[list[str], list[str], list[str], list[str], bool]
        The sorted imports and a flag if we got `import autogen`.
    """
    builtin_imports: list[str] = BUILTIN_IMPORTS.copy() + TYPING_IMPORTS.copy()
    third_party_imports: list[str] = []
    local_imports: list[str] = []
    autogen_imports: list[str] = COMMON_AUTOGEN_IMPORTS.copy()
    got_import_autogen = False

    for import_string, position in all_imports:
        if "import autogen" in import_string:
            got_import_autogen = True
            continue
        if import_string.startswith("from autogen"):
            autogen_imports.append(import_string)
            continue
        if position == ImportPosition.BUILTINS:
            builtin_imports.append(import_string)
        elif position == ImportPosition.THIRD_PARTY:
            third_party_imports.append(import_string)
        elif position == ImportPosition.LOCAL:  # pragma: no branch
            local_imports.append(import_string)

    autogen_imports = clean_and_group_autogen_imports(autogen_imports)
    third_party_imports = ensure_np_import(third_party_imports)
    sorted_builtins = get_sorted_imports(builtin_imports)
    sorted_third_party = get_sorted_imports(third_party_imports)
    sorted_locals = get_sorted_imports(local_imports)

    return (
        sorted_builtins,
        sorted(autogen_imports),
        sorted_third_party,
        sorted_locals,
        got_import_autogen,
    )

clean_and_group_autogen_imports

clean_and_group_autogen_imports(
    autogen_imports: list[str],
) -> list[str]

Cleanup and group autogen imports.

Parameters:

    autogen_imports : list[str]
        List of autogen import statements. (required)

Returns:

    list[str]
        Cleaned and grouped autogen imports.

Source code in waldiez/exporting/flow/utils/importing.py
def clean_and_group_autogen_imports(autogen_imports: list[str]) -> list[str]:
    """Cleanup and group autogen imports.

    Parameters
    ----------
    autogen_imports : list[str]
        List of autogen import statements

    Returns
    -------
    list[str]
        Cleaned and grouped autogen imports
    """
    # Group imports by module path
    import_groups: dict[str, set[str]] = {}
    direct_imports: set[str] = set()

    for imp in autogen_imports:
        imp = imp.strip()
        if not imp:
            continue

        if imp.startswith("import autogen"):
            direct_imports.add(imp)
            continue

        # Parse "from autogen.module import items"
        if imp.startswith("from autogen"):  # pragma: no branch
            parts = imp.split(" import ")
            if len(parts) == 2:  # pragma: no branch
                module_path = parts[0]  # "from autogen.module"
                items = parts[1].strip()

                if module_path not in import_groups:
                    import_groups[module_path] = set()

                # Handle multiple imports in one line
                for item in items.split(","):
                    import_groups[module_path].add(item.strip())

    # Build cleaned import list
    cleaned_imports: list[str] = []

    # Add direct imports first
    cleaned_imports.extend(sorted(direct_imports))

    # Add grouped imports, sorted by module path
    for module_path in sorted(import_groups.keys()):
        sorted_items = sorted(import_groups[module_path])
        items_str = ", ".join(sorted_items)
        import_statement = f"{module_path} import {items_str}"
        cleaned_imports.append(import_statement)

    return cleaned_imports
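
A small illustrative sketch under the same import-path assumption:

```python
# Hypothetical usage; import path assumed from the source location above.
from waldiez.exporting.flow.utils.importing import clean_and_group_autogen_imports

imports = [
    "from autogen import ConversableAgent",
    "from autogen import GroupChat, ConversableAgent",
    "from autogen.io import IOStream",
]
print(clean_and_group_autogen_imports(imports))
# ['from autogen import ConversableAgent, GroupChat',
#  'from autogen.io import IOStream']
```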

get_the_imports_string

get_the_imports_string(
    all_imports: list[tuple[str, ImportPosition]],
    is_async: bool,
) -> str

Get the final imports string.

Parameters:

NameTypeDescriptionDefault
all_importslist[tuple[str, ImportPosition]]

All the imports.

required
is_asyncbool

If the flow is async.

required

Returns:

TypeDescription
str

The final imports string.

Source code in waldiez/exporting/flow/utils/importing.py
def get_the_imports_string(
    all_imports: list[tuple[str, ImportPosition]],
    is_async: bool,
) -> str:
    """Get the final imports string.

    Parameters
    ----------
    all_imports : list[tuple[str, ImportPosition]]
        All the imports.
    is_async : bool
        If the flow is async.

    Returns
    -------
    str
        The final imports string.
    """
    (
        builtin_imports,
        autogen_imports,
        third_party_imports,
        local_imports,
        got_import_autogen,
    ) = sort_imports(all_imports)

    # Get the final imports string.
    # Making sure that there are two lines
    # after each import section
    # (builtin, third party, local)
    final_string = "\n".join(builtin_imports) + "\n"
    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if is_async:
        final_string += (
            "\nimport aiofiles"
            "\nimport aiosqlite"
            "\nimport anyio"
            "\nimport nest_asyncio"
            "\nfrom aiocsv import AsyncDictWriter\n"
        )

    if got_import_autogen:
        final_string += "\nimport autogen  # type: ignore\n"

    if autogen_imports:  # pragma: no branch
        final_string += "\n".join(autogen_imports) + "\n"

    if third_party_imports:  # pragma: no branch
        final_string += "\n".join(third_party_imports) + "\n"

    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if local_imports:
        final_string += "\n".join(local_imports) + "\n"

    while not final_string.endswith("\n\n"):
        final_string += "\n"

    if is_async:
        final_string += (
            "# pylint: disable=broad-exception-caught\n"
            "try:\n"
            "    nest_asyncio.apply()  # pyright: ignore\n"
            "except BaseException:\n"
            "    pass  # maybe on uvloop?\n"
        )

    return final_string.replace("\n\n\n", "\n\n")  # avoid too many newlines
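
A hedged sketch of the assembled string; the exact output also contains the default builtin, typing, and common autogen imports, so only its shape is checked here:

```python
# Hypothetical usage; import path and ImportPosition availability are assumptions.
from waldiez.exporting.flow.utils.importing import (
    ImportPosition,
    get_the_imports_string,
)

block = get_the_imports_string(
    [
        ("import autogen", ImportPosition.THIRD_PARTY),
        ("import pandas as pd", ImportPosition.THIRD_PARTY),
    ],
    is_async=False,
)
assert "import autogen  # type: ignore" in block
assert "import pandas as pd" in block
```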

ensure_np_import

ensure_np_import(
    third_party_imports: list[str],
) -> list[str]

Ensure numpy is imported.

Parameters:

NameTypeDescriptionDefault
third_party_importslist[str]

The third party imports.

required

Returns:

TypeDescription
list[str]

The third party imports with numpy.

Source code in waldiez/exporting/flow/utils/importing.py
def ensure_np_import(third_party_imports: list[str]) -> list[str]:
    """Ensure numpy is imported.

    Parameters
    ----------
    third_party_imports : list[str]
        The third party imports.

    Returns
    -------
    list[str]
        The third party imports with numpy.
    """
    if (
        not third_party_imports
        or "import numpy as np" not in third_party_imports
    ):
        third_party_imports.append("import numpy as np")
    return third_party_imports
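
A quick illustrative sketch under the same import-path assumption:

```python
# Hypothetical usage; import path assumed from the source location above.
from waldiez.exporting.flow.utils.importing import ensure_np_import

print(ensure_np_import(["import pandas as pd"]))
# ['import pandas as pd', 'import numpy as np']
print(ensure_np_import(["import numpy as np"]))
# ['import numpy as np']  (already present, not duplicated)
```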

gather_imports

gather_imports(
    model_imports: Optional[
        list[tuple[str, ImportPosition]]
    ] = None,
    tool_imports: Optional[
        list[tuple[str, ImportPosition]]
    ] = None,
    chat_imports: Optional[
        list[tuple[str, ImportPosition]]
    ] = None,
    agent_imports: Optional[
        list[tuple[str, ImportPosition]]
    ] = None,
) -> list[tuple[str, ImportPosition]]

Gather all the imports.

Parameters:

NameTypeDescriptionDefault
model_importsOptional[list[tuple[str, ImportPosition]]]

The model imports.

None
tool_importsOptional[list[tuple[str, ImportPosition]]]

The tool imports.

None
chat_importsOptional[list[tuple[str, ImportPosition]]]

The chat imports.

None
agent_importsOptional[list[tuple[str, ImportPosition]]]

The agent imports.

None

Returns:

TypeDescription
list[tuple[str, ImportPosition]]

The gathered imports.

Source code in waldiez/exporting/flow/utils/importing.py
def gather_imports(
    model_imports: Optional[list[tuple[str, ImportPosition]]] = None,
    tool_imports: Optional[list[tuple[str, ImportPosition]]] = None,
    chat_imports: Optional[list[tuple[str, ImportPosition]]] = None,
    agent_imports: Optional[list[tuple[str, ImportPosition]]] = None,
) -> list[tuple[str, ImportPosition]]:
    """Gather all the imports.

    Parameters
    ----------
    model_imports : Optional[list[tuple[str, ImportPosition]]], optional
        The model imports.
    tool_imports : Optional[list[tuple[str, ImportPosition]]], optional
        The tool imports.
    chat_imports : Optional[list[tuple[str, ImportPosition]]], optional
        The chat imports.
    agent_imports : Optional[list[tuple[str, ImportPosition]]], optional
        The agent imports.

    Returns
    -------
    list[tuple[str, ImportPosition]]
        The gathered imports.
    """
    all_imports: list[tuple[str, ImportPosition]] = []
    for import_statement in BUILTIN_IMPORTS:
        all_imports.append(
            (
                import_statement,
                ImportPosition.BUILTINS,
            )
        )
    if model_imports:
        all_imports.extend(model_imports)
    if tool_imports:
        all_imports.extend(tool_imports)
    if chat_imports:
        all_imports.extend(chat_imports)
    if agent_imports:
        all_imports.extend(agent_imports)
    all_imports = deduplicate_imports(all_imports)
    one_line_typing_imports: list[tuple[str, ImportPosition]] = []
    final_imports: list[tuple[str, ImportPosition]] = []
    for import_tuple in all_imports:
        if import_tuple[0].startswith("from typing import "):
            one_line_typing_imports.append(import_tuple)
        else:
            final_imports.append(import_tuple)
    final_typing_imports = get_the_typing_imports(one_line_typing_imports)
    final_imports.insert(1, (final_typing_imports, ImportPosition.BUILTINS))
    return final_imports
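
A hedged sketch; the `ImportPosition` values and import path are assumptions based on the source location above:

```python
# Hypothetical usage; the concrete import strings are illustrative only.
from waldiez.exporting.flow.utils.importing import ImportPosition, gather_imports

merged = gather_imports(
    model_imports=[("from openai import OpenAI", ImportPosition.THIRD_PARTY)],
    tool_imports=[("from typing import Callable", ImportPosition.BUILTINS)],
)
# The default builtin imports come first, and all "from typing import ..."
# entries are collapsed into a single line inserted near the top.
typing_lines = [imp for imp, _ in merged if imp.startswith("from typing import ")]
assert len(typing_lines) == 1
```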

get_the_typing_imports

get_the_typing_imports(
    one_line_typing_imports: list[
        tuple[str, ImportPosition]
    ],
) -> str

Get the typing imports as a single line.

Parameters:

NameTypeDescriptionDefault
one_line_typing_importslist[tuple[str, ImportPosition]]

The one line typing imports.

required

Returns:

TypeDescription
str

The final typing imports string.

Source code in waldiez/exporting/flow/utils/importing.py
def get_the_typing_imports(
    one_line_typing_imports: list[tuple[str, ImportPosition]],
) -> str:
    """Get the typing imports as a single line.

    Parameters
    ----------
    one_line_typing_imports : list[tuple[str, ImportPosition]]
        The one line typing imports.

    Returns
    -------
    str
        The final typing imports string.
    """
    # merge the ones found in the imports
    # with the default ones
    typing_imports: set[str] = set(TYPING_IMPORT_NAMES)
    for import_tuple in one_line_typing_imports:
        if import_tuple[0].startswith(
            "from typing import "
        ):  # pragma: no branch
            # extract the names from the import statement
            names = import_tuple[0].split("from typing import ")[1]
            for name in names.split(","):
                typing_imports.add(name.strip())
    # create the final import statement
    final_typing_imports = (
        f"from typing import {', '.join(sorted(typing_imports))}"
    )
    return final_typing_imports.strip()
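
A small sketch under the same assumptions; the result also includes the default `TYPING_IMPORT_NAMES`:

```python
# Hypothetical usage; import path assumed from the source location above.
from waldiez.exporting.flow.utils.importing import (
    ImportPosition,
    get_the_typing_imports,
)

line = get_the_typing_imports(
    [("from typing import Mapping, Sequence", ImportPosition.BUILTINS)]
)
assert line.startswith("from typing import ")
assert "Mapping" in line and "Sequence" in line  # merged and sorted alphabetically
```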

deduplicate_imports

deduplicate_imports(
    imports: list[tuple[str, ImportPosition]],
) -> list[tuple[str, ImportPosition]]

Deduplicate imports while preserving order.

Parameters:

NameTypeDescriptionDefault
importslist[tuple[str, ImportPosition]]

The imports to deduplicate.

required

Returns:

TypeDescription
list[tuple[str, ImportPosition]]

The deduplicated imports.

Source code in waldiez/exporting/flow/utils/importing.py
def deduplicate_imports(
    imports: list[tuple[str, ImportPosition]],
) -> list[tuple[str, ImportPosition]]:
    """Deduplicate imports while preserving order.

    Parameters
    ----------
    imports : list[tuple[str, ImportPosition]]
        The imports to deduplicate.

    Returns
    -------
    list[tuple[str, ImportPosition]]
        The deduplicated imports.
    """
    # Remove duplicates while preserving order
    seen: set[tuple[str, ImportPosition]] = set()
    deduplicated_imports: list[tuple[str, ImportPosition]] = []
    for import_tuple in imports:
        if import_tuple not in seen:
            seen.add(import_tuple)
            deduplicated_imports.append(import_tuple)
    return deduplicated_imports
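
A minimal sketch under the same assumptions; duplicates are exact `(statement, position)` pairs:

```python
# Hypothetical usage; import path assumed from the source location above.
from waldiez.exporting.flow.utils.importing import (
    ImportPosition,
    deduplicate_imports,
)

imports = [
    ("import os", ImportPosition.BUILTINS),
    ("import os", ImportPosition.BUILTINS),     # exact duplicate, dropped
    ("import os", ImportPosition.THIRD_PARTY),  # different position, kept
]
assert len(deduplicate_imports(imports)) == 2
```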

linting

Linting comments to include in the generated code.

split_linter_comment

split_linter_comment(
    prefix: str, rules: list[str], max_lines: int = 3
) -> str

Split linter comment.

Parameters:

NameTypeDescriptionDefault
prefixstr

The prefix for the comment, e.g., "# pylint: disable=" or "# pyright: ".

required
ruleslist[str]

The list of linter rules to include in the comment.

required
max_linesint

The maximum number of lines to split the comment into, by default 3.

3

Returns:

TypeDescription
str

The formatted comment string with the rules split into lines.

Source code in waldiez/exporting/flow/utils/linting.py
def split_linter_comment(
    prefix: str,
    rules: list[str],
    max_lines: int = 3,
) -> str:
    """Split linter comment.

    Parameters
    ----------
    prefix : str
        The prefix for the comment, e.g., "# pylint: disable=" or "# pyright: ".
    rules : list[str]
        The list of linter rules to include in the comment.
    max_lines : int, optional
        The maximum number of lines to split the comment into, by default 3.

    Returns
    -------
    str
        The formatted comment string with the rules split into lines.
    """
    # Calculate minimum number of rules per line to not exceed max_lines
    rules_per_line = max(1, math.ceil(len(rules) / max_lines))
    # pylint: disable=inconsistent-quotes
    lines = [
        f"{prefix}{','.join(rules[i : i + rules_per_line])}"
        for i in range(0, len(rules), rules_per_line)
    ]
    return "\n".join(lines) + "\n"

get_flake8_ignore_comment

get_flake8_ignore_comment(
    rules: list[str] | None = None,
) -> str

Get the flake8 ignore comment string.

Parameters:

NameTypeDescriptionDefault
rulesOptional[list[str]]

The flake8 rules to ignore, by default None.

None

Returns:

TypeDescription
str

The flake8 ignore comment string.

Example
>>> get_flake8_ignore_comment(["E501", "F401"])

# flake8: noqa: E501, F401
Source code in waldiez/exporting/flow/utils/linting.py
def get_flake8_ignore_comment(rules: list[str] | None = None) -> str:
    """Get the flake8 ignore comment string.

    Parameters
    ----------
    rules : Optional[list[str]], optional
        The flake8 rules to ignore, by default None.

    Returns
    -------
    str
        The flake8 ignore comment string.

    Example
    -------
    ```python
    >>> get_flake8_ignore_comment(["E501", "F401"])

    # flake8: noqa: E501, F401
    ```
    """
    if not rules:
        rules = FLAKE8_RULES
    prefix = "# flake8: noqa: "
    output = ", ".join(rules)
    return prefix + output + "\n"

get_pylint_ignore_comment

get_pylint_ignore_comment(
    rules: list[str] | None = None,
) -> str

Get the pylint ignore comment string.

Parameters:

NameTypeDescriptionDefault
rulesOptional[list[str]]

The pylint rules to ignore, by default None.

None

Returns:

TypeDescription
str

The pylint ignore comment string.

Example
>>> get_pylint_ignore_comment(["invalid-name", "line-too-long"])

# pylint: disable=invalid-name
# pylint: disable=line-too-long
Source code in waldiez/exporting/flow/utils/linting.py
def get_pylint_ignore_comment(rules: list[str] | None = None) -> str:
    """Get the pylint ignore comment string.

    Parameters
    ----------
    rules : Optional[list[str]], optional
        The pylint rules to ignore, by default None.

    Returns
    -------
    str
        The pylint ignore comment string.

    Example
    -------
    ```python
    >>> get_pylint_ignore_comment(["invalid-name", "line-too-long"])

    # pylint: disable=invalid-name
    # pylint: disable=line-too-long
    ```
    """
    if not rules:
        rules = PYLINT_RULES
    prefix = "# pylint: disable="
    return split_linter_comment(
        prefix,
        rules,
        max_lines=3,
    )

get_pyright_ignore_comment

get_pyright_ignore_comment(
    rules: list[str] | None = None,
) -> str

Get the pyright ignore comment string.

Parameters:

NameTypeDescriptionDefault
rulesOptional[list[str]]

The pyright rules to ignore, by default None.

None

Returns:

TypeDescription
str

The pyright ignore comment string.

Example
>>> get_pyright_ignore_comment(
...     ["reportUnusedImport", "reportMissingTypeStubs"]
... )

# pyright: reportUnusedImport=false
# pyright: reportMissingTypeStubs=false
Source code in waldiez/exporting/flow/utils/linting.py
def get_pyright_ignore_comment(rules: list[str] | None = None) -> str:
    """Get the pyright ignore comment string.

    Parameters
    ----------
    rules : Optional[list[str]], optional
        The pyright rules to ignore, by default None.

    Returns
    -------
    str
        The pyright ignore comment string.

    Example
    -------
    ```python
    >>> get_pyright_ignore_comment(
    ...     ["reportUnusedImport", "reportMissingTypeStubs"]
    ... )

    # pyright: reportUnusedImport=false
    # pyright: reportMissingTypeStubs=false
    ```
    """
    if not rules:
        rules = PYRIGHT_RULES
    prefix = "# pyright: "
    output = split_linter_comment(
        prefix,
        [f"{rule}=false" for rule in rules],
        max_lines=3,
    )
    return output

get_mypy_ignore_comment

get_mypy_ignore_comment(
    rules: list[str] | None = None,
) -> str

Get the mypy ignore comment string.

Parameters:

NameTypeDescriptionDefault
rulesOptional[list[str]]

The mypy rules to ignore, by default None.

None

Returns:

TypeDescription
str

The mypy ignore comment string.

Example
>>> get_mypy_ignore_comment(["import-untyped", "no-redef"])

# mypy: disable-error-code="import-untyped, no-redef"
Source code in waldiez/exporting/flow/utils/linting.py
def get_mypy_ignore_comment(rules: list[str] | None = None) -> str:
    """Get the mypy ignore comment string.

    Parameters
    ----------
    rules : Optional[list[str]], optional
        The mypy rules to ignore, by default None.

    Returns
    -------
    str
        The mypy ignore comment string.

    Example
    -------
    ```python
    >>> get_mypy_ignore_comment(["import-untyped", "no-redef"])

    # mypy: disable-error-code="import-untyped, no-redef"
    ```
    """
    if rules is None:
        rules = MYPY_RULES
    if not rules:
        return "# type: ignore\n"
    prefix = "# mypy: disable-error-code="
    content = ", ".join(rules)
    return prefix + f'"{content}"\n'

logging

Logging related string generation functions.

Functions:

NameDescription
get_start_logging

Get the string to start logging.

get_stop_logging

Get the string to stop logging.

get_sqlite_out

Get the sqlite to csv conversion code string.

get_sqlite_out_call

Get the string to call the sqlite to csv conversion.

get_start_logging

get_start_logging(
    is_async: bool, for_notebook: bool
) -> str

Get the logging start call string.

Parameters:

NameTypeDescriptionDefault
is_asyncbool

Whether to use async mode.

required
for_notebookbool

Whether the logging is for a notebook or a script.

required

Returns:

TypeDescription
str

The logging start string.

Example
>>> get_start_logging()
def start_logging() -> None:
    \"\"\"Start logging.\"\"\"
    runtime_logging.start(
        logger_type="sqlite",
        config={"dbname": "flow.db"},
    )
Source code in waldiez/exporting/flow/utils/logging.py
def get_start_logging(is_async: bool, for_notebook: bool) -> str:
    r"""Get the logging start call string.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode.
    for_notebook : bool
        Whether the logging is for a notebook or a script.

    Returns
    -------
    str
        The logging start string.

    Example
    -------
    ```python
    >>> get_start_logging()
    def start_logging() -> None:
        \"\"\"Start logging.\"\"\"
        runtime_logging.start(
            logger_type="sqlite",
            config={"dbname": "flow.db"},
        )
    """
    tab = ""
    comment = get_comment(
        "Start logging.",
        for_notebook=for_notebook,
    )
    if is_async is False:
        return f'''
{tab}{comment}
{tab}def start_logging() -> None:
{tab}    """Start logging."""
{tab}    runtime_logging.start(
{tab}        logger_type="sqlite",
{tab}        config={{"dbname": "flow.db"}},
{tab}    )
{tab}
{tab}
{tab}start_logging()
'''
    return f'''
{tab}{comment}
{tab}def start_logging() -> None:
{tab}    """Start logging."""
{tab}    # pylint: disable=import-outside-toplevel
{tab}    from anyio.from_thread import start_blocking_portal

{tab}    with start_blocking_portal(backend="asyncio") as portal:
{tab}        portal.call(
{tab}            runtime_logging.start,
{tab}            None,
{tab}            "sqlite",
{tab}            {{"dbname": "flow.db"}},
{tab}        )
{tab}
{tab}
{tab}start_logging()
'''

get_sync_sqlite_out

get_sync_sqlite_out() -> str

Get the sqlite to csv and json conversion code string.

Returns:

TypeDescription
str

The sqlite to csv and json conversion code string.

Example
>>> get_sync_sqlite_out()
def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:
    \"\"\"Convert a sqlite table to csv and json files.

Parameters
----------
    dbname : str
        The sqlite database name.
    table : str
        The table name.
    csv_file : str
        The csv file name.
    \"\"\"
    conn = sqlite3.connect(dbname)
    query = f"SELECT * FROM {table}"  # nosec
    cursor = conn.execute(query)
    rows = cursor.fetchall()
    column_names = [description[0] for description in cursor.description]
    data = [dict(zip(column_names, row)) for row in rows]
    conn.close()
    with open(csv_file, "w", newline="", encoding="utf-8") as file:
        csv_writer = csv.DictWriter(file, fieldnames=column_names)
        csv_writer.writeheader()
        csv_writer.writerows(data)
    json_file = csv_file.replace(".csv", ".json")
    with open(json_file, "w", encoding="utf-8") as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
Source code in waldiez/exporting/flow/utils/logging.py
def get_sync_sqlite_out() -> str:
    r"""Get the sqlite to csv and json conversion code string.

    Returns
    -------
    str
        The sqlite to csv and json conversion code string.

    Example
    -------
    ```python
    >>> get_sync_sqlite_out()
    def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:
        \"\"\"Convert a sqlite table to csv and json files.

    Parameters
    ----------
        dbname : str
            The sqlite database name.
        table : str
            The table name.
        csv_file : str
            The csv file name.
        \"\"\"
        conn = sqlite3.connect(dbname)
        query = f"SELECT * FROM {table}"  # nosec
        cursor = conn.execute(query)
        rows = cursor.fetchall()
        column_names = [description[0] for description in cursor.description]
        data = [dict(zip(column_names, row)) for row in rows]
        conn.close()
        with open(csv_file, "w", newline="", encoding="utf-8") as file:
            csv_writer = csv.DictWriter(file, fieldnames=column_names)
            csv_writer.writeheader()
            csv_writer.writerows(data)
        json_file = csv_file.replace(".csv", ".json")
        with open(json_file, "w", encoding="utf-8") as file:
            json.dump(data, file, indent=4, ensure_ascii=False)
    ```
    """
    content = "\n\n"
    content += (
        "def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:\n"
    )
    content += '    """Convert a sqlite table to csv and json files.\n\n'
    content += "    Parameters\n"
    content += "    ----------\n"
    content += "    dbname : str\n"
    content += "        The sqlite database name.\n"
    content += "    table : str\n"
    content += "        The table name.\n"
    content += "    csv_file : str\n"
    content += "        The csv file name.\n"
    content += '    """\n'
    content += "    conn = sqlite3.connect(dbname)\n"
    content += '    query = f"SELECT * FROM {table}"  # nosec\n'
    content += "    try:\n"
    content += "        cursor = conn.execute(query)\n"
    content += "    except sqlite3.OperationalError:\n"
    content += "        conn.close()\n"
    content += "        return\n"
    content += "    rows = cursor.fetchall()\n"
    content += "    column_names = [description[0] for description "
    content += "in cursor.description]\n"
    content += "    data = [dict(zip(column_names, row)) for row in rows]\n"
    content += "    conn.close()\n"
    content += (
        '    with open(csv_file, "w", newline="", encoding="utf-8") as file:\n'
    )
    content += (
        "        csv_writer = csv.DictWriter(file, fieldnames=column_names)\n"
    )
    content += "        csv_writer.writeheader()\n"
    content += "        csv_writer.writerows(data)\n"
    content += '    json_file = csv_file.replace(".csv", ".json")\n'
    content += '    with open(json_file, "w", encoding="utf-8") as file:\n'
    content += "        json.dump(data, file, indent=4, ensure_ascii=False)\n"
    content += "\n"
    return content

get_async_sqlite_out

get_async_sqlite_out() -> str

Get the sqlite to csv and json conversion code string.

Returns:

TypeDescription
str

The sqlite to csv and json conversion code string.

Example
>>> get_async_sqlite_out()
async def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:
    \"\"\"Convert a sqlite table to csv and json files.

Parameters
----------
    dbname : str
        The sqlite database name.
    table : str
        The table name.
    csv_file : str
        The csv file name.
    \"\"\"
    conn = await aiosqlite.connect(dbname)
    query = f"SELECT * FROM {table}"  # nosec
    try:
        cursor = await conn.execute(query)
    except BaseException:  # pylint: disable=broad-except
        await conn.close()
        return
    rows = await cursor.fetchall()
    column_names = [description[0] for description in cursor.description]
    data = [dict(zip(column_names, row)) for row in rows]
    await cursor.close()
    await conn.close()
    async with aiofiles.open(csv_file, "w", newline="", encoding="utf-8") as file:
        csv_writer = AsyncDictWriter(file, fieldnames=column_names, dialect="unix")
        await csv_writer.writeheader()
        await csv_writer.writerows(data)
    json_file = csv_file.replace(".csv", ".json")
    async with aiofiles.open(json_file, "w", encoding="utf-8") as file:
        await file.write(json.dumps(data, indent=4, ensure_ascii=False))
Source code in waldiez/exporting/flow/utils/logging.py
def get_async_sqlite_out() -> str:
    r"""Get the sqlite to csv and json conversion code string.

    Returns
    -------
    str
        The sqlite to csv and json conversion code string.

    Example
    -------
    ```python
    >>> get_async_sqlite_out()
    async def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:
        \"\"\"Convert a sqlite table to csv and json files.

    Parameters
    ----------
        dbname : str
            The sqlite database name.
        table : str
            The table name.
        csv_file : str
            The csv file name.
        \"\"\"
        conn = await aiosqlite.connect(dbname)
        query = f"SELECT * FROM {table}"  # nosec
        try:
            cursor = await conn.execute(query)
        except BaseException:  # pylint: disable=broad-except
            await conn.close()
            return
        rows = await cursor.fetchall()
        column_names = [description[0] for description in cursor.description]
        data = [dict(zip(column_names, row)) for row in rows]
        await cursor.close()
        await conn.close()
        async with aiofiles.open(csv_file, "w", newline="", encoding="utf-8") as file:
            csv_writer = AsyncDictWriter(file, fieldnames=column_names, dialect="unix")
            await csv_writer.writeheader()
            await csv_writer.writerows(data)
        json_file = csv_file.replace(".csv", ".json")
        async with aiofiles.open(json_file, "w", encoding="utf-8") as file:
            await file.write(json.dumps(data, indent=4, ensure_ascii=False))
    ```
    """
    content = "\n\n"
    content += "async def get_sqlite_out(dbname: str, table: str, csv_file: str) -> None:\n"
    content += '    """Convert a sqlite table to csv and json files.\n\n'
    content += "    Parameters\n"
    content += "    ----------\n"
    content += "    dbname : str\n"
    content += "        The sqlite database name.\n"
    content += "    table : str\n"
    content += "        The table name.\n"
    content += "    csv_file : str\n"
    content += "        The csv file name.\n"
    content += '    """\n'
    content += "    conn = await aiosqlite.connect(dbname)\n"
    content += '    query = f"SELECT * FROM {table}"  # nosec\n'
    content += "    try:\n"
    content += "        cursor = await conn.execute(query)\n"
    content += "    except BaseException:  # pylint: disable=broad-except\n"
    content += "        await conn.close()\n"
    content += "        return\n"
    content += "    rows = await cursor.fetchall()\n"
    content += "    column_names = [description[0] for description "
    content += "in cursor.description]\n"
    content += "    data = [dict(zip(column_names, row)) for row in rows]\n"
    content += "    await cursor.close()\n"
    content += "    await conn.close()\n"
    content += (
        '    async with aiofiles.open(csv_file, "w", newline="", '
        'encoding="utf-8") as file:\n'
    )
    content += '        csv_writer = AsyncDictWriter(file, fieldnames=column_names, dialect="unix")\n'
    content += "        await csv_writer.writeheader()\n"
    content += "        await csv_writer.writerows(data)\n"
    content += '    json_file = csv_file.replace(".csv", ".json")\n'
    content += '    async with aiofiles.open(json_file, "w", encoding="utf-8") as file:\n'
    content += "        await file.write(json.dumps(data, indent=4, ensure_ascii=False))\n"
    content += "\n"
    return content

get_sqlite_out

get_sqlite_out(is_async: bool) -> str

Get the sqlite to csv and json conversion code string.

Parameters:

NameTypeDescriptionDefault
is_asyncbool

Whether to use async mode.

required

Returns:

TypeDescription
str

The sqlite to csv and json conversion code string.

Source code in waldiez/exporting/flow/utils/logging.py
def get_sqlite_out(is_async: bool) -> str:
    """Get the sqlite to csv and json conversion code string.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode.

    Returns
    -------
    str
        The sqlite to csv and json conversion code string.
    """
    if is_async:
        return get_async_sqlite_out()
    return get_sync_sqlite_out()
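
A minimal sketch, assuming the function is importable from `waldiez.exporting.flow.utils.logging` as the source location above suggests:

```python
# Hypothetical usage; import path assumed from the source location above.
from waldiez.exporting.flow.utils.logging import get_sqlite_out

sync_code = get_sqlite_out(is_async=False)
async_code = get_sqlite_out(is_async=True)
assert sync_code.lstrip().startswith("def get_sqlite_out(")
assert async_code.lstrip().startswith("async def get_sqlite_out(")
```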

get_sqlite_out_call

get_sqlite_out_call(tabs: int, is_async: bool) -> str

Get the sqlite to csv and json conversion call string.

Parameters:

NameTypeDescriptionDefault
tabsint

The number of tabs to use for indentation

required
is_asyncbool

Whether to use async mode

required

Returns:

TypeDescription
str

The sqlite to csv conversion call string.

Example
>>> get_sqlite_out_call(tabs=0, is_async=False)
if not os.path.exists("logs"):
    os.makedirs("logs")
for table in [
    "chat_completions",
    "agents",
    "oai_wrappers",
    "oai_clients",
    "version",
    "events",
    "function_calls",
]:
    dest = os.path.join("logs", f"{table}.csv")
    get_sqlite_out("flow.db", table, dest)
Source code in waldiez/exporting/flow/utils/logging.py
def get_sqlite_out_call(tabs: int, is_async: bool) -> str:
    """Get the sqlite to csv and json conversion call string.

    Parameters
    ----------
    tabs : int
        The number of tabs to use for indentation
    is_async : bool
        Whether to use async mode

    Returns
    -------
    str
        The sqlite to csv conversion call string.

    Example
    -------
    ```python
    >>> get_sqlite_out_call(tabs=0, is_async=False)
    if not os.path.exists("logs"):
        os.makedirs("logs")
    for table in [
        "chat_completions",
        "agents",
        "oai_wrappers",
        "oai_clients",
        "version",
        "events",
        "function_calls",
    ]:
        dest = os.path.join("logs", f"{table}.csv")
        get_sqlite_out("flow.db", table, dest)
    ```
    """
    table_names = [
        "chat_completions",
        "agents",
        "oai_wrappers",
        "oai_clients",
        "version",
        "events",
        "function_calls",
    ]
    tab = "    " * tabs
    content = ""
    content += tab + 'if not os.path.exists("logs"):\n'
    content += tab + '    os.makedirs("logs")\n'
    content += tab + "for table in [\n"
    for table in table_names:
        content += tab + f'    "{table}",' + "\n"
    content += tab + "]:\n"
    content += tab + '    dest = os.path.join("logs", f"{table}.csv")' + "\n"
    if is_async:
        content += tab + '    await get_sqlite_out("flow.db", table, dest)\n'
    else:
        content += tab + '    get_sqlite_out("flow.db", table, dest)\n'
    return content

get_stop_logging

get_stop_logging(is_async: bool, tabs: int = 0) -> str

Get the function to stop logging and gather logs.

Parameters:

NameTypeDescriptionDefault
is_asyncbool

Whether to use async mode

required
tabsint

The number of tabs to use for indentation, by default 0

0

Returns:

TypeDescription
str

The logging stop string.

Example
>>> get_logging_stop_string()
def stop_logging() -> None:
    \"\"\"Stop logging.\"\"\"
    runtime_logging.stop()
    for table in [
        "chat_completions",
        "agents",
        "oai_wrappers",
        "oai_clients",
        "version",
        "events",
        "function_calls",
    ]:
        dest = os.path.join("logs", f"{table}.csv")
        get_sqlite_out("flow.db", table, dest)
Source code in waldiez/exporting/flow/utils/logging.py
def get_stop_logging(is_async: bool, tabs: int = 0) -> str:
    r"""Get the function to stop logging and gather logs.

    Parameters
    ----------
    is_async : bool
        Whether to use async mode
    tabs : int, optional
        The number of tabs to use for indentation, by default 0

    Returns
    -------
    str
        The logging stop string.

    Example
    -------
    ```python
    >>> get_logging_stop_string()
    def stop_logging() -> None:
        \"\"\"Stop logging.\"\"\"
        runtime_logging.stop()
        for table in [
            "chat_completions",
            "agents",
            "oai_wrappers",
            "oai_clients",
            "version",
            "events",
            "function_calls",
        ]:
            dest = os.path.join("logs", f"{table}.csv")
            get_sqlite_out("flow.db", table, dest)
    """
    tab = "    " * tabs
    content = "\n" + tab
    if is_async:
        content += "async "
    content += "def stop_logging() -> None:\n"
    content += '    """Stop logging."""\n'
    if is_async:
        content += f"{tab}    # pylint: disable=import-outside-toplevel\n"
        content += f"{tab}    from asyncer import asyncify\n\n"
        content += f"{tab}    await asyncify(runtime_logging.stop)()\n"
    else:
        content += f"{tab}    runtime_logging.stop()\n"
    content += get_sqlite_out_call(tabs + 1, is_async)
    return content