diff --git a/bigframes/functions/_remote_function_client.py b/bigframes/functions/_remote_function_client.py index 75385f11a5..5acd31b425 100644 --- a/bigframes/functions/_remote_function_client.py +++ b/bigframes/functions/_remote_function_client.py @@ -23,6 +23,7 @@ import string import sys import tempfile +import types from typing import cast, Tuple, TYPE_CHECKING from bigframes_vendored import constants @@ -43,6 +44,15 @@ logger = logging.getLogger(__name__) +# https://cloud.google.com/sdk/gcloud/reference/functions/deploy#--ingress-settings +_INGRESS_SETTINGS_MAP = types.MappingProxyType( + { + "all": functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + "internal-only": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + "internal-and-gclb": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + } +) + class RemoteFunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation @@ -228,6 +238,7 @@ def create_cloud_function( is_row_processor=False, vpc_connector=None, memory_mib=1024, + ingress_settings="all", ): """Create a cloud function from the given user defined function. @@ -324,6 +335,16 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) + if ingress_settings not in _INGRESS_SETTINGS_MAP: + raise ValueError( + "'{}' not one of the supported ingress settings values: {}".format( + ingress_settings, list(_INGRESS_SETTINGS_MAP) + ) + ) + function.service_config.ingress_settings = cast( + functions_v2.ServiceConfig.IngressSettings, + _INGRESS_SETTINGS_MAP[ingress_settings], + ) function.kms_key_name = self._cloud_function_kms_key_name create_function_request.function = function @@ -372,6 +393,7 @@ def provision_bq_remote_function( is_row_processor, cloud_function_vpc_connector, cloud_function_memory_mib, + cloud_function_ingress_settings, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -418,6 +440,7 @@ def provision_bq_remote_function( is_row_processor=is_row_processor, vpc_connector=cloud_function_vpc_connector, memory_mib=cloud_function_memory_mib, + ingress_settings=cloud_function_ingress_settings, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 6bc7a4b079..a924dbd9c5 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -19,7 +19,17 @@ import inspect import sys import threading -from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union +from typing import ( + Any, + cast, + Dict, + Literal, + Mapping, + Optional, + Sequence, + TYPE_CHECKING, + Union, +) import warnings import bigframes_vendored.constants as constants @@ -110,6 +120,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -280,6 +293,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. """ # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -504,6 +522,7 @@ def try_delattr(attr): is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) # TODO(shobs): Find a better way to support udfs with param named "name". diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 94ea6becab..1bdf49eaf5 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -669,6 +669,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -687,6 +690,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7d0cfaee5c..3a9cba442c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1040,6 +1040,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1194,6 +1197,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1220,6 +1228,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index d1e82dd415..18d2609347 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -2173,3 +2173,69 @@ def foo(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo ) + + +@pytest.mark.parametrize( + ("ingress_settings_args", "effective_ingress_settings"), + [ + pytest.param( + {}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, id="no-set" + ), + pytest.param( + {"cloud_function_ingress_settings": "all"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + id="set-all", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-only"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + id="set-internal-only", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-and-gclb"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + id="set-internal-and-gclb", + ), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings( + session, scalars_dfs, ingress_settings_args, effective_ingress_settings +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **ingress_settings_args)( + square + ) + + # Assert that the GCF is created with the intended maximum timeout + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.ingress_settings == effective_ingress_settings + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings_unsupported(session): + with pytest.raises( + ValueError, match="'unknown' not one of the supported ingress settings values" + ): + + @session.remote_function(reuse=False, cloud_function_ingress_settings="unknown") + def square(x: int) -> int: + return x * x