DataFrame function

This page answers common “how do I…” questions about exporting a Python function that processes one or more DataFrames to ONNX.


How to export a function processing one DataFrame

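Call yobx.sql.to_onnx() with the function and a tuple containing a representative DataFrame. The function is traced through the DataFrame API and compiled into ONNX; in the example below, each column becomes a model input named after that column.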

<<<

import numpy as np
import pandas as pd
from yobx.helpers.onnx_helper import pretty_onnx
from yobx.sql import to_onnx


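def add_total(df):
    # Traced frames expose select(); this branch is taken while tracing.
    if hasattr(df, "select"):
        return df.select({"total": df["a"] + df["b"]})
    # Plain pandas fallback for eager use.
    return (df["a"] + df["b"]).to_frame(name="total")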


df = pd.DataFrame(
    {
        "a": np.array([1.0, 2.0], dtype=np.float32),
        "b": np.array([3.0, 4.0], dtype=np.float32),
    }
)
onx = to_onnx(add_total, (df,))
print(pretty_onnx(onx))

>>>

    opset: domain='' version=21
    input: name='a' type=dtype('float32') shape=['N']
    input: name='b' type=dtype('float32') shape=['N']
    Add(a, b) -> total
    output: name='total' type='NOTENSOR' shape=None
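
The `hasattr(df, "select")` test lets the same function run on a plain pandas DataFrame, which is a convenient way to check the logic before exporting. The sketch below does not use yobx at all. It assumes pandas 1.0 or later, where `DataFrame` has no `select` attribute, so the call takes the fallback branch.

```python
import numpy as np
import pandas as pd


def add_total(df):
    # Same function as in the export example above.
    if hasattr(df, "select"):
        return df.select({"total": df["a"] + df["b"]})
    return (df["a"] + df["b"]).to_frame(name="total")


df = pd.DataFrame(
    {
        "a": np.array([1.0, 2.0], dtype=np.float32),
        "b": np.array([3.0, 4.0], dtype=np.float32),
    }
)

# Assumption: pandas >= 1.0, so hasattr(df, "select") is False here
# and the plain pandas branch runs.
eager = add_total(df)
print(eager["total"].tolist())  # [4.0, 6.0]
```

The eager result gives a reference to compare against once the exported model is run.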

How to export a function processing several DataFrames

Pass a tuple of DataFrames to yobx.sql.to_onnx(). The callable receives one traced frame per input DataFrame.

<<<

import numpy as np
import pandas as pd
from yobx.helpers.onnx_helper import pretty_onnx
from yobx.sql import to_onnx


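def join_like(left, right):
    # Traced frames expose select(); pandas frames do not. Testing for "join"
    # would not tell them apart, because pandas DataFrames also have join().
    if hasattr(left, "select"):
        merged = left.join(right, left_key="id", right_key="id")
        return merged.select({"total": merged["a"] + merged["b"]})
    # Plain pandas fallback, useful for checking the logic eagerly.
    merged = pd.merge(left, right, on="id")
    return (merged["a"] + merged["b"]).to_frame(name="total")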


left = pd.DataFrame(
    {
        "id": np.array([0, 1], dtype=np.int64),
        "a": np.array([1.0, 2.0], dtype=np.float32),
    }
)
right = pd.DataFrame(
    {
        "id": np.array([0, 1], dtype=np.int64),
        "b": np.array([10.0, 20.0], dtype=np.float32),
    }
)
onx = to_onnx(join_like, (left, right))
print(pretty_onnx(onx))

>>>

    opset: domain='' version=21
    input: name='id' type=dtype('int64') shape=['N']
    input: name='a' type=dtype('float32') shape=['N']
    input: name='id_right' type=dtype('int64') shape=['N']
    input: name='b' type=dtype('float32') shape=['N']
    init: name='init7_s1_1' type=int64 shape=(1,) -- array([1])           -- Opset.make_node.1/Shape##Opset.make_node.1/Shape##ReduceArgTopKPattern.K##ReduceArgTopKPattern.K
    init: name='init7_s1_0' type=int64 shape=(1,) -- array([0])           -- Opset.make_node.1/Shape
    Unsqueeze(id, init7_s1_1) -> id::UnSq1
    Unsqueeze(id_right, init7_s1_0) -> id_right::UnSq0
      Equal(id::UnSq1, id_right::UnSq0) -> _onx_equal_id::UnSq1
        Cast(_onx_equal_id::UnSq1, to=6) -> _onx_equal_id::UnSq1::C6
          TopK(_onx_equal_id::UnSq1::C6, init7_s1_1, axis=1, largest=1) -> ReduceArgTopKPattern__onx_reducemax_equal_id::UnSq1::C6, ReduceArgTopKPattern__onx_argmax_equal_id::UnSq1::C6
            Squeeze(ReduceArgTopKPattern__onx_reducemax_equal_id::UnSq1::C6, init7_s1_1) -> _onx_reducemax_equal_id::UnSq1::C6
              Cast(_onx_reducemax_equal_id::UnSq1::C6, to=9) -> _onx_reducemax_equal_id::UnSq1::C6::C9
                Compress(a, _onx_reducemax_equal_id::UnSq1::C6::C9, axis=0) -> _onx_compress_a
            Squeeze(ReduceArgTopKPattern__onx_argmax_equal_id::UnSq1::C6, init7_s1_1) -> _onx_argmax_equal_id::UnSq1::C6
              Compress(_onx_argmax_equal_id::UnSq1::C6, _onx_reducemax_equal_id::UnSq1::C6::C9, axis=0) -> _onx_compress_argmax_equal_id::UnSq1::C6
                Gather(b, _onx_compress_argmax_equal_id::UnSq1::C6, axis=0) -> _onx_gather_b
                  Add(_onx_compress_a, _onx_gather_b) -> total
    output: name='total' type='NOTENSOR' shape=None
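
The printed graph is easier to follow next to an equivalent NumPy computation. The sketch below is one reading of the nodes above, not code taken from yobx, and `join_add_reference` is a hypothetical name. It assumes the `TopK` node with K=1 acts as a combined row-wise max and argmax over the key-equality matrix.

```python
import numpy as np


def join_add_reference(left_id, a, right_id, b):
    # Unsqueeze + Equal: compare every left key with every right key.
    eq = left_id[:, None] == right_id[None, :]
    # Cast(to=6): booleans become int32 so they can be ranked.
    eq_i = eq.astype(np.int32)
    # TopK values (K=1, axis=1), squeezed and cast to bool:
    # True where the left row has at least one matching right row.
    has_match = eq_i.max(axis=1).astype(bool)
    # TopK indices (K=1, axis=1), squeezed:
    # position of the first matching right row.
    match_idx = eq_i.argmax(axis=1)
    # Compress keeps the matched left rows, Gather fetches the
    # corresponding right values, Add produces the output column.
    return a[has_match] + b[match_idx[has_match]]


left_id = np.array([0, 1], dtype=np.int64)
a = np.array([1.0, 2.0], dtype=np.float32)
right_id = np.array([0, 1], dtype=np.int64)
b = np.array([10.0, 20.0], dtype=np.float32)

total = join_add_reference(left_id, a, right_id, b)
print(total)  # [11. 22.]
```

If this reading is right, each left row is paired with at most one right row, so a right-hand key that appears several times would not yield the extra rows that `pd.merge` returns.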

How to use this in a scikit-learn Pipeline

Wrap the DataFrame logic in a custom transformer inheriting from TraceableMixin. Then place that transformer in a normal Pipeline and export the fitted pipeline with yobx.sklearn.to_onnx().

<<<

import numpy as np
import pandas as pd
import onnxruntime
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from yobx.sklearn import TraceableMixin, to_onnx


class AddTotalTransformer(BaseEstimator, TransformerMixin, TraceableMixin):
    def fit(self, df, y=None):
        return self

    def transform(self, df):
        if hasattr(df, "select"):
            return df.select({"total": df["a"] + df["b"]})
        return (df["a"] + df["b"]).to_frame(name="total")


rng = np.random.default_rng(0)
train_df = pd.DataFrame(
    {
        "a": rng.standard_normal(40).astype(np.float32),
        "b": rng.standard_normal(40).astype(np.float32),
    }
)
pipe = Pipeline([("features", AddTotalTransformer()), ("scale", StandardScaler())]).fit(
    train_df
)

onx = to_onnx(pipe, (train_df,))

test_df = pd.DataFrame(
    {
        "a": rng.standard_normal(5).astype(np.float32),
        "b": rng.standard_normal(5).astype(np.float32),
    }
)
expected = pipe.transform(test_df).astype(np.float32)

sess = onnxruntime.InferenceSession(
    onx.SerializeToString(), providers=["CPUExecutionProvider"]
)
(got,) = sess.run(
    None,
    {
        "a": test_df[["a"]].to_numpy().astype(np.float32),
        "b": test_df[["b"]].to_numpy().astype(np.float32),
    },
)
assert np.allclose(got, expected, atol=1e-5)
print("Pipeline outputs match ✓")

>>>

    Pipeline outputs match ✓
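
The feed dictionary passed to `sess.run` above has one entry per DataFrame column, each a float32 column vector. For wider frames it can be built in a loop. `frame_to_feeds` below is a hypothetical helper, not part of yobx, and it assumes every column should be fed as float32 with shape (N, 1), as in the example above.

```python
import numpy as np
import pandas as pd


def frame_to_feeds(df, dtype=np.float32):
    # One entry per column; df[[name]] keeps the column 2-D, shape (N, 1).
    return {name: df[[name]].to_numpy().astype(dtype) for name in df.columns}


test_df = pd.DataFrame({"a": [0.5, 1.5, 2.5], "b": [1.0, 2.0, 3.0]})
feeds = frame_to_feeds(test_df)
for name, value in feeds.items():
    print(name, value.dtype, value.shape)
```

The resulting dictionary can then be passed as the second argument of `sess.run(None, feeds)`.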

See also

scikit-learn — additional scikit-learn conversion patterns.

DataFrame Tracing — detailed design of DataFrame tracing.