Change Data Capture
enable realtime event streaming from ditto to your own third party data products by upgrading and implementing apache kafka change data capture (cdc) in your system architecture once /#requesting an upgrade and /#integrating a kafka c onsumer , similar to how small peers use observe to monitor local queries and react to any future data changes, the big peer will automatically deliver data change events for every change in the entire app to the kafka user consumable topics you've specified a kafka topic is a document change stream used as a queue where users or consumer applications can consume messages in a sequential and ordered way this article provides step by step instructions for upgrading and integrating your kafka consumer with ditto, along with an overview of ditto event notifications that indicate changes have occurred /#requesting an upgrade /#integrating a kafka c onsumer combing edge and big data this premium option provides asynchronous delivery of a structured queue containing data change events to any kafka consumer you choose leveraging these change events, you can dynamically respond in realtime within your external systems and tools, for instance, update a dashboard to reflect the most up‑to‑date information from the big peer with cdc's capabilities, you combine realtime data capture and processing at the transactional edge with the scalability and analytical power of big data put differently, you stay in the know about all changes happening within your data that way, you can enhance data driven decision making, reduce operational costs, improve efficieny and resiliency, and much more each ditto transaction produces a change data message containing the type of change, such as insert , remove , and so on, and details of the change for more information, see docid 2felxg8g6acwurcgp hhk and docid\ huxeu9etfm7dqwjyjwmxt common use cases create programs that receive and process egress events similar to how you use ditto's live queries to specify the changes you're interested in, your program can define queries specifying the types of data it's interested in watching and reacting to for instance, imagine a point of sale (pos) app with requirements such as streamlining data management across your superset of data management tooling, increasing efficiency to reduce costs and mitigate potential revenue loss, and improve customer experience by offering special incentives and promotions to your most loyal customers, as follows streamline data management increase operational efficiency improve customer experience with cdc, you ensure transactional data, such as details on items sold, is accurately captured and transmitted in realtime to the systems you use to process and analyze data if certain ingredients run low, the cdc kafka connector triggers alerts to store managers, preventing loss in sales due to out‑of‑stock menu items tracks individual customer sales, so you can offer customers personalized discounts in realtime requesting an upgrade to upgrade your organization to kafka, complete the https //www ditto live/about/contact form once submitted, ditto will provide you with the necessary next steps to proceed with your premium upgrade integrating a kafka c onsumer to integrate your kafka consumer with your app a kafka consumer is any data product that reads and processes the messages published to your kafka topics install and set up kafka in your local environment ( /#getting started ) get your access certificates and connect your app to kafka ( /#obtaining certificates and connecting ) prepare kafka for ditto ( /#setting up ) perform a simple validation test to verify you have successfully integrated ditto with kafka ( /#verifying integration ) perform a simple event to test your consumer getting started https //docs ditto live/javascript/common/guides/kafka/intro#installing kafka if you're new to kafka, complete the steps provided in the official https //kafka apache org/quickstart guide to set up and try out basic kafka operations on your local environment when completing the apache kafka quickstart steps, use scripts located in the bin directory of kafka version kafka 2 13 3 1 0 obtaining certificates and connecting securely connect ditto to your kafka consumer by utilizing the certificates provided by ditto for your dedicated cluster to establish a secure sockets layer (ssl) connection the following kafka settings are enabled only if your organization is upgraded to the kafka premium option and are on a dedicated cluster to get your certificates, from the ditto portal > live query settings , click download next to cluster certificate and user certificate prevent unauthorized access and potential security breaches by storing your downloaded certificates in a secure location setting up using the following mappings between authentication tokens and their respective kafka configuration for ssl connections, integrate ssl connection ditto name description ssl truststore location cluster certificate the certificate authority (ca) certificate in pkcs12 format ssl truststore password cluster certificate password decrypts the value of the ca certificate ssl keystore location user certificate signifies the location of the user certificate in pkcs12 format ssl keystore password user certificate password decrypts the user certificate verifying integration once you've set up and configured your kafka topic with ditto, perform a simple test to validate successful integration copy paste the following bash script in a terminal, and then replace each variable with the relevant information that displays in the ditto portal > live query settings for your app make sure that the values you enter in your configuration for the kafka group and kafka topic settings match the string that displays in the ditto portal > live query settings for your app this alignment is critical for proper integration between ditto and kafka if the test is successful, no errors display in your terminal, and the script continues running without shutting down bash cluster cert location=/path/to/cluster p12 cluster cert pw=\<your cluster cert password> user cert location=/path/to/user p12 user cert pw=\<your user cert password> cloud endpoint=\<your endpoint> topic=\<your topic> kafka=/path/to/kafka 2 13 3 1 0 $kafka/bin/kafka console consumer sh \\ \ bootstrap server $cloud endpoint \\ \ consumer property security protocol=ssl \\ \ consumer property ssl truststore password=$cluster cert pw \\ \ consumer property ssl truststore location=$cluster cert location \\ \ consumer property ssl keystore password=$user cert pw \\ \ consumer property ssl keystore location=$user cert location \\ \ group $topic \\ \ topic $topic keeping the script active in your terminal, verify that events write to the console every time a change replicates to the big peer by performing a write operation in the codebase of your app the following snippets demonstrate upserting a new value to the ditto store in various languages const docid = await ditto store collection('people') upsert({ name 'susan', age 31, }) console log(docid) // "507f191e810c19729de860ea"const docid = await ditto store collection('people') upsert({ name 'susan', age 31, }) console log(docid) // "507f191e810c19729de860ea"do { // upsert json compatible data into ditto let docid = try ditto store\["people"] upsert(\[ "name" "susan", "age" 31 ]) } catch { //handle error print(error) }val docid2 = ditto store\["people"] upsert( mapof( "name" to "susan", "age" to 31 ) )map\<string, object> content = new hashmap<>(); content put("name", "susan"); content put("age", 31); dittodocumentid docid = ditto store collection("people") upsert(content); // docid => 507f191e810c19729de860eajson person = json({{"name", "susan"}, {"age", 31}}); documentid doc id = ditto get store() collection("people") upsert(person);var docid = ditto store collection("people") upsert( new dictionary\<string, object> { { "name", "susan" }, { "age", 31 }, } );let person = json!({ "name" "susan" to string(), "age" 31, }); let collection = ditto store() collection("people") unwrap(); let id = collection upsert(person) unwrap(); curl curl x post 'https //\<cloud endpoint>/api/v4/store/write' \\ \ header 'content type application/json' \\ \ data raw '{ "commands" \[{ "method" "upsert", "collection" "people", "id" "abc123", "value" { "name" "susan", "age" 31 } }] }' if you want to react to a consumable event by writing changes to the ditto store, use the http api for more information, see /#events message streams , as follows, and the docid\ optxpe3zon8erg3k3iehl events message streams once your consumer is active, events appear as a json message stream in your kafka console whenever a change successfully replicates to the big peer if you want to respond to a consumable event by triggering changes in the ditto store, invoke the ditto http api for more information, see docid\ optxpe3zon8erg3k3iehl standard fields the structure of the message stream includes both standard fields and fields that are specific to each event the following table provides an overview of the standard set of fields that are included in every message for an overview of event specific fields, see /#new document events , /#updated document events , and /#removed document events , as follows field description txnid the timestamp for when the big peer internally replicates data modifications from small peers (this timestamp is an always‑increasing value ) type the type of event stream collection the collection that the changed document belongs to change method the method that executed the event new document events once an upsert execution completes and a new document is created, a json event stream similar to the following displays in your kafka console { "txnid" 552789, "type" "documentchanged", "collection" "people", "change" { "method" "upsert", "oldvalue" null, "newvalue" { " id" "6213e9c90012e4af0017cb9f", "date" 1645472201, "name" "susan", "age" 31 } } } the following table provides an overview of the event specific fields for the upsert event field description change oldvalue the previous state of the document; since the document did not previously exist, the change oldvalue field is always set to null change newvalue self describes the data encoded in the document at upsert updated document events once an update operation completes, a json event stream indicating the change displays in your kafka console for example, if an update operation specified the following changes { ownedcars 0, friends \[], name "frank" } you would receive the following json event stream { "txnid" 553358, "type" "documentchanged", "collection" "people", "change" { "method" "update", "oldvalue" { " id" "6213e9c90012e4af0017cb9f", "date" 1645472312, "name" "susan", "age" 31 }, "newvalue" { " id" "6213e9c90012e4af0017cb9f", "date" 1645472312, "name" "frank", "ownedcars" 0, "friends" \[] } } } the following table provides an overview of the event specific fields that provide information specific to the update event field description change oldvalue the previous state of the document change newvalue provides both the fields that changed and the fields that remain unchanged removed document events once a remove operation completes, a json event stream similar to the following displays in your kafka console { "txnid" 701251, "type" "documentchanged", "collection" "people", "change" { "method" "remove", "value" { " id" "6213e9c90012e4af0017cb9f", "date" 1645468039, "name" "susan", "age" 31 } } } field description change value indicates the full document at the time of its removal event failures when the producer fails to keep up with the incoming changes, a requeryrequired event type displays in your kafka console (see /#responding to streaming failures ) following is an example of a requeryrequired event stream { "txnid" 45, "type" "requeryrequired", "documents" \[] } the documents field is now deprecated and will only return an empty list, as demonstrated in the previous snippet responding to streaming failures to maintain data integrity, if you receive a requeryrequired message, invoke the http api to update your system for the entire dataset to update your system for the full dataset, use the event's txnid as a value of x ditto txn id in the http api call to avoid the risk of missing updates, you must requery the complete dataset after you've requeried the full dataset, directly following the requery required message, resume applying changes from the cdc stream