sql >> Databasteknik >  >> NoSQL >> MongoDB

Sänk Kafka Stream till MongoDB med PySpark Structured Streaming

Jag hittade en lösning. Eftersom jag inte kunde hitta rätt Mongo-drivrutin för Structured Streaming, arbetade jag på en annan lösning. Nu använder jag direktanslutningen till mongoDb och använder "foreach(...)" istället för foreachbatch(. ..). Min kod ser ut så här i filen testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. MongoDB till DynamoDB

  2. MongoDB $summa och $avg av underdokument

  3. Hur man anropar ett lagrat JavaScript i MongoDb från C#

  4. Hur modellerar man en många självrefererande relation med många föräldrar?