DataFrame function#
This page answers common “how do I…” questions about exporting to ONNX a Python function that processes one or more DataFrames.
How to export a function processing one DataFrame#
Call yobx.sql.to_onnx() with the function and a representative DataFrame, passed as a one-element tuple.
The function is traced through the DataFrame API and compiled into ONNX.
<<<
import numpy as np
import pandas as pd
from yobx.helpers.onnx_helper import pretty_onnx
from yobx.sql import to_onnx
def add_total(df):
    """Return a single-column frame ``total`` holding ``a + b``.

    The function accepts any frame-like object that supports ``df["a"]``
    and ``df["b"]``.  Objects exposing a ``select`` method are handled
    through that method; anything else is treated like a pandas DataFrame.
    """
    if hasattr(df, "select"):
        # Frame exposing ``select``: express the result as a named projection.
        return df.select({"total": df["a"] + df["b"]})
    # No ``select`` available: add the two columns and name the result.
    return (df["a"] + df["b"]).to_frame(name="total")
# Representative input: two float32 columns of equal length.
columns = {
    "a": np.array([1.0, 2.0], dtype=np.float32),
    "b": np.array([3.0, 4.0], dtype=np.float32),
}
df = pd.DataFrame(columns)

# Export add_total using the sample frame, then display the resulting model.
onx = to_onnx(add_total, (df,))
print(pretty_onnx(onx))
>>>
opset: domain='' version=21
input: name='a' type=dtype('float32') shape=['N']
input: name='b' type=dtype('float32') shape=['N']
Add(a, b) -> total
output: name='total' type='NOTENSOR' shape=None
How to export a function processing several DataFrames#
Pass a tuple of DataFrames to yobx.sql.to_onnx(). The callable receives
one traced frame per input DataFrame.
<<<
import numpy as np
import pandas as pd
from yobx.helpers.onnx_helper import pretty_onnx
from yobx.sql import to_onnx
def join_like(left, right):
    """Join two frames on ``id`` and return a ``total`` column (``a + b``).

    ``left`` provides the columns ``id`` and ``a``; ``right`` provides
    ``id`` and ``b``.  Frames exposing ``select`` are joined with their own
    ``join(..., left_key=..., right_key=...)`` method; anything else is
    treated like a pandas DataFrame and merged with :func:`pandas.merge`.
    """
    # Probe for ``select`` rather than ``join``: a pandas DataFrame also has
    # a ``join`` method (with a different signature), so testing for
    # ``join`` would never reach the pandas fallback below.
    if hasattr(left, "select"):
        merged = left.join(right, left_key="id", right_key="id")
        return merged.select({"total": merged["a"] + merged["b"]})
    merged = pd.merge(left, right, on="id")
    return (merged["a"] + merged["b"]).to_frame(name="total")
# Both frames share the same ``id`` values so every row finds a match.
ids = np.array([0, 1], dtype=np.int64)

left = pd.DataFrame({"id": ids, "a": np.array([1.0, 2.0], dtype=np.float32)})
right = pd.DataFrame({"id": ids.copy(), "b": np.array([10.0, 20.0], dtype=np.float32)})

