Kafka Events: Node.js
In this article, we will listen to Kafka events and pipe them to the command-line process's stdout.

Prerequisites

- Basic understanding of Node.js
- Local installation of Node.js (https://nodejs.org/en)
- A Ditto application on a dedicated cluster, syncing with the Big Peer on the Ditto portal (https://portal.ditto.live/)

Code Sample

See https://github.com/getditto/external-sync/tree/main/nodejs-mongo for example code that connects a Node.js instance to the Ditto Big Peer as a Kafka sink to MongoDB.

Converting certificates to the proper formats

First, download the proper Kafka certificates and convert them to the format that Node.js requires for SSL. Convert the .p12 files to the required user.key, cluster.crt, and user.crt files. When prompted, use the appropriate cluster certificate password or user password as described in the portal.

```shell
❯ openssl pkcs12 -in cluster.p12 -out cluster.crt.pem -nokeys
❯ openssl x509 -in cluster.crt.pem -out cluster.crt
❯ openssl pkcs12 -in user.p12 -out user.crt -clcerts
❯ openssl pkcs12 -in user.p12 -out user.key.pem -nocerts
❯ openssl pkey -in user.key.pem -out user.key
```

Decoding transactions

All messages from the Ditto CDC are sent to your Kafka sink as JSON, so the first step is to decode each message value as JSON.

```typescript
try {
  // message.value is a Buffer; decode it to a string before parsing.
  const transaction = JSON.parse(message.value!.toString())
  parseTransaction(database, transaction).then(() => {
    // success!
  }).catch(err => {
    console.error('[ERROR] Got error when parsing transaction', err)
  })
} catch (err) {
  // JSON.parse throws synchronously on malformed input.
  console.error('[ERROR] Failed to parse change', err)
}
```

Checking transaction type

Each transaction has a type. Ditto has two types: requeryRequired and documentChanged.

```typescript
async function parseTransaction (database: Db, transaction: DittoTransaction) {
  const collectionName = transaction.collection
  const collection = database.collection(collectionName);
  switch (transaction.type) {
    case 'requeryRequired':
      onRequeryRequired(database, transaction)
      return;
    case 'documentChanged':
      // The snippets below use `collection` and `transaction`,
      // so both are passed to the handler here.
      await onDocumentChanged(collection, transaction)
      break;
    default:
      break;
  }
}
```

Parsing documentChanged events

In the onDocumentChanged function, we parse the event into one of three possible types: insert, update, and remove.

Inserting a new document

When change.oldValue is equal to null, a new document was inserted into the database.

```typescript
if (transaction.change.method === 'upsert' && transaction.change.oldValue === null) {
  const change: DittoInsert = transaction.change
  const result = await collection.insertOne(change.newValue);
  console.log(
    `A document was inserted with the _id: ${result.insertedId}`,
  );
}
```

Updating an existing document

If change.oldValue has a value, a document with the corresponding _id was updated to the value given in change.newValue.

```typescript
if (transaction.change.method === 'upsert' && transaction.change.oldValue !== null) {
  const change: DittoUpdate = transaction.change
  const _id = change.oldValue._id
  const filter = { _id };
  const result = await collection.replaceOne(filter, change.newValue, { upsert: true });
  console.log(
    `${result.matchedCount} document(s) matched the filter, updated ${result.modifiedCount} document(s)`,
  );
}
```

Removing a document

When change.method is equal to "remove", the document has been deleted from Ditto.

```typescript
if (transaction.change.method === 'remove') {
  const change: DittoRemove = transaction.change
  const _id = change.value._id
  const filter = { _id };
  const result = await collection.deleteOne(filter)
  console.log(
    `${result.deletedCount} document(s) deleted`,
  );
}
```

Parsing requeryRequired event

Send an HTTP request that tells the Ditto Big Peer to catch up to the transaction ID given in transaction.txnID for the given collection. Your HTTP endpoint will look like https://${APP_ID}.cloud.ditto.live.

```typescript
import axios from 'axios'
import { Db } from 'mongodb'

function onRequeryRequired (database: Db, transaction: DittoRequeryRequired) {
  const HTTP_ENDPOINT = httpEndpoint + '/api/v3/store/find'
  for (const requeryDoc of transaction.documents) {
    const req = {
      method: 'post',
      url: HTTP_ENDPOINT,
      headers: {
        'Content-Type': 'application/json',
        'X-DITTO-TXN-ID': transaction.txnID
      },
      data: {
        // Query the collection named in the requery entry, not the whole transaction.
        "collection": requeryDoc.collectionName,
        "query": "true",
        "limit": 1
      }
    }
    axios(req).then(function (response) {
      if (response.data.message) {
        // error
      } else {
        for (const doc of response.data.documents) {
          // insert updated docs
          const mongodbCollection = database.collection(requeryDoc.collectionName);
          const missingDocument = doc as DittoHttpDocument
          mongodbCollection.replaceOne({ _id: missingDocument._id }, missingDocument)
        }
      }
    }).catch(err => {
      // Serialize the request; interpolating the object directly prints "[object Object]".
      console.error(`[ERROR] HTTP find request ${JSON.stringify(req)}`)
      console.error(err)
    });
  }
}
```
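
The three documentChanged cases described in this guide (insert, update, and remove) can be told apart before touching the database. The following is a minimal sketch, not code from the sample repository. The `DittoChange` interface and the `classifyChange` helper are hypothetical names introduced here for illustration, and the sketch assumes that inserts and updates both arrive with `method` set to "upsert" and differ only in whether `oldValue` is null.

```typescript
// Hypothetical, simplified shape of the `change` field of a documentChanged
// event. Only the fields used in this guide are modeled.
interface DittoChange {
  method: string;
  oldValue?: Record<string, unknown> | null;
  newValue?: Record<string, unknown> | null;
  value?: Record<string, unknown> | null;
}

type ChangeKind = "insert" | "update" | "remove" | "unknown";

// Hypothetical helper: maps a change to the operation the sink should perform.
function classifyChange(change: DittoChange): ChangeKind {
  if (change.method === "remove") {
    return "remove";
  }
  if (change.method === "upsert") {
    // `== null` matches both null and a missing field: no previous value
    // means the document is new.
    return change.oldValue == null ? "insert" : "update";
  }
  // Any other method is left for the caller to log or ignore.
  return "unknown";
}

const insertEvent: DittoChange = {
  method: "upsert",
  oldValue: null,
  newValue: { _id: "abc", color: "red" },
};
const updateEvent: DittoChange = {
  method: "upsert",
  oldValue: { _id: "abc", color: "red" },
  newValue: { _id: "abc", color: "blue" },
};
const removeEvent: DittoChange = {
  method: "remove",
  value: { _id: "abc", color: "blue" },
};

console.log(classifyChange(insertEvent)); // prints "insert"
console.log(classifyChange(updateEvent)); // prints "update"
console.log(classifyChange(removeEvent)); // prints "remove"
```

Keeping the classification in a pure function separates the decision from the MongoDB calls, so it can be unit tested without a running database or Kafka consumer.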