# ask-community
Nicholas Hemley:
Hello all, I am currently trialling Prefect against a current deployment that uses Airflow for a data pipeline from MySQL (on RDS) to BigQuery. I am assuming that there are no direct replacements for specific operators such as MySQLToGCS? If I code up such an operator, is there an obvious place for me to contribute the code back to the community? Thanks for any responses, appreciated! 😉
Nicholas Hemley:
Thanks, Amanda, appreciated. I have had a look through and don't see any Source-To-Sink type tasks, only Sources or Sinks in isolation. I am currently assuming that no such "coupled" tasks exist (as yet)...
Kevin Kho:
Hey @Nicholas Hemley, I think a Source-To-Sink type would be too specific for the task library; the preference would be to break it into a MySQL Query task and a GCS Upload task. That way more people can use those sources and sinks, since they stay general.
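For illustration, a minimal sketch of that decoupled pattern, assuming the Prefect 1.x task library's MySQLFetch and GCSUpload tasks (the parameter names here are indicative; check the current signatures before relying on them):
```python
from prefect import task, Flow
from prefect.tasks.mysql import MySQLFetch
from prefect.tasks.gcp import GCSUpload

# source task: pull rows from MySQL (password should really come from a Secret)
fetch = MySQLFetch(
    db_name="mydb", user="gs-user", password="...",
    host="host1", query="SELECT * FROM my_table", fetch="all",
)
# sink task: upload a payload to a GCS bucket
upload = GCSUpload(bucket="my-bucket")

@task
def to_csv(rows):
    # serialize the fetched rows so the sink task receives a string payload
    return "\n".join(",".join(map(str, r)) for r in rows)

with Flow("mysql-to-gcs") as flow:
    rows = fetch()
    upload(data=to_csv(rows), blob="exports/my_table.csv")
```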
e:
@Kevin Kho can task outputs be garbage collected after they are delivered to all downstream tasks? In the decoupled case, the outputs of multiple MySQL queries can be too large to keep in memory if all of them are left there. I had to couple my tasks in a similar way because of this. Since then, Prefect has gained depth-first execution, so if Prefect/Dask uses garbage collection as well, decoupling would indeed work best. I have been curious about this, but never got to try it myself.
Kevin Kho:
I get what you are saying. To garbage collect in Prefect, you would need to run `del xxx` and then `gc.collect()`, but as you say that's not nice when you have two separate tasks, because it becomes hard to delete upstream variables in downstream operations. This is something on our radar, but in the meantime it can be addressed by indeed combining tasks together. If you had two separate tasks, you could call `MSSQLTask.run()` and `GCSUpload.run()` inside a bigger task. You do lose a bit of the individual retries, but I think that would be true even if you compressed the tasks yourself: if you only wanted to retry the `GCSUpload`, you'd need to go through the `MSSQLTask` as well when they are combined.
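A rough sketch of that "bigger task" pattern, with hypothetical stand-ins for the source and sink tasks (swap in the real library tasks):
```python
import gc
from datetime import timedelta

from prefect import task, Flow

# Hypothetical stand-ins for the real source/sink tasks (e.g. a MySQL fetch
# task and prefect.tasks.gcp.GCSUpload); replace with the actual library tasks.
class FakeSource:
    def run(self, query):
        return [("a", "b")] * 1_000

class FakeSink:
    def run(self, data, blob):
        return f"gs://bucket/{blob}"

source, sink = FakeSource(), FakeSink()

@task(max_retries=2, retry_delay=timedelta(seconds=30))
def extract_and_load(query, blob):
    rows = source.run(query=query)               # source: pull all rows
    data = "\n".join(",".join(r) for r in rows)  # serialize for upload
    del rows                                     # drop the big reference...
    gc.collect()                                 # ...and reclaim memory before the upload
    return sink.run(data=data, blob=blob)        # sink: upload to GCS

with Flow("coupled-extract-load") as flow:
    extract_and_load("SELECT * FROM my_table", "exports/my_table.csv")
```
Note the trade-off Kevin describes: a retry of this task re-runs both the fetch and the upload.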
e:
I see, manually triggering gc would probably work in my case (Dask on threads); I'm not sure about processes or distributed, though. As you said, logically separating tasks is easy enough. Even built-in tasks can fit the bill.
Kevin Kho:
For distributed, check this and this. I first saw them on the Coiled Slack channel.
Nicholas Hemley:
@Kevin Kho - thanks for pointing me in the right direction, but I now have another (related) query: if I want to run flows consecutively (not necessarily in parallel) with different configuration settings (think: different DB URLs), what is the Prefect idiom for looping over each and every configuration and running it? I have investigated Parameters (no, since connection configs contain passwords) and PrefectSecrets (perhaps, using a list of dictionaries?), but neither seems to fit this exact use case. In Airflow I would add a connection (with an id) via the CLI and then loop over the connection ids in code... any pointers appreciated!
Kevin Kho:
I think there are two parts to this. The first is how to loop. You can do this with mapping. It can be parallelized with the LocalDaskExecutor or DaskExecutor; it will run sequentially with the LocalExecutor. Mapping happens over a list, so in this case you would need your credentials in a list. I think you can have a list of DB URLs, and the passwords can be Secrets. You would then have a list of PrefectSecrets that you retrieve like this:
```python
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

se = PrefectSecret()

with Flow("param-test") as flow:
    # each mapped run fetches the secret named by its element
    y = se.map(["SLACK_WEBHOOK_URL", "SLACK_WEBHOOK_URL"])
    abc(y)  # abc is a downstream task (defined in the full example below)
```
This will give a list of passwords, and then you can map over the pairs of connections with your db-related tasks. Does that make sense?
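A hedged sketch of that second mapping step, pairing each DB URL with its fetched secret (the names `db_urls` and `run_query` are made up for illustration):
```python
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

se = PrefectSecret()

@task
def run_query(url, password):
    ...  # connect to `url` with `password` and run the extraction

with Flow("multi-db") as flow:
    db_urls = ["mysql://host1/db", "mysql://host2/db"]
    passwords = se.map(["GS_MYSQL1", "GS_MYSQL2"])  # one secret per URL, in order
    run_query.map(db_urls, passwords)               # maps element-wise over both lists
```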
Nicholas Hemley:
Makes sense! I have tried the following (with the env vars set), with no joy as yet: `ValueError: Local Secret "GS_MYSQL1" was not found.`
```python
import os
from unittest import mock

from prefect import Flow
from prefect.run_configs import LocalRun
from prefect.tasks.secrets import PrefectSecret

@mock.patch.dict(os.environ, {"PREFECT__CONTEXT__SECRETS__GS_MYSQL1": "password1",
                              "PREFECT__CONTEXT__SECRETS__GS_MYSQL2": "password2"})
def test_11(self):
    prefect_secret = PrefectSecret()
    with Flow("test_11") as flow:
        credentials = prefect_secret.map(["GS_MYSQL1", "GS_MYSQL2"])
        self.test_11_task(self, credentials=credentials)

    flow.run_config = LocalRun(env={"PREFECT__CONTEXT__SECRETS__GS_MYSQL1": "password1",
                                    "PREFECT__CONTEXT__SECRETS__GS_MYSQL2": "password2"})
    flow.run()
```
Note: this is running as a test case, hence the slightly odd references to `self`.
Kevin Kho:
I'll give it a shot.
Ok, I think the issue here is that the config might already be created by the time you monkey-patch it, so those values don't get carried over. I did
```bash
export PREFECT__CONTEXT__SECRETS__FOO_FOO="BAR"
```
and then
```python
from prefect import task, Flow, Parameter
import prefect
from prefect.tasks.secrets import PrefectSecret

@task
def abc(x):
    logger = prefect.context.get("logger")
    logger.info(f"{x}")
    return x

se = PrefectSecret()

with Flow("param-test") as flow:
    y = se.map(["FOO_FOO", "FOO_FOO"])
    abc(y)

flow.run()
```
and this worked for me
Nicholas Hemley:
Ah, ok. That works. The trick in the IDE (PyCharm) was to add a Run configuration that exports the env vars.
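As an aside, local secrets can also be injected through `prefect.context` in tests, which avoids the env-var setup entirely; a minimal sketch, worth verifying against your Prefect version:
```python
import prefect
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

se = PrefectSecret()

with Flow("context-secrets") as flow:
    y = se.map(["GS_MYSQL1", "GS_MYSQL2"])

# prefect.context acts as a context manager; secrets set here are visible
# to PrefectSecret for the duration of the run
with prefect.context(secrets={"GS_MYSQL1": "password1", "GS_MYSQL2": "password2"}):
    state = flow.run()
```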
Hey @Kevin Kho, just so you know, this is where I landed in terms of re-combining configuration & secrets at flow run time:
```python
import ast
import os

from prefect import Flow
from prefect.tasks.secrets import EnvVarSecret, PrefectSecret

def test_20(self):
    """
    Puts together both approaches to demonstrate settings & secrets together.

    Test Run configuration:
    PREFECT__CONTEXT__SECRETS__GS_MYSQL1="password1";PREFECT__CONTEXT__SECRETS__GS_MYSQL2="password2"
    """
    os.environ["GS_MYSQL"] = '{"GS_MYSQL1": {"host": "host1", "port": "3306",' \
                             '"username": "gs-user", "client_id": "1"},' \
                             '"GS_MYSQL2": {"host": "host2", "port": "3306",' \
                             '"username": "gs-user", "client_id": "2"} }'
    p = EnvVarSecret("GS_MYSQL")
    config_string = p.run()
    config_dict = ast.literal_eval(config_string)

    prefect_secret = PrefectSecret()
    with Flow("test_20") as flow:
        # note: in Python 3, dict.keys() returns a dict_keys view, hence list()
        key_list = list(config_dict.keys())
        credentials_list = prefect_secret.map(key_list)
        combined_list = combine_credentials(config_dict, credentials_list)

    state = flow.run()
    m = state.result[combined_list]
    assert len(m.result.keys()) == 2
    for key in m.result.keys():
        assert len(m.result[key]) == 5
```
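The `combine_credentials` task is a custom task not shown in the thread; a hypothetical sketch of what it might look like, assuming the config keys and the mapped secrets line up in order:
```python
from prefect import task

@task
def combine_credentials(config_dict, credentials_list):
    # merge each connection's settings with its fetched password
    return {
        key: {**config_dict[key], "password": password}
        for key, password in zip(config_dict.keys(), credentials_list)
    }
```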
So the expected output is `{'GS_MYSQL1': {'host': 'host1', 'port': '3306', 'username': 'gs-user', 'client_id': '1', 'password': 'password1'}, 'GS_MYSQL2': {'host': 'host2', 'port': '3306', 'username': 'gs-user', 'client_id': '2', 'password': 'password2'}}`, which is exactly what I need to now loop over connections. Yay!
Kevin Kho:
Yeah this looks good!