# One sample frame per argument of join_like, passed as a tuple.
onx = to_onnx(join_like, (left, right))
print(pretty_onnx(onx))
>>>
opset: domain='' version=21
input: name='id' type=dtype('int64') shape=['N']
input: name='a' type=dtype('float32') shape=['N']
input: name='id_right' type=dtype('int64') shape=['N']
input: name='b' type=dtype('float32') shape=['N']
init: name='init7_s1_1' type=int64 shape=(1,) -- array([1]) -- Opset.make_node.1/Shape##Opset.make_node.1/Shape##ReduceArgTopKPattern.K##ReduceArgTopKPattern.K
init: name='init7_s1_0' type=int64 shape=(1,) -- array([0]) -- Opset.make_node.1/Shape
Unsqueeze(id, init7_s1_1) -> id::UnSq1
Unsqueeze(id_right, init7_s1_0) -> id_right::UnSq0
Equal(id::UnSq1, id_right::UnSq0) -> _onx_equal_id::UnSq1
Cast(_onx_equal_id::UnSq1, to=6) -> _onx_equal_id::UnSq1::C6
TopK(_onx_equal_id::UnSq1::C6, init7_s1_1, axis=1, largest=1) -> ReduceArgTopKPattern__onx_reducemax_equal_id::UnSq1::C6, ReduceArgTopKPattern__onx_argmax_equal_id::UnSq1::C6
Squeeze(ReduceArgTopKPattern__onx_reducemax_equal_id::UnSq1::C6, init7_s1_1) -> _onx_reducemax_equal_id::UnSq1::C6
Cast(_onx_reducemax_equal_id::UnSq1::C6, to=9) -> _onx_reducemax_equal_id::UnSq1::C6::C9
Compress(a, _onx_reducemax_equal_id::UnSq1::C6::C9, axis=0) -> _onx_compress_a
Squeeze(ReduceArgTopKPattern__onx_argmax_equal_id::UnSq1::C6, init7_s1_1) -> _onx_argmax_equal_id::UnSq1::C6
Compress(_onx_argmax_equal_id::UnSq1::C6, _onx_reducemax_equal_id::UnSq1::C6::C9, axis=0) -> _onx_compress_argmax_equal_id::UnSq1::C6
Gather(b, _onx_compress_argmax_equal_id::UnSq1::C6, axis=0) -> _onx_gather_b
Add(_onx_compress_a, _onx_gather_b) -> total
output: name='total' type='NOTENSOR' shape=None
How to use this in a scikit-learn Pipeline#
Wrap the DataFrame logic in a custom transformer inheriting from
TraceableMixin. Then place that transformer in a normal
Pipeline and export the fitted pipeline with
yobx.sklearn.to_onnx().
<<<
import numpy as np
import pandas as pd
import onnxruntime
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from yobx.sklearn import TraceableMixin, to_onnx
class AddTotalTransformer(BaseEstimator, TransformerMixin, TraceableMixin):
    """Transformer producing a single ``total`` column equal to ``a + b``."""

    def fit(self, df, y=None):
        """Return ``self`` unchanged; ``df`` and ``y`` are not used."""
        return self

    def transform(self, df):
        """Return a one-column frame ``total`` holding ``df["a"] + df["b"]``.

        Objects exposing ``select`` are handled through that method;
        anything else is treated like a pandas DataFrame.
        """
        if hasattr(df, "select"):
            return df.select({"total": df["a"] + df["b"]})
        return (df["a"] + df["b"]).to_frame(name="total")
# Seeded generator: the random draws below are reproducible.
rng = np.random.default_rng(0)

# Training data: 40 rows, two float32 columns.
train_df = pd.DataFrame(
    {
        "a": rng.standard_normal(40).astype(np.float32),
        "b": rng.standard_normal(40).astype(np.float32),
    }
)

# Fit the pipeline, then export the fitted pipeline.
steps = [("features", AddTotalTransformer()), ("scale", StandardScaler())]
pipe = Pipeline(steps).fit(train_df)
onx = to_onnx(pipe, (train_df,))

# Fresh data: 5 rows with the same columns.
test_df = pd.DataFrame(
    {
        "a": rng.standard_normal(5).astype(np.float32),
        "b": rng.standard_normal(5).astype(np.float32),
    }
)
expected = pipe.transform(test_df).astype(np.float32)

# Run the exported model with ONNX Runtime, feeding each column by name.
sess = onnxruntime.InferenceSession(
    onx.SerializeToString(), providers=["CPUExecutionProvider"]
)
feeds = {
    name: test_df[[name]].to_numpy().astype(np.float32) for name in ("a", "b")
}
(got,) = sess.run(None, feeds)

# The ONNX output must agree with the scikit-learn pipeline output.
assert np.allclose(got, expected, atol=1e-5)
print("Pipeline outputs match ✓")
>>>
Pipeline outputs match ✓
See also
scikit-learn — additional scikit-learn conversion patterns.
DataFrame Tracing — detailed design of DataFrame tracing.