聊聊flink的JDBCOutputFormat

聊聊flink的JDBCOutputFormat序本文主要研究一下flink的JDBCOutputFormatJDBCOutputFormatflink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/a

本文主要研究一下flink的JDBCOutputFormat

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/** * OutputFormat to write Rows into a JDBC database. * The OutputFormat has to be configured using the supplied OutputFormatBuilder. * * @see Row * @see DriverManager */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
	private static final long serialVersionUID = 1L;
	static final int DEFAULT_BATCH_INTERVAL = 5000;

	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

	private String username;
	private String password;
	private String drivername;
	private String dbURL;
	private String query;
	private int batchInterval = DEFAULT_BATCH_INTERVAL;

	private Connection dbConn;
	private PreparedStatement upload;

	private int batchCount = 0;

	private int[] typesArray;

	public JDBCOutputFormat() {
	}

	@Override
	public void configure(Configuration parameters) {
	}

	/** * Connects to the target database and initializes the prepared statement. * * @param taskNumber The number of the parallel instance. * @throws IOException Thrown, if the output could not be opened due to an * I/O problem. */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		try {
			establishConnection();
			upload = dbConn.prepareStatement(query);
		} catch (SQLException sqe) {
			throw new IllegalArgumentException("open() failed.", sqe);
		} catch (ClassNotFoundException cnfe) {
			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
		}
	}

	private void establishConnection() throws SQLException, ClassNotFoundException {
		Class.forName(drivername);
		if (username == null) {
			dbConn = DriverManager.getConnection(dbURL);
		} else {
			dbConn = DriverManager.getConnection(dbURL, username, password);
		}
	}

	/** * Adds a record to the prepared statement. * * <p>When this method is called, the output format is guaranteed to be opened. * * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null)) * * @param row The records to add to the output. * @see PreparedStatement * @throws IOException Thrown, if the records could not be added due to an I/O problem. */
	@Override
	public void writeRecord(Row row) throws IOException {

		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
		}
		try {

			if (typesArray == null) {
				// no types provided
				for (int index = 0; index < row.getArity(); index++) {
					LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
					upload.setObject(index + 1, row.getField(index));
				}
			} else {
				// types provided
				for (int index = 0; index < row.getArity(); index++) {

					if (row.getField(index) == null) {
						upload.setNull(index + 1, typesArray[index]);
					} else {
						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
						switch (typesArray[index]) {
							case java.sql.Types.NULL:
								upload.setNull(index + 1, typesArray[index]);
								break;
							case java.sql.Types.BOOLEAN:
							case java.sql.Types.BIT:
								upload.setBoolean(index + 1, (boolean) row.getField(index));
								break;
							case java.sql.Types.CHAR:
							case java.sql.Types.NCHAR:
							case java.sql.Types.VARCHAR:
							case java.sql.Types.LONGVARCHAR:
							case java.sql.Types.LONGNVARCHAR:
								upload.setString(index + 1, (String) row.getField(index));
								break;
							case java.sql.Types.TINYINT:
								upload.setByte(index + 1, (byte) row.getField(index));
								break;
							case java.sql.Types.SMALLINT:
								upload.setShort(index + 1, (short) row.getField(index));
								break;
							case java.sql.Types.INTEGER:
								upload.setInt(index + 1, (int) row.getField(index));
								break;
							case java.sql.Types.BIGINT:
								upload.setLong(index + 1, (long) row.getField(index));
								break;
							case java.sql.Types.REAL:
								upload.setFloat(index + 1, (float) row.getField(index));
								break;
							case java.sql.Types.FLOAT:
							case java.sql.Types.DOUBLE:
								upload.setDouble(index + 1, (double) row.getField(index));
								break;
							case java.sql.Types.DECIMAL:
							case java.sql.Types.NUMERIC:
								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
								break;
							case java.sql.Types.DATE:
								upload.setDate(index + 1, (java.sql.Date) row.getField(index));
								break;
							case java.sql.Types.TIME:
								upload.setTime(index + 1, (java.sql.Time) row.getField(index));
								break;
							case java.sql.Types.TIMESTAMP:
								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
								break;
							case java.sql.Types.BINARY:
							case java.sql.Types.VARBINARY:
							case java.sql.Types.LONGVARBINARY:
								upload.setBytes(index + 1, (byte[]) row.getField(index));
								break;
							default:
								upload.setObject(index + 1, row.getField(index));
								LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
									typesArray[index], index + 1, row.getField(index));
								// case java.sql.Types.SQLXML
								// case java.sql.Types.ARRAY:
								// case java.sql.Types.JAVA_OBJECT:
								// case java.sql.Types.BLOB:
								// case java.sql.Types.CLOB:
								// case java.sql.Types.NCLOB:
								// case java.sql.Types.DATALINK:
								// case java.sql.Types.DISTINCT:
								// case java.sql.Types.OTHER:
								// case java.sql.Types.REF:
								// case java.sql.Types.ROWID:
								// case java.sql.Types.STRUC
						}
					}
				}
			}
			upload.addBatch();
			batchCount++;
		} catch (SQLException e) {
			throw new RuntimeException("Preparation of JDBC statement failed.", e);
		}

		if (batchCount >= batchInterval) {
			// execute batch
			flush();
		}
	}

	void flush() {
		try {
			upload.executeBatch();
			batchCount = 0;
		} catch (SQLException e) {
			throw new RuntimeException("Execution of JDBC statement failed.", e);
		}
	}

	int[] getTypesArray() {
		return typesArray;
	}

	/** * Executes prepared statement and closes all resources of this instance. * * @throws IOException Thrown, if the input could not be closed properly. */
	@Override
	public void close() throws IOException {
		if (upload != null) {
			flush();
			// close the connection
			try {
				upload.close();
			} catch (SQLException e) {
				LOG.info("JDBC statement could not be closed: " + e.getMessage());
			} finally {
				upload = null;
			}
		}

		if (dbConn != null) {
			try {
				dbConn.close();
			} catch (SQLException se) {
				LOG.info("JDBC connection could not be closed: " + se.getMessage());
			} finally {
				dbConn = null;
			}
		}
	}

	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
		return new JDBCOutputFormatBuilder();
	}

	//......
}
  • JDBCOutputFormat继承了RichOutputFormat,这里的泛型为org.apache.flink.types.Row
  • open的时候调用了establishConnection来加载驱动,初始化dbConn,然后调用dbConn.prepareStatement(query)来获取upload(PreparedStatement)
  • writeRecord方法先判断是否有提供typesArray,没有的话则使用setObject来设置值,有点话则根据对应的类型进行转换,这里支持了多种java.sql.Types里头的类型
  • writeRecord采取的是PreparedStatement.addBatch操作,当batchCount大于等于batchInterval(默认5000),会执行flush操作,也就是调用PreparedStatement.executeBatch方法,然后重置batchCount;为了以防数据没达到batchInterval而未能提交,在close的时候会再次执行flush操作,然后才关闭PreparedStatement、Connection
  • JDBCOutputFormat提供了一个JDBCOutputFormatBuilder,可以用来方便构建JDBCOutputFormat

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/** * A Row can have arbitrary number of fields and contain a set of fields, which may all be * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's * type extraction mechanism can't extract correct field types. So that users should manually * tell Flink the type information via creating a {@link RowTypeInfo}. * * <p> * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can * set fields by {@link #setField(int, Object)}. * <p> * Row is in principle serializable. However, it may contain non-serializable fields, * in which case serialization will fail. * */
@PublicEvolving
public class Row implements Serializable{

