Read From Mongo in Spark and Create Data Frame With Authentication
Overview
- Get to know the different types of Apache Spark data sources
- Understand the options available on various Spark data sources
Introduction
The ability to read from and write to many different kinds of data sources, and for the community to create its own connectors, is arguably one of Spark's greatest strengths.
As a general computing engine, Spark can process data from various data management/storage systems, including HDFS, Hive, Cassandra, and Kafka. For flexibility and high throughput, Spark defines the Data Source API, which is an abstraction of the storage layer.
This Data Source API has two requirements:
- Generality: Support reading/writing most data management/storage systems.
- Flexibility: Customize and optimize the read and write paths for different systems based on their capabilities.
This article will give you a better understanding of the core data sources available. You will be introduced to a variety of data sources that you can use with Spark out of the box, as well as the countless other sources built by the greater community. Spark has six "core" data sources and hundreds of external data sources written by the community.
Table of Contents
- Structure of Spark's Data Source API
- Read API Structure
- Write API Structure
- Apache Spark Data Sources You Should Know About
- CSV
- JSON
- Parquet
- ORC
- Text
- JDBC/ODBC connections
Core Data Sources in Apache Spark
Here are the core data sources in Apache Spark you should know about:
1. CSV
2. JSON
3. Parquet
4. ORC
5. JDBC/ODBC connections
6. Plain-text files
There are several community-created data sources as well:
1. Cassandra
2. HBase
3. MongoDB
4. AWS Redshift
5. XML
And many, many others
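MongoDB, mentioned above and in this post's title, is a good example of such a community connector. Below is a minimal sketch of reading a collection into a DataFrame with authentication; it assumes the MongoDB Spark Connector (v10.x) is supplied via spark.jars.packages, and the connector coordinates, host, database, collection, and credentials are placeholders rather than a definitive setup.

from pyspark.sql import SparkSession

# Sketch: read a MongoDB collection into a DataFrame with authentication.
# The connector coordinates, URI, database, collection, and credentials
# below are placeholders for illustration only.
spark = (
    SparkSession.builder
    .appName("mongo-read-example")
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1")
    .getOrCreate()
)

mongo_df = (
    spark.read.format("mongodb")
    # Credentials embedded in the connection URI handle authentication;
    # authSource names the database holding the user's credentials.
    .option("connection.uri",
            "mongodb://username:password@localhost:27017/?authSource=admin")
    .option("database", "mydb")
    .option("collection", "mycollection")
    .load()
)

mongo_df.printSchema()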
Structure of Apache Spark's Data Source API
DataFrameReader.format(...).option("key", "value").schema(...).load()
where .format specifies the data source format to read.
.format is optional; by default Spark uses the Parquet format. .option allows us to set key-value configurations to parameterize how the data should be read.
Lastly, .schema is optional if the data source provides a schema or you intend to use schema inference.
Read API Structure
The basis of reading data in Spark is the DataFrameReader. It is accessed from the SparkSession through the read attribute, shown below:
spark.read
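Here, spark refers to a SparkSession. In the Spark shell one is created for you; elsewhere, a minimal sketch for obtaining it might look like the following (the application name is an arbitrary placeholder):

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession whose `read` attribute is shown above;
# the application name is an arbitrary placeholder.
spark = SparkSession.builder.appName("data-sources-demo").getOrCreate()

reader = spark.read  # a DataFrameReader, ready for format/option/schema calls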
Once we have a DataFrameReader, we can specify multiple values. There are multiple sets of options for different data sources, which determine how the data is read. Almost all of the options can be omitted; at a minimum, the DataFrameReader must be provided with the path of the files to read.
spark.read.format("csv") \
  .option("mode", "FAILFAST") \
  .option("inferSchema", "true") \
  .option("path", "path/to/file(s)") \
  .schema(someSchema) \
  .load()
Reading Modes
When working with semi-structured data sources, we often run across data that is malformed. The read mode specifies what should be done when such data is encountered.
| Read mode | Description |
|---|---|
| permissive | Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record |
| dropMalformed | Drops the row that contains malformed records |
| failFast | Fails immediately upon encountering malformed records |

The default is permissive.
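To make this concrete, here is a minimal sketch, assuming a hypothetical malformed.csv and a hand-written schema, that shows how permissive mode surfaces bad rows through the _corrupt_record column:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("read-modes-demo").getOrCreate()

# Hypothetical file and schema; with mode "permissive", rows that do not fit
# the schema end up with null fields and the raw line in _corrupt_record.
schema = StructType([
    StructField("city", StringType(), True),
    StructField("population", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load("/path/to/malformed.csv")
)

# Inspect only the rows that failed to parse.
df.filter(df._corrupt_record.isNotNull()).show(truncate=False)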
Write API Structure
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
.format specifies how the data should be written to the data source. It is optional; Spark uses the Parquet format by default.
.option allows us to set key-value configurations to parameterize how the data is written.
.partitionBy, .bucketBy, and .sortBy are used only with file-based data sources and control the file structure or layout at the destination.
Writing Data
Writing data follows the same pattern as reading data, except that DataFrameReader is replaced by DataFrameWriter.
dataFrame.write
With the DataFrameWriter we need to supply a format, a series of options, and a save path. We can specify many options, but at a minimum we need to give the destination path.
dataframe.write.format("csv") \
  .option("mode", "OVERWRITE") \
  .option("dateFormat", "yyyy-MM-dd") \
  .option("path", "path/to/file(s)") \
  .save()
Save Modes
| Save mode | Description |
|---|---|
| append | Appends the output files to the list of files that already exist at that location |
| overwrite | Will completely overwrite any data that already exists there |
| errorIfExists | Throws an error and fails the write if data or files already exist at the specified location |
| ignore | If data or files exist at the location, do nothing with the current DataFrame |

errorIfExists is the default; it fails to write the data if Spark finds data already present in the destination path.
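As a minimal sketch of these modes in action (the output path is a placeholder and the DataFrame is a throwaway example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("save-modes-demo").getOrCreate()
df = spark.range(5)  # throwaway example DataFrame; the paths below are placeholders

# Fails on a second run because data already exists at the destination.
df.write.format("parquet").mode("errorIfExists").save("/tmp/save-modes-demo/data")

# The other modes change what happens when the destination already has data:
df.write.format("parquet").mode("overwrite").save("/tmp/save-modes-demo/data")  # replace it
df.write.format("parquet").mode("append").save("/tmp/save-modes-demo/data")     # add new files
df.write.format("parquet").mode("ignore").save("/tmp/save-modes-demo/data")     # silently skip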
The Different Apache Spark Data Sources You Should Know About
CSV
CSV stands for comma-separated values. This is a common text file format in which each line represents a single record and each field within a record is separated by a comma. The CSV format is well structured but is perhaps one of the trickiest file formats to work with in production scenarios, because few assumptions can be made about what the files contain and how they are structured.
For this reason, the CSV reader has a large number of options. These options give you the ability to work around problems like certain characters needing to be escaped (for instance, commas inside of columns when the file is also comma-delimited) or null values labeled in an unconventional way.
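As a quick illustration, here is a minimal sketch (the file name and its contents are hypothetical) of using the quote, escape, and nullValue options to read a CSV whose fields contain commas and an unconventional null marker:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-options-demo").getOrCreate()

# Hypothetical file where fields containing commas are quoted, quotes inside
# fields are escaped with a backslash, and missing values appear as "N/A".
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("quote", '"')        # character that wraps fields containing the separator
    .option("escape", "\\")      # character that escapes quotes inside quoted fields
    .option("nullValue", "N/A")  # token to interpret as null
    .load("/path/to/tricky.csv")
)
df.show()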
Reading CSV Files
spark.read.format("csv") \
  .option("header", "true") \
  .option("mode", "FAILFAST") \
  .option("inferSchema", "true") \
  .load("some/path/to/file.csv")
If you have a header with column names in the file, you need to explicitly set the header option to true; otherwise the API treats the header as a data record. You can also specify data sources with their fully qualified name (i.e., org.apache.spark.sql.csv), but for built-in sources you can also use their short names (csv, json, parquet, jdbc, text, etc.). When reading CSV files with a specified schema, it is possible that the data in the files does not match the schema. For example, a field containing the name of a city will not parse as an integer. The consequences depend on the mode that the parser runs in:
- PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly
- DROPMALFORMED: drops lines that contain fields that could not be parsed
- FAILFAST: aborts the read if any malformed data is found
The table below presents the options available on CSV reader:
| Read/write | Key | Potential values | Default | Description |
|---|---|---|---|---|
| Both | sep | Any single string character | , | The single character that is used as a separator for each field and value. |
| Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) are the names of the columns. |
| Read | escape | Any string character | \ | The character Spark should use to escape other characters in the file. |
| Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file. |
| Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces from values being read should be skipped. |
| Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces from values being read should be skipped. |
| Both | nullValue | Any string character | "" | Declares what character represents a null value in the file. |
| Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing character in the CSV file. |
| Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value. |
| Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value. |
| Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file. |
| Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column. |
| Read | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines. |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition; malformed records beyond this number will be ignored. |
| Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character. |
| Read | multiLine | true, false | false | Allows you to read multiline CSV files where each logical row in the CSV file might span multiple rows in the file itself. |

Writing CSV Files
csvFile.write.format("csv") \
  .mode("overwrite") \
  .option("sep", "\t") \
  .save("/tmp/my-tsv-file.tsv")
JSON
People coming from different programming languages, especially Java and JavaScript, will be aware of JavaScript Object Notation, or JSON as it is popularly known. In Spark, when we refer to JSON files, we mean line-delimited JSON files. This contrasts with files that have one large JSON object or array per file.
The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one JSON object and Spark will go through the work of parsing it into a DataFrame. Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using SparkSession.read.json on a JSON file. Note that a file offered as a JSON file here is not a typical JSON file: each line must contain a separate, self-contained valid JSON object. For more information, see the JSON Lines text format, also called newline-delimited JSON.
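For example, a minimal sketch (the file paths are placeholders) showing how the same reader handles line-delimited and whole-document JSON:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("json-demo").getOrCreate()

# Line-delimited JSON (the default): one self-contained JSON object per line.
line_delimited_df = spark.read.json("/path/to/line-delimited.json")

# A single JSON document (object or array) spanning the whole file.
multiline_df = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/path/to/whole-document.json")
)

line_delimited_df.printSchema()
multiline_df.printSchema()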
JSON data source options:

| Read/write | Key | Potential values | Default | Description |
|---|---|---|---|---|
| Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | primitiveAsString | true, false | false | Infers all primitive values as string type. |
| Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | allowComments | true, false | false | Ignores Java/C++ style comments in JSON records. |
| Read | allowUnquotedFieldNames | true, false | false | Allows unquoted JSON field names. |
| Read | allowSingleQuotes | true, false | true | Allows single quotes in addition to double quotes. |
| Read | multiLine | true, false | false | Allows for reading in non-line-delimited JSON files. |
| Read | allowNumericLeadingZeros | true, false | false | Allows leading zeros in numbers (e.g., 00012). |
| Read | allowBackslashEscapingAnyCharacter | true, false | false | Allows accepting quoting of all characters using the backslash quoting mechanism. |
| Read | columnNameOfCorruptRecord | Any string | Value of spark.sql.columnNameOfCorruptRecord | Allows renaming the new field, holding the malformed string, created by permissive mode. This will override the configuration value. |

Reading JSON Files
spark.read.format("json") \
  .option("mode", "FAILFAST") \
  .option("inferSchema", "true") \
  .load("/data/movie-data/json/2010-summary.json")
Writing JSON Files
csvFile.write.format("json") \
  .mode("overwrite") \
  .save("/tmp/my-json-file.json")
Parquet Files
Parquet is an open-source file format available to any project in the Hadoop ecosystem. Apache Parquet is designed for efficiency and is a performant, flat, columnar storage format, in contrast to row-based formats like CSV or TSV files.
Parquet uses the record shredding and assembly algorithm, which is superior to the simple flattening of nested namespaces. Parquet is optimized to work with complex data in bulk and features different options for efficient data compression and encoding types. This approach is best suited for queries that need to read only certain columns from a large table: Parquet can read just the needed columns, greatly minimizing IO.
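A minimal sketch of that column pruning in practice (the path and column names are assumed from the movie-data examples used below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-demo").getOrCreate()

# Because Parquet is columnar, selecting a subset of columns lets Spark read
# only those column chunks from disk instead of entire rows.
df = (
    spark.read.format("parquet")
    .load("/data/movie-data/parquet/2020-summary.parquet")
    .select("DEST_COUNTRY_NAME", "count")
)
df.show(5)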
Reading Parquet Files
spark.read.format("parquet") \
  .load("/data/movie-data/parquet/2020-summary.parquet").show(5)
Writing Parquet Files
csvFile.write.format("parquet") \
  .mode("overwrite") \
  .save("/tmp/my-parquet-file.parquet")
ORC
The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome the limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.
Compared with the RCFile format, for example, the ORC file format has many advantages, such as:
- a single file as the output of each task, which reduces the NameNode's load
- Hive type support including datetime, decimal, and the complex types (struct, list, map, and union)
- lightweight indexes stored within the file
- skip row groups that don't pass predicate filtering (see the sketch after this list)
- seek to a given row
- block-mode compression based on the data type
- run-length encoding for integer columns
- dictionary encoding for string columns
- concurrent reads of the same file using separate RecordReaders
- ability to split files without scanning for markers
- bound the amount of memory needed for reading or writing
- metadata stored using Protocol Buffers, which allows addition and removal of fields
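To illustrate the predicate-filtering point above, here is a minimal sketch (the path and column name are assumed from the examples below); simple comparison filters like this can be pushed down to the ORC reader so that row groups whose statistics rule out matches are skipped:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("orc-demo").getOrCreate()

# A simple comparison filter can be pushed down into the ORC reader, letting it
# skip row groups whose min/max statistics rule out any matching rows.
df = (
    spark.read.format("orc")
    .load("/data/movie-data/orc/2020-summary.orc")
    .filter(col("count") > 10)
)
df.show(5)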
Reading ORC Files
spark.read.format("orc") \
  .load("/data/movie-data/orc/2020-summary.orc").show(5)
Writing ORC Files
csvFile.write.format("orc") \
  .mode("overwrite") \
  .save("/tmp/my-json-file.orc")
Text Files
Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly. As an example of how you would do this, suppose you need to parse some Apache log files into a more structured format, or perhaps you want to parse some plain text for natural-language processing.
Text files make a great argument for the Dataset API due to its ability to take advantage of the flexibility of native types.
Reading Text Files
spark.read.textFile("/data/movie-data/csv/2020-summary.csv") \
  .selectExpr("split(value, ',') as rows").show()
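Building on the split above, here is a minimal sketch of turning the raw lines into named columns; the column layout assumed here follows the summary files used earlier and is an assumption, not a guarantee about your data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName("text-demo").getOrCreate()

# spark.read.text yields a single "value" column holding each raw line.
lines = spark.read.text("/data/movie-data/csv/2020-summary.csv")

# Split each line on commas and pick fields out by position; the column
# names below are assumptions about the file's layout.
parsed = (
    lines.select(split(col("value"), ",").alias("parts"))
    .select(
        col("parts")[0].alias("DEST_COUNTRY_NAME"),
        col("parts")[1].alias("ORIGIN_COUNTRY_NAME"),
        col("parts")[2].alias("count"),
    )
)
parsed.show(5)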
Writing Text Files
csvFile.select("DEST_COUNTRY_NAME") \
  .write.text("/tmp/simple-text-file.txt")
JDBC/ODBC connections
SQL data sources are one of the more powerful connectors because there is a variety of systems to which you can connect (as long as the system speaks SQL). For instance, you can connect to a MySQL database, a PostgreSQL database, or an Oracle database. You can also connect to SQLite, although the examples below connect to PostgreSQL.
Of course, databases aren't just a set of raw files, so there are more options to consider regarding how you connect to the database. Namely, you're going to need to start considering things like authentication and connectivity (you'll need to determine whether the network of your Spark cluster is connected to the network of your database system).
To get started, you will need to include the JDBC driver for your particular database on the Spark classpath. For example, to connect to Postgres from the Spark shell you would run the following command:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
Tables from external or remote databases can be loaded as a DataFrame or temporary view using the data source API.
Reading data from a JDBC source
jdbcDF = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:postgresql:dbserver") \
  .option("dbtable", "schema.tablename") \
  .option("user", "username") \
  .option("password", "password") \
  .load()
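For larger tables, the JDBC source can also parallelize the read. A minimal sketch (the URL, table, partition column, and bounds are placeholders) using the standard JDBC partitioning options:

# Sketch: parallel JDBC read split into 8 partitions on a numeric column.
# Assumes the SparkSession `spark` from the examples above; the URL, table,
# partition column, and bounds are placeholders.
jdbc_parallel_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "password")
    .option("partitionColumn", "id")   # numeric, date, or timestamp column to split on
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "8")
    .option("fetchsize", "10000")      # rows fetched per database round trip
    .load()
)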
Saving data to a JDBC source
jdbcDF.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql:dbserver") \
  .option("dbtable", "schema.tablename") \
  .option("user", "username") \
  .option("password", "password") \
  .save()
End Notes
We have discussed a variety of sources available to you for reading and writing data in Spark. This covers nearly everything you'll need to know as an everyday user of Spark with respect to data sources. For the curious, there are ways of implementing your own data source.
I recommend you go through the following data engineering resources to enhance your knowledge:
- Getting Started with Apache Hive – A Must Know Tool For All Big Data and Data Engineering Professionals
- Introduction to the Hadoop Ecosystem for Large Data and Data Engineering
- Types of Tables in Apache Hive – A Quick Overview
I hope you liked the article. Do not forget to drop your comments in the comments section below.
Source: https://www.analyticsvidhya.com/blog/2020/10/data-engineering-101-data-sources-apache-spark/