opensource.google.com

Menu
Showing posts with label Data. Show all posts
Showing posts with label Data. Show all posts

Producer java library for Data Lineage is now open source

Tuesday, January 28, 2025

Integrating OpenLineage producers with GCP Lineage just got a lot easier


What is Data Lineage

Data Lineage is a GCP feature that allows tracking data movement. This tool helps data owners and analysts detect anomalies in data flows, find connections between data sources and verify the potential consequences of planned changes in data pipelines.

Lineage is injected automatically for some Google Cloud products (BigQuery, Cloud Data Fusion, Cloud Composer, Dataproc, Vertex AI). That means, if Lineage integration with any of those products is enabled in the projects, data movements coming from executing jobs by these products will be reported to GCP Lineage.

For custom integrations, the API can be used to report and fetch lineage.

After injecting, lineage can be viewed in the Google Cloud console (available from DataCatalog UI, BigQuery UI, Vertex UI). There are two representations: graph view, with data sources as nodes and data movements as edges, and list view, a tabular representation. Lineage information can also be fetched from the API.

More information is available in the documentation.


GCP Lineage information model

We describe data flows using the following concepts:

  • Process is a definition of some data transformation. For example, a SQL or Spark script.
  • Run is an execution of a Process.
  • Lineage Event is a data transformation event. It is reported in context of a Run.
  • A Link represents a connection between two data sources, when data in the link’s Target depends on its Source. A Lineage Event contains a list of Links.

OpenLineage support

OpenLineage is an open standard for reporting lineage information. It unifies lineage reporting between systems, which means the events generated in this format can be consumed by any product supporting it. This leads to more flexibility: adding or replacing a lineage producer does not imply changing the consumer, and vice versa.

OpenLineage format is adopted by a number of lineage producers and consumers, meaning there is already tooling available to report lineage from/to those systems. GCP Lineage is one of those consumers: users can report events in OpenLineage format, see the resulting lineage on the UI, and query it via the API.

OpenLineage is the preferred method for reporting lineage in GCP Lineage. It is used by the Dataproc lineage integration. To find out more about sending OpenLineage events to GCP Lineage refer to the documentation.

After injecting lineage in OpenLineage format, it can be accessed in the same way as if it was injected via other API methods or automatically: from the Google Cloud console or the API.


Why producer library

The GCP Lineage producer library is an extension of the client library. Client libraries are recommended for calling Cloud APIs programmatically. They handle low level API call details, leaving the necessary user code simpler and shorter.

The producer library further simplifies integration by providing ready to use code needed to call the API from Java. It adds additional functionality such as synchronous and asynchronous clients, translating OpenLineage JSON messages to the API friendly format, error handling etc.

Using the producer library, all the code needed to send a request to GCP Lineage API is:

SyncLineageProducerClient client = SyncLineageProducerClient.create();
ProcessOpenLineageRunEventRequest request =
        ProcessOpenLineageRunEventRequest.newBuilder()
            .setParent(parent)
            .setOpenLineage(openLineageMessage)
            .build();
client.processOpenLineageRunEvent(request);

The field openLineageMessage here is a protobuf Struct that includes information about job execution, inputs and outputs and other metadata. The object model is described in the documentation. An example message is:

{
  "eventType": "START",
  "eventTime": "2023-04-04T13:21:16.098Z",
  "run": {
    "runId": "502483d6-3e3d-474f-9380-da565eaa7516",
    "facets": {
       "spark_properties": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "properties": {
          "spark.master": "yarn",
          "spark.app.name": "sparkJobTest.py"
        }
      }
    }
  },
  "job": {
    "namespace": "project-name",
    "name": "cluster-name",
    "facets": {
    "jobType": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
        "processingType": "BATCH",
        "integration": "SPARK",
        "jobType": "SQL_JOB"
      },

    }
  },
  "inputs": [
    {
      "namespace": "bigquery",
      "name": "project.dataset.input_table",
    }],
  "outputs": [
   {
      "namespace": "bigquery",
      "name": "project.dataset.output_table",
    }],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.18.0/integration/spark",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Learn more about building an OpenLineage message.


Best Practices for Constructing OpenLineage Messages

The openLineageMessage should follow the OpenLineage format. The fields that are required for correct parsing by the GCP Lineage API are:

job

mapped to Process

job.namespace

used to construct Process name

job.name

used to construct Process name

run

mapped to Run

run.runId

used to construct Run name

producer

URI identifying the producer of this metadata

eventTime

time of the data movement

schemaURL

URL pointing to the schema definition for this message

In addition to those, the fields used to create lineage are:

eventType

corresponds to the status of the Run

inputs

mapped to sources of links. Must be specified according to the naming conventions

outputs

mapped to targets of links. Must be specified according to the naming conventions

The GCP Lineage API supports OpenLineage major versions 1 and 2. For more information please refer to the documentation.


How to access GCP Lineage?

The code is now publicly available on GitHub. The library is also published to Maven.


GcpLineageTransport

To simplify integration with GCP Lineage, we offer GcpLineageTransport. It is available on the OpenLineage GitHub repository and is built to a separate maven artifact. It is built on top of the producer library mentioned above.