	private static final long serialVersionUID = 1L;

	/** The array to store actual values. */
	private final Object[] fields;

	/** * Create a new Row instance. * @param arity The number of fields in the Row */
	public Row(int arity) {
		this.fields = new Object[arity];
	}

	/** * Get the number of fields in the Row. * @return The number of fields in the Row. */
	public int getArity() {
		return fields.length;
	}

	/** * Gets the field at the specified position. * @param pos The position of the field, 0-based. * @return The field at the specified position. * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. */
	public Object getField(int pos) {
		return fields[pos];
	}

	/** * Sets the field at the specified position. * * @param pos The position of the field, 0-based. * @param value The value to be assigned to the field at the specified position. * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. */
	public void setField(int pos, Object value) {
		fields[pos] = value;
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < fields.length; i++) {
			if (i > 0) {
				sb.append(",");
			}
			sb.append(StringUtils.arrayAwareToString(fields[i]));
		}
		return sb.toString();
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		Row row = (Row) o;

		return Arrays.deepEquals(fields, row.fields);
	}

	@Override
	public int hashCode() {
		return Arrays.deepHashCode(fields);
	}

	/** * Creates a new Row and assigns the given values to the Row's fields. * This is more convenient than using the constructor. * * <p>For example: * * <pre> * Row.of("hello", true, 1L);} * </pre> * instead of * <pre> * Row row = new Row(3); * row.setField(0, "hello"); * row.setField(1, true); * row.setField(2, 1L); * </pre> * */
	public static Row of(Object... values) {
		Row row = new Row(values.length);
		for (int i = 0; i < values.length; i++) {
			row.setField(i, values[i]);
		}
		return row;
	}

	/** * Creates a new Row which copied from another row. * This method does not perform a deep copy. * * @param row The row being copied. * @return The cloned new Row */
	public static Row copy(Row row) {
		final Row newRow = new Row(row.fields.length);
		System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
		return newRow;
	}

	/** * Creates a new Row with projected fields from another row. * This method does not perform a deep copy. * * @param fields fields to be projected * @return the new projected Row */
	public static Row project(Row row, int[] fields) {
		final Row newRow = new Row(fields.length);
		for (int i = 0; i < fields.length; i++) {
			newRow.fields[i] = row.fields[fields[i]];
		}
		return newRow;
	}
}
  • Row是JDBCOutputFormat的writeRecord的类型,它里头使用Object数据来存取字段值,同时也提供了诸如of、copy、project等静态方法

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

	/** * Builder for a {@link JDBCOutputFormat}. */
	public static class JDBCOutputFormatBuilder {
		private final JDBCOutputFormat format;

		protected JDBCOutputFormatBuilder() {
			this.format = new JDBCOutputFormat();
		}

		public JDBCOutputFormatBuilder setUsername(String username) {
			format.username = username;
			return this;
		}

		public JDBCOutputFormatBuilder setPassword(String password) {
			format.password = password;
			return this;
		}

		public JDBCOutputFormatBuilder setDrivername(String drivername) {
			format.drivername = drivername;
			return this;
		}

		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
			format.dbURL = dbURL;
			return this;
		}

		public JDBCOutputFormatBuilder setQuery(String query) {
			format.query = query;
			return this;
		}

		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
			format.batchInterval = batchInterval;
			return this;
		}

		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
			format.typesArray = typesArray;
			return this;
		}

		/** * Finalizes the configuration and checks validity. * * @return Configured JDBCOutputFormat */
		public JDBCOutputFormat finish() {
			if (format.username == null) {
				LOG.info("Username was not supplied.");
			}
			if (format.password == null) {
				LOG.info("Password was not supplied.");
			}
			if (format.dbURL == null) {
				throw new IllegalArgumentException("No database URL supplied.");
			}
			if (format.query == null) {
				throw new IllegalArgumentException("No query supplied.");
			}
			if (format.drivername == null) {
				throw new IllegalArgumentException("No driver supplied.");
			}

			return format;
		}
	}
  • JDBCOutputFormatBuilder提供了对username、password、dbURL、query、drivername、batchInterval、typesArray这几个属性的builder方法

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/** * An at-least-once Table sink for JDBC. * * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if * checkpointing is enabled). However, one common use case is to run idempotent queries * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and * achieve exactly-once semantic.</p> */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

	private final JDBCOutputFormat outputFormat;

	private String[] fieldNames;
	private TypeInformation[] fieldTypes;

	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	public static JDBCAppendTableSinkBuilder builder() {
		return new JDBCAppendTableSinkBuilder();
	}

	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		dataStream
				.addSink(new JDBCSinkFunction(outputFormat))
				.name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
	}

	@Override
	public void emitDataSet(DataSet<Row> dataSet) {
		dataSet.output(outputFormat);
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		int[] types = outputFormat.getTypesArray();

		String sinkSchema =
			String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
		String tableSchema =
			String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
		String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
			"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

		Preconditions.checkArgument(fieldTypes.length == types.length, msg);
		for (int i = 0; i < types.length; ++i) {
			Preconditions.checkArgument(
				JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
				msg);
		}

		JDBCAppendTableSink copy;
		try {
			copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
		} catch (IOException | ClassNotFoundException e) {
			throw new RuntimeException(e);
		}

		copy.fieldNames = fieldNames;
		copy.fieldTypes = fieldTypes;
		return copy;
	}

	@VisibleForTesting
	JDBCOutputFormat getOutputFormat() {
		return outputFormat;
	}
}
  • JDBCAppendTableSink里头用到了JDBCOutputFormat,它实现了AppendStreamTableSink以及BatchTableSink接口
  • 它的emitDataStream方法会给传入的dataStream设置JDBCSinkFunction的sink(JDBCSinkFunction);而emitDataSet方法则对dataSet设置output
  • 这里实现了TableSink(BatchTableSink声明实现TableSink)的getOutputType、getFieldNames、getFieldTypes、configure方法;configure方法这里主要是根据JDBCOutputFormat创建了JDBCAppendTableSink

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
	final JDBCOutputFormat outputFormat;

	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	@Override
	public void invoke(Row value) throws Exception {
		outputFormat.writeRecord(value);
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		outputFormat.flush();
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		RuntimeContext ctx = getRuntimeContext();
		outputFormat.setRuntimeContext(ctx);
		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
	}

	@Override
	public void close() throws Exception {
		outputFormat.close();
		super.close();
	}
}
  • JDBCSinkFunction继承了RichSinkFunction,同时也实现了CheckpointedFunction接口;invoke方法使用的是JDBCOutputFormat.writeRecord方法,而snapshotState则是调用了JDBCOutputFormat.flush来及时提交记录

