If you implement your own InputFormat to access a Cassandra database from Apache Flink, you will most likely extend RichInputFormat. The following example defines the entry point for such an InputFormat, which maps the data retrieved from the database to the specified class MyClass:
```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.io.Serializable;
import java.util.UUID;

// Maps to the table "mydata" in the keyspace "TestImport"
@Table(name = "mydata", keyspace = "TestImport")
public final class MyClass implements Serializable {

    @Column(name = "uuid")
    @PartitionKey
    private UUID uuid = UUID.randomUUID();

    @Column(name = "name")
    private String name = null;
}
```

The InputFormat itself is then declared against this class:

```java
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;

public class CassandraOutFormat<OUT extends MyClass>
        extends RichInputFormat<MyClass, InputSplit> implements NonParallelInput {
    // Implement the requested methods
}
```
The important steps are:

- Declare MyClass as a table via the @Table annotation (and its fields via @Column and @PartitionKey), so the DataStax mapper can map it to the Cassandra table.
- The type parameter OUT is necessary so that Flink can determine the type produced by the stream.
- RichInputFormat must be parameterized with the concrete class MyClass, not with a bare type variable.
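For completeness, the methods left as "// Implement the requested methods" above could be filled in roughly as follows. This is a minimal sketch, not a definitive implementation: it assumes the query string and the ClusterBuilder are passed in via the constructor (matching the usage further below), uses the DataStax MappingManager to map result rows to MyClass, and omits error handling:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.MappingManager;
import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraOutFormat<OUT extends MyClass>
        extends RichInputFormat<MyClass, InputSplit> implements NonParallelInput {

    private final String query;
    private final ClusterBuilder builder;

    private transient Cluster cluster;
    private transient Session session;
    private transient Iterator<MyClass> iterator;

    public CassandraOutFormat(String query, ClusterBuilder builder) {
        this.query = query;
        this.builder = builder;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics; // no statistics available
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) {
        // NonParallelInput: a single generic split is sufficient
        return new GenericInputSplit[] { new GenericInputSplit(0, 1) };
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @Override
    public void open(InputSplit split) throws IOException {
        cluster = builder.getCluster();
        session = cluster.connect();
        ResultSet resultSet = session.execute(query);
        // let the DataStax object mapper turn the rows into MyClass instances
        iterator = new MappingManager(session)
                .mapper(MyClass.class).map(resultSet).iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !iterator.hasNext();
    }

    @Override
    public MyClass nextRecord(MyClass reuse) {
        return iterator.next();
    }

    @Override
    public void close() {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}
```

Because the format implements NonParallelInput, a single GenericInputSplit is enough; the DefaultInputSplitAssigner simply hands that split to the one reading task.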
If these preconditions are not fulfilled, defining and executing a query against Cassandra as shown below throws the following exception:
```
[error] Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OT' in 'class org.apache.flink.api.common.io.RichInputFormat' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
```
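For comparison, a declaration along the following lines (the class name BrokenFormat is hypothetical) leaves the produced type as a bare type variable, which is exactly the situation the type extraction complains about:

```java
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;

// Hypothetical counter-example: the produced type 'OT' is only a type
// variable here, so Flink's type extraction cannot resolve it at the
// call site and fails with the InvalidTypesException shown above.
public abstract class BrokenFormat<OUT>
        extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
}
```

Binding the first type parameter to the concrete MyClass, as in CassandraOutFormat above, resolves the variable at compile time and avoids the erasure problem.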
An example query definition and execution could be as follows:
```scala
import com.datastax.driver.core.Cluster
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

object Test {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val source = senv.createInput[MyClass](
      new CassandraOutFormat[MyClass](
        "SELECT uuid, name FROM TestImport.mydata WHERE uuid = x;",
        new ClusterBuilder() {
          override def buildCluster(builder: Cluster.Builder): Cluster =
            builder.addContactPoint("127.0.0.1").build() // local test
        }
      )
    )

    val result = source.setParallelism(1)
    result.writeAsText("/tmp/data")

    senv.execute()
  }
}
```