Using the transport minimises the code for sending events to GCP Lineage. The GcpLineageTransport can be configured as the event sink for any existing OpenLineage producer such as Airflow, Spark, and Flink. Find more information and examples on GCP Lineage.

By Mary Idamkina – Data Lineage

Introducing TensorFlow Recorder

Friday, August 7, 2020

When training computer vision machine learning models, data loading can often be a performance bottleneck, causing your GPU or TPU resources to be underutilized while waiting for data to be loaded into the model. Storing your dataset in the efficient TensorFlow Record (TFRecord) format is a great way to solve these problems, but creating TFRecords can unfortunately often require a great deal of complex code.

Last week we open sourced the TensorFlow Recorder project (also known as TFRecorder), which makes it possible for data scientists, data engineers, or AI/ML engineers to create image based TFRecords with just a few lines of code. Using TFRecords is incredibly important for creating efficient TensorFlow ML pipelines, but until now they haven’t been so easy to create. Before TFRecorder, in order to create TFRecords at scale you would have had to write a data pipeline that parsed your structured data, loaded images from storage, and serialized the results into the TFRecord format. TFRecorder allows you to write TFRecords directly from a Pandas dataframe or CSV without writing any complicated code.

You can see an example of TFRecoder below, but first let’s talk about some of the specific advantages of TFRecords.

How TFRecords Can Help

Using the TFRecord file format allows you to store your data in sets of files, each containing a sequence of protocol buffers serialized as a binary record that can be read very efficiently, which will help reduce the data loading bottleneck mentioned above.

Data loading performance can be further improved by implementing prefetching and parallel interleave along with using the TFRecord format. Prefetching reduces the time of each model training step(s) by fetching the data for the next training step while your model is executing training on the current step. Parallel interleave allows you to read from multiple TFRecords shards (pieces of a TFRecord file) and apply preprocessing of those interleaved data streams. This reduces the latency required to read a training batch and is especially helpful when reading data from the network.

Using TensorFlow Recorder

Creating a TFRecord using TFRecorder requires only a few lines of code. Here’s how it works.
import pandas as pd
import tfrecorder
df = pd.read_csv(...)
df.tensorflow.to_tfrecord(output_dir="gs://my/bucket")

TFRecorder currently expects data to be in the same format as Google AutoML Vision.

This format looks like a pandas dataframe or CSV formatted as:
splitimage_urilabel
TRAIN
gs://my/bucket/image1.jpgcat

Where:
  • split can take on the values TRAIN, VALIDATION, and TEST
  • image_uri specifies a local or google cloud storage location for the image file.
  • label can be either a text-based label that will be integerized or an integer
In the future, we hope to extend TensorFlow Recorder to work with data in any format.

While this example would work well to convert a few thousand images into TFRecords, it probably wouldn’t scale well if you have millions of images. To scale up to huge datasets, TensorFlow Recorder provides connectivity with Google Cloud Dataflow, which is a serverless Apache Beam pipeline runner. Scaling up to DataFlow requires only a little bit more configuration.
df.tensorflow.to_tfrecord(
output_dir="gs://my/bucket",
runner="DataFlowRunner",
project="my-project",
region="us-central1)

What’s next?

We’d love for you to try out TensorFlow Recorder. You can get it from GitHub or simply pip install tfrecorder. Tensorflow Recorder is very new and we’d greatly appreciate your feedback, suggestions, and pull requests.

By Mike Bernico and Carlos Ezequiel, Google Cloud AI Engineers

Kpt: Packaging up your Kubernetes configuration with git and YAML since 2014

Tuesday, March 31, 2020

Kubernetes configuration manifests have become an industry standard for deploying both custom and off-the-shelf applications (as well as for infrastructure). Manifests are combined into bundles to create higher-level deployable systems as well as reusable blueprints (such as a product offering, off the shelf software, or customizable starting point for a new application).

However, most teams lack the expertise or desire to create bespoke bundles of configuration from scratch and instead: 1) either fork them from another bundle, or 2) use some packaging solution which generates manifests from code.

Teams quickly discover they need to customize, validate, audit and re-publish their forked/ generated bundles for their environment. Most packaging solutions to date are tightly coupled to some format written as code (e.g. templates, DSLs, etc). This introduces a number of challenges when trying to extend, build on top of, or integrate them with other systems. For example, how does one update a forked template from upstream, or how does one apply custom validation?

Packaging is the foundation of building reusable components, but it also incurs a productivity tax on the users of those components.

Today we’d like to introduce kpt, an OSS tool for Kubernetes packaging, which uses a standard format to bundle, publish, customize, update, and apply configuration manifests.

Kpt is built around an “as data” architecture bundling Kubernetes resource configuration, a format for both humans and machines. The ability for tools to read and write the package contents using standardized data structures enables powerful new capabilities:
  • Any existing directory in a Git repo with configuration files can be used as a kpt package.
  • Packages can be arbitrarily customized and later pull in updates from upstream by merging them.
  • Tools and automation can perform high-level operations by transforming and validating package data on behalf of users or systems.
  • Organizations can develop their own tools and automation which operate against the package data.
  • Existing tools and automation that work with resource configuration “just work” with kpt.
  • Existing solutions that generate configuration (e.g. from templates or DSLs) can emit kpt packages which enable the above capabilities for them.