小结

  • JDBCOutputFormat继承了RichOutputFormat,open的时候调用了establishConnection来加载驱动,初始化dbConn,然后调用dbConn.prepareStatement(query)来获取upload(PreparedStatement);writeRecord采取的是PreparedStatement.addBatch操作,当batchCount大于等于batchInterval(默认5000),会执行flush操作,也就是调用PreparedStatement.executeBatch方法,然后重置batchCount;为了以防数据没达到batchInterval而未能提交,在close的时候会再次执行flush操作,然后才关闭PreparedStatement、Connection
  • Row是JDBCOutputFormat的writeRecord的类型,它里头使用Object数据来存取字段值
  • JDBCOutputFormatBuilder提供了对username、password、dbURL、query、drivername、batchInterval、typesArray这几个属性的builder方法
  • JDBCAppendTableSink里头用到了JDBCOutputFormat,它的emitDataStream方法会给传入的dataStream设置JDBCSinkFunction的sink(JDBCSinkFunction);而emitDataSet方法则对dataSet设置output
  • JDBCSinkFunction继承了RichSinkFunction,同时也实现了CheckpointedFunction接口;invoke方法使用的是JDBCOutputFormat.writeRecord方法,而snapshotState则是调用了JDBCOutputFormat.flush来及时提交记录

doc

今天的文章聊聊flink的JDBCOutputFormat分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/21287.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注