Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
python - Fetch from one database, Insert/Update into another using SQLAlchemy

We have data in a Snowflake cloud database that we would like to move into an Oracle database. As we would like to work toward refreshing the Oracle database regularly, I am trying to use SQLAlchemy to automate this.

I would like to do this using Core because my whole team is experienced with SQL, but I am the only one with Python experience. I think it would be easier to tweak the data pulls if we just pass SQL strings. In addition, the Snowflake database has some JSON columns that seem easier to parse with direct SQL, since I do not see a JSON type in the SnowflakeDialect.

I have established connections to both databases and am able to do select queries from both. I have also manually created the tables in our Oracle db so that the keys and datatypes match what I am pulling from Snowflake. When I try to insert, though, my Jupyter notebook just continuously says "Executing Cell" and hangs. Any thoughts on how to proceed or how to get the notebook to tell me where the hangup is?

from sqlalchemy import create_engine,pool,MetaData,text
from snowflake.sqlalchemy import URL
import pandas as pd
eng_sf = create_engine(URL(  # engine for Snowflake
    account='account',
    user='user',
    password='password',
    database='database',
    schema='schema',
    warehouse='warehouse',
    role='role',
    timezone='timezone',
))
# engine for Oracle (proxy user syntax: user[proxy])
eng_o = create_engine(
    "oracle+cx_oracle://{}[{}]:{}@{}".format('user', 'proxy', 'password', 'database'),
    poolclass=pool.NullPool,
)

meta_o = MetaData()
meta_o.reflect(bind=eng_o)
person_o = meta_o.tables['bb_lms_person']  # other Oracle tables follow this example

meta_sf = MetaData()
meta_sf.reflect(bind=eng_sf,only=['person']) # other snowflake tables as well, but for simplicity, let's look at one
person_sf = meta_sf.tables['person']

person_query = """
    SELECT ID
          ,EMAIL
          ,STAGE:student_id::STRING as STUDENT_ID
          ,ROW_INSERTED_TIME
          ,ROW_UPDATED_TIME
          ,ROW_DELETED_TIME
      FROM cdm_lms.PERSON
    """


with eng_sf.begin() as connection:
    result = connection.execute(text(person_query)).fetchall() # this snippet runs and returns result as expected

with eng_o.begin() as connection:
    connection.execute(person_o.insert(), result)  # this is a coin flip: sometimes it runs, sometimes it hangs forever


eng_sf.dispose()
eng_o.dispose()

I've checked the typical offenders. The keys for both person_o and the result are all lowercase and match. Any guidance would be appreciated.

Question from: https://stackoverflow.com/questions/66056841/fetch-from-one-database-insert-update-into-another-using-sqlalchemy


1 Answer


Use the reflected metadata for the table. Reflect fTable_Stage, build the update (or insert) with the table's fluent methods, and pass the new values to values() as keyword arguments. This is safe because only columns that exist in the table's metadata are accepted as keyword names. In this example I am updating three fields: LateProbabilityDNN, Sentiment_Polarity and Sentiment_Subjectivity.

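from sqlalchemy import create_engine, MetaData, Table

# params (a URL-quoted ODBC connection string), keyID, late_proba and
# my_valance are assumed to be defined earlier in the script.
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
metadata = MetaData()

# Reflect the existing table so its columns are available as fTable_Stage.c.<name>
fTable_Stage = Table('fTable_Stage', metadata, autoload_with=engine)

# Only columns known to the reflected table are valid keyword names in values()
stmt = fTable_Stage.update().where(fTable_Stage.c.KeyID == keyID).values(
    LateProbabilityDNN=round(float(late_proba), 2),
    Sentiment_Polarity=round(my_valance.sentiment.polarity, 2),
    Sentiment_Subjectivity=round(my_valance.sentiment.subjectivity, 2),
)

# engine.begin() commits on success and rolls back if an exception is raised
with engine.begin() as connection:
    connection.execute(stmt)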
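
The same metadata-driven approach might be carried over to the bulk insert in the question. What follows is only a sketch under stated assumptions, not a diagnosis of the hang: it assumes the fetched rows can be converted to plain dicts keyed by lowercase column name, and that committing in smaller chunks with engine logging switched on helps show which batch stalls. The helper names rows_to_dicts, chunked and copy_rows, and the chunk_size default, are hypothetical and do not come from the original post; only engine.begin(), table.insert() and connection.execute() are SQLAlchemy calls.

```python
import logging
from itertools import islice

# "sqlalchemy.engine" is the logger SQLAlchemy uses for emitted SQL;
# INFO level prints each statement as it is sent.
logging.basicConfig(level=logging.WARNING)
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
log = logging.getLogger("copy_rows")  # hypothetical logger name
log.setLevel(logging.INFO)


def rows_to_dicts(columns, rows):
    """Turn fetched row tuples into plain dicts keyed by lowercase column name."""
    keys = [c.lower() for c in columns]
    return [dict(zip(keys, row)) for row in rows]


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def copy_rows(engine, table, records, chunk_size=1000):
    """Insert `records` into `table`, one transaction per chunk.

    `engine` and `table` are expected to be a SQLAlchemy Engine and Table.
    Returns the number of records sent.
    """
    total = 0
    for number, chunk in enumerate(chunked(records, chunk_size), start=1):
        # begin() commits when the block exits cleanly, rolls back on error
        with engine.begin() as connection:
            # a list of dicts makes this a single executemany call
            connection.execute(table.insert(), chunk)
        total += len(chunk)
        log.info("chunk %d committed (%d rows so far)", number, total)
    return total
```

With the objects from the question, a call could look like copy_rows(eng_o, person_o, rows_to_dicts(['ID', 'EMAIL', 'STUDENT_ID', 'ROW_INSERTED_TIME', 'ROW_UPDATED_TIME', 'ROW_DELETED_TIME'], result)). Note that committing per chunk trades away all-or-nothing behaviour: if a later chunk fails, earlier chunks stay in the Oracle table.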