Example workflow with kpt

Now that we’ve established the benefits of using kpt for managing your packages of Kubernetes config, lets walk through how an enterprise might leverage kpt to package, share and use their best practices for Kubernetes across the organization.


First, a team within the organization may build and contribute to a repository of best practices (pictured in blue) for managing a certain type of application, for example a microservice (called “app”). As the best practices are developed within an organization, downstream teams will want to consume and modify configuration blueprints based on them. These blueprints provide a blessed starting point which adheres to organization policies and conventions.

The downstream team will get their own copy of a package by downloading it to their local filesystem (pictured in red) using kpt pkg get. This clones the git subdirectory, recording upstream metadata so that it can be updated later.

They may decide to update the number of replicas to fit their scaling requirements or may need to alter part of the image field to be the image name for their app. They can directly modify the configuration using a text editor (as would be done before). Alternatively, the package may define setters, allowing fields to be set programmatically using kpt cfg set. Setters streamline workflows by providing user and automation friendly commands to perform common operations.

Once the modifications have been made to the local filesystem, the team will commit and push their package to an app repository owned by them. From there, a CI/CD pipeline will kick off and the deployment process will begin. As a final customization before the package is deployed to the cluster, the CI/CD pipeline will inject the digest of the image it just built into the image field (using kpt cfg set). When the image digest has been set, the CI/CD pipeline can send the manifests to the cluster using kpt live apply. Kpt live operates like kubectl apply, providing additional functionality to prune resources deleted from the configuration and block on rollout completion (reporting status of the rollout back to the user).

Now that we’ve walked through how you might use kpt in your organization, we’d love it if you’d try it out, read the docs, or contribute.

One more thing

There’s still a lot to the story we didn’t cover here. Expect to hear more from us about:
  • Using kpt with GitOps
  • Building custom logic with functions
  • Writing effective blueprints with kpt and kustomize
By Phillip Wittrock, Software Engineer and Vic Iglesias, Cloud Solutions Architect

Importing SA360 WebQuery reports to BigQuery

Tuesday, February 11, 2020

Context

Search Ads 360 (SA360) is an enterprise-class search campaign management platform used by marketers to manage global ad campaigns across multiple engines. It offers powerful reporting capability through WebQuery reports, API, BiqQuery and Datastudio connectors.

Effective Ad campaign management requires multi-dimensional analysis of campaign data along with customers’ first-party data by building custom reports with dimensions combined from paid-search reports and business data.

Customers’ business data resides in a data-warehouse, which is designed for analysis, insights and reporting. To integrate ads data into the data-warehouse, the usual approach is to bring/ load the campaign data into the warehouse; to achieve this, SA360 offers various options to retrieve paid-search data, each of these methods provide a unique capabilities.
Comparison AreaWebQueryBQ ConnectorDatastudio ConnectorAPI
Technical complexityLow
Medium
Medium
High
Ease of report customizationHigh
Medium
Low
High
Reporting DetailsCompleteLimited
Reports not supported on API are not available
E.g.
Location targets
Remarketing targets
Audience reports
Possible Data WarehouseAny
The report is generic and needs to be loaded into the data-warehouse using DWs custom loading methods.
BigQuery ONLYNoneAny
Comparing these approaches, in terms of technical knowledge required, as well as, support for data warehousing solution, the easiest one is WebQuery report for which a marketer can build a report by choosing the dimensions/metrics they want on the SA360 User Interface.

BigQuery data-transfer service is limited to importing data in BigQuery and Datastudio connector does not allow retrieving data.

WebQuery offers a simpler and customizable method than other alternatives and also offers more options for the kind of data (vs. BQ transfer service which does not bring Business Data from SA360 to BigQuery). It was originally designed for Microsoft Excel to provide an updatable view of a report. In the era of cloud computing, a need was felt for a tool which would help consume the report and make it available on an analytical platform or a cloud data warehouse like BigQuery.

Solution Approach

This tool showcases how to bridge this gap of bringing SA360 data to a data warehouse, in generic fashion, where the report from SA360 is fetched in XML format and converted it into a CSV file using SAX parsers. This CSV file is then transferred to staging storage to be finally ETLed into the Data Warehouse.

As a concrete example, we chose to showcase a solution with BigQuery as the destination (cloud) data warehouse, though the solution architecture is flexible for any other system.

Conclusion

The tool helps marketers bring advertising data closer to their analytical systems helping them derive better insights. In case you use BigQuery as your Data Warehouse, you can use this tool as-is. You can also adopt by adding components for analytical/data-warehousing systems you use and improve it for the larger community.

To get started, follow our step-by-step guide.
Notable Features of the tool are as following:
  • Modular Authorization module
  • Handle arbitrarily large web-query reports
  • Batch mode to process multiple reports in a single call
  • Can be used as part of ETL workflow (Airflow compatible)
By Anant Damle, Solutions Architect and Meera Youn, Technical Partnership Lead